]> 127.0.0.1 Git - part/.git/commitdiff
add
authorqydysky <qydysky@foxmail.com>
Fri, 2 Jun 2023 17:42:52 +0000 (01:42 +0800)
committerqydysky <qydysky@foxmail.com>
Fri, 2 Jun 2023 17:42:52 +0000 (01:42 +0800)
compress/Flate.go
go.mod
go.sum
reqf/Reqf.go
reqf/Reqf_test.go

index ee4839f44715bd2893c9c13d3de2fefc99268a92..379952efebd45ee21681326bf0239a0822db253b 100644 (file)
@@ -14,10 +14,10 @@ func InFlate(byteS []byte, level int) ([]byte, error) {
        if err != nil {
                return buf.Bytes(), err
        }
-       defer flateWrite.Close()
        // 写入待压缩内容
        flateWrite.Write(byteS)
        flateWrite.Flush()
+       flateWrite.Close()
        return buf.Bytes(), nil
 }
 
diff --git a/go.mod b/go.mod
index 4b72ab0e7e347099509a30f2189b99dcd879879d..ba4797a1112957d302a3c0f39fdfcd70b41f8a3b 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -16,17 +16,17 @@ require (
 require (
        github.com/google/uuid v1.3.0 // indirect
        github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
-       github.com/mattn/go-isatty v0.0.18 // indirect
+       github.com/mattn/go-isatty v0.0.19 // indirect
        github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
        github.com/tklauser/go-sysconf v0.3.11 // indirect
-       github.com/tklauser/numcpus v0.6.0 // indirect
-       github.com/yusufpapurcu/wmi v1.2.2 // indirect
+       github.com/tklauser/numcpus v0.6.1 // indirect
+       github.com/yusufpapurcu/wmi v1.2.3 // indirect
        golang.org/x/mod v0.10.0 // indirect
-       golang.org/x/tools v0.8.0 // indirect
+       golang.org/x/tools v0.9.3 // indirect
        lukechampine.com/uint128 v1.3.0 // indirect
        modernc.org/cc/v3 v3.40.0 // indirect
        modernc.org/ccgo/v3 v3.16.13 // indirect
-       modernc.org/libc v1.22.5 // indirect
+       modernc.org/libc v1.22.6 // indirect
        modernc.org/mathutil v1.5.0 // indirect
        modernc.org/memory v1.5.0 // indirect
        modernc.org/opt v0.1.3 // indirect
@@ -38,8 +38,8 @@ require (
        github.com/dustin/go-humanize v1.0.1
        github.com/go-ole/go-ole v1.2.6 // indirect
        github.com/stretchr/testify v1.8.2 // indirect
-       golang.org/x/net v0.9.0 // indirect
-       golang.org/x/sys v0.7.0 // indirect
+       golang.org/x/net v0.10.0 // indirect
+       golang.org/x/sys v0.8.0 // indirect
        modernc.org/sqlite v1.22.1
 )
 
diff --git a/go.sum b/go.sum
index d786af2b639360128d6b8245225e27244cbd0e42..976bffdc8ea6231541b0baef39134698b1e48aaa 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -17,8 +17,8 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNU
 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
 github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
 github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
-github.com/mattn/go-isatty v0.0.18 h1:DOKFKCQ7FNG2L1rbrmstDN4QVRdS89Nkh85u68Uwp98=
-github.com/mattn/go-isatty v0.0.18/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
+github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
+github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
 github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y=
 github.com/miekg/dns v1.1.54 h1:5jon9mWcb0sFJGpnI99tOMhCPyJ+RPVz5b63MQG0VWI=
 github.com/miekg/dns v1.1.54/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY=
@@ -42,24 +42,25 @@ github.com/thedevsaddam/gojsonq/v2 v2.5.2 h1:CoMVaYyKFsVj6TjU6APqAhAvC07hTI6IQen
 github.com/thedevsaddam/gojsonq/v2 v2.5.2/go.mod h1:bv6Xa7kWy82uT0LnXPE2SzGqTj33TAEeR560MdJkiXs=
 github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM=
 github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI=
-github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms=
 github.com/tklauser/numcpus v0.6.0/go.mod h1:FEZLMke0lhOUG6w2JadTzp0a+Nl8PF/GFkQ5UVIcaL4=
-github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
-github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
+github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
+github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
+github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
+github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
 golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk=
 golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
-golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM=
-golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
-golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
+golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
+golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
+golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=
 golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
-golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
+golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
 golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
-golang.org/x/tools v0.8.0 h1:vSDcovVPld282ceKgDimkRSC8kpaH1dgyc9UMzlt84Y=
-golang.org/x/tools v0.8.0/go.mod h1:JxBZ99ISMI5ViVkT1tr6tdNmXeTrcpVSD3vZ1RsRdN4=
+golang.org/x/tools v0.9.3 h1:Gn1I8+64MsuTb/HpH+LmQtNas23LhUVr3rYZ0eKuaMM=
+golang.org/x/tools v0.9.3/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
@@ -72,8 +73,8 @@ modernc.org/ccgo/v3 v3.16.13 h1:Mkgdzl46i5F/CNR/Kj80Ri59hC8TKAhZrYSaqvkwzUw=
 modernc.org/ccgo/v3 v3.16.13/go.mod h1:2Quk+5YgpImhPjv2Qsob1DnZ/4som1lJTodubIcoUkY=
 modernc.org/ccorpus v1.11.6 h1:J16RXiiqiCgua6+ZvQot4yUuUy8zxgqbqEEUuGPlISk=
 modernc.org/httpfs v1.0.6 h1:AAgIpFZRXuYnkjftxTAZwMIiwEqAfk8aVB2/oA6nAeM=
-modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
-modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY=
+modernc.org/libc v1.22.6 h1:cbXU8R+A6aOjRuhsFh3nbDWXO/Hs4ClJRXYB11KmPDo=
+modernc.org/libc v1.22.6/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY=
 modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=
 modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
 modernc.org/memory v1.5.0 h1:N+/8c5rE6EqugZwHii4IFsaJ7MUhoWX07J5tC/iI5Ds=
index 37f00dba3f52726e3a832e982e948b76e874c57c..643091d781f5b8d6cfa6eadd27f6126ff35d1802 100644 (file)
@@ -10,15 +10,16 @@ import (
        "net/http"
        "net/url"
        "os"
+       "runtime"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
        flate "compress/flate"
        gzip "compress/gzip"
 
        br "github.com/andybalholm/brotli"
-       signal "github.com/qydysky/part/signal"
        s "github.com/qydysky/part/strings"
        // "encoding/binary"
 )
@@ -30,10 +31,13 @@ type Rval struct {
        Proxy            string
        Retry            int
        SleepTime        int
+       WriteLoopTO      int
        JustResponseCode bool
        NoResponse       bool
-       Async            bool
-       Cookies          []*http.Cookie
+       // 当Async为true时,Respon、Response必须在Wait()之后读取,否则有DATA RACE可能
+       Async   bool
+       Cookies []*http.Cookie
+       Ctx     context.Context
 
        SaveToPath       string
        SaveToChan       chan []byte
@@ -42,22 +46,39 @@ type Rval struct {
        Header map[string]string
 }
 
+const (
+       defaultUA     = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36"
+       defaultAccept = `text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8`
+       free          = iota
+       running
+)
+
+var (
+       ErrEmptyUrl         = errors.New("ErrEmptyUrl")
+       ErrNewRequest       = errors.New("ErrNewRequest")
+       ErrClientDo         = errors.New("ErrClientDo")
+       ErrResponFileCreate = errors.New("ErrResponFileCreate")
+       ErrWriteRes         = errors.New("ErrWriteRes")
+       ErrReadRes          = errors.New("ErrReadRes")
+)
+
 type Req struct {
-       Respon    []byte
-       responBuf *bytes.Buffer
-       Response  *http.Response
-       UsedTime  time.Duration
+       // 当Async为true时,必须在Wait()之后读取,否则有DATA RACE可能
+       Respon []byte
+       // 当Async为true时,必须在Wait()之后读取,否则有DATA RACE可能
+       Response *http.Response
+       UsedTime time.Duration
 
-       cancelFs chan func()
-       cancel   *signal.Signal
-       running  *signal.Signal
-       isLive   *signal.Signal
+       cancelP atomic.Pointer[func()]
+       ctx     context.Context
+       state   atomic.Int32
 
        responFile *os.File
+       responBuf  *bytes.Buffer
        err        error
+       callTree   string
 
-       init sync.RWMutex
-       l    sync.Mutex
+       l sync.RWMutex
 }
 
 func New() *Req {
@@ -65,102 +86,68 @@ func New() *Req {
 }
 
 func (t *Req) Reqf(val Rval) error {
-       t.isLive.Wait()
        t.l.Lock()
-       t.init.Lock()
-
-       t.Respon = t.Respon[:0]
-       if t.responBuf == nil {
-               t.responBuf = new(bytes.Buffer)
-       }
-       t.Response = nil
-       t.UsedTime = 0
-       t.cancelFs = make(chan func(), 5)
-       t.isLive = signal.Init()
-       t.cancel = signal.Init()
-       t.running = signal.Init()
-       t.responFile = nil
-       t.err = nil
-       go func() {
-               cancel, cancelFin := t.cancel.WaitC()
-               defer cancelFin()
-               running, runningFin := t.running.WaitC()
-               defer runningFin()
-
-               select {
-               case <-cancel:
-                       for len(t.cancelFs) != 0 {
-                               (<-t.cancelFs)()
-                       }
-               case <-running:
-               }
-       }()
+       t.state.Store(running)
 
-       t.init.Unlock()
+       t.prepare(&val)
 
-       go func() {
+       // 同步
+       if !val.Async {
                beginTime := time.Now()
-               _val := val
 
-               for SleepTime, Retry := _val.SleepTime, _val.Retry; Retry >= 0; Retry -= 1 {
-                       for len(t.cancelFs) != 0 {
-                               <-t.cancelFs
+               for i := 0; i <= val.Retry; i++ {
+                       ctx, cancle := t.prepareRes(&val)
+                       t.err = t.Reqf_1(ctx, val)
+                       if cancle != nil {
+                               cancle()
                        }
-
-                       t.err = t.Reqf_1(_val)
                        if t.err == nil || IsCancel(t.err) {
                                break
                        }
-                       time.Sleep(time.Duration(SleepTime) * time.Millisecond)
+                       if val.SleepTime != 0 {
+                               time.Sleep(time.Duration(val.SleepTime * int(time.Millisecond)))
+                       }
                }
-               t.UsedTime = time.Since(beginTime)
-               t.running.Done()
-       }()
 
-       if !val.Async {
-               t.Wait()
-               if val.SaveToChan != nil {
-                       close(val.SaveToChan)
-               }
-               if t.responFile != nil {
-                       t.responFile.Close()
-               }
-               if val.SaveToPipeWriter != nil {
-                       val.SaveToPipeWriter.Close()
-               }
-               t.cancel.Done()
-               t.running.Done()
+               t.updateUseDur(beginTime)
+               t.clean(&val)
+               t.state.Store(free)
                t.l.Unlock()
-               t.isLive.Done()
                return t.err
-       } else {
-               go func() {
-                       t.Wait()
-                       if val.SaveToChan != nil {
-                               close(val.SaveToChan)
+       }
+
+       //异步
+       go func() {
+               beginTime := time.Now()
+
+               for i := 0; i <= val.Retry; i++ {
+                       ctx, cancle := t.prepareRes(&val)
+                       t.err = t.Reqf_1(ctx, val)
+                       if cancle != nil {
+                               cancle()
                        }
-                       if t.responFile != nil {
-                               t.responFile.Close()
+                       if t.err == nil || IsCancel(t.err) {
+                               break
                        }
-                       if val.SaveToPipeWriter != nil {
-                               val.SaveToPipeWriter.Close()
+                       if val.SleepTime != 0 {
+                               time.Sleep(time.Duration(val.SleepTime * int(time.Millisecond)))
                        }
-                       t.cancel.Done()
-                       t.running.Done()
-                       t.l.Unlock()
-                       t.isLive.Done()
-               }()
-       }
+               }
+
+               t.updateUseDur(beginTime)
+               t.clean(&val)
+               t.state.Store(free)
+               t.l.Unlock()
+       }()
        return nil
 }
 
-func (t *Req) Reqf_1(val Rval) (err error) {
+func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) {
        var (
                Header map[string]string = val.Header
+               client http.Client
        )
 
-       var client http.Client
-
        if Header == nil {
                Header = make(map[string]string)
        }
@@ -180,7 +167,7 @@ func (t *Req) Reqf_1(val Rval) (err error) {
        }
 
        if val.Url == "" {
-               return errors.New("url is empty")
+               return ErrEmptyUrl
        }
 
        Method := "GET"
@@ -193,24 +180,18 @@ func (t *Req) Reqf_1(val Rval) (err error) {
                }
        }
 
-       cx, cancel := context.WithCancel(context.Background())
-       if val.Timeout > 0 {
-               cx, cancel = context.WithTimeout(cx, time.Duration(val.Timeout)*time.Millisecond)
-       }
-       t.cancelFs <- cancel
-
        req, e := http.NewRequest(Method, val.Url, body)
        if e != nil {
-               panic(e)
+               return errors.Join(ErrNewRequest, e)
        }
-       req = req.WithContext(cx)
+       req = req.WithContext(ctx)
 
        for _, v := range val.Cookies {
                req.AddCookie(v)
        }
 
        if _, ok := Header["Accept"]; !ok {
-               Header["Accept"] = `text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8`
+               Header["Accept"] = defaultAccept
        }
        if _, ok := Header["Connection"]; !ok {
                Header["Connection"] = "keep-alive"
@@ -222,23 +203,17 @@ func (t *Req) Reqf_1(val Rval) (err error) {
                Header["Accept-Encoding"] = "identity"
        }
        if _, ok := Header["User-Agent"]; !ok {
-               Header["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36"
+               Header["User-Agent"] = defaultUA
        }
 
        for k, v := range Header {
                req.Header.Set(k, v)
        }
 
-       if !t.cancel.Islive() {
-               err = context.Canceled
-               return
-       }
-
        resp, e := client.Do(req)
 
        if e != nil {
-               err = e
-               return
+               return errors.Join(ErrClientDo, e)
        }
 
        if v, ok := Header["Connection"]; ok && strings.ToLower(v) != "keep-alive" {
@@ -258,61 +233,64 @@ func (t *Req) Reqf_1(val Rval) (err error) {
        var ws []io.Writer
        if val.SaveToPath != "" {
                t.responFile, e = os.Create(val.SaveToPath)
-               if err != nil {
+               if e != nil {
                        t.responFile.Close()
-                       err = e
-                       return
+                       return errors.Join(err, ErrResponFileCreate, e)
                }
                ws = append(ws, t.responFile)
-               t.cancelFs <- func() { t.responFile.Close() }
        }
        if val.SaveToPipeWriter != nil {
                ws = append(ws, val.SaveToPipeWriter)
-               t.cancelFs <- func() { val.SaveToPipeWriter.Close() }
        }
        if !val.NoResponse {
+               //will clear t.Respon
                t.responBuf.Reset()
                ws = append(ws, t.responBuf)
        }
 
        w := io.MultiWriter(ws...)
 
-       var resReader io.Reader
+       var resReadCloser io.ReadCloser
        if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 {
                switch compress_type[0] {
                case `br`:
-                       resReader = br.NewReader(resp.Body)
+                       resReadCloser = rwc{r: br.NewReader(resp.Body).Read}
                case `gzip`:
-                       resReader, _ = gzip.NewReader(resp.Body)
+                       resReadCloser, _ = gzip.NewReader(resp.Body)
                case `deflate`:
-                       resReader = flate.NewReader(resp.Body)
+                       resReadCloser = flate.NewReader(resp.Body)
                default:
-                       resReader = resp.Body
+                       resReadCloser = resp.Body
                }
        } else {
-               resReader = resp.Body
+               resReadCloser = resp.Body
        }
-       t.cancelFs <- func() { resp.Body.Close() }
-
-       buf := make([]byte, 512)
 
-       for {
-               if n, e := resReader.Read(buf); n != 0 {
-                       w.Write(buf[:n])
-                       if val.SaveToChan != nil {
-                               val.SaveToChan <- buf[:n]
+       writeLoopTO := val.WriteLoopTO
+       if writeLoopTO == 0 {
+               writeLoopTO = 1000
+       }
+       rwc := t.withCtxTO(ctx, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser)
+       defer rwc.Close()
+
+       for buf := make([]byte, 512); true; {
+               if n, e := rwc.Read(buf); n != 0 {
+                       if wn, we := rwc.Write(buf[:n]); wn != 0 {
+                               if val.SaveToChan != nil {
+                                       val.SaveToChan <- buf[:n]
+                               }
+                       } else if we != nil {
+                               if !errors.Is(e, io.EOF) {
+                                       err = errors.Join(err, ErrWriteRes, e)
+                               }
+                               break
                        }
                } else if e != nil {
                        if !errors.Is(e, io.EOF) {
-                               err = e
+                               err = errors.Join(err, ErrReadRes, e)
                        }
                        break
                }
-
-               if !t.cancel.Islive() {
-                       err = context.Canceled
-                       break
-               }
        }
 
        resp.Body.Close()
@@ -324,31 +302,145 @@ func (t *Req) Reqf_1(val Rval) (err error) {
        return
 }
 
-func (t *Req) Wait() error {
-       t.init.RLock()
-       defer t.init.RUnlock()
+func (t *Req) Wait() (err error) {
+       t.l.RLock()
+       err = t.err
+       t.l.RUnlock()
+       return
+}
 
-       t.running.Wait()
-       return t.err
+func (t *Req) Close() { t.Cancel() }
+func (t *Req) Cancel() {
+       if p := t.cancelP.Load(); p != nil {
+               (*p)()
+       }
 }
 
-func (t *Req) Cancel() { t.Close() }
+func (t *Req) IsLive() bool {
+       return t.state.Load() == running
+}
 
-func (t *Req) Close() {
-       t.init.RLock()
-       defer t.init.RUnlock()
+func (t *Req) prepareRes(val *Rval) (context.Context, context.CancelFunc) {
+       if !val.NoResponse {
+               if t.responBuf == nil {
+                       t.responBuf = new(bytes.Buffer)
+                       t.Respon = t.responBuf.Bytes()
+               } else {
+                       t.responBuf.Reset()
+               }
+       } else {
+               t.Respon = []byte{}
+               t.responBuf = nil
+       }
+       t.Response = nil
+       t.err = nil
+       if val.Timeout > 0 {
+               return context.WithTimeout(t.ctx, time.Duration(val.Timeout*int(time.Millisecond)))
+       }
+       return t.ctx, nil
+}
 
-       if !t.cancel.Islive() {
-               return
+func (t *Req) prepare(val *Rval) {
+       t.UsedTime = 0
+       t.responFile = nil
+       t.callTree = ""
+       for i := 2; true; i++ {
+               if pc, file, line, ok := runtime.Caller(i); !ok {
+                       break
+               } else {
+                       t.callTree += fmt.Sprintf("call by %s\n\t%s:%d\n", runtime.FuncForPC(pc).Name(), file, line)
+               }
        }
-       t.cancel.Done()
+       var cancel func()
+       if val.Ctx != nil {
+               t.ctx, cancel = context.WithCancel(val.Ctx)
+       } else {
+               t.ctx, cancel = context.WithCancel(context.Background())
+       }
+       t.cancelP.Store(&cancel)
 }
 
-func (t *Req) IsLive() bool {
-       t.init.RLock()
-       defer t.init.RUnlock()
+func (t *Req) clean(val *Rval) {
+       if p := t.cancelP.Load(); p != nil {
+               (*p)()
+       }
+       if val.SaveToChan != nil {
+               close(val.SaveToChan)
+       }
+       if t.responFile != nil {
+               t.responFile.Close()
+       }
+       if val.SaveToPipeWriter != nil {
+               val.SaveToPipeWriter.Close()
+       }
+}
+
+func (t *Req) updateUseDur(u time.Time) {
+       t.UsedTime = time.Since(u)
+}
+
+type rwc struct {
+       r func(p []byte) (n int, err error)
+       w func(p []byte) (n int, err error)
+       c func() error
+}
 
-       return t.isLive.Islive()
+func (t rwc) Write(p []byte) (n int, err error) {
+       return t.w(p)
+}
+func (t rwc) Read(p []byte) (n int, err error) {
+       return t.r(p)
+}
+func (t rwc) Close() error {
+       return t.c()
+}
+
+func (t *Req) withCtxTO(ctx context.Context, to time.Duration, w io.Writer, r io.Reader) io.ReadWriteCloser {
+       var chanw = make(chan struct{}, 1)
+
+       go func(callTree string) {
+               var timer = time.NewTicker(to)
+               defer timer.Stop()
+               for {
+                       select {
+                       case <-ctx.Done():
+                               if len(chanw) != 0 {
+                                       panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", to, callTree))
+                               }
+                               return
+                       case <-timer.C:
+                               if len(chanw) != 0 {
+                                       panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", to, callTree))
+                               }
+                       }
+               }
+       }(t.callTree)
+
+       return rwc{
+               func(p []byte) (n int, err error) {
+                       if n, err = r.Read(p); n != 0 {
+                               select {
+                               case <-ctx.Done():
+                               case chanw <- struct{}{}:
+                               default:
+                               }
+                       }
+                       return
+               },
+               func(p []byte) (n int, err error) {
+                       if n, err = w.Write(p); n != 0 {
+                               select {
+                               case <-chanw:
+                               default:
+                               }
+                       }
+                       return
+               },
+               func() error {
+                       close(chanw)
+                       return nil
+               },
+       }
 }
 
 func IsTimeout(e error) bool {
index 6133ff681a5318ec653ed7c2cc6ead3aec2fe36b..b48adf80ab69416790e84e4ed7917d721fd07689 100644 (file)
@@ -7,6 +7,7 @@ import (
        "io"
        "net/http"
        "strconv"
+       "sync"
        "testing"
        "time"
 
@@ -95,7 +96,7 @@ func Test_req13(t *testing.T) {
                Retry:   2,
        })
        if e.Error() != "403 Forbidden" {
-               t.Fatal()
+               t.Fatal(e.Error())
        }
 }
 
@@ -282,6 +283,8 @@ func Test_req8(t *testing.T) {
        }
 }
 
+// panic
+/*
 func Test_req10(t *testing.T) {
        r := New()
        {
@@ -300,6 +303,7 @@ func Test_req10(t *testing.T) {
                }
        }
 }
+*/
 
 func Test_req3(t *testing.T) {
        r := New()
@@ -384,21 +388,28 @@ func Test_req3(t *testing.T) {
                }
        }
        {
+               var wg sync.WaitGroup
                rc, wc := io.Pipe()
+               wg.Add(1)
+               go func() {
+                       var buf []byte = make([]byte, 1<<16)
+                       n, _ := rc.Read(buf)
+                       d := buf[:n]
+                       if !bytes.Equal(d, []byte("abc强强强强")) {
+                               t.Error("io async fail", d)
+                       }
+                       wg.Done()
+               }()
                r.Reqf(Rval{
                        Url:              "http://" + addr + "/flate",
                        SaveToPipeWriter: wc,
                        NoResponse:       true,
                        Async:            true,
                })
+               r.Wait()
                if len(r.Respon) != 0 {
-                       t.Error("io async fail")
-               }
-               var buf []byte = make([]byte, 1<<16)
-               n, _ := rc.Read(buf)
-               d := buf[:n]
-               if !bytes.Equal(d, []byte("abc强强强强")) {
-                       t.Error("io async fail", d)
+                       t.Error("io async fail", r.Respon)
                }
+               wg.Wait()
        }
 }