From 4c1090fea332258eae453c9302e30be97e678b5b Mon Sep 17 00:00:00 2001 From: qydysky Date: Sat, 3 Jun 2023 01:42:52 +0800 Subject: [PATCH] add --- compress/Flate.go | 2 +- go.mod | 14 +- go.sum | 29 ++-- reqf/Reqf.go | 382 ++++++++++++++++++++++++++++------------------ reqf/Reqf_test.go | 27 +++- 5 files changed, 279 insertions(+), 175 deletions(-) diff --git a/compress/Flate.go b/compress/Flate.go index ee4839f..379952e 100644 --- a/compress/Flate.go +++ b/compress/Flate.go @@ -14,10 +14,10 @@ func InFlate(byteS []byte, level int) ([]byte, error) { if err != nil { return buf.Bytes(), err } - defer flateWrite.Close() // 写入待压缩内容 flateWrite.Write(byteS) flateWrite.Flush() + flateWrite.Close() return buf.Bytes(), nil } diff --git a/go.mod b/go.mod index 4b72ab0..ba4797a 100644 --- a/go.mod +++ b/go.mod @@ -16,17 +16,17 @@ require ( require ( github.com/google/uuid v1.3.0 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect - github.com/mattn/go-isatty v0.0.18 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/tklauser/go-sysconf v0.3.11 // indirect - github.com/tklauser/numcpus v0.6.0 // indirect - github.com/yusufpapurcu/wmi v1.2.2 // indirect + github.com/tklauser/numcpus v0.6.1 // indirect + github.com/yusufpapurcu/wmi v1.2.3 // indirect golang.org/x/mod v0.10.0 // indirect - golang.org/x/tools v0.8.0 // indirect + golang.org/x/tools v0.9.3 // indirect lukechampine.com/uint128 v1.3.0 // indirect modernc.org/cc/v3 v3.40.0 // indirect modernc.org/ccgo/v3 v3.16.13 // indirect - modernc.org/libc v1.22.5 // indirect + modernc.org/libc v1.22.6 // indirect modernc.org/mathutil v1.5.0 // indirect modernc.org/memory v1.5.0 // indirect modernc.org/opt v0.1.3 // indirect @@ -38,8 +38,8 @@ require ( github.com/dustin/go-humanize v1.0.1 github.com/go-ole/go-ole v1.2.6 // indirect github.com/stretchr/testify v1.8.2 // indirect - golang.org/x/net v0.9.0 // indirect - golang.org/x/sys v0.7.0 // indirect + golang.org/x/net v0.10.0 // indirect + golang.org/x/sys v0.8.0 // indirect modernc.org/sqlite v1.22.1 ) diff --git a/go.sum b/go.sum index d786af2..976bffd 100644 --- a/go.sum +++ b/go.sum @@ -17,8 +17,8 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNU github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/mattn/go-isatty v0.0.18 h1:DOKFKCQ7FNG2L1rbrmstDN4QVRdS89Nkh85u68Uwp98= -github.com/mattn/go-isatty v0.0.18/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= github.com/miekg/dns v1.1.54 h1:5jon9mWcb0sFJGpnI99tOMhCPyJ+RPVz5b63MQG0VWI= github.com/miekg/dns v1.1.54/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY= @@ -42,24 +42,25 @@ github.com/thedevsaddam/gojsonq/v2 v2.5.2 h1:CoMVaYyKFsVj6TjU6APqAhAvC07hTI6IQen github.com/thedevsaddam/gojsonq/v2 v2.5.2/go.mod h1:bv6Xa7kWy82uT0LnXPE2SzGqTj33TAEeR560MdJkiXs= github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM= github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI= -github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms= github.com/tklauser/numcpus v0.6.0/go.mod h1:FEZLMke0lhOUG6w2JadTzp0a+Nl8PF/GFkQ5UVIcaL4= -github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg= -github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= +github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= +github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= +github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk= golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= -golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= -golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/tools v0.8.0 h1:vSDcovVPld282ceKgDimkRSC8kpaH1dgyc9UMzlt84Y= -golang.org/x/tools v0.8.0/go.mod h1:JxBZ99ISMI5ViVkT1tr6tdNmXeTrcpVSD3vZ1RsRdN4= +golang.org/x/tools v0.9.3 h1:Gn1I8+64MsuTb/HpH+LmQtNas23LhUVr3rYZ0eKuaMM= +golang.org/x/tools v0.9.3/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= @@ -72,8 +73,8 @@ modernc.org/ccgo/v3 v3.16.13 h1:Mkgdzl46i5F/CNR/Kj80Ri59hC8TKAhZrYSaqvkwzUw= modernc.org/ccgo/v3 v3.16.13/go.mod h1:2Quk+5YgpImhPjv2Qsob1DnZ/4som1lJTodubIcoUkY= modernc.org/ccorpus v1.11.6 h1:J16RXiiqiCgua6+ZvQot4yUuUy8zxgqbqEEUuGPlISk= modernc.org/httpfs v1.0.6 h1:AAgIpFZRXuYnkjftxTAZwMIiwEqAfk8aVB2/oA6nAeM= -modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE= -modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY= +modernc.org/libc v1.22.6 h1:cbXU8R+A6aOjRuhsFh3nbDWXO/Hs4ClJRXYB11KmPDo= +modernc.org/libc v1.22.6/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY= modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ= modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= modernc.org/memory v1.5.0 h1:N+/8c5rE6EqugZwHii4IFsaJ7MUhoWX07J5tC/iI5Ds= diff --git a/reqf/Reqf.go b/reqf/Reqf.go index 37f00db..643091d 100644 --- a/reqf/Reqf.go +++ b/reqf/Reqf.go @@ -10,15 +10,16 @@ import ( "net/http" "net/url" "os" + "runtime" "strings" "sync" + "sync/atomic" "time" flate "compress/flate" gzip "compress/gzip" br "github.com/andybalholm/brotli" - signal "github.com/qydysky/part/signal" s "github.com/qydysky/part/strings" // "encoding/binary" ) @@ -30,10 +31,13 @@ type Rval struct { Proxy string Retry int SleepTime int + WriteLoopTO int JustResponseCode bool NoResponse bool - Async bool - Cookies []*http.Cookie + // 当Async为true时,Respon、Response必须在Wait()之后读取,否则有DATA RACE可能 + Async bool + Cookies []*http.Cookie + Ctx context.Context SaveToPath string SaveToChan chan []byte @@ -42,22 +46,39 @@ type Rval struct { Header map[string]string } +const ( + defaultUA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36" + defaultAccept = `text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8` + free = iota + running +) + +var ( + ErrEmptyUrl = errors.New("ErrEmptyUrl") + ErrNewRequest = errors.New("ErrNewRequest") + ErrClientDo = errors.New("ErrClientDo") + ErrResponFileCreate = errors.New("ErrResponFileCreate") + ErrWriteRes = errors.New("ErrWriteRes") + ErrReadRes = errors.New("ErrReadRes") +) + type Req struct { - Respon []byte - responBuf *bytes.Buffer - Response *http.Response - UsedTime time.Duration + // 当Async为true时,必须在Wait()之后读取,否则有DATA RACE可能 + Respon []byte + // 当Async为true时,必须在Wait()之后读取,否则有DATA RACE可能 + Response *http.Response + UsedTime time.Duration - cancelFs chan func() - cancel *signal.Signal - running *signal.Signal - isLive *signal.Signal + cancelP atomic.Pointer[func()] + ctx context.Context + state atomic.Int32 responFile *os.File + responBuf *bytes.Buffer err error + callTree string - init sync.RWMutex - l sync.Mutex + l sync.RWMutex } func New() *Req { @@ -65,102 +86,68 @@ func New() *Req { } func (t *Req) Reqf(val Rval) error { - t.isLive.Wait() t.l.Lock() - t.init.Lock() - - t.Respon = t.Respon[:0] - if t.responBuf == nil { - t.responBuf = new(bytes.Buffer) - } - t.Response = nil - t.UsedTime = 0 - t.cancelFs = make(chan func(), 5) - t.isLive = signal.Init() - t.cancel = signal.Init() - t.running = signal.Init() - t.responFile = nil - t.err = nil - go func() { - cancel, cancelFin := t.cancel.WaitC() - defer cancelFin() - running, runningFin := t.running.WaitC() - defer runningFin() - - select { - case <-cancel: - for len(t.cancelFs) != 0 { - (<-t.cancelFs)() - } - case <-running: - } - }() + t.state.Store(running) - t.init.Unlock() + t.prepare(&val) - go func() { + // 同步 + if !val.Async { beginTime := time.Now() - _val := val - for SleepTime, Retry := _val.SleepTime, _val.Retry; Retry >= 0; Retry -= 1 { - for len(t.cancelFs) != 0 { - <-t.cancelFs + for i := 0; i <= val.Retry; i++ { + ctx, cancle := t.prepareRes(&val) + t.err = t.Reqf_1(ctx, val) + if cancle != nil { + cancle() } - - t.err = t.Reqf_1(_val) if t.err == nil || IsCancel(t.err) { break } - time.Sleep(time.Duration(SleepTime) * time.Millisecond) + if val.SleepTime != 0 { + time.Sleep(time.Duration(val.SleepTime * int(time.Millisecond))) + } } - t.UsedTime = time.Since(beginTime) - t.running.Done() - }() - if !val.Async { - t.Wait() - if val.SaveToChan != nil { - close(val.SaveToChan) - } - if t.responFile != nil { - t.responFile.Close() - } - if val.SaveToPipeWriter != nil { - val.SaveToPipeWriter.Close() - } - t.cancel.Done() - t.running.Done() + t.updateUseDur(beginTime) + t.clean(&val) + t.state.Store(free) t.l.Unlock() - t.isLive.Done() return t.err - } else { - go func() { - t.Wait() - if val.SaveToChan != nil { - close(val.SaveToChan) + } + + //异步 + go func() { + beginTime := time.Now() + + for i := 0; i <= val.Retry; i++ { + ctx, cancle := t.prepareRes(&val) + t.err = t.Reqf_1(ctx, val) + if cancle != nil { + cancle() } - if t.responFile != nil { - t.responFile.Close() + if t.err == nil || IsCancel(t.err) { + break } - if val.SaveToPipeWriter != nil { - val.SaveToPipeWriter.Close() + if val.SleepTime != 0 { + time.Sleep(time.Duration(val.SleepTime * int(time.Millisecond))) } - t.cancel.Done() - t.running.Done() - t.l.Unlock() - t.isLive.Done() - }() - } + } + + t.updateUseDur(beginTime) + t.clean(&val) + t.state.Store(free) + t.l.Unlock() + }() return nil } -func (t *Req) Reqf_1(val Rval) (err error) { +func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) { var ( Header map[string]string = val.Header + client http.Client ) - var client http.Client - if Header == nil { Header = make(map[string]string) } @@ -180,7 +167,7 @@ func (t *Req) Reqf_1(val Rval) (err error) { } if val.Url == "" { - return errors.New("url is empty") + return ErrEmptyUrl } Method := "GET" @@ -193,24 +180,18 @@ func (t *Req) Reqf_1(val Rval) (err error) { } } - cx, cancel := context.WithCancel(context.Background()) - if val.Timeout > 0 { - cx, cancel = context.WithTimeout(cx, time.Duration(val.Timeout)*time.Millisecond) - } - t.cancelFs <- cancel - req, e := http.NewRequest(Method, val.Url, body) if e != nil { - panic(e) + return errors.Join(ErrNewRequest, e) } - req = req.WithContext(cx) + req = req.WithContext(ctx) for _, v := range val.Cookies { req.AddCookie(v) } if _, ok := Header["Accept"]; !ok { - Header["Accept"] = `text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8` + Header["Accept"] = defaultAccept } if _, ok := Header["Connection"]; !ok { Header["Connection"] = "keep-alive" @@ -222,23 +203,17 @@ func (t *Req) Reqf_1(val Rval) (err error) { Header["Accept-Encoding"] = "identity" } if _, ok := Header["User-Agent"]; !ok { - Header["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36" + Header["User-Agent"] = defaultUA } for k, v := range Header { req.Header.Set(k, v) } - if !t.cancel.Islive() { - err = context.Canceled - return - } - resp, e := client.Do(req) if e != nil { - err = e - return + return errors.Join(ErrClientDo, e) } if v, ok := Header["Connection"]; ok && strings.ToLower(v) != "keep-alive" { @@ -258,61 +233,64 @@ func (t *Req) Reqf_1(val Rval) (err error) { var ws []io.Writer if val.SaveToPath != "" { t.responFile, e = os.Create(val.SaveToPath) - if err != nil { + if e != nil { t.responFile.Close() - err = e - return + return errors.Join(err, ErrResponFileCreate, e) } ws = append(ws, t.responFile) - t.cancelFs <- func() { t.responFile.Close() } } if val.SaveToPipeWriter != nil { ws = append(ws, val.SaveToPipeWriter) - t.cancelFs <- func() { val.SaveToPipeWriter.Close() } } if !val.NoResponse { + //will clear t.Respon t.responBuf.Reset() ws = append(ws, t.responBuf) } w := io.MultiWriter(ws...) - var resReader io.Reader + var resReadCloser io.ReadCloser if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 { switch compress_type[0] { case `br`: - resReader = br.NewReader(resp.Body) + resReadCloser = rwc{r: br.NewReader(resp.Body).Read} case `gzip`: - resReader, _ = gzip.NewReader(resp.Body) + resReadCloser, _ = gzip.NewReader(resp.Body) case `deflate`: - resReader = flate.NewReader(resp.Body) + resReadCloser = flate.NewReader(resp.Body) default: - resReader = resp.Body + resReadCloser = resp.Body } } else { - resReader = resp.Body + resReadCloser = resp.Body } - t.cancelFs <- func() { resp.Body.Close() } - - buf := make([]byte, 512) - for { - if n, e := resReader.Read(buf); n != 0 { - w.Write(buf[:n]) - if val.SaveToChan != nil { - val.SaveToChan <- buf[:n] + writeLoopTO := val.WriteLoopTO + if writeLoopTO == 0 { + writeLoopTO = 1000 + } + rwc := t.withCtxTO(ctx, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser) + defer rwc.Close() + + for buf := make([]byte, 512); true; { + if n, e := rwc.Read(buf); n != 0 { + if wn, we := rwc.Write(buf[:n]); wn != 0 { + if val.SaveToChan != nil { + val.SaveToChan <- buf[:n] + } + } else if we != nil { + if !errors.Is(e, io.EOF) { + err = errors.Join(err, ErrWriteRes, e) + } + break } } else if e != nil { if !errors.Is(e, io.EOF) { - err = e + err = errors.Join(err, ErrReadRes, e) } break } - - if !t.cancel.Islive() { - err = context.Canceled - break - } } resp.Body.Close() @@ -324,31 +302,145 @@ func (t *Req) Reqf_1(val Rval) (err error) { return } -func (t *Req) Wait() error { - t.init.RLock() - defer t.init.RUnlock() +func (t *Req) Wait() (err error) { + t.l.RLock() + err = t.err + t.l.RUnlock() + return +} - t.running.Wait() - return t.err +func (t *Req) Close() { t.Cancel() } +func (t *Req) Cancel() { + if p := t.cancelP.Load(); p != nil { + (*p)() + } } -func (t *Req) Cancel() { t.Close() } +func (t *Req) IsLive() bool { + return t.state.Load() == running +} -func (t *Req) Close() { - t.init.RLock() - defer t.init.RUnlock() +func (t *Req) prepareRes(val *Rval) (context.Context, context.CancelFunc) { + if !val.NoResponse { + if t.responBuf == nil { + t.responBuf = new(bytes.Buffer) + t.Respon = t.responBuf.Bytes() + } else { + t.responBuf.Reset() + } + } else { + t.Respon = []byte{} + t.responBuf = nil + } + t.Response = nil + t.err = nil + if val.Timeout > 0 { + return context.WithTimeout(t.ctx, time.Duration(val.Timeout*int(time.Millisecond))) + } + return t.ctx, nil +} - if !t.cancel.Islive() { - return +func (t *Req) prepare(val *Rval) { + t.UsedTime = 0 + t.responFile = nil + t.callTree = "" + for i := 2; true; i++ { + if pc, file, line, ok := runtime.Caller(i); !ok { + break + } else { + t.callTree += fmt.Sprintf("call by %s\n\t%s:%d\n", runtime.FuncForPC(pc).Name(), file, line) + } } - t.cancel.Done() + var cancel func() + if val.Ctx != nil { + t.ctx, cancel = context.WithCancel(val.Ctx) + } else { + t.ctx, cancel = context.WithCancel(context.Background()) + } + t.cancelP.Store(&cancel) } -func (t *Req) IsLive() bool { - t.init.RLock() - defer t.init.RUnlock() +func (t *Req) clean(val *Rval) { + if p := t.cancelP.Load(); p != nil { + (*p)() + } + if val.SaveToChan != nil { + close(val.SaveToChan) + } + if t.responFile != nil { + t.responFile.Close() + } + if val.SaveToPipeWriter != nil { + val.SaveToPipeWriter.Close() + } +} + +func (t *Req) updateUseDur(u time.Time) { + t.UsedTime = time.Since(u) +} + +type rwc struct { + r func(p []byte) (n int, err error) + w func(p []byte) (n int, err error) + c func() error +} - return t.isLive.Islive() +func (t rwc) Write(p []byte) (n int, err error) { + return t.w(p) +} +func (t rwc) Read(p []byte) (n int, err error) { + return t.r(p) +} +func (t rwc) Close() error { + return t.c() +} + +func (t *Req) withCtxTO(ctx context.Context, to time.Duration, w io.Writer, r io.Reader) io.ReadWriteCloser { + var chanw = make(chan struct{}, 1) + + go func(callTree string) { + var timer = time.NewTicker(to) + defer timer.Stop() + for { + select { + case <-ctx.Done(): + if len(chanw) != 0 { + panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", to, callTree)) + } + return + case <-timer.C: + if len(chanw) != 0 { + panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", to, callTree)) + } + } + } + }(t.callTree) + + return rwc{ + func(p []byte) (n int, err error) { + if n, err = r.Read(p); n != 0 { + select { + case <-ctx.Done(): + case chanw <- struct{}{}: + default: + } + } + return + }, + func(p []byte) (n int, err error) { + if n, err = w.Write(p); n != 0 { + select { + case <-chanw: + default: + } + } + return + }, + func() error { + close(chanw) + return nil + }, + } } func IsTimeout(e error) bool { diff --git a/reqf/Reqf_test.go b/reqf/Reqf_test.go index 6133ff6..b48adf8 100644 --- a/reqf/Reqf_test.go +++ b/reqf/Reqf_test.go @@ -7,6 +7,7 @@ import ( "io" "net/http" "strconv" + "sync" "testing" "time" @@ -95,7 +96,7 @@ func Test_req13(t *testing.T) { Retry: 2, }) if e.Error() != "403 Forbidden" { - t.Fatal() + t.Fatal(e.Error()) } } @@ -282,6 +283,8 @@ func Test_req8(t *testing.T) { } } +// panic +/* func Test_req10(t *testing.T) { r := New() { @@ -300,6 +303,7 @@ func Test_req10(t *testing.T) { } } } +*/ func Test_req3(t *testing.T) { r := New() @@ -384,21 +388,28 @@ func Test_req3(t *testing.T) { } } { + var wg sync.WaitGroup rc, wc := io.Pipe() + wg.Add(1) + go func() { + var buf []byte = make([]byte, 1<<16) + n, _ := rc.Read(buf) + d := buf[:n] + if !bytes.Equal(d, []byte("abc强强强强")) { + t.Error("io async fail", d) + } + wg.Done() + }() r.Reqf(Rval{ Url: "http://" + addr + "/flate", SaveToPipeWriter: wc, NoResponse: true, Async: true, }) + r.Wait() if len(r.Respon) != 0 { - t.Error("io async fail") - } - var buf []byte = make([]byte, 1<<16) - n, _ := rc.Read(buf) - d := buf[:n] - if !bytes.Equal(d, []byte("abc强强强强")) { - t.Error("io async fail", d) + t.Error("io async fail", r.Respon) } + wg.Wait() } } -- 2.39.2