]> 127.0.0.1 Git - part/.git/commitdiff
Add v0.28.0+20230602e713cd3
authorqydysky <qydysky@foxmail.com>
Fri, 2 Jun 2023 19:08:05 +0000 (03:08 +0800)
committerqydysky <qydysky@foxmail.com>
Fri, 2 Jun 2023 19:08:05 +0000 (03:08 +0800)
reqf/Reqf.go

index 643091d781f5b8d6cfa6eadd27f6126ff35d1802..f3c98185e77f5dd7ddacd615907de83812d10ade 100644 (file)
@@ -60,6 +60,7 @@ var (
        ErrResponFileCreate = errors.New("ErrResponFileCreate")
        ErrWriteRes         = errors.New("ErrWriteRes")
        ErrReadRes          = errors.New("ErrReadRes")
+       ErrWriteAfterWrite  = errors.New("ErrWriteAfterWrite")
 )
 
 type Req struct {
@@ -273,7 +274,7 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) {
        rwc := t.withCtxTO(ctx, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser)
        defer rwc.Close()
 
-       for buf := make([]byte, 512); true; {
+       for buf := make([]byte, 2048); true; {
                if n, e := rwc.Read(buf); n != 0 {
                        if wn, we := rwc.Write(buf[:n]); wn != 0 {
                                if val.SaveToChan != nil {
@@ -396,7 +397,7 @@ func (t rwc) Close() error {
 }
 
 func (t *Req) withCtxTO(ctx context.Context, to time.Duration, w io.Writer, r io.Reader) io.ReadWriteCloser {
-       var chanw = make(chan struct{}, 1)
+       var chanw atomic.Int64
 
        go func(callTree string) {
                var timer = time.NewTicker(to)
@@ -404,13 +405,13 @@ func (t *Req) withCtxTO(ctx context.Context, to time.Duration, w io.Writer, r io
                for {
                        select {
                        case <-ctx.Done():
-                               if len(chanw) != 0 {
-                                       panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", to, callTree))
+                               if chanw.Load() != 0 {
+                                       panic(fmt.Sprintf("write blocking after Ctx Done, goruntime leak \n%v", callTree))
                                }
                                return
-                       case <-timer.C:
-                               if len(chanw) != 0 {
-                                       panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", to, callTree))
+                       case now := <-timer.C:
+                               if old := chanw.Load(); old != 0 && now.Unix()-old > int64(to.Seconds()) {
+                                       panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", now.Unix()-old, callTree))
                                }
                        }
                }
@@ -421,23 +422,21 @@ func (t *Req) withCtxTO(ctx context.Context, to time.Duration, w io.Writer, r io
                        if n, err = r.Read(p); n != 0 {
                                select {
                                case <-ctx.Done():
-                               case chanw <- struct{}{}:
                                default:
+                                       chanw.Store(time.Now().Unix())
                                }
                        }
                        return
                },
                func(p []byte) (n int, err error) {
                        if n, err = w.Write(p); n != 0 {
-                               select {
-                               case <-chanw:
-                               default:
+                               if chanw.Swap(0) == 0 {
+                                       panic(ErrWriteAfterWrite)
                                }
                        }
                        return
                },
                func() error {
-                       close(chanw)
                        return nil
                },
        }