From 7fa620182f67740e93788475118b46ce7096beb4 Mon Sep 17 00:00:00 2001 From: qydysky <32743305+qydysky@users.noreply.github.com> Date: Thu, 16 Feb 2023 20:13:18 +0800 Subject: [PATCH] Fix --- websocket/Client.go | 144 +++++++++++++++++++++++---------------- websocket/Recoder.go | 3 +- websocket/Server_test.go | 2 +- 3 files changed, 89 insertions(+), 60 deletions(-) diff --git a/websocket/Client.go b/websocket/Client.go index f06344e..643bdaf 100644 --- a/websocket/Client.go +++ b/websocket/Client.go @@ -1,12 +1,12 @@ package part import ( - "io" - "time" "errors" + "io" "net/http" "net/url" "reflect" + "time" "github.com/gorilla/websocket" @@ -14,58 +14,70 @@ import ( ) type Client struct { - Url string + Url string SendChan chan interface{} RecvChan chan []byte - TO int + TO int Header map[string]string - Proxy string - + Proxy string + Ping Ping - Msg_normal_close string + Msg_normal_close string Func_normal_close func() - Func_abort_close func() - - err error + Func_abort_close func() + + err error signal *s.Signal } type ws_msg struct { Type int - Msg []byte + Msg []byte } type Ping struct { - Msg []byte - Period int + Msg []byte + Period int had_pong bool } func New_client(config Client) (o *Client) { tmp := Client{ - TO: 300 * 1000, - Func_normal_close:func(){}, - Func_abort_close:func(){}, - SendChan: make(chan interface{}, 1e4), - RecvChan: make(chan []byte, 1e4), + TO: 300 * 1000, + Func_normal_close: func() {}, + Func_abort_close: func() {}, + SendChan: make(chan interface{}, 1e4), + RecvChan: make(chan []byte, 1e4), } tmp.Url = config.Url - if v := config.TO;v != 0 {tmp.TO = v} + if v := config.TO; v != 0 { + tmp.TO = v + } tmp.Msg_normal_close = config.Msg_normal_close tmp.Header = config.Header - if v := config.Func_normal_close;v != nil {tmp.Func_normal_close = v} - if v := config.Func_abort_close;v != nil {tmp.Func_abort_close = v} - if v := config.Proxy;v != "" {tmp.Proxy = v} - if config.Ping.Period != 0 {tmp.Ping = config.Ping} + if v := config.Func_normal_close; v != nil { + tmp.Func_normal_close = v + } + if v := config.Func_abort_close; v != nil { + tmp.Func_abort_close = v + } + if v := config.Proxy; v != "" { + tmp.Proxy = v + } + if config.Ping.Period != 0 { + tmp.Ping = config.Ping + } return &tmp } func (i *Client) Handle() (o *Client) { o = i - if o.signal.Islive() {return} + if o.signal.Islive() { + return + } o.signal = s.Init() if o.Url == "" { @@ -75,7 +87,7 @@ func (i *Client) Handle() (o *Client) { } tmp_Header := make(http.Header) - for k,v := range o.Header { + for k, v := range o.Header { tmp_Header.Set(k, v) } @@ -93,7 +105,7 @@ func (i *Client) Handle() (o *Client) { e := err.Error() if response != nil { if response.Status != "" { - e += ` `+response.Status + e += ` ` + response.Status } if response.Body != nil { body, err := io.ReadAll(response.Body) @@ -102,15 +114,15 @@ func (i *Client) Handle() (o *Client) { return } response.Body.Close() - e += ` `+string(body) + e += ` ` + string(body) } } o.err = errors.New(e) return } - go func(){ - defer func(){ + go func() { + defer func() { o.signal.Done() close(o.RecvChan) c.Close() @@ -121,41 +133,49 @@ func (i *Client) Handle() (o *Client) { go func() { defer done.Done() - + for { - c.SetReadDeadline(time.Now().Add(time.Millisecond*time.Duration(o.TO))) + c.SetReadDeadline(time.Now().Add(time.Millisecond * time.Duration(o.TO))) msg_type, message, err := c.ReadMessage() if err != nil { if e, ok := err.(*websocket.CloseError); ok { switch e.Code { - case websocket.CloseNormalClosure:o.Func_normal_close() - case websocket.CloseAbnormalClosure:o.Func_abort_close() + case websocket.CloseNormalClosure: + o.Func_normal_close() + case websocket.CloseAbnormalClosure: + o.Func_abort_close() default: } o.err = e } return } - if !done.Islive() {return} + if !done.Islive() { + return + } switch msg_type { case websocket.PingMessage: o.SendChan <- ws_msg{ - Type:websocket.PongMessage, - Msg:message, + Type: websocket.PongMessage, + Msg: message, } case websocket.PongMessage: o.Ping.had_pong = true - default:o.RecvChan <- message + default: + o.RecvChan <- message } } }() - + for { select { - case <- done.WaitC():return - case t := <- o.SendChan: - if !done.Islive() {return} - + case <-done.WaitC(): + return + case t := <-o.SendChan: + if !done.Islive() { + return + } + var err error switch reflect.ValueOf(t).Type().Name() { case `ws_msg`: @@ -167,37 +187,45 @@ func (i *Client) Handle() (o *Client) { o.err = err return } - c.SetWriteDeadline(time.Now().Add(time.Millisecond*time.Duration(o.TO))) - case <- o.signal.WaitC(): - if !done.Islive() {return} + c.SetWriteDeadline(time.Now().Add(time.Millisecond * time.Duration(o.TO))) + case <-o.signal.WaitC(): + if !done.Islive() { + return + } err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, o.Msg_normal_close)) - if err != nil {o.err = err} + if err != nil { + o.err = err + } select { - case <- done.WaitC(): - case <- time.After(time.Second): + case <-done.WaitC(): + case <-time.After(time.Second): } return } - } + } }() return } func (o *Client) Heartbeat() (err error) { - if !o.signal.Islive() {return errors.New(`not alive`)} + if !o.signal.Islive() { + return errors.New(`not alive`) + } - var ticker_ping = time.NewTicker(time.Duration(o.TO)*time.Millisecond) + var ticker_ping = time.NewTicker(time.Duration(o.TO) * time.Millisecond) if o.Ping.Period > 0 { if o.Ping.Period < o.TO { - ticker_ping.Reset(time.Duration(o.Ping.Period)*time.Millisecond) + ticker_ping.Reset(time.Duration(o.Ping.Period) * time.Millisecond) o.Ping.had_pong = true } else { err = errors.New(`Ping.Period < o.TO`) } - } else {ticker_ping.Stop()} + } else { + ticker_ping.Stop() + } - go func(ticker_ping *time.Ticker){ + go func(ticker_ping *time.Ticker) { defer ticker_ping.Stop() for { select { @@ -208,11 +236,11 @@ func (o *Client) Heartbeat() (err error) { return } o.SendChan <- ws_msg{ - Type:websocket.PingMessage, - Msg:o.Ping.Msg, + Type: websocket.PingMessage, + Msg: o.Ping.Msg, } o.Ping.had_pong = false - case <- o.signal.Chan: + case <-o.signal.Chan: return } } @@ -231,4 +259,4 @@ func (o *Client) Isclose() bool { func (o *Client) Error() error { return o.err -} \ No newline at end of file +} diff --git a/websocket/Recoder.go b/websocket/Recoder.go index 992f315..dc7f497 100644 --- a/websocket/Recoder.go +++ b/websocket/Recoder.go @@ -8,6 +8,7 @@ import ( "strconv" "time" + "github.com/dustin/go-humanize" file "github.com/qydysky/part/file" funcCtrl "github.com/qydysky/part/funcCtrl" signal "github.com/qydysky/part/signal" @@ -121,7 +122,7 @@ func Play(filePath string) (s *Server, close func()) { for sg.Islive() { if data == nil { - if data, e = f.ReadUntil('\n', 70, 1000); e != nil && !errors.Is(e, io.EOF) { + if data, e = f.ReadUntil('\n', humanize.KByte, humanize.MByte); e != nil && !errors.Is(e, io.EOF) { panic(e) } if len(data) == 0 { diff --git a/websocket/Server_test.go b/websocket/Server_test.go index a7ec681..1ca4d9c 100644 --- a/websocket/Server_test.go +++ b/websocket/Server_test.go @@ -65,7 +65,7 @@ func Test_Server(t *testing.T) { } func Test_Recoder(t *testing.T) { - s, cf := Play("l.csv", 50, 5000) + s, cf := Play("l.csv") defer cf() w := web.Easy_boot() -- 2.39.2