From: qydysky Date: Mon, 12 Aug 2024 13:37:25 +0000 (+0800) Subject: 1 X-Git-Tag: v0.28.20240815155147~1 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=46555dde0dd13f8a866eb7f3750dda35bf866aa8;p=part%2F.git 1 --- diff --git a/slice/Buf.go b/slice/Buf.go index c0549ed..09f066d 100644 --- a/slice/Buf.go +++ b/slice/Buf.go @@ -2,47 +2,53 @@ package part import ( "runtime" - "sync/atomic" ) type BufI[T any] interface { // // eg // - // if tmpbuf, e := buf.Get(); e == nil { + // if tmpbuf, reachMax := buf.Get(); reachMax { // // do something with tmpbuf // } Get() []T - CacheCount() int64 + Cache() int } type bufs[T any] struct { - _ noCopy - num atomic.Int64 - buf [][]T + _ noCopy + size uint64 + buf chan []T } -func NewBufs[T any]() BufI[T] { +func NewBufs[T any](size uint64) BufI[T] { return &bufs[T]{ - buf: [][]T{}, + size: size, + buf: make(chan []T, size), } } -func (t *bufs[T]) Get() (b []T) { +func (t *bufs[T]) Get() (tmpbuf []T) { if len(t.buf) > 0 { - b = t.buf[0][:0] - t.buf = t.buf[:copy(t.buf, t.buf[1:])] - t.num.Add(-1) - return + b := (<-t.buf)[:0] + runtime.SetFinalizer(&b, func(objp any) { + select { + case t.buf <- *objp.(*[]T): + default: + } + }) + return b } else { - b = []T{} + b := []T{} + runtime.SetFinalizer(&b, func(objp any) { + select { + case t.buf <- *objp.(*[]T): + default: + } + }) + return b } - runtime.SetFinalizer(&b, func(objp any) { - t.buf = append(t.buf, *objp.(*[]T)) - t.num.Add(1) - }) - return } -func (t *bufs[T]) CacheCount() int64 { - return t.num.Load() +func (t *bufs[T]) Cache() int { + return len(t.buf) } diff --git a/slice/Buf_test.go b/slice/Buf_test.go index 260739c..771d840 100644 --- a/slice/Buf_test.go +++ b/slice/Buf_test.go @@ -7,24 +7,24 @@ import ( ) func TestBuf(t *testing.T) { - bu := NewBufs[byte]() + bu := NewBufs[byte](5) allocs(bu.Get()) runtime.GC() time.Sleep(time.Second) - if bu.CacheCount() != 1 { + if bu.Cache() != 1 { t.Fatal() } b := allocs(bu.Get()) runtime.GC() time.Sleep(time.Second) - if bu.CacheCount() != 0 { + if bu.Cache() != 0 { t.Fatal() } allocs(bu.Get()) runtime.GC() time.Sleep(time.Second) - t.Log(b, bu.CacheCount()) - if bu.CacheCount() != 1 { + t.Log(b, bu.Cache()) + if bu.Cache() != 1 { t.Fatal() } } diff --git a/websocket/Client.go b/websocket/Client.go index 543c3b2..6dcb6be 100644 --- a/websocket/Client.go +++ b/websocket/Client.go @@ -13,6 +13,7 @@ import ( pio "github.com/qydysky/part/io" msgq "github.com/qydysky/part/msgq" + pslice "github.com/qydysky/part/slice" ) type Client struct { @@ -23,7 +24,7 @@ type Client struct { TO int // depercated: use RTOMs WTOMs instead RTOMs int WTOMs int - BufSize int // msg buf 1: always use single buf >1: use bufs cycle + BufSize int // msg buf Header map[string]string Proxy string @@ -57,14 +58,14 @@ func New_client(config *Client) (*Client, error) { WTOMs: 300 * 1000, Func_normal_close: func() {}, Func_abort_close: func() {}, - BufSize: 10, + BufSize: 100, msg: msgq.NewType[*WsMsg](), } tmp.Url = config.Url if tmp.Url == "" { return nil, errors.New(`url == ""`) } - if v := config.BufSize; v >= 1 { + if v := config.BufSize; v > 100 { tmp.BufSize = v } if v := config.TO; v != 0 { @@ -153,25 +154,19 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) { }() buf := make([]byte, humanize.KByte) - var ( - msgs = make([][]byte, o.BufSize) - cuP = 0 - ) + msgs := pslice.NewBufs[byte](uint64(o.BufSize)) for { if err := c.SetReadDeadline(time.Now().Add(time.Duration(o.RTOMs * int(time.Millisecond)))); err != nil { o.error(err) return } + msg := msgs.Get() msg_type, r, err := c.NextReader() if err == nil { - if msg, e := pio.ReadAll(r, buf); e != nil { + if tmp, e := pio.ReadAll(r, buf); e != nil { err = e } else { - cuP++ - if cuP >= o.BufSize { - cuP = 0 - } - msgs[cuP] = append(msgs[cuP][:0], msg...) + msg = append(msg, tmp...) } } if err != nil { @@ -195,7 +190,7 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) { case websocket.PingMessage: o.msg.PushLock_tag(`send`, &WsMsg{ Type: websocket.PongMessage, - Msg: msgs[cuP], + Msg: msg, }) case websocket.PongMessage: o.pingT = time.Now().UnixMilli() @@ -209,7 +204,7 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) { default: o.msg.PushLock_tag(`rec`, &WsMsg{ Type: websocket.TextMessage, - Msg: msgs[cuP], + Msg: msg, }) } }