]> 127.0.0.1 Git - part/.git/commitdiff
1
authorqydysky <qydysky@foxmail.com>
Mon, 12 Aug 2024 13:37:25 +0000 (21:37 +0800)
committerqydysky <qydysky@foxmail.com>
Mon, 12 Aug 2024 13:37:25 +0000 (21:37 +0800)
slice/Buf.go
slice/Buf_test.go
websocket/Client.go

index c0549eda8901b17855ab210acc53cd124c61553a..09f066df56e8cbf245c1d10f06c52698fb43d674 100644 (file)
@@ -2,47 +2,53 @@ package part
 
 import (
        "runtime"
-       "sync/atomic"
 )
 
 type BufI[T any] interface {
        // // eg
        //
-       //      if tmpbuf, e := buf.Get(); e == nil {
+       //      if tmpbuf, reachMax := buf.Get(); reachMax {
        //              // do something with tmpbuf
        //      }
        Get() []T
-       CacheCount() int64
+       Cache() int
 }
 
 type bufs[T any] struct {
-       _   noCopy
-       num atomic.Int64
-       buf [][]T
+       _    noCopy
+       size uint64
+       buf  chan []T
 }
 
-func NewBufs[T any]() BufI[T] {
+func NewBufs[T any](size uint64) BufI[T] {
        return &bufs[T]{
-               buf: [][]T{},
+               size: size,
+               buf:  make(chan []T, size),
        }
 }
 
-func (t *bufs[T]) Get() (b []T) {
+func (t *bufs[T]) Get() (tmpbuf []T) {
        if len(t.buf) > 0 {
-               b = t.buf[0][:0]
-               t.buf = t.buf[:copy(t.buf, t.buf[1:])]
-               t.num.Add(-1)
-               return
+               b := (<-t.buf)[:0]
+               runtime.SetFinalizer(&b, func(objp any) {
+                       select {
+                       case t.buf <- *objp.(*[]T):
+                       default:
+                       }
+               })
+               return b
        } else {
-               b = []T{}
+               b := []T{}
+               runtime.SetFinalizer(&b, func(objp any) {
+                       select {
+                       case t.buf <- *objp.(*[]T):
+                       default:
+                       }
+               })
+               return b
        }
-       runtime.SetFinalizer(&b, func(objp any) {
-               t.buf = append(t.buf, *objp.(*[]T))
-               t.num.Add(1)
-       })
-       return
 }
 
-func (t *bufs[T]) CacheCount() int64 {
-       return t.num.Load()
+func (t *bufs[T]) Cache() int {
+       return len(t.buf)
 }
index 260739ca21218a266b1ad27144e8e7aba17c1ec0..771d84003b036c816b3809cbc4f166eaab957004 100644 (file)
@@ -7,24 +7,24 @@ import (
 )
 
 func TestBuf(t *testing.T) {
-       bu := NewBufs[byte]()
+       bu := NewBufs[byte](5)
        allocs(bu.Get())
        runtime.GC()
        time.Sleep(time.Second)
-       if bu.CacheCount() != 1 {
+       if bu.Cache() != 1 {
                t.Fatal()
        }
        b := allocs(bu.Get())
        runtime.GC()
        time.Sleep(time.Second)
-       if bu.CacheCount() != 0 {
+       if bu.Cache() != 0 {
                t.Fatal()
        }
        allocs(bu.Get())
        runtime.GC()
        time.Sleep(time.Second)
-       t.Log(b, bu.CacheCount())
-       if bu.CacheCount() != 1 {
+       t.Log(b, bu.Cache())
+       if bu.Cache() != 1 {
                t.Fatal()
        }
 }
index 543c3b282d68680c849ff1e4967a0d394da4949e..6dcb6be4f4ec0618526c31b986f10878e22ac5d4 100644 (file)
@@ -13,6 +13,7 @@ import (
 
        pio "github.com/qydysky/part/io"
        msgq "github.com/qydysky/part/msgq"
+       pslice "github.com/qydysky/part/slice"
 )
 
 type Client struct {
@@ -23,7 +24,7 @@ type Client struct {
        TO      int // depercated: use RTOMs WTOMs instead
        RTOMs   int
        WTOMs   int
-       BufSize int // msg buf 1: always use single buf >1: use bufs cycle
+       BufSize int // msg buf
        Header  map[string]string
        Proxy   string
 
@@ -57,14 +58,14 @@ func New_client(config *Client) (*Client, error) {
                WTOMs:             300 * 1000,
                Func_normal_close: func() {},
                Func_abort_close:  func() {},
-               BufSize:           10,
+               BufSize:           100,
                msg:               msgq.NewType[*WsMsg](),
        }
        tmp.Url = config.Url
        if tmp.Url == "" {
                return nil, errors.New(`url == ""`)
        }
-       if v := config.BufSize; v >= 1 {
+       if v := config.BufSize; v > 100 {
                tmp.BufSize = v
        }
        if v := config.TO; v != 0 {
@@ -153,25 +154,19 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) {
                }()
 
                buf := make([]byte, humanize.KByte)
-               var (
-                       msgs = make([][]byte, o.BufSize)
-                       cuP  = 0
-               )
+               msgs := pslice.NewBufs[byte](uint64(o.BufSize))
                for {
                        if err := c.SetReadDeadline(time.Now().Add(time.Duration(o.RTOMs * int(time.Millisecond)))); err != nil {
                                o.error(err)
                                return
                        }
+                       msg := msgs.Get()
                        msg_type, r, err := c.NextReader()
                        if err == nil {
-                               if msg, e := pio.ReadAll(r, buf); e != nil {
+                               if tmp, e := pio.ReadAll(r, buf); e != nil {
                                        err = e
                                } else {
-                                       cuP++
-                                       if cuP >= o.BufSize {
-                                               cuP = 0
-                                       }
-                                       msgs[cuP] = append(msgs[cuP][:0], msg...)
+                                       msg = append(msg, tmp...)
                                }
                        }
                        if err != nil {
@@ -195,7 +190,7 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) {
                        case websocket.PingMessage:
                                o.msg.PushLock_tag(`send`, &WsMsg{
                                        Type: websocket.PongMessage,
-                                       Msg:  msgs[cuP],
+                                       Msg:  msg,
                                })
                        case websocket.PongMessage:
                                o.pingT = time.Now().UnixMilli()
@@ -209,7 +204,7 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) {
                        default:
                                o.msg.PushLock_tag(`rec`, &WsMsg{
                                        Type: websocket.TextMessage,
-                                       Msg:  msgs[cuP],
+                                       Msg:  msg,
                                })
                        }
                }