]> 127.0.0.1 Git - part/.git/commitdiff
add
authorqydysky <qydysky@foxmail.com>
Tue, 8 Aug 2023 18:38:54 +0000 (02:38 +0800)
committerqydysky <qydysky@foxmail.com>
Tue, 8 Aug 2023 18:38:54 +0000 (02:38 +0800)
io/io.go
pool/Pool.go
pool/Pool_test.go

index e3ea2a1178b4cc9486e9048c18acaeb1a441723f..83b8abe75b6150337b488b237c66a0c11e42a01f 100644 (file)
--- a/io/io.go
+++ b/io/io.go
@@ -220,23 +220,67 @@ var (
 // to avoid writer block after ctx done, you should close writer after ctx done
 //
 // call Close() after writer fin
-func WithCtxCopy(ctx context.Context, callTree string, to time.Duration, w io.Writer, r io.Reader, panicf ...func(s string)) error {
-       rwc := WithCtxTO(ctx, callTree, to, w, r, panicf...)
-       defer rwc.Close()
-       for buf := make([]byte, 2048); true; {
-               if n, e := rwc.Read(buf); n != 0 {
-                       if n, e := rwc.Write(buf[:n]); n == 0 && e != nil {
-                               if !errors.Is(e, io.EOF) {
-                                       return errors.Join(ErrWrite, e)
+func WithCtxCopy(ctx context.Context, callTree string, copybuf []byte, to time.Duration, w io.Writer, r io.Reader, panicf ...func(s string)) error {
+       var chanw atomic.Int64
+       chanw.Store(time.Now().Unix())
+       if len(panicf) == 0 {
+               panicf = append(panicf, func(s string) { panic(s) })
+       }
+
+       go func() {
+               var timer = time.NewTicker(to)
+               defer timer.Stop()
+               for {
+                       select {
+                       case <-ctx.Done():
+                               if old, now := chanw.Load(), time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
+                                       panicf[0](fmt.Sprintf("rw blocking while close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
+                               } else {
+                                       time.AfterFunc(to, func() {
+                                               if chanw.Load() != -1 {
+                                                       panicf[0](fmt.Sprintf("rw blocking after close %v, goruntime leak \n%v", to, callTree))
+                                               }
+                                       })
+                               }
+                               return
+                       case now := <-timer.C:
+                               if old := chanw.Load(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
+                                       panicf[0](fmt.Sprintf("rw blocking after rw %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
+                                       return
                                }
-                               break
                        }
-               } else if e != nil {
-                       if !errors.Is(e, io.EOF) {
-                               return errors.Join(ErrRead, e)
+               }
+       }()
+
+       defer chanw.Store(-1)
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return errors.Join(ErrRead, context.Canceled)
+               default:
+                       n, e := r.Read(copybuf)
+                       chanw.Store(time.Now().Unix())
+                       if n != 0 {
+                               select {
+                               case <-ctx.Done():
+                                       return errors.Join(ErrRead, context.Canceled)
+                               default:
+                                       n, e := w.Write(copybuf[:n])
+                                       chanw.Store(time.Now().Unix())
+                                       if n == 0 && e != nil {
+                                               if !errors.Is(e, io.EOF) {
+                                                       return errors.Join(ErrWrite, e)
+                                               }
+                                               return nil
+                                       }
+                               }
+                       } else if e != nil {
+                               if !errors.Is(e, io.EOF) {
+                                       return errors.Join(ErrRead, e)
+                               }
+                               return nil
                        }
-                       break
                }
        }
-       return nil
 }
index f7c4b2457a70256568f21486ba524c8f751e88a6..1da72251d6f40a4a214ec7abd4ceadd3383546e2 100644 (file)
@@ -10,15 +10,10 @@ type Buf[T any] struct {
        inUse   func(*T) bool
        reuseF  func(*T) *T
        poolF   func(*T) *T
-       buf     []poolItem[T]
+       mbuf    map[*T]bool
        l       sync.RWMutex
 }
 
-type poolItem[T any] struct {
-       i      *T
-       pooled bool
-}
-
 // 创建池
 //
 // NewF: func() *T 新值
@@ -37,49 +32,42 @@ func New[T any](NewF func() *T, InUse func(*T) bool, ReuseF func(*T) *T, PoolF f
        t.reuseF = ReuseF
        t.poolF = PoolF
        t.maxsize = maxsize
+       t.mbuf = make(map[*T]bool)
        return t
 }
 
-func (t *Buf[T]) PoolInUse() (inUse int) {
-       t.l.RLock()
-       defer t.l.RUnlock()
-
-       for i := 0; i < len(t.buf); i++ {
-               if !t.buf[i].pooled && t.inUse(t.buf[i].i) {
-                       inUse++
-               }
-       }
-
-       return
-}
-
-func (t *Buf[T]) PoolSum() int {
+// states[] 0:pooled, 1:nopooled, 2:inuse, 3:nouse, 4:sum
+func (t *Buf[T]) PoolState() (states []any) {
        t.l.RLock()
        defer t.l.RUnlock()
 
-       return len(t.buf)
-}
-
-func (t *Buf[T]) Trim() {
-       t.l.Lock()
-       defer t.l.Unlock()
+       var pooled, nopooled, inuse, nouse, sum int
 
-       for i := 0; i < len(t.buf); i++ {
-               if t.buf[i].pooled && !t.inUse(t.buf[i].i) {
-                       t.buf = append(t.buf[:i], t.buf[i+1:]...)
-                       i--
+       sum = len(t.mbuf)
+       for k, v := range t.mbuf {
+               if v {
+                       pooled++
+               } else {
+                       nopooled++
+               }
+               if t.inUse(k) {
+                       inuse++
+               } else {
+                       nouse++
                }
        }
+
+       return []any{pooled, nopooled, inuse, nouse, sum}
 }
 
 func (t *Buf[T]) Get() *T {
        t.l.Lock()
        defer t.l.Unlock()
 
-       for i := 0; i < len(t.buf); i++ {
-               if t.buf[i].pooled && !t.inUse(t.buf[i].i) {
-                       t.buf[i].pooled = false
-                       return t.reuseF(t.buf[i].i)
+       for k, v := range t.mbuf {
+               if v && !t.inUse(k) {
+                       t.mbuf[k] = true
+                       return t.reuseF(k)
                }
        }
 
@@ -94,19 +82,13 @@ func (t *Buf[T]) Put(item ...*T) {
        t.l.Lock()
        defer t.l.Unlock()
 
-       var cu = 0
-       for i := 0; i < len(t.buf); i++ {
-               if t.buf[i].pooled && !t.inUse(t.buf[i].i) {
-                       t.buf[i].i = t.poolF(item[cu])
-                       t.buf[i].pooled = true
-                       cu++
-                       if cu >= len(item) {
-                               return
-                       }
+       for i := 0; i < len(item); i++ {
+               if _, ok := t.mbuf[item[i]]; ok {
+                       t.poolF(item[i])
+                       t.mbuf[item[i]] = true
+               } else if t.maxsize > len(t.mbuf) {
+                       t.poolF(item[i])
+                       t.mbuf[item[i]] = true
                }
        }
-
-       for i := cu; i < len(item) && t.maxsize > len(t.buf); i++ {
-               t.buf = append(t.buf, poolItem[T]{t.poolF(item[i]), true})
-       }
 }
index 4fa847c6ffdeb8471d90a819f0786542d6d5236e..30f171645a8d1727c26cf25b71e0084877eb3642 100644 (file)
@@ -40,7 +40,7 @@ func TestXxx(t *testing.T) {
        var c2p = uintptr(unsafe.Pointer(c2))
        c2.d = append(c2.d, []byte("2")...)
 
-       if c1p == c2p || bytes.Equal(c1.d, c2.d) || b.PoolInUse() != 0 || b.PoolSum() != 0 {
+       if c1p == c2p || bytes.Equal(c1.d, c2.d) || b.PoolState()[2] != 0 || b.PoolState()[4] != 0 {
                t.Fatal()
        }
 
@@ -49,13 +49,13 @@ func TestXxx(t *testing.T) {
        var c3 = b.Get()
        var c3p = uintptr(unsafe.Pointer(c3))
 
-       if c1p == c3p || len(c1.d) == 0 || b.PoolInUse() != 0 || b.PoolSum() != 0 {
+       if c1p == c3p || len(c1.d) == 0 || b.PoolState()[2] != 0 || b.PoolState()[4] != 0 {
                t.Fatal()
        }
 
        b.Put(c1)
 
-       if len(c1.d) == 0 || b.PoolInUse() != 0 || b.PoolSum() != 1 {
-               t.Fatal(len(c1.d) != 0, b.PoolInUse() != 0, b.PoolSum() != 1)
+       if len(c1.d) == 0 || b.PoolState()[2] != 0 || b.PoolState()[4] != 1 {
+               t.Fatal(len(c1.d) != 0, b.PoolState()[2] != 0, b.PoolState()[4] != 1)
        }
 }