From 3fe4668ad26c9714c8775bad7f3d735cda9c7178 Mon Sep 17 00:00:00 2001 From: qydysky Date: Mon, 8 May 2023 02:06:05 +0800 Subject: [PATCH] Improve --- websocket/Client.go | 258 ++++++++++++++++----------------------- websocket/Client_test.go | 77 ++++++++++++ 2 files changed, 181 insertions(+), 154 deletions(-) create mode 100644 websocket/Client_test.go diff --git a/websocket/Client.go b/websocket/Client.go index f2ff8b3..37f43a5 100644 --- a/websocket/Client.go +++ b/websocket/Client.go @@ -5,34 +5,36 @@ import ( "io" "net/http" "net/url" - "reflect" + "sync/atomic" "time" "github.com/gorilla/websocket" - s "github.com/qydysky/part/signal" + msgq "github.com/qydysky/part/msgq" ) type Client struct { - Url string - SendChan chan interface{} - RecvChan chan []byte + Url string + // rec send close + msg *msgq.MsgType[*WsMsg] TO int Header map[string]string Proxy string - Ping Ping + Ping Ping + pingT int64 Msg_normal_close string Func_normal_close func() Func_abort_close func() - err error - signal *s.Signal + close atomic.Bool + + err error } -type ws_msg struct { +type WsMsg struct { Type int Msg []byte } @@ -43,15 +45,17 @@ type Ping struct { had_pong bool } -func New_client(config Client) (o *Client) { +func New_client(config Client) (*Client, error) { tmp := Client{ TO: 300 * 1000, Func_normal_close: func() {}, Func_abort_close: func() {}, - SendChan: make(chan interface{}, 1e4), - RecvChan: make(chan []byte, 1e4), + msg: msgq.NewType[*WsMsg](), } tmp.Url = config.Url + if tmp.Url == "" { + return nil, errors.New(`url == ""`) + } if v := config.TO; v != 0 { tmp.TO = v } @@ -69,23 +73,20 @@ func New_client(config Client) (o *Client) { if config.Ping.Period != 0 { tmp.Ping = config.Ping } - return &tmp + return &tmp, nil } -func (i *Client) Handle() (o *Client) { - o = i - - if o.signal.Islive() { - return - } - o.signal = s.Init() - - if o.Url == "" { - o.signal.Done() - o.err = errors.New(`url == ""`) - return - } - +// 处理 +// +// msg.PushLock_tag(`send`, &WsMsg{ +// Msg: []byte("message"), +// }) +// +// msg.Pull_tag_only(`rec`, func(wm *WsMsg) (disable bool) { +// fmt.Println(string(wm.Msg)) +// return false +// }) +func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) { tmp_Header := make(http.Header) for k, v := range o.Header { tmp_Header.Set(k, v) @@ -101,7 +102,6 @@ func (i *Client) Handle() (o *Client) { c, response, err := dial.Dial(o.Url, tmp_Header) if err != nil { - o.signal.Done() e := err.Error() if response != nil { if response.Status != "" { @@ -110,159 +110,109 @@ func (i *Client) Handle() (o *Client) { if response.Body != nil { body, err := io.ReadAll(response.Body) if err != nil { - o.err = err - return + return nil, err } response.Body.Close() e += ` ` + string(body) } } - o.err = errors.New(e) - return + return nil, errors.New(e) } + // rec go func() { - defer func() { - o.signal.Done() - close(o.RecvChan) - c.Close() - }() - - done := s.Init() - defer done.Done() - - go func() { - defer done.Done() - - for { - c.SetReadDeadline(time.Now().Add(time.Millisecond * time.Duration(o.TO))) - msg_type, message, err := c.ReadMessage() - if err != nil { - if e, ok := err.(*websocket.CloseError); ok { - switch e.Code { - case websocket.CloseNormalClosure: - o.Func_normal_close() - case websocket.CloseAbnormalClosure: - o.Func_abort_close() - default: - } - o.err = e - } - return - } - if !done.Islive() { - return - } - switch msg_type { - case websocket.PingMessage: - o.SendChan <- ws_msg{ - Type: websocket.PongMessage, - Msg: message, + defer o.msg.ClearAll() + defer o.close.Store(true) + for { + c.SetReadDeadline(time.Now().Add(time.Duration(o.TO * int(time.Millisecond)))) + msg_type, message, err := c.ReadMessage() + if err != nil { + if e, ok := err.(*websocket.CloseError); ok { + switch e.Code { + case websocket.CloseNormalClosure: + o.Func_normal_close() + case websocket.CloseAbnormalClosure: + o.Func_abort_close() + default: } - case websocket.PongMessage: - o.Ping.had_pong = true - default: - o.RecvChan <- message + o.err = errors.Join(o.err, err) } - } - }() - - donec, donecF := done.WaitC() - defer donecF() - osignal, osignalF := o.signal.WaitC() - defer osignalF() - - for { - select { - case <-donec: return - case t := <-o.SendChan: - if !done.Islive() { - return - } - - var err error - switch reflect.ValueOf(t).Type().Name() { - case `ws_msg`: - err = c.WriteMessage(t.(ws_msg).Type, t.(ws_msg).Msg) - default: - err = c.WriteMessage(websocket.TextMessage, t.([]byte)) - } - if err != nil { - o.err = err - return - } - c.SetWriteDeadline(time.Now().Add(time.Millisecond * time.Duration(o.TO))) - case <-osignal: - if !done.Islive() { - return - } + } - err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, o.Msg_normal_close)) - if err != nil { - o.err = err - } - select { - case <-donec: - case <-time.After(time.Second): - } - return + switch msg_type { + case websocket.PingMessage: + o.msg.PushLock_tag(`send`, &WsMsg{ + Type: websocket.PongMessage, + Msg: message, + }) + case websocket.PongMessage: + o.pingT = time.Now().UnixMilli() + time.AfterFunc(time.Duration(o.Ping.Period*int(time.Millisecond)), func() { + o.msg.PushLock_tag(`send`, &WsMsg{ + Type: websocket.PingMessage, + Msg: o.Ping.Msg, + }) + }) + o.Ping.had_pong = true + default: + o.msg.PushLock_tag(`rec`, &WsMsg{ + Type: websocket.TextMessage, + Msg: message, + }) } } }() - return -} - -func (o *Client) Heartbeat() (err error) { - if !o.signal.Islive() { - return errors.New(`not alive`) - } - var ticker_ping = time.NewTicker(time.Duration(o.TO) * time.Millisecond) - if o.Ping.Period > 0 { - if o.Ping.Period < o.TO { - ticker_ping.Reset(time.Duration(o.Ping.Period) * time.Millisecond) - o.Ping.had_pong = true - } else { - err = errors.New(`Ping.Period < o.TO`) + // send + o.msg.Pull_tag_only(`send`, func(wm *WsMsg) (disable bool) { + if wm.Type == 0 { + wm.Type = websocket.TextMessage } - } else { - ticker_ping.Stop() - } - - go func(ticker_ping *time.Ticker) { - defer ticker_ping.Stop() - - osignal, osignalF := o.signal.WaitC() - defer osignalF() - for { - select { - case <-ticker_ping.C: - if !o.Ping.had_pong { - o.err = errors.New("PongFail") + if err := c.WriteMessage(wm.Type, wm.Msg); err != nil { + o.err = errors.Join(o.err, err) + o.msg.ClearAll() + return true + } + if wm.Type == websocket.PingMessage { + time.AfterFunc(time.Duration(o.TO*int(time.Millisecond)), func() { + if time.Now().UnixMilli() > o.pingT+int64(o.TO) { + o.err = errors.Join(o.err, errors.New("PongFail")) o.Close() - return - } - o.SendChan <- ws_msg{ - Type: websocket.PingMessage, - Msg: o.Ping.Msg, } - o.Ping.had_pong = false - case <-osignal: - return - } + }) + } + o.err = errors.Join(o.err, c.SetWriteDeadline(time.Now().Add(time.Duration(o.TO*int(time.Millisecond))))) + return false + }) + + // close + o.msg.Pull_tag_only(`close`, func(_ *WsMsg) (disable bool) { + if err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, o.Msg_normal_close)); err != nil { + o.err = errors.Join(o.err, err) + o.msg.ClearAll() } - }(ticker_ping) + return true + }) - return + return o.msg, nil +} + +func (o *Client) Heartbeat() (err error) { + o.msg.PushLock_tag(`send`, &WsMsg{ + Type: websocket.PingMessage, + Msg: o.Ping.Msg, + }) + time.Sleep(time.Duration(o.TO * int(time.Millisecond))) + return o.Error() } func (o *Client) Close() { - o.signal.Done() + o.msg.PushLock_tag(`close`, nil) } func (o *Client) Isclose() bool { - return !o.signal.Islive() + return o.close.Load() } func (o *Client) Error() error { diff --git a/websocket/Client_test.go b/websocket/Client_test.go new file mode 100644 index 0000000..c7376b1 --- /dev/null +++ b/websocket/Client_test.go @@ -0,0 +1,77 @@ +package part + +import ( + "net/http" + "testing" + "time" + + web "github.com/qydysky/part/web" +) + +func Test_Client(t *testing.T) { + s := New_server() + { + ws_mq := s.Interface() + + ws_mq.Pull_tag(map[string]func(interface{}) bool{ + `error`: func(data interface{}) bool { + return true + }, + `recv`: func(data interface{}) bool { + if tmp, ok := data.(Uinterface); ok { + t.Log(tmp.Id, `=>`, string(tmp.Data)) + t.Log(string(tmp.Data), `=>`, tmp.Id) + ws_mq.Push_tag(`send`, Uinterface{ //just reply + Id: tmp.Id, + Data: tmp.Data, + }) + } + return false + }, + }) + } + + w := web.New(&http.Server{ + Addr: "127.0.0.1:10888", + WriteTimeout: time.Second * time.Duration(10), + }) + w.Handle(map[string]func(http.ResponseWriter, *http.Request){ + `/ws`: func(w http.ResponseWriter, r *http.Request) { + conn := s.WS(w, r) + id := <-conn + t.Log(`user connect!`, id) + <-conn + t.Log(`user disconnect!`, id) + }, + }) + + time.Sleep(time.Second) + + c, e := New_client(Client{ + Url: "ws://127.0.0.1:10888/ws", + Func_normal_close: func() { + t.Log("close") + }, + TO: 5, + }) + if e != nil { + t.Fatal(e) + } + + ws, e := c.Handle() + if e != nil { + t.Fatal(e) + } + + ws.Pull_tag_only(`rec`, func(wm *WsMsg) (disable bool) { + if string(wm.Msg) != "test" { + t.Fatal() + } + return false + }) + ws.PushLock_tag(`send`, &WsMsg{ + Msg: []byte("test"), + }) + + time.Sleep(time.Second) +} -- 2.39.2