return
}
-func NewPipe() *IOpipe {
+func NewPipe() (u *IOpipe) {
r, w := io.Pipe()
- return &IOpipe{R: r, W: w}
+ u = &IOpipe{r: r, w: w}
+ u.ctx, u.ctxC = context.WithCancel(context.Background())
+ return
+}
+func NewPipeRaw(r *io.PipeReader, w *io.PipeWriter) (u *IOpipe) {
+ u = &IOpipe{r: r, w: w}
+ u.ctx, u.ctxC = context.WithCancel(context.Background())
+ return
}
type onceError struct {
}
type IOpipe struct {
- R *io.PipeReader
- W *io.PipeWriter
- re onceError
- we onceError
+ r *io.PipeReader
+ w *io.PipeWriter
+ ctx context.Context
+ ctxC context.CancelFunc
+ re onceError
+ we onceError
}
func (t *IOpipe) Write(p []byte) (n int, err error) {
- if t.W != nil {
- n, err = t.W.Write(p)
+ if t.w != nil {
+ n, err = t.w.Write(p)
if errors.Is(err, io.ErrClosedPipe) {
err = errors.Join(err, t.we.Load())
}
return
}
func (t *IOpipe) Read(p []byte) (n int, err error) {
- if t.R != nil {
- n, err = t.R.Read(p)
+ if t.r != nil {
+ n, err = t.r.Read(p)
if errors.Is(err, io.ErrClosedPipe) {
err = errors.Join(err, t.re.Load())
}
return
}
func (t *IOpipe) Close() (err error) {
- if t.W != nil {
- err = errors.Join(err, t.W.Close())
+ if t.w != nil {
+ err = errors.Join(err, t.w.Close())
}
- if t.R != nil {
- err = errors.Join(err, t.R.Close())
+ if t.r != nil {
+ err = errors.Join(err, t.r.Close())
}
+ t.ctxC()
return
}
func (t *IOpipe) CloseWithError(e error) (err error) {
- if t.W != nil {
+ if t.w != nil {
t.we.Store(e)
- err = errors.Join(err, t.W.CloseWithError(e))
+ err = errors.Join(err, t.w.CloseWithError(e))
}
- if t.R != nil {
+ if t.r != nil {
t.re.Store(e)
- err = errors.Join(err, t.R.CloseWithError(e))
+ err = errors.Join(err, t.r.CloseWithError(e))
}
+ t.ctxC()
return
}
+func (t *IOpipe) WithCtx(ctx context.Context) *IOpipe {
+ go func() {
+ select {
+ case <-ctx.Done():
+ if t.w != nil {
+ t.we.Store(ctx.Err())
+ t.w.CloseWithError(ctx.Err())
+ }
+ if t.r != nil {
+ t.re.Store(ctx.Err())
+ t.r.CloseWithError(ctx.Err())
+ }
+ case <-t.ctx.Done():
+ }
+ }()
+ return t
+}
type RWC struct {
R func(p []byte) (n int, err error)
}
}
+func WithCtxCopyNoCheck(ctx context.Context, copybuf []byte, w io.Writer, r io.Reader) error {
+ for {
+ n, e := r.Read(copybuf)
+ if n != 0 {
+ n, e := w.Write(copybuf[:n])
+ if n == 0 && e != nil {
+ if !errors.Is(e, io.EOF) {
+ return errors.Join(ErrWrite, e)
+ }
+ return nil
+ }
+ } else if e != nil {
+ if !errors.Is(e, io.EOF) {
+ return errors.Join(ErrRead, e)
+ }
+ return nil
+ }
+ }
+}
+
type CopyConfig struct {
BytePerLoop, MaxLoop, MaxByte, BytePerSec uint64
SkipByte int
package part
import (
+ "context"
"io"
"sync/atomic"
return nil
}
+func (t RawReqRes) WithCtx(ctx context.Context) {
+ if !t.resC.Swap(true) {
+ t.res.WithCtx(ctx)
+ }
+}
func (t RawReqRes) ResCloseWithError(e error) error {
if !t.resC.Swap(true) {
return t.res.CloseWithError(e)
flate "compress/flate"
gzip "compress/gzip"
+ "github.com/dustin/go-humanize"
br "github.com/qydysky/brotli"
pe "github.com/qydysky/part/errors"
pio "github.com/qydysky/part/io"
Timeout int
// Millisecond
SleepTime int
- // Millisecond
+ // Deprecated: use Timeout
WriteLoopTO int
JustResponseCode bool
NoResponse bool
Response *http.Response
UsedTime time.Duration
- cancelP atomic.Pointer[context.CancelFunc]
- state atomic.Int32
+ state atomic.Int32
+ client *http.Client
responFile *os.File
responBuf *bytes.Buffer
err error
t.l.Lock()
t.state.Store(running)
- pctx, cancelF := t.prepare(&val)
- t.cancelP.Store(&cancelF)
+ t.prepare(&val)
if !val.Async {
// 同步
- t.reqfM(pctx, cancelF, val)
+ t.reqfM(val.Ctx, val)
return t.err
} else {
//异步
- go t.reqfM(pctx, cancelF, val)
+ go t.reqfM(val.Ctx, val)
}
return nil
}
-func (t *Req) reqfM(ctxMain context.Context, cancelMain context.CancelFunc, val Rval) {
+func (t *Req) reqfM(ctxMain context.Context, val Rval) {
beginTime := time.Now()
for i := 0; i <= val.Retry; i++ {
}
}
- cancelMain()
t.updateUseDur(beginTime)
t.clean(&val)
t.state.Store(free)
}
func (t *Req) reqf(ctx context.Context, val Rval) (err error) {
- var (
- Header map[string]string = val.Header
- client http.Client
- )
-
- if Header == nil {
- Header = make(map[string]string)
+ if t.client.Transport == nil {
+ t.client.Transport = &http.Transport{}
}
-
if val.Proxy != "" {
- proxy := func(_ *http.Request) (*url.URL, error) {
+ t.client.Transport.(*http.Transport).Proxy = func(_ *http.Request) (*url.URL, error) {
return url.Parse(val.Proxy)
}
- client.Transport = &http.Transport{
- Proxy: proxy,
- IdleConnTimeout: time.Minute,
- }
- } else {
- client.Transport = &http.Transport{
- IdleConnTimeout: time.Minute,
- }
}
+ t.client.Transport.(*http.Transport).IdleConnTimeout = time.Minute
if val.Url == "" {
return ErrEmptyUrl.New()
if len(val.PostStr) > 0 {
body = strings.NewReader(val.PostStr)
- if _, ok := Header["Content-Type"]; !ok {
- Header["Content-Type"] = "application/x-www-form-urlencoded"
- }
}
req, e := http.NewRequestWithContext(ctx, val.Method, val.Url, body)
req.AddCookie(v)
}
- if _, ok := Header["Accept"]; !ok {
- Header["Accept"] = defaultAccept
+ for k, v := range val.Header {
+ req.Header.Set(k, v)
+ }
+
+ if len(val.PostStr) > 0 {
+ if _, ok := req.Header["Content-Type"]; !ok {
+ req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+ }
}
- if _, ok := Header["Connection"]; !ok {
- Header["Connection"] = "keep-alive"
+ if _, ok := req.Header["Accept"]; !ok {
+ req.Header.Set("Accept", defaultAccept)
}
- if _, ok := Header["Accept-Encoding"]; !ok {
- Header["Accept-Encoding"] = "gzip, deflate, br"
+ if _, ok := req.Header["Connection"]; !ok {
+ req.Header.Set("Connection", "keep-alive")
}
- if val.SaveToPath != "" || val.SaveToPipe != nil {
- Header["Accept-Encoding"] = "identity"
+ if _, ok := req.Header["Accept-Encoding"]; !ok {
+ req.Header.Set("Accept-Encoding", "gzip, deflate, br")
}
- if _, ok := Header["User-Agent"]; !ok {
- Header["User-Agent"] = defaultUA
+ if val.SaveToPath != "" || val.SaveToPipe != nil {
+ req.Header.Set("Accept-Encoding", "identity")
}
-
- for k, v := range Header {
- req.Header.Set(k, v)
+ if _, ok := req.Header["User-Agent"]; !ok {
+ req.Header.Set("User-Agent", defaultUA)
}
- resp, e := client.Do(req)
+ resp, e := t.client.Do(req)
if e != nil {
return pe.Join(ErrClientDo.New(), e)
}
- if v, ok := Header["Connection"]; ok && strings.ToLower(v) != "keep-alive" {
- defer client.CloseIdleConnections()
+ if v, ok := val.Header["Connection"]; ok && strings.ToLower(v) != "keep-alive" {
+ defer t.client.CloseIdleConnections()
}
t.Response = resp
}
}
- writeLoopTO := val.WriteLoopTO
- if writeLoopTO == 0 {
- if val.Timeout > 0 {
- writeLoopTO = val.Timeout + 500
- } else {
- writeLoopTO = 1000
- }
- }
-
// io copy
- errChan := make(chan error, 3)
- errChan <- pio.WithCtxCopy(
- req.Context(),
- t.callTree,
- t.copyResBuf[:],
- time.Duration(int(time.Millisecond)*writeLoopTO), io.MultiWriter(ws...),
- resReadCloser,
- func(s string) { errChan <- pe.New(s) },
- )
- for len(errChan) > 0 {
- err = pe.Join(err, <-errChan)
+ {
+ w := io.MultiWriter(ws...)
+ for {
+ n, e := resReadCloser.Read(t.copyResBuf)
+ if n != 0 {
+ n, e := w.Write(t.copyResBuf[:n])
+ if n == 0 && e != nil {
+ if !errors.Is(e, io.EOF) {
+ err = pe.Join(err, e)
+ }
+ break
+ }
+ } else if e != nil {
+ if !errors.Is(e, io.EOF) {
+ err = pe.Join(err, e)
+ }
+ break
+ }
+ }
}
if t.responBuf != nil {
}
func (t *Req) Close() { t.Cancel() }
+
+// Deprecated: use rval.Ctx.Cancle
func (t *Req) Cancel() {
- if p := t.cancelP.Load(); p != nil {
- (*p)()
- }
}
func (t *Req) IsLive() bool {
t.err = nil
if val.Timeout > 0 {
- ctx1, ctxf1 = context.WithTimeout(ctx, time.Duration(val.Timeout*int(time.Millisecond)))
+ ctx1, ctxf1 = context.WithTimeout(ctx, time.Duration(val.Timeout)*time.Millisecond)
} else {
ctx1, ctxf1 = context.WithCancel(ctx)
}
return
}
-func (t *Req) prepare(val *Rval) (ctx context.Context, cancel context.CancelFunc) {
+func (t *Req) prepare(val *Rval) {
t.UsedTime = 0
t.responFile = nil
t.callTree = ""
}
}
if cap(t.copyResBuf) == 0 {
- t.copyResBuf = make([]byte, 1<<17)
- }
- if val.Ctx != nil {
- ctx, cancel = context.WithCancel(val.Ctx)
+ t.copyResBuf = make([]byte, humanize.KByte*4)
} else {
- ctx, cancel = context.WithCancel(context.Background())
+ t.copyResBuf = t.copyResBuf[:cap(t.copyResBuf)]
+ }
+ if t.client == nil {
+ t.client = &http.Client{}
+ }
+ if val.Ctx == nil {
+ val.Ctx = context.Background()
}
-
if val.SaveToPipe != nil {
- go func() {
- <-ctx.Done()
- if e := val.SaveToPipe.CloseWithError(context.Canceled); e != nil {
- println(e)
- }
- }()
+ val.SaveToPipe.WithCtx(val.Ctx)
}
if val.RawPipe != nil {
- go func() {
- <-ctx.Done()
- if e := val.RawPipe.ResCloseWithError(context.Canceled); e != nil {
- println(e)
- }
- }()
+ val.RawPipe.WithCtx(val.Ctx)
}
if val.Method == "" {
val.Method = "GET"
}
}
-
- return
}
func (t *Req) clean(val *Rval) {
var addr = "127.0.0.1:10001"
+var reuse = New()
+
func init() {
s := web.New(&http.Server{
Addr: addr,
`/exit`: func(_ http.ResponseWriter, _ *http.Request) {
s.Server.Shutdown(context.Background())
},
+ `/header`: func(w http.ResponseWriter, r *http.Request) {
+ for k, v := range r.Header {
+ w.Header().Set(k, v[0])
+ }
+ },
})
time.Sleep(time.Second)
+ reuse.Reqf(Rval{
+ Url: "http://" + addr + "/no",
+ })
+}
+
+func Test_6(t *testing.T) {
+ reuse.Reqf(Rval{
+ Url: "http://" + addr + "/header",
+ Header: map[string]string{
+ `I`: `1`,
+ },
+ })
+ if reuse.Response.Header.Get(`I`) != `1` {
+ t.Fail()
+ }
+}
+
+// go test -timeout 30s -run ^Test_reuse$ github.com/qydysky/part/reqf -race -count=1 -v -memprofile mem.out
+func Test_reuse(t *testing.T) {
+ reuse.Reqf(Rval{
+ Url: "http://" + addr + "/no",
+ })
+ if !bytes.Equal(reuse.Respon, []byte("abc强强强强")) {
+ t.Fail()
+ }
+}
+
+// 2710 430080 ns/op 9896 B/op 111 allocs/op
+func Benchmark(b *testing.B) {
+ rval := Rval{
+ Url: "http://" + addr + "/no",
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ reuse.Reqf(rval)
+ if !bytes.Equal(reuse.Respon, []byte("abc强强强强")) {
+ b.Fail()
+ }
+ }
}
func Test14(t *testing.T) {
Url: "http://" + addr + "/stream",
Ctx: ctx,
NoResponse: true,
- SaveToPipe: &pio.IOpipe{R: i, W: o},
+ SaveToPipe: pio.NewPipeRaw(i, o),
Async: true,
WriteLoopTO: 5*1000*2 + 1,
}); e != nil {
}
func Test_req7(t *testing.T) {
+ ctx, ctxc := context.WithCancel(context.Background())
r := New()
r.Reqf(Rval{
+ Ctx: ctx,
Url: "http://" + addr + "/to",
Async: true,
})
- r.Cancel()
+ ctxc()
if !IsCancel(r.Wait()) {
t.Error("async Cancel fail")
}
// }
func Test_req11(t *testing.T) {
+ ctx, ctxc := context.WithCancel(context.Background())
r := New()
{
timer := time.NewTimer(time.Second)
go func() {
<-timer.C
- r.Cancel()
+ ctxc()
}()
e := r.Reqf(Rval{
+ Ctx: ctx,
Url: "http://" + addr + "/to",
})
if !IsCancel(e) {
}()
r.Reqf(Rval{
Url: "http://" + addr + "/1min",
- SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+ SaveToPipe: pio.NewPipeRaw(rc, wc),
Async: true,
})
if r.Wait() != nil {
}
func Test_req8(t *testing.T) {
+ ctx, ctxc := context.WithCancel(context.Background())
r := New()
{
rc, wc := io.Pipe()
var buf []byte = make([]byte, 1<<16)
_, _ = rc.Read(buf)
time.Sleep(time.Millisecond * 500)
- r.Cancel()
+ ctxc()
}()
r.Reqf(Rval{
+ Ctx: ctx,
Url: "http://" + addr + "/1min",
- SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+ SaveToPipe: pio.NewPipeRaw(rc, wc),
Async: true,
})
if !IsCancel(r.Wait()) {
}()
r.Reqf(Rval{
Url: "http://" + addr + "/br",
- SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+ SaveToPipe: pio.NewPipeRaw(rc, wc),
Async: true,
})
<-c
}()
r.Reqf(Rval{
Url: "http://" + addr + "/gzip",
- SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+ SaveToPipe: pio.NewPipeRaw(rc, wc),
Async: true,
})
<-c
}()
if e := r.Reqf(Rval{
Url: "http://" + addr + "/flate",
- SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+ SaveToPipe: pio.NewPipeRaw(rc, wc),
}); e != nil {
t.Error(e)
}
}()
r.Reqf(Rval{
Url: "http://" + addr + "/flate",
- SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+ SaveToPipe: pio.NewPipeRaw(rc, wc),
Async: true,
})
<-c
}()
r.Reqf(Rval{
Url: "http://" + addr + "/flate",
- SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+ SaveToPipe: pio.NewPipeRaw(rc, wc),
NoResponse: true,
Async: true,
})
},
})
if r.Response.StatusCode != http.StatusNotModified {
- t.Fatal(r.Respon)
+ t.Fatal(string(r.Respon))
}
}
time.Sleep(time.Second)
}()
r.Reqf(reqf.Rval{
Url: "http://127.0.0.1:13001/to",
- SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+ SaveToPipe: pio.NewPipeRaw(rc, wc),
WriteLoopTO: 5000,
Async: true,
})