From 196b95e0f1ca140c716492974ca0abb8c0e21106 Mon Sep 17 00:00:00 2001 From: qydysky <32743305+qydysky@users.noreply.github.com> Date: Sun, 13 Nov 2022 22:51:57 +0800 Subject: [PATCH] update --- reqf/Reqf.go | 88 ++++++++++++----------- reqf/Reqf_test.go | 174 +++++++++++++++++++++++----------------------- 2 files changed, 132 insertions(+), 130 deletions(-) diff --git a/reqf/Reqf.go b/reqf/Reqf.go index 671d5cb..0a8d2e0 100644 --- a/reqf/Reqf.go +++ b/reqf/Reqf.go @@ -11,7 +11,6 @@ import ( "os" "strconv" "strings" - "sync" "time" flate "compress/flate" @@ -34,10 +33,11 @@ type Rval struct { SleepTime int JustResponseCode bool NoResponse bool + Async bool Cookies []*http.Cookie SaveToPath string - SaveToChan chan []byte // deprecated + SaveToChan chan []byte SaveToPipeWriter *io.PipeWriter Header map[string]string @@ -48,8 +48,10 @@ type Req struct { Response *http.Response UsedTime time.Duration - cancel *signal.Signal - sync.Mutex + cancel *signal.Signal + running *signal.Signal + responBuf *bytes.Buffer + responFile *os.File } func New() *Req { @@ -67,8 +69,12 @@ func New() *Req { // } func (t *Req) Reqf(val Rval) error { - t.Lock() - defer t.Unlock() + if val.SaveToChan != nil && len(val.SaveToChan) == 1 && !val.Async { + panic("must make sure chan size larger then 1 or use Async true") + } + if val.SaveToPipeWriter != nil && !val.Async { + panic("SaveToPipeWriter must use Async true") + } t.Respon = []byte{} t.Response = nil @@ -78,9 +84,6 @@ func (t *Req) Reqf(val Rval) error { _val := val - t.cancel = signal.Init() - defer t.cancel.Done() - for SleepTime, Retry := _val.SleepTime, _val.Retry; Retry >= 0; Retry -= 1 { returnErr = t.Reqf_1(_val) select { @@ -98,7 +101,6 @@ func (t *Req) Reqf(val Rval) error { } func (t *Req) Reqf_1(val Rval) (err error) { - var ( Header map[string]string = val.Header ) @@ -207,45 +209,22 @@ func (t *Req) Reqf_1(val Rval) (err error) { var ws []io.Writer if val.SaveToPath != "" { - out, err := os.Create(val.SaveToPath) + t.responFile, err = os.Create(val.SaveToPath) if err != nil { - out.Close() + t.responFile.Close() return err } - defer out.Close() - ws = append(ws, out) + ws = append(ws, t.responFile) } if val.SaveToPipeWriter != nil { - defer val.SaveToPipeWriter.Close() ws = append(ws, val.SaveToPipeWriter) } - // if val.SaveToChan != nil { - // r, w := io.Pipe() - // go func() { - // buf := make([]byte, 1<<16) - // for { - // n, e := r.Read(buf) - // if n != 0 { - // val.SaveToChan <- buf[:n] - // } else if e != nil { - // defer close(val.SaveToChan) - // break - // } - // } - // }() - // defer w.Close() - // ws = append(ws, w) - // } if !val.NoResponse { - var buf bytes.Buffer - defer func() { - t.Respon = buf.Bytes() - }() - ws = append(ws, &buf) + t.responBuf = new(bytes.Buffer) + ws = append(ws, t.responBuf) } w := io.MultiWriter(ws...) - s := signal.Init() var resReader io.Reader if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 { @@ -262,13 +241,18 @@ func (t *Req) Reqf_1(val Rval) (err error) { } else { resReader = resp.Body } - defer resp.Body.Close() + t.running = signal.Init() + t.cancel = signal.Init() go func() { - buf := make([]byte, 1<<16) + buf := make([]byte, 512) for { if n, e := resReader.Read(buf); n != 0 { w.Write(buf[:n]) + select { + case val.SaveToChan <- buf[:n]: + default: + } } else if e != nil { if !errors.Is(e, io.EOF) { err = e @@ -281,15 +265,35 @@ func (t *Req) Reqf_1(val Rval) (err error) { break } } - s.Done() + resp.Body.Close() + if val.SaveToChan != nil { + close(val.SaveToChan) + } + if t.responFile != nil { + t.responFile.Close() + } + if val.SaveToPipeWriter != nil { + val.SaveToPipeWriter.Close() + } + if t.responBuf != nil { + t.Respon = t.responBuf.Bytes() + } + t.cancel.Done() + t.running.Done() }() - s.Wait() + if !val.Async { + t.Wait() + } // if _, e := io.Copy(w, resp.Body); e != nil { // err = e // } return } +func (t *Req) Wait() { + t.running.Wait() +} + func (t *Req) Cancel() { t.Close() } func (t *Req) Close() { diff --git a/reqf/Reqf_test.go b/reqf/Reqf_test.go index 9506be8..8c6569a 100644 --- a/reqf/Reqf_test.go +++ b/reqf/Reqf_test.go @@ -12,99 +12,16 @@ import ( web "github.com/qydysky/part/web" ) -func Test_Timeout(t *testing.T) { - r := New() - if e := r.Reqf(Rval{ - Url: `https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`, - Timeout: 1000, - }); e != nil { - if !IsTimeout(e) { - t.Error(`type error`, e) - } - return - } - t.Log(`no error`) -} - -func Test_Cancel(t *testing.T) { - r := New() - - go func() { - time.Sleep(time.Second) - r.Cancel() - }() - - if e := r.Reqf(Rval{ - Url: `https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`, - }); e != nil { - if !IsCancel(e) { - t.Error(`type error`, e) - } - return - } - t.Log(`no error`) -} - -func Test_Cancel_chan(t *testing.T) { - r := New() - - c := make(chan []byte, 1<<16) - - go func() { - for { - <-c - } - }() - - go func() { - time.Sleep(time.Second * 3) - r.Cancel() - }() - - if e := r.Reqf(Rval{ - Url: `https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`, - SaveToChan: c, - Timeout: 5000, - }); e != nil { - if !IsCancel(e) { - t.Error(`type error`, e) - } - return - } - t.Log(`no error`) -} - -func Test_Io_Pipe(t *testing.T) { - r := New() - rp, wp := io.Pipe() - c := make(chan struct{}, 1) - go func() { - buf, _ := io.ReadAll(rp) - t.Log("Test_Io_Pipe download:", len(buf)) - t.Log("Test_Io_Pipe download:", len(r.Respon)) - close(c) - }() - if e := r.Reqf(Rval{ - Url: `https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`, - SaveToPipeWriter: wp, - Timeout: 5000, - }); e != nil { - if !IsTimeout(e) { - t.Error(`type error`, e) - } - return - } - t.Log(`no error`) - <-c -} - -func Test_compress(t *testing.T) { +func Test_req(t *testing.T) { addr := "127.0.0.1:10001" s := web.New(&http.Server{ Addr: addr, WriteTimeout: time.Second * time.Duration(10), }) s.Handle(map[string]func(http.ResponseWriter, *http.Request){ + `/no`: func(w http.ResponseWriter, _ *http.Request) { + w.Write([]byte("abc强强强强")) + }, `/br`: func(w http.ResponseWriter, _ *http.Request) { d, _ := compress.InBr([]byte("abc强强强强"), 6) w.Header().Set("Content-Encoding", "br") @@ -120,6 +37,12 @@ func Test_compress(t *testing.T) { w.Header().Set("Content-Encoding", "gzip") w.Write(d) }, + `/to`: func(w http.ResponseWriter, _ *http.Request) { + time.Sleep(time.Minute) + d, _ := compress.InGzip([]byte("abc强强强强"), -1) + w.Header().Set("Content-Encoding", "gzip") + w.Write(d) + }, `/exit`: func(_ http.ResponseWriter, _ *http.Request) { s.Server.Shutdown(context.Background()) }, @@ -144,7 +67,47 @@ func Test_compress(t *testing.T) { if !bytes.Equal(r.Respon, []byte("abc强强强强")) { t.Error("flate fail") } - + { + c := make(chan []byte) + r.Reqf(Rval{ + Url: "http://" + addr + "/no", + Async: true, + SaveToChan: c, + }) + b := []byte{} + for { + buf := <-c + if len(buf) == 0 { + break + } + b = append(b, buf...) + } + if !bytes.Equal(b, []byte("abc强强强强")) { + t.Error("chan fail") + } + } + { + e := r.Reqf(Rval{ + Url: "http://" + addr + "/to", + Timeout: 1000, + }) + if !IsTimeout(e) { + t.Error("Timeout fail") + } + } + { + timer := time.NewTimer(time.Second) + go func() { + <-timer.C + r.Cancel() + }() + e := r.Reqf(Rval{ + Url: "http://" + addr + "/to", + }) + if !IsCancel(e) { + t.Error("Cancel fail") + } + } { rc, wc := io.Pipe() c := make(chan struct{}) @@ -158,6 +121,7 @@ func Test_compress(t *testing.T) { r.Reqf(Rval{ Url: "http://" + addr + "/br", SaveToPipeWriter: wc, + Async: true, }) <-c } @@ -174,6 +138,7 @@ func Test_compress(t *testing.T) { r.Reqf(Rval{ Url: "http://" + addr + "/gzip", SaveToPipeWriter: wc, + Async: true, }) <-c } @@ -190,7 +155,40 @@ func Test_compress(t *testing.T) { r.Reqf(Rval{ Url: "http://" + addr + "/flate", SaveToPipeWriter: wc, + Async: true, }) <-c } + { + r.Reqf(Rval{ + Url: "http://" + addr + "/flate", + Async: true, + }) + if len(r.Respon) != 0 { + t.Error("async fail") + } + r.Wait() + if !bytes.Equal(r.Respon, []byte("abc强强强强")) { + t.Error("async fail") + } + } + { + rc, wc := io.Pipe() + r.Reqf(Rval{ + Url: "http://" + addr + "/flate", + SaveToPipeWriter: wc, + NoResponse: true, + Async: true, + }) + if len(r.Respon) != 0 { + t.Error("io async fail") + } + d, _ := io.ReadAll(rc) + if !bytes.Equal(d, []byte("abc强强强强")) { + t.Error("io async fail") + } + if !bytes.Equal(r.Respon, []byte("abc强强强强")) { + t.Error("io async fail") + } + } } -- 2.39.2