From f360c8f54f215db70f98a186940dc409ecb96e4d Mon Sep 17 00:00:00 2001 From: qydysky Date: Thu, 13 May 2021 21:13:42 +0800 Subject: [PATCH] =?utf8?q?panic=E4=BF=AE=E5=A4=8D=20req=20connectTO?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- Random.go | 5 ++++ reqf/Reqf.go | 71 +++++++++++++++++++++++++++------------------ websocket/Client.go | 19 +++++++----- 3 files changed, 60 insertions(+), 35 deletions(-) diff --git a/Random.go b/Random.go index debb671..b1402cb 100644 --- a/Random.go +++ b/Random.go @@ -31,6 +31,11 @@ func (*random) FakeRandom(max int64) int64 { func (t *random) MixRandom(min, max int64) int64 { lenght := max - min + if lenght == 0 { + return min + } else if lenght < 0 { + panic(`max < min`) + } r := t.TrueRandom(lenght) if r != -1 {return min + r} return min + t.FakeRandom(lenght) diff --git a/reqf/Reqf.go b/reqf/Reqf.go index 34ac6cf..3e129bd 100644 --- a/reqf/Reqf.go +++ b/reqf/Reqf.go @@ -23,6 +23,7 @@ type Rval struct { PostStr string Timeout int ReadTimeout int + ConnectTimeout int Proxy string Retry int SleepTime int @@ -72,7 +73,7 @@ func (this *Req) Reqf(val Rval) (error) { _val := val; - if _val.Timeout==0{_val.Timeout=3} + if _val.Timeout==0{_val.Timeout=3000} defer func(){ this.idp.Put(this.id) @@ -80,7 +81,7 @@ func (this *Req) Reqf(val Rval) (error) { this.cancel = signal.Init() - for ;_val.Retry>=0;_val.Retry-- { + for SleepTime,Retry:=_val.SleepTime,_val.Retry;Retry>=0;Retry-=1 { returnErr=this.Reqf_1(_val) select { case <- this.cancel.WaitC()://cancel @@ -88,13 +89,13 @@ func (this *Req) Reqf(val Rval) (error) { default: if returnErr==nil {return nil} } - time.Sleep(time.Duration(_val.SleepTime)*time.Millisecond) + time.Sleep(time.Duration(SleepTime)*time.Millisecond) } return returnErr } -func (this *Req) Reqf_1(val Rval) (error) { +func (this *Req) Reqf_1(val Rval) (err error) { var ( Url string = val.Url @@ -102,6 +103,7 @@ func (this *Req) Reqf_1(val Rval) (error) { Proxy string = val.Proxy Timeout int = val.Timeout ReadTimeout int = val.ReadTimeout + ConnectTimeout int = val.ConnectTimeout JustResponseCode bool = val.JustResponseCode SaveToChan chan[]byte = val.SaveToChan SaveToPath string = val.SaveToPath @@ -137,7 +139,7 @@ func (this *Req) Reqf_1(val Rval) (error) { cx, cancel := context.WithCancel(context.Background()) if Timeout != -1 { - cx, _ = context.WithTimeout(cx,time.Duration(Timeout)*time.Second) + cx, _ = context.WithTimeout(cx,time.Duration(Timeout)*time.Millisecond) } req,_ := http.NewRequest(Method, Url, body) req = req.WithContext(cx) @@ -218,12 +220,31 @@ func (this *Req) Reqf_1(val Rval) (error) { rc,_ := pio.RW2Chan(resp.Body,nil) var After = func(ReadTimeout int) (c <-chan time.Time) { if ReadTimeout > 0 { - c = time.NewTimer(time.Second*time.Duration(ReadTimeout)).C + c = time.NewTimer(time.Millisecond*time.Duration(ReadTimeout)).C } return } - for { + select { + case buf :=<- rc: + if len(buf) != 0 { + if SaveToChan != nil { + SaveToChan <- buf + } else if SaveToPipeWriter != nil { + SaveToPipeWriter.Write(buf) + } else { + this.Respon = append(this.Respon,buf...) + } + } else { + err = io.EOF + return + } + case <-After(ConnectTimeout): + err = context.DeadlineExceeded + return + } + + for loop:=true;loop; { select { case buf :=<- rc: if len(buf) != 0 { @@ -235,33 +256,27 @@ func (this *Req) Reqf_1(val Rval) (error) { this.Respon = append(this.Respon,buf...) } } else { - if SaveToChan != nil { - close(SaveToChan) - } - if SaveToPipeWriter != nil { - SaveToPipeWriter.Close() - } - return nil + err = io.EOF + loop = false + break } case <-After(ReadTimeout): - if SaveToChan != nil { - close(SaveToChan) - } - if SaveToPipeWriter != nil { - SaveToPipeWriter.Close() - } - return context.DeadlineExceeded + err = context.DeadlineExceeded + loop = false + break } if !this.cancel.Islive() { - if SaveToChan != nil { - close(SaveToChan) - } - if SaveToPipeWriter != nil { - SaveToPipeWriter.Close() - } - return context.Canceled + err = context.Canceled + loop = false + break } } + if SaveToChan != nil { + close(SaveToChan) + } + if SaveToPipeWriter != nil { + SaveToPipeWriter.Close() + } } } } else {resp.Body.Close()} diff --git a/websocket/Client.go b/websocket/Client.go index e94a1e6..9f92a9c 100644 --- a/websocket/Client.go +++ b/websocket/Client.go @@ -75,8 +75,8 @@ func (i *Client) Handle() (o *Client) { go func(){ defer func(){ - close(o.RecvChan) o.signal.Done() + close(o.RecvChan) }() tmp_Header := make(http.Header) @@ -96,10 +96,11 @@ func (i *Client) Handle() (o *Client) { if err != nil {return} defer c.Close() - done := make(chan struct{}) + done := s.Init() + defer done.Done() go func() { - defer close(done) + defer done.Done() for { c.SetReadDeadline(time.Now().Add(time.Millisecond*time.Duration(o.TO))) @@ -115,6 +116,7 @@ func (i *Client) Handle() (o *Client) { } return } + if !done.Islive() {return} switch msg_type { case websocket.PingMessage: o.SendChan <- ws_msg{ @@ -130,9 +132,10 @@ func (i *Client) Handle() (o *Client) { for { select { - case <- done: - return + case <- done.WaitC():return case t := <- o.SendChan: + if !done.Islive() {return} + var err error switch reflect.ValueOf(t).Type().Name() { case `ws_msg`: @@ -145,11 +148,13 @@ func (i *Client) Handle() (o *Client) { return } c.SetWriteDeadline(time.Now().Add(time.Millisecond*time.Duration(o.TO))) - case <- o.signal.Chan: + case <- o.signal.WaitC(): + if !done.Islive() {return} + err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, o.Msg_normal_close)) if err != nil {o.err = err} select { - case <- done: + case <- done.WaitC(): case <- time.After(time.Second): } return -- 2.39.2