PostStr string
Proxy string
Retry int
- // Millisecond
+ // Millisecond,总体请求超时,context.DeadlineExceeded,IsTimeout()==true
Timeout int
- // Millisecond
+ // Millisecond,Retry重试间隔
SleepTime int
- // Deprecated: use Timeout
- WriteLoopTO int
- JustResponseCode bool
- NoResponse bool
+ // Millisecond,空闲连接释放,默认1min
+ IdleConnTimeout int
+ // Millisecond,无响应超时,ErrClientDo,IsTimeout()==true
+ ResponseHeaderTimeout int
+ // Millisecond,拷贝响应超时,ErrCopyRes
+ CopyResponseTimeout int
+ JustResponseCode bool
+ NoResponse bool
// 当Async为true时,Respon、Response必须在Wait()之后读取,否则有DATA RACE可能
Async bool
Cookies []*http.Cookie
ErrNewRequest = pe.Action("ErrNewRequest")
ErrClientDo = pe.Action("ErrClientDo")
ErrResponFileCreate = pe.Action("ErrResponFileCreate")
- ErrWriteRes = pe.Action("ErrWriteRes")
- ErrReadRes = pe.Action("ErrReadRes")
+ ErrCopyRes = pe.Action("ErrCopyRes")
ErrPostStrOrRawPipe = pe.Action("ErrPostStrOrRawPipe")
ErrNoDate = pe.Action("ErrNoDate")
)
client *http.Client
responFile *os.File
responBuf *bytes.Buffer
+ reqBody io.Reader
+ rwTO *time.Timer
err error
callTree string
t.l.Lock()
t.state.Store(running)
- t.prepare(&val)
-
- if !val.Async {
+ if ctx, cancel, e := t.prepare(&val); e != nil {
+ return e
+ } else if !val.Async {
// 同步
- t.reqfM(val.Ctx, val)
+ t.reqfM(ctx, cancel, val)
return t.err
} else {
//异步
- go t.reqfM(val.Ctx, val)
+ go t.reqfM(ctx, cancel, val)
}
return nil
}
-func (t *Req) reqfM(ctxMain context.Context, val Rval) {
+func (t *Req) reqfM(ctx context.Context, ctxf1 context.CancelCauseFunc, val Rval) {
beginTime := time.Now()
for i := 0; i <= val.Retry; i++ {
- ctx, cancel := t.prepareRes(ctxMain, &val)
+ t.prepareRes(&val)
t.err = t.reqf(ctx, val)
- cancel()
if t.err == nil || IsCancel(t.err) {
break
}
}
}
+ ctxf1(nil)
t.updateUseDur(beginTime)
t.clean(&val)
t.state.Store(free)
}
func (t *Req) reqf(ctx context.Context, val Rval) (err error) {
- if t.client.Transport == nil {
- t.client.Transport = &http.Transport{}
- }
- if val.Proxy != "" {
- t.client.Transport.(*http.Transport).Proxy = func(_ *http.Request) (*url.URL, error) {
- return url.Parse(val.Proxy)
- }
- }
- t.client.Transport.(*http.Transport).IdleConnTimeout = time.Minute
-
- if val.Url == "" {
- return ErrEmptyUrl.New()
- }
-
- var body io.Reader
- if len(val.PostStr) > 0 && val.RawPipe != nil {
- return ErrPostStrOrRawPipe.New()
- }
- if val.Retry != 0 && val.RawPipe != nil {
- return ErrCantRetry.New()
- }
- if val.RawPipe != nil {
- body = val.RawPipe
- }
-
- if len(val.PostStr) > 0 {
- body = strings.NewReader(val.PostStr)
- }
-
- req, e := http.NewRequestWithContext(ctx, val.Method, val.Url, body)
+ req, e := http.NewRequestWithContext(ctx, val.Method, val.Url, t.reqBody)
if e != nil {
return pe.Join(ErrNewRequest.New(), e)
}
// io copy
{
+ rwTODra := time.Duration(val.CopyResponseTimeout) * time.Millisecond
w := io.MultiWriter(ws...)
for {
+ if rwTODra > 0 {
+ t.rwTO.Reset(rwTODra)
+ }
n, e := resReadCloser.Read(t.copyResBuf)
if n != 0 {
+ if rwTODra > 0 {
+ t.rwTO.Reset(rwTODra)
+ }
n, e := w.Write(t.copyResBuf[:n])
if n == 0 && e != nil {
if !errors.Is(e, io.EOF) {
break
}
}
+ t.rwTO.Stop()
}
if t.responBuf != nil {
return t.state.Load() == running
}
-func (t *Req) prepareRes(ctx context.Context, val *Rval) (ctx1 context.Context, ctxf1 context.CancelFunc) {
+func (t *Req) prepareRes(val *Rval) {
if !val.NoResponse {
if t.responBuf == nil {
t.responBuf = new(bytes.Buffer)
t.Response = nil
t.err = nil
- if val.Timeout > 0 {
- ctx1, ctxf1 = context.WithTimeout(ctx, time.Duration(val.Timeout)*time.Millisecond)
- } else {
- ctx1, ctxf1 = context.WithCancel(ctx)
+ if reader, ok := t.reqBody.(*strings.Reader); ok {
+ reader.Seek(0, io.SeekStart)
}
return
}
-func (t *Req) prepare(val *Rval) {
+func (t *Req) prepare(val *Rval) (ctx1 context.Context, ctxf1 context.CancelCauseFunc, e error) {
+ if val.Url == "" {
+ e = ErrEmptyUrl.New()
+ return
+ }
+ if len(val.PostStr) > 0 && val.RawPipe != nil {
+ e = ErrPostStrOrRawPipe.New()
+ return
+ }
+ if val.Retry != 0 && val.RawPipe != nil {
+ e = ErrCantRetry.New()
+ return
+ }
+
t.UsedTime = 0
t.responFile = nil
t.callTree = ""
if t.client == nil {
t.client = &http.Client{}
}
+ if t.client.Transport == nil {
+ t.client.Transport = &http.Transport{}
+ }
+ if val.Proxy != "" {
+ t.client.Transport.(*http.Transport).Proxy = func(_ *http.Request) (*url.URL, error) {
+ return url.Parse(val.Proxy)
+ }
+ }
+ if val.IdleConnTimeout == 0 {
+ t.client.Transport.(*http.Transport).IdleConnTimeout = time.Minute
+ } else if val.IdleConnTimeout > 0 {
+ t.client.Transport.(*http.Transport).IdleConnTimeout = time.Duration(val.IdleConnTimeout) * time.Millisecond
+ }
+ if val.ResponseHeaderTimeout > 0 {
+ t.client.Transport.(*http.Transport).ResponseHeaderTimeout = time.Duration(val.ResponseHeaderTimeout) * time.Millisecond
+ }
if val.Ctx == nil {
val.Ctx = context.Background()
}
val.Method = "GET"
}
}
+ if val.RawPipe != nil {
+ t.reqBody = val.RawPipe
+ }
+ if len(val.PostStr) > 0 {
+ t.reqBody = strings.NewReader(val.PostStr)
+ }
+ {
+ var ctx context.Context
+ if val.Ctx != nil {
+ ctx = val.Ctx
+ } else {
+ ctx = context.Background()
+ }
+ if val.Timeout > 0 {
+ ctx, _ = context.WithTimeout(ctx, time.Duration(val.Timeout)*time.Millisecond)
+ }
+ ctx1, ctxf1 = context.WithCancelCause(ctx)
+ }
+ if t.rwTO == nil {
+ t.rwTO = time.NewTimer(time.Duration(val.CopyResponseTimeout) * time.Millisecond)
+ }
+ t.rwTO.Stop()
+ if val.CopyResponseTimeout > 0 {
+ go func() {
+ select {
+ case <-t.rwTO.C:
+ ctxf1(ErrCopyRes)
+ case <-ctx1.Done():
+ }
+ }()
+ }
+ return
}
func (t *Req) clean(val *Rval) {
default:
w.Write([]byte{'0'})
flusher.Flush()
+ time.Sleep(time.Millisecond * 500)
}
}
},
w.Header().Set(k, v[0])
}
},
+ `/nores`: func(w http.ResponseWriter, r *http.Request) {
+ <-r.Context().Done()
+ },
})
time.Sleep(time.Second)
reuse.Reqf(Rval{
})
}
+func Test_7(t *testing.T) {
+ e := reuse.Reqf(Rval{
+ Url: "http://" + addr + "/nores",
+ ResponseHeaderTimeout: 500,
+ })
+ if !IsTimeout(e) {
+ t.Fatal(e)
+ }
+}
+
func Test_6(t *testing.T) {
reuse.Reqf(Rval{
Url: "http://" + addr + "/header",
}
}
+func Test15(t *testing.T) {
+ i, o := io.Pipe()
+
+ if e := reuse.Reqf(Rval{
+ Url: "http://" + addr + "/stream",
+ NoResponse: true,
+ SaveToPipe: pio.NewPipeRaw(i, o),
+ Async: true,
+ CopyResponseTimeout: 100,
+ }); e != nil {
+ t.Log(e)
+ }
+
+ go func() {
+ buf := make([]byte, 1<<8)
+ for {
+ if n, e := i.Read(buf); n != 0 {
+ continue
+ } else if e != nil {
+ break
+ }
+ }
+ }()
+
+ if !errors.Is(reuse.Wait(), ErrCopyRes) {
+ t.Fatal()
+ }
+}
+
func Test14(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
r := New()
if e := r.Reqf(Rval{
- Url: "http://" + addr + "/stream",
- Ctx: ctx,
- NoResponse: true,
- SaveToPipe: pio.NewPipeRaw(i, o),
- Async: true,
- WriteLoopTO: 5*1000*2 + 1,
+ Url: "http://" + addr + "/stream",
+ Ctx: ctx,
+ NoResponse: true,
+ SaveToPipe: pio.NewPipeRaw(i, o),
+ Async: true,
+ CopyResponseTimeout: 5*1000*2 + 1,
}); e != nil {
t.Log(e)
}