]> 127.0.0.1 Git - part/.git/commitdiff
Fix req Cancle v0.23.3
authorqydysky <32743305+qydysky@users.noreply.github.com>
Thu, 2 Mar 2023 14:44:39 +0000 (22:44 +0800)
committerqydysky <32743305+qydysky@users.noreply.github.com>
Thu, 2 Mar 2023 14:44:39 +0000 (22:44 +0800)
reqf/Reqf.go
reqf/Reqf_test.go

index b38a0d28a686f39faabc4ce3a3f22aed7d1100ae..0e4fbb4baffceb63cb39b0b08d3a0ccddbbd3c42 100644 (file)
@@ -27,8 +27,6 @@ type Rval struct {
        Url              string
        PostStr          string
        Timeout          int
-       ReadTimeout      int // deprecated
-       ConnectTimeout   int // deprecated
        Proxy            string
        Retry            int
        SleepTime        int
@@ -49,10 +47,10 @@ type Req struct {
        Response *http.Response
        UsedTime time.Duration
 
-       cancelF func()
-       cancel  *signal.Signal
-       running *signal.Signal
-       isLive  *signal.Signal
+       cancelFs []func()
+       cancel   *signal.Signal
+       running  *signal.Signal
+       isLive   *signal.Signal
 
        responFile *os.File
        err        error
@@ -70,7 +68,7 @@ func (t *Req) Reqf(val Rval) error {
        t.Respon = []byte{}
        t.Response = nil
        t.UsedTime = 0
-       t.cancelF = nil
+       t.cancelFs = []func(){}
        t.cancel = signal.Init()
        t.running = signal.Init()
        t.isLive = signal.Init()
@@ -80,8 +78,8 @@ func (t *Req) Reqf(val Rval) error {
        go func() {
                select {
                case <-t.cancel.Chan:
-                       if t.cancelF != nil {
-                               t.cancelF()
+                       for i := 0; i < len(t.cancelFs); i++ {
+                               t.cancelFs[i]()
                        }
                case <-t.running.Chan:
                }
@@ -113,6 +111,7 @@ func (t *Req) Reqf(val Rval) error {
                if val.SaveToPipeWriter != nil {
                        val.SaveToPipeWriter.Close()
                }
+               t.running.Done()
                t.cancel.Done()
                t.l.Unlock()
                t.isLive.Done()
@@ -128,6 +127,7 @@ func (t *Req) Reqf(val Rval) error {
                        if val.SaveToPipeWriter != nil {
                                val.SaveToPipeWriter.Close()
                        }
+                       t.running.Done()
                        t.cancel.Done()
                        t.l.Unlock()
                        t.isLive.Done()
@@ -179,7 +179,7 @@ func (t *Req) Reqf_1(val Rval) (err error) {
        if val.Timeout > 0 {
                cx, cancel = context.WithTimeout(cx, time.Duration(val.Timeout)*time.Millisecond)
        }
-       t.cancelF = cancel
+       t.cancelFs = append(t.cancelFs, cancel)
 
        req, e := http.NewRequest(Method, val.Url, body)
        if e != nil {
@@ -247,9 +247,11 @@ func (t *Req) Reqf_1(val Rval) (err error) {
                        return
                }
                ws = append(ws, t.responFile)
+               t.cancelFs = append(t.cancelFs, func() { t.responFile.Close() })
        }
        if val.SaveToPipeWriter != nil {
                ws = append(ws, val.SaveToPipeWriter)
+               t.cancelFs = append(t.cancelFs, func() { val.SaveToPipeWriter.Close() })
        }
        if !val.NoResponse {
                if responBuf == nil {
@@ -275,15 +277,15 @@ func (t *Req) Reqf_1(val Rval) (err error) {
        } else {
                resReader = resp.Body
        }
+       t.cancelFs = append(t.cancelFs, func() { resp.Body.Close() })
 
        buf := make([]byte, 512)
 
        for {
                if n, e := resReader.Read(buf); n != 0 {
                        w.Write(buf[:n])
-                       select {
-                       case val.SaveToChan <- buf[:n]:
-                       default:
+                       if val.SaveToChan != nil {
+                               val.SaveToChan <- buf[:n]
                        }
                } else if e != nil {
                        if !errors.Is(e, io.EOF) {
index 6c5fe3b592c397bb559591ebf3cb97f9846c9547..da6438da17b5e90a9c3bdfafe36a85424ca1321e 100644 (file)
@@ -44,6 +44,20 @@ func init() {
                        w.Header().Set("Content-Encoding", "gzip")
                        w.Write(d)
                },
+               `/1min`: func(w http.ResponseWriter, _ *http.Request) {
+                       w.WriteHeader(200)
+                       flusher, flushSupport := w.(http.Flusher)
+                       if flushSupport {
+                               flusher.Flush()
+                       }
+                       for i := 0; i < 3; i++ {
+                               w.Write([]byte("0"))
+                               if flushSupport {
+                                       flusher.Flush()
+                               }
+                               time.Sleep(time.Second)
+                       }
+               },
                `/exit`: func(_ http.ResponseWriter, _ *http.Request) {
                        s.Server.Shutdown(context.Background())
                },
@@ -154,6 +168,10 @@ func Test_req6(t *testing.T) {
                        t.Error("chan fail")
                }
        }
+}
+
+func Test_req11(t *testing.T) {
+       r := New()
        {
                timer := time.NewTimer(time.Second)
                go func() {
@@ -169,6 +187,70 @@ func Test_req6(t *testing.T) {
        }
 }
 
+func Test_req9(t *testing.T) {
+       r := New()
+       {
+               rc, wc := io.Pipe()
+               go func() {
+                       var buf []byte = make([]byte, 1<<16)
+                       for {
+                               n, _ := rc.Read(buf)
+                               if n == 0 {
+                                       break
+                               }
+                       }
+               }()
+               r.Reqf(Rval{
+                       Url:              "http://" + addr + "/1min",
+                       SaveToPipeWriter: wc,
+                       Async:            true,
+               })
+               if r.Wait() != nil {
+                       t.Fatal()
+               }
+       }
+}
+
+func Test_req8(t *testing.T) {
+       r := New()
+       {
+               rc, wc := io.Pipe()
+               go func() {
+                       var buf []byte = make([]byte, 1<<16)
+                       rc.Read(buf)
+                       time.Sleep(time.Millisecond * 500)
+                       r.Cancel()
+               }()
+               r.Reqf(Rval{
+                       Url:              "http://" + addr + "/1min",
+                       SaveToPipeWriter: wc,
+                       Async:            true,
+               })
+               if !IsCancel(r.Wait()) {
+                       t.Fatal("read from block response")
+               }
+       }
+}
+
+func Test_req10(t *testing.T) {
+       r := New()
+       {
+               _, wc := io.Pipe()
+               go func() {
+                       time.Sleep(time.Millisecond * 500)
+                       r.Cancel()
+               }()
+               r.Reqf(Rval{
+                       Url:              "http://" + addr + "/1min",
+                       SaveToPipeWriter: wc,
+                       Async:            true,
+               })
+               if !IsCancel(r.Wait()) {
+                       t.Fatal("write to block io.pipe")
+               }
+       }
+}
+
 func Test_req3(t *testing.T) {
        r := New()
        {