From: qydysky Date: Tue, 18 Jul 2023 17:55:08 +0000 (+0800) Subject: add X-Git-Tag: v0.28.0+202307186d5eeb5 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=6d5eeb534544fb33a87cfa880ae6dea58b520f1f;p=part%2F.git add --- diff --git a/io/io.go b/io/io.go index ec8de7d..fe60b1a 100644 --- a/io/io.go +++ b/io/io.go @@ -54,16 +54,27 @@ type RWC struct { } func (t RWC) Write(p []byte) (n int, err error) { - return t.W(p) + if t.W != nil { + return t.W(p) + } + return 0, nil } func (t RWC) Read(p []byte) (n int, err error) { - return t.R(p) + if t.R != nil { + return t.R(p) + } + return 0, nil } func (t RWC) Close() error { - return t.C() + if t.C != nil { + return t.C() + } + return nil } -func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w io.Writer, r io.Reader, panicf ...func(s string)) io.ReadWriteCloser { +// close reader by yourself +// call Close() after writer fin +func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w []io.WriteCloser, r io.Reader, panicf ...func(s string)) io.ReadWriteCloser { var chanw atomic.Int64 chanw.Store(time.Now().Unix()) if len(panicf) == 0 { @@ -76,26 +87,22 @@ func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w io.Writ for { select { case <-ctx.Done(): - if old := chanw.Load(); old == -1 { - return - } else if now := time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) { - if old != 0 { - panicf[0](fmt.Sprintf("rw blocking while close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree)) - } + // avoid write block + for i := 0; i < len(w); i++ { + w[i].Close() + } + if old, now := chanw.Load(), time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) { + panicf[0](fmt.Sprintf("rw blocking while close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree)) } else { time.AfterFunc(to, func() { - if old := chanw.Load(); old == -1 { - return - } else if now := time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) { - panicf[0](fmt.Sprintf("rw blocking after close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree)) + if chanw.Load() != -1 { + panicf[0](fmt.Sprintf("rw blocking after close %v, goruntime leak \n%v", to, callTree)) } }) } return case now := <-timer.C: - if old := chanw.Load(); old == -1 { - return - } else if old > 0 && now.Unix()-old > int64(to.Seconds()) { + if old := chanw.Load(); old > 0 && now.Unix()-old > int64(to.Seconds()) { panicf[0](fmt.Sprintf("rw blocking after rw %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree)) return } @@ -120,8 +127,10 @@ func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w io.Writ case <-ctx.Done(): err = context.Canceled default: - if n, err = w.Write(p); n != 0 { - chanw.Store(time.Now().Unix()) + for i := 0; i < len(w); i++ { + if n, err = w[i].Write(p); n != 0 { + chanw.Store(time.Now().Unix()) + } } } return diff --git a/io/io_test.go b/io/io_test.go index dbef24f..2dce990 100644 --- a/io/io_test.go +++ b/io/io_test.go @@ -1,40 +1,51 @@ package part import ( - "testing" "io" + "testing" ) +func Test_rwc(t *testing.T) { + rwc := RWC{R: func(p []byte) (n int, err error) { return 1, nil }} + rwc.Close() +} + func Test_RW2Chan(t *testing.T) { { - r,w := io.Pipe() - _,rw := RW2Chan(nil,w) - - go func(){ - rw<-[]byte{0x01} + r, w := io.Pipe() + _, rw := RW2Chan(nil, w) + + go func() { + rw <- []byte{0x01} }() buf := make([]byte, 1<<16) - n,_:=r.Read(buf) - if buf[:n][0] != 1 {t.Error(`no`)} + n, _ := r.Read(buf) + if buf[:n][0] != 1 { + t.Error(`no`) + } } - + { - r,w := io.Pipe() - rc,_ := RW2Chan(r,nil) - - go func(){ + r, w := io.Pipe() + rc, _ := RW2Chan(r, nil) + + go func() { w.Write([]byte{0x09}) }() - if b:=<-rc;b[0] != 9 {t.Error(`no2`)} + if b := <-rc; b[0] != 9 { + t.Error(`no2`) + } } - + { - r,w := io.Pipe() - rc,rw := RW2Chan(r,w) - - go func(){ + r, w := io.Pipe() + rc, rw := RW2Chan(r, w) + + go func() { rw <- []byte{0x07} }() - if b:=<-rc;b[0] != 7 {t.Error(`no3`)} + if b := <-rc; b[0] != 7 { + t.Error(`no3`) + } } } diff --git a/reqf/Reqf.go b/reqf/Reqf.go index 0b22346..5326c1a 100644 --- a/reqf/Reqf.go +++ b/reqf/Reqf.go @@ -41,7 +41,6 @@ type Rval struct { Ctx context.Context SaveToPath string - SaveToChan chan []byte SaveToPipeWriter *io.PipeWriter Header map[string]string @@ -71,7 +70,6 @@ type Req struct { UsedTime time.Duration cancelP atomic.Pointer[context.CancelFunc] - ctx context.Context state atomic.Int32 responFile *os.File @@ -229,7 +227,7 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) { err = fmt.Errorf("%d %s", resp.StatusCode, http.StatusText(resp.StatusCode)) } - var ws []io.Writer + var ws []io.WriteCloser if val.SaveToPath != "" { t.responFile, e = os.Create(val.SaveToPath) if e != nil { @@ -239,16 +237,14 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) { ws = append(ws, t.responFile) } if val.SaveToPipeWriter != nil { - ws = append(ws, val.SaveToPipeWriter) + ws = append(ws, pio.RWC{W: val.SaveToPipeWriter.Write, C: func() error { return val.SaveToPipeWriter.CloseWithError(context.Canceled) }}) } if !val.NoResponse { //will clear t.Respon t.responBuf.Reset() - ws = append(ws, t.responBuf) + ws = append(ws, pio.RWC{W: t.responBuf.Write, C: func() error { return nil }}) } - w := io.MultiWriter(ws...) - var resReadCloser = resp.Body if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 { switch compress_type[0] { @@ -266,16 +262,13 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) { if writeLoopTO == 0 { writeLoopTO = 1000 } - rwc := pio.WithCtxTO(req.Context(), t.callTree, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser) + + rwc := pio.WithCtxTO(req.Context(), t.callTree, time.Duration(int(time.Millisecond)*writeLoopTO), ws, resReadCloser) defer rwc.Close() for buf := make([]byte, 2048); true; { if n, e := rwc.Read(buf); n != 0 { - if n, e := rwc.Write(buf[:n]); n != 0 { - if val.SaveToChan != nil { - val.SaveToChan <- buf[:n] - } - } else if e != nil { + if n, e := rwc.Write(buf[:n]); n == 0 && e != nil { if !errors.Is(e, io.EOF) { err = errors.Join(err, ErrWriteRes, e) } @@ -355,9 +348,6 @@ func (t *Req) prepare(val *Rval) (ctx context.Context, cancel context.CancelFunc } func (t *Req) clean(val *Rval) { - if val.SaveToChan != nil { - close(val.SaveToChan) - } if t.responFile != nil { t.responFile.Close() } diff --git a/reqf/Reqf_test.go b/reqf/Reqf_test.go index da4ef88..a52e7ec 100644 --- a/reqf/Reqf_test.go +++ b/reqf/Reqf_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "io" "net/http" "strconv" @@ -81,6 +82,22 @@ func init() { flusher.Flush() } }, + `/stream`: func(w http.ResponseWriter, r *http.Request) { + flusher, flushSupport := w.(http.Flusher) + if flushSupport { + flusher.Flush() + } + for { + select { + case <-r.Context().Done(): + println("server req ctx done") + return + default: + w.Write([]byte{'0'}) + flusher.Flush() + } + } + }, `/exit`: func(_ http.ResponseWriter, _ *http.Request) { s.Server.Shutdown(context.Background()) }, @@ -88,6 +105,51 @@ func init() { time.Sleep(time.Second) } +func Test14(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + i, o := io.Pipe() + + r := New() + if e := r.Reqf(Rval{ + Url: "http://" + addr + "/stream", + Ctx: ctx, + NoResponse: true, + SaveToPipeWriter: o, + Async: true, + WriteLoopTO: 5*1000*2 + 1, + }); e != nil { + t.Log(e) + } + + start := time.Now() + + t.Log("Do", time.Since(start)) + + go func() { + buf := make([]byte, 1<<8) + for { + if n, e := i.Read(buf); n != 0 { + if time.Since(start) > time.Second { + cancel() + t.Log("Cancel", time.Since(start)) + break + } + // do nothing + continue + } else if e != nil { + t.Log(e) + break + } + } + }() + + if !errors.Is(r.Wait(), context.Canceled) { + t.Fatal() + } + t.Log("Do finished", time.Since(start)) +} + func Test_req13(t *testing.T) { r := New() e := r.Reqf(Rval{ @@ -176,50 +238,50 @@ func Test_req4(t *testing.T) { } } -func Test_req5(t *testing.T) { - r := New() - { - c := make(chan []byte) - r.Reqf(Rval{ - Url: "http://" + addr + "/to", - Timeout: 1000, - Async: true, - SaveToChan: c, - }) - for { - buf := <-c - if len(buf) == 0 { - break - } - } - if !IsTimeout(r.Wait()) { - t.Error("async IsTimeout fail") - } - } -} +// func Test_req5(t *testing.T) { +// r := New() +// { +// c := make(chan []byte) +// r.Reqf(Rval{ +// Url: "http://" + addr + "/to", +// Timeout: 1000, +// Async: true, +// SaveToChan: c, +// }) +// for { +// buf := <-c +// if len(buf) == 0 { +// break +// } +// } +// if !IsTimeout(r.Wait()) { +// t.Error("async IsTimeout fail") +// } +// } +// } -func Test_req6(t *testing.T) { - r := New() - { - c := make(chan []byte) - r.Reqf(Rval{ - Url: "http://" + addr + "/no", - Async: true, - SaveToChan: c, - }) - b := []byte{} - for { - buf := <-c - if len(buf) == 0 { - break - } - b = append(b, buf...) - } - if !bytes.Equal(b, []byte("abc强强强强")) { - t.Error("chan fail") - } - } -} +// func Test_req6(t *testing.T) { +// r := New() +// { +// c := make(chan []byte) +// r.Reqf(Rval{ +// Url: "http://" + addr + "/no", +// Async: true, +// SaveToChan: c, +// }) +// b := []byte{} +// for { +// buf := <-c +// if len(buf) == 0 { +// break +// } +// b = append(b, buf...) +// } +// if !bytes.Equal(b, []byte("abc强强强强")) { +// t.Error("chan fail") +// } +// } +// } func Test_req11(t *testing.T) { r := New() diff --git a/web/Web_test.go b/web/Web_test.go index a2f30fb..6149dc9 100644 --- a/web/Web_test.go +++ b/web/Web_test.go @@ -2,7 +2,6 @@ package part import ( "bytes" - "context" "encoding/json" "fmt" "io" @@ -85,13 +84,13 @@ func Test_ClientBlock(t *testing.T) { defer s.Shutdown() m.Store("/to", func(w http.ResponseWriter, r *http.Request) { - - rwc := pio.WithCtxTO(context.Background(), fmt.Sprintf("server handle %v by %v ", r.URL.Path, r.RemoteAddr), time.Second, w, r.Body, func(s string) { - fmt.Println(s) - if !strings.Contains(s, "write blocking after rw 2s > 1s, goruntime leak") { - t.Fatal(s) - } - }) + rwc := pio.WithCtxTO(r.Context(), fmt.Sprintf("server handle %v by %v ", r.URL.Path, r.RemoteAddr), time.Second, + []io.WriteCloser{pio.RWC{W: w.Write}}, r.Body, func(s string) { + fmt.Println(s) + if !strings.Contains(s, "write blocking after rw 2s > 1s, goruntime leak") { + t.Fatal(s) + } + }) defer rwc.Close() type d struct {