]> 127.0.0.1 Git - part/.git/commitdiff
add
authorqydysky <qydysky@foxmail.com>
Sun, 16 Jul 2023 19:12:39 +0000 (03:12 +0800)
committerqydysky <qydysky@foxmail.com>
Sun, 16 Jul 2023 19:12:39 +0000 (03:12 +0800)
io/io.go
reqf/Reqf.go

index 475e3f933421a5ef58ea35274e09ab42b3c726a2..f1e2af5f8f3eb4a2ef1d4b787baa26fb9e06d95e 100644 (file)
--- a/io/io.go
+++ b/io/io.go
@@ -78,7 +78,7 @@ func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w io.Writ
                        case <-ctx.Done():
                                if old := chanw.Load(); old == -1 {
                                        return
-                               } else if now := time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
+                               } else if now := time.Now(); old > 0 && now.Unix()-old >= int64(to.Seconds()) {
                                        if old != 0 {
                                                panicf[0](fmt.Sprintf("rw blocking while close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
                                        }
@@ -86,7 +86,7 @@ func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w io.Writ
                                        time.AfterFunc(to, func() {
                                                if old := chanw.Load(); old == -1 {
                                                        return
-                                               } else if now := time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
+                                               } else if now := time.Now(); old > 0 && now.Unix()-old >= int64(to.Seconds()) {
                                                        panicf[0](fmt.Sprintf("rw blocking after close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
                                                }
                                        })
@@ -107,9 +107,7 @@ func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w io.Writ
                func(p []byte) (n int, err error) {
                        select {
                        case <-ctx.Done():
-                               chanw.Store(-1)
                                err = context.Canceled
-                               return
                        default:
                                if n, err = r.Read(p); n != 0 {
                                        chanw.Store(time.Now().Unix())
@@ -120,9 +118,7 @@ func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w io.Writ
                func(p []byte) (n int, err error) {
                        select {
                        case <-ctx.Done():
-                               chanw.Store(-1)
                                err = context.Canceled
-                               return
                        default:
                                if n, err = w.Write(p); n != 0 {
                                        chanw.Store(time.Now().Unix())
index 02339215f186c0047409836bfad686d5c8ef7af3..0b223469b4da1fc8e9040173caad4dd14b17a747 100644 (file)
@@ -70,7 +70,7 @@ type Req struct {
        Response *http.Response
        UsedTime time.Duration
 
-       cancelP atomic.Pointer[func()]
+       cancelP atomic.Pointer[context.CancelFunc]
        ctx     context.Context
        state   atomic.Int32
 
@@ -90,18 +90,17 @@ func (t *Req) Reqf(val Rval) error {
        t.l.Lock()
        t.state.Store(running)
 
-       t.prepare(&val)
+       pctx, cancelF := t.prepare(&val)
+       t.cancelP.Store(&cancelF)
 
        // 同步
        if !val.Async {
                beginTime := time.Now()
 
                for i := 0; i <= val.Retry; i++ {
-                       ctx, cancle := t.prepareRes(&val)
+                       ctx, cancel := t.prepareRes(pctx, &val)
                        t.err = t.Reqf_1(ctx, val)
-                       if cancle != nil {
-                               cancle()
-                       }
+                       cancel()
                        if t.err == nil || IsCancel(t.err) {
                                break
                        }
@@ -110,6 +109,7 @@ func (t *Req) Reqf(val Rval) error {
                        }
                }
 
+               cancelF()
                t.updateUseDur(beginTime)
                t.clean(&val)
                t.state.Store(free)
@@ -122,11 +122,9 @@ func (t *Req) Reqf(val Rval) error {
                beginTime := time.Now()
 
                for i := 0; i <= val.Retry; i++ {
-                       ctx, cancle := t.prepareRes(&val)
+                       ctx, cancel := t.prepareRes(pctx, &val)
                        t.err = t.Reqf_1(ctx, val)
-                       if cancle != nil {
-                               cancle()
-                       }
+                       cancel()
                        if t.err == nil || IsCancel(t.err) {
                                break
                        }
@@ -135,6 +133,7 @@ func (t *Req) Reqf(val Rval) error {
                        }
                }
 
+               cancelF()
                t.updateUseDur(beginTime)
                t.clean(&val)
                t.state.Store(free)
@@ -250,7 +249,7 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) {
 
        w := io.MultiWriter(ws...)
 
-       var resReadCloser io.ReadCloser
+       var resReadCloser = resp.Body
        if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 {
                switch compress_type[0] {
                case `br`:
@@ -260,28 +259,25 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) {
                case `deflate`:
                        resReadCloser = flate.NewReader(resp.Body)
                default:
-                       resReadCloser = resp.Body
                }
-       } else {
-               resReadCloser = resp.Body
        }
 
        writeLoopTO := val.WriteLoopTO
        if writeLoopTO == 0 {
                writeLoopTO = 1000
        }
-       rwc := pio.WithCtxTO(ctx, t.callTree, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser)
+       rwc := pio.WithCtxTO(req.Context(), t.callTree, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser)
        defer rwc.Close()
 
        for buf := make([]byte, 2048); true; {
                if n, e := rwc.Read(buf); n != 0 {
-                       if wn, we := rwc.Write(buf[:n]); wn != 0 {
+                       if n, e := rwc.Write(buf[:n]); n != 0 {
                                if val.SaveToChan != nil {
                                        val.SaveToChan <- buf[:n]
                                }
-                       } else if we != nil {
-                               if !errors.Is(we, io.EOF) {
-                                       err = errors.Join(err, ErrWriteRes, we)
+                       } else if e != nil {
+                               if !errors.Is(e, io.EOF) {
+                                       err = errors.Join(err, ErrWriteRes, e)
                                }
                                break
                        }
@@ -320,7 +316,7 @@ func (t *Req) IsLive() bool {
        return t.state.Load() == running
 }
 
-func (t *Req) prepareRes(val *Rval) (context.Context, context.CancelFunc) {
+func (t *Req) prepareRes(ctx context.Context, val *Rval) (context.Context, context.CancelFunc) {
        if !val.NoResponse {
                if t.responBuf == nil {
                        t.responBuf = new(bytes.Buffer)
@@ -335,12 +331,12 @@ func (t *Req) prepareRes(val *Rval) (context.Context, context.CancelFunc) {
        t.Response = nil
        t.err = nil
        if val.Timeout > 0 {
-               return context.WithTimeout(t.ctx, time.Duration(val.Timeout*int(time.Millisecond)))
+               return context.WithTimeout(ctx, time.Duration(val.Timeout*int(time.Millisecond)))
        }
-       return t.ctx, nil
+       return context.WithCancel(ctx)
 }
 
-func (t *Req) prepare(val *Rval) {
+func (t *Req) prepare(val *Rval) (ctx context.Context, cancel context.CancelFunc) {
        t.UsedTime = 0
        t.responFile = nil
        t.callTree = ""
@@ -351,19 +347,14 @@ func (t *Req) prepare(val *Rval) {
                        t.callTree += fmt.Sprintf("call by %s\n\t%s:%d\n", runtime.FuncForPC(pc).Name(), file, line)
                }
        }
-       var cancel func()
        if val.Ctx != nil {
-               t.ctx, cancel = context.WithCancel(val.Ctx)
+               return context.WithCancel(val.Ctx)
        } else {
-               t.ctx, cancel = context.WithCancel(context.Background())
+               return context.WithCancel(context.Background())
        }
-       t.cancelP.Store(&cancel)
 }
 
 func (t *Req) clean(val *Rval) {
-       if p := t.cancelP.Load(); p != nil {
-               (*p)()
-       }
        if val.SaveToChan != nil {
                close(val.SaveToChan)
        }