]> 127.0.0.1 Git - part/.git/commitdiff
1 (#57) v0.28.20250524203231
authorqydysky <qydysky@foxmail.com>
Sat, 24 May 2025 20:32:22 +0000 (04:32 +0800)
committerGitHub <noreply@github.com>
Sat, 24 May 2025 20:32:22 +0000 (04:32 +0800)
* 1

* 1

* 1

* 1

* 1

io/io.go
reqf/RawReqRes.go
reqf/Reqf.go
reqf/Reqf_test.go
web/Web_test.go

index 985a07e074309e71e30e5cdbb3237d4b989d8163..d45a8edaa93221ceddd5f5e02978986f74d24a7e 100644 (file)
--- a/io/io.go
+++ b/io/io.go
@@ -51,9 +51,16 @@ func RW2Chan(r io.ReadCloser, w io.WriteCloser) (rc, wc chan []byte) {
        return
 }
 
-func NewPipe() *IOpipe {
+func NewPipe() (u *IOpipe) {
        r, w := io.Pipe()
-       return &IOpipe{R: r, W: w}
+       u = &IOpipe{r: r, w: w}
+       u.ctx, u.ctxC = context.WithCancel(context.Background())
+       return
+}
+func NewPipeRaw(r *io.PipeReader, w *io.PipeWriter) (u *IOpipe) {
+       u = &IOpipe{r: r, w: w}
+       u.ctx, u.ctxC = context.WithCancel(context.Background())
+       return
 }
 
 type onceError struct {
@@ -76,15 +83,17 @@ func (a *onceError) Load() error {
 }
 
 type IOpipe struct {
-       R  *io.PipeReader
-       W  *io.PipeWriter
-       re onceError
-       we onceError
+       r    *io.PipeReader
+       w    *io.PipeWriter
+       ctx  context.Context
+       ctxC context.CancelFunc
+       re   onceError
+       we   onceError
 }
 
 func (t *IOpipe) Write(p []byte) (n int, err error) {
-       if t.W != nil {
-               n, err = t.W.Write(p)
+       if t.w != nil {
+               n, err = t.w.Write(p)
                if errors.Is(err, io.ErrClosedPipe) {
                        err = errors.Join(err, t.we.Load())
                }
@@ -92,8 +101,8 @@ func (t *IOpipe) Write(p []byte) (n int, err error) {
        return
 }
 func (t *IOpipe) Read(p []byte) (n int, err error) {
-       if t.R != nil {
-               n, err = t.R.Read(p)
+       if t.r != nil {
+               n, err = t.r.Read(p)
                if errors.Is(err, io.ErrClosedPipe) {
                        err = errors.Join(err, t.re.Load())
                }
@@ -101,25 +110,44 @@ func (t *IOpipe) Read(p []byte) (n int, err error) {
        return
 }
 func (t *IOpipe) Close() (err error) {
-       if t.W != nil {
-               err = errors.Join(err, t.W.Close())
+       if t.w != nil {
+               err = errors.Join(err, t.w.Close())
        }
-       if t.R != nil {
-               err = errors.Join(err, t.R.Close())
+       if t.r != nil {
+               err = errors.Join(err, t.r.Close())
        }
+       t.ctxC()
        return
 }
 func (t *IOpipe) CloseWithError(e error) (err error) {
-       if t.W != nil {
+       if t.w != nil {
                t.we.Store(e)
-               err = errors.Join(err, t.W.CloseWithError(e))
+               err = errors.Join(err, t.w.CloseWithError(e))
        }
-       if t.R != nil {
+       if t.r != nil {
                t.re.Store(e)
-               err = errors.Join(err, t.R.CloseWithError(e))
+               err = errors.Join(err, t.r.CloseWithError(e))
        }
+       t.ctxC()
        return
 }
+func (t *IOpipe) WithCtx(ctx context.Context) *IOpipe {
+       go func() {
+               select {
+               case <-ctx.Done():
+                       if t.w != nil {
+                               t.we.Store(ctx.Err())
+                               t.w.CloseWithError(ctx.Err())
+                       }
+                       if t.r != nil {
+                               t.re.Store(ctx.Err())
+                               t.r.CloseWithError(ctx.Err())
+                       }
+               case <-t.ctx.Done():
+               }
+       }()
+       return t
+}
 
 type RWC struct {
        R func(p []byte) (n int, err error)
@@ -288,6 +316,26 @@ func WithCtxCopy(ctx context.Context, callTree string, copybuf []byte, to time.D
        }
 }
 
+func WithCtxCopyNoCheck(ctx context.Context, copybuf []byte, w io.Writer, r io.Reader) error {
+       for {
+               n, e := r.Read(copybuf)
+               if n != 0 {
+                       n, e := w.Write(copybuf[:n])
+                       if n == 0 && e != nil {
+                               if !errors.Is(e, io.EOF) {
+                                       return errors.Join(ErrWrite, e)
+                               }
+                               return nil
+                       }
+               } else if e != nil {
+                       if !errors.Is(e, io.EOF) {
+                               return errors.Join(ErrRead, e)
+                       }
+                       return nil
+               }
+       }
+}
+
 type CopyConfig struct {
        BytePerLoop, MaxLoop, MaxByte, BytePerSec uint64
        SkipByte                                  int
index a67f712972448ef723279d93502fa6f29f11fe0e..56fad7ea2b518f3e64b140b2f6efdd230ade8707 100644 (file)
@@ -1,6 +1,7 @@
 package part
 
 import (
+       "context"
        "io"
        "sync/atomic"
 
@@ -40,6 +41,11 @@ func (t RawReqRes) ResClose() error {
        return nil
 }
 
+func (t RawReqRes) WithCtx(ctx context.Context) {
+       if !t.resC.Swap(true) {
+               t.res.WithCtx(ctx)
+       }
+}
 func (t RawReqRes) ResCloseWithError(e error) error {
        if !t.resC.Swap(true) {
                return t.res.CloseWithError(e)
index 5411b3388e4ed3cc1bbe057ecfaca5c1de29898e..5e933a1becefb18ee2b9247586ceffe406987a62 100644 (file)
@@ -19,6 +19,7 @@ import (
        flate "compress/flate"
        gzip "compress/gzip"
 
+       "github.com/dustin/go-humanize"
        br "github.com/qydysky/brotli"
        pe "github.com/qydysky/part/errors"
        pio "github.com/qydysky/part/io"
@@ -36,7 +37,7 @@ type Rval struct {
        Timeout int
        // Millisecond
        SleepTime int
-       // Millisecond
+       // Deprecated: use Timeout
        WriteLoopTO      int
        JustResponseCode bool
        NoResponse       bool
@@ -80,9 +81,9 @@ type Req struct {
        Response *http.Response
        UsedTime time.Duration
 
-       cancelP atomic.Pointer[context.CancelFunc]
-       state   atomic.Int32
+       state atomic.Int32
 
+       client     *http.Client
        responFile *os.File
        responBuf  *bytes.Buffer
        err        error
@@ -100,22 +101,21 @@ func (t *Req) Reqf(val Rval) error {
        t.l.Lock()
        t.state.Store(running)
 
-       pctx, cancelF := t.prepare(&val)
-       t.cancelP.Store(&cancelF)
+       t.prepare(&val)
 
        if !val.Async {
                // 同步
-               t.reqfM(pctx, cancelF, val)
+               t.reqfM(val.Ctx, val)
                return t.err
        } else {
                //异步
-               go t.reqfM(pctx, cancelF, val)
+               go t.reqfM(val.Ctx, val)
        }
 
        return nil
 }
 
-func (t *Req) reqfM(ctxMain context.Context, cancelMain context.CancelFunc, val Rval) {
+func (t *Req) reqfM(ctxMain context.Context, val Rval) {
        beginTime := time.Now()
 
        for i := 0; i <= val.Retry; i++ {
@@ -130,7 +130,6 @@ func (t *Req) reqfM(ctxMain context.Context, cancelMain context.CancelFunc, val
                }
        }
 
-       cancelMain()
        t.updateUseDur(beginTime)
        t.clean(&val)
        t.state.Store(free)
@@ -138,28 +137,15 @@ func (t *Req) reqfM(ctxMain context.Context, cancelMain context.CancelFunc, val
 }
 
 func (t *Req) reqf(ctx context.Context, val Rval) (err error) {
-       var (
-               Header map[string]string = val.Header
-               client http.Client
-       )
-
-       if Header == nil {
-               Header = make(map[string]string)
+       if t.client.Transport == nil {
+               t.client.Transport = &http.Transport{}
        }
-
        if val.Proxy != "" {
-               proxy := func(_ *http.Request) (*url.URL, error) {
+               t.client.Transport.(*http.Transport).Proxy = func(_ *http.Request) (*url.URL, error) {
                        return url.Parse(val.Proxy)
                }
-               client.Transport = &http.Transport{
-                       Proxy:           proxy,
-                       IdleConnTimeout: time.Minute,
-               }
-       } else {
-               client.Transport = &http.Transport{
-                       IdleConnTimeout: time.Minute,
-               }
        }
+       t.client.Transport.(*http.Transport).IdleConnTimeout = time.Minute
 
        if val.Url == "" {
                return ErrEmptyUrl.New()
@@ -178,9 +164,6 @@ func (t *Req) reqf(ctx context.Context, val Rval) (err error) {
 
        if len(val.PostStr) > 0 {
                body = strings.NewReader(val.PostStr)
-               if _, ok := Header["Content-Type"]; !ok {
-                       Header["Content-Type"] = "application/x-www-form-urlencoded"
-               }
        }
 
        req, e := http.NewRequestWithContext(ctx, val.Method, val.Url, body)
@@ -192,34 +175,39 @@ func (t *Req) reqf(ctx context.Context, val Rval) (err error) {
                req.AddCookie(v)
        }
 
-       if _, ok := Header["Accept"]; !ok {
-               Header["Accept"] = defaultAccept
+       for k, v := range val.Header {
+               req.Header.Set(k, v)
+       }
+
+       if len(val.PostStr) > 0 {
+               if _, ok := req.Header["Content-Type"]; !ok {
+                       req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+               }
        }
-       if _, ok := Header["Connection"]; !ok {
-               Header["Connection"] = "keep-alive"
+       if _, ok := req.Header["Accept"]; !ok {
+               req.Header.Set("Accept", defaultAccept)
        }
-       if _, ok := Header["Accept-Encoding"]; !ok {
-               Header["Accept-Encoding"] = "gzip, deflate, br"
+       if _, ok := req.Header["Connection"]; !ok {
+               req.Header.Set("Connection", "keep-alive")
        }
-       if val.SaveToPath != "" || val.SaveToPipe != nil {
-               Header["Accept-Encoding"] = "identity"
+       if _, ok := req.Header["Accept-Encoding"]; !ok {
+               req.Header.Set("Accept-Encoding", "gzip, deflate, br")
        }
-       if _, ok := Header["User-Agent"]; !ok {
-               Header["User-Agent"] = defaultUA
+       if val.SaveToPath != "" || val.SaveToPipe != nil {
+               req.Header.Set("Accept-Encoding", "identity")
        }
-
-       for k, v := range Header {
-               req.Header.Set(k, v)
+       if _, ok := req.Header["User-Agent"]; !ok {
+               req.Header.Set("User-Agent", defaultUA)
        }
 
-       resp, e := client.Do(req)
+       resp, e := t.client.Do(req)
 
        if e != nil {
                return pe.Join(ErrClientDo.New(), e)
        }
 
-       if v, ok := Header["Connection"]; ok && strings.ToLower(v) != "keep-alive" {
-               defer client.CloseIdleConnections()
+       if v, ok := val.Header["Connection"]; ok && strings.ToLower(v) != "keep-alive" {
+               defer t.client.CloseIdleConnections()
        }
 
        t.Response = resp
@@ -264,27 +252,26 @@ func (t *Req) reqf(ctx context.Context, val Rval) (err error) {
                }
        }
 
-       writeLoopTO := val.WriteLoopTO
-       if writeLoopTO == 0 {
-               if val.Timeout > 0 {
-                       writeLoopTO = val.Timeout + 500
-               } else {
-                       writeLoopTO = 1000
-               }
-       }
-
        // io copy
-       errChan := make(chan error, 3)
-       errChan <- pio.WithCtxCopy(
-               req.Context(),
-               t.callTree,
-               t.copyResBuf[:],
-               time.Duration(int(time.Millisecond)*writeLoopTO), io.MultiWriter(ws...),
-               resReadCloser,
-               func(s string) { errChan <- pe.New(s) },
-       )
-       for len(errChan) > 0 {
-               err = pe.Join(err, <-errChan)
+       {
+               w := io.MultiWriter(ws...)
+               for {
+                       n, e := resReadCloser.Read(t.copyResBuf)
+                       if n != 0 {
+                               n, e := w.Write(t.copyResBuf[:n])
+                               if n == 0 && e != nil {
+                                       if !errors.Is(e, io.EOF) {
+                                               err = pe.Join(err, e)
+                                       }
+                                       break
+                               }
+                       } else if e != nil {
+                               if !errors.Is(e, io.EOF) {
+                                       err = pe.Join(err, e)
+                               }
+                               break
+                       }
+               }
        }
 
        if t.responBuf != nil {
@@ -304,10 +291,9 @@ func (t *Req) Wait() (err error) {
 }
 
 func (t *Req) Close() { t.Cancel() }
+
+// Deprecated: use rval.Ctx.Cancle
 func (t *Req) Cancel() {
-       if p := t.cancelP.Load(); p != nil {
-               (*p)()
-       }
 }
 
 func (t *Req) IsLive() bool {
@@ -330,14 +316,14 @@ func (t *Req) prepareRes(ctx context.Context, val *Rval) (ctx1 context.Context,
        t.err = nil
 
        if val.Timeout > 0 {
-               ctx1, ctxf1 = context.WithTimeout(ctx, time.Duration(val.Timeout*int(time.Millisecond)))
+               ctx1, ctxf1 = context.WithTimeout(ctx, time.Duration(val.Timeout)*time.Millisecond)
        } else {
                ctx1, ctxf1 = context.WithCancel(ctx)
        }
        return
 }
 
-func (t *Req) prepare(val *Rval) (ctx context.Context, cancel context.CancelFunc) {
+func (t *Req) prepare(val *Rval) {
        t.UsedTime = 0
        t.responFile = nil
        t.callTree = ""
@@ -349,29 +335,21 @@ func (t *Req) prepare(val *Rval) (ctx context.Context, cancel context.CancelFunc
                }
        }
        if cap(t.copyResBuf) == 0 {
-               t.copyResBuf = make([]byte, 1<<17)
-       }
-       if val.Ctx != nil {
-               ctx, cancel = context.WithCancel(val.Ctx)
+               t.copyResBuf = make([]byte, humanize.KByte*4)
        } else {
-               ctx, cancel = context.WithCancel(context.Background())
+               t.copyResBuf = t.copyResBuf[:cap(t.copyResBuf)]
+       }
+       if t.client == nil {
+               t.client = &http.Client{}
+       }
+       if val.Ctx == nil {
+               val.Ctx = context.Background()
        }
-
        if val.SaveToPipe != nil {
-               go func() {
-                       <-ctx.Done()
-                       if e := val.SaveToPipe.CloseWithError(context.Canceled); e != nil {
-                               println(e)
-                       }
-               }()
+               val.SaveToPipe.WithCtx(val.Ctx)
        }
        if val.RawPipe != nil {
-               go func() {
-                       <-ctx.Done()
-                       if e := val.RawPipe.ResCloseWithError(context.Canceled); e != nil {
-                               println(e)
-                       }
-               }()
+               val.RawPipe.WithCtx(val.Ctx)
        }
 
        if val.Method == "" {
@@ -383,8 +361,6 @@ func (t *Req) prepare(val *Rval) (ctx context.Context, cancel context.CancelFunc
                        val.Method = "GET"
                }
        }
-
-       return
 }
 
 func (t *Req) clean(val *Rval) {
index 7c0e802cedeed4a44a17a4e0ad0fcbeaad79bbbb..654f73491961b8c00dd460e22121d97fcbe9949c 100644 (file)
@@ -19,6 +19,8 @@ import (
 
 var addr = "127.0.0.1:10001"
 
+var reuse = New()
+
 func init() {
        s := web.New(&http.Server{
                Addr:         addr,
@@ -105,8 +107,53 @@ func init() {
                `/exit`: func(_ http.ResponseWriter, _ *http.Request) {
                        s.Server.Shutdown(context.Background())
                },
+               `/header`: func(w http.ResponseWriter, r *http.Request) {
+                       for k, v := range r.Header {
+                               w.Header().Set(k, v[0])
+                       }
+               },
        })
        time.Sleep(time.Second)
+       reuse.Reqf(Rval{
+               Url: "http://" + addr + "/no",
+       })
+}
+
+func Test_6(t *testing.T) {
+       reuse.Reqf(Rval{
+               Url: "http://" + addr + "/header",
+               Header: map[string]string{
+                       `I`: `1`,
+               },
+       })
+       if reuse.Response.Header.Get(`I`) != `1` {
+               t.Fail()
+       }
+}
+
+// go test -timeout 30s -run ^Test_reuse$ github.com/qydysky/part/reqf -race -count=1 -v -memprofile mem.out
+func Test_reuse(t *testing.T) {
+       reuse.Reqf(Rval{
+               Url: "http://" + addr + "/no",
+       })
+       if !bytes.Equal(reuse.Respon, []byte("abc强强强强")) {
+               t.Fail()
+       }
+}
+
+// 2710            430080 ns/op            9896 B/op        111 allocs/op
+func Benchmark(b *testing.B) {
+       rval := Rval{
+               Url: "http://" + addr + "/no",
+       }
+
+       b.ResetTimer()
+       for i := 0; i < b.N; i++ {
+               reuse.Reqf(rval)
+               if !bytes.Equal(reuse.Respon, []byte("abc强强强强")) {
+                       b.Fail()
+               }
+       }
 }
 
 func Test14(t *testing.T) {
@@ -119,7 +166,7 @@ func Test14(t *testing.T) {
                Url:         "http://" + addr + "/stream",
                Ctx:         ctx,
                NoResponse:  true,
-               SaveToPipe:  &pio.IOpipe{R: i, W: o},
+               SaveToPipe:  pio.NewPipeRaw(i, o),
                Async:       true,
                WriteLoopTO: 5*1000*2 + 1,
        }); e != nil {
@@ -167,12 +214,14 @@ func Test_req13(t *testing.T) {
 }
 
 func Test_req7(t *testing.T) {
+       ctx, ctxc := context.WithCancel(context.Background())
        r := New()
        r.Reqf(Rval{
+               Ctx:   ctx,
                Url:   "http://" + addr + "/to",
                Async: true,
        })
-       r.Cancel()
+       ctxc()
        if !IsCancel(r.Wait()) {
                t.Error("async Cancel fail")
        }
@@ -288,14 +337,16 @@ func Test_req4(t *testing.T) {
 // }
 
 func Test_req11(t *testing.T) {
+       ctx, ctxc := context.WithCancel(context.Background())
        r := New()
        {
                timer := time.NewTimer(time.Second)
                go func() {
                        <-timer.C
-                       r.Cancel()
+                       ctxc()
                }()
                e := r.Reqf(Rval{
+                       Ctx: ctx,
                        Url: "http://" + addr + "/to",
                })
                if !IsCancel(e) {
@@ -319,7 +370,7 @@ func Test_req9(t *testing.T) {
                }()
                r.Reqf(Rval{
                        Url:        "http://" + addr + "/1min",
-                       SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+                       SaveToPipe: pio.NewPipeRaw(rc, wc),
                        Async:      true,
                })
                if r.Wait() != nil {
@@ -329,6 +380,7 @@ func Test_req9(t *testing.T) {
 }
 
 func Test_req8(t *testing.T) {
+       ctx, ctxc := context.WithCancel(context.Background())
        r := New()
        {
                rc, wc := io.Pipe()
@@ -336,11 +388,12 @@ func Test_req8(t *testing.T) {
                        var buf []byte = make([]byte, 1<<16)
                        _, _ = rc.Read(buf)
                        time.Sleep(time.Millisecond * 500)
-                       r.Cancel()
+                       ctxc()
                }()
                r.Reqf(Rval{
+                       Ctx:        ctx,
                        Url:        "http://" + addr + "/1min",
-                       SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+                       SaveToPipe: pio.NewPipeRaw(rc, wc),
                        Async:      true,
                })
                if !IsCancel(r.Wait()) {
@@ -385,7 +438,7 @@ func Test_req3(t *testing.T) {
                }()
                r.Reqf(Rval{
                        Url:        "http://" + addr + "/br",
-                       SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+                       SaveToPipe: pio.NewPipeRaw(rc, wc),
                        Async:      true,
                })
                <-c
@@ -402,7 +455,7 @@ func Test_req3(t *testing.T) {
                }()
                r.Reqf(Rval{
                        Url:        "http://" + addr + "/gzip",
-                       SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+                       SaveToPipe: pio.NewPipeRaw(rc, wc),
                        Async:      true,
                })
                <-c
@@ -419,7 +472,7 @@ func Test_req3(t *testing.T) {
                }()
                if e := r.Reqf(Rval{
                        Url:        "http://" + addr + "/flate",
-                       SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+                       SaveToPipe: pio.NewPipeRaw(rc, wc),
                }); e != nil {
                        t.Error(e)
                }
@@ -440,7 +493,7 @@ func Test_req3(t *testing.T) {
                }()
                r.Reqf(Rval{
                        Url:        "http://" + addr + "/flate",
-                       SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+                       SaveToPipe: pio.NewPipeRaw(rc, wc),
                        Async:      true,
                })
                <-c
@@ -470,7 +523,7 @@ func Test_req3(t *testing.T) {
                }()
                r.Reqf(Rval{
                        Url:        "http://" + addr + "/flate",
-                       SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+                       SaveToPipe: pio.NewPipeRaw(rc, wc),
                        NoResponse: true,
                        Async:      true,
                })
index cd53ec4dc8b67f45c11f1d42a0ffa61b227e93c2..1d74ba49a9c05844d5233d63d9edd52ef37b6bcb 100644 (file)
@@ -93,7 +93,7 @@ func Test_Mod(t *testing.T) {
                        },
                })
                if r.Response.StatusCode != http.StatusNotModified {
-                       t.Fatal(r.Respon)
+                       t.Fatal(string(r.Respon))
                }
        }
        time.Sleep(time.Second)
@@ -622,7 +622,7 @@ func Test_ClientBlock(t *testing.T) {
                }()
                r.Reqf(reqf.Rval{
                        Url:         "http://127.0.0.1:13001/to",
-                       SaveToPipe:  &pio.IOpipe{R: rc, W: wc},
+                       SaveToPipe:  pio.NewPipeRaw(rc, wc),
                        WriteLoopTO: 5000,
                        Async:       true,
                })