From 51d06712250c3b4f8d2bd575ca4d3451672ad17f Mon Sep 17 00:00:00 2001 From: qydysky Date: Thu, 15 Aug 2024 23:46:53 +0800 Subject: [PATCH] 1 --- slice/Blocks.go | 23 ++++++++++++++++++- slice/Blocks_test.go | 24 ++++++++++++++++++++ slice/Buf.go | 54 -------------------------------------------- slice/Buf_test.go | 34 ---------------------------- websocket/Client.go | 25 ++++++++++++-------- 5 files changed, 61 insertions(+), 99 deletions(-) delete mode 100644 slice/Buf.go delete mode 100644 slice/Buf_test.go diff --git a/slice/Blocks.go b/slice/Blocks.go index 34acc57..3a89f91 100644 --- a/slice/Blocks.go +++ b/slice/Blocks.go @@ -2,17 +2,24 @@ package part import ( "errors" + "runtime" ) type BlocksI[T any] interface { // // eg // // if tmpbuf, putBack, e := buf.Get(); e == nil { - // clear(tmpbuf) // // do something with tmpbuf // putBack() // } Get() ([]T, func(), error) + + // // eg + // + // if tmpbuf, e := buf.GetAuto(); e == nil { + // // do something with tmpbuf + // } + GetAuto() ([]T, error) } type blocks[T any] struct { @@ -52,3 +59,17 @@ func (t *blocks[T]) Get() ([]T, func(), error) { return nil, func() {}, ErrOverflow } } + +func (t *blocks[T]) GetAuto() (b []T, e error) { + select { + case offset := <-t.free: + b = t.buf[offset*t.size : (offset+1)*t.size] + runtime.SetFinalizer(&b, func(p any) { + clear(*p.(*[]T)) + t.free <- offset + }) + return + default: + return nil, ErrOverflow + } +} diff --git a/slice/Blocks_test.go b/slice/Blocks_test.go index 303e159..a23eb50 100644 --- a/slice/Blocks_test.go +++ b/slice/Blocks_test.go @@ -25,3 +25,27 @@ func TestMain(t *testing.T) { putBack() } } + +// 374.4 ns/op 32 B/op 1 allocs/op +func Benchmark(b *testing.B) { + buf := NewBlocks[byte](1024, 1) + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, f, e := buf.Get(); e != nil { + b.Fatal(e) + } else { + f() + } + } +} + +// 895.5 ns/op 56 B/op 2 allocs/op +func Benchmark2(b *testing.B) { + buf := NewBlocks[byte](1, 1000000) + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, e := buf.GetAuto(); e != nil { + b.Fatal(e) + } + } +} diff --git a/slice/Buf.go b/slice/Buf.go deleted file mode 100644 index 09f066d..0000000 --- a/slice/Buf.go +++ /dev/null @@ -1,54 +0,0 @@ -package part - -import ( - "runtime" -) - -type BufI[T any] interface { - // // eg - // - // if tmpbuf, reachMax := buf.Get(); reachMax { - // // do something with tmpbuf - // } - Get() []T - Cache() int -} - -type bufs[T any] struct { - _ noCopy - size uint64 - buf chan []T -} - -func NewBufs[T any](size uint64) BufI[T] { - return &bufs[T]{ - size: size, - buf: make(chan []T, size), - } -} - -func (t *bufs[T]) Get() (tmpbuf []T) { - if len(t.buf) > 0 { - b := (<-t.buf)[:0] - runtime.SetFinalizer(&b, func(objp any) { - select { - case t.buf <- *objp.(*[]T): - default: - } - }) - return b - } else { - b := []T{} - runtime.SetFinalizer(&b, func(objp any) { - select { - case t.buf <- *objp.(*[]T): - default: - } - }) - return b - } -} - -func (t *bufs[T]) Cache() int { - return len(t.buf) -} diff --git a/slice/Buf_test.go b/slice/Buf_test.go deleted file mode 100644 index 771d840..0000000 --- a/slice/Buf_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package part - -import ( - "runtime" - "testing" - "time" -) - -func TestBuf(t *testing.T) { - bu := NewBufs[byte](5) - allocs(bu.Get()) - runtime.GC() - time.Sleep(time.Second) - if bu.Cache() != 1 { - t.Fatal() - } - b := allocs(bu.Get()) - runtime.GC() - time.Sleep(time.Second) - if bu.Cache() != 0 { - t.Fatal() - } - allocs(bu.Get()) - runtime.GC() - time.Sleep(time.Second) - t.Log(b, bu.Cache()) - if bu.Cache() != 1 { - t.Fatal() - } -} - -func allocs(b []byte) []byte { - return append(b, 0x01) -} diff --git a/websocket/Client.go b/websocket/Client.go index 6dcb6be..543c3b2 100644 --- a/websocket/Client.go +++ b/websocket/Client.go @@ -13,7 +13,6 @@ import ( pio "github.com/qydysky/part/io" msgq "github.com/qydysky/part/msgq" - pslice "github.com/qydysky/part/slice" ) type Client struct { @@ -24,7 +23,7 @@ type Client struct { TO int // depercated: use RTOMs WTOMs instead RTOMs int WTOMs int - BufSize int // msg buf + BufSize int // msg buf 1: always use single buf >1: use bufs cycle Header map[string]string Proxy string @@ -58,14 +57,14 @@ func New_client(config *Client) (*Client, error) { WTOMs: 300 * 1000, Func_normal_close: func() {}, Func_abort_close: func() {}, - BufSize: 100, + BufSize: 10, msg: msgq.NewType[*WsMsg](), } tmp.Url = config.Url if tmp.Url == "" { return nil, errors.New(`url == ""`) } - if v := config.BufSize; v > 100 { + if v := config.BufSize; v >= 1 { tmp.BufSize = v } if v := config.TO; v != 0 { @@ -154,19 +153,25 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) { }() buf := make([]byte, humanize.KByte) - msgs := pslice.NewBufs[byte](uint64(o.BufSize)) + var ( + msgs = make([][]byte, o.BufSize) + cuP = 0 + ) for { if err := c.SetReadDeadline(time.Now().Add(time.Duration(o.RTOMs * int(time.Millisecond)))); err != nil { o.error(err) return } - msg := msgs.Get() msg_type, r, err := c.NextReader() if err == nil { - if tmp, e := pio.ReadAll(r, buf); e != nil { + if msg, e := pio.ReadAll(r, buf); e != nil { err = e } else { - msg = append(msg, tmp...) + cuP++ + if cuP >= o.BufSize { + cuP = 0 + } + msgs[cuP] = append(msgs[cuP][:0], msg...) } } if err != nil { @@ -190,7 +195,7 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) { case websocket.PingMessage: o.msg.PushLock_tag(`send`, &WsMsg{ Type: websocket.PongMessage, - Msg: msg, + Msg: msgs[cuP], }) case websocket.PongMessage: o.pingT = time.Now().UnixMilli() @@ -204,7 +209,7 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) { default: o.msg.PushLock_tag(`rec`, &WsMsg{ Type: websocket.TextMessage, - Msg: msg, + Msg: msgs[cuP], }) } } -- 2.39.2