import (
"errors"
+ "runtime"
)
type BlocksI[T any] interface {
// // eg
//
// if tmpbuf, putBack, e := buf.Get(); e == nil {
- // clear(tmpbuf)
// // do something with tmpbuf
// putBack()
// }
Get() ([]T, func(), error)
+
+ // // eg
+ //
+ // if tmpbuf, e := buf.GetAuto(); e == nil {
+ // // do something with tmpbuf
+ // }
+ GetAuto() ([]T, error)
}
type blocks[T any] struct {
return nil, func() {}, ErrOverflow
}
}
+
+func (t *blocks[T]) GetAuto() (b []T, e error) {
+ select {
+ case offset := <-t.free:
+ b = t.buf[offset*t.size : (offset+1)*t.size]
+ runtime.SetFinalizer(&b, func(p any) {
+ clear(*p.(*[]T))
+ t.free <- offset
+ })
+ return
+ default:
+ return nil, ErrOverflow
+ }
+}
putBack()
}
}
+
+// 374.4 ns/op 32 B/op 1 allocs/op
+func Benchmark(b *testing.B) {
+ buf := NewBlocks[byte](1024, 1)
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ if _, f, e := buf.Get(); e != nil {
+ b.Fatal(e)
+ } else {
+ f()
+ }
+ }
+}
+
+// 895.5 ns/op 56 B/op 2 allocs/op
+func Benchmark2(b *testing.B) {
+ buf := NewBlocks[byte](1, 1000000)
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ if _, e := buf.GetAuto(); e != nil {
+ b.Fatal(e)
+ }
+ }
+}
+++ /dev/null
-package part
-
-import (
- "runtime"
-)
-
-type BufI[T any] interface {
- // // eg
- //
- // if tmpbuf, reachMax := buf.Get(); reachMax {
- // // do something with tmpbuf
- // }
- Get() []T
- Cache() int
-}
-
-type bufs[T any] struct {
- _ noCopy
- size uint64
- buf chan []T
-}
-
-func NewBufs[T any](size uint64) BufI[T] {
- return &bufs[T]{
- size: size,
- buf: make(chan []T, size),
- }
-}
-
-func (t *bufs[T]) Get() (tmpbuf []T) {
- if len(t.buf) > 0 {
- b := (<-t.buf)[:0]
- runtime.SetFinalizer(&b, func(objp any) {
- select {
- case t.buf <- *objp.(*[]T):
- default:
- }
- })
- return b
- } else {
- b := []T{}
- runtime.SetFinalizer(&b, func(objp any) {
- select {
- case t.buf <- *objp.(*[]T):
- default:
- }
- })
- return b
- }
-}
-
-func (t *bufs[T]) Cache() int {
- return len(t.buf)
-}
+++ /dev/null
-package part
-
-import (
- "runtime"
- "testing"
- "time"
-)
-
-func TestBuf(t *testing.T) {
- bu := NewBufs[byte](5)
- allocs(bu.Get())
- runtime.GC()
- time.Sleep(time.Second)
- if bu.Cache() != 1 {
- t.Fatal()
- }
- b := allocs(bu.Get())
- runtime.GC()
- time.Sleep(time.Second)
- if bu.Cache() != 0 {
- t.Fatal()
- }
- allocs(bu.Get())
- runtime.GC()
- time.Sleep(time.Second)
- t.Log(b, bu.Cache())
- if bu.Cache() != 1 {
- t.Fatal()
- }
-}
-
-func allocs(b []byte) []byte {
- return append(b, 0x01)
-}
pio "github.com/qydysky/part/io"
msgq "github.com/qydysky/part/msgq"
- pslice "github.com/qydysky/part/slice"
)
type Client struct {
TO int // depercated: use RTOMs WTOMs instead
RTOMs int
WTOMs int
- BufSize int // msg buf
+ BufSize int // msg buf 1: always use single buf >1: use bufs cycle
Header map[string]string
Proxy string
WTOMs: 300 * 1000,
Func_normal_close: func() {},
Func_abort_close: func() {},
- BufSize: 100,
+ BufSize: 10,
msg: msgq.NewType[*WsMsg](),
}
tmp.Url = config.Url
if tmp.Url == "" {
return nil, errors.New(`url == ""`)
}
- if v := config.BufSize; v > 100 {
+ if v := config.BufSize; v >= 1 {
tmp.BufSize = v
}
if v := config.TO; v != 0 {
}()
buf := make([]byte, humanize.KByte)
- msgs := pslice.NewBufs[byte](uint64(o.BufSize))
+ var (
+ msgs = make([][]byte, o.BufSize)
+ cuP = 0
+ )
for {
if err := c.SetReadDeadline(time.Now().Add(time.Duration(o.RTOMs * int(time.Millisecond)))); err != nil {
o.error(err)
return
}
- msg := msgs.Get()
msg_type, r, err := c.NextReader()
if err == nil {
- if tmp, e := pio.ReadAll(r, buf); e != nil {
+ if msg, e := pio.ReadAll(r, buf); e != nil {
err = e
} else {
- msg = append(msg, tmp...)
+ cuP++
+ if cuP >= o.BufSize {
+ cuP = 0
+ }
+ msgs[cuP] = append(msgs[cuP][:0], msg...)
}
}
if err != nil {
case websocket.PingMessage:
o.msg.PushLock_tag(`send`, &WsMsg{
Type: websocket.PongMessage,
- Msg: msg,
+ Msg: msgs[cuP],
})
case websocket.PongMessage:
o.pingT = time.Now().UnixMilli()
default:
o.msg.PushLock_tag(`rec`, &WsMsg{
Type: websocket.TextMessage,
- Msg: msg,
+ Msg: msgs[cuP],
})
}
}