From 011b8c3fe51ae2127efc8e29ccf6d9d7f915305f Mon Sep 17 00:00:00 2001 From: qydysky Date: Tue, 11 May 2021 08:22:38 +0800 Subject: [PATCH] req id --- io/io.go | 4 ++-- reqf/Reqf.go | 47 +++++++++++++++++++++++++++++++++-------------- reqf/Reqf_test.go | 29 +++++++++++++++++++++++++++++ signal/Signal.go | 5 +++++ 4 files changed, 69 insertions(+), 16 deletions(-) diff --git a/io/io.go b/io/io.go index fb0be33..1572fdf 100644 --- a/io/io.go +++ b/io/io.go @@ -12,10 +12,10 @@ func RW2Chan(r io.ReadCloser,w io.WriteCloser) (rc,wc chan[]byte) { go func(rc chan[]byte,r io.ReadCloser){ for { buf := make([]byte, 1<<16) - n,e := r.Read(buf) + n,_ := r.Read(buf) if n != 0 { rc <- buf[:n] - } else if e != nil { + } else { close(rc) break } diff --git a/reqf/Reqf.go b/reqf/Reqf.go index f9d271a..2b3b9d2 100644 --- a/reqf/Reqf.go +++ b/reqf/Reqf.go @@ -13,6 +13,8 @@ import ( "net/url" compress "github.com/qydysky/part/compress" pio "github.com/qydysky/part/io" + signal "github.com/qydysky/part/signal" + idpool "github.com/qydysky/part/idpool" // "encoding/binary" ) @@ -38,8 +40,8 @@ type Req struct { Response *http.Response UsedTime time.Duration - cancelOpen bool - cancel chan interface{} + id *idpool.Id + cancel *signal.Signal sync.Mutex } @@ -67,10 +69,21 @@ func (this *Req) Reqf(val Rval) (error) { if _val.Timeout==0{_val.Timeout=3} + { + idp := idpool.New() + this.id = idp.Get() + defer func(){ + idp.Put(this.id) + this.id = nil + }() + } + + this.cancel = signal.Init() + for ;_val.Retry>=0;_val.Retry-- { returnErr=this.Reqf_1(_val) select { - case <- this.cancel://cancel + case <- this.cancel.WaitC()://cancel return returnErr default: if returnErr==nil {return nil} @@ -132,10 +145,8 @@ func (this *Req) Reqf_1(val Rval) (error) { var done = make(chan struct{}) defer close(done) go func(){ - this.cancel = make(chan interface{}) - this.cancelOpen = true select { - case <- this.cancel:cancel() + case <- this.cancel.WaitC():cancel() case <- done: } }() @@ -241,6 +252,15 @@ func (this *Req) Reqf_1(val Rval) (error) { } return context.DeadlineExceeded } + if !this.cancel.Islive() { + if SaveToChan != nil { + close(SaveToChan) + } + if SaveToPipeWriter != nil { + SaveToPipeWriter.Close() + } + return context.Canceled + } } } } @@ -254,14 +274,13 @@ func (this *Req) Reqf_1(val Rval) (error) { func (t *Req) Cancel(){t.Close()} func (t *Req) Close(){ - if !t.cancelOpen {return} - select { - case <- t.cancel://had close - return - default: - close(t.cancel) - t.cancelOpen = false - } + if !t.cancel.Islive() {return} + t.cancel.Done() +} + +func (t *Req) Id() uintptr { + if t.id == nil {return 0} + return t.id.Id } func Cookies_String_2_Map(Cookies string) (o map[string]string) { diff --git a/reqf/Reqf_test.go b/reqf/Reqf_test.go index 5d506f0..fb70b49 100644 --- a/reqf/Reqf_test.go +++ b/reqf/Reqf_test.go @@ -36,4 +36,33 @@ func Test_Cancel(t *testing.T) { return } t.Error(`no error`) +} + +func Test_Cancel_chan(t *testing.T) { + r := New() + + c := make(chan[]byte,1<<16) + + go func(){ + for{ + <-c + } + }() + + go func(){ + time.Sleep(time.Second*7) + r.Cancel() + }() + + if e := r.Reqf(Rval{ + Url:`https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`, + SaveToChan:c, + Timeout:10, + });e != nil { + if !IsCancel(e) { + t.Error(`type error`,e) + } + return + } + t.Error(`no error`) } \ No newline at end of file diff --git a/signal/Signal.go b/signal/Signal.go index 8180acb..2721d99 100644 --- a/signal/Signal.go +++ b/signal/Signal.go @@ -12,6 +12,11 @@ func (i *Signal) Wait() { if i.Islive() {<-i.Chan} } +func (i *Signal) WaitC() (<-chan struct{}) { + if i.Islive() {return i.Chan} + return nil +} + func (i *Signal) Done() { if i.Islive() {close(i.Chan)} } -- 2.39.2