From 1cd4290c29fe80f15c9434d9ceb3f018c51d9e70 Mon Sep 17 00:00:00 2001 From: qydysky Date: Wed, 19 Jul 2023 15:08:24 +0800 Subject: [PATCH] add --- io/io.go | 91 ++++++++++++++++++++++++++++++++++++++++++----- reqf/Reqf.go | 45 ++++++++++++++--------- reqf/Reqf_test.go | 57 ++++++++++++++--------------- web/Web_test.go | 10 +++--- 4 files changed, 145 insertions(+), 58 deletions(-) diff --git a/io/io.go b/io/io.go index 96f5a9e..6382348 100644 --- a/io/io.go +++ b/io/io.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "sync" "sync/atomic" "time" ) @@ -48,6 +49,76 @@ func RW2Chan(r io.ReadCloser, w io.WriteCloser) (rc, wc chan []byte) { return } +func NewPipe() *IOpipe { + r, w := io.Pipe() + return &IOpipe{R: r, W: w} +} + +type onceError struct { + sync.Mutex // guards following + err error +} + +func (a *onceError) Store(err error) { + a.Lock() + defer a.Unlock() + if a.err != nil { + return + } + a.err = err +} +func (a *onceError) Load() error { + a.Lock() + defer a.Unlock() + return a.err +} + +type IOpipe struct { + R *io.PipeReader + W *io.PipeWriter + re onceError + we onceError +} + +func (t *IOpipe) Write(p []byte) (n int, err error) { + if t.W != nil { + n, err = t.W.Write(p) + if errors.Is(err, io.ErrClosedPipe) { + err = errors.Join(err, t.we.Load()) + } + } + return +} +func (t *IOpipe) Read(p []byte) (n int, err error) { + if t.R != nil { + n, err = t.R.Read(p) + if errors.Is(err, io.ErrClosedPipe) { + err = errors.Join(err, t.re.Load()) + } + } + return +} +func (t *IOpipe) Close() (err error) { + if t.W != nil { + err = errors.Join(err, t.W.Close()) + } + if t.R != nil { + err = errors.Join(err, t.R.Close()) + } + return +} +func (t *IOpipe) CloseWithError(e error) (err error) { + if t.W != nil { + t.we.Store(e) + err = errors.Join(err, t.W.CloseWithError(e)) + } + if t.R != nil { + t.re.Store(e) + err = errors.Join(err, t.R.CloseWithError(e)) + } + return +} + type RWC struct { R func(p []byte) (n int, err error) W func(p []byte) (n int, err error) @@ -74,8 +145,11 @@ func (t RWC) Close() error { } // close reader by yourself +// +// to avoid writer block after ctx done, you should close writer after ctx done +// // call Close() after writer fin -func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w []io.WriteCloser, r io.Reader, panicf ...func(s string)) io.ReadWriteCloser { +func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w []io.Writer, r io.Reader, panicf ...func(s string)) io.ReadWriteCloser { var chanw atomic.Int64 chanw.Store(time.Now().Unix()) if len(panicf) == 0 { @@ -88,10 +162,6 @@ func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w []io.Wr for { select { case <-ctx.Done(): - // avoid write block - for i := 0; i < len(w); i++ { - w[i].Close() - } if old, now := chanw.Load(), time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) { panicf[0](fmt.Sprintf("rw blocking while close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree)) } else { @@ -129,10 +199,9 @@ func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w []io.Wr err = context.Canceled default: for i := 0; i < len(w); i++ { - if n, err = w[i].Write(p); n != 0 { - chanw.Store(time.Now().Unix()) - } + _, err = w[i].Write(p) } + chanw.Store(time.Now().Unix()) } return }, @@ -149,7 +218,11 @@ var ( ) // close reader by yourself -func WithCtxCopy(ctx context.Context, callTree string, to time.Duration, w []io.WriteCloser, r io.Reader, panicf ...func(s string)) error { +// +// to avoid writer block after ctx done, you should close writer after ctx done +// +// call Close() after writer fin +func WithCtxCopy(ctx context.Context, callTree string, to time.Duration, w []io.Writer, r io.Reader, panicf ...func(s string)) error { rwc := WithCtxTO(ctx, callTree, to, w, r) defer rwc.Close() for buf := make([]byte, 2048); true; { diff --git a/reqf/Reqf.go b/reqf/Reqf.go index b6cde2f..b4a9836 100644 --- a/reqf/Reqf.go +++ b/reqf/Reqf.go @@ -40,8 +40,9 @@ type Rval struct { Cookies []*http.Cookie Ctx context.Context - SaveToPath string - SaveToPipeWriter *io.PipeWriter + SaveToPath string + // 为避免write阻塞导致panic,请使用此项目io包中的NewPipe(),或在ctx done时,自行关闭pipe writer reader + SaveToPipe *pio.IOpipe Header map[string]string } @@ -196,7 +197,7 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) { if _, ok := Header["Accept-Encoding"]; !ok { Header["Accept-Encoding"] = "gzip, deflate, br" } - if val.SaveToPath != "" || val.SaveToPipeWriter != nil { + if val.SaveToPath != "" || val.SaveToPipe != nil { Header["Accept-Encoding"] = "identity" } if _, ok := Header["User-Agent"]; !ok { @@ -227,7 +228,7 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) { err = fmt.Errorf("%d %s", resp.StatusCode, http.StatusText(resp.StatusCode)) } - var ws []io.WriteCloser + var ws []io.Writer if val.SaveToPath != "" { t.responFile, e = os.Create(val.SaveToPath) if e != nil { @@ -236,13 +237,11 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) { } ws = append(ws, t.responFile) } - if val.SaveToPipeWriter != nil { - ws = append(ws, pio.RWC{W: val.SaveToPipeWriter.Write, C: func() error { return val.SaveToPipeWriter.CloseWithError(context.Canceled) }}) + if val.SaveToPipe != nil { + ws = append(ws, val.SaveToPipe) } if !val.NoResponse { - //will clear t.Respon - t.responBuf.Reset() - ws = append(ws, pio.RWC{W: t.responBuf.Write, C: func() error { return nil }}) + ws = append(ws, t.responBuf) } var resReadCloser = resp.Body @@ -293,7 +292,7 @@ func (t *Req) IsLive() bool { return t.state.Load() == running } -func (t *Req) prepareRes(ctx context.Context, val *Rval) (context.Context, context.CancelFunc) { +func (t *Req) prepareRes(ctx context.Context, val *Rval) (ctx1 context.Context, ctxf1 context.CancelFunc) { if !val.NoResponse { if t.responBuf == nil { t.responBuf = new(bytes.Buffer) @@ -307,10 +306,13 @@ func (t *Req) prepareRes(ctx context.Context, val *Rval) (context.Context, conte } t.Response = nil t.err = nil + if val.Timeout > 0 { - return context.WithTimeout(ctx, time.Duration(val.Timeout*int(time.Millisecond))) + ctx1, ctxf1 = context.WithTimeout(ctx, time.Duration(val.Timeout*int(time.Millisecond))) + } else { + ctx1, ctxf1 = context.WithCancel(ctx) } - return context.WithCancel(ctx) + return } func (t *Req) prepare(val *Rval) (ctx context.Context, cancel context.CancelFunc) { @@ -325,18 +327,29 @@ func (t *Req) prepare(val *Rval) (ctx context.Context, cancel context.CancelFunc } } if val.Ctx != nil { - return context.WithCancel(val.Ctx) + ctx, cancel = context.WithCancel(val.Ctx) } else { - return context.WithCancel(context.Background()) + ctx, cancel = context.WithCancel(context.Background()) + } + + if val.SaveToPipe != nil { + go func() { + <-ctx.Done() + if e := val.SaveToPipe.CloseWithError(context.Canceled); e != nil { + println(e) + } + }() } + + return } func (t *Req) clean(val *Rval) { if t.responFile != nil { t.responFile.Close() } - if val.SaveToPipeWriter != nil { - val.SaveToPipeWriter.Close() + if val.SaveToPipe != nil { + val.SaveToPipe.Close() } } diff --git a/reqf/Reqf_test.go b/reqf/Reqf_test.go index a52e7ec..6e5c055 100644 --- a/reqf/Reqf_test.go +++ b/reqf/Reqf_test.go @@ -13,6 +13,7 @@ import ( "time" compress "github.com/qydysky/part/compress" + pio "github.com/qydysky/part/io" web "github.com/qydysky/part/web" ) @@ -112,12 +113,12 @@ func Test14(t *testing.T) { r := New() if e := r.Reqf(Rval{ - Url: "http://" + addr + "/stream", - Ctx: ctx, - NoResponse: true, - SaveToPipeWriter: o, - Async: true, - WriteLoopTO: 5*1000*2 + 1, + Url: "http://" + addr + "/stream", + Ctx: ctx, + NoResponse: true, + SaveToPipe: &pio.IOpipe{R: i, W: o}, + Async: true, + WriteLoopTO: 5*1000*2 + 1, }); e != nil { t.Log(e) } @@ -314,9 +315,9 @@ func Test_req9(t *testing.T) { } }() r.Reqf(Rval{ - Url: "http://" + addr + "/1min", - SaveToPipeWriter: wc, - Async: true, + Url: "http://" + addr + "/1min", + SaveToPipe: &pio.IOpipe{R: rc, W: wc}, + Async: true, }) if r.Wait() != nil { t.Fatal() @@ -335,9 +336,9 @@ func Test_req8(t *testing.T) { r.Cancel() }() r.Reqf(Rval{ - Url: "http://" + addr + "/1min", - SaveToPipeWriter: wc, - Async: true, + Url: "http://" + addr + "/1min", + SaveToPipe: &pio.IOpipe{R: rc, W: wc}, + Async: true, }) if !IsCancel(r.Wait()) { t.Fatal("read from block response") @@ -357,7 +358,7 @@ func Test_req10(t *testing.T) { }() r.Reqf(Rval{ Url: "http://" + addr + "/1min", - SaveToPipeWriter: wc, + SaveToPipe: wc, Async: true, }) if !IsCancel(r.Wait()) { @@ -380,9 +381,9 @@ func Test_req3(t *testing.T) { close(c) }() r.Reqf(Rval{ - Url: "http://" + addr + "/br", - SaveToPipeWriter: wc, - Async: true, + Url: "http://" + addr + "/br", + SaveToPipe: &pio.IOpipe{R: rc, W: wc}, + Async: true, }) <-c } @@ -397,9 +398,9 @@ func Test_req3(t *testing.T) { close(c) }() r.Reqf(Rval{ - Url: "http://" + addr + "/gzip", - SaveToPipeWriter: wc, - Async: true, + Url: "http://" + addr + "/gzip", + SaveToPipe: &pio.IOpipe{R: rc, W: wc}, + Async: true, }) <-c } @@ -414,8 +415,8 @@ func Test_req3(t *testing.T) { close(c) }() r.Reqf(Rval{ - Url: "http://" + addr + "/flate", - SaveToPipeWriter: wc, + Url: "http://" + addr + "/flate", + SaveToPipe: &pio.IOpipe{R: rc, W: wc}, }) <-c } @@ -433,9 +434,9 @@ func Test_req3(t *testing.T) { close(c) }() r.Reqf(Rval{ - Url: "http://" + addr + "/flate", - SaveToPipeWriter: wc, - Async: true, + Url: "http://" + addr + "/flate", + SaveToPipe: &pio.IOpipe{R: rc, W: wc}, + Async: true, }) <-c } @@ -463,10 +464,10 @@ func Test_req3(t *testing.T) { wg.Done() }() r.Reqf(Rval{ - Url: "http://" + addr + "/flate", - SaveToPipeWriter: wc, - NoResponse: true, - Async: true, + Url: "http://" + addr + "/flate", + SaveToPipe: &pio.IOpipe{R: rc, W: wc}, + NoResponse: true, + Async: true, }) r.Wait() if len(r.Respon) != 0 { diff --git a/web/Web_test.go b/web/Web_test.go index 6149dc9..40e327a 100644 --- a/web/Web_test.go +++ b/web/Web_test.go @@ -85,7 +85,7 @@ func Test_ClientBlock(t *testing.T) { m.Store("/to", func(w http.ResponseWriter, r *http.Request) { rwc := pio.WithCtxTO(r.Context(), fmt.Sprintf("server handle %v by %v ", r.URL.Path, r.RemoteAddr), time.Second, - []io.WriteCloser{pio.RWC{W: w.Write}}, r.Body, func(s string) { + []io.Writer{w}, r.Body, func(s string) { fmt.Println(s) if !strings.Contains(s, "write blocking after rw 2s > 1s, goruntime leak") { t.Fatal(s) @@ -125,10 +125,10 @@ func Test_ClientBlock(t *testing.T) { close(c) }() r.Reqf(reqf.Rval{ - Url: "http://127.0.0.1:10000/to", - SaveToPipeWriter: wc, - WriteLoopTO: 5000, - Async: true, + Url: "http://127.0.0.1:10000/to", + SaveToPipe: &pio.IOpipe{R: rc, W: wc}, + WriteLoopTO: 5000, + Async: true, }) <-c } -- 2.39.2