// to avoid writer block after ctx done, you should close writer after ctx done
//
// call Close() after writer fin
-func WithCtxCopy(ctx context.Context, callTree string, to time.Duration, w io.Writer, r io.Reader, panicf ...func(s string)) error {
- rwc := WithCtxTO(ctx, callTree, to, w, r, panicf...)
- defer rwc.Close()
- for buf := make([]byte, 2048); true; {
- if n, e := rwc.Read(buf); n != 0 {
- if n, e := rwc.Write(buf[:n]); n == 0 && e != nil {
- if !errors.Is(e, io.EOF) {
- return errors.Join(ErrWrite, e)
+func WithCtxCopy(ctx context.Context, callTree string, copybuf []byte, to time.Duration, w io.Writer, r io.Reader, panicf ...func(s string)) error {
+ var chanw atomic.Int64
+ chanw.Store(time.Now().Unix())
+ if len(panicf) == 0 {
+ panicf = append(panicf, func(s string) { panic(s) })
+ }
+
+ go func() {
+ var timer = time.NewTicker(to)
+ defer timer.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ if old, now := chanw.Load(), time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
+ panicf[0](fmt.Sprintf("rw blocking while close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
+ } else {
+ time.AfterFunc(to, func() {
+ if chanw.Load() != -1 {
+ panicf[0](fmt.Sprintf("rw blocking after close %v, goruntime leak \n%v", to, callTree))
+ }
+ })
+ }
+ return
+ case now := <-timer.C:
+ if old := chanw.Load(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
+ panicf[0](fmt.Sprintf("rw blocking after rw %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
+ return
}
- break
}
- } else if e != nil {
- if !errors.Is(e, io.EOF) {
- return errors.Join(ErrRead, e)
+ }
+ }()
+
+ defer chanw.Store(-1)
+
+ for {
+ select {
+ case <-ctx.Done():
+ return errors.Join(ErrRead, context.Canceled)
+ default:
+ n, e := r.Read(copybuf)
+ chanw.Store(time.Now().Unix())
+ if n != 0 {
+ select {
+ case <-ctx.Done():
+ return errors.Join(ErrRead, context.Canceled)
+ default:
+ n, e := w.Write(copybuf[:n])
+ chanw.Store(time.Now().Unix())
+ if n == 0 && e != nil {
+ if !errors.Is(e, io.EOF) {
+ return errors.Join(ErrWrite, e)
+ }
+ return nil
+ }
+ }
+ } else if e != nil {
+ if !errors.Is(e, io.EOF) {
+ return errors.Join(ErrRead, e)
+ }
+ return nil
}
- break
}
}
- return nil
}
inUse func(*T) bool
reuseF func(*T) *T
poolF func(*T) *T
- buf []poolItem[T]
+ mbuf map[*T]bool
l sync.RWMutex
}
-type poolItem[T any] struct {
- i *T
- pooled bool
-}
-
// 创建池
//
// NewF: func() *T 新值
t.reuseF = ReuseF
t.poolF = PoolF
t.maxsize = maxsize
+ t.mbuf = make(map[*T]bool)
return t
}
-func (t *Buf[T]) PoolInUse() (inUse int) {
- t.l.RLock()
- defer t.l.RUnlock()
-
- for i := 0; i < len(t.buf); i++ {
- if !t.buf[i].pooled && t.inUse(t.buf[i].i) {
- inUse++
- }
- }
-
- return
-}
-
-func (t *Buf[T]) PoolSum() int {
+// states[] 0:pooled, 1:nopooled, 2:inuse, 3:nouse, 4:sum
+func (t *Buf[T]) PoolState() (states []any) {
t.l.RLock()
defer t.l.RUnlock()
- return len(t.buf)
-}
-
-func (t *Buf[T]) Trim() {
- t.l.Lock()
- defer t.l.Unlock()
+ var pooled, nopooled, inuse, nouse, sum int
- for i := 0; i < len(t.buf); i++ {
- if t.buf[i].pooled && !t.inUse(t.buf[i].i) {
- t.buf = append(t.buf[:i], t.buf[i+1:]...)
- i--
+ sum = len(t.mbuf)
+ for k, v := range t.mbuf {
+ if v {
+ pooled++
+ } else {
+ nopooled++
+ }
+ if t.inUse(k) {
+ inuse++
+ } else {
+ nouse++
}
}
+
+ return []any{pooled, nopooled, inuse, nouse, sum}
}
func (t *Buf[T]) Get() *T {
t.l.Lock()
defer t.l.Unlock()
- for i := 0; i < len(t.buf); i++ {
- if t.buf[i].pooled && !t.inUse(t.buf[i].i) {
- t.buf[i].pooled = false
- return t.reuseF(t.buf[i].i)
+ for k, v := range t.mbuf {
+ if v && !t.inUse(k) {
+ t.mbuf[k] = true
+ return t.reuseF(k)
}
}
t.l.Lock()
defer t.l.Unlock()
- var cu = 0
- for i := 0; i < len(t.buf); i++ {
- if t.buf[i].pooled && !t.inUse(t.buf[i].i) {
- t.buf[i].i = t.poolF(item[cu])
- t.buf[i].pooled = true
- cu++
- if cu >= len(item) {
- return
- }
+ for i := 0; i < len(item); i++ {
+ if _, ok := t.mbuf[item[i]]; ok {
+ t.poolF(item[i])
+ t.mbuf[item[i]] = true
+ } else if t.maxsize > len(t.mbuf) {
+ t.poolF(item[i])
+ t.mbuf[item[i]] = true
}
}
-
- for i := cu; i < len(item) && t.maxsize > len(t.buf); i++ {
- t.buf = append(t.buf, poolItem[T]{t.poolF(item[i]), true})
- }
}
var c2p = uintptr(unsafe.Pointer(c2))
c2.d = append(c2.d, []byte("2")...)
- if c1p == c2p || bytes.Equal(c1.d, c2.d) || b.PoolInUse() != 0 || b.PoolSum() != 0 {
+ if c1p == c2p || bytes.Equal(c1.d, c2.d) || b.PoolState()[2] != 0 || b.PoolState()[4] != 0 {
t.Fatal()
}
var c3 = b.Get()
var c3p = uintptr(unsafe.Pointer(c3))
- if c1p == c3p || len(c1.d) == 0 || b.PoolInUse() != 0 || b.PoolSum() != 0 {
+ if c1p == c3p || len(c1.d) == 0 || b.PoolState()[2] != 0 || b.PoolState()[4] != 0 {
t.Fatal()
}
b.Put(c1)
- if len(c1.d) == 0 || b.PoolInUse() != 0 || b.PoolSum() != 1 {
- t.Fatal(len(c1.d) != 0, b.PoolInUse() != 0, b.PoolSum() != 1)
+ if len(c1.d) == 0 || b.PoolState()[2] != 0 || b.PoolState()[4] != 1 {
+ t.Fatal(len(c1.d) != 0, b.PoolState()[2] != 0, b.PoolState()[4] != 1)
}
}