case <-ctx.Done():
if old := chanw.Load(); old == -1 {
return
- } else if now := time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
+ } else if now := time.Now(); old > 0 && now.Unix()-old >= int64(to.Seconds()) {
if old != 0 {
panicf[0](fmt.Sprintf("rw blocking while close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
}
time.AfterFunc(to, func() {
if old := chanw.Load(); old == -1 {
return
- } else if now := time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
+ } else if now := time.Now(); old > 0 && now.Unix()-old >= int64(to.Seconds()) {
panicf[0](fmt.Sprintf("rw blocking after close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
}
})
func(p []byte) (n int, err error) {
select {
case <-ctx.Done():
- chanw.Store(-1)
err = context.Canceled
- return
default:
if n, err = r.Read(p); n != 0 {
chanw.Store(time.Now().Unix())
func(p []byte) (n int, err error) {
select {
case <-ctx.Done():
- chanw.Store(-1)
err = context.Canceled
- return
default:
if n, err = w.Write(p); n != 0 {
chanw.Store(time.Now().Unix())
Response *http.Response
UsedTime time.Duration
- cancelP atomic.Pointer[func()]
+ cancelP atomic.Pointer[context.CancelFunc]
ctx context.Context
state atomic.Int32
t.l.Lock()
t.state.Store(running)
- t.prepare(&val)
+ pctx, cancelF := t.prepare(&val)
+ t.cancelP.Store(&cancelF)
// 同步
if !val.Async {
beginTime := time.Now()
for i := 0; i <= val.Retry; i++ {
- ctx, cancle := t.prepareRes(&val)
+ ctx, cancel := t.prepareRes(pctx, &val)
t.err = t.Reqf_1(ctx, val)
- if cancle != nil {
- cancle()
- }
+ cancel()
if t.err == nil || IsCancel(t.err) {
break
}
}
}
+ cancelF()
t.updateUseDur(beginTime)
t.clean(&val)
t.state.Store(free)
beginTime := time.Now()
for i := 0; i <= val.Retry; i++ {
- ctx, cancle := t.prepareRes(&val)
+ ctx, cancel := t.prepareRes(pctx, &val)
t.err = t.Reqf_1(ctx, val)
- if cancle != nil {
- cancle()
- }
+ cancel()
if t.err == nil || IsCancel(t.err) {
break
}
}
}
+ cancelF()
t.updateUseDur(beginTime)
t.clean(&val)
t.state.Store(free)
w := io.MultiWriter(ws...)
- var resReadCloser io.ReadCloser
+ var resReadCloser = resp.Body
if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 {
switch compress_type[0] {
case `br`:
case `deflate`:
resReadCloser = flate.NewReader(resp.Body)
default:
- resReadCloser = resp.Body
}
- } else {
- resReadCloser = resp.Body
}
writeLoopTO := val.WriteLoopTO
if writeLoopTO == 0 {
writeLoopTO = 1000
}
- rwc := pio.WithCtxTO(ctx, t.callTree, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser)
+ rwc := pio.WithCtxTO(req.Context(), t.callTree, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser)
defer rwc.Close()
for buf := make([]byte, 2048); true; {
if n, e := rwc.Read(buf); n != 0 {
- if wn, we := rwc.Write(buf[:n]); wn != 0 {
+ if n, e := rwc.Write(buf[:n]); n != 0 {
if val.SaveToChan != nil {
val.SaveToChan <- buf[:n]
}
- } else if we != nil {
- if !errors.Is(we, io.EOF) {
- err = errors.Join(err, ErrWriteRes, we)
+ } else if e != nil {
+ if !errors.Is(e, io.EOF) {
+ err = errors.Join(err, ErrWriteRes, e)
}
break
}
return t.state.Load() == running
}
-func (t *Req) prepareRes(val *Rval) (context.Context, context.CancelFunc) {
+func (t *Req) prepareRes(ctx context.Context, val *Rval) (context.Context, context.CancelFunc) {
if !val.NoResponse {
if t.responBuf == nil {
t.responBuf = new(bytes.Buffer)
t.Response = nil
t.err = nil
if val.Timeout > 0 {
- return context.WithTimeout(t.ctx, time.Duration(val.Timeout*int(time.Millisecond)))
+ return context.WithTimeout(ctx, time.Duration(val.Timeout*int(time.Millisecond)))
}
- return t.ctx, nil
+ return context.WithCancel(ctx)
}
-func (t *Req) prepare(val *Rval) {
+func (t *Req) prepare(val *Rval) (ctx context.Context, cancel context.CancelFunc) {
t.UsedTime = 0
t.responFile = nil
t.callTree = ""
t.callTree += fmt.Sprintf("call by %s\n\t%s:%d\n", runtime.FuncForPC(pc).Name(), file, line)
}
}
- var cancel func()
if val.Ctx != nil {
- t.ctx, cancel = context.WithCancel(val.Ctx)
+ return context.WithCancel(val.Ctx)
} else {
- t.ctx, cancel = context.WithCancel(context.Background())
+ return context.WithCancel(context.Background())
}
- t.cancelP.Store(&cancel)
}
func (t *Req) clean(val *Rval) {
- if p := t.cancelP.Load(); p != nil {
- (*p)()
- }
if val.SaveToChan != nil {
close(val.SaveToChan)
}