From d0c0ca9fc246db480352f5529fe5cf64bc790f9d Mon Sep 17 00:00:00 2001 From: qydysky Date: Tue, 22 Dec 2020 14:19:00 +0800 Subject: [PATCH] 95 --- go.mod | 1 + go.sum | 2 + websocket/Client.go | 192 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 195 insertions(+) create mode 100644 websocket/Client.go diff --git a/go.mod b/go.mod index f552430..5e96ca1 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/andybalholm/brotli v1.0.1 github.com/go-ole/go-ole v1.2.4 // indirect + github.com/gorilla/websocket v1.4.2 github.com/klauspost/compress v1.10.10 github.com/klauspost/pgzip v1.2.5 github.com/miekg/dns v1.1.31 diff --git a/go.sum b/go.sum index 6275551..3d760f3 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/andybalholm/brotli v1.0.1 h1:KqhlKozYbRtJvsPrrEeXcO+N2l6NYT5A2QAFmSUL github.com/andybalholm/brotli v1.0.1/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/klauspost/compress v1.10.10 h1:a/y8CglcM7gLGYmlbP/stPE5sR3hbhFRUjCBfd/0B3I= github.com/klauspost/compress v1.10.10/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE= diff --git a/websocket/Client.go b/websocket/Client.go new file mode 100644 index 0000000..62764cb --- /dev/null +++ b/websocket/Client.go @@ -0,0 +1,192 @@ +package part + +import ( + "time" + "errors" + "reflect" + + "github.com/gorilla/websocket" + + s "github.com/qydysky/part/signal" +) + +type Client struct { + Url string + SendChan chan interface{} + RecvChan chan []byte + + TO int + Header map[string][]string + + Ping Ping + + Msg_normal_close string + Func_normal_close func() + Func_abort_close func() + + err error + signal *s.Signal +} + +type ws_msg struct { + Type int + Msg []byte +} + +type Ping struct { + Msg []byte + Period int + had_pong bool +} + +func New_client(config Client) (o *Client) { + tmp := Client{ + TO: 300 * 1000, + Func_normal_close:func(){}, + Func_abort_close:func(){}, + SendChan: make(chan interface{}, 1e4), + RecvChan: make(chan []byte, 1e4), + } + tmp.Url = config.Url + if v := config.TO;v != 0 {tmp.TO = v} + tmp.Header = config.Header + tmp.Msg_normal_close = config.Msg_normal_close + if v := config.Func_normal_close;v != nil {tmp.Func_normal_close = v} + if v := config.Func_abort_close;v != nil {tmp.Func_abort_close = v} + if config.Ping.Period != 0 {tmp.Ping = config.Ping} + return &tmp +} + +func (i *Client) Handle() (o *Client) { + o = i + + if o.signal.Islive() {return} + o.signal = s.Init() + + if o.Url == "" { + o.signal.Done() + o.err = errors.New(`url == ""`) + return + } + + go func(){ + defer func(){ + close(o.RecvChan) + o.signal.Done() + }() + + c, _, err := websocket.DefaultDialer.Dial(o.Url, o.Header) + if err != nil {return} + defer c.Close() + + done := make(chan struct{}) + + go func() { + defer close(done) + + for { + c.SetReadDeadline(time.Now().Add(time.Millisecond*time.Duration(o.TO))) + msg_type, message, err := c.ReadMessage() + if err != nil { + if e, ok := err.(*websocket.CloseError); ok { + switch e.Code { + case websocket.CloseNormalClosure:o.Func_normal_close() + case websocket.CloseAbnormalClosure:o.Func_abort_close() + default: + } + o.err = e + } + return + } + switch msg_type { + case websocket.PingMessage: + o.SendChan <- ws_msg{ + Type:websocket.PongMessage, + Msg:message, + } + case websocket.PongMessage: + o.Ping.had_pong = true + default:o.RecvChan <- message + } + } + }() + + for { + select { + case <- done: + return + case t := <- o.SendChan: + var err error + switch reflect.ValueOf(t).Type().Name() { + case `ws_msg`: + err = c.WriteMessage(t.(ws_msg).Type, t.(ws_msg).Msg) + default: + err = c.WriteMessage(websocket.TextMessage, t.([]byte)) + } + if err != nil { + o.err = err + return + } + c.SetWriteDeadline(time.Now().Add(time.Millisecond*time.Duration(o.TO))) + case <- o.signal.Chan: + err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, o.Msg_normal_close)) + if err != nil {o.err = err} + select { + case <- done: + case <- time.After(time.Second): + } + return + } + } + }() + return +} + +func (o *Client) Heartbeat() (err error) { + if !o.signal.Islive() {return errors.New(`not alive`)} + + var ticker_ping = time.NewTicker(time.Duration(o.TO)*time.Millisecond) + if o.Ping.Period > 0 { + if o.Ping.Period < o.TO { + ticker_ping.Reset(time.Duration(o.Ping.Period)*time.Millisecond) + o.Ping.had_pong = true + } else { + err = errors.New(`Ping.Period < o.TO`) + } + } else {ticker_ping.Stop()} + + go func(ticker_ping *time.Ticker){ + defer ticker_ping.Stop() + for { + select { + case <-ticker_ping.C: + if !o.Ping.had_pong { + o.err = errors.New(`Pong fail!`) + o.Close() + return + } + o.SendChan <- ws_msg{ + Type:websocket.PingMessage, + Msg:o.Ping.Msg, + } + o.Ping.had_pong = false + case <- o.signal.Chan: + return + } + } + }(ticker_ping) + + return +} + +func (o *Client) Close() { + o.signal.Done() +} + +func (o *Client) Isclose() bool { + return !o.signal.Islive() +} + +func (o *Client) Error() error { + return o.err +} \ No newline at end of file -- 2.39.2