From: qydysky Date: Sun, 4 Jun 2023 17:51:29 +0000 (+0800) Subject: add X-Git-Tag: v0.28.0+2023060419d1532 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=19d1532a8fbc5f4279fed957ab987c3b8980896a;p=part%2F.git add --- diff --git a/io/io.go b/io/io.go index 5e89be6..9ea6d52 100644 --- a/io/io.go +++ b/io/io.go @@ -1,11 +1,15 @@ package part import ( + "context" + "fmt" "io" + "sync/atomic" + "time" ) -//no close rc any time -//you can close wc, r, w. +// no close rc any time +// you can close wc, r, w. func RW2Chan(r io.ReadCloser, w io.WriteCloser) (rc, wc chan []byte) { if r != nil { rc = make(chan []byte, 10) @@ -42,3 +46,81 @@ func RW2Chan(r io.ReadCloser, w io.WriteCloser) (rc, wc chan []byte) { } return } + +type RWC struct { + R func(p []byte) (n int, err error) + W func(p []byte) (n int, err error) + C func() error +} + +func (t RWC) Write(p []byte) (n int, err error) { + return t.W(p) +} +func (t RWC) Read(p []byte) (n int, err error) { + return t.R(p) +} +func (t RWC) Close() error { + return t.C() +} + +func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w io.Writer, r io.Reader, panicf ...func(s string)) io.ReadWriteCloser { + var chanw atomic.Int64 + chanw.Store(time.Now().Unix()) + if len(panicf) == 0 { + panicf = append(panicf, func(s string) { panic(s) }) + } + + go func() { + var timer = time.NewTicker(to) + defer timer.Stop() + for { + select { + case <-ctx.Done(): + if old, now := chanw.Load(), time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) { + if old != 0 { + panicf[0](fmt.Sprintf("write blocking while close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree)) + } + } else if old < 0 { + return + } else { + time.AfterFunc(to, func() { + if old, now := chanw.Load(), time.Now(); old != 0 && now.Unix()-old > int64(to.Seconds()) { + panicf[0](fmt.Sprintf("write blocking after close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree)) + } + }) + } + return + case now := <-timer.C: + if old := chanw.Load(); old > 0 && now.Unix()-old > int64(to.Seconds()) { + panicf[0](fmt.Sprintf("write blocking after rw %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree)) + return + } else if old < 0 { + return + } + } + } + }() + + return RWC{ + func(p []byte) (n int, err error) { + if n, err = r.Read(p); n != 0 { + select { + case <-ctx.Done(): + default: + chanw.Store(time.Now().Unix()) + } + } + return + }, + func(p []byte) (n int, err error) { + if n, err = w.Write(p); n != 0 { + chanw.Store(time.Now().Unix()) + } + return + }, + func() error { + chanw.Store(-1) + return nil + }, + } +} diff --git a/reqf/Reqf.go b/reqf/Reqf.go index 707cea7..97b714c 100644 --- a/reqf/Reqf.go +++ b/reqf/Reqf.go @@ -20,6 +20,7 @@ import ( gzip "compress/gzip" br "github.com/andybalholm/brotli" + pio "github.com/qydysky/part/io" s "github.com/qydysky/part/strings" // "encoding/binary" ) @@ -60,7 +61,6 @@ var ( ErrResponFileCreate = errors.New("ErrResponFileCreate") ErrWriteRes = errors.New("ErrWriteRes") ErrReadRes = errors.New("ErrReadRes") - ErrWriteAfterWrite = errors.New("ErrWriteAfterWrite") ) type Req struct { @@ -255,7 +255,7 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) { if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 { switch compress_type[0] { case `br`: - resReadCloser = rwc{r: br.NewReader(resp.Body).Read} + resReadCloser = pio.RWC{R: br.NewReader(resp.Body).Read} case `gzip`: resReadCloser, _ = gzip.NewReader(resp.Body) case `deflate`: @@ -271,7 +271,7 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) { if writeLoopTO == 0 { writeLoopTO = 1000 } - rwc := t.withCtxTO(ctx, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser) + rwc := pio.WithCtxTO(ctx, t.callTree, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser) defer rwc.Close() for buf := make([]byte, 2048); true; { @@ -380,76 +380,6 @@ func (t *Req) updateUseDur(u time.Time) { t.UsedTime = time.Since(u) } -type rwc struct { - r func(p []byte) (n int, err error) - w func(p []byte) (n int, err error) - c func() error -} - -func (t rwc) Write(p []byte) (n int, err error) { - return t.w(p) -} -func (t rwc) Read(p []byte) (n int, err error) { - return t.r(p) -} -func (t rwc) Close() error { - return t.c() -} - -func (t *Req) withCtxTO(ctx context.Context, to time.Duration, w io.Writer, r io.Reader) io.ReadWriteCloser { - var chanw atomic.Int64 - - go func(callTree string) { - var timer = time.NewTicker(to) - defer timer.Stop() - for { - select { - case <-ctx.Done(): - if old, now := chanw.Load(), time.Now(); old != 0 && now.Unix()-old > int64(to.Seconds()) { - if chanw.Load() != 0 { - panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", now.Unix()-old, callTree)) - } - } else { - time.AfterFunc(to, func() { - if chanw.Load() != 0 { - panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", now.Unix()-old, callTree)) - } - }) - } - return - case now := <-timer.C: - if old := chanw.Load(); old != 0 && now.Unix()-old > int64(to.Seconds()) { - panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", now.Unix()-old, callTree)) - } - } - } - }(t.callTree) - - return rwc{ - func(p []byte) (n int, err error) { - if n, err = r.Read(p); n != 0 { - select { - case <-ctx.Done(): - default: - chanw.Store(time.Now().Unix()) - } - } - return - }, - func(p []byte) (n int, err error) { - if n, err = w.Write(p); n != 0 { - if chanw.Swap(0) == 0 { - panic(ErrWriteAfterWrite) - } - } - return - }, - func() error { - return nil - }, - } -} - func IsTimeout(e error) bool { if errors.Is(e, context.DeadlineExceeded) { return true diff --git a/web/Web_test.go b/web/Web_test.go index a3d4bb3..a2f30fb 100644 --- a/web/Web_test.go +++ b/web/Web_test.go @@ -2,11 +2,16 @@ package part import ( "bytes" + "context" "encoding/json" + "fmt" + "io" "net/http" + "strings" "testing" "time" + pio "github.com/qydysky/part/io" reqf "github.com/qydysky/part/reqf" ) @@ -68,6 +73,68 @@ func Test_ServerSyncMap(t *testing.T) { } } +func Test_ClientBlock(t *testing.T) { + var m WebPath + m.Store("/", func(w http.ResponseWriter, _ *http.Request) { + w.Write([]byte("1")) + }) + s := NewSyncMap(&http.Server{ + Addr: "127.0.0.1:10000", + WriteTimeout: time.Millisecond, + }, &m) + defer s.Shutdown() + + m.Store("/to", func(w http.ResponseWriter, r *http.Request) { + + rwc := pio.WithCtxTO(context.Background(), fmt.Sprintf("server handle %v by %v ", r.URL.Path, r.RemoteAddr), time.Second, w, r.Body, func(s string) { + fmt.Println(s) + if !strings.Contains(s, "write blocking after rw 2s > 1s, goruntime leak") { + t.Fatal(s) + } + }) + defer rwc.Close() + + type d struct { + A string `json:"a"` + B []string `json:"b"` + C map[string]int `json:"c"` + } + + var t = ResStruct{0, "ok", d{"0", []string{"0"}, map[string]int{"0": 1}}} + data, e := json.Marshal(t) + if e != nil { + t.Code = -1 + t.Data = nil + t.Message = e.Error() + data, _ = json.Marshal(t) + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write(data) + }) + + time.Sleep(time.Second) + + r := reqf.New() + { + rc, wc := io.Pipe() + c := make(chan struct{}) + go func() { + time.Sleep(time.Second * 3) + d, _ := io.ReadAll(rc) + fmt.Println(string(d)) + fmt.Println(r.Response.Status) + close(c) + }() + r.Reqf(reqf.Rval{ + Url: "http://127.0.0.1:10000/to", + SaveToPipeWriter: wc, + WriteLoopTO: 5000, + Async: true, + }) + <-c + } +} + func BenchmarkXxx(b *testing.B) { var m WebPath type d struct {