From: qydysky Date: Thu, 6 May 2021 13:11:34 +0000 (+0800) Subject: req read time X-Git-Tag: v0.5.13 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=5e802c4a301f0d75795e1d6d48d4f5bf93bfa0ec;p=part%2F.git req read time io reader writer to chan --- diff --git a/io/io.go b/io/io.go new file mode 100644 index 0000000..fb0be33 --- /dev/null +++ b/io/io.go @@ -0,0 +1,44 @@ +package part + +import ( + "io" +) + +//no close rc any time +//you can close wc, r, w. +func RW2Chan(r io.ReadCloser,w io.WriteCloser) (rc,wc chan[]byte) { + if r != nil { + rc = make(chan[]byte, 1<<16) + go func(rc chan[]byte,r io.ReadCloser){ + for { + buf := make([]byte, 1<<16) + n,e := r.Read(buf) + if n != 0 { + rc <- buf[:n] + } else if e != nil { + close(rc) + break + } + } + }(rc,r) + } + + if w != nil { + wc = make(chan[]byte, 1<<16) + go func(wc chan[]byte,w io.WriteCloser){ + for { + buf :=<- wc + if len(buf) == 0 {//chan close + w.Close() + break + } + _,e := w.Write(buf) + if e != nil { + close(wc) + break + } + } + }(wc,w) + } + return +} \ No newline at end of file diff --git a/io/io_test.go b/io/io_test.go new file mode 100644 index 0000000..dbef24f --- /dev/null +++ b/io/io_test.go @@ -0,0 +1,40 @@ +package part + +import ( + "testing" + "io" +) + +func Test_RW2Chan(t *testing.T) { + { + r,w := io.Pipe() + _,rw := RW2Chan(nil,w) + + go func(){ + rw<-[]byte{0x01} + }() + buf := make([]byte, 1<<16) + n,_:=r.Read(buf) + if buf[:n][0] != 1 {t.Error(`no`)} + } + + { + r,w := io.Pipe() + rc,_ := RW2Chan(r,nil) + + go func(){ + w.Write([]byte{0x09}) + }() + if b:=<-rc;b[0] != 9 {t.Error(`no2`)} + } + + { + r,w := io.Pipe() + rc,rw := RW2Chan(r,w) + + go func(){ + rw <- []byte{0x07} + }() + if b:=<-rc;b[0] != 7 {t.Error(`no3`)} + } +} diff --git a/reqf/Reqf.go b/reqf/Reqf.go index df5b851..f9d271a 100644 --- a/reqf/Reqf.go +++ b/reqf/Reqf.go @@ -12,6 +12,7 @@ import ( "io/ioutil" "net/url" compress "github.com/qydysky/part/compress" + pio "github.com/qydysky/part/io" // "encoding/binary" ) @@ -19,6 +20,7 @@ type Rval struct { Url string PostStr string Timeout int + ReadTimeout int Proxy string Retry int SleepTime int @@ -86,6 +88,7 @@ func (this *Req) Reqf_1(val Rval) (error) { PostStr string = val.PostStr Proxy string = val.Proxy Timeout int = val.Timeout + ReadTimeout int = val.ReadTimeout JustResponseCode bool = val.JustResponseCode SaveToChan chan[]byte = val.SaveToChan SaveToPath string = val.SaveToPath @@ -201,26 +204,42 @@ func (this *Req) Reqf_1(val Rval) (error) { return err } } else { - buf := make([]byte, 1<<20) + rc,_ := pio.RW2Chan(resp.Body,nil) + var After = func(ReadTimeout int) (c <-chan time.Time) { + if ReadTimeout > 0 { + c = time.NewTimer(time.Second*time.Duration(ReadTimeout)).C + } + return + } + for { - if n,e := resp.Body.Read(buf); n != 0{ - b := make([]byte,n) - copy(b,buf[:n]) - if SaveToChan != nil { - SaveToChan <- b - } else if SaveToPipeWriter != nil { - SaveToPipeWriter.Write(b) + select { + case buf :=<- rc: + if len(buf) != 0 { + if SaveToChan != nil { + SaveToChan <- buf + } else if SaveToPipeWriter != nil { + SaveToPipeWriter.Write(buf) + } else { + this.Respon = append(this.Respon,buf...) + } } else { - this.Respon = append(this.Respon,b...) + if SaveToChan != nil { + close(SaveToChan) + } + if SaveToPipeWriter != nil { + SaveToPipeWriter.Close() + } + return nil } - } else { + case <-After(ReadTimeout): if SaveToChan != nil { close(SaveToChan) } if SaveToPipeWriter != nil { - SaveToPipeWriter.CloseWithError(e) + SaveToPipeWriter.Close() } - return err + return context.DeadlineExceeded } } } diff --git a/reqf/Reqf_test.go b/reqf/Reqf_test.go index 29d65bd..5d506f0 100644 --- a/reqf/Reqf_test.go +++ b/reqf/Reqf_test.go @@ -6,7 +6,7 @@ import ( ) func Test_Timeout(t *testing.T) { - r := Req() + r := New() if e := r.Reqf(Rval{ Url:`https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`, Timeout:1, @@ -20,7 +20,7 @@ func Test_Timeout(t *testing.T) { } func Test_Cancel(t *testing.T) { - r := Req() + r := New() go func(){ time.Sleep(time.Second)