]> 127.0.0.1 Git - part/.git/commitdiff
update v0.19.0
authorqydysky <32743305+qydysky@users.noreply.github.com>
Sun, 13 Nov 2022 14:51:57 +0000 (22:51 +0800)
committerqydysky <32743305+qydysky@users.noreply.github.com>
Sun, 13 Nov 2022 14:51:57 +0000 (22:51 +0800)
reqf/Reqf.go
reqf/Reqf_test.go

index 671d5cb6fa50e5d64c49f684a005f36881b9d863..0a8d2e0d7c75105bd78faa4d52812aa8d99c9e41 100644 (file)
@@ -11,7 +11,6 @@ import (
        "os"
        "strconv"
        "strings"
-       "sync"
        "time"
 
        flate "compress/flate"
@@ -34,10 +33,11 @@ type Rval struct {
        SleepTime        int
        JustResponseCode bool
        NoResponse       bool
+       Async            bool
        Cookies          []*http.Cookie
 
        SaveToPath       string
-       SaveToChan       chan []byte // deprecated
+       SaveToChan       chan []byte
        SaveToPipeWriter *io.PipeWriter
 
        Header map[string]string
@@ -48,8 +48,10 @@ type Req struct {
        Response *http.Response
        UsedTime time.Duration
 
-       cancel *signal.Signal
-       sync.Mutex
+       cancel     *signal.Signal
+       running    *signal.Signal
+       responBuf  *bytes.Buffer
+       responFile *os.File
 }
 
 func New() *Req {
@@ -67,8 +69,12 @@ func New() *Req {
 // }
 
 func (t *Req) Reqf(val Rval) error {
-       t.Lock()
-       defer t.Unlock()
+       if val.SaveToChan != nil && len(val.SaveToChan) == 1 && !val.Async {
+               panic("must make sure chan size larger then 1 or use Async true")
+       }
+       if val.SaveToPipeWriter != nil && !val.Async {
+               panic("SaveToPipeWriter must use Async true")
+       }
 
        t.Respon = []byte{}
        t.Response = nil
@@ -78,9 +84,6 @@ func (t *Req) Reqf(val Rval) error {
 
        _val := val
 
-       t.cancel = signal.Init()
-       defer t.cancel.Done()
-
        for SleepTime, Retry := _val.SleepTime, _val.Retry; Retry >= 0; Retry -= 1 {
                returnErr = t.Reqf_1(_val)
                select {
@@ -98,7 +101,6 @@ func (t *Req) Reqf(val Rval) error {
 }
 
 func (t *Req) Reqf_1(val Rval) (err error) {
-
        var (
                Header map[string]string = val.Header
        )
@@ -207,45 +209,22 @@ func (t *Req) Reqf_1(val Rval) (err error) {
 
        var ws []io.Writer
        if val.SaveToPath != "" {
-               out, err := os.Create(val.SaveToPath)
+               t.responFile, err = os.Create(val.SaveToPath)
                if err != nil {
-                       out.Close()
+                       t.responFile.Close()
                        return err
                }
-               defer out.Close()
-               ws = append(ws, out)
+               ws = append(ws, t.responFile)
        }
        if val.SaveToPipeWriter != nil {
-               defer val.SaveToPipeWriter.Close()
                ws = append(ws, val.SaveToPipeWriter)
        }
-       // if val.SaveToChan != nil {
-       //      r, w := io.Pipe()
-       //      go func() {
-       //              buf := make([]byte, 1<<16)
-       //              for {
-       //                      n, e := r.Read(buf)
-       //                      if n != 0 {
-       //                              val.SaveToChan <- buf[:n]
-       //                      } else if e != nil {
-       //                              defer close(val.SaveToChan)
-       //                              break
-       //                      }
-       //              }
-       //      }()
-       //      defer w.Close()
-       //      ws = append(ws, w)
-       // }
        if !val.NoResponse {
-               var buf bytes.Buffer
-               defer func() {
-                       t.Respon = buf.Bytes()
-               }()
-               ws = append(ws, &buf)
+               t.responBuf = new(bytes.Buffer)
+               ws = append(ws, t.responBuf)
        }
 
        w := io.MultiWriter(ws...)
-       s := signal.Init()
 
        var resReader io.Reader
        if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 {
@@ -262,13 +241,18 @@ func (t *Req) Reqf_1(val Rval) (err error) {
        } else {
                resReader = resp.Body
        }
-       defer resp.Body.Close()
 
+       t.running = signal.Init()
+       t.cancel = signal.Init()
        go func() {
-               buf := make([]byte, 1<<16)
+               buf := make([]byte, 512)
                for {
                        if n, e := resReader.Read(buf); n != 0 {
                                w.Write(buf[:n])
+                               select {
+                               case val.SaveToChan <- buf[:n]:
+                               default:
+                               }
                        } else if e != nil {
                                if !errors.Is(e, io.EOF) {
                                        err = e
@@ -281,15 +265,35 @@ func (t *Req) Reqf_1(val Rval) (err error) {
                                break
                        }
                }
-               s.Done()
+               resp.Body.Close()
+               if val.SaveToChan != nil {
+                       close(val.SaveToChan)
+               }
+               if t.responFile != nil {
+                       t.responFile.Close()
+               }
+               if val.SaveToPipeWriter != nil {
+                       val.SaveToPipeWriter.Close()
+               }
+               if t.responBuf != nil {
+                       t.Respon = t.responBuf.Bytes()
+               }
+               t.cancel.Done()
+               t.running.Done()
        }()
-       s.Wait()
+       if !val.Async {
+               t.Wait()
+       }
        // if _, e := io.Copy(w, resp.Body); e != nil {
        //      err = e
        // }
        return
 }
 
+func (t *Req) Wait() {
+       t.running.Wait()
+}
+
 func (t *Req) Cancel() { t.Close() }
 
 func (t *Req) Close() {
index 9506be83b6c697417bc78d18675774b9dceff40b..8c6569a8085c2cc345b9fd66f7d8ab01ba635d16 100644 (file)
@@ -12,99 +12,16 @@ import (
        web "github.com/qydysky/part/web"
 )
 
-func Test_Timeout(t *testing.T) {
-       r := New()
-       if e := r.Reqf(Rval{
-               Url:     `https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`,
-               Timeout: 1000,
-       }); e != nil {
-               if !IsTimeout(e) {
-                       t.Error(`type error`, e)
-               }
-               return
-       }
-       t.Log(`no error`)
-}
-
-func Test_Cancel(t *testing.T) {
-       r := New()
-
-       go func() {
-               time.Sleep(time.Second)
-               r.Cancel()
-       }()
-
-       if e := r.Reqf(Rval{
-               Url: `https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`,
-       }); e != nil {
-               if !IsCancel(e) {
-                       t.Error(`type error`, e)
-               }
-               return
-       }
-       t.Log(`no error`)
-}
-
-func Test_Cancel_chan(t *testing.T) {
-       r := New()
-
-       c := make(chan []byte, 1<<16)
-
-       go func() {
-               for {
-                       <-c
-               }
-       }()
-
-       go func() {
-               time.Sleep(time.Second * 3)
-               r.Cancel()
-       }()
-
-       if e := r.Reqf(Rval{
-               Url:        `https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`,
-               SaveToChan: c,
-               Timeout:    5000,
-       }); e != nil {
-               if !IsCancel(e) {
-                       t.Error(`type error`, e)
-               }
-               return
-       }
-       t.Log(`no error`)
-}
-
-func Test_Io_Pipe(t *testing.T) {
-       r := New()
-       rp, wp := io.Pipe()
-       c := make(chan struct{}, 1)
-       go func() {
-               buf, _ := io.ReadAll(rp)
-               t.Log("Test_Io_Pipe download:", len(buf))
-               t.Log("Test_Io_Pipe download:", len(r.Respon))
-               close(c)
-       }()
-       if e := r.Reqf(Rval{
-               Url:              `https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`,
-               SaveToPipeWriter: wp,
-               Timeout:          5000,
-       }); e != nil {
-               if !IsTimeout(e) {
-                       t.Error(`type error`, e)
-               }
-               return
-       }
-       t.Log(`no error`)
-       <-c
-}
-
-func Test_compress(t *testing.T) {
+func Test_req(t *testing.T) {
        addr := "127.0.0.1:10001"
        s := web.New(&http.Server{
                Addr:         addr,
                WriteTimeout: time.Second * time.Duration(10),
        })
        s.Handle(map[string]func(http.ResponseWriter, *http.Request){
+               `/no`: func(w http.ResponseWriter, _ *http.Request) {
+                       w.Write([]byte("abc强强强强"))
+               },
                `/br`: func(w http.ResponseWriter, _ *http.Request) {
                        d, _ := compress.InBr([]byte("abc强强强强"), 6)
                        w.Header().Set("Content-Encoding", "br")
@@ -120,6 +37,12 @@ func Test_compress(t *testing.T) {
                        w.Header().Set("Content-Encoding", "gzip")
                        w.Write(d)
                },
+               `/to`: func(w http.ResponseWriter, _ *http.Request) {
+                       time.Sleep(time.Minute)
+                       d, _ := compress.InGzip([]byte("abc强强强强"), -1)
+                       w.Header().Set("Content-Encoding", "gzip")
+                       w.Write(d)
+               },
                `/exit`: func(_ http.ResponseWriter, _ *http.Request) {
                        s.Server.Shutdown(context.Background())
                },
@@ -144,7 +67,47 @@ func Test_compress(t *testing.T) {
        if !bytes.Equal(r.Respon, []byte("abc强强强强")) {
                t.Error("flate fail")
        }
-
+       {
+               c := make(chan []byte)
+               r.Reqf(Rval{
+                       Url:        "http://" + addr + "/no",
+                       Async:      true,
+                       SaveToChan: c,
+               })
+               b := []byte{}
+               for {
+                       buf := <-c
+                       if len(buf) == 0 {
+                               break
+                       }
+                       b = append(b, buf...)
+               }
+               if !bytes.Equal(b, []byte("abc强强强强")) {
+                       t.Error("chan fail")
+               }
+       }
+       {
+               e := r.Reqf(Rval{
+                       Url:     "http://" + addr + "/to",
+                       Timeout: 1000,
+               })
+               if !IsTimeout(e) {
+                       t.Error("Timeout fail")
+               }
+       }
+       {
+               timer := time.NewTimer(time.Second)
+               go func() {
+                       <-timer.C
+                       r.Cancel()
+               }()
+               e := r.Reqf(Rval{
+                       Url: "http://" + addr + "/to",
+               })
+               if !IsCancel(e) {
+                       t.Error("Cancel fail")
+               }
+       }
        {
                rc, wc := io.Pipe()
                c := make(chan struct{})
@@ -158,6 +121,7 @@ func Test_compress(t *testing.T) {
                r.Reqf(Rval{
                        Url:              "http://" + addr + "/br",
                        SaveToPipeWriter: wc,
+                       Async:            true,
                })
                <-c
        }
@@ -174,6 +138,7 @@ func Test_compress(t *testing.T) {
                r.Reqf(Rval{
                        Url:              "http://" + addr + "/gzip",
                        SaveToPipeWriter: wc,
+                       Async:            true,
                })
                <-c
        }
@@ -190,7 +155,40 @@ func Test_compress(t *testing.T) {
                r.Reqf(Rval{
                        Url:              "http://" + addr + "/flate",
                        SaveToPipeWriter: wc,
+                       Async:            true,
                })
                <-c
        }
+       {
+               r.Reqf(Rval{
+                       Url:   "http://" + addr + "/flate",
+                       Async: true,
+               })
+               if len(r.Respon) != 0 {
+                       t.Error("async fail")
+               }
+               r.Wait()
+               if !bytes.Equal(r.Respon, []byte("abc强强强强")) {
+                       t.Error("async fail")
+               }
+       }
+       {
+               rc, wc := io.Pipe()
+               r.Reqf(Rval{
+                       Url:              "http://" + addr + "/flate",
+                       SaveToPipeWriter: wc,
+                       NoResponse:       true,
+                       Async:            true,
+               })
+               if len(r.Respon) != 0 {
+                       t.Error("io async fail")
+               }
+               d, _ := io.ReadAll(rc)
+               if !bytes.Equal(d, []byte("abc强强强强")) {
+                       t.Error("io async fail")
+               }
+               if !bytes.Equal(r.Respon, []byte("abc强强强强")) {
+                       t.Error("io async fail")
+               }
+       }
 }