From 9aabaa1860087d19a51b24e0c83f8cfdff7d46d2 Mon Sep 17 00:00:00 2001 From: qydysky Date: Sun, 25 May 2025 04:32:22 +0800 Subject: [PATCH] 1 (#57) * 1 * 1 * 1 * 1 * 1 --- io/io.go | 84 +++++++++++++++++++------ reqf/RawReqRes.go | 6 ++ reqf/Reqf.go | 156 ++++++++++++++++++++-------------------------- reqf/Reqf_test.go | 75 ++++++++++++++++++---- web/Web_test.go | 4 +- 5 files changed, 204 insertions(+), 121 deletions(-) diff --git a/io/io.go b/io/io.go index 985a07e..d45a8ed 100644 --- a/io/io.go +++ b/io/io.go @@ -51,9 +51,16 @@ func RW2Chan(r io.ReadCloser, w io.WriteCloser) (rc, wc chan []byte) { return } -func NewPipe() *IOpipe { +func NewPipe() (u *IOpipe) { r, w := io.Pipe() - return &IOpipe{R: r, W: w} + u = &IOpipe{r: r, w: w} + u.ctx, u.ctxC = context.WithCancel(context.Background()) + return +} +func NewPipeRaw(r *io.PipeReader, w *io.PipeWriter) (u *IOpipe) { + u = &IOpipe{r: r, w: w} + u.ctx, u.ctxC = context.WithCancel(context.Background()) + return } type onceError struct { @@ -76,15 +83,17 @@ func (a *onceError) Load() error { } type IOpipe struct { - R *io.PipeReader - W *io.PipeWriter - re onceError - we onceError + r *io.PipeReader + w *io.PipeWriter + ctx context.Context + ctxC context.CancelFunc + re onceError + we onceError } func (t *IOpipe) Write(p []byte) (n int, err error) { - if t.W != nil { - n, err = t.W.Write(p) + if t.w != nil { + n, err = t.w.Write(p) if errors.Is(err, io.ErrClosedPipe) { err = errors.Join(err, t.we.Load()) } @@ -92,8 +101,8 @@ func (t *IOpipe) Write(p []byte) (n int, err error) { return } func (t *IOpipe) Read(p []byte) (n int, err error) { - if t.R != nil { - n, err = t.R.Read(p) + if t.r != nil { + n, err = t.r.Read(p) if errors.Is(err, io.ErrClosedPipe) { err = errors.Join(err, t.re.Load()) } @@ -101,25 +110,44 @@ func (t *IOpipe) Read(p []byte) (n int, err error) { return } func (t *IOpipe) Close() (err error) { - if t.W != nil { - err = errors.Join(err, t.W.Close()) + if t.w != nil { + err = errors.Join(err, t.w.Close()) } - if t.R != nil { - err = errors.Join(err, t.R.Close()) + if t.r != nil { + err = errors.Join(err, t.r.Close()) } + t.ctxC() return } func (t *IOpipe) CloseWithError(e error) (err error) { - if t.W != nil { + if t.w != nil { t.we.Store(e) - err = errors.Join(err, t.W.CloseWithError(e)) + err = errors.Join(err, t.w.CloseWithError(e)) } - if t.R != nil { + if t.r != nil { t.re.Store(e) - err = errors.Join(err, t.R.CloseWithError(e)) + err = errors.Join(err, t.r.CloseWithError(e)) } + t.ctxC() return } +func (t *IOpipe) WithCtx(ctx context.Context) *IOpipe { + go func() { + select { + case <-ctx.Done(): + if t.w != nil { + t.we.Store(ctx.Err()) + t.w.CloseWithError(ctx.Err()) + } + if t.r != nil { + t.re.Store(ctx.Err()) + t.r.CloseWithError(ctx.Err()) + } + case <-t.ctx.Done(): + } + }() + return t +} type RWC struct { R func(p []byte) (n int, err error) @@ -288,6 +316,26 @@ func WithCtxCopy(ctx context.Context, callTree string, copybuf []byte, to time.D } } +func WithCtxCopyNoCheck(ctx context.Context, copybuf []byte, w io.Writer, r io.Reader) error { + for { + n, e := r.Read(copybuf) + if n != 0 { + n, e := w.Write(copybuf[:n]) + if n == 0 && e != nil { + if !errors.Is(e, io.EOF) { + return errors.Join(ErrWrite, e) + } + return nil + } + } else if e != nil { + if !errors.Is(e, io.EOF) { + return errors.Join(ErrRead, e) + } + return nil + } + } +} + type CopyConfig struct { BytePerLoop, MaxLoop, MaxByte, BytePerSec uint64 SkipByte int diff --git a/reqf/RawReqRes.go b/reqf/RawReqRes.go index a67f712..56fad7e 100644 --- a/reqf/RawReqRes.go +++ b/reqf/RawReqRes.go @@ -1,6 +1,7 @@ package part import ( + "context" "io" "sync/atomic" @@ -40,6 +41,11 @@ func (t RawReqRes) ResClose() error { return nil } +func (t RawReqRes) WithCtx(ctx context.Context) { + if !t.resC.Swap(true) { + t.res.WithCtx(ctx) + } +} func (t RawReqRes) ResCloseWithError(e error) error { if !t.resC.Swap(true) { return t.res.CloseWithError(e) diff --git a/reqf/Reqf.go b/reqf/Reqf.go index 5411b33..5e933a1 100644 --- a/reqf/Reqf.go +++ b/reqf/Reqf.go @@ -19,6 +19,7 @@ import ( flate "compress/flate" gzip "compress/gzip" + "github.com/dustin/go-humanize" br "github.com/qydysky/brotli" pe "github.com/qydysky/part/errors" pio "github.com/qydysky/part/io" @@ -36,7 +37,7 @@ type Rval struct { Timeout int // Millisecond SleepTime int - // Millisecond + // Deprecated: use Timeout WriteLoopTO int JustResponseCode bool NoResponse bool @@ -80,9 +81,9 @@ type Req struct { Response *http.Response UsedTime time.Duration - cancelP atomic.Pointer[context.CancelFunc] - state atomic.Int32 + state atomic.Int32 + client *http.Client responFile *os.File responBuf *bytes.Buffer err error @@ -100,22 +101,21 @@ func (t *Req) Reqf(val Rval) error { t.l.Lock() t.state.Store(running) - pctx, cancelF := t.prepare(&val) - t.cancelP.Store(&cancelF) + t.prepare(&val) if !val.Async { // 同步 - t.reqfM(pctx, cancelF, val) + t.reqfM(val.Ctx, val) return t.err } else { //异步 - go t.reqfM(pctx, cancelF, val) + go t.reqfM(val.Ctx, val) } return nil } -func (t *Req) reqfM(ctxMain context.Context, cancelMain context.CancelFunc, val Rval) { +func (t *Req) reqfM(ctxMain context.Context, val Rval) { beginTime := time.Now() for i := 0; i <= val.Retry; i++ { @@ -130,7 +130,6 @@ func (t *Req) reqfM(ctxMain context.Context, cancelMain context.CancelFunc, val } } - cancelMain() t.updateUseDur(beginTime) t.clean(&val) t.state.Store(free) @@ -138,28 +137,15 @@ func (t *Req) reqfM(ctxMain context.Context, cancelMain context.CancelFunc, val } func (t *Req) reqf(ctx context.Context, val Rval) (err error) { - var ( - Header map[string]string = val.Header - client http.Client - ) - - if Header == nil { - Header = make(map[string]string) + if t.client.Transport == nil { + t.client.Transport = &http.Transport{} } - if val.Proxy != "" { - proxy := func(_ *http.Request) (*url.URL, error) { + t.client.Transport.(*http.Transport).Proxy = func(_ *http.Request) (*url.URL, error) { return url.Parse(val.Proxy) } - client.Transport = &http.Transport{ - Proxy: proxy, - IdleConnTimeout: time.Minute, - } - } else { - client.Transport = &http.Transport{ - IdleConnTimeout: time.Minute, - } } + t.client.Transport.(*http.Transport).IdleConnTimeout = time.Minute if val.Url == "" { return ErrEmptyUrl.New() @@ -178,9 +164,6 @@ func (t *Req) reqf(ctx context.Context, val Rval) (err error) { if len(val.PostStr) > 0 { body = strings.NewReader(val.PostStr) - if _, ok := Header["Content-Type"]; !ok { - Header["Content-Type"] = "application/x-www-form-urlencoded" - } } req, e := http.NewRequestWithContext(ctx, val.Method, val.Url, body) @@ -192,34 +175,39 @@ func (t *Req) reqf(ctx context.Context, val Rval) (err error) { req.AddCookie(v) } - if _, ok := Header["Accept"]; !ok { - Header["Accept"] = defaultAccept + for k, v := range val.Header { + req.Header.Set(k, v) + } + + if len(val.PostStr) > 0 { + if _, ok := req.Header["Content-Type"]; !ok { + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + } } - if _, ok := Header["Connection"]; !ok { - Header["Connection"] = "keep-alive" + if _, ok := req.Header["Accept"]; !ok { + req.Header.Set("Accept", defaultAccept) } - if _, ok := Header["Accept-Encoding"]; !ok { - Header["Accept-Encoding"] = "gzip, deflate, br" + if _, ok := req.Header["Connection"]; !ok { + req.Header.Set("Connection", "keep-alive") } - if val.SaveToPath != "" || val.SaveToPipe != nil { - Header["Accept-Encoding"] = "identity" + if _, ok := req.Header["Accept-Encoding"]; !ok { + req.Header.Set("Accept-Encoding", "gzip, deflate, br") } - if _, ok := Header["User-Agent"]; !ok { - Header["User-Agent"] = defaultUA + if val.SaveToPath != "" || val.SaveToPipe != nil { + req.Header.Set("Accept-Encoding", "identity") } - - for k, v := range Header { - req.Header.Set(k, v) + if _, ok := req.Header["User-Agent"]; !ok { + req.Header.Set("User-Agent", defaultUA) } - resp, e := client.Do(req) + resp, e := t.client.Do(req) if e != nil { return pe.Join(ErrClientDo.New(), e) } - if v, ok := Header["Connection"]; ok && strings.ToLower(v) != "keep-alive" { - defer client.CloseIdleConnections() + if v, ok := val.Header["Connection"]; ok && strings.ToLower(v) != "keep-alive" { + defer t.client.CloseIdleConnections() } t.Response = resp @@ -264,27 +252,26 @@ func (t *Req) reqf(ctx context.Context, val Rval) (err error) { } } - writeLoopTO := val.WriteLoopTO - if writeLoopTO == 0 { - if val.Timeout > 0 { - writeLoopTO = val.Timeout + 500 - } else { - writeLoopTO = 1000 - } - } - // io copy - errChan := make(chan error, 3) - errChan <- pio.WithCtxCopy( - req.Context(), - t.callTree, - t.copyResBuf[:], - time.Duration(int(time.Millisecond)*writeLoopTO), io.MultiWriter(ws...), - resReadCloser, - func(s string) { errChan <- pe.New(s) }, - ) - for len(errChan) > 0 { - err = pe.Join(err, <-errChan) + { + w := io.MultiWriter(ws...) + for { + n, e := resReadCloser.Read(t.copyResBuf) + if n != 0 { + n, e := w.Write(t.copyResBuf[:n]) + if n == 0 && e != nil { + if !errors.Is(e, io.EOF) { + err = pe.Join(err, e) + } + break + } + } else if e != nil { + if !errors.Is(e, io.EOF) { + err = pe.Join(err, e) + } + break + } + } } if t.responBuf != nil { @@ -304,10 +291,9 @@ func (t *Req) Wait() (err error) { } func (t *Req) Close() { t.Cancel() } + +// Deprecated: use rval.Ctx.Cancle func (t *Req) Cancel() { - if p := t.cancelP.Load(); p != nil { - (*p)() - } } func (t *Req) IsLive() bool { @@ -330,14 +316,14 @@ func (t *Req) prepareRes(ctx context.Context, val *Rval) (ctx1 context.Context, t.err = nil if val.Timeout > 0 { - ctx1, ctxf1 = context.WithTimeout(ctx, time.Duration(val.Timeout*int(time.Millisecond))) + ctx1, ctxf1 = context.WithTimeout(ctx, time.Duration(val.Timeout)*time.Millisecond) } else { ctx1, ctxf1 = context.WithCancel(ctx) } return } -func (t *Req) prepare(val *Rval) (ctx context.Context, cancel context.CancelFunc) { +func (t *Req) prepare(val *Rval) { t.UsedTime = 0 t.responFile = nil t.callTree = "" @@ -349,29 +335,21 @@ func (t *Req) prepare(val *Rval) (ctx context.Context, cancel context.CancelFunc } } if cap(t.copyResBuf) == 0 { - t.copyResBuf = make([]byte, 1<<17) - } - if val.Ctx != nil { - ctx, cancel = context.WithCancel(val.Ctx) + t.copyResBuf = make([]byte, humanize.KByte*4) } else { - ctx, cancel = context.WithCancel(context.Background()) + t.copyResBuf = t.copyResBuf[:cap(t.copyResBuf)] + } + if t.client == nil { + t.client = &http.Client{} + } + if val.Ctx == nil { + val.Ctx = context.Background() } - if val.SaveToPipe != nil { - go func() { - <-ctx.Done() - if e := val.SaveToPipe.CloseWithError(context.Canceled); e != nil { - println(e) - } - }() + val.SaveToPipe.WithCtx(val.Ctx) } if val.RawPipe != nil { - go func() { - <-ctx.Done() - if e := val.RawPipe.ResCloseWithError(context.Canceled); e != nil { - println(e) - } - }() + val.RawPipe.WithCtx(val.Ctx) } if val.Method == "" { @@ -383,8 +361,6 @@ func (t *Req) prepare(val *Rval) (ctx context.Context, cancel context.CancelFunc val.Method = "GET" } } - - return } func (t *Req) clean(val *Rval) { diff --git a/reqf/Reqf_test.go b/reqf/Reqf_test.go index 7c0e802..654f734 100644 --- a/reqf/Reqf_test.go +++ b/reqf/Reqf_test.go @@ -19,6 +19,8 @@ import ( var addr = "127.0.0.1:10001" +var reuse = New() + func init() { s := web.New(&http.Server{ Addr: addr, @@ -105,8 +107,53 @@ func init() { `/exit`: func(_ http.ResponseWriter, _ *http.Request) { s.Server.Shutdown(context.Background()) }, + `/header`: func(w http.ResponseWriter, r *http.Request) { + for k, v := range r.Header { + w.Header().Set(k, v[0]) + } + }, }) time.Sleep(time.Second) + reuse.Reqf(Rval{ + Url: "http://" + addr + "/no", + }) +} + +func Test_6(t *testing.T) { + reuse.Reqf(Rval{ + Url: "http://" + addr + "/header", + Header: map[string]string{ + `I`: `1`, + }, + }) + if reuse.Response.Header.Get(`I`) != `1` { + t.Fail() + } +} + +// go test -timeout 30s -run ^Test_reuse$ github.com/qydysky/part/reqf -race -count=1 -v -memprofile mem.out +func Test_reuse(t *testing.T) { + reuse.Reqf(Rval{ + Url: "http://" + addr + "/no", + }) + if !bytes.Equal(reuse.Respon, []byte("abc强强强强")) { + t.Fail() + } +} + +// 2710 430080 ns/op 9896 B/op 111 allocs/op +func Benchmark(b *testing.B) { + rval := Rval{ + Url: "http://" + addr + "/no", + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + reuse.Reqf(rval) + if !bytes.Equal(reuse.Respon, []byte("abc强强强强")) { + b.Fail() + } + } } func Test14(t *testing.T) { @@ -119,7 +166,7 @@ func Test14(t *testing.T) { Url: "http://" + addr + "/stream", Ctx: ctx, NoResponse: true, - SaveToPipe: &pio.IOpipe{R: i, W: o}, + SaveToPipe: pio.NewPipeRaw(i, o), Async: true, WriteLoopTO: 5*1000*2 + 1, }); e != nil { @@ -167,12 +214,14 @@ func Test_req13(t *testing.T) { } func Test_req7(t *testing.T) { + ctx, ctxc := context.WithCancel(context.Background()) r := New() r.Reqf(Rval{ + Ctx: ctx, Url: "http://" + addr + "/to", Async: true, }) - r.Cancel() + ctxc() if !IsCancel(r.Wait()) { t.Error("async Cancel fail") } @@ -288,14 +337,16 @@ func Test_req4(t *testing.T) { // } func Test_req11(t *testing.T) { + ctx, ctxc := context.WithCancel(context.Background()) r := New() { timer := time.NewTimer(time.Second) go func() { <-timer.C - r.Cancel() + ctxc() }() e := r.Reqf(Rval{ + Ctx: ctx, Url: "http://" + addr + "/to", }) if !IsCancel(e) { @@ -319,7 +370,7 @@ func Test_req9(t *testing.T) { }() r.Reqf(Rval{ Url: "http://" + addr + "/1min", - SaveToPipe: &pio.IOpipe{R: rc, W: wc}, + SaveToPipe: pio.NewPipeRaw(rc, wc), Async: true, }) if r.Wait() != nil { @@ -329,6 +380,7 @@ func Test_req9(t *testing.T) { } func Test_req8(t *testing.T) { + ctx, ctxc := context.WithCancel(context.Background()) r := New() { rc, wc := io.Pipe() @@ -336,11 +388,12 @@ func Test_req8(t *testing.T) { var buf []byte = make([]byte, 1<<16) _, _ = rc.Read(buf) time.Sleep(time.Millisecond * 500) - r.Cancel() + ctxc() }() r.Reqf(Rval{ + Ctx: ctx, Url: "http://" + addr + "/1min", - SaveToPipe: &pio.IOpipe{R: rc, W: wc}, + SaveToPipe: pio.NewPipeRaw(rc, wc), Async: true, }) if !IsCancel(r.Wait()) { @@ -385,7 +438,7 @@ func Test_req3(t *testing.T) { }() r.Reqf(Rval{ Url: "http://" + addr + "/br", - SaveToPipe: &pio.IOpipe{R: rc, W: wc}, + SaveToPipe: pio.NewPipeRaw(rc, wc), Async: true, }) <-c @@ -402,7 +455,7 @@ func Test_req3(t *testing.T) { }() r.Reqf(Rval{ Url: "http://" + addr + "/gzip", - SaveToPipe: &pio.IOpipe{R: rc, W: wc}, + SaveToPipe: pio.NewPipeRaw(rc, wc), Async: true, }) <-c @@ -419,7 +472,7 @@ func Test_req3(t *testing.T) { }() if e := r.Reqf(Rval{ Url: "http://" + addr + "/flate", - SaveToPipe: &pio.IOpipe{R: rc, W: wc}, + SaveToPipe: pio.NewPipeRaw(rc, wc), }); e != nil { t.Error(e) } @@ -440,7 +493,7 @@ func Test_req3(t *testing.T) { }() r.Reqf(Rval{ Url: "http://" + addr + "/flate", - SaveToPipe: &pio.IOpipe{R: rc, W: wc}, + SaveToPipe: pio.NewPipeRaw(rc, wc), Async: true, }) <-c @@ -470,7 +523,7 @@ func Test_req3(t *testing.T) { }() r.Reqf(Rval{ Url: "http://" + addr + "/flate", - SaveToPipe: &pio.IOpipe{R: rc, W: wc}, + SaveToPipe: pio.NewPipeRaw(rc, wc), NoResponse: true, Async: true, }) diff --git a/web/Web_test.go b/web/Web_test.go index cd53ec4..1d74ba4 100644 --- a/web/Web_test.go +++ b/web/Web_test.go @@ -93,7 +93,7 @@ func Test_Mod(t *testing.T) { }, }) if r.Response.StatusCode != http.StatusNotModified { - t.Fatal(r.Respon) + t.Fatal(string(r.Respon)) } } time.Sleep(time.Second) @@ -622,7 +622,7 @@ func Test_ClientBlock(t *testing.T) { }() r.Reqf(reqf.Rval{ Url: "http://127.0.0.1:13001/to", - SaveToPipe: &pio.IOpipe{R: rc, W: wc}, + SaveToPipe: pio.NewPipeRaw(rc, wc), WriteLoopTO: 5000, Async: true, }) -- 2.39.2