]> 127.0.0.1 Git - part/.git/commitdiff
req read time v0.5.13
authorqydysky <qydysky@foxmail.com>
Thu, 6 May 2021 13:11:34 +0000 (21:11 +0800)
committerqydysky <qydysky@foxmail.com>
Thu, 6 May 2021 13:11:34 +0000 (21:11 +0800)
io reader writer to chan

io/io.go [new file with mode: 0644]
io/io_test.go [new file with mode: 0644]
reqf/Reqf.go
reqf/Reqf_test.go

diff --git a/io/io.go b/io/io.go
new file mode 100644 (file)
index 0000000..fb0be33
--- /dev/null
+++ b/io/io.go
@@ -0,0 +1,44 @@
+package part
+
+import (
+       "io"
+)
+
+//no close rc any time
+//you can close wc, r, w.
+func RW2Chan(r io.ReadCloser,w io.WriteCloser) (rc,wc chan[]byte) {
+       if r != nil {
+               rc = make(chan[]byte, 1<<16)
+               go func(rc chan[]byte,r io.ReadCloser){
+                       for {
+                               buf := make([]byte, 1<<16)
+                               n,e := r.Read(buf)
+                               if n != 0 {
+                                       rc <- buf[:n]
+                               } else if e != nil {
+                                       close(rc)
+                                       break
+                               }
+                       }
+               }(rc,r)
+       }
+       
+       if w != nil {
+               wc = make(chan[]byte, 1<<16)
+               go func(wc chan[]byte,w io.WriteCloser){
+                       for {
+                               buf :=<- wc
+                               if len(buf) == 0 {//chan close
+                                       w.Close()
+                                       break
+                               }
+                               _,e := w.Write(buf)
+                               if e != nil {
+                                       close(wc)
+                                       break
+                               }
+                       }
+               }(wc,w)
+       }
+       return
+}
\ No newline at end of file
diff --git a/io/io_test.go b/io/io_test.go
new file mode 100644 (file)
index 0000000..dbef24f
--- /dev/null
@@ -0,0 +1,40 @@
+package part
+
+import (
+       "testing"
+       "io"
+)
+
+func Test_RW2Chan(t *testing.T) {
+       {
+               r,w := io.Pipe()
+               _,rw := RW2Chan(nil,w)
+               
+               go func(){
+                       rw<-[]byte{0x01}
+               }()
+               buf := make([]byte, 1<<16)
+               n,_:=r.Read(buf)
+               if buf[:n][0] != 1 {t.Error(`no`)}
+       }
+       
+       {
+               r,w := io.Pipe()
+               rc,_ := RW2Chan(r,nil)
+               
+               go func(){
+                       w.Write([]byte{0x09})
+               }()
+               if b:=<-rc;b[0] != 9 {t.Error(`no2`)}
+       }
+       
+       {
+               r,w := io.Pipe()
+               rc,rw := RW2Chan(r,w)
+               
+               go func(){
+                       rw <- []byte{0x07}
+               }()
+               if b:=<-rc;b[0] != 7 {t.Error(`no3`)}
+       }
+}
index df5b851011b08ded1fc0acc569d0a671d00470d9..f9d271ac3140d0e3b377a38aa003dcdd1f0c35b2 100644 (file)
@@ -12,6 +12,7 @@ import (
     "io/ioutil"
     "net/url"
     compress "github.com/qydysky/part/compress"
+    pio "github.com/qydysky/part/io"
     // "encoding/binary"
 )
 
@@ -19,6 +20,7 @@ type Rval struct {
     Url string
     PostStr string
     Timeout int
+    ReadTimeout int
     Proxy string
     Retry int
     SleepTime int
@@ -86,6 +88,7 @@ func (this *Req) Reqf_1(val Rval) (error) {
         PostStr string = val.PostStr
         Proxy string = val.Proxy
         Timeout int = val.Timeout
+        ReadTimeout int = val.ReadTimeout
         JustResponseCode bool = val.JustResponseCode
         SaveToChan chan[]byte = val.SaveToChan
         SaveToPath string = val.SaveToPath
@@ -201,26 +204,42 @@ func (this *Req) Reqf_1(val Rval) (error) {
                     return err
                 }
             } else {
-                buf := make([]byte, 1<<20)
+                rc,_ := pio.RW2Chan(resp.Body,nil)
+                var After = func(ReadTimeout int) (c <-chan time.Time) {
+                    if ReadTimeout > 0 {
+                        c = time.NewTimer(time.Second*time.Duration(ReadTimeout)).C
+                    }
+                    return
+                }
+                
                 for {
-                    if n,e := resp.Body.Read(buf); n != 0{
-                        b := make([]byte,n)
-                        copy(b,buf[:n])
-                        if SaveToChan != nil {
-                            SaveToChan <- b
-                        } else if SaveToPipeWriter != nil {
-                            SaveToPipeWriter.Write(b)
+                    select {
+                    case buf :=<- rc:
+                        if len(buf) != 0 {
+                            if SaveToChan != nil {
+                                SaveToChan <- buf
+                            } else if SaveToPipeWriter != nil {
+                                SaveToPipeWriter.Write(buf)
+                            } else {
+                                this.Respon = append(this.Respon,buf...)
+                            }
                         } else {
-                            this.Respon = append(this.Respon,b...)
+                            if SaveToChan != nil {
+                                close(SaveToChan)
+                            }
+                            if SaveToPipeWriter != nil {
+                                SaveToPipeWriter.Close()
+                            }
+                            return nil
                         }
-                    } else {
+                    case <-After(ReadTimeout):
                         if SaveToChan != nil {
                             close(SaveToChan)
                         }
                         if SaveToPipeWriter != nil {
-                            SaveToPipeWriter.CloseWithError(e)
+                            SaveToPipeWriter.Close()
                         }
-                        return err
+                        return context.DeadlineExceeded
                     }
                 }
             }
index 29d65bdb7650149d0f38809694b8f3676e996464..5d506f0ecd2d6d74dd259c40b5b740ece6364889 100644 (file)
@@ -6,7 +6,7 @@ import (
 )
 
 func Test_Timeout(t *testing.T) {
-       r := Req()
+       r := New()
        if e := r.Reqf(Rval{
                Url:`https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`,
                Timeout:1,
@@ -20,7 +20,7 @@ func Test_Timeout(t *testing.T) {
 }
 
 func Test_Cancel(t *testing.T) {
-       r := Req()
+       r := New()
 
        go func(){
                time.Sleep(time.Second)