import (
"runtime"
- "sync/atomic"
)
type BufI[T any] interface {
// // eg
//
- // if tmpbuf, e := buf.Get(); e == nil {
+ // if tmpbuf, reachMax := buf.Get(); reachMax {
// // do something with tmpbuf
// }
Get() []T
- CacheCount() int64
+ Cache() int
}
type bufs[T any] struct {
- _ noCopy
- num atomic.Int64
- buf [][]T
+ _ noCopy
+ size uint64
+ buf chan []T
}
-func NewBufs[T any]() BufI[T] {
+func NewBufs[T any](size uint64) BufI[T] {
return &bufs[T]{
- buf: [][]T{},
+ size: size,
+ buf: make(chan []T, size),
}
}
-func (t *bufs[T]) Get() (b []T) {
+func (t *bufs[T]) Get() (tmpbuf []T) {
if len(t.buf) > 0 {
- b = t.buf[0][:0]
- t.buf = t.buf[:copy(t.buf, t.buf[1:])]
- t.num.Add(-1)
- return
+ b := (<-t.buf)[:0]
+ runtime.SetFinalizer(&b, func(objp any) {
+ select {
+ case t.buf <- *objp.(*[]T):
+ default:
+ }
+ })
+ return b
} else {
- b = []T{}
+ b := []T{}
+ runtime.SetFinalizer(&b, func(objp any) {
+ select {
+ case t.buf <- *objp.(*[]T):
+ default:
+ }
+ })
+ return b
}
- runtime.SetFinalizer(&b, func(objp any) {
- t.buf = append(t.buf, *objp.(*[]T))
- t.num.Add(1)
- })
- return
}
-func (t *bufs[T]) CacheCount() int64 {
- return t.num.Load()
+func (t *bufs[T]) Cache() int {
+ return len(t.buf)
}
pio "github.com/qydysky/part/io"
msgq "github.com/qydysky/part/msgq"
+ pslice "github.com/qydysky/part/slice"
)
type Client struct {
TO int // depercated: use RTOMs WTOMs instead
RTOMs int
WTOMs int
- BufSize int // msg buf 1: always use single buf >1: use bufs cycle
+ BufSize int // msg buf
Header map[string]string
Proxy string
WTOMs: 300 * 1000,
Func_normal_close: func() {},
Func_abort_close: func() {},
- BufSize: 10,
+ BufSize: 100,
msg: msgq.NewType[*WsMsg](),
}
tmp.Url = config.Url
if tmp.Url == "" {
return nil, errors.New(`url == ""`)
}
- if v := config.BufSize; v >= 1 {
+ if v := config.BufSize; v > 100 {
tmp.BufSize = v
}
if v := config.TO; v != 0 {
}()
buf := make([]byte, humanize.KByte)
- var (
- msgs = make([][]byte, o.BufSize)
- cuP = 0
- )
+ msgs := pslice.NewBufs[byte](uint64(o.BufSize))
for {
if err := c.SetReadDeadline(time.Now().Add(time.Duration(o.RTOMs * int(time.Millisecond)))); err != nil {
o.error(err)
return
}
+ msg := msgs.Get()
msg_type, r, err := c.NextReader()
if err == nil {
- if msg, e := pio.ReadAll(r, buf); e != nil {
+ if tmp, e := pio.ReadAll(r, buf); e != nil {
err = e
} else {
- cuP++
- if cuP >= o.BufSize {
- cuP = 0
- }
- msgs[cuP] = append(msgs[cuP][:0], msg...)
+ msg = append(msg, tmp...)
}
}
if err != nil {
case websocket.PingMessage:
o.msg.PushLock_tag(`send`, &WsMsg{
Type: websocket.PongMessage,
- Msg: msgs[cuP],
+ Msg: msg,
})
case websocket.PongMessage:
o.pingT = time.Now().UnixMilli()
default:
o.msg.PushLock_tag(`rec`, &WsMsg{
Type: websocket.TextMessage,
- Msg: msgs[cuP],
+ Msg: msg,
})
}
}