From 33cc17cf84c0aa7f961cf29d105233be001ca368 Mon Sep 17 00:00:00 2001 From: qydysky <32743305+qydysky@users.noreply.github.com> Date: Thu, 2 Mar 2023 22:44:39 +0800 Subject: [PATCH] Fix req Cancle --- reqf/Reqf.go | 28 ++++++++-------- reqf/Reqf_test.go | 82 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+), 13 deletions(-) diff --git a/reqf/Reqf.go b/reqf/Reqf.go index b38a0d2..0e4fbb4 100644 --- a/reqf/Reqf.go +++ b/reqf/Reqf.go @@ -27,8 +27,6 @@ type Rval struct { Url string PostStr string Timeout int - ReadTimeout int // deprecated - ConnectTimeout int // deprecated Proxy string Retry int SleepTime int @@ -49,10 +47,10 @@ type Req struct { Response *http.Response UsedTime time.Duration - cancelF func() - cancel *signal.Signal - running *signal.Signal - isLive *signal.Signal + cancelFs []func() + cancel *signal.Signal + running *signal.Signal + isLive *signal.Signal responFile *os.File err error @@ -70,7 +68,7 @@ func (t *Req) Reqf(val Rval) error { t.Respon = []byte{} t.Response = nil t.UsedTime = 0 - t.cancelF = nil + t.cancelFs = []func(){} t.cancel = signal.Init() t.running = signal.Init() t.isLive = signal.Init() @@ -80,8 +78,8 @@ func (t *Req) Reqf(val Rval) error { go func() { select { case <-t.cancel.Chan: - if t.cancelF != nil { - t.cancelF() + for i := 0; i < len(t.cancelFs); i++ { + t.cancelFs[i]() } case <-t.running.Chan: } @@ -113,6 +111,7 @@ func (t *Req) Reqf(val Rval) error { if val.SaveToPipeWriter != nil { val.SaveToPipeWriter.Close() } + t.running.Done() t.cancel.Done() t.l.Unlock() t.isLive.Done() @@ -128,6 +127,7 @@ func (t *Req) Reqf(val Rval) error { if val.SaveToPipeWriter != nil { val.SaveToPipeWriter.Close() } + t.running.Done() t.cancel.Done() t.l.Unlock() t.isLive.Done() @@ -179,7 +179,7 @@ func (t *Req) Reqf_1(val Rval) (err error) { if val.Timeout > 0 { cx, cancel = context.WithTimeout(cx, time.Duration(val.Timeout)*time.Millisecond) } - t.cancelF = cancel + t.cancelFs = append(t.cancelFs, cancel) req, e := http.NewRequest(Method, val.Url, body) if e != nil { @@ -247,9 +247,11 @@ func (t *Req) Reqf_1(val Rval) (err error) { return } ws = append(ws, t.responFile) + t.cancelFs = append(t.cancelFs, func() { t.responFile.Close() }) } if val.SaveToPipeWriter != nil { ws = append(ws, val.SaveToPipeWriter) + t.cancelFs = append(t.cancelFs, func() { val.SaveToPipeWriter.Close() }) } if !val.NoResponse { if responBuf == nil { @@ -275,15 +277,15 @@ func (t *Req) Reqf_1(val Rval) (err error) { } else { resReader = resp.Body } + t.cancelFs = append(t.cancelFs, func() { resp.Body.Close() }) buf := make([]byte, 512) for { if n, e := resReader.Read(buf); n != 0 { w.Write(buf[:n]) - select { - case val.SaveToChan <- buf[:n]: - default: + if val.SaveToChan != nil { + val.SaveToChan <- buf[:n] } } else if e != nil { if !errors.Is(e, io.EOF) { diff --git a/reqf/Reqf_test.go b/reqf/Reqf_test.go index 6c5fe3b..da6438d 100644 --- a/reqf/Reqf_test.go +++ b/reqf/Reqf_test.go @@ -44,6 +44,20 @@ func init() { w.Header().Set("Content-Encoding", "gzip") w.Write(d) }, + `/1min`: func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(200) + flusher, flushSupport := w.(http.Flusher) + if flushSupport { + flusher.Flush() + } + for i := 0; i < 3; i++ { + w.Write([]byte("0")) + if flushSupport { + flusher.Flush() + } + time.Sleep(time.Second) + } + }, `/exit`: func(_ http.ResponseWriter, _ *http.Request) { s.Server.Shutdown(context.Background()) }, @@ -154,6 +168,10 @@ func Test_req6(t *testing.T) { t.Error("chan fail") } } +} + +func Test_req11(t *testing.T) { + r := New() { timer := time.NewTimer(time.Second) go func() { @@ -169,6 +187,70 @@ func Test_req6(t *testing.T) { } } +func Test_req9(t *testing.T) { + r := New() + { + rc, wc := io.Pipe() + go func() { + var buf []byte = make([]byte, 1<<16) + for { + n, _ := rc.Read(buf) + if n == 0 { + break + } + } + }() + r.Reqf(Rval{ + Url: "http://" + addr + "/1min", + SaveToPipeWriter: wc, + Async: true, + }) + if r.Wait() != nil { + t.Fatal() + } + } +} + +func Test_req8(t *testing.T) { + r := New() + { + rc, wc := io.Pipe() + go func() { + var buf []byte = make([]byte, 1<<16) + rc.Read(buf) + time.Sleep(time.Millisecond * 500) + r.Cancel() + }() + r.Reqf(Rval{ + Url: "http://" + addr + "/1min", + SaveToPipeWriter: wc, + Async: true, + }) + if !IsCancel(r.Wait()) { + t.Fatal("read from block response") + } + } +} + +func Test_req10(t *testing.T) { + r := New() + { + _, wc := io.Pipe() + go func() { + time.Sleep(time.Millisecond * 500) + r.Cancel() + }() + r.Reqf(Rval{ + Url: "http://" + addr + "/1min", + SaveToPipeWriter: wc, + Async: true, + }) + if !IsCancel(r.Wait()) { + t.Fatal("write to block io.pipe") + } + } +} + func Test_req3(t *testing.T) { r := New() { -- 2.39.2