From e8042d87f89df4abe52d30e2e76fad06a688d0ed Mon Sep 17 00:00:00 2001 From: qydysky Date: Thu, 8 May 2025 03:42:48 +0800 Subject: [PATCH] 1 (#50) * 1 * 1 --- slice/Blocks.go | 16 ++++-- slice/Blocks_test.go | 16 ++++++ websocket/Client.go | 120 ++++++++++++++++++++------------------- websocket/Client_test.go | 26 ++++++--- 4 files changed, 109 insertions(+), 69 deletions(-) diff --git a/slice/Blocks.go b/slice/Blocks.go index 3a89f91..bb428f2 100644 --- a/slice/Blocks.go +++ b/slice/Blocks.go @@ -42,13 +42,14 @@ func NewBlocks[T any](blockSize int, blockNum int) BlocksI[T] { free: make(chan int, blockNum+1), buf: make([]T, blockSize*blockNum), } - for i := 0; i < blockNum; i++ { + for i := range blockNum { p.free <- i } return p } func (t *blocks[T]) Get() ([]T, func(), error) { + t.gc() select { case offset := <-t.free: return t.buf[offset*t.size : (offset+1)*t.size], func() { @@ -61,15 +62,22 @@ func (t *blocks[T]) Get() ([]T, func(), error) { } func (t *blocks[T]) GetAuto() (b []T, e error) { + t.gc() select { case offset := <-t.free: b = t.buf[offset*t.size : (offset+1)*t.size] - runtime.SetFinalizer(&b, func(p any) { - clear(*p.(*[]T)) + runtime.AddCleanup(&b, func(offset int) { + clear(t.buf[offset*t.size : (offset+1)*t.size]) t.free <- offset - }) + }, offset) return default: return nil, ErrOverflow } } + +func (t *blocks[T]) gc() { + if len(t.free) == 0 { + runtime.GC() + } +} diff --git a/slice/Blocks_test.go b/slice/Blocks_test.go index a23eb50..1df7f2b 100644 --- a/slice/Blocks_test.go +++ b/slice/Blocks_test.go @@ -26,6 +26,22 @@ func TestMain(t *testing.T) { } } +func TestMain2(t *testing.T) { + buf := NewBlocks[byte](1024, 1) + if tmpbuf, e := buf.GetAuto(); e == nil { + clear(tmpbuf) + } else { + t.Fatal() + } + if tmpbuf, e := buf.GetAuto(); e == nil { + clear(tmpbuf) + if tmpbuf, e := buf.GetAuto(); e != nil { + clear(tmpbuf) + t.Fatal() + } + } +} + // 374.4 ns/op 32 B/op 1 allocs/op func Benchmark(b *testing.B) { buf := NewBlocks[byte](1024, 1) diff --git a/websocket/Client.go b/websocket/Client.go index 543c3b2..3dceb93 100644 --- a/websocket/Client.go +++ b/websocket/Client.go @@ -13,6 +13,7 @@ import ( pio "github.com/qydysky/part/io" msgq "github.com/qydysky/part/msgq" + pslice "github.com/qydysky/part/slice" ) type Client struct { @@ -42,7 +43,7 @@ type Client struct { type WsMsg struct { Type int - Msg []byte + Msg func(func([]byte) error) error } type Ping struct { @@ -153,64 +154,64 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) { }() buf := make([]byte, humanize.KByte) - var ( - msgs = make([][]byte, o.BufSize) - cuP = 0 - ) - for { - if err := c.SetReadDeadline(time.Now().Add(time.Duration(o.RTOMs * int(time.Millisecond)))); err != nil { - o.error(err) - return - } - msg_type, r, err := c.NextReader() - if err == nil { - if msg, e := pio.ReadAll(r, buf); e != nil { - err = e - } else { - cuP++ - if cuP >= o.BufSize { - cuP = 0 - } - msgs[cuP] = append(msgs[cuP][:0], msg...) + var msgs = pslice.NewBlocks[byte](humanize.KByte, o.BufSize) + var err error + for err == nil { + if e := c.SetReadDeadline(time.Now().Add(time.Duration(o.RTOMs * int(time.Millisecond)))); e != nil { + err = e + } else if msg_type, r, e := c.NextReader(); e != nil { + err = e + } else if msg, e := pio.ReadAll(r, buf); e != nil { + err = e + } else if tmpbuf, putBack, e := msgs.Get(); e != nil { + err = e + } else { + tmpbuf = append(tmpbuf[:0], msg...) + switch msg_type { + case websocket.PingMessage: + o.msg.PushLock_tag(`send`, &WsMsg{ + Type: websocket.PongMessage, + Msg: func(f func([]byte) error) error { + f(tmpbuf) + putBack() + return nil + }, + }) + case websocket.PongMessage: + o.pingT = time.Now().UnixMilli() + time.AfterFunc(time.Duration(o.Ping.Period*int(time.Millisecond)), func() { + o.msg.PushLock_tag(`send`, &WsMsg{ + Type: websocket.PingMessage, + Msg: func(f func([]byte) error) error { + f(o.Ping.Msg) + return nil + }, + }) + }) + o.Ping.had_pong = true + default: + o.msg.PushLock_tag(`rec`, &WsMsg{ + Type: websocket.TextMessage, + Msg: func(f func([]byte) error) error { + f(tmpbuf) + putBack() + return nil + }, + }) } } - if err != nil { - if e, ok := err.(*websocket.CloseError); ok { - switch e.Code { - case websocket.CloseNormalClosure: - o.Func_normal_close() - case websocket.CloseAbnormalClosure: - o.Func_abort_close() - o.error(err) - default: - o.error(err) - } - } else { + if e, ok := err.(*websocket.CloseError); ok { + switch e.Code { + case websocket.CloseNormalClosure: + o.Func_normal_close() + case websocket.CloseAbnormalClosure: + o.Func_abort_close() + o.error(err) + default: o.error(err) } - return - } - - switch msg_type { - case websocket.PingMessage: - o.msg.PushLock_tag(`send`, &WsMsg{ - Type: websocket.PongMessage, - Msg: msgs[cuP], - }) - case websocket.PongMessage: - o.pingT = time.Now().UnixMilli() - time.AfterFunc(time.Duration(o.Ping.Period*int(time.Millisecond)), func() { - o.msg.PushLock_tag(`send`, &WsMsg{ - Type: websocket.PingMessage, - Msg: o.Ping.Msg, - }) - }) - o.Ping.had_pong = true - default: - o.msg.PushLock_tag(`rec`, &WsMsg{ - Type: websocket.TextMessage, - Msg: msgs[cuP], - }) + } else if err != nil { + o.error(err) } } }() @@ -224,7 +225,9 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) { o.error(err) return true } - if err := c.WriteMessage(wm.Type, wm.Msg); err != nil { + if err := wm.Msg(func(b []byte) error { + return c.WriteMessage(wm.Type, b) + }); err != nil { o.error(err) return true } @@ -256,7 +259,10 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) { func (o *Client) Heartbeat() (err error) { o.msg.PushLock_tag(`send`, &WsMsg{ Type: websocket.PingMessage, - Msg: o.Ping.Msg, + Msg: func(f func([]byte) error) error { + f(o.Ping.Msg) + return nil + }, }) time.Sleep(time.Duration((o.RTOMs + 100) * int(time.Millisecond))) return o.Error() diff --git a/websocket/Client_test.go b/websocket/Client_test.go index ba89119..1ad293d 100644 --- a/websocket/Client_test.go +++ b/websocket/Client_test.go @@ -68,13 +68,18 @@ func Test_Client(t *testing.T) { } ws.Pull_tag_only(`rec`, func(wm *WsMsg) (disable bool) { - if string(wm.Msg) != "test" { - t.Fatal() - } + wm.Msg(func(b []byte) error { + if string(b) != "test" { + t.Fatal() + } + return nil + }) return false }) ws.PushLock_tag(`send`, &WsMsg{ - Msg: []byte("test"), + Msg: func(f func([]byte) error) error { + return f([]byte("test")) + }, }) go func() { @@ -115,13 +120,18 @@ func Test_Client(t *testing.T) { } ws.Pull_tag_only(`rec`, func(wm *WsMsg) (disable bool) { - if string(wm.Msg) != "test" { - t.Fatal() - } + wm.Msg(func(b []byte) error { + if string(b) != "test" { + t.Fatal() + } + return nil + }) return false }) ws.PushLock_tag(`send`, &WsMsg{ - Msg: []byte("test"), + Msg: func(f func([]byte) error) error { + return f([]byte("test")) + }, }) go func() { -- 2.39.2