]> 127.0.0.1 Git - part/.git/commitdiff
1 (#58) v0.28.20250525055031
authorqydysky <qydysky@foxmail.com>
Sun, 25 May 2025 05:50:22 +0000 (13:50 +0800)
committerGitHub <noreply@github.com>
Sun, 25 May 2025 05:50:22 +0000 (13:50 +0800)
reqf/Reqf.go
reqf/Reqf_test.go
web/Web_test.go

index 5e933a1becefb18ee2b9247586ceffe406987a62..f65ff0b3387cb9d4df996afc05bd386ec8f596e5 100644 (file)
@@ -33,14 +33,18 @@ type Rval struct {
        PostStr string
        Proxy   string
        Retry   int
-       // Millisecond
+       // Millisecond,总体请求超时,context.DeadlineExceeded,IsTimeout()==true
        Timeout int
-       // Millisecond
+       // Millisecond,Retry重试间隔
        SleepTime int
-       // Deprecated: use Timeout
-       WriteLoopTO      int
-       JustResponseCode bool
-       NoResponse       bool
+       // Millisecond,空闲连接释放,默认1min
+       IdleConnTimeout int
+       // Millisecond,无响应超时,ErrClientDo,IsTimeout()==true
+       ResponseHeaderTimeout int
+       // Millisecond,拷贝响应超时,ErrCopyRes
+       CopyResponseTimeout int
+       JustResponseCode    bool
+       NoResponse          bool
        // 当Async为true时,Respon、Response必须在Wait()之后读取,否则有DATA RACE可能
        Async   bool
        Cookies []*http.Cookie
@@ -68,8 +72,7 @@ var (
        ErrNewRequest       = pe.Action("ErrNewRequest")
        ErrClientDo         = pe.Action("ErrClientDo")
        ErrResponFileCreate = pe.Action("ErrResponFileCreate")
-       ErrWriteRes         = pe.Action("ErrWriteRes")
-       ErrReadRes          = pe.Action("ErrReadRes")
+       ErrCopyRes          = pe.Action("ErrCopyRes")
        ErrPostStrOrRawPipe = pe.Action("ErrPostStrOrRawPipe")
        ErrNoDate           = pe.Action("ErrNoDate")
 )
@@ -86,6 +89,8 @@ type Req struct {
        client     *http.Client
        responFile *os.File
        responBuf  *bytes.Buffer
+       reqBody    io.Reader
+       rwTO       *time.Timer
        err        error
        callTree   string
 
@@ -101,27 +106,26 @@ func (t *Req) Reqf(val Rval) error {
        t.l.Lock()
        t.state.Store(running)
 
-       t.prepare(&val)
-
-       if !val.Async {
+       if ctx, cancel, e := t.prepare(&val); e != nil {
+               return e
+       } else if !val.Async {
                // 同步
-               t.reqfM(val.Ctx, val)
+               t.reqfM(ctx, cancel, val)
                return t.err
        } else {
                //异步
-               go t.reqfM(val.Ctx, val)
+               go t.reqfM(ctx, cancel, val)
        }
 
        return nil
 }
 
-func (t *Req) reqfM(ctxMain context.Context, val Rval) {
+func (t *Req) reqfM(ctx context.Context, ctxf1 context.CancelCauseFunc, val Rval) {
        beginTime := time.Now()
 
        for i := 0; i <= val.Retry; i++ {
-               ctx, cancel := t.prepareRes(ctxMain, &val)
+               t.prepareRes(&val)
                t.err = t.reqf(ctx, val)
-               cancel()
                if t.err == nil || IsCancel(t.err) {
                        break
                }
@@ -130,6 +134,7 @@ func (t *Req) reqfM(ctxMain context.Context, val Rval) {
                }
        }
 
+       ctxf1(nil)
        t.updateUseDur(beginTime)
        t.clean(&val)
        t.state.Store(free)
@@ -137,36 +142,7 @@ func (t *Req) reqfM(ctxMain context.Context, val Rval) {
 }
 
 func (t *Req) reqf(ctx context.Context, val Rval) (err error) {
-       if t.client.Transport == nil {
-               t.client.Transport = &http.Transport{}
-       }
-       if val.Proxy != "" {
-               t.client.Transport.(*http.Transport).Proxy = func(_ *http.Request) (*url.URL, error) {
-                       return url.Parse(val.Proxy)
-               }
-       }
-       t.client.Transport.(*http.Transport).IdleConnTimeout = time.Minute
-
-       if val.Url == "" {
-               return ErrEmptyUrl.New()
-       }
-
-       var body io.Reader
-       if len(val.PostStr) > 0 && val.RawPipe != nil {
-               return ErrPostStrOrRawPipe.New()
-       }
-       if val.Retry != 0 && val.RawPipe != nil {
-               return ErrCantRetry.New()
-       }
-       if val.RawPipe != nil {
-               body = val.RawPipe
-       }
-
-       if len(val.PostStr) > 0 {
-               body = strings.NewReader(val.PostStr)
-       }
-
-       req, e := http.NewRequestWithContext(ctx, val.Method, val.Url, body)
+       req, e := http.NewRequestWithContext(ctx, val.Method, val.Url, t.reqBody)
        if e != nil {
                return pe.Join(ErrNewRequest.New(), e)
        }
@@ -254,10 +230,17 @@ func (t *Req) reqf(ctx context.Context, val Rval) (err error) {
 
        // io copy
        {
+               rwTODra := time.Duration(val.CopyResponseTimeout) * time.Millisecond
                w := io.MultiWriter(ws...)
                for {
+                       if rwTODra > 0 {
+                               t.rwTO.Reset(rwTODra)
+                       }
                        n, e := resReadCloser.Read(t.copyResBuf)
                        if n != 0 {
+                               if rwTODra > 0 {
+                                       t.rwTO.Reset(rwTODra)
+                               }
                                n, e := w.Write(t.copyResBuf[:n])
                                if n == 0 && e != nil {
                                        if !errors.Is(e, io.EOF) {
@@ -272,6 +255,7 @@ func (t *Req) reqf(ctx context.Context, val Rval) (err error) {
                                break
                        }
                }
+               t.rwTO.Stop()
        }
 
        if t.responBuf != nil {
@@ -300,7 +284,7 @@ func (t *Req) IsLive() bool {
        return t.state.Load() == running
 }
 
-func (t *Req) prepareRes(ctx context.Context, val *Rval) (ctx1 context.Context, ctxf1 context.CancelFunc) {
+func (t *Req) prepareRes(val *Rval) {
        if !val.NoResponse {
                if t.responBuf == nil {
                        t.responBuf = new(bytes.Buffer)
@@ -315,15 +299,26 @@ func (t *Req) prepareRes(ctx context.Context, val *Rval) (ctx1 context.Context,
        t.Response = nil
        t.err = nil
 
-       if val.Timeout > 0 {
-               ctx1, ctxf1 = context.WithTimeout(ctx, time.Duration(val.Timeout)*time.Millisecond)
-       } else {
-               ctx1, ctxf1 = context.WithCancel(ctx)
+       if reader, ok := t.reqBody.(*strings.Reader); ok {
+               reader.Seek(0, io.SeekStart)
        }
        return
 }
 
-func (t *Req) prepare(val *Rval) {
+func (t *Req) prepare(val *Rval) (ctx1 context.Context, ctxf1 context.CancelCauseFunc, e error) {
+       if val.Url == "" {
+               e = ErrEmptyUrl.New()
+               return
+       }
+       if len(val.PostStr) > 0 && val.RawPipe != nil {
+               e = ErrPostStrOrRawPipe.New()
+               return
+       }
+       if val.Retry != 0 && val.RawPipe != nil {
+               e = ErrCantRetry.New()
+               return
+       }
+
        t.UsedTime = 0
        t.responFile = nil
        t.callTree = ""
@@ -342,6 +337,22 @@ func (t *Req) prepare(val *Rval) {
        if t.client == nil {
                t.client = &http.Client{}
        }
+       if t.client.Transport == nil {
+               t.client.Transport = &http.Transport{}
+       }
+       if val.Proxy != "" {
+               t.client.Transport.(*http.Transport).Proxy = func(_ *http.Request) (*url.URL, error) {
+                       return url.Parse(val.Proxy)
+               }
+       }
+       if val.IdleConnTimeout == 0 {
+               t.client.Transport.(*http.Transport).IdleConnTimeout = time.Minute
+       } else if val.IdleConnTimeout > 0 {
+               t.client.Transport.(*http.Transport).IdleConnTimeout = time.Duration(val.IdleConnTimeout) * time.Millisecond
+       }
+       if val.ResponseHeaderTimeout > 0 {
+               t.client.Transport.(*http.Transport).ResponseHeaderTimeout = time.Duration(val.ResponseHeaderTimeout) * time.Millisecond
+       }
        if val.Ctx == nil {
                val.Ctx = context.Background()
        }
@@ -361,6 +372,38 @@ func (t *Req) prepare(val *Rval) {
                        val.Method = "GET"
                }
        }
+       if val.RawPipe != nil {
+               t.reqBody = val.RawPipe
+       }
+       if len(val.PostStr) > 0 {
+               t.reqBody = strings.NewReader(val.PostStr)
+       }
+       {
+               var ctx context.Context
+               if val.Ctx != nil {
+                       ctx = val.Ctx
+               } else {
+                       ctx = context.Background()
+               }
+               if val.Timeout > 0 {
+                       ctx, _ = context.WithTimeout(ctx, time.Duration(val.Timeout)*time.Millisecond)
+               }
+               ctx1, ctxf1 = context.WithCancelCause(ctx)
+       }
+       if t.rwTO == nil {
+               t.rwTO = time.NewTimer(time.Duration(val.CopyResponseTimeout) * time.Millisecond)
+       }
+       t.rwTO.Stop()
+       if val.CopyResponseTimeout > 0 {
+               go func() {
+                       select {
+                       case <-t.rwTO.C:
+                               ctxf1(ErrCopyRes)
+                       case <-ctx1.Done():
+                       }
+               }()
+       }
+       return
 }
 
 func (t *Req) clean(val *Rval) {
index 654f73491961b8c00dd460e22121d97fcbe9949c..a03500762bc08b1e25f829b740e92a22d17d7e7b 100644 (file)
@@ -101,6 +101,7 @@ func init() {
                                default:
                                        w.Write([]byte{'0'})
                                        flusher.Flush()
+                                       time.Sleep(time.Millisecond * 500)
                                }
                        }
                },
@@ -112,6 +113,9 @@ func init() {
                                w.Header().Set(k, v[0])
                        }
                },
+               `/nores`: func(w http.ResponseWriter, r *http.Request) {
+                       <-r.Context().Done()
+               },
        })
        time.Sleep(time.Second)
        reuse.Reqf(Rval{
@@ -119,6 +123,16 @@ func init() {
        })
 }
 
+func Test_7(t *testing.T) {
+       e := reuse.Reqf(Rval{
+               Url:                   "http://" + addr + "/nores",
+               ResponseHeaderTimeout: 500,
+       })
+       if !IsTimeout(e) {
+               t.Fatal(e)
+       }
+}
+
 func Test_6(t *testing.T) {
        reuse.Reqf(Rval{
                Url: "http://" + addr + "/header",
@@ -156,6 +170,35 @@ func Benchmark(b *testing.B) {
        }
 }
 
+func Test15(t *testing.T) {
+       i, o := io.Pipe()
+
+       if e := reuse.Reqf(Rval{
+               Url:                 "http://" + addr + "/stream",
+               NoResponse:          true,
+               SaveToPipe:          pio.NewPipeRaw(i, o),
+               Async:               true,
+               CopyResponseTimeout: 100,
+       }); e != nil {
+               t.Log(e)
+       }
+
+       go func() {
+               buf := make([]byte, 1<<8)
+               for {
+                       if n, e := i.Read(buf); n != 0 {
+                               continue
+                       } else if e != nil {
+                               break
+                       }
+               }
+       }()
+
+       if !errors.Is(reuse.Wait(), ErrCopyRes) {
+               t.Fatal()
+       }
+}
+
 func Test14(t *testing.T) {
        ctx, cancel := context.WithCancel(context.Background())
 
@@ -163,12 +206,12 @@ func Test14(t *testing.T) {
 
        r := New()
        if e := r.Reqf(Rval{
-               Url:         "http://" + addr + "/stream",
-               Ctx:         ctx,
-               NoResponse:  true,
-               SaveToPipe:  pio.NewPipeRaw(i, o),
-               Async:       true,
-               WriteLoopTO: 5*1000*2 + 1,
+               Url:                 "http://" + addr + "/stream",
+               Ctx:                 ctx,
+               NoResponse:          true,
+               SaveToPipe:          pio.NewPipeRaw(i, o),
+               Async:               true,
+               CopyResponseTimeout: 5*1000*2 + 1,
        }); e != nil {
                t.Log(e)
        }
index 1d74ba49a9c05844d5233d63d9edd52ef37b6bcb..2095ed0fe12fcc8b0c24453810d724805b1a78f0 100644 (file)
@@ -621,10 +621,10 @@ func Test_ClientBlock(t *testing.T) {
                        close(c)
                }()
                r.Reqf(reqf.Rval{
-                       Url:         "http://127.0.0.1:13001/to",
-                       SaveToPipe:  pio.NewPipeRaw(rc, wc),
-                       WriteLoopTO: 5000,
-                       Async:       true,
+                       Url:                 "http://127.0.0.1:13001/to",
+                       SaveToPipe:          pio.NewPipeRaw(rc, wc),
+                       CopyResponseTimeout: 5000,
+                       Async:               true,
                })
                <-c
        }