]> 127.0.0.1 Git - part/.git/commitdiff
Improve
authorqydysky <qydysky@foxmail.com>
Sun, 7 May 2023 18:06:05 +0000 (02:06 +0800)
committerqydysky <qydysky@foxmail.com>
Sun, 7 May 2023 18:06:05 +0000 (02:06 +0800)
websocket/Client.go
websocket/Client_test.go [new file with mode: 0644]

index f2ff8b3744b72b41ba353902d11f9b390a726e93..37f43a5d48cf834197a21be87779477fca333296 100644 (file)
@@ -5,34 +5,36 @@ import (
        "io"
        "net/http"
        "net/url"
-       "reflect"
+       "sync/atomic"
        "time"
 
        "github.com/gorilla/websocket"
 
-       s "github.com/qydysky/part/signal"
+       msgq "github.com/qydysky/part/msgq"
 )
 
 type Client struct {
-       Url      string
-       SendChan chan interface{}
-       RecvChan chan []byte
+       Url string
+       // rec send close
+       msg *msgq.MsgType[*WsMsg]
 
        TO     int
        Header map[string]string
        Proxy  string
 
-       Ping Ping
+       Ping  Ping
+       pingT int64
 
        Msg_normal_close  string
        Func_normal_close func()
        Func_abort_close  func()
 
-       err    error
-       signal *s.Signal
+       close atomic.Bool
+
+       err error
 }
 
-type ws_msg struct {
+type WsMsg struct {
        Type int
        Msg  []byte
 }
@@ -43,15 +45,17 @@ type Ping struct {
        had_pong bool
 }
 
-func New_client(config Client) (o *Client) {
+func New_client(config Client) (*Client, error) {
        tmp := Client{
                TO:                300 * 1000,
                Func_normal_close: func() {},
                Func_abort_close:  func() {},
-               SendChan:          make(chan interface{}, 1e4),
-               RecvChan:          make(chan []byte, 1e4),
+               msg:               msgq.NewType[*WsMsg](),
        }
        tmp.Url = config.Url
+       if tmp.Url == "" {
+               return nil, errors.New(`url == ""`)
+       }
        if v := config.TO; v != 0 {
                tmp.TO = v
        }
@@ -69,23 +73,20 @@ func New_client(config Client) (o *Client) {
        if config.Ping.Period != 0 {
                tmp.Ping = config.Ping
        }
-       return &tmp
+       return &tmp, nil
 }
 
-func (i *Client) Handle() (o *Client) {
-       o = i
-
-       if o.signal.Islive() {
-               return
-       }
-       o.signal = s.Init()
-
-       if o.Url == "" {
-               o.signal.Done()
-               o.err = errors.New(`url == ""`)
-               return
-       }
-
+// 处理
+//
+//     msg.PushLock_tag(`send`, &WsMsg{
+//             Msg:  []byte("message"),
+//     })
+//
+//     msg.Pull_tag_only(`rec`, func(wm *WsMsg) (disable bool) {
+//             fmt.Println(string(wm.Msg))
+//             return false
+//     })
+func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) {
        tmp_Header := make(http.Header)
        for k, v := range o.Header {
                tmp_Header.Set(k, v)
@@ -101,7 +102,6 @@ func (i *Client) Handle() (o *Client) {
        c, response, err := dial.Dial(o.Url, tmp_Header)
 
        if err != nil {
-               o.signal.Done()
                e := err.Error()
                if response != nil {
                        if response.Status != "" {
@@ -110,159 +110,109 @@ func (i *Client) Handle() (o *Client) {
                        if response.Body != nil {
                                body, err := io.ReadAll(response.Body)
                                if err != nil {
-                                       o.err = err
-                                       return
+                                       return nil, err
                                }
                                response.Body.Close()
                                e += ` ` + string(body)
                        }
                }
-               o.err = errors.New(e)
-               return
+               return nil, errors.New(e)
        }
 
+       // rec
        go func() {
-               defer func() {
-                       o.signal.Done()
-                       close(o.RecvChan)
-                       c.Close()
-               }()
-
-               done := s.Init()
-               defer done.Done()
-
-               go func() {
-                       defer done.Done()
-
-                       for {
-                               c.SetReadDeadline(time.Now().Add(time.Millisecond * time.Duration(o.TO)))
-                               msg_type, message, err := c.ReadMessage()
-                               if err != nil {
-                                       if e, ok := err.(*websocket.CloseError); ok {
-                                               switch e.Code {
-                                               case websocket.CloseNormalClosure:
-                                                       o.Func_normal_close()
-                                               case websocket.CloseAbnormalClosure:
-                                                       o.Func_abort_close()
-                                               default:
-                                               }
-                                               o.err = e
-                                       }
-                                       return
-                               }
-                               if !done.Islive() {
-                                       return
-                               }
-                               switch msg_type {
-                               case websocket.PingMessage:
-                                       o.SendChan <- ws_msg{
-                                               Type: websocket.PongMessage,
-                                               Msg:  message,
+               defer o.msg.ClearAll()
+               defer o.close.Store(true)
+               for {
+                       c.SetReadDeadline(time.Now().Add(time.Duration(o.TO * int(time.Millisecond))))
+                       msg_type, message, err := c.ReadMessage()
+                       if err != nil {
+                               if e, ok := err.(*websocket.CloseError); ok {
+                                       switch e.Code {
+                                       case websocket.CloseNormalClosure:
+                                               o.Func_normal_close()
+                                       case websocket.CloseAbnormalClosure:
+                                               o.Func_abort_close()
+                                       default:
                                        }
-                               case websocket.PongMessage:
-                                       o.Ping.had_pong = true
-                               default:
-                                       o.RecvChan <- message
+                                       o.err = errors.Join(o.err, err)
                                }
-                       }
-               }()
-
-               donec, donecF := done.WaitC()
-               defer donecF()
-               osignal, osignalF := o.signal.WaitC()
-               defer osignalF()
-
-               for {
-                       select {
-                       case <-donec:
                                return
-                       case t := <-o.SendChan:
-                               if !done.Islive() {
-                                       return
-                               }
-
-                               var err error
-                               switch reflect.ValueOf(t).Type().Name() {
-                               case `ws_msg`:
-                                       err = c.WriteMessage(t.(ws_msg).Type, t.(ws_msg).Msg)
-                               default:
-                                       err = c.WriteMessage(websocket.TextMessage, t.([]byte))
-                               }
-                               if err != nil {
-                                       o.err = err
-                                       return
-                               }
-                               c.SetWriteDeadline(time.Now().Add(time.Millisecond * time.Duration(o.TO)))
-                       case <-osignal:
-                               if !done.Islive() {
-                                       return
-                               }
+                       }
 
-                               err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, o.Msg_normal_close))
-                               if err != nil {
-                                       o.err = err
-                               }
-                               select {
-                               case <-donec:
-                               case <-time.After(time.Second):
-                               }
-                               return
+                       switch msg_type {
+                       case websocket.PingMessage:
+                               o.msg.PushLock_tag(`send`, &WsMsg{
+                                       Type: websocket.PongMessage,
+                                       Msg:  message,
+                               })
+                       case websocket.PongMessage:
+                               o.pingT = time.Now().UnixMilli()
+                               time.AfterFunc(time.Duration(o.Ping.Period*int(time.Millisecond)), func() {
+                                       o.msg.PushLock_tag(`send`, &WsMsg{
+                                               Type: websocket.PingMessage,
+                                               Msg:  o.Ping.Msg,
+                                       })
+                               })
+                               o.Ping.had_pong = true
+                       default:
+                               o.msg.PushLock_tag(`rec`, &WsMsg{
+                                       Type: websocket.TextMessage,
+                                       Msg:  message,
+                               })
                        }
                }
        }()
-       return
-}
-
-func (o *Client) Heartbeat() (err error) {
-       if !o.signal.Islive() {
-               return errors.New(`not alive`)
-       }
 
-       var ticker_ping = time.NewTicker(time.Duration(o.TO) * time.Millisecond)
-       if o.Ping.Period > 0 {
-               if o.Ping.Period < o.TO {
-                       ticker_ping.Reset(time.Duration(o.Ping.Period) * time.Millisecond)
-                       o.Ping.had_pong = true
-               } else {
-                       err = errors.New(`Ping.Period < o.TO`)
+       // send
+       o.msg.Pull_tag_only(`send`, func(wm *WsMsg) (disable bool) {
+               if wm.Type == 0 {
+                       wm.Type = websocket.TextMessage
                }
-       } else {
-               ticker_ping.Stop()
-       }
-
-       go func(ticker_ping *time.Ticker) {
-               defer ticker_ping.Stop()
-
-               osignal, osignalF := o.signal.WaitC()
-               defer osignalF()
-               for {
-                       select {
-                       case <-ticker_ping.C:
-                               if !o.Ping.had_pong {
-                                       o.err = errors.New("PongFail")
+               if err := c.WriteMessage(wm.Type, wm.Msg); err != nil {
+                       o.err = errors.Join(o.err, err)
+                       o.msg.ClearAll()
+                       return true
+               }
+               if wm.Type == websocket.PingMessage {
+                       time.AfterFunc(time.Duration(o.TO*int(time.Millisecond)), func() {
+                               if time.Now().UnixMilli() > o.pingT+int64(o.TO) {
+                                       o.err = errors.Join(o.err, errors.New("PongFail"))
                                        o.Close()
-                                       return
-                               }
-                               o.SendChan <- ws_msg{
-                                       Type: websocket.PingMessage,
-                                       Msg:  o.Ping.Msg,
                                }
-                               o.Ping.had_pong = false
-                       case <-osignal:
-                               return
-                       }
+                       })
+               }
+               o.err = errors.Join(o.err, c.SetWriteDeadline(time.Now().Add(time.Duration(o.TO*int(time.Millisecond)))))
+               return false
+       })
+
+       // close
+       o.msg.Pull_tag_only(`close`, func(_ *WsMsg) (disable bool) {
+               if err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, o.Msg_normal_close)); err != nil {
+                       o.err = errors.Join(o.err, err)
+                       o.msg.ClearAll()
                }
-       }(ticker_ping)
+               return true
+       })
 
-       return
+       return o.msg, nil
+}
+
+func (o *Client) Heartbeat() (err error) {
+       o.msg.PushLock_tag(`send`, &WsMsg{
+               Type: websocket.PingMessage,
+               Msg:  o.Ping.Msg,
+       })
+       time.Sleep(time.Duration(o.TO * int(time.Millisecond)))
+       return o.Error()
 }
 
 func (o *Client) Close() {
-       o.signal.Done()
+       o.msg.PushLock_tag(`close`, nil)
 }
 
 func (o *Client) Isclose() bool {
-       return !o.signal.Islive()
+       return o.close.Load()
 }
 
 func (o *Client) Error() error {
diff --git a/websocket/Client_test.go b/websocket/Client_test.go
new file mode 100644 (file)
index 0000000..c7376b1
--- /dev/null
@@ -0,0 +1,77 @@
+package part
+
+import (
+       "net/http"
+       "testing"
+       "time"
+
+       web "github.com/qydysky/part/web"
+)
+
+func Test_Client(t *testing.T) {
+       s := New_server()
+       {
+               ws_mq := s.Interface()
+
+               ws_mq.Pull_tag(map[string]func(interface{}) bool{
+                       `error`: func(data interface{}) bool {
+                               return true
+                       },
+                       `recv`: func(data interface{}) bool {
+                               if tmp, ok := data.(Uinterface); ok {
+                                       t.Log(tmp.Id, `=>`, string(tmp.Data))
+                                       t.Log(string(tmp.Data), `=>`, tmp.Id)
+                                       ws_mq.Push_tag(`send`, Uinterface{ //just reply
+                                               Id:   tmp.Id,
+                                               Data: tmp.Data,
+                                       })
+                               }
+                               return false
+                       },
+               })
+       }
+
+       w := web.New(&http.Server{
+               Addr:         "127.0.0.1:10888",
+               WriteTimeout: time.Second * time.Duration(10),
+       })
+       w.Handle(map[string]func(http.ResponseWriter, *http.Request){
+               `/ws`: func(w http.ResponseWriter, r *http.Request) {
+                       conn := s.WS(w, r)
+                       id := <-conn
+                       t.Log(`user connect!`, id)
+                       <-conn
+                       t.Log(`user disconnect!`, id)
+               },
+       })
+
+       time.Sleep(time.Second)
+
+       c, e := New_client(Client{
+               Url: "ws://127.0.0.1:10888/ws",
+               Func_normal_close: func() {
+                       t.Log("close")
+               },
+               TO: 5,
+       })
+       if e != nil {
+               t.Fatal(e)
+       }
+
+       ws, e := c.Handle()
+       if e != nil {
+               t.Fatal(e)
+       }
+
+       ws.Pull_tag_only(`rec`, func(wm *WsMsg) (disable bool) {
+               if string(wm.Msg) != "test" {
+                       t.Fatal()
+               }
+               return false
+       })
+       ws.PushLock_tag(`send`, &WsMsg{
+               Msg: []byte("test"),
+       })
+
+       time.Sleep(time.Second)
+}