]> 127.0.0.1 Git - part/.git/commitdiff
1 (#50) v0.28.20250507194255
authorqydysky <qydysky@foxmail.com>
Wed, 7 May 2025 19:42:48 +0000 (03:42 +0800)
committerGitHub <noreply@github.com>
Wed, 7 May 2025 19:42:48 +0000 (03:42 +0800)
* 1

* 1

slice/Blocks.go
slice/Blocks_test.go
websocket/Client.go
websocket/Client_test.go

index 3a89f91988e4e9b9bbba7b576cdf5dcfdc3beab4..bb428f2474a5a35100efea33f41560d637eba936 100644 (file)
@@ -42,13 +42,14 @@ func NewBlocks[T any](blockSize int, blockNum int) BlocksI[T] {
                free: make(chan int, blockNum+1),
                buf:  make([]T, blockSize*blockNum),
        }
-       for i := 0; i < blockNum; i++ {
+       for i := range blockNum {
                p.free <- i
        }
        return p
 }
 
 func (t *blocks[T]) Get() ([]T, func(), error) {
+       t.gc()
        select {
        case offset := <-t.free:
                return t.buf[offset*t.size : (offset+1)*t.size], func() {
@@ -61,15 +62,22 @@ func (t *blocks[T]) Get() ([]T, func(), error) {
 }
 
 func (t *blocks[T]) GetAuto() (b []T, e error) {
+       t.gc()
        select {
        case offset := <-t.free:
                b = t.buf[offset*t.size : (offset+1)*t.size]
-               runtime.SetFinalizer(&b, func(p any) {
-                       clear(*p.(*[]T))
+               runtime.AddCleanup(&b, func(offset int) {
+                       clear(t.buf[offset*t.size : (offset+1)*t.size])
                        t.free <- offset
-               })
+               }, offset)
                return
        default:
                return nil, ErrOverflow
        }
 }
+
+func (t *blocks[T]) gc() {
+       if len(t.free) == 0 {
+               runtime.GC()
+       }
+}
index a23eb5097c5828cbde852f103432a917b7af3507..1df7f2b1d106ff83f5e2107ea0ada64e11247f90 100644 (file)
@@ -26,6 +26,22 @@ func TestMain(t *testing.T) {
        }
 }
 
+func TestMain2(t *testing.T) {
+       buf := NewBlocks[byte](1024, 1)
+       if tmpbuf, e := buf.GetAuto(); e == nil {
+               clear(tmpbuf)
+       } else {
+               t.Fatal()
+       }
+       if tmpbuf, e := buf.GetAuto(); e == nil {
+               clear(tmpbuf)
+               if tmpbuf, e := buf.GetAuto(); e != nil {
+                       clear(tmpbuf)
+                       t.Fatal()
+               }
+       }
+}
+
 // 374.4 ns/op            32 B/op          1 allocs/op
 func Benchmark(b *testing.B) {
        buf := NewBlocks[byte](1024, 1)
index 543c3b282d68680c849ff1e4967a0d394da4949e..3dceb939ec78c0311d6cbcb1345e65f14096abf7 100644 (file)
@@ -13,6 +13,7 @@ import (
 
        pio "github.com/qydysky/part/io"
        msgq "github.com/qydysky/part/msgq"
+       pslice "github.com/qydysky/part/slice"
 )
 
 type Client struct {
@@ -42,7 +43,7 @@ type Client struct {
 
 type WsMsg struct {
        Type int
-       Msg  []byte
+       Msg  func(func([]byte) error) error
 }
 
 type Ping struct {
@@ -153,64 +154,64 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) {
                }()
 
                buf := make([]byte, humanize.KByte)
-               var (
-                       msgs = make([][]byte, o.BufSize)
-                       cuP  = 0
-               )
-               for {
-                       if err := c.SetReadDeadline(time.Now().Add(time.Duration(o.RTOMs * int(time.Millisecond)))); err != nil {
-                               o.error(err)
-                               return
-                       }
-                       msg_type, r, err := c.NextReader()
-                       if err == nil {
-                               if msg, e := pio.ReadAll(r, buf); e != nil {
-                                       err = e
-                               } else {
-                                       cuP++
-                                       if cuP >= o.BufSize {
-                                               cuP = 0
-                                       }
-                                       msgs[cuP] = append(msgs[cuP][:0], msg...)
+               var msgs = pslice.NewBlocks[byte](humanize.KByte, o.BufSize)
+               var err error
+               for err == nil {
+                       if e := c.SetReadDeadline(time.Now().Add(time.Duration(o.RTOMs * int(time.Millisecond)))); e != nil {
+                               err = e
+                       } else if msg_type, r, e := c.NextReader(); e != nil {
+                               err = e
+                       } else if msg, e := pio.ReadAll(r, buf); e != nil {
+                               err = e
+                       } else if tmpbuf, putBack, e := msgs.Get(); e != nil {
+                               err = e
+                       } else {
+                               tmpbuf = append(tmpbuf[:0], msg...)
+                               switch msg_type {
+                               case websocket.PingMessage:
+                                       o.msg.PushLock_tag(`send`, &WsMsg{
+                                               Type: websocket.PongMessage,
+                                               Msg: func(f func([]byte) error) error {
+                                                       f(tmpbuf)
+                                                       putBack()
+                                                       return nil
+                                               },
+                                       })
+                               case websocket.PongMessage:
+                                       o.pingT = time.Now().UnixMilli()
+                                       time.AfterFunc(time.Duration(o.Ping.Period*int(time.Millisecond)), func() {
+                                               o.msg.PushLock_tag(`send`, &WsMsg{
+                                                       Type: websocket.PingMessage,
+                                                       Msg: func(f func([]byte) error) error {
+                                                               f(o.Ping.Msg)
+                                                               return nil
+                                                       },
+                                               })
+                                       })
+                                       o.Ping.had_pong = true
+                               default:
+                                       o.msg.PushLock_tag(`rec`, &WsMsg{
+                                               Type: websocket.TextMessage,
+                                               Msg: func(f func([]byte) error) error {
+                                                       f(tmpbuf)
+                                                       putBack()
+                                                       return nil
+                                               },
+                                       })
                                }
                        }
-                       if err != nil {
-                               if e, ok := err.(*websocket.CloseError); ok {
-                                       switch e.Code {
-                                       case websocket.CloseNormalClosure:
-                                               o.Func_normal_close()
-                                       case websocket.CloseAbnormalClosure:
-                                               o.Func_abort_close()
-                                               o.error(err)
-                                       default:
-                                               o.error(err)
-                                       }
-                               } else {
+                       if e, ok := err.(*websocket.CloseError); ok {
+                               switch e.Code {
+                               case websocket.CloseNormalClosure:
+                                       o.Func_normal_close()
+                               case websocket.CloseAbnormalClosure:
+                                       o.Func_abort_close()
+                                       o.error(err)
+                               default:
                                        o.error(err)
                                }
-                               return
-                       }
-
-                       switch msg_type {
-                       case websocket.PingMessage:
-                               o.msg.PushLock_tag(`send`, &WsMsg{
-                                       Type: websocket.PongMessage,
-                                       Msg:  msgs[cuP],
-                               })
-                       case websocket.PongMessage:
-                               o.pingT = time.Now().UnixMilli()
-                               time.AfterFunc(time.Duration(o.Ping.Period*int(time.Millisecond)), func() {
-                                       o.msg.PushLock_tag(`send`, &WsMsg{
-                                               Type: websocket.PingMessage,
-                                               Msg:  o.Ping.Msg,
-                                       })
-                               })
-                               o.Ping.had_pong = true
-                       default:
-                               o.msg.PushLock_tag(`rec`, &WsMsg{
-                                       Type: websocket.TextMessage,
-                                       Msg:  msgs[cuP],
-                               })
+                       } else if err != nil {
+                               o.error(err)
                        }
                }
        }()
@@ -224,7 +225,9 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) {
                        o.error(err)
                        return true
                }
-               if err := c.WriteMessage(wm.Type, wm.Msg); err != nil {
+               if err := wm.Msg(func(b []byte) error {
+                       return c.WriteMessage(wm.Type, b)
+               }); err != nil {
                        o.error(err)
                        return true
                }
@@ -256,7 +259,10 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) {
 func (o *Client) Heartbeat() (err error) {
        o.msg.PushLock_tag(`send`, &WsMsg{
                Type: websocket.PingMessage,
-               Msg:  o.Ping.Msg,
+               Msg: func(f func([]byte) error) error {
+                       f(o.Ping.Msg)
+                       return nil
+               },
        })
        time.Sleep(time.Duration((o.RTOMs + 100) * int(time.Millisecond)))
        return o.Error()
index ba891192605a86d9087a04985f2229911d0d3045..1ad293d3c9bb6505a8feeda81094fe5b08a8d6e7 100644 (file)
@@ -68,13 +68,18 @@ func Test_Client(t *testing.T) {
                }
 
                ws.Pull_tag_only(`rec`, func(wm *WsMsg) (disable bool) {
-                       if string(wm.Msg) != "test" {
-                               t.Fatal()
-                       }
+                       wm.Msg(func(b []byte) error {
+                               if string(b) != "test" {
+                                       t.Fatal()
+                               }
+                               return nil
+                       })
                        return false
                })
                ws.PushLock_tag(`send`, &WsMsg{
-                       Msg: []byte("test"),
+                       Msg: func(f func([]byte) error) error {
+                               return f([]byte("test"))
+                       },
                })
 
                go func() {
@@ -115,13 +120,18 @@ func Test_Client(t *testing.T) {
                }
 
                ws.Pull_tag_only(`rec`, func(wm *WsMsg) (disable bool) {
-                       if string(wm.Msg) != "test" {
-                               t.Fatal()
-                       }
+                       wm.Msg(func(b []byte) error {
+                               if string(b) != "test" {
+                                       t.Fatal()
+                               }
+                               return nil
+                       })
                        return false
                })
                ws.PushLock_tag(`send`, &WsMsg{
-                       Msg: []byte("test"),
+                       Msg: func(f func([]byte) error) error {
+                               return f([]byte("test"))
+                       },
                })
 
                go func() {