"io"
"net/http"
"net/url"
- "reflect"
+ "sync/atomic"
"time"
"github.com/gorilla/websocket"
- s "github.com/qydysky/part/signal"
+ msgq "github.com/qydysky/part/msgq"
)
type Client struct {
- Url string
- SendChan chan interface{}
- RecvChan chan []byte
+ Url string
+ // rec send close
+ msg *msgq.MsgType[*WsMsg]
TO int
Header map[string]string
Proxy string
- Ping Ping
+ Ping Ping
+ pingT int64
Msg_normal_close string
Func_normal_close func()
Func_abort_close func()
- err error
- signal *s.Signal
+ close atomic.Bool
+
+ err error
}
-type ws_msg struct {
+type WsMsg struct {
Type int
Msg []byte
}
had_pong bool
}
-func New_client(config Client) (o *Client) {
+func New_client(config Client) (*Client, error) {
tmp := Client{
TO: 300 * 1000,
Func_normal_close: func() {},
Func_abort_close: func() {},
- SendChan: make(chan interface{}, 1e4),
- RecvChan: make(chan []byte, 1e4),
+ msg: msgq.NewType[*WsMsg](),
}
tmp.Url = config.Url
+ if tmp.Url == "" {
+ return nil, errors.New(`url == ""`)
+ }
if v := config.TO; v != 0 {
tmp.TO = v
}
if config.Ping.Period != 0 {
tmp.Ping = config.Ping
}
- return &tmp
+ return &tmp, nil
}
-func (i *Client) Handle() (o *Client) {
- o = i
-
- if o.signal.Islive() {
- return
- }
- o.signal = s.Init()
-
- if o.Url == "" {
- o.signal.Done()
- o.err = errors.New(`url == ""`)
- return
- }
-
+// 处理
+//
+// msg.PushLock_tag(`send`, &WsMsg{
+// Msg: []byte("message"),
+// })
+//
+// msg.Pull_tag_only(`rec`, func(wm *WsMsg) (disable bool) {
+// fmt.Println(string(wm.Msg))
+// return false
+// })
+func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) {
tmp_Header := make(http.Header)
for k, v := range o.Header {
tmp_Header.Set(k, v)
c, response, err := dial.Dial(o.Url, tmp_Header)
if err != nil {
- o.signal.Done()
e := err.Error()
if response != nil {
if response.Status != "" {
if response.Body != nil {
body, err := io.ReadAll(response.Body)
if err != nil {
- o.err = err
- return
+ return nil, err
}
response.Body.Close()
e += ` ` + string(body)
}
}
- o.err = errors.New(e)
- return
+ return nil, errors.New(e)
}
+ // rec
go func() {
- defer func() {
- o.signal.Done()
- close(o.RecvChan)
- c.Close()
- }()
-
- done := s.Init()
- defer done.Done()
-
- go func() {
- defer done.Done()
-
- for {
- c.SetReadDeadline(time.Now().Add(time.Millisecond * time.Duration(o.TO)))
- msg_type, message, err := c.ReadMessage()
- if err != nil {
- if e, ok := err.(*websocket.CloseError); ok {
- switch e.Code {
- case websocket.CloseNormalClosure:
- o.Func_normal_close()
- case websocket.CloseAbnormalClosure:
- o.Func_abort_close()
- default:
- }
- o.err = e
- }
- return
- }
- if !done.Islive() {
- return
- }
- switch msg_type {
- case websocket.PingMessage:
- o.SendChan <- ws_msg{
- Type: websocket.PongMessage,
- Msg: message,
+ defer o.msg.ClearAll()
+ defer o.close.Store(true)
+ for {
+ c.SetReadDeadline(time.Now().Add(time.Duration(o.TO * int(time.Millisecond))))
+ msg_type, message, err := c.ReadMessage()
+ if err != nil {
+ if e, ok := err.(*websocket.CloseError); ok {
+ switch e.Code {
+ case websocket.CloseNormalClosure:
+ o.Func_normal_close()
+ case websocket.CloseAbnormalClosure:
+ o.Func_abort_close()
+ default:
}
- case websocket.PongMessage:
- o.Ping.had_pong = true
- default:
- o.RecvChan <- message
+ o.err = errors.Join(o.err, err)
}
- }
- }()
-
- donec, donecF := done.WaitC()
- defer donecF()
- osignal, osignalF := o.signal.WaitC()
- defer osignalF()
-
- for {
- select {
- case <-donec:
return
- case t := <-o.SendChan:
- if !done.Islive() {
- return
- }
-
- var err error
- switch reflect.ValueOf(t).Type().Name() {
- case `ws_msg`:
- err = c.WriteMessage(t.(ws_msg).Type, t.(ws_msg).Msg)
- default:
- err = c.WriteMessage(websocket.TextMessage, t.([]byte))
- }
- if err != nil {
- o.err = err
- return
- }
- c.SetWriteDeadline(time.Now().Add(time.Millisecond * time.Duration(o.TO)))
- case <-osignal:
- if !done.Islive() {
- return
- }
+ }
- err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, o.Msg_normal_close))
- if err != nil {
- o.err = err
- }
- select {
- case <-donec:
- case <-time.After(time.Second):
- }
- return
+ switch msg_type {
+ case websocket.PingMessage:
+ o.msg.PushLock_tag(`send`, &WsMsg{
+ Type: websocket.PongMessage,
+ Msg: message,
+ })
+ case websocket.PongMessage:
+ o.pingT = time.Now().UnixMilli()
+ time.AfterFunc(time.Duration(o.Ping.Period*int(time.Millisecond)), func() {
+ o.msg.PushLock_tag(`send`, &WsMsg{
+ Type: websocket.PingMessage,
+ Msg: o.Ping.Msg,
+ })
+ })
+ o.Ping.had_pong = true
+ default:
+ o.msg.PushLock_tag(`rec`, &WsMsg{
+ Type: websocket.TextMessage,
+ Msg: message,
+ })
}
}
}()
- return
-}
-
-func (o *Client) Heartbeat() (err error) {
- if !o.signal.Islive() {
- return errors.New(`not alive`)
- }
- var ticker_ping = time.NewTicker(time.Duration(o.TO) * time.Millisecond)
- if o.Ping.Period > 0 {
- if o.Ping.Period < o.TO {
- ticker_ping.Reset(time.Duration(o.Ping.Period) * time.Millisecond)
- o.Ping.had_pong = true
- } else {
- err = errors.New(`Ping.Period < o.TO`)
+ // send
+ o.msg.Pull_tag_only(`send`, func(wm *WsMsg) (disable bool) {
+ if wm.Type == 0 {
+ wm.Type = websocket.TextMessage
}
- } else {
- ticker_ping.Stop()
- }
-
- go func(ticker_ping *time.Ticker) {
- defer ticker_ping.Stop()
-
- osignal, osignalF := o.signal.WaitC()
- defer osignalF()
- for {
- select {
- case <-ticker_ping.C:
- if !o.Ping.had_pong {
- o.err = errors.New("PongFail")
+ if err := c.WriteMessage(wm.Type, wm.Msg); err != nil {
+ o.err = errors.Join(o.err, err)
+ o.msg.ClearAll()
+ return true
+ }
+ if wm.Type == websocket.PingMessage {
+ time.AfterFunc(time.Duration(o.TO*int(time.Millisecond)), func() {
+ if time.Now().UnixMilli() > o.pingT+int64(o.TO) {
+ o.err = errors.Join(o.err, errors.New("PongFail"))
o.Close()
- return
- }
- o.SendChan <- ws_msg{
- Type: websocket.PingMessage,
- Msg: o.Ping.Msg,
}
- o.Ping.had_pong = false
- case <-osignal:
- return
- }
+ })
+ }
+ o.err = errors.Join(o.err, c.SetWriteDeadline(time.Now().Add(time.Duration(o.TO*int(time.Millisecond)))))
+ return false
+ })
+
+ // close
+ o.msg.Pull_tag_only(`close`, func(_ *WsMsg) (disable bool) {
+ if err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, o.Msg_normal_close)); err != nil {
+ o.err = errors.Join(o.err, err)
+ o.msg.ClearAll()
}
- }(ticker_ping)
+ return true
+ })
- return
+ return o.msg, nil
+}
+
+func (o *Client) Heartbeat() (err error) {
+ o.msg.PushLock_tag(`send`, &WsMsg{
+ Type: websocket.PingMessage,
+ Msg: o.Ping.Msg,
+ })
+ time.Sleep(time.Duration(o.TO * int(time.Millisecond)))
+ return o.Error()
}
func (o *Client) Close() {
- o.signal.Done()
+ o.msg.PushLock_tag(`close`, nil)
}
func (o *Client) Isclose() bool {
- return !o.signal.Islive()
+ return o.close.Load()
}
func (o *Client) Error() error {
--- /dev/null
+package part
+
+import (
+ "net/http"
+ "testing"
+ "time"
+
+ web "github.com/qydysky/part/web"
+)
+
+func Test_Client(t *testing.T) {
+ s := New_server()
+ {
+ ws_mq := s.Interface()
+
+ ws_mq.Pull_tag(map[string]func(interface{}) bool{
+ `error`: func(data interface{}) bool {
+ return true
+ },
+ `recv`: func(data interface{}) bool {
+ if tmp, ok := data.(Uinterface); ok {
+ t.Log(tmp.Id, `=>`, string(tmp.Data))
+ t.Log(string(tmp.Data), `=>`, tmp.Id)
+ ws_mq.Push_tag(`send`, Uinterface{ //just reply
+ Id: tmp.Id,
+ Data: tmp.Data,
+ })
+ }
+ return false
+ },
+ })
+ }
+
+ w := web.New(&http.Server{
+ Addr: "127.0.0.1:10888",
+ WriteTimeout: time.Second * time.Duration(10),
+ })
+ w.Handle(map[string]func(http.ResponseWriter, *http.Request){
+ `/ws`: func(w http.ResponseWriter, r *http.Request) {
+ conn := s.WS(w, r)
+ id := <-conn
+ t.Log(`user connect!`, id)
+ <-conn
+ t.Log(`user disconnect!`, id)
+ },
+ })
+
+ time.Sleep(time.Second)
+
+ c, e := New_client(Client{
+ Url: "ws://127.0.0.1:10888/ws",
+ Func_normal_close: func() {
+ t.Log("close")
+ },
+ TO: 5,
+ })
+ if e != nil {
+ t.Fatal(e)
+ }
+
+ ws, e := c.Handle()
+ if e != nil {
+ t.Fatal(e)
+ }
+
+ ws.Pull_tag_only(`rec`, func(wm *WsMsg) (disable bool) {
+ if string(wm.Msg) != "test" {
+ t.Fatal()
+ }
+ return false
+ })
+ ws.PushLock_tag(`send`, &WsMsg{
+ Msg: []byte("test"),
+ })
+
+ time.Sleep(time.Second)
+}