From f7abe2f0b61d78692e7ddc92a7d2eaf4e47c1c11 Mon Sep 17 00:00:00 2001 From: qydysky Date: Sun, 25 May 2025 13:50:22 +0800 Subject: [PATCH] 1 (#58) --- reqf/Reqf.go | 147 ++++++++++++++++++++++++++++++---------------- reqf/Reqf_test.go | 55 +++++++++++++++-- web/Web_test.go | 8 +-- 3 files changed, 148 insertions(+), 62 deletions(-) diff --git a/reqf/Reqf.go b/reqf/Reqf.go index 5e933a1..f65ff0b 100644 --- a/reqf/Reqf.go +++ b/reqf/Reqf.go @@ -33,14 +33,18 @@ type Rval struct { PostStr string Proxy string Retry int - // Millisecond + // Millisecond,总体请求超时,context.DeadlineExceeded,IsTimeout()==true Timeout int - // Millisecond + // Millisecond,Retry重试间隔 SleepTime int - // Deprecated: use Timeout - WriteLoopTO int - JustResponseCode bool - NoResponse bool + // Millisecond,空闲连接释放,默认1min + IdleConnTimeout int + // Millisecond,无响应超时,ErrClientDo,IsTimeout()==true + ResponseHeaderTimeout int + // Millisecond,拷贝响应超时,ErrCopyRes + CopyResponseTimeout int + JustResponseCode bool + NoResponse bool // 当Async为true时,Respon、Response必须在Wait()之后读取,否则有DATA RACE可能 Async bool Cookies []*http.Cookie @@ -68,8 +72,7 @@ var ( ErrNewRequest = pe.Action("ErrNewRequest") ErrClientDo = pe.Action("ErrClientDo") ErrResponFileCreate = pe.Action("ErrResponFileCreate") - ErrWriteRes = pe.Action("ErrWriteRes") - ErrReadRes = pe.Action("ErrReadRes") + ErrCopyRes = pe.Action("ErrCopyRes") ErrPostStrOrRawPipe = pe.Action("ErrPostStrOrRawPipe") ErrNoDate = pe.Action("ErrNoDate") ) @@ -86,6 +89,8 @@ type Req struct { client *http.Client responFile *os.File responBuf *bytes.Buffer + reqBody io.Reader + rwTO *time.Timer err error callTree string @@ -101,27 +106,26 @@ func (t *Req) Reqf(val Rval) error { t.l.Lock() t.state.Store(running) - t.prepare(&val) - - if !val.Async { + if ctx, cancel, e := t.prepare(&val); e != nil { + return e + } else if !val.Async { // 同步 - t.reqfM(val.Ctx, val) + t.reqfM(ctx, cancel, val) return t.err } else { //异步 - go t.reqfM(val.Ctx, val) + go t.reqfM(ctx, cancel, val) } return nil } -func (t *Req) reqfM(ctxMain context.Context, val Rval) { +func (t *Req) reqfM(ctx context.Context, ctxf1 context.CancelCauseFunc, val Rval) { beginTime := time.Now() for i := 0; i <= val.Retry; i++ { - ctx, cancel := t.prepareRes(ctxMain, &val) + t.prepareRes(&val) t.err = t.reqf(ctx, val) - cancel() if t.err == nil || IsCancel(t.err) { break } @@ -130,6 +134,7 @@ func (t *Req) reqfM(ctxMain context.Context, val Rval) { } } + ctxf1(nil) t.updateUseDur(beginTime) t.clean(&val) t.state.Store(free) @@ -137,36 +142,7 @@ func (t *Req) reqfM(ctxMain context.Context, val Rval) { } func (t *Req) reqf(ctx context.Context, val Rval) (err error) { - if t.client.Transport == nil { - t.client.Transport = &http.Transport{} - } - if val.Proxy != "" { - t.client.Transport.(*http.Transport).Proxy = func(_ *http.Request) (*url.URL, error) { - return url.Parse(val.Proxy) - } - } - t.client.Transport.(*http.Transport).IdleConnTimeout = time.Minute - - if val.Url == "" { - return ErrEmptyUrl.New() - } - - var body io.Reader - if len(val.PostStr) > 0 && val.RawPipe != nil { - return ErrPostStrOrRawPipe.New() - } - if val.Retry != 0 && val.RawPipe != nil { - return ErrCantRetry.New() - } - if val.RawPipe != nil { - body = val.RawPipe - } - - if len(val.PostStr) > 0 { - body = strings.NewReader(val.PostStr) - } - - req, e := http.NewRequestWithContext(ctx, val.Method, val.Url, body) + req, e := http.NewRequestWithContext(ctx, val.Method, val.Url, t.reqBody) if e != nil { return pe.Join(ErrNewRequest.New(), e) } @@ -254,10 +230,17 @@ func (t *Req) reqf(ctx context.Context, val Rval) (err error) { // io copy { + rwTODra := time.Duration(val.CopyResponseTimeout) * time.Millisecond w := io.MultiWriter(ws...) for { + if rwTODra > 0 { + t.rwTO.Reset(rwTODra) + } n, e := resReadCloser.Read(t.copyResBuf) if n != 0 { + if rwTODra > 0 { + t.rwTO.Reset(rwTODra) + } n, e := w.Write(t.copyResBuf[:n]) if n == 0 && e != nil { if !errors.Is(e, io.EOF) { @@ -272,6 +255,7 @@ func (t *Req) reqf(ctx context.Context, val Rval) (err error) { break } } + t.rwTO.Stop() } if t.responBuf != nil { @@ -300,7 +284,7 @@ func (t *Req) IsLive() bool { return t.state.Load() == running } -func (t *Req) prepareRes(ctx context.Context, val *Rval) (ctx1 context.Context, ctxf1 context.CancelFunc) { +func (t *Req) prepareRes(val *Rval) { if !val.NoResponse { if t.responBuf == nil { t.responBuf = new(bytes.Buffer) @@ -315,15 +299,26 @@ func (t *Req) prepareRes(ctx context.Context, val *Rval) (ctx1 context.Context, t.Response = nil t.err = nil - if val.Timeout > 0 { - ctx1, ctxf1 = context.WithTimeout(ctx, time.Duration(val.Timeout)*time.Millisecond) - } else { - ctx1, ctxf1 = context.WithCancel(ctx) + if reader, ok := t.reqBody.(*strings.Reader); ok { + reader.Seek(0, io.SeekStart) } return } -func (t *Req) prepare(val *Rval) { +func (t *Req) prepare(val *Rval) (ctx1 context.Context, ctxf1 context.CancelCauseFunc, e error) { + if val.Url == "" { + e = ErrEmptyUrl.New() + return + } + if len(val.PostStr) > 0 && val.RawPipe != nil { + e = ErrPostStrOrRawPipe.New() + return + } + if val.Retry != 0 && val.RawPipe != nil { + e = ErrCantRetry.New() + return + } + t.UsedTime = 0 t.responFile = nil t.callTree = "" @@ -342,6 +337,22 @@ func (t *Req) prepare(val *Rval) { if t.client == nil { t.client = &http.Client{} } + if t.client.Transport == nil { + t.client.Transport = &http.Transport{} + } + if val.Proxy != "" { + t.client.Transport.(*http.Transport).Proxy = func(_ *http.Request) (*url.URL, error) { + return url.Parse(val.Proxy) + } + } + if val.IdleConnTimeout == 0 { + t.client.Transport.(*http.Transport).IdleConnTimeout = time.Minute + } else if val.IdleConnTimeout > 0 { + t.client.Transport.(*http.Transport).IdleConnTimeout = time.Duration(val.IdleConnTimeout) * time.Millisecond + } + if val.ResponseHeaderTimeout > 0 { + t.client.Transport.(*http.Transport).ResponseHeaderTimeout = time.Duration(val.ResponseHeaderTimeout) * time.Millisecond + } if val.Ctx == nil { val.Ctx = context.Background() } @@ -361,6 +372,38 @@ func (t *Req) prepare(val *Rval) { val.Method = "GET" } } + if val.RawPipe != nil { + t.reqBody = val.RawPipe + } + if len(val.PostStr) > 0 { + t.reqBody = strings.NewReader(val.PostStr) + } + { + var ctx context.Context + if val.Ctx != nil { + ctx = val.Ctx + } else { + ctx = context.Background() + } + if val.Timeout > 0 { + ctx, _ = context.WithTimeout(ctx, time.Duration(val.Timeout)*time.Millisecond) + } + ctx1, ctxf1 = context.WithCancelCause(ctx) + } + if t.rwTO == nil { + t.rwTO = time.NewTimer(time.Duration(val.CopyResponseTimeout) * time.Millisecond) + } + t.rwTO.Stop() + if val.CopyResponseTimeout > 0 { + go func() { + select { + case <-t.rwTO.C: + ctxf1(ErrCopyRes) + case <-ctx1.Done(): + } + }() + } + return } func (t *Req) clean(val *Rval) { diff --git a/reqf/Reqf_test.go b/reqf/Reqf_test.go index 654f734..a035007 100644 --- a/reqf/Reqf_test.go +++ b/reqf/Reqf_test.go @@ -101,6 +101,7 @@ func init() { default: w.Write([]byte{'0'}) flusher.Flush() + time.Sleep(time.Millisecond * 500) } } }, @@ -112,6 +113,9 @@ func init() { w.Header().Set(k, v[0]) } }, + `/nores`: func(w http.ResponseWriter, r *http.Request) { + <-r.Context().Done() + }, }) time.Sleep(time.Second) reuse.Reqf(Rval{ @@ -119,6 +123,16 @@ func init() { }) } +func Test_7(t *testing.T) { + e := reuse.Reqf(Rval{ + Url: "http://" + addr + "/nores", + ResponseHeaderTimeout: 500, + }) + if !IsTimeout(e) { + t.Fatal(e) + } +} + func Test_6(t *testing.T) { reuse.Reqf(Rval{ Url: "http://" + addr + "/header", @@ -156,6 +170,35 @@ func Benchmark(b *testing.B) { } } +func Test15(t *testing.T) { + i, o := io.Pipe() + + if e := reuse.Reqf(Rval{ + Url: "http://" + addr + "/stream", + NoResponse: true, + SaveToPipe: pio.NewPipeRaw(i, o), + Async: true, + CopyResponseTimeout: 100, + }); e != nil { + t.Log(e) + } + + go func() { + buf := make([]byte, 1<<8) + for { + if n, e := i.Read(buf); n != 0 { + continue + } else if e != nil { + break + } + } + }() + + if !errors.Is(reuse.Wait(), ErrCopyRes) { + t.Fatal() + } +} + func Test14(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -163,12 +206,12 @@ func Test14(t *testing.T) { r := New() if e := r.Reqf(Rval{ - Url: "http://" + addr + "/stream", - Ctx: ctx, - NoResponse: true, - SaveToPipe: pio.NewPipeRaw(i, o), - Async: true, - WriteLoopTO: 5*1000*2 + 1, + Url: "http://" + addr + "/stream", + Ctx: ctx, + NoResponse: true, + SaveToPipe: pio.NewPipeRaw(i, o), + Async: true, + CopyResponseTimeout: 5*1000*2 + 1, }); e != nil { t.Log(e) } diff --git a/web/Web_test.go b/web/Web_test.go index 1d74ba4..2095ed0 100644 --- a/web/Web_test.go +++ b/web/Web_test.go @@ -621,10 +621,10 @@ func Test_ClientBlock(t *testing.T) { close(c) }() r.Reqf(reqf.Rval{ - Url: "http://127.0.0.1:13001/to", - SaveToPipe: pio.NewPipeRaw(rc, wc), - WriteLoopTO: 5000, - Async: true, + Url: "http://127.0.0.1:13001/to", + SaveToPipe: pio.NewPipeRaw(rc, wc), + CopyResponseTimeout: 5000, + Async: true, }) <-c } -- 2.39.2