From: qydysky Date: Tue, 8 Aug 2023 18:38:54 +0000 (+0800) Subject: add X-Git-Tag: v0.28.0+202308089caa551~1 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=696b5d4840c4ad8f98690caf3724a68b94b09e93;p=part%2F.git add --- diff --git a/io/io.go b/io/io.go index e3ea2a1..83b8abe 100644 --- a/io/io.go +++ b/io/io.go @@ -220,23 +220,67 @@ var ( // to avoid writer block after ctx done, you should close writer after ctx done // // call Close() after writer fin -func WithCtxCopy(ctx context.Context, callTree string, to time.Duration, w io.Writer, r io.Reader, panicf ...func(s string)) error { - rwc := WithCtxTO(ctx, callTree, to, w, r, panicf...) - defer rwc.Close() - for buf := make([]byte, 2048); true; { - if n, e := rwc.Read(buf); n != 0 { - if n, e := rwc.Write(buf[:n]); n == 0 && e != nil { - if !errors.Is(e, io.EOF) { - return errors.Join(ErrWrite, e) +func WithCtxCopy(ctx context.Context, callTree string, copybuf []byte, to time.Duration, w io.Writer, r io.Reader, panicf ...func(s string)) error { + var chanw atomic.Int64 + chanw.Store(time.Now().Unix()) + if len(panicf) == 0 { + panicf = append(panicf, func(s string) { panic(s) }) + } + + go func() { + var timer = time.NewTicker(to) + defer timer.Stop() + for { + select { + case <-ctx.Done(): + if old, now := chanw.Load(), time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) { + panicf[0](fmt.Sprintf("rw blocking while close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree)) + } else { + time.AfterFunc(to, func() { + if chanw.Load() != -1 { + panicf[0](fmt.Sprintf("rw blocking after close %v, goruntime leak \n%v", to, callTree)) + } + }) + } + return + case now := <-timer.C: + if old := chanw.Load(); old > 0 && now.Unix()-old > int64(to.Seconds()) { + panicf[0](fmt.Sprintf("rw blocking after rw %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree)) + return } - break } - } else if e != nil { - if !errors.Is(e, io.EOF) { - return errors.Join(ErrRead, e) + } + }() + + defer chanw.Store(-1) + + for { + select { + case <-ctx.Done(): + return errors.Join(ErrRead, context.Canceled) + default: + n, e := r.Read(copybuf) + chanw.Store(time.Now().Unix()) + if n != 0 { + select { + case <-ctx.Done(): + return errors.Join(ErrRead, context.Canceled) + default: + n, e := w.Write(copybuf[:n]) + chanw.Store(time.Now().Unix()) + if n == 0 && e != nil { + if !errors.Is(e, io.EOF) { + return errors.Join(ErrWrite, e) + } + return nil + } + } + } else if e != nil { + if !errors.Is(e, io.EOF) { + return errors.Join(ErrRead, e) + } + return nil } - break } } - return nil } diff --git a/pool/Pool.go b/pool/Pool.go index f7c4b24..1da7225 100644 --- a/pool/Pool.go +++ b/pool/Pool.go @@ -10,15 +10,10 @@ type Buf[T any] struct { inUse func(*T) bool reuseF func(*T) *T poolF func(*T) *T - buf []poolItem[T] + mbuf map[*T]bool l sync.RWMutex } -type poolItem[T any] struct { - i *T - pooled bool -} - // 创建池 // // NewF: func() *T 新值 @@ -37,49 +32,42 @@ func New[T any](NewF func() *T, InUse func(*T) bool, ReuseF func(*T) *T, PoolF f t.reuseF = ReuseF t.poolF = PoolF t.maxsize = maxsize + t.mbuf = make(map[*T]bool) return t } -func (t *Buf[T]) PoolInUse() (inUse int) { - t.l.RLock() - defer t.l.RUnlock() - - for i := 0; i < len(t.buf); i++ { - if !t.buf[i].pooled && t.inUse(t.buf[i].i) { - inUse++ - } - } - - return -} - -func (t *Buf[T]) PoolSum() int { +// states[] 0:pooled, 1:nopooled, 2:inuse, 3:nouse, 4:sum +func (t *Buf[T]) PoolState() (states []any) { t.l.RLock() defer t.l.RUnlock() - return len(t.buf) -} - -func (t *Buf[T]) Trim() { - t.l.Lock() - defer t.l.Unlock() + var pooled, nopooled, inuse, nouse, sum int - for i := 0; i < len(t.buf); i++ { - if t.buf[i].pooled && !t.inUse(t.buf[i].i) { - t.buf = append(t.buf[:i], t.buf[i+1:]...) - i-- + sum = len(t.mbuf) + for k, v := range t.mbuf { + if v { + pooled++ + } else { + nopooled++ + } + if t.inUse(k) { + inuse++ + } else { + nouse++ } } + + return []any{pooled, nopooled, inuse, nouse, sum} } func (t *Buf[T]) Get() *T { t.l.Lock() defer t.l.Unlock() - for i := 0; i < len(t.buf); i++ { - if t.buf[i].pooled && !t.inUse(t.buf[i].i) { - t.buf[i].pooled = false - return t.reuseF(t.buf[i].i) + for k, v := range t.mbuf { + if v && !t.inUse(k) { + t.mbuf[k] = true + return t.reuseF(k) } } @@ -94,19 +82,13 @@ func (t *Buf[T]) Put(item ...*T) { t.l.Lock() defer t.l.Unlock() - var cu = 0 - for i := 0; i < len(t.buf); i++ { - if t.buf[i].pooled && !t.inUse(t.buf[i].i) { - t.buf[i].i = t.poolF(item[cu]) - t.buf[i].pooled = true - cu++ - if cu >= len(item) { - return - } + for i := 0; i < len(item); i++ { + if _, ok := t.mbuf[item[i]]; ok { + t.poolF(item[i]) + t.mbuf[item[i]] = true + } else if t.maxsize > len(t.mbuf) { + t.poolF(item[i]) + t.mbuf[item[i]] = true } } - - for i := cu; i < len(item) && t.maxsize > len(t.buf); i++ { - t.buf = append(t.buf, poolItem[T]{t.poolF(item[i]), true}) - } } diff --git a/pool/Pool_test.go b/pool/Pool_test.go index 4fa847c..30f1716 100644 --- a/pool/Pool_test.go +++ b/pool/Pool_test.go @@ -40,7 +40,7 @@ func TestXxx(t *testing.T) { var c2p = uintptr(unsafe.Pointer(c2)) c2.d = append(c2.d, []byte("2")...) - if c1p == c2p || bytes.Equal(c1.d, c2.d) || b.PoolInUse() != 0 || b.PoolSum() != 0 { + if c1p == c2p || bytes.Equal(c1.d, c2.d) || b.PoolState()[2] != 0 || b.PoolState()[4] != 0 { t.Fatal() } @@ -49,13 +49,13 @@ func TestXxx(t *testing.T) { var c3 = b.Get() var c3p = uintptr(unsafe.Pointer(c3)) - if c1p == c3p || len(c1.d) == 0 || b.PoolInUse() != 0 || b.PoolSum() != 0 { + if c1p == c3p || len(c1.d) == 0 || b.PoolState()[2] != 0 || b.PoolState()[4] != 0 { t.Fatal() } b.Put(c1) - if len(c1.d) == 0 || b.PoolInUse() != 0 || b.PoolSum() != 1 { - t.Fatal(len(c1.d) != 0, b.PoolInUse() != 0, b.PoolSum() != 1) + if len(c1.d) == 0 || b.PoolState()[2] != 0 || b.PoolState()[4] != 1 { + t.Fatal(len(c1.d) != 0, b.PoolState()[2] != 0, b.PoolState()[4] != 1) } }