]> 127.0.0.1 Git - part/.git/commitdiff
1
authorqydysky <qydysky@foxmail.com>
Tue, 3 Oct 2023 17:53:01 +0000 (01:53 +0800)
committerqydysky <qydysky@foxmail.com>
Tue, 3 Oct 2023 17:53:01 +0000 (01:53 +0800)
reqf/RawReqRes.go [new file with mode: 0644]
reqf/Reqf.go
reqf/Reqf_test.go

diff --git a/reqf/RawReqRes.go b/reqf/RawReqRes.go
new file mode 100644 (file)
index 0000000..9bbdcd6
--- /dev/null
@@ -0,0 +1,94 @@
+package part
+
+import (
+       "io"
+
+       pio "github.com/qydysky/part/io"
+)
+
+type RawReqRes struct {
+       req  *pio.IOpipe
+       res  *pio.IOpipe
+       reqC chan struct{}
+       resC chan struct{}
+}
+
+func NewRawReqRes() *RawReqRes {
+       return &RawReqRes{req: pio.NewPipe(), res: pio.NewPipe(), reqC: make(chan struct{}), resC: make(chan struct{})}
+}
+
+func (t RawReqRes) ReqClose() error {
+       select {
+       case <-t.reqC:
+               return nil
+       default:
+               close(t.reqC)
+               return t.req.Close()
+       }
+}
+
+func (t RawReqRes) ReqCloseWithError(e error) error {
+       select {
+       case <-t.reqC:
+               return nil
+       default:
+               close(t.reqC)
+               return t.req.CloseWithError(e)
+       }
+}
+
+func (t RawReqRes) ResClose() error {
+       select {
+       case <-t.resC:
+               return nil
+       default:
+               close(t.resC)
+               return t.res.Close()
+       }
+}
+
+func (t RawReqRes) ResCloseWithError(e error) error {
+       select {
+       case <-t.resC:
+               return nil
+       default:
+               close(t.resC)
+               return t.res.CloseWithError(e)
+       }
+}
+
+func (t RawReqRes) Write(p []byte) (n int, err error) {
+       select {
+       case <-t.reqC:
+               return t.res.Write(p)
+       default:
+               return 0, io.EOF
+       }
+}
+
+func (t RawReqRes) Read(p []byte) (n int, err error) {
+       select {
+       case <-t.reqC:
+               return 0, io.EOF
+       default:
+               return t.req.Read(p)
+       }
+}
+
+func (t RawReqRes) ReqWrite(p []byte) (n int, err error) {
+       select {
+       case <-t.reqC:
+               return 0, io.EOF
+       default:
+               return t.req.Write(p)
+       }
+}
+
+func (t RawReqRes) ResRead(p []byte) (n int, err error) {
+       select {
+       case <-t.reqC:
+               return t.res.Read(p)
+       default:
+               return 0, io.EOF
+       }
+}
index 77d15a73ac653bfcdb72109c572ef8ed1492cb2e..3dacd121821d86dbeba3b489a18829ff4987be78 100644 (file)
@@ -26,6 +26,7 @@ import (
 )
 
 type Rval struct {
+       Method  string
        Url     string
        PostStr string
        Proxy   string
@@ -47,6 +48,8 @@ type Rval struct {
        // 为避免write阻塞导致panic,请使用此项目io包中的NewPipe(),或在ctx done时,自行关闭pipe writer reader
        SaveToPipe *pio.IOpipe
 
+       RawPipe *RawReqRes
+
        Header map[string]string
 }
 
@@ -59,11 +62,14 @@ const (
 
 var (
        ErrEmptyUrl         = errors.New("ErrEmptyUrl")
+       ErrMustAsync        = errors.New("ErrMustAsync")
+       ErrCantRetry        = errors.New("ErrCantRetry")
        ErrNewRequest       = errors.New("ErrNewRequest")
        ErrClientDo         = errors.New("ErrClientDo")
        ErrResponFileCreate = errors.New("ErrResponFileCreate")
        ErrWriteRes         = errors.New("ErrWriteRes")
        ErrReadRes          = errors.New("ErrReadRes")
+       ErrPostStrOrRawPipe = errors.New("ErrPostStrOrRawPipe")
 )
 
 type Req struct {
@@ -96,56 +102,43 @@ func (t *Req) Reqf(val Rval) error {
        pctx, cancelF := t.prepare(&val)
        t.cancelP.Store(&cancelF)
 
-       // 同步
        if !val.Async {
-               beginTime := time.Now()
-
-               for i := 0; i <= val.Retry; i++ {
-                       ctx, cancel := t.prepareRes(pctx, &val)
-                       t.err = t.Reqf_1(ctx, val)
-                       cancel()
-                       if t.err == nil || IsCancel(t.err) {
-                               break
-                       }
-                       if val.SleepTime != 0 {
-                               time.Sleep(time.Duration(val.SleepTime * int(time.Millisecond)))
-                       }
-               }
-
-               cancelF()
-               t.updateUseDur(beginTime)
-               t.clean(&val)
-               t.state.Store(free)
-               t.l.Unlock()
-               return t.err
+               // 同步
+               return t.reqfM(pctx, cancelF, val)
+       } else {
+               //异步
+               go func() {
+                       _ = t.reqfM(pctx, cancelF, val)
+               }()
        }
 
-       //异步
-       go func() {
-               beginTime := time.Now()
+       return nil
+}
 
-               for i := 0; i <= val.Retry; i++ {
-                       ctx, cancel := t.prepareRes(pctx, &val)
-                       t.err = t.Reqf_1(ctx, val)
-                       cancel()
-                       if t.err == nil || IsCancel(t.err) {
-                               break
-                       }
-                       if val.SleepTime != 0 {
-                               time.Sleep(time.Duration(val.SleepTime * int(time.Millisecond)))
-                       }
+func (t *Req) reqfM(ctx context.Context, cancel context.CancelFunc, val Rval) error {
+       beginTime := time.Now()
+
+       for i := 0; i <= val.Retry; i++ {
+               ctx, cancel := t.prepareRes(ctx, &val)
+               t.err = t.reqf(ctx, val)
+               cancel()
+               if t.err == nil || IsCancel(t.err) {
+                       break
                }
+               if val.SleepTime != 0 {
+                       time.Sleep(time.Duration(val.SleepTime * int(time.Millisecond)))
+               }
+       }
 
-               cancelF()
-               t.updateUseDur(beginTime)
-               t.clean(&val)
-               t.state.Store(free)
-               t.l.Unlock()
-       }()
-       return nil
+       cancel()
+       t.updateUseDur(beginTime)
+       t.clean(&val)
+       t.state.Store(free)
+       t.l.Unlock()
+       return t.err
 }
 
-func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) {
+func (t *Req) reqf(ctx context.Context, val Rval) (err error) {
        var (
                Header map[string]string = val.Header
                client http.Client
@@ -173,17 +166,27 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) {
                return ErrEmptyUrl
        }
 
-       Method := "GET"
        var body io.Reader
+       if len(val.PostStr) > 0 && val.RawPipe != nil {
+               return ErrPostStrOrRawPipe
+       }
+       if val.Retry != 0 && val.RawPipe != nil {
+               return ErrCantRetry
+       }
+       if val.SaveToPipe != nil && !val.Async {
+               return ErrMustAsync
+       }
+       if val.RawPipe != nil {
+               body = val.RawPipe
+       }
        if len(val.PostStr) > 0 {
-               Method = "POST"
                body = strings.NewReader(val.PostStr)
                if _, ok := Header["Content-Type"]; !ok {
                        Header["Content-Type"] = "application/x-www-form-urlencoded"
                }
        }
 
-       req, e := http.NewRequestWithContext(ctx, Method, val.Url, body)
+       req, e := http.NewRequestWithContext(ctx, val.Method, val.Url, body)
        if e != nil {
                return errors.Join(ErrNewRequest, e)
        }
@@ -244,6 +247,9 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) {
        if val.SaveToPipe != nil {
                ws = append(ws, val.SaveToPipe)
        }
+       if val.RawPipe != nil {
+               ws = append(ws, val.RawPipe)
+       }
        if !val.NoResponse {
                ws = append(ws, t.responBuf)
        }
@@ -354,6 +360,21 @@ func (t *Req) prepare(val *Rval) (ctx context.Context, cancel context.CancelFunc
                        }
                }()
        }
+       if val.RawPipe != nil {
+               go func() {
+                       <-ctx.Done()
+                       if e := val.RawPipe.ResCloseWithError(context.Canceled); e != nil {
+                               println(e)
+                       }
+               }()
+       }
+
+       if val.Method == "" {
+               val.Method = "GET"
+               if len(val.PostStr) > 0 {
+                       val.Method = "POST"
+               }
+       }
 
        return
 }
@@ -365,6 +386,10 @@ func (t *Req) clean(val *Rval) {
        if val.SaveToPipe != nil {
                val.SaveToPipe.Close()
        }
+       if val.RawPipe != nil {
+               val.RawPipe.ReqClose()
+               val.RawPipe.ResClose()
+       }
 }
 
 func (t *Req) updateUseDur(u time.Time) {
index 6e5c05547857319141341e7932ad916cfe5eb838..b5d58ad766d2ff9d3fc29d05a843a43c05124384 100644 (file)
@@ -29,6 +29,9 @@ func init() {
                        code, _ := strconv.Atoi(r.URL.Query().Get(`code`))
                        w.WriteHeader(code)
                },
+               `/reply`: func(w http.ResponseWriter, r *http.Request) {
+                       io.Copy(w, r.Body)
+               },
                `/no`: func(w http.ResponseWriter, _ *http.Request) {
                        w.Write([]byte("abc强强强强"))
                },
@@ -476,3 +479,35 @@ func Test_req3(t *testing.T) {
                wg.Wait()
        }
 }
+
+func Test_req5(t *testing.T) {
+       r := New()
+       r.Reqf(Rval{
+               Url:     "http://" + addr + "/reply",
+               PostStr: "123",
+       })
+       if !bytes.Equal(r.Respon, []byte("123")) {
+               t.Fatal()
+       }
+
+       raw := NewRawReqRes()
+       buf := []byte("123")
+       r.Reqf(Rval{
+               Url:        "http://" + addr + "/reply",
+               Async:      true,
+               RawPipe:    raw,
+               NoResponse: true,
+       })
+       if _, e := raw.ReqWrite(buf); e != nil {
+               t.Fatal(e)
+       }
+       raw.ReqClose()
+       clear(buf)
+       if _, e := raw.ResRead(buf); e != nil && !errors.Is(e, io.EOF) {
+               t.Fatal(e)
+       }
+       if !bytes.Equal([]byte("123"), buf) {
+               t.Log(r.Respon, buf)
+               t.Fatal()
+       }
+}