ErrResponFileCreate = errors.New("ErrResponFileCreate")
ErrWriteRes = errors.New("ErrWriteRes")
ErrReadRes = errors.New("ErrReadRes")
+ ErrWriteAfterWrite = errors.New("ErrWriteAfterWrite")
)
type Req struct {
rwc := t.withCtxTO(ctx, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser)
defer rwc.Close()
- for buf := make([]byte, 512); true; {
+ for buf := make([]byte, 2048); true; {
if n, e := rwc.Read(buf); n != 0 {
if wn, we := rwc.Write(buf[:n]); wn != 0 {
if val.SaveToChan != nil {
}
func (t *Req) withCtxTO(ctx context.Context, to time.Duration, w io.Writer, r io.Reader) io.ReadWriteCloser {
- var chanw = make(chan struct{}, 1)
+ var chanw atomic.Int64
go func(callTree string) {
var timer = time.NewTicker(to)
for {
select {
case <-ctx.Done():
- if len(chanw) != 0 {
- panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", to, callTree))
+ if chanw.Load() != 0 {
+ panic(fmt.Sprintf("write blocking after Ctx Done, goruntime leak \n%v", callTree))
}
return
- case <-timer.C:
- if len(chanw) != 0 {
- panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", to, callTree))
+ case now := <-timer.C:
+ if old := chanw.Load(); old != 0 && now.Unix()-old > int64(to.Seconds()) {
+ panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", now.Unix()-old, callTree))
}
}
}
if n, err = r.Read(p); n != 0 {
select {
case <-ctx.Done():
- case chanw <- struct{}{}:
default:
+ chanw.Store(time.Now().Unix())
}
}
return
},
func(p []byte) (n int, err error) {
if n, err = w.Write(p); n != 0 {
- select {
- case <-chanw:
- default:
+ if chanw.Swap(0) == 0 {
+ panic(ErrWriteAfterWrite)
}
}
return
},
func() error {
- close(chanw)
return nil
},
}