From: qydysky Date: Wed, 17 Jul 2024 16:07:02 +0000 (+0800) Subject: 1 X-Git-Tag: v0.28.20240717161224 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=a62721cb32f1e946baac335e64c76ddea01d08e0;p=part%2F.git 1 --- diff --git a/msgq/Msgq.go b/msgq/Msgq.go index 53acdee..d2f732a 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -247,10 +247,15 @@ func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) (cancel close(c) return true default: - for len(ch) != 0 { - <-ch + empty := false + for !empty { + select { + case <-c: + default: + c <- d.Data + empty = true + } } - c <- d.Data } } return false @@ -450,10 +455,15 @@ func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) (c close(c) return true default: - for len(ch) != 0 { - <-ch + empty := false + for !empty { + select { + case <-c: + default: + c <- *data1.Data + empty = true + } } - c <- *data1.Data } } } diff --git a/websocket/Client.go b/websocket/Client.go index a305be8..c2e9788 100644 --- a/websocket/Client.go +++ b/websocket/Client.go @@ -18,7 +18,9 @@ type Client struct { // rec send close msg *msgq.MsgType[*WsMsg] - TO int + TO int // depercated: use RTOMs WTOMs instead + RTOMs int + WTOMs int Header map[string]string Proxy string @@ -48,7 +50,8 @@ type Ping struct { func New_client(config *Client) (*Client, error) { tmp := Client{ - TO: 300 * 1000, + RTOMs: 300 * 1000, + WTOMs: 300 * 1000, Func_normal_close: func() {}, Func_abort_close: func() {}, msg: msgq.NewType[*WsMsg](), @@ -58,7 +61,14 @@ func New_client(config *Client) (*Client, error) { return nil, errors.New(`url == ""`) } if v := config.TO; v != 0 { - tmp.TO = v + tmp.RTOMs = v + tmp.WTOMs = v + } + if v := config.RTOMs; v != 0 { + tmp.RTOMs = v + } + if v := config.WTOMs; v != 0 { + tmp.WTOMs = v } tmp.Msg_normal_close = config.Msg_normal_close tmp.Header = config.Header @@ -105,7 +115,7 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) { c, response, err := dial.Dial(o.Url, tmp_Header) if err != nil { o.error(err) - } else if err := c.SetWriteDeadline(time.Now().Add(time.Duration(o.TO * int(time.Millisecond)))); err != nil { + } else if err := c.SetWriteDeadline(time.Now().Add(time.Duration(o.WTOMs * int(time.Millisecond)))); err != nil { o.error(err) } if err != nil { @@ -137,6 +147,11 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) { }() for { + if err := c.SetReadDeadline(time.Now().Add(time.Duration(o.RTOMs * int(time.Millisecond)))); err != nil { + o.error(err) + o.msg.ClearAll() + return + } msg_type, message, err := c.ReadMessage() if err != nil { if e, ok := err.(*websocket.CloseError); ok { @@ -181,7 +196,7 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) { if wm.Type == 0 { wm.Type = websocket.TextMessage } - if err := c.SetWriteDeadline(time.Now().Add(time.Duration(o.TO * int(time.Millisecond)))); err != nil { + if err := c.SetWriteDeadline(time.Now().Add(time.Duration(o.WTOMs * int(time.Millisecond)))); err != nil { o.error(err) o.msg.ClearAll() return true @@ -192,8 +207,8 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) { return true } if wm.Type == websocket.PingMessage { - time.AfterFunc(time.Duration(o.TO*int(time.Millisecond)), func() { - if time.Now().UnixMilli() > o.pingT+int64(o.TO) { + time.AfterFunc(time.Duration(o.RTOMs*int(time.Millisecond)), func() { + if time.Now().UnixMilli() > o.pingT+int64(o.RTOMs) { o.error(errors.New("PongFail")) o.Close() } @@ -219,7 +234,7 @@ func (o *Client) Heartbeat() (err error) { Type: websocket.PingMessage, Msg: o.Ping.Msg, }) - time.Sleep(time.Duration(o.TO * int(time.Millisecond))) + time.Sleep(time.Duration((o.RTOMs + 100) * int(time.Millisecond))) return o.Error() } diff --git a/websocket/Client_test.go b/websocket/Client_test.go index d2fe5a5..14704c4 100644 --- a/websocket/Client_test.go +++ b/websocket/Client_test.go @@ -1,6 +1,7 @@ package part import ( + "context" "net/http" "testing" "time" @@ -52,7 +53,7 @@ func Test_Client(t *testing.T) { Func_normal_close: func() { t.Log("close") }, - TO: 5, + TO: 5000, }) if e != nil { t.Fatal(e) @@ -73,5 +74,18 @@ func Test_Client(t *testing.T) { Msg: []byte("test"), }) + go func() { + time.Sleep(time.Second) + t.Log("call close") + c.Close() + t.Log("call close done") + }() + + { + cancel, c := ws.Pull_tag_chan(`exit`, 1, context.Background()) + <-c + cancel() + t.Log("exit") + } time.Sleep(time.Second) }