]> 127.0.0.1 Git - part/.git/commitdiff
95
authorqydysky <qydysky@foxmail.com>
Tue, 22 Dec 2020 06:19:00 +0000 (14:19 +0800)
committerqydysky <qydysky@foxmail.com>
Tue, 22 Dec 2020 06:19:00 +0000 (14:19 +0800)
go.mod
go.sum
websocket/Client.go [new file with mode: 0644]

diff --git a/go.mod b/go.mod
index f55243062f8cf31e596812069fd33cb2b7e7d531..5e96ca1d4d1974dd2980b21d4ad8dce0395ab59e 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
        github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
        github.com/andybalholm/brotli v1.0.1
        github.com/go-ole/go-ole v1.2.4 // indirect
+       github.com/gorilla/websocket v1.4.2
        github.com/klauspost/compress v1.10.10
        github.com/klauspost/pgzip v1.2.5
        github.com/miekg/dns v1.1.31
diff --git a/go.sum b/go.sum
index 62755513299269798253c0715e30bcf5c88df516..3d760f3386db5a16fedf77a0c69e26d9f8028ef0 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -4,6 +4,8 @@ github.com/andybalholm/brotli v1.0.1 h1:KqhlKozYbRtJvsPrrEeXcO+N2l6NYT5A2QAFmSUL
 github.com/andybalholm/brotli v1.0.1/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
 github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
 github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
+github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
+github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
 github.com/klauspost/compress v1.10.10 h1:a/y8CglcM7gLGYmlbP/stPE5sR3hbhFRUjCBfd/0B3I=
 github.com/klauspost/compress v1.10.10/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
 github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE=
diff --git a/websocket/Client.go b/websocket/Client.go
new file mode 100644 (file)
index 0000000..62764cb
--- /dev/null
@@ -0,0 +1,192 @@
+package part
+
+import (
+       "time"
+       "errors"
+       "reflect"
+
+       "github.com/gorilla/websocket"
+
+       s "github.com/qydysky/part/signal"
+)
+
+type Client struct {
+       Url string
+       SendChan chan interface{}
+       RecvChan chan []byte
+
+       TO int
+       Header map[string][]string
+       
+       Ping Ping
+
+       Msg_normal_close string
+       Func_normal_close func()
+       Func_abort_close func()
+       
+       err error
+       signal *s.Signal
+}
+
+type ws_msg struct {
+       Type int
+       Msg []byte
+}
+
+type Ping struct {
+       Msg []byte
+       Period int
+       had_pong bool
+}
+
+func New_client(config Client) (o *Client) {
+       tmp := Client{
+               TO: 300 * 1000,
+               Func_normal_close:func(){},
+               Func_abort_close:func(){},
+               SendChan: make(chan interface{}, 1e4),
+               RecvChan: make(chan []byte, 1e4),
+       }
+       tmp.Url = config.Url
+       if v := config.TO;v != 0 {tmp.TO = v}
+       tmp.Header = config.Header
+       tmp.Msg_normal_close = config.Msg_normal_close
+       if v := config.Func_normal_close;v != nil {tmp.Func_normal_close = v}
+       if v := config.Func_abort_close;v != nil {tmp.Func_abort_close = v}
+       if config.Ping.Period != 0 {tmp.Ping = config.Ping}
+       return &tmp
+}
+
+func (i *Client) Handle() (o *Client) {
+       o = i
+
+       if o.signal.Islive() {return}
+       o.signal = s.Init()
+
+       if o.Url == "" {
+               o.signal.Done()
+               o.err = errors.New(`url == ""`)
+               return
+       }
+
+       go func(){
+               defer func(){
+                       close(o.RecvChan)
+                       o.signal.Done()
+               }()
+
+               c, _, err := websocket.DefaultDialer.Dial(o.Url, o.Header)
+               if err != nil {return}
+               defer c.Close()
+
+               done := make(chan struct{})
+
+               go func() {
+                       defer close(done)
+       
+                       for {
+                               c.SetReadDeadline(time.Now().Add(time.Millisecond*time.Duration(o.TO)))
+                               msg_type, message, err := c.ReadMessage()
+                               if err != nil {
+                                       if e, ok := err.(*websocket.CloseError); ok {
+                                               switch e.Code {
+                                               case websocket.CloseNormalClosure:o.Func_normal_close()
+                                               case websocket.CloseAbnormalClosure:o.Func_abort_close()
+                                               default:
+                                               }
+                                               o.err = e
+                                       }
+                                       return
+                               }
+                               switch msg_type {
+                               case websocket.PingMessage:
+                                       o.SendChan <- ws_msg{
+                                               Type:websocket.PongMessage,
+                                               Msg:message,
+                                       }
+                               case websocket.PongMessage:
+                                       o.Ping.had_pong = true
+                               default:o.RecvChan <- message
+                               }
+                       }
+               }()
+       
+               for {
+                       select {
+                       case <- done:
+                               return
+                       case t := <- o.SendChan:
+                               var err error
+                               switch reflect.ValueOf(t).Type().Name() {
+                               case `ws_msg`:
+                                       err = c.WriteMessage(t.(ws_msg).Type, t.(ws_msg).Msg)
+                               default:
+                                       err = c.WriteMessage(websocket.TextMessage, t.([]byte))
+                               }
+                               if err != nil {
+                                       o.err = err
+                                       return
+                               }
+                               c.SetWriteDeadline(time.Now().Add(time.Millisecond*time.Duration(o.TO)))
+                       case <- o.signal.Chan:
+                               err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, o.Msg_normal_close))
+                               if err != nil {o.err = err}
+                               select {
+                               case <- done:
+                               case <- time.After(time.Second):
+                               }
+                               return
+                       }
+               }       
+       }()
+       return
+}
+
+func (o *Client) Heartbeat() (err error) {
+       if !o.signal.Islive() {return errors.New(`not alive`)}
+
+       var ticker_ping = time.NewTicker(time.Duration(o.TO)*time.Millisecond)
+       if o.Ping.Period > 0 {
+               if o.Ping.Period < o.TO {
+                       ticker_ping.Reset(time.Duration(o.Ping.Period)*time.Millisecond)
+                       o.Ping.had_pong = true
+               } else {
+                       err = errors.New(`Ping.Period < o.TO`)
+               }
+       } else {ticker_ping.Stop()}
+
+       go func(ticker_ping *time.Ticker){
+               defer ticker_ping.Stop()
+               for {
+                       select {
+                       case <-ticker_ping.C:
+                               if !o.Ping.had_pong {
+                                       o.err = errors.New(`Pong fail!`)
+                                       o.Close()
+                                       return
+                               }
+                               o.SendChan <- ws_msg{
+                                       Type:websocket.PingMessage,
+                                       Msg:o.Ping.Msg,
+                               }
+                               o.Ping.had_pong = false
+                       case <- o.signal.Chan:
+                               return
+                       }
+               }
+       }(ticker_ping)
+
+       return
+}
+
+func (o *Client) Close() {
+       o.signal.Done()
+}
+
+func (o *Client) Isclose() bool {
+       return !o.signal.Islive()
+}
+
+func (o *Client) Error() error {
+       return o.err
+}
\ No newline at end of file