From: qydysky Date: Sun, 16 Jul 2023 19:12:39 +0000 (+0800) Subject: add X-Git-Tag: v0.28.0+20230716eec7661~1 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=fb1c8f5697cb889939895cb8124dbea15a240860;p=part%2F.git add --- diff --git a/io/io.go b/io/io.go index 475e3f9..f1e2af5 100644 --- a/io/io.go +++ b/io/io.go @@ -78,7 +78,7 @@ func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w io.Writ case <-ctx.Done(): if old := chanw.Load(); old == -1 { return - } else if now := time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) { + } else if now := time.Now(); old > 0 && now.Unix()-old >= int64(to.Seconds()) { if old != 0 { panicf[0](fmt.Sprintf("rw blocking while close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree)) } @@ -86,7 +86,7 @@ func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w io.Writ time.AfterFunc(to, func() { if old := chanw.Load(); old == -1 { return - } else if now := time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) { + } else if now := time.Now(); old > 0 && now.Unix()-old >= int64(to.Seconds()) { panicf[0](fmt.Sprintf("rw blocking after close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree)) } }) @@ -107,9 +107,7 @@ func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w io.Writ func(p []byte) (n int, err error) { select { case <-ctx.Done(): - chanw.Store(-1) err = context.Canceled - return default: if n, err = r.Read(p); n != 0 { chanw.Store(time.Now().Unix()) @@ -120,9 +118,7 @@ func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w io.Writ func(p []byte) (n int, err error) { select { case <-ctx.Done(): - chanw.Store(-1) err = context.Canceled - return default: if n, err = w.Write(p); n != 0 { chanw.Store(time.Now().Unix()) diff --git a/reqf/Reqf.go b/reqf/Reqf.go index 0233921..0b22346 100644 --- a/reqf/Reqf.go +++ b/reqf/Reqf.go @@ -70,7 +70,7 @@ type Req struct { Response *http.Response UsedTime time.Duration - cancelP atomic.Pointer[func()] + cancelP atomic.Pointer[context.CancelFunc] ctx context.Context state atomic.Int32 @@ -90,18 +90,17 @@ func (t *Req) Reqf(val Rval) error { t.l.Lock() t.state.Store(running) - t.prepare(&val) + pctx, cancelF := t.prepare(&val) + t.cancelP.Store(&cancelF) // 同步 if !val.Async { beginTime := time.Now() for i := 0; i <= val.Retry; i++ { - ctx, cancle := t.prepareRes(&val) + ctx, cancel := t.prepareRes(pctx, &val) t.err = t.Reqf_1(ctx, val) - if cancle != nil { - cancle() - } + cancel() if t.err == nil || IsCancel(t.err) { break } @@ -110,6 +109,7 @@ func (t *Req) Reqf(val Rval) error { } } + cancelF() t.updateUseDur(beginTime) t.clean(&val) t.state.Store(free) @@ -122,11 +122,9 @@ func (t *Req) Reqf(val Rval) error { beginTime := time.Now() for i := 0; i <= val.Retry; i++ { - ctx, cancle := t.prepareRes(&val) + ctx, cancel := t.prepareRes(pctx, &val) t.err = t.Reqf_1(ctx, val) - if cancle != nil { - cancle() - } + cancel() if t.err == nil || IsCancel(t.err) { break } @@ -135,6 +133,7 @@ func (t *Req) Reqf(val Rval) error { } } + cancelF() t.updateUseDur(beginTime) t.clean(&val) t.state.Store(free) @@ -250,7 +249,7 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) { w := io.MultiWriter(ws...) - var resReadCloser io.ReadCloser + var resReadCloser = resp.Body if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 { switch compress_type[0] { case `br`: @@ -260,28 +259,25 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) { case `deflate`: resReadCloser = flate.NewReader(resp.Body) default: - resReadCloser = resp.Body } - } else { - resReadCloser = resp.Body } writeLoopTO := val.WriteLoopTO if writeLoopTO == 0 { writeLoopTO = 1000 } - rwc := pio.WithCtxTO(ctx, t.callTree, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser) + rwc := pio.WithCtxTO(req.Context(), t.callTree, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser) defer rwc.Close() for buf := make([]byte, 2048); true; { if n, e := rwc.Read(buf); n != 0 { - if wn, we := rwc.Write(buf[:n]); wn != 0 { + if n, e := rwc.Write(buf[:n]); n != 0 { if val.SaveToChan != nil { val.SaveToChan <- buf[:n] } - } else if we != nil { - if !errors.Is(we, io.EOF) { - err = errors.Join(err, ErrWriteRes, we) + } else if e != nil { + if !errors.Is(e, io.EOF) { + err = errors.Join(err, ErrWriteRes, e) } break } @@ -320,7 +316,7 @@ func (t *Req) IsLive() bool { return t.state.Load() == running } -func (t *Req) prepareRes(val *Rval) (context.Context, context.CancelFunc) { +func (t *Req) prepareRes(ctx context.Context, val *Rval) (context.Context, context.CancelFunc) { if !val.NoResponse { if t.responBuf == nil { t.responBuf = new(bytes.Buffer) @@ -335,12 +331,12 @@ func (t *Req) prepareRes(val *Rval) (context.Context, context.CancelFunc) { t.Response = nil t.err = nil if val.Timeout > 0 { - return context.WithTimeout(t.ctx, time.Duration(val.Timeout*int(time.Millisecond))) + return context.WithTimeout(ctx, time.Duration(val.Timeout*int(time.Millisecond))) } - return t.ctx, nil + return context.WithCancel(ctx) } -func (t *Req) prepare(val *Rval) { +func (t *Req) prepare(val *Rval) (ctx context.Context, cancel context.CancelFunc) { t.UsedTime = 0 t.responFile = nil t.callTree = "" @@ -351,19 +347,14 @@ func (t *Req) prepare(val *Rval) { t.callTree += fmt.Sprintf("call by %s\n\t%s:%d\n", runtime.FuncForPC(pc).Name(), file, line) } } - var cancel func() if val.Ctx != nil { - t.ctx, cancel = context.WithCancel(val.Ctx) + return context.WithCancel(val.Ctx) } else { - t.ctx, cancel = context.WithCancel(context.Background()) + return context.WithCancel(context.Background()) } - t.cancelP.Store(&cancel) } func (t *Req) clean(val *Rval) { - if p := t.cancelP.Load(); p != nil { - (*p)() - } if val.SaveToChan != nil { close(val.SaveToChan) }