From d7beb5edf8162e57c382bbed652cec499fee8c6c Mon Sep 17 00:00:00 2001 From: qydysky Date: Sat, 31 May 2025 01:17:21 +0800 Subject: [PATCH] 1 (#61) * 1 * 1 * 1 --- get/Get.go | 111 +++++++++++++++++++++++++++------------- reqf/Reqf.go | 102 +++++++++++++++++++++---------------- reqf/Reqf_test.go | 126 +++++++++++++++++++++++++++++----------------- web/Web_test.go | 115 +++++++++++++++++++++++++++++------------- 4 files changed, 298 insertions(+), 156 deletions(-) diff --git a/get/Get.go b/get/Get.go index dd7b4f9..f543a01 100644 --- a/get/Get.go +++ b/get/Get.go @@ -1,85 +1,128 @@ package part import ( - "strings" "errors" "net/http" + "strings" + reqf "github.com/qydysky/part/reqf" ) type get struct { body []byte - + Response *http.Response - RS []string - Err error + RS []string + Err error } -func Get(r reqf.Rval) (o *get){ +func Get(r reqf.Rval) (o *get) { o = new(get) - if r.Url == "" {o.Err = errors.New("url == nil");return} + if r.Url == "" { + o.Err = errors.New("url == nil") + return + } R := reqf.New() o.Err = R.Reqf(r) - (*o).body = R.Respon - (*o).Response = R.Response - + R.Respon(func(rRespon []byte) error { + (*o).body = rRespon + return nil + }) + R.Response(func(r *http.Response) error { + (*o).Response = r + return nil + }) + return } -func (i *get) S(stratS,endS string, startI,lenI int) (o *get) { +func (i *get) S(stratS, endS string, startI, lenI int) (o *get) { o = i var tmp string - tmp,o.Err = SS(string(o.body), stratS, endS, startI, lenI) - if o.Err != nil {return} + tmp, o.Err = SS(string(o.body), stratS, endS, startI, lenI) + if o.Err != nil { + return + } o.RS = []string{tmp} return } -func (i *get) S2(stratS,endS string) (o *get) { +func (i *get) S2(stratS, endS string) (o *get) { o = i - o.RS,o.Err = SS2(string(o.body), stratS, endS) - if o.Err != nil {return} + o.RS, o.Err = SS2(string(o.body), stratS, endS) + if o.Err != nil { + return + } return } -func SS2(source,stratS,endS string) (return_val []string,last_err error) { - if source == `` {last_err = errors.New("ss2:no source");return} - if stratS == `` {last_err = errors.New("ss2:no stratS");return} +func SS2(source, stratS, endS string) (return_val []string, last_err error) { + if source == `` { + last_err = errors.New("ss2:no source") + return + } + if stratS == `` { + last_err = errors.New("ss2:no stratS") + return + } - return_val = strings.Split(source,stratS)[1:] - if len(return_val) == 0 {last_err = errors.New("ss2:no found");return} - if endS == `` {return} - for k,v := range return_val { - first_index := strings.Index(v,endS) - if first_index == -1 {continue} + return_val = strings.Split(source, stratS)[1:] + if len(return_val) == 0 { + last_err = errors.New("ss2:no found") + return + } + if endS == `` { + return + } + for k, v := range return_val { + first_index := strings.Index(v, endS) + if first_index == -1 { + continue + } return_val[k] = string([]rune(v)[:first_index]) } return } -func SS(source,stratS,endS string, startI,lenI int) (string,error) { - if stratS == "" && startI == 0 {return "", errors.New("no symbol to start")} - if endS == "" && lenI == 0 {return "", errors.New("no symbol to stop")} +func SS(source, stratS, endS string, startI, lenI int) (string, error) { + if stratS == "" && startI == 0 { + return "", errors.New("no symbol to start") + } + if endS == "" && lenI == 0 { + return "", errors.New("no symbol to stop") + } - var ts,te int + var ts, te int if stratS != "" { - if tmp := strings.Index(source, stratS);tmp != -1{ts = tmp + len(stratS)} + if tmp := strings.Index(source, stratS); tmp != -1 { + ts = tmp + len(stratS) + } } else if startI != 0 { - if startI < len(source){ts = startI} + if startI < len(source) { + ts = startI + } } - if ts == 0 {return "", errors.New("no start symbol "+ stratS +" in " + source)} + if ts == 0 { + return "", errors.New("no start symbol " + stratS + " in " + source) + } if endS != "" { - if tmp := strings.Index(source[ts:], endS);tmp != -1{te = ts + tmp} + if tmp := strings.Index(source[ts:], endS); tmp != -1 { + te = ts + tmp + } } else if lenI != 0 { - if startI + lenI < len(source){te = startI + lenI} + if startI+lenI < len(source) { + te = startI + lenI + } } - if te == 0 {return "", errors.New("no stop symbol "+ endS +" in " + source)} + if te == 0 { + return "", errors.New("no stop symbol " + endS + " in " + source) + } return string(source[ts:te]), nil } diff --git a/reqf/Reqf.go b/reqf/Reqf.go index 22cdcee..ec1bb47 100644 --- a/reqf/Reqf.go +++ b/reqf/Reqf.go @@ -49,10 +49,12 @@ type Rval struct { CopyResponseTimeout int JustResponseCode bool NoResponse bool - // 当Async为true时,Respon、Response必须在Wait()之后读取,否则有DATA RACE可能 - Async bool - Cookies []*http.Cookie - Ctx context.Context + Async bool + Cookies []*http.Cookie + Ctx context.Context + + // 预分配响应长度,若合理设置,将可以节约内存 + ResponsePreCap int SaveToPath string // 为避免write阻塞导致panic,请使用此项目io包中的NewPipe(),或在ctx done时,自行关闭pipe writer reader @@ -82,14 +84,9 @@ var ( ) type Req struct { - // 当Async为true时,必须在Wait()之后读取,否则有DATA RACE可能 - Respon []byte - // 当Async为true时,必须在Wait()之后读取,否则有DATA RACE可能 - Response *http.Response - UsedTime time.Duration - - state atomic.Int32 - + UsedTime time.Duration + response *http.Response + state atomic.Int32 client *http.Client reqProxy string responFile *os.File @@ -99,7 +96,6 @@ type Req struct { rwTO *time.Timer err error callTree string - copyResBuf []byte l sync.RWMutex } @@ -126,11 +122,47 @@ func (t *Req) Reqf(val Rval) error { return nil } +func (t *Req) ResStatusCode() (code int) { + t.l.RLock() + defer t.l.RUnlock() + return t.response.StatusCode +} + +func (t *Req) ResHeader() http.Header { + t.l.RLock() + defer t.l.RUnlock() + return t.response.Header.Clone() +} + +func (t *Req) Response(f func(r *http.Response) error) error { + t.l.RLock() + defer t.l.RUnlock() + + return f(t.response) +} + +func (t *Req) Respon(f func(b []byte) error) error { + t.l.RLock() + defer t.l.RUnlock() + + return f(t.responBuf.Bytes()) +} + +func (t *Req) ResponUnmarshal(f func(b []byte, v any) error, v any) error { + t.l.RLock() + defer t.l.RUnlock() + + return f(t.responBuf.Bytes(), v) +} + func (t *Req) reqfM(ctx context.Context, ctxf1 context.CancelCauseFunc, val Rval) { - beginTime := time.Now() + defer func() { + t.UsedTime = time.Since(time.Now()) + t.l.Unlock() + }() for i := 0; i <= val.Retry; i++ { - t.err = t.prepareRes(&val) + t.err = t.prepareRes() if t.err != nil { break } @@ -144,10 +176,8 @@ func (t *Req) reqfM(ctx context.Context, ctxf1 context.CancelCauseFunc, val Rval } ctxf1(nil) - t.updateUseDur(beginTime) t.clean(&val) t.state.Store(free) - t.l.Unlock() } func (t *Req) reqf(ctx context.Context, val Rval) (err error) { @@ -190,12 +220,13 @@ func (t *Req) reqf(ctx context.Context, val Rval) (err error) { if e != nil { return pe.Join(ErrClientDo.New(), e) } + defer resp.Body.Close() if v, ok := val.Header["Connection"]; ok && strings.ToLower(v) != "keep-alive" { defer t.client.CloseIdleConnections() } - t.Response = resp + t.response = resp if val.JustResponseCode { return @@ -267,12 +298,7 @@ func (t *Req) reqf(ctx context.Context, val Rval) (err error) { t.rwTO.Stop() } - if t.responBuf != nil { - t.Respon = t.responBuf.Bytes() - } - - resReadCloser.Close() - + t.response.Body = io.NopCloser(t.responBuf) return } @@ -293,19 +319,9 @@ func (t *Req) IsLive() bool { return t.state.Load() == running } -func (t *Req) prepareRes(val *Rval) (e error) { - if !val.NoResponse { - if t.responBuf == nil { - t.responBuf = new(bytes.Buffer) - t.Respon = t.responBuf.Bytes() - } else { - t.responBuf.Reset() - } - } else { - t.Respon = []byte{} - t.responBuf = nil - } - t.Response = nil +func (t *Req) prepareRes() (e error) { + t.responBuf.Reset() + t.response = nil t.err = nil if seeker, ok := t.reqBody.(io.Seeker); ok { @@ -327,7 +343,6 @@ func (t *Req) prepare(val *Rval) (ctx1 context.Context, ctxf1 context.CancelCaus e = ErrCantRetry.New() return } - t.UsedTime = 0 t.responFile = nil t.callTree = "" @@ -404,6 +419,13 @@ func (t *Req) prepare(val *Rval) (ctx1 context.Context, ctxf1 context.CancelCaus } else { t.reqBody = nil } + if t.responBuf == nil { + t.responBuf = new(bytes.Buffer) + } + t.responBuf.Reset() + if val.ResponsePreCap > 0 { + t.responBuf.Grow(val.ResponsePreCap) + } { var ( ctx context.Context @@ -464,10 +486,6 @@ func (t *Req) clean(val *Rval) { } } -func (t *Req) updateUseDur(u time.Time) { - t.UsedTime = time.Since(u) -} - func IsTimeout(e error) bool { if errors.Is(e, context.DeadlineExceeded) { return true diff --git a/reqf/Reqf_test.go b/reqf/Reqf_test.go index c646d76..35e3f41 100644 --- a/reqf/Reqf_test.go +++ b/reqf/Reqf_test.go @@ -142,9 +142,12 @@ func Test_6(t *testing.T) { }); e != nil { t.Fatal(e) } - if reuse.Response.Header.Get(`I`) != `1` { - t.Fail() - } + reuse.Response(func(r *http.Response) error { + if r.Header.Get(`I`) != `1` { + t.Fail() + } + return nil + }) } func Test_8(t *testing.T) { @@ -152,24 +155,29 @@ func Test_8(t *testing.T) { Url: "http://" + addr + "/reply", PostStr: "123", }) - if !bytes.Equal([]byte("123"), reuse.Respon) { - t.Fatal() - } + reuse.Respon(func(b []byte) error { + if !bytes.Equal([]byte("123"), b) { + t.Fatal() + } + return nil + }) reuse.Reqf(Rval{ Url: "http://" + addr + "/reply", }) - if bytes.Equal([]byte("123"), reuse.Respon) { - t.Fatal() - } + reuse.Respon(func(b []byte) error { + if bytes.Equal([]byte("123"), b) { + t.Fatal() + } + return nil + }) } // go test -timeout 30s -run ^Test_reuse$ github.com/qydysky/part/reqf -race -count=1 -v -memprofile mem.out func Test_reuse(t *testing.T) { - reuse.Reqf(Rval{ - Url: "http://" + addr + "/no", - }) - if !bytes.Equal(reuse.Respon, []byte("abc强强强强")) { - t.Fail() + for i := 0; i < 20; i++ { + reuse.Reqf(Rval{ + Url: "http://" + addr + "/no", + }) } } @@ -182,9 +190,12 @@ func Benchmark(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { reuse.Reqf(rval) - if !bytes.Equal(reuse.Respon, []byte("abc强强强强")) { - b.Fail() - } + reuse.Respon(func(buf []byte) error { + if !bytes.Equal([]byte("abc强强强强"), buf) { + b.Fail() + } + return nil + }) } } @@ -289,25 +300,33 @@ func Test_req7(t *testing.T) { } func Test_req(t *testing.T) { - r := New() - r.Reqf(Rval{ + reuse.Reqf(Rval{ Url: "http://" + addr + "/br", }) - if !bytes.Equal(r.Respon, []byte("abc强强强强")) { - t.Error("br fail", r.Respon) - } - r.Reqf(Rval{ + reuse.Respon(func(buf []byte) error { + if !bytes.Equal([]byte("abc强强强强"), buf) { + t.Fail() + } + return nil + }) + reuse.Reqf(Rval{ Url: "http://" + addr + "/gzip", }) - if !bytes.Equal(r.Respon, []byte("abc强强强强")) { - t.Error("gzip fail") - } - r.Reqf(Rval{ + reuse.Respon(func(buf []byte) error { + if !bytes.Equal([]byte("abc强强强强"), buf) { + t.Error("gzip fail") + } + return nil + }) + reuse.Reqf(Rval{ Url: "http://" + addr + "/flate", }) - if !bytes.Equal(r.Respon, []byte("abc强强强强")) { - t.Error("flate fail") - } + reuse.Respon(func(buf []byte) error { + if !bytes.Equal([]byte("abc强强强强"), buf) { + t.Error("flate fail") + } + return nil + }) } type J struct { @@ -315,14 +334,18 @@ type J struct { } func Test_req12(t *testing.T) { - r := New() - r.Reqf(Rval{ + reuse.Reqf(Rval{ Url: "http://" + addr + "/json", Timeout: 10 * 1000, Retry: 2, }) var j J - json.Unmarshal(r.Respon, &j) + reuse.Respon(func(buf []byte) error { + if json.Unmarshal(buf, &j) != nil { + t.Error("json fail") + } + return nil + }) } func Test_req2(t *testing.T) { @@ -565,9 +588,12 @@ func Test_req3(t *testing.T) { Async: true, }) r.Wait() - if !bytes.Equal(r.Respon, []byte("abc强强强强")) { - t.Error("async fail", r.Respon) - } + r.Respon(func(buf []byte) error { + if !bytes.Equal(buf, []byte("abc强强强强")) { + t.Error("async fail", buf) + } + return nil + }) } { var wg sync.WaitGroup @@ -589,9 +615,12 @@ func Test_req3(t *testing.T) { Async: true, }) r.Wait() - if len(r.Respon) != 0 { - t.Error("io async fail", r.Respon) - } + r.Respon(func(buf []byte) error { + if len(buf) != 0 { + t.Error("io async fail", buf) + } + return nil + }) wg.Wait() } } @@ -602,9 +631,12 @@ func Test_req5(t *testing.T) { Url: "http://" + addr + "/reply", PostStr: "123", }) - if !bytes.Equal(r.Respon, []byte("123")) { - t.Fatal() - } + r.Respon(func(buf []byte) error { + if !bytes.Equal(buf, []byte("123")) { + t.Fatal() + } + return nil + }) raw := NewRawReqRes() buf := []byte("123") @@ -623,10 +655,12 @@ func Test_req5(t *testing.T) { t.Fatal(e) } if !bytes.Equal([]byte("123"), buf) { - t.Log(r.Respon, buf) - t.Fatal() - } - if _, e := ResDate(r.Response); e != nil { t.Fatal() } + reuse.Response(func(r *http.Response) error { + if _, e := ResDate(r); e != nil { + t.Fatal() + } + return nil + }) } diff --git a/web/Web_test.go b/web/Web_test.go index 2095ed0..1556f23 100644 --- a/web/Web_test.go +++ b/web/Web_test.go @@ -83,17 +83,23 @@ func Test_Mod(t *testing.T) { r.Reqf(reqf.Rval{ Url: "http://127.0.0.1:13000/mod", }) - if !bytes.Equal(r.Respon, []byte("abc强强强强")) { - t.Fatal(r.Respon) - } + r.Respon(func(rRespon []byte) error { + if !bytes.Equal(rRespon, []byte("abc强强强强")) { + t.Fatal(rRespon) + } + return nil + }) r.Reqf(reqf.Rval{ Url: "http://127.0.0.1:13000/mod", Header: map[string]string{ - `If-None-Match`: r.Response.Header.Get(`ETag`), + `If-None-Match`: r.ResHeader().Get(`ETag`), }, }) - if r.Response.StatusCode != http.StatusNotModified { - t.Fatal(string(r.Respon)) + if r.ResStatusCode() != http.StatusNotModified { + r.Respon(func(rRespon []byte) error { + t.Fatal(string(rRespon)) + return nil + }) } } time.Sleep(time.Second) @@ -121,17 +127,23 @@ func Test_Server(t *testing.T) { r.Reqf(reqf.Rval{ Url: "http://127.0.0.1:13000/no", }) - if !bytes.Equal(r.Respon, []byte("abc强强强强")) { - t.Fatal(r.Respon) - } + r.Respon(func(rRespon []byte) error { + if !bytes.Equal(rRespon, []byte("abc强强强强")) { + t.Fatal(rRespon) + } + return nil + }) } { r.Reqf(reqf.Rval{ Url: "http://127.0.0.1:13000//no1", }) - if !bytes.Equal(r.Respon, []byte("abc强强强强1")) { - t.Fatal(string(r.Respon)) - } + r.Respon(func(rRespon []byte) error { + if !bytes.Equal(rRespon, []byte("abc强强强强1")) { + t.Fatal(rRespon) + } + return nil + }) } } @@ -505,17 +517,23 @@ func Test_Server2(t *testing.T) { r.Reqf(reqf.Rval{ Url: "http://127.0.0.1:13001/1", }) - if !bytes.Equal(r.Respon, []byte("/1")) { - t.Fatal(r.Respon) - } + r.Respon(func(buf []byte) error { + if !bytes.Equal(buf, []byte("/1")) { + t.Fatal(buf) + } + return nil + }) } { r.Reqf(reqf.Rval{ Url: "http://127.0.0.1:13001/2", }) - if !bytes.Equal(r.Respon, []byte("/")) { - t.Fatal(r.Respon) - } + r.Respon(func(rRespon []byte) error { + if !bytes.Equal(rRespon, []byte("/")) { + t.Fatal(rRespon) + } + return nil + }) } } @@ -549,21 +567,30 @@ func Test_ServerSyncMap(t *testing.T) { r.Reqf(reqf.Rval{ Url: "http://127.0.0.1:13000/1", }) - if !bytes.Equal(r.Respon, []byte("{\"code\":0,\"message\":\"ok\",\"data\":{\"a\":\"0\",\"b\":[\"0\"],\"c\":{\"0\":1}}}")) { - t.Error(string(r.Respon)) - } + r.Respon(func(buf []byte) error { + if !bytes.Equal(buf, []byte("{\"code\":0,\"message\":\"ok\",\"data\":{\"a\":\"0\",\"b\":[\"0\"],\"c\":{\"0\":1}}}")) { + t.Error(string(buf)) + } + return nil + }) r.Reqf(reqf.Rval{ Url: "http://127.0.0.1:13000/2", }) - if r.Response.StatusCode != 404 { - t.Error(string(r.Respon)) + if r.ResStatusCode() != 404 { + r.Respon(func(buf []byte) error { + t.Error(string(buf)) + return nil + }) } m.Store("/2/", nil) r.Reqf(reqf.Rval{ Url: "http://127.0.0.1:13000/2/", }) - if r.Response.StatusCode != 404 { - t.Error(string(r.Respon)) + if r.ResStatusCode() != 404 { + r.Respon(func(buf []byte) error { + t.Error(string(buf)) + return nil + }) } } } @@ -617,7 +644,7 @@ func Test_ClientBlock(t *testing.T) { time.Sleep(time.Second * 3) d, _ := io.ReadAll(rc) fmt.Println(string(d)) - fmt.Println(r.Response.Status) + fmt.Println(r.ResStatusCode()) close(c) }() r.Reqf(reqf.Rval{ @@ -680,55 +707,75 @@ func Test_ServerSyncMapP(t *testing.T) { r.Reqf(reqf.Rval{ Url: "http://127.0.0.1:13002/conn", }) - json.Unmarshal(r.Respon, &res) + r.Respon(func(rRespon []byte) error { + if json.Unmarshal(rRespon, &res) != nil { + t.Fatal(rRespon) + } + return nil + }) if res.Message != "ok" { t.Fatal("") } r.Reqf(reqf.Rval{ Url: "http://127.0.0.1:13002/", }) - json.Unmarshal(r.Respon, &res) + r.Respon(func(rRespon []byte) error { + if json.Unmarshal(rRespon, &res) != nil { + t.Fatal(rRespon) + } + return nil + }) if data, ok := res.Data.(map[string]any); !ok || data["path"].(string) != "/" { t.Fatal("") } r.Reqf(reqf.Rval{ Url: "http://127.0.0.1:13002/1", }) - json.Unmarshal(r.Respon, &res) + r.ResponUnmarshal(json.Unmarshal, &res) if data, ok := res.Data.(map[string]any); !ok || data["path"].(string) != "/" { t.Fatal("") } r.Reqf(reqf.Rval{ Url: "http://127.0.0.1:13002/1/", }) - json.Unmarshal(r.Respon, &res) + r.Respon(func(rRespon []byte) error { + if json.Unmarshal(rRespon, &res) != nil { + t.Fatal(rRespon) + } + return nil + }) if data, ok := res.Data.(map[string]any); !ok || data["path"].(string) != "/1/" { t.Fatal("") } r.Reqf(reqf.Rval{ Url: "http://127.0.0.1:13002/2", }) - if r.Response.StatusCode != 404 { + if r.ResStatusCode() != 404 { t.Fatal("") } r.Reqf(reqf.Rval{ Url: "http://127.0.0.1:13002/1/23", }) - json.Unmarshal(r.Respon, &res) + r.ResponUnmarshal(json.Unmarshal, &res) if data, ok := res.Data.(map[string]any); !ok || data["path"].(string) != "/1/" { t.Fatal("") } r.Reqf(reqf.Rval{ Url: "http://127.0.0.1:13002/1/2/3", }) - json.Unmarshal(r.Respon, &res) + r.ResponUnmarshal(json.Unmarshal, &res) if data, ok := res.Data.(map[string]any); !ok || data["path"].(string) != "/1/" { t.Fatal("") } r.Reqf(reqf.Rval{ Url: "http://127.0.0.1:13002/1/2", }) - json.Unmarshal(r.Respon, &res) + r.Respon(func(rRespon []byte) error { + if json.Unmarshal(rRespon, &res) != nil { + t.Fatal(rRespon) + } + return nil + }) if data, ok := res.Data.(map[string]any); !ok || data["path"].(string) != "/1/2" { t.Fatal("") } -- 2.39.2