From: qydysky Date: Fri, 2 Jun 2023 19:08:05 +0000 (+0800) Subject: Add X-Git-Tag: v0.28.0+20230602e713cd3 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=e713cd3914aabd280671c227dadb5d36510ce337;p=part%2F.git Add --- diff --git a/reqf/Reqf.go b/reqf/Reqf.go index 643091d..f3c9818 100644 --- a/reqf/Reqf.go +++ b/reqf/Reqf.go @@ -60,6 +60,7 @@ var ( ErrResponFileCreate = errors.New("ErrResponFileCreate") ErrWriteRes = errors.New("ErrWriteRes") ErrReadRes = errors.New("ErrReadRes") + ErrWriteAfterWrite = errors.New("ErrWriteAfterWrite") ) type Req struct { @@ -273,7 +274,7 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) { rwc := t.withCtxTO(ctx, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser) defer rwc.Close() - for buf := make([]byte, 512); true; { + for buf := make([]byte, 2048); true; { if n, e := rwc.Read(buf); n != 0 { if wn, we := rwc.Write(buf[:n]); wn != 0 { if val.SaveToChan != nil { @@ -396,7 +397,7 @@ func (t rwc) Close() error { } func (t *Req) withCtxTO(ctx context.Context, to time.Duration, w io.Writer, r io.Reader) io.ReadWriteCloser { - var chanw = make(chan struct{}, 1) + var chanw atomic.Int64 go func(callTree string) { var timer = time.NewTicker(to) @@ -404,13 +405,13 @@ func (t *Req) withCtxTO(ctx context.Context, to time.Duration, w io.Writer, r io for { select { case <-ctx.Done(): - if len(chanw) != 0 { - panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", to, callTree)) + if chanw.Load() != 0 { + panic(fmt.Sprintf("write blocking after Ctx Done, goruntime leak \n%v", callTree)) } return - case <-timer.C: - if len(chanw) != 0 { - panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", to, callTree)) + case now := <-timer.C: + if old := chanw.Load(); old != 0 && now.Unix()-old > int64(to.Seconds()) { + panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", now.Unix()-old, callTree)) } } } @@ -421,23 +422,21 @@ func (t *Req) withCtxTO(ctx context.Context, to time.Duration, w io.Writer, r io if n, err = r.Read(p); n != 0 { select { case <-ctx.Done(): - case chanw <- struct{}{}: default: + chanw.Store(time.Now().Unix()) } } return }, func(p []byte) (n int, err error) { if n, err = w.Write(p); n != 0 { - select { - case <-chanw: - default: + if chanw.Swap(0) == 0 { + panic(ErrWriteAfterWrite) } } return }, func() error { - close(chanw) return nil }, }