github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
-github.com/mattn/go-isatty v0.0.18 h1:DOKFKCQ7FNG2L1rbrmstDN4QVRdS89Nkh85u68Uwp98=
-github.com/mattn/go-isatty v0.0.18/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
+github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
+github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y=
github.com/miekg/dns v1.1.54 h1:5jon9mWcb0sFJGpnI99tOMhCPyJ+RPVz5b63MQG0VWI=
github.com/miekg/dns v1.1.54/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY=
github.com/thedevsaddam/gojsonq/v2 v2.5.2/go.mod h1:bv6Xa7kWy82uT0LnXPE2SzGqTj33TAEeR560MdJkiXs=
github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM=
github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI=
-github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms=
github.com/tklauser/numcpus v0.6.0/go.mod h1:FEZLMke0lhOUG6w2JadTzp0a+Nl8PF/GFkQ5UVIcaL4=
-github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
-github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
+github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
+github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
+github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
+github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk=
golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
-golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM=
-golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
-golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
+golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
+golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
+golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
-golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
+golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
-golang.org/x/tools v0.8.0 h1:vSDcovVPld282ceKgDimkRSC8kpaH1dgyc9UMzlt84Y=
-golang.org/x/tools v0.8.0/go.mod h1:JxBZ99ISMI5ViVkT1tr6tdNmXeTrcpVSD3vZ1RsRdN4=
+golang.org/x/tools v0.9.3 h1:Gn1I8+64MsuTb/HpH+LmQtNas23LhUVr3rYZ0eKuaMM=
+golang.org/x/tools v0.9.3/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
modernc.org/ccgo/v3 v3.16.13/go.mod h1:2Quk+5YgpImhPjv2Qsob1DnZ/4som1lJTodubIcoUkY=
modernc.org/ccorpus v1.11.6 h1:J16RXiiqiCgua6+ZvQot4yUuUy8zxgqbqEEUuGPlISk=
modernc.org/httpfs v1.0.6 h1:AAgIpFZRXuYnkjftxTAZwMIiwEqAfk8aVB2/oA6nAeM=
-modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
-modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY=
+modernc.org/libc v1.22.6 h1:cbXU8R+A6aOjRuhsFh3nbDWXO/Hs4ClJRXYB11KmPDo=
+modernc.org/libc v1.22.6/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY=
modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=
modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/memory v1.5.0 h1:N+/8c5rE6EqugZwHii4IFsaJ7MUhoWX07J5tC/iI5Ds=
"net/http"
"net/url"
"os"
+ "runtime"
"strings"
"sync"
+ "sync/atomic"
"time"
flate "compress/flate"
gzip "compress/gzip"
br "github.com/andybalholm/brotli"
- signal "github.com/qydysky/part/signal"
s "github.com/qydysky/part/strings"
// "encoding/binary"
)
Proxy string
Retry int
SleepTime int
+ WriteLoopTO int
JustResponseCode bool
NoResponse bool
- Async bool
- Cookies []*http.Cookie
+ // 当Async为true时,Respon、Response必须在Wait()之后读取,否则有DATA RACE可能
+ Async bool
+ Cookies []*http.Cookie
+ Ctx context.Context
SaveToPath string
SaveToChan chan []byte
Header map[string]string
}
+const (
+ defaultUA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36"
+ defaultAccept = `text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8`
+ free = iota
+ running
+)
+
+var (
+ ErrEmptyUrl = errors.New("ErrEmptyUrl")
+ ErrNewRequest = errors.New("ErrNewRequest")
+ ErrClientDo = errors.New("ErrClientDo")
+ ErrResponFileCreate = errors.New("ErrResponFileCreate")
+ ErrWriteRes = errors.New("ErrWriteRes")
+ ErrReadRes = errors.New("ErrReadRes")
+)
+
type Req struct {
- Respon []byte
- responBuf *bytes.Buffer
- Response *http.Response
- UsedTime time.Duration
+ // 当Async为true时,必须在Wait()之后读取,否则有DATA RACE可能
+ Respon []byte
+ // 当Async为true时,必须在Wait()之后读取,否则有DATA RACE可能
+ Response *http.Response
+ UsedTime time.Duration
- cancelFs chan func()
- cancel *signal.Signal
- running *signal.Signal
- isLive *signal.Signal
+ cancelP atomic.Pointer[func()]
+ ctx context.Context
+ state atomic.Int32
responFile *os.File
+ responBuf *bytes.Buffer
err error
+ callTree string
- init sync.RWMutex
- l sync.Mutex
+ l sync.RWMutex
}
func New() *Req {
}
func (t *Req) Reqf(val Rval) error {
- t.isLive.Wait()
t.l.Lock()
- t.init.Lock()
-
- t.Respon = t.Respon[:0]
- if t.responBuf == nil {
- t.responBuf = new(bytes.Buffer)
- }
- t.Response = nil
- t.UsedTime = 0
- t.cancelFs = make(chan func(), 5)
- t.isLive = signal.Init()
- t.cancel = signal.Init()
- t.running = signal.Init()
- t.responFile = nil
- t.err = nil
- go func() {
- cancel, cancelFin := t.cancel.WaitC()
- defer cancelFin()
- running, runningFin := t.running.WaitC()
- defer runningFin()
-
- select {
- case <-cancel:
- for len(t.cancelFs) != 0 {
- (<-t.cancelFs)()
- }
- case <-running:
- }
- }()
+ t.state.Store(running)
- t.init.Unlock()
+ t.prepare(&val)
- go func() {
+ // 同步
+ if !val.Async {
beginTime := time.Now()
- _val := val
- for SleepTime, Retry := _val.SleepTime, _val.Retry; Retry >= 0; Retry -= 1 {
- for len(t.cancelFs) != 0 {
- <-t.cancelFs
+ for i := 0; i <= val.Retry; i++ {
+ ctx, cancle := t.prepareRes(&val)
+ t.err = t.Reqf_1(ctx, val)
+ if cancle != nil {
+ cancle()
}
-
- t.err = t.Reqf_1(_val)
if t.err == nil || IsCancel(t.err) {
break
}
- time.Sleep(time.Duration(SleepTime) * time.Millisecond)
+ if val.SleepTime != 0 {
+ time.Sleep(time.Duration(val.SleepTime * int(time.Millisecond)))
+ }
}
- t.UsedTime = time.Since(beginTime)
- t.running.Done()
- }()
- if !val.Async {
- t.Wait()
- if val.SaveToChan != nil {
- close(val.SaveToChan)
- }
- if t.responFile != nil {
- t.responFile.Close()
- }
- if val.SaveToPipeWriter != nil {
- val.SaveToPipeWriter.Close()
- }
- t.cancel.Done()
- t.running.Done()
+ t.updateUseDur(beginTime)
+ t.clean(&val)
+ t.state.Store(free)
t.l.Unlock()
- t.isLive.Done()
return t.err
- } else {
- go func() {
- t.Wait()
- if val.SaveToChan != nil {
- close(val.SaveToChan)
+ }
+
+ //异步
+ go func() {
+ beginTime := time.Now()
+
+ for i := 0; i <= val.Retry; i++ {
+ ctx, cancle := t.prepareRes(&val)
+ t.err = t.Reqf_1(ctx, val)
+ if cancle != nil {
+ cancle()
}
- if t.responFile != nil {
- t.responFile.Close()
+ if t.err == nil || IsCancel(t.err) {
+ break
}
- if val.SaveToPipeWriter != nil {
- val.SaveToPipeWriter.Close()
+ if val.SleepTime != 0 {
+ time.Sleep(time.Duration(val.SleepTime * int(time.Millisecond)))
}
- t.cancel.Done()
- t.running.Done()
- t.l.Unlock()
- t.isLive.Done()
- }()
- }
+ }
+
+ t.updateUseDur(beginTime)
+ t.clean(&val)
+ t.state.Store(free)
+ t.l.Unlock()
+ }()
return nil
}
-func (t *Req) Reqf_1(val Rval) (err error) {
+func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) {
var (
Header map[string]string = val.Header
+ client http.Client
)
- var client http.Client
-
if Header == nil {
Header = make(map[string]string)
}
}
if val.Url == "" {
- return errors.New("url is empty")
+ return ErrEmptyUrl
}
Method := "GET"
}
}
- cx, cancel := context.WithCancel(context.Background())
- if val.Timeout > 0 {
- cx, cancel = context.WithTimeout(cx, time.Duration(val.Timeout)*time.Millisecond)
- }
- t.cancelFs <- cancel
-
req, e := http.NewRequest(Method, val.Url, body)
if e != nil {
- panic(e)
+ return errors.Join(ErrNewRequest, e)
}
- req = req.WithContext(cx)
+ req = req.WithContext(ctx)
for _, v := range val.Cookies {
req.AddCookie(v)
}
if _, ok := Header["Accept"]; !ok {
- Header["Accept"] = `text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8`
+ Header["Accept"] = defaultAccept
}
if _, ok := Header["Connection"]; !ok {
Header["Connection"] = "keep-alive"
Header["Accept-Encoding"] = "identity"
}
if _, ok := Header["User-Agent"]; !ok {
- Header["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36"
+ Header["User-Agent"] = defaultUA
}
for k, v := range Header {
req.Header.Set(k, v)
}
- if !t.cancel.Islive() {
- err = context.Canceled
- return
- }
-
resp, e := client.Do(req)
if e != nil {
- err = e
- return
+ return errors.Join(ErrClientDo, e)
}
if v, ok := Header["Connection"]; ok && strings.ToLower(v) != "keep-alive" {
var ws []io.Writer
if val.SaveToPath != "" {
t.responFile, e = os.Create(val.SaveToPath)
- if err != nil {
+ if e != nil {
t.responFile.Close()
- err = e
- return
+ return errors.Join(err, ErrResponFileCreate, e)
}
ws = append(ws, t.responFile)
- t.cancelFs <- func() { t.responFile.Close() }
}
if val.SaveToPipeWriter != nil {
ws = append(ws, val.SaveToPipeWriter)
- t.cancelFs <- func() { val.SaveToPipeWriter.Close() }
}
if !val.NoResponse {
+ //will clear t.Respon
t.responBuf.Reset()
ws = append(ws, t.responBuf)
}
w := io.MultiWriter(ws...)
- var resReader io.Reader
+ var resReadCloser io.ReadCloser
if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 {
switch compress_type[0] {
case `br`:
- resReader = br.NewReader(resp.Body)
+ resReadCloser = rwc{r: br.NewReader(resp.Body).Read}
case `gzip`:
- resReader, _ = gzip.NewReader(resp.Body)
+ resReadCloser, _ = gzip.NewReader(resp.Body)
case `deflate`:
- resReader = flate.NewReader(resp.Body)
+ resReadCloser = flate.NewReader(resp.Body)
default:
- resReader = resp.Body
+ resReadCloser = resp.Body
}
} else {
- resReader = resp.Body
+ resReadCloser = resp.Body
}
- t.cancelFs <- func() { resp.Body.Close() }
-
- buf := make([]byte, 512)
- for {
- if n, e := resReader.Read(buf); n != 0 {
- w.Write(buf[:n])
- if val.SaveToChan != nil {
- val.SaveToChan <- buf[:n]
+ writeLoopTO := val.WriteLoopTO
+ if writeLoopTO == 0 {
+ writeLoopTO = 1000
+ }
+ rwc := t.withCtxTO(ctx, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser)
+ defer rwc.Close()
+
+ for buf := make([]byte, 512); true; {
+ if n, e := rwc.Read(buf); n != 0 {
+ if wn, we := rwc.Write(buf[:n]); wn != 0 {
+ if val.SaveToChan != nil {
+ val.SaveToChan <- buf[:n]
+ }
+ } else if we != nil {
+ if !errors.Is(e, io.EOF) {
+ err = errors.Join(err, ErrWriteRes, e)
+ }
+ break
}
} else if e != nil {
if !errors.Is(e, io.EOF) {
- err = e
+ err = errors.Join(err, ErrReadRes, e)
}
break
}
-
- if !t.cancel.Islive() {
- err = context.Canceled
- break
- }
}
resp.Body.Close()
return
}
-func (t *Req) Wait() error {
- t.init.RLock()
- defer t.init.RUnlock()
+func (t *Req) Wait() (err error) {
+ t.l.RLock()
+ err = t.err
+ t.l.RUnlock()
+ return
+}
- t.running.Wait()
- return t.err
+func (t *Req) Close() { t.Cancel() }
+func (t *Req) Cancel() {
+ if p := t.cancelP.Load(); p != nil {
+ (*p)()
+ }
}
-func (t *Req) Cancel() { t.Close() }
+func (t *Req) IsLive() bool {
+ return t.state.Load() == running
+}
-func (t *Req) Close() {
- t.init.RLock()
- defer t.init.RUnlock()
+func (t *Req) prepareRes(val *Rval) (context.Context, context.CancelFunc) {
+ if !val.NoResponse {
+ if t.responBuf == nil {
+ t.responBuf = new(bytes.Buffer)
+ t.Respon = t.responBuf.Bytes()
+ } else {
+ t.responBuf.Reset()
+ }
+ } else {
+ t.Respon = []byte{}
+ t.responBuf = nil
+ }
+ t.Response = nil
+ t.err = nil
+ if val.Timeout > 0 {
+ return context.WithTimeout(t.ctx, time.Duration(val.Timeout*int(time.Millisecond)))
+ }
+ return t.ctx, nil
+}
- if !t.cancel.Islive() {
- return
+func (t *Req) prepare(val *Rval) {
+ t.UsedTime = 0
+ t.responFile = nil
+ t.callTree = ""
+ for i := 2; true; i++ {
+ if pc, file, line, ok := runtime.Caller(i); !ok {
+ break
+ } else {
+ t.callTree += fmt.Sprintf("call by %s\n\t%s:%d\n", runtime.FuncForPC(pc).Name(), file, line)
+ }
}
- t.cancel.Done()
+ var cancel func()
+ if val.Ctx != nil {
+ t.ctx, cancel = context.WithCancel(val.Ctx)
+ } else {
+ t.ctx, cancel = context.WithCancel(context.Background())
+ }
+ t.cancelP.Store(&cancel)
}
-func (t *Req) IsLive() bool {
- t.init.RLock()
- defer t.init.RUnlock()
+func (t *Req) clean(val *Rval) {
+ if p := t.cancelP.Load(); p != nil {
+ (*p)()
+ }
+ if val.SaveToChan != nil {
+ close(val.SaveToChan)
+ }
+ if t.responFile != nil {
+ t.responFile.Close()
+ }
+ if val.SaveToPipeWriter != nil {
+ val.SaveToPipeWriter.Close()
+ }
+}
+
+func (t *Req) updateUseDur(u time.Time) {
+ t.UsedTime = time.Since(u)
+}
+
+type rwc struct {
+ r func(p []byte) (n int, err error)
+ w func(p []byte) (n int, err error)
+ c func() error
+}
- return t.isLive.Islive()
+func (t rwc) Write(p []byte) (n int, err error) {
+ return t.w(p)
+}
+func (t rwc) Read(p []byte) (n int, err error) {
+ return t.r(p)
+}
+func (t rwc) Close() error {
+ return t.c()
+}
+
+func (t *Req) withCtxTO(ctx context.Context, to time.Duration, w io.Writer, r io.Reader) io.ReadWriteCloser {
+ var chanw = make(chan struct{}, 1)
+
+ go func(callTree string) {
+ var timer = time.NewTicker(to)
+ defer timer.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ if len(chanw) != 0 {
+ panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", to, callTree))
+ }
+ return
+ case <-timer.C:
+ if len(chanw) != 0 {
+ panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", to, callTree))
+ }
+ }
+ }
+ }(t.callTree)
+
+ return rwc{
+ func(p []byte) (n int, err error) {
+ if n, err = r.Read(p); n != 0 {
+ select {
+ case <-ctx.Done():
+ case chanw <- struct{}{}:
+ default:
+ }
+ }
+ return
+ },
+ func(p []byte) (n int, err error) {
+ if n, err = w.Write(p); n != 0 {
+ select {
+ case <-chanw:
+ default:
+ }
+ }
+ return
+ },
+ func() error {
+ close(chanw)
+ return nil
+ },
+ }
}
func IsTimeout(e error) bool {