]> 127.0.0.1 Git - part/.git/commitdiff
1 v0.28.20240717161224
authorqydysky <qydysky@foxmail.com>
Wed, 17 Jul 2024 16:07:02 +0000 (00:07 +0800)
committerqydysky <qydysky@foxmail.com>
Wed, 17 Jul 2024 16:07:02 +0000 (00:07 +0800)
msgq/Msgq.go
websocket/Client.go
websocket/Client_test.go

index 53acdee6b6d91620ccb2be7fdebe78f2e127e357..d2f732af061f2574c6f82983b0119a330a1a4319 100644 (file)
@@ -247,10 +247,15 @@ func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) (cancel
                                close(c)
                                return true
                        default:
-                               for len(ch) != 0 {
-                                       <-ch
+                               empty := false
+                               for !empty {
+                                       select {
+                                       case <-c:
+                                       default:
+                                               c <- d.Data
+                                               empty = true
+                                       }
                                }
-                               c <- d.Data
                        }
                }
                return false
@@ -450,10 +455,15 @@ func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) (c
                                        close(c)
                                        return true
                                default:
-                                       for len(ch) != 0 {
-                                               <-ch
+                                       empty := false
+                                       for !empty {
+                                               select {
+                                               case <-c:
+                                               default:
+                                                       c <- *data1.Data
+                                                       empty = true
+                                               }
                                        }
-                                       c <- *data1.Data
                                }
                        }
                }
index a305be8ec9a71235e4e65eeac7e372c8141d2c05..c2e9788bee26406819007d8b2711b653c5a04341 100644 (file)
@@ -18,7 +18,9 @@ type Client struct {
        // rec send close
        msg *msgq.MsgType[*WsMsg]
 
-       TO     int
+       TO     int // depercated: use RTOMs WTOMs instead
+       RTOMs  int
+       WTOMs  int
        Header map[string]string
        Proxy  string
 
@@ -48,7 +50,8 @@ type Ping struct {
 
 func New_client(config *Client) (*Client, error) {
        tmp := Client{
-               TO:                300 * 1000,
+               RTOMs:             300 * 1000,
+               WTOMs:             300 * 1000,
                Func_normal_close: func() {},
                Func_abort_close:  func() {},
                msg:               msgq.NewType[*WsMsg](),
@@ -58,7 +61,14 @@ func New_client(config *Client) (*Client, error) {
                return nil, errors.New(`url == ""`)
        }
        if v := config.TO; v != 0 {
-               tmp.TO = v
+               tmp.RTOMs = v
+               tmp.WTOMs = v
+       }
+       if v := config.RTOMs; v != 0 {
+               tmp.RTOMs = v
+       }
+       if v := config.WTOMs; v != 0 {
+               tmp.WTOMs = v
        }
        tmp.Msg_normal_close = config.Msg_normal_close
        tmp.Header = config.Header
@@ -105,7 +115,7 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) {
        c, response, err := dial.Dial(o.Url, tmp_Header)
        if err != nil {
                o.error(err)
-       } else if err := c.SetWriteDeadline(time.Now().Add(time.Duration(o.TO * int(time.Millisecond)))); err != nil {
+       } else if err := c.SetWriteDeadline(time.Now().Add(time.Duration(o.WTOMs * int(time.Millisecond)))); err != nil {
                o.error(err)
        }
        if err != nil {
@@ -137,6 +147,11 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) {
                }()
 
                for {
+                       if err := c.SetReadDeadline(time.Now().Add(time.Duration(o.RTOMs * int(time.Millisecond)))); err != nil {
+                               o.error(err)
+                               o.msg.ClearAll()
+                               return
+                       }
                        msg_type, message, err := c.ReadMessage()
                        if err != nil {
                                if e, ok := err.(*websocket.CloseError); ok {
@@ -181,7 +196,7 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) {
                if wm.Type == 0 {
                        wm.Type = websocket.TextMessage
                }
-               if err := c.SetWriteDeadline(time.Now().Add(time.Duration(o.TO * int(time.Millisecond)))); err != nil {
+               if err := c.SetWriteDeadline(time.Now().Add(time.Duration(o.WTOMs * int(time.Millisecond)))); err != nil {
                        o.error(err)
                        o.msg.ClearAll()
                        return true
@@ -192,8 +207,8 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) {
                        return true
                }
                if wm.Type == websocket.PingMessage {
-                       time.AfterFunc(time.Duration(o.TO*int(time.Millisecond)), func() {
-                               if time.Now().UnixMilli() > o.pingT+int64(o.TO) {
+                       time.AfterFunc(time.Duration(o.RTOMs*int(time.Millisecond)), func() {
+                               if time.Now().UnixMilli() > o.pingT+int64(o.RTOMs) {
                                        o.error(errors.New("PongFail"))
                                        o.Close()
                                }
@@ -219,7 +234,7 @@ func (o *Client) Heartbeat() (err error) {
                Type: websocket.PingMessage,
                Msg:  o.Ping.Msg,
        })
-       time.Sleep(time.Duration(o.TO * int(time.Millisecond)))
+       time.Sleep(time.Duration((o.RTOMs + 100) * int(time.Millisecond)))
        return o.Error()
 }
 
index d2fe5a50fa7f4d48909167e940ca9f81515fa60a..14704c4ab82663afbe605c24c0e295ae2b4db16c 100644 (file)
@@ -1,6 +1,7 @@
 package part
 
 import (
+       "context"
        "net/http"
        "testing"
        "time"
@@ -52,7 +53,7 @@ func Test_Client(t *testing.T) {
                Func_normal_close: func() {
                        t.Log("close")
                },
-               TO: 5,
+               TO: 5000,
        })
        if e != nil {
                t.Fatal(e)
@@ -73,5 +74,18 @@ func Test_Client(t *testing.T) {
                Msg: []byte("test"),
        })
 
+       go func() {
+               time.Sleep(time.Second)
+               t.Log("call close")
+               c.Close()
+               t.Log("call close done")
+       }()
+
+       {
+               cancel, c := ws.Pull_tag_chan(`exit`, 1, context.Background())
+               <-c
+               cancel()
+               t.Log("exit")
+       }
        time.Sleep(time.Second)
 }