]> 127.0.0.1 Git - part/.git/commitdiff
add v0.28.0+2023060419d1532
authorqydysky <qydysky@foxmail.com>
Sun, 4 Jun 2023 17:51:29 +0000 (01:51 +0800)
committerqydysky <qydysky@foxmail.com>
Sun, 4 Jun 2023 17:51:29 +0000 (01:51 +0800)
io/io.go
reqf/Reqf.go
web/Web_test.go

index 5e89be68f843de91a5ce903baf9decc65723a135..9ea6d5289a867eff2875a4ef31f5e8c75dd9dcda 100644 (file)
--- a/io/io.go
+++ b/io/io.go
@@ -1,11 +1,15 @@
 package part
 
 import (
+       "context"
+       "fmt"
        "io"
+       "sync/atomic"
+       "time"
 )
 
-//no close rc any time
-//you can close wc, r, w.
+// no close rc any time
+// you can close wc, r, w.
 func RW2Chan(r io.ReadCloser, w io.WriteCloser) (rc, wc chan []byte) {
        if r != nil {
                rc = make(chan []byte, 10)
@@ -42,3 +46,81 @@ func RW2Chan(r io.ReadCloser, w io.WriteCloser) (rc, wc chan []byte) {
        }
        return
 }
+
+type RWC struct {
+       R func(p []byte) (n int, err error)
+       W func(p []byte) (n int, err error)
+       C func() error
+}
+
+func (t RWC) Write(p []byte) (n int, err error) {
+       return t.W(p)
+}
+func (t RWC) Read(p []byte) (n int, err error) {
+       return t.R(p)
+}
+func (t RWC) Close() error {
+       return t.C()
+}
+
+func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w io.Writer, r io.Reader, panicf ...func(s string)) io.ReadWriteCloser {
+       var chanw atomic.Int64
+       chanw.Store(time.Now().Unix())
+       if len(panicf) == 0 {
+               panicf = append(panicf, func(s string) { panic(s) })
+       }
+
+       go func() {
+               var timer = time.NewTicker(to)
+               defer timer.Stop()
+               for {
+                       select {
+                       case <-ctx.Done():
+                               if old, now := chanw.Load(), time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
+                                       if old != 0 {
+                                               panicf[0](fmt.Sprintf("write blocking while close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
+                                       }
+                               } else if old < 0 {
+                                       return
+                               } else {
+                                       time.AfterFunc(to, func() {
+                                               if old, now := chanw.Load(), time.Now(); old != 0 && now.Unix()-old > int64(to.Seconds()) {
+                                                       panicf[0](fmt.Sprintf("write blocking after close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
+                                               }
+                                       })
+                               }
+                               return
+                       case now := <-timer.C:
+                               if old := chanw.Load(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
+                                       panicf[0](fmt.Sprintf("write blocking after rw %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
+                                       return
+                               } else if old < 0 {
+                                       return
+                               }
+                       }
+               }
+       }()
+
+       return RWC{
+               func(p []byte) (n int, err error) {
+                       if n, err = r.Read(p); n != 0 {
+                               select {
+                               case <-ctx.Done():
+                               default:
+                                       chanw.Store(time.Now().Unix())
+                               }
+                       }
+                       return
+               },
+               func(p []byte) (n int, err error) {
+                       if n, err = w.Write(p); n != 0 {
+                               chanw.Store(time.Now().Unix())
+                       }
+                       return
+               },
+               func() error {
+                       chanw.Store(-1)
+                       return nil
+               },
+       }
+}
index 707cea7e47ecaa678847367e1c7e59caa01b85f4..97b714ccd60661d47f02034e62aaa39085af9770 100644 (file)
@@ -20,6 +20,7 @@ import (
        gzip "compress/gzip"
 
        br "github.com/andybalholm/brotli"
+       pio "github.com/qydysky/part/io"
        s "github.com/qydysky/part/strings"
        // "encoding/binary"
 )
@@ -60,7 +61,6 @@ var (
        ErrResponFileCreate = errors.New("ErrResponFileCreate")
        ErrWriteRes         = errors.New("ErrWriteRes")
        ErrReadRes          = errors.New("ErrReadRes")
-       ErrWriteAfterWrite  = errors.New("ErrWriteAfterWrite")
 )
 
 type Req struct {
@@ -255,7 +255,7 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) {
        if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 {
                switch compress_type[0] {
                case `br`:
-                       resReadCloser = rwc{r: br.NewReader(resp.Body).Read}
+                       resReadCloser = pio.RWC{R: br.NewReader(resp.Body).Read}
                case `gzip`:
                        resReadCloser, _ = gzip.NewReader(resp.Body)
                case `deflate`:
@@ -271,7 +271,7 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) {
        if writeLoopTO == 0 {
                writeLoopTO = 1000
        }
-       rwc := t.withCtxTO(ctx, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser)
+       rwc := pio.WithCtxTO(ctx, t.callTree, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser)
        defer rwc.Close()
 
        for buf := make([]byte, 2048); true; {
@@ -380,76 +380,6 @@ func (t *Req) updateUseDur(u time.Time) {
        t.UsedTime = time.Since(u)
 }
 
-type rwc struct {
-       r func(p []byte) (n int, err error)
-       w func(p []byte) (n int, err error)
-       c func() error
-}
-
-func (t rwc) Write(p []byte) (n int, err error) {
-       return t.w(p)
-}
-func (t rwc) Read(p []byte) (n int, err error) {
-       return t.r(p)
-}
-func (t rwc) Close() error {
-       return t.c()
-}
-
-func (t *Req) withCtxTO(ctx context.Context, to time.Duration, w io.Writer, r io.Reader) io.ReadWriteCloser {
-       var chanw atomic.Int64
-
-       go func(callTree string) {
-               var timer = time.NewTicker(to)
-               defer timer.Stop()
-               for {
-                       select {
-                       case <-ctx.Done():
-                               if old, now := chanw.Load(), time.Now(); old != 0 && now.Unix()-old > int64(to.Seconds()) {
-                                       if chanw.Load() != 0 {
-                                               panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", now.Unix()-old, callTree))
-                                       }
-                               } else {
-                                       time.AfterFunc(to, func() {
-                                               if chanw.Load() != 0 {
-                                                       panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", now.Unix()-old, callTree))
-                                               }
-                                       })
-                               }
-                               return
-                       case now := <-timer.C:
-                               if old := chanw.Load(); old != 0 && now.Unix()-old > int64(to.Seconds()) {
-                                       panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", now.Unix()-old, callTree))
-                               }
-                       }
-               }
-       }(t.callTree)
-
-       return rwc{
-               func(p []byte) (n int, err error) {
-                       if n, err = r.Read(p); n != 0 {
-                               select {
-                               case <-ctx.Done():
-                               default:
-                                       chanw.Store(time.Now().Unix())
-                               }
-                       }
-                       return
-               },
-               func(p []byte) (n int, err error) {
-                       if n, err = w.Write(p); n != 0 {
-                               if chanw.Swap(0) == 0 {
-                                       panic(ErrWriteAfterWrite)
-                               }
-                       }
-                       return
-               },
-               func() error {
-                       return nil
-               },
-       }
-}
-
 func IsTimeout(e error) bool {
        if errors.Is(e, context.DeadlineExceeded) {
                return true
index a3d4bb3fca5dcc1735ac4fb31661c1be5249dd0f..a2f30fb35da5bcf17a462ec8ce9c52c9ac42f909 100644 (file)
@@ -2,11 +2,16 @@ package part
 
 import (
        "bytes"
+       "context"
        "encoding/json"
+       "fmt"
+       "io"
        "net/http"
+       "strings"
        "testing"
        "time"
 
+       pio "github.com/qydysky/part/io"
        reqf "github.com/qydysky/part/reqf"
 )
 
@@ -68,6 +73,68 @@ func Test_ServerSyncMap(t *testing.T) {
        }
 }
 
+func Test_ClientBlock(t *testing.T) {
+       var m WebPath
+       m.Store("/", func(w http.ResponseWriter, _ *http.Request) {
+               w.Write([]byte("1"))
+       })
+       s := NewSyncMap(&http.Server{
+               Addr:         "127.0.0.1:10000",
+               WriteTimeout: time.Millisecond,
+       }, &m)
+       defer s.Shutdown()
+
+       m.Store("/to", func(w http.ResponseWriter, r *http.Request) {
+
+               rwc := pio.WithCtxTO(context.Background(), fmt.Sprintf("server handle %v by %v ", r.URL.Path, r.RemoteAddr), time.Second, w, r.Body, func(s string) {
+                       fmt.Println(s)
+                       if !strings.Contains(s, "write blocking after rw 2s > 1s, goruntime leak") {
+                               t.Fatal(s)
+                       }
+               })
+               defer rwc.Close()
+
+               type d struct {
+                       A string         `json:"a"`
+                       B []string       `json:"b"`
+                       C map[string]int `json:"c"`
+               }
+
+               var t = ResStruct{0, "ok", d{"0", []string{"0"}, map[string]int{"0": 1}}}
+               data, e := json.Marshal(t)
+               if e != nil {
+                       t.Code = -1
+                       t.Data = nil
+                       t.Message = e.Error()
+                       data, _ = json.Marshal(t)
+               }
+               w.Header().Set("Content-Type", "application/json")
+               _, _ = w.Write(data)
+       })
+
+       time.Sleep(time.Second)
+
+       r := reqf.New()
+       {
+               rc, wc := io.Pipe()
+               c := make(chan struct{})
+               go func() {
+                       time.Sleep(time.Second * 3)
+                       d, _ := io.ReadAll(rc)
+                       fmt.Println(string(d))
+                       fmt.Println(r.Response.Status)
+                       close(c)
+               }()
+               r.Reqf(reqf.Rval{
+                       Url:              "http://127.0.0.1:10000/to",
+                       SaveToPipeWriter: wc,
+                       WriteLoopTO:      5000,
+                       Async:            true,
+               })
+               <-c
+       }
+}
+
 func BenchmarkXxx(b *testing.B) {
        var m WebPath
        type d struct {