]> 127.0.0.1 Git - part/.git/commitdiff
add v0.28.0+202307186d5eeb5
authorqydysky <qydysky@foxmail.com>
Tue, 18 Jul 2023 17:55:08 +0000 (01:55 +0800)
committerqydysky <qydysky@foxmail.com>
Tue, 18 Jul 2023 17:55:08 +0000 (01:55 +0800)
io/io.go
io/io_test.go
reqf/Reqf.go
reqf/Reqf_test.go
web/Web_test.go

index ec8de7d7c46b14d4d2f058e0f0ff7c6390ac9e14..fe60b1aa9fb3554f8a3cdc0327efa2483574bfe4 100644 (file)
--- a/io/io.go
+++ b/io/io.go
@@ -54,16 +54,27 @@ type RWC struct {
 }
 
 func (t RWC) Write(p []byte) (n int, err error) {
-       return t.W(p)
+       if t.W != nil {
+               return t.W(p)
+       }
+       return 0, nil
 }
 func (t RWC) Read(p []byte) (n int, err error) {
-       return t.R(p)
+       if t.R != nil {
+               return t.R(p)
+       }
+       return 0, nil
 }
 func (t RWC) Close() error {
-       return t.C()
+       if t.C != nil {
+               return t.C()
+       }
+       return nil
 }
 
-func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w io.Writer, r io.Reader, panicf ...func(s string)) io.ReadWriteCloser {
+// close reader by yourself
+// call Close() after writer fin
+func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w []io.WriteCloser, r io.Reader, panicf ...func(s string)) io.ReadWriteCloser {
        var chanw atomic.Int64
        chanw.Store(time.Now().Unix())
        if len(panicf) == 0 {
@@ -76,26 +87,22 @@ func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w io.Writ
                for {
                        select {
                        case <-ctx.Done():
-                               if old := chanw.Load(); old == -1 {
-                                       return
-                               } else if now := time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
-                                       if old != 0 {
-                                               panicf[0](fmt.Sprintf("rw blocking while close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
-                                       }
+                               // avoid write block
+                               for i := 0; i < len(w); i++ {
+                                       w[i].Close()
+                               }
+                               if old, now := chanw.Load(), time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
+                                       panicf[0](fmt.Sprintf("rw blocking while close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
                                } else {
                                        time.AfterFunc(to, func() {
-                                               if old := chanw.Load(); old == -1 {
-                                                       return
-                                               } else if now := time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
-                                                       panicf[0](fmt.Sprintf("rw blocking after close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
+                                               if chanw.Load() != -1 {
+                                                       panicf[0](fmt.Sprintf("rw blocking after close %v, goruntime leak \n%v", to, callTree))
                                                }
                                        })
                                }
                                return
                        case now := <-timer.C:
-                               if old := chanw.Load(); old == -1 {
-                                       return
-                               } else if old > 0 && now.Unix()-old > int64(to.Seconds()) {
+                               if old := chanw.Load(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
                                        panicf[0](fmt.Sprintf("rw blocking after rw %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
                                        return
                                }
@@ -120,8 +127,10 @@ func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w io.Writ
                        case <-ctx.Done():
                                err = context.Canceled
                        default:
-                               if n, err = w.Write(p); n != 0 {
-                                       chanw.Store(time.Now().Unix())
+                               for i := 0; i < len(w); i++ {
+                                       if n, err = w[i].Write(p); n != 0 {
+                                               chanw.Store(time.Now().Unix())
+                                       }
                                }
                        }
                        return
index dbef24f9a238af5615fb8fc5b91dca9c2792a573..2dce99045cfb79d670d83551977e74f174419d5e 100644 (file)
@@ -1,40 +1,51 @@
 package part
 
 import (
-       "testing"
        "io"
+       "testing"
 )
 
+func Test_rwc(t *testing.T) {
+       rwc := RWC{R: func(p []byte) (n int, err error) { return 1, nil }}
+       rwc.Close()
+}
+
 func Test_RW2Chan(t *testing.T) {
        {
-               r,w := io.Pipe()
-               _,rw := RW2Chan(nil,w)
-               
-               go func(){
-                       rw<-[]byte{0x01}
+               r, w := io.Pipe()
+               _, rw := RW2Chan(nil, w)
+
+               go func() {
+                       rw <- []byte{0x01}
                }()
                buf := make([]byte, 1<<16)
-               n,_:=r.Read(buf)
-               if buf[:n][0] != 1 {t.Error(`no`)}
+               n, _ := r.Read(buf)
+               if buf[:n][0] != 1 {
+                       t.Error(`no`)
+               }
        }
-       
+
        {
-               r,w := io.Pipe()
-               rc,_ := RW2Chan(r,nil)
-               
-               go func(){
+               r, w := io.Pipe()
+               rc, _ := RW2Chan(r, nil)
+
+               go func() {
                        w.Write([]byte{0x09})
                }()
-               if b:=<-rc;b[0] != 9 {t.Error(`no2`)}
+               if b := <-rc; b[0] != 9 {
+                       t.Error(`no2`)
+               }
        }
-       
+
        {
-               r,w := io.Pipe()
-               rc,rw := RW2Chan(r,w)
-               
-               go func(){
+               r, w := io.Pipe()
+               rc, rw := RW2Chan(r, w)
+
+               go func() {
                        rw <- []byte{0x07}
                }()
-               if b:=<-rc;b[0] != 7 {t.Error(`no3`)}
+               if b := <-rc; b[0] != 7 {
+                       t.Error(`no3`)
+               }
        }
 }
index 0b223469b4da1fc8e9040173caad4dd14b17a747..5326c1a26075710b7bbd69c5cf0c0155be8f638f 100644 (file)
@@ -41,7 +41,6 @@ type Rval struct {
        Ctx     context.Context
 
        SaveToPath       string
-       SaveToChan       chan []byte
        SaveToPipeWriter *io.PipeWriter
 
        Header map[string]string
@@ -71,7 +70,6 @@ type Req struct {
        UsedTime time.Duration
 
        cancelP atomic.Pointer[context.CancelFunc]
-       ctx     context.Context
        state   atomic.Int32
 
        responFile *os.File
@@ -229,7 +227,7 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) {
                err = fmt.Errorf("%d %s", resp.StatusCode, http.StatusText(resp.StatusCode))
        }
 
-       var ws []io.Writer
+       var ws []io.WriteCloser
        if val.SaveToPath != "" {
                t.responFile, e = os.Create(val.SaveToPath)
                if e != nil {
@@ -239,16 +237,14 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) {
                ws = append(ws, t.responFile)
        }
        if val.SaveToPipeWriter != nil {
-               ws = append(ws, val.SaveToPipeWriter)
+               ws = append(ws, pio.RWC{W: val.SaveToPipeWriter.Write, C: func() error { return val.SaveToPipeWriter.CloseWithError(context.Canceled) }})
        }
        if !val.NoResponse {
                //will clear t.Respon
                t.responBuf.Reset()
-               ws = append(ws, t.responBuf)
+               ws = append(ws, pio.RWC{W: t.responBuf.Write, C: func() error { return nil }})
        }
 
-       w := io.MultiWriter(ws...)
-
        var resReadCloser = resp.Body
        if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 {
                switch compress_type[0] {
@@ -266,16 +262,13 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) {
        if writeLoopTO == 0 {
                writeLoopTO = 1000
        }
-       rwc := pio.WithCtxTO(req.Context(), t.callTree, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser)
+
+       rwc := pio.WithCtxTO(req.Context(), t.callTree, time.Duration(int(time.Millisecond)*writeLoopTO), ws, resReadCloser)
        defer rwc.Close()
 
        for buf := make([]byte, 2048); true; {
                if n, e := rwc.Read(buf); n != 0 {
-                       if n, e := rwc.Write(buf[:n]); n != 0 {
-                               if val.SaveToChan != nil {
-                                       val.SaveToChan <- buf[:n]
-                               }
-                       } else if e != nil {
+                       if n, e := rwc.Write(buf[:n]); n == 0 && e != nil {
                                if !errors.Is(e, io.EOF) {
                                        err = errors.Join(err, ErrWriteRes, e)
                                }
@@ -355,9 +348,6 @@ func (t *Req) prepare(val *Rval) (ctx context.Context, cancel context.CancelFunc
 }
 
 func (t *Req) clean(val *Rval) {
-       if val.SaveToChan != nil {
-               close(val.SaveToChan)
-       }
        if t.responFile != nil {
                t.responFile.Close()
        }
index da4ef88e7e607322f74801d6ead737d91f7a9e63..a52e7ec8ebdd97ba73257a5f5eebad87abda28d2 100644 (file)
@@ -4,6 +4,7 @@ import (
        "bytes"
        "context"
        "encoding/json"
+       "errors"
        "io"
        "net/http"
        "strconv"
@@ -81,6 +82,22 @@ func init() {
                                flusher.Flush()
                        }
                },
+               `/stream`: func(w http.ResponseWriter, r *http.Request) {
+                       flusher, flushSupport := w.(http.Flusher)
+                       if flushSupport {
+                               flusher.Flush()
+                       }
+                       for {
+                               select {
+                               case <-r.Context().Done():
+                                       println("server req ctx done")
+                                       return
+                               default:
+                                       w.Write([]byte{'0'})
+                                       flusher.Flush()
+                               }
+                       }
+               },
                `/exit`: func(_ http.ResponseWriter, _ *http.Request) {
                        s.Server.Shutdown(context.Background())
                },
@@ -88,6 +105,51 @@ func init() {
        time.Sleep(time.Second)
 }
 
+func Test14(t *testing.T) {
+       ctx, cancel := context.WithCancel(context.Background())
+
+       i, o := io.Pipe()
+
+       r := New()
+       if e := r.Reqf(Rval{
+               Url:              "http://" + addr + "/stream",
+               Ctx:              ctx,
+               NoResponse:       true,
+               SaveToPipeWriter: o,
+               Async:            true,
+               WriteLoopTO:      5*1000*2 + 1,
+       }); e != nil {
+               t.Log(e)
+       }
+
+       start := time.Now()
+
+       t.Log("Do", time.Since(start))
+
+       go func() {
+               buf := make([]byte, 1<<8)
+               for {
+                       if n, e := i.Read(buf); n != 0 {
+                               if time.Since(start) > time.Second {
+                                       cancel()
+                                       t.Log("Cancel", time.Since(start))
+                                       break
+                               }
+                               // do nothing
+                               continue
+                       } else if e != nil {
+                               t.Log(e)
+                               break
+                       }
+               }
+       }()
+
+       if !errors.Is(r.Wait(), context.Canceled) {
+               t.Fatal()
+       }
+       t.Log("Do finished", time.Since(start))
+}
+
 func Test_req13(t *testing.T) {
        r := New()
        e := r.Reqf(Rval{
@@ -176,50 +238,50 @@ func Test_req4(t *testing.T) {
        }
 }
 
-func Test_req5(t *testing.T) {
-       r := New()
-       {
-               c := make(chan []byte)
-               r.Reqf(Rval{
-                       Url:        "http://" + addr + "/to",
-                       Timeout:    1000,
-                       Async:      true,
-                       SaveToChan: c,
-               })
-               for {
-                       buf := <-c
-                       if len(buf) == 0 {
-                               break
-                       }
-               }
-               if !IsTimeout(r.Wait()) {
-                       t.Error("async IsTimeout fail")
-               }
-       }
-}
+// func Test_req5(t *testing.T) {
+//     r := New()
+//     {
+//             c := make(chan []byte)
+//             r.Reqf(Rval{
+//                     Url:        "http://" + addr + "/to",
+//                     Timeout:    1000,
+//                     Async:      true,
+//                     SaveToChan: c,
+//             })
+//             for {
+//                     buf := <-c
+//                     if len(buf) == 0 {
+//                             break
+//                     }
+//             }
+//             if !IsTimeout(r.Wait()) {
+//                     t.Error("async IsTimeout fail")
+//             }
+//     }
+// }
 
-func Test_req6(t *testing.T) {
-       r := New()
-       {
-               c := make(chan []byte)
-               r.Reqf(Rval{
-                       Url:        "http://" + addr + "/no",
-                       Async:      true,
-                       SaveToChan: c,
-               })
-               b := []byte{}
-               for {
-                       buf := <-c
-                       if len(buf) == 0 {
-                               break
-                       }
-                       b = append(b, buf...)
-               }
-               if !bytes.Equal(b, []byte("abc强强强强")) {
-                       t.Error("chan fail")
-               }
-       }
-}
+// func Test_req6(t *testing.T) {
+//     r := New()
+//     {
+//             c := make(chan []byte)
+//             r.Reqf(Rval{
+//                     Url:        "http://" + addr + "/no",
+//                     Async:      true,
+//                     SaveToChan: c,
+//             })
+//             b := []byte{}
+//             for {
+//                     buf := <-c
+//                     if len(buf) == 0 {
+//                             break
+//                     }
+//                     b = append(b, buf...)
+//             }
+//             if !bytes.Equal(b, []byte("abc强强强强")) {
+//                     t.Error("chan fail")
+//             }
+//     }
+// }
 
 func Test_req11(t *testing.T) {
        r := New()
index a2f30fb35da5bcf17a462ec8ce9c52c9ac42f909..6149dc9c7d216dd2746fbb0071564f85bb1c819a 100644 (file)
@@ -2,7 +2,6 @@ package part
 
 import (
        "bytes"
-       "context"
        "encoding/json"
        "fmt"
        "io"
@@ -85,13 +84,13 @@ func Test_ClientBlock(t *testing.T) {
        defer s.Shutdown()
 
        m.Store("/to", func(w http.ResponseWriter, r *http.Request) {
-
-               rwc := pio.WithCtxTO(context.Background(), fmt.Sprintf("server handle %v by %v ", r.URL.Path, r.RemoteAddr), time.Second, w, r.Body, func(s string) {
-                       fmt.Println(s)
-                       if !strings.Contains(s, "write blocking after rw 2s > 1s, goruntime leak") {
-                               t.Fatal(s)
-                       }
-               })
+               rwc := pio.WithCtxTO(r.Context(), fmt.Sprintf("server handle %v by %v ", r.URL.Path, r.RemoteAddr), time.Second,
+                       []io.WriteCloser{pio.RWC{W: w.Write}}, r.Body, func(s string) {
+                               fmt.Println(s)
+                               if !strings.Contains(s, "write blocking after rw 2s > 1s, goruntime leak") {
+                                       t.Fatal(s)
+                               }
+                       })
                defer rwc.Close()
 
                type d struct {