]> 127.0.0.1 Git - part/.git/commitdiff
add v0.28.0+202307191cd4290
authorqydysky <qydysky@foxmail.com>
Wed, 19 Jul 2023 07:08:24 +0000 (15:08 +0800)
committerqydysky <qydysky@foxmail.com>
Wed, 19 Jul 2023 07:08:24 +0000 (15:08 +0800)
io/io.go
reqf/Reqf.go
reqf/Reqf_test.go
web/Web_test.go

index 96f5a9eae6a00af56684c0f1f3925bee6538c9f3..6382348e11e974a891c56625ab1c4489110fb738 100644 (file)
--- a/io/io.go
+++ b/io/io.go
@@ -5,6 +5,7 @@ import (
        "errors"
        "fmt"
        "io"
+       "sync"
        "sync/atomic"
        "time"
 )
@@ -48,6 +49,76 @@ func RW2Chan(r io.ReadCloser, w io.WriteCloser) (rc, wc chan []byte) {
        return
 }
 
+func NewPipe() *IOpipe {
+       r, w := io.Pipe()
+       return &IOpipe{R: r, W: w}
+}
+
+type onceError struct {
+       sync.Mutex // guards following
+       err        error
+}
+
+func (a *onceError) Store(err error) {
+       a.Lock()
+       defer a.Unlock()
+       if a.err != nil {
+               return
+       }
+       a.err = err
+}
+func (a *onceError) Load() error {
+       a.Lock()
+       defer a.Unlock()
+       return a.err
+}
+
+type IOpipe struct {
+       R  *io.PipeReader
+       W  *io.PipeWriter
+       re onceError
+       we onceError
+}
+
+func (t *IOpipe) Write(p []byte) (n int, err error) {
+       if t.W != nil {
+               n, err = t.W.Write(p)
+               if errors.Is(err, io.ErrClosedPipe) {
+                       err = errors.Join(err, t.we.Load())
+               }
+       }
+       return
+}
+func (t *IOpipe) Read(p []byte) (n int, err error) {
+       if t.R != nil {
+               n, err = t.R.Read(p)
+               if errors.Is(err, io.ErrClosedPipe) {
+                       err = errors.Join(err, t.re.Load())
+               }
+       }
+       return
+}
+func (t *IOpipe) Close() (err error) {
+       if t.W != nil {
+               err = errors.Join(err, t.W.Close())
+       }
+       if t.R != nil {
+               err = errors.Join(err, t.R.Close())
+       }
+       return
+}
+func (t *IOpipe) CloseWithError(e error) (err error) {
+       if t.W != nil {
+               t.we.Store(e)
+               err = errors.Join(err, t.W.CloseWithError(e))
+       }
+       if t.R != nil {
+               t.re.Store(e)
+               err = errors.Join(err, t.R.CloseWithError(e))
+       }
+       return
+}
+
 type RWC struct {
        R func(p []byte) (n int, err error)
        W func(p []byte) (n int, err error)
@@ -74,8 +145,11 @@ func (t RWC) Close() error {
 }
 
 // close reader by yourself
+//
+// to avoid writer block after ctx done, you should close writer after ctx done
+//
 // call Close() after writer fin
-func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w []io.WriteCloser, r io.Reader, panicf ...func(s string)) io.ReadWriteCloser {
+func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w []io.Writer, r io.Reader, panicf ...func(s string)) io.ReadWriteCloser {
        var chanw atomic.Int64
        chanw.Store(time.Now().Unix())
        if len(panicf) == 0 {
@@ -88,10 +162,6 @@ func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w []io.Wr
                for {
                        select {
                        case <-ctx.Done():
-                               // avoid write block
-                               for i := 0; i < len(w); i++ {
-                                       w[i].Close()
-                               }
                                if old, now := chanw.Load(), time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
                                        panicf[0](fmt.Sprintf("rw blocking while close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
                                } else {
@@ -129,10 +199,9 @@ func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w []io.Wr
                                err = context.Canceled
                        default:
                                for i := 0; i < len(w); i++ {
-                                       if n, err = w[i].Write(p); n != 0 {
-                                               chanw.Store(time.Now().Unix())
-                                       }
+                                       _, err = w[i].Write(p)
                                }
+                               chanw.Store(time.Now().Unix())
                        }
                        return
                },
@@ -149,7 +218,11 @@ var (
 )
 
 // close reader by yourself
-func WithCtxCopy(ctx context.Context, callTree string, to time.Duration, w []io.WriteCloser, r io.Reader, panicf ...func(s string)) error {
+//
+// to avoid writer block after ctx done, you should close writer after ctx done
+//
+// call Close() after writer fin
+func WithCtxCopy(ctx context.Context, callTree string, to time.Duration, w []io.Writer, r io.Reader, panicf ...func(s string)) error {
        rwc := WithCtxTO(ctx, callTree, to, w, r)
        defer rwc.Close()
        for buf := make([]byte, 2048); true; {
index b6cde2fb13131a1a1adb732a8bb42ac2543c6481..b4a98364fa5deb3c227e524597194a70542137b6 100644 (file)
@@ -40,8 +40,9 @@ type Rval struct {
        Cookies []*http.Cookie
        Ctx     context.Context
 
-       SaveToPath       string
-       SaveToPipeWriter *io.PipeWriter
+       SaveToPath string
+       // 为避免write阻塞导致panic,请使用此项目io包中的NewPipe(),或在ctx done时,自行关闭pipe writer reader
+       SaveToPipe *pio.IOpipe
 
        Header map[string]string
 }
@@ -196,7 +197,7 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) {
        if _, ok := Header["Accept-Encoding"]; !ok {
                Header["Accept-Encoding"] = "gzip, deflate, br"
        }
-       if val.SaveToPath != "" || val.SaveToPipeWriter != nil {
+       if val.SaveToPath != "" || val.SaveToPipe != nil {
                Header["Accept-Encoding"] = "identity"
        }
        if _, ok := Header["User-Agent"]; !ok {
@@ -227,7 +228,7 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) {
                err = fmt.Errorf("%d %s", resp.StatusCode, http.StatusText(resp.StatusCode))
        }
 
-       var ws []io.WriteCloser
+       var ws []io.Writer
        if val.SaveToPath != "" {
                t.responFile, e = os.Create(val.SaveToPath)
                if e != nil {
@@ -236,13 +237,11 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) {
                }
                ws = append(ws, t.responFile)
        }
-       if val.SaveToPipeWriter != nil {
-               ws = append(ws, pio.RWC{W: val.SaveToPipeWriter.Write, C: func() error { return val.SaveToPipeWriter.CloseWithError(context.Canceled) }})
+       if val.SaveToPipe != nil {
+               ws = append(ws, val.SaveToPipe)
        }
        if !val.NoResponse {
-               //will clear t.Respon
-               t.responBuf.Reset()
-               ws = append(ws, pio.RWC{W: t.responBuf.Write, C: func() error { return nil }})
+               ws = append(ws, t.responBuf)
        }
 
        var resReadCloser = resp.Body
@@ -293,7 +292,7 @@ func (t *Req) IsLive() bool {
        return t.state.Load() == running
 }
 
-func (t *Req) prepareRes(ctx context.Context, val *Rval) (context.Context, context.CancelFunc) {
+func (t *Req) prepareRes(ctx context.Context, val *Rval) (ctx1 context.Context, ctxf1 context.CancelFunc) {
        if !val.NoResponse {
                if t.responBuf == nil {
                        t.responBuf = new(bytes.Buffer)
@@ -307,10 +306,13 @@ func (t *Req) prepareRes(ctx context.Context, val *Rval) (context.Context, conte
        }
        t.Response = nil
        t.err = nil
+
        if val.Timeout > 0 {
-               return context.WithTimeout(ctx, time.Duration(val.Timeout*int(time.Millisecond)))
+               ctx1, ctxf1 = context.WithTimeout(ctx, time.Duration(val.Timeout*int(time.Millisecond)))
+       } else {
+               ctx1, ctxf1 = context.WithCancel(ctx)
        }
-       return context.WithCancel(ctx)
+       return
 }
 
 func (t *Req) prepare(val *Rval) (ctx context.Context, cancel context.CancelFunc) {
@@ -325,18 +327,29 @@ func (t *Req) prepare(val *Rval) (ctx context.Context, cancel context.CancelFunc
                }
        }
        if val.Ctx != nil {
-               return context.WithCancel(val.Ctx)
+               ctx, cancel = context.WithCancel(val.Ctx)
        } else {
-               return context.WithCancel(context.Background())
+               ctx, cancel = context.WithCancel(context.Background())
+       }
+
+       if val.SaveToPipe != nil {
+               go func() {
+                       <-ctx.Done()
+                       if e := val.SaveToPipe.CloseWithError(context.Canceled); e != nil {
+                               println(e)
+                       }
+               }()
        }
+
+       return
 }
 
 func (t *Req) clean(val *Rval) {
        if t.responFile != nil {
                t.responFile.Close()
        }
-       if val.SaveToPipeWriter != nil {
-               val.SaveToPipeWriter.Close()
+       if val.SaveToPipe != nil {
+               val.SaveToPipe.Close()
        }
 }
 
index a52e7ec8ebdd97ba73257a5f5eebad87abda28d2..6e5c05547857319141341e7932ad916cfe5eb838 100644 (file)
@@ -13,6 +13,7 @@ import (
        "time"
 
        compress "github.com/qydysky/part/compress"
+       pio "github.com/qydysky/part/io"
        web "github.com/qydysky/part/web"
 )
 
@@ -112,12 +113,12 @@ func Test14(t *testing.T) {
 
        r := New()
        if e := r.Reqf(Rval{
-               Url:              "http://" + addr + "/stream",
-               Ctx:              ctx,
-               NoResponse:       true,
-               SaveToPipeWriter: o,
-               Async:            true,
-               WriteLoopTO:      5*1000*2 + 1,
+               Url:         "http://" + addr + "/stream",
+               Ctx:         ctx,
+               NoResponse:  true,
+               SaveToPipe:  &pio.IOpipe{R: i, W: o},
+               Async:       true,
+               WriteLoopTO: 5*1000*2 + 1,
        }); e != nil {
                t.Log(e)
        }
@@ -314,9 +315,9 @@ func Test_req9(t *testing.T) {
                        }
                }()
                r.Reqf(Rval{
-                       Url:              "http://" + addr + "/1min",
-                       SaveToPipeWriter: wc,
-                       Async:            true,
+                       Url:        "http://" + addr + "/1min",
+                       SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+                       Async:      true,
                })
                if r.Wait() != nil {
                        t.Fatal()
@@ -335,9 +336,9 @@ func Test_req8(t *testing.T) {
                        r.Cancel()
                }()
                r.Reqf(Rval{
-                       Url:              "http://" + addr + "/1min",
-                       SaveToPipeWriter: wc,
-                       Async:            true,
+                       Url:        "http://" + addr + "/1min",
+                       SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+                       Async:      true,
                })
                if !IsCancel(r.Wait()) {
                        t.Fatal("read from block response")
@@ -357,7 +358,7 @@ func Test_req10(t *testing.T) {
                }()
                r.Reqf(Rval{
                        Url:              "http://" + addr + "/1min",
-                       SaveToPipeWriter: wc,
+                       SaveToPipe: wc,
                        Async:            true,
                })
                if !IsCancel(r.Wait()) {
@@ -380,9 +381,9 @@ func Test_req3(t *testing.T) {
                        close(c)
                }()
                r.Reqf(Rval{
-                       Url:              "http://" + addr + "/br",
-                       SaveToPipeWriter: wc,
-                       Async:            true,
+                       Url:        "http://" + addr + "/br",
+                       SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+                       Async:      true,
                })
                <-c
        }
@@ -397,9 +398,9 @@ func Test_req3(t *testing.T) {
                        close(c)
                }()
                r.Reqf(Rval{
-                       Url:              "http://" + addr + "/gzip",
-                       SaveToPipeWriter: wc,
-                       Async:            true,
+                       Url:        "http://" + addr + "/gzip",
+                       SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+                       Async:      true,
                })
                <-c
        }
@@ -414,8 +415,8 @@ func Test_req3(t *testing.T) {
                        close(c)
                }()
                r.Reqf(Rval{
-                       Url:              "http://" + addr + "/flate",
-                       SaveToPipeWriter: wc,
+                       Url:        "http://" + addr + "/flate",
+                       SaveToPipe: &pio.IOpipe{R: rc, W: wc},
                })
                <-c
        }
@@ -433,9 +434,9 @@ func Test_req3(t *testing.T) {
                        close(c)
                }()
                r.Reqf(Rval{
-                       Url:              "http://" + addr + "/flate",
-                       SaveToPipeWriter: wc,
-                       Async:            true,
+                       Url:        "http://" + addr + "/flate",
+                       SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+                       Async:      true,
                })
                <-c
        }
@@ -463,10 +464,10 @@ func Test_req3(t *testing.T) {
                        wg.Done()
                }()
                r.Reqf(Rval{
-                       Url:              "http://" + addr + "/flate",
-                       SaveToPipeWriter: wc,
-                       NoResponse:       true,
-                       Async:            true,
+                       Url:        "http://" + addr + "/flate",
+                       SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+                       NoResponse: true,
+                       Async:      true,
                })
                r.Wait()
                if len(r.Respon) != 0 {
index 6149dc9c7d216dd2746fbb0071564f85bb1c819a..40e327aa34fa2c09e4d9bb214927515ae3a5bd9d 100644 (file)
@@ -85,7 +85,7 @@ func Test_ClientBlock(t *testing.T) {
 
        m.Store("/to", func(w http.ResponseWriter, r *http.Request) {
                rwc := pio.WithCtxTO(r.Context(), fmt.Sprintf("server handle %v by %v ", r.URL.Path, r.RemoteAddr), time.Second,
-                       []io.WriteCloser{pio.RWC{W: w.Write}}, r.Body, func(s string) {
+                       []io.Writer{w}, r.Body, func(s string) {
                                fmt.Println(s)
                                if !strings.Contains(s, "write blocking after rw 2s > 1s, goruntime leak") {
                                        t.Fatal(s)
@@ -125,10 +125,10 @@ func Test_ClientBlock(t *testing.T) {
                        close(c)
                }()
                r.Reqf(reqf.Rval{
-                       Url:              "http://127.0.0.1:10000/to",
-                       SaveToPipeWriter: wc,
-                       WriteLoopTO:      5000,
-                       Async:            true,
+                       Url:         "http://127.0.0.1:10000/to",
+                       SaveToPipe:  &pio.IOpipe{R: rc, W: wc},
+                       WriteLoopTO: 5000,
+                       Async:       true,
                })
                <-c
        }