free: make(chan int, blockNum+1),
buf: make([]T, blockSize*blockNum),
}
- for i := 0; i < blockNum; i++ {
+ for i := range blockNum {
p.free <- i
}
return p
}
func (t *blocks[T]) Get() ([]T, func(), error) {
+ t.gc()
select {
case offset := <-t.free:
return t.buf[offset*t.size : (offset+1)*t.size], func() {
}
func (t *blocks[T]) GetAuto() (b []T, e error) {
+ t.gc()
select {
case offset := <-t.free:
b = t.buf[offset*t.size : (offset+1)*t.size]
- runtime.SetFinalizer(&b, func(p any) {
- clear(*p.(*[]T))
+ runtime.AddCleanup(&b, func(offset int) {
+ clear(t.buf[offset*t.size : (offset+1)*t.size])
t.free <- offset
- })
+ }, offset)
return
default:
return nil, ErrOverflow
}
}
+
+func (t *blocks[T]) gc() {
+ if len(t.free) == 0 {
+ runtime.GC()
+ }
+}
pio "github.com/qydysky/part/io"
msgq "github.com/qydysky/part/msgq"
+ pslice "github.com/qydysky/part/slice"
)
type Client struct {
type WsMsg struct {
Type int
- Msg []byte
+ Msg func(func([]byte) error) error
}
type Ping struct {
}()
buf := make([]byte, humanize.KByte)
- var (
- msgs = make([][]byte, o.BufSize)
- cuP = 0
- )
- for {
- if err := c.SetReadDeadline(time.Now().Add(time.Duration(o.RTOMs * int(time.Millisecond)))); err != nil {
- o.error(err)
- return
- }
- msg_type, r, err := c.NextReader()
- if err == nil {
- if msg, e := pio.ReadAll(r, buf); e != nil {
- err = e
- } else {
- cuP++
- if cuP >= o.BufSize {
- cuP = 0
- }
- msgs[cuP] = append(msgs[cuP][:0], msg...)
+ var msgs = pslice.NewBlocks[byte](humanize.KByte, o.BufSize)
+ var err error
+ for err == nil {
+ if e := c.SetReadDeadline(time.Now().Add(time.Duration(o.RTOMs * int(time.Millisecond)))); e != nil {
+ err = e
+ } else if msg_type, r, e := c.NextReader(); e != nil {
+ err = e
+ } else if msg, e := pio.ReadAll(r, buf); e != nil {
+ err = e
+ } else if tmpbuf, putBack, e := msgs.Get(); e != nil {
+ err = e
+ } else {
+ tmpbuf = append(tmpbuf[:0], msg...)
+ switch msg_type {
+ case websocket.PingMessage:
+ o.msg.PushLock_tag(`send`, &WsMsg{
+ Type: websocket.PongMessage,
+ Msg: func(f func([]byte) error) error {
+ f(tmpbuf)
+ putBack()
+ return nil
+ },
+ })
+ case websocket.PongMessage:
+ o.pingT = time.Now().UnixMilli()
+ time.AfterFunc(time.Duration(o.Ping.Period*int(time.Millisecond)), func() {
+ o.msg.PushLock_tag(`send`, &WsMsg{
+ Type: websocket.PingMessage,
+ Msg: func(f func([]byte) error) error {
+ f(o.Ping.Msg)
+ return nil
+ },
+ })
+ })
+ o.Ping.had_pong = true
+ default:
+ o.msg.PushLock_tag(`rec`, &WsMsg{
+ Type: websocket.TextMessage,
+ Msg: func(f func([]byte) error) error {
+ f(tmpbuf)
+ putBack()
+ return nil
+ },
+ })
}
}
- if err != nil {
- if e, ok := err.(*websocket.CloseError); ok {
- switch e.Code {
- case websocket.CloseNormalClosure:
- o.Func_normal_close()
- case websocket.CloseAbnormalClosure:
- o.Func_abort_close()
- o.error(err)
- default:
- o.error(err)
- }
- } else {
+ if e, ok := err.(*websocket.CloseError); ok {
+ switch e.Code {
+ case websocket.CloseNormalClosure:
+ o.Func_normal_close()
+ case websocket.CloseAbnormalClosure:
+ o.Func_abort_close()
+ o.error(err)
+ default:
o.error(err)
}
- return
- }
-
- switch msg_type {
- case websocket.PingMessage:
- o.msg.PushLock_tag(`send`, &WsMsg{
- Type: websocket.PongMessage,
- Msg: msgs[cuP],
- })
- case websocket.PongMessage:
- o.pingT = time.Now().UnixMilli()
- time.AfterFunc(time.Duration(o.Ping.Period*int(time.Millisecond)), func() {
- o.msg.PushLock_tag(`send`, &WsMsg{
- Type: websocket.PingMessage,
- Msg: o.Ping.Msg,
- })
- })
- o.Ping.had_pong = true
- default:
- o.msg.PushLock_tag(`rec`, &WsMsg{
- Type: websocket.TextMessage,
- Msg: msgs[cuP],
- })
+ } else if err != nil {
+ o.error(err)
}
}
}()
o.error(err)
return true
}
- if err := c.WriteMessage(wm.Type, wm.Msg); err != nil {
+ if err := wm.Msg(func(b []byte) error {
+ return c.WriteMessage(wm.Type, b)
+ }); err != nil {
o.error(err)
return true
}
func (o *Client) Heartbeat() (err error) {
o.msg.PushLock_tag(`send`, &WsMsg{
Type: websocket.PingMessage,
- Msg: o.Ping.Msg,
+ Msg: func(f func([]byte) error) error {
+ f(o.Ping.Msg)
+ return nil
+ },
})
time.Sleep(time.Duration((o.RTOMs + 100) * int(time.Millisecond)))
return o.Error()
}
ws.Pull_tag_only(`rec`, func(wm *WsMsg) (disable bool) {
- if string(wm.Msg) != "test" {
- t.Fatal()
- }
+ wm.Msg(func(b []byte) error {
+ if string(b) != "test" {
+ t.Fatal()
+ }
+ return nil
+ })
return false
})
ws.PushLock_tag(`send`, &WsMsg{
- Msg: []byte("test"),
+ Msg: func(f func([]byte) error) error {
+ return f([]byte("test"))
+ },
})
go func() {
}
ws.Pull_tag_only(`rec`, func(wm *WsMsg) (disable bool) {
- if string(wm.Msg) != "test" {
- t.Fatal()
- }
+ wm.Msg(func(b []byte) error {
+ if string(b) != "test" {
+ t.Fatal()
+ }
+ return nil
+ })
return false
})
ws.PushLock_tag(`send`, &WsMsg{
- Msg: []byte("test"),
+ Msg: func(f func([]byte) error) error {
+ return f([]byte("test"))
+ },
})
go func() {