"errors"
"fmt"
"io"
+ "sync"
"sync/atomic"
"time"
)
return
}
+func NewPipe() *IOpipe {
+ r, w := io.Pipe()
+ return &IOpipe{R: r, W: w}
+}
+
+type onceError struct {
+ sync.Mutex // guards following
+ err error
+}
+
+func (a *onceError) Store(err error) {
+ a.Lock()
+ defer a.Unlock()
+ if a.err != nil {
+ return
+ }
+ a.err = err
+}
+func (a *onceError) Load() error {
+ a.Lock()
+ defer a.Unlock()
+ return a.err
+}
+
+type IOpipe struct {
+ R *io.PipeReader
+ W *io.PipeWriter
+ re onceError
+ we onceError
+}
+
+func (t *IOpipe) Write(p []byte) (n int, err error) {
+ if t.W != nil {
+ n, err = t.W.Write(p)
+ if errors.Is(err, io.ErrClosedPipe) {
+ err = errors.Join(err, t.we.Load())
+ }
+ }
+ return
+}
+func (t *IOpipe) Read(p []byte) (n int, err error) {
+ if t.R != nil {
+ n, err = t.R.Read(p)
+ if errors.Is(err, io.ErrClosedPipe) {
+ err = errors.Join(err, t.re.Load())
+ }
+ }
+ return
+}
+func (t *IOpipe) Close() (err error) {
+ if t.W != nil {
+ err = errors.Join(err, t.W.Close())
+ }
+ if t.R != nil {
+ err = errors.Join(err, t.R.Close())
+ }
+ return
+}
+func (t *IOpipe) CloseWithError(e error) (err error) {
+ if t.W != nil {
+ t.we.Store(e)
+ err = errors.Join(err, t.W.CloseWithError(e))
+ }
+ if t.R != nil {
+ t.re.Store(e)
+ err = errors.Join(err, t.R.CloseWithError(e))
+ }
+ return
+}
+
type RWC struct {
R func(p []byte) (n int, err error)
W func(p []byte) (n int, err error)
}
// close reader by yourself
+//
+// to avoid writer block after ctx done, you should close writer after ctx done
+//
// call Close() after writer fin
-func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w []io.WriteCloser, r io.Reader, panicf ...func(s string)) io.ReadWriteCloser {
+func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w []io.Writer, r io.Reader, panicf ...func(s string)) io.ReadWriteCloser {
var chanw atomic.Int64
chanw.Store(time.Now().Unix())
if len(panicf) == 0 {
for {
select {
case <-ctx.Done():
- // avoid write block
- for i := 0; i < len(w); i++ {
- w[i].Close()
- }
if old, now := chanw.Load(), time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
panicf[0](fmt.Sprintf("rw blocking while close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
} else {
err = context.Canceled
default:
for i := 0; i < len(w); i++ {
- if n, err = w[i].Write(p); n != 0 {
- chanw.Store(time.Now().Unix())
- }
+ _, err = w[i].Write(p)
}
+ chanw.Store(time.Now().Unix())
}
return
},
)
// close reader by yourself
-func WithCtxCopy(ctx context.Context, callTree string, to time.Duration, w []io.WriteCloser, r io.Reader, panicf ...func(s string)) error {
+//
+// to avoid writer block after ctx done, you should close writer after ctx done
+//
+// call Close() after writer fin
+func WithCtxCopy(ctx context.Context, callTree string, to time.Duration, w []io.Writer, r io.Reader, panicf ...func(s string)) error {
rwc := WithCtxTO(ctx, callTree, to, w, r)
defer rwc.Close()
for buf := make([]byte, 2048); true; {
Cookies []*http.Cookie
Ctx context.Context
- SaveToPath string
- SaveToPipeWriter *io.PipeWriter
+ SaveToPath string
+ // 为避免write阻塞导致panic,请使用此项目io包中的NewPipe(),或在ctx done时,自行关闭pipe writer reader
+ SaveToPipe *pio.IOpipe
Header map[string]string
}
if _, ok := Header["Accept-Encoding"]; !ok {
Header["Accept-Encoding"] = "gzip, deflate, br"
}
- if val.SaveToPath != "" || val.SaveToPipeWriter != nil {
+ if val.SaveToPath != "" || val.SaveToPipe != nil {
Header["Accept-Encoding"] = "identity"
}
if _, ok := Header["User-Agent"]; !ok {
err = fmt.Errorf("%d %s", resp.StatusCode, http.StatusText(resp.StatusCode))
}
- var ws []io.WriteCloser
+ var ws []io.Writer
if val.SaveToPath != "" {
t.responFile, e = os.Create(val.SaveToPath)
if e != nil {
}
ws = append(ws, t.responFile)
}
- if val.SaveToPipeWriter != nil {
- ws = append(ws, pio.RWC{W: val.SaveToPipeWriter.Write, C: func() error { return val.SaveToPipeWriter.CloseWithError(context.Canceled) }})
+ if val.SaveToPipe != nil {
+ ws = append(ws, val.SaveToPipe)
}
if !val.NoResponse {
- //will clear t.Respon
- t.responBuf.Reset()
- ws = append(ws, pio.RWC{W: t.responBuf.Write, C: func() error { return nil }})
+ ws = append(ws, t.responBuf)
}
var resReadCloser = resp.Body
return t.state.Load() == running
}
-func (t *Req) prepareRes(ctx context.Context, val *Rval) (context.Context, context.CancelFunc) {
+func (t *Req) prepareRes(ctx context.Context, val *Rval) (ctx1 context.Context, ctxf1 context.CancelFunc) {
if !val.NoResponse {
if t.responBuf == nil {
t.responBuf = new(bytes.Buffer)
}
t.Response = nil
t.err = nil
+
if val.Timeout > 0 {
- return context.WithTimeout(ctx, time.Duration(val.Timeout*int(time.Millisecond)))
+ ctx1, ctxf1 = context.WithTimeout(ctx, time.Duration(val.Timeout*int(time.Millisecond)))
+ } else {
+ ctx1, ctxf1 = context.WithCancel(ctx)
}
- return context.WithCancel(ctx)
+ return
}
func (t *Req) prepare(val *Rval) (ctx context.Context, cancel context.CancelFunc) {
}
}
if val.Ctx != nil {
- return context.WithCancel(val.Ctx)
+ ctx, cancel = context.WithCancel(val.Ctx)
} else {
- return context.WithCancel(context.Background())
+ ctx, cancel = context.WithCancel(context.Background())
+ }
+
+ if val.SaveToPipe != nil {
+ go func() {
+ <-ctx.Done()
+ if e := val.SaveToPipe.CloseWithError(context.Canceled); e != nil {
+ println(e)
+ }
+ }()
}
+
+ return
}
func (t *Req) clean(val *Rval) {
if t.responFile != nil {
t.responFile.Close()
}
- if val.SaveToPipeWriter != nil {
- val.SaveToPipeWriter.Close()
+ if val.SaveToPipe != nil {
+ val.SaveToPipe.Close()
}
}
"time"
compress "github.com/qydysky/part/compress"
+ pio "github.com/qydysky/part/io"
web "github.com/qydysky/part/web"
)
r := New()
if e := r.Reqf(Rval{
- Url: "http://" + addr + "/stream",
- Ctx: ctx,
- NoResponse: true,
- SaveToPipeWriter: o,
- Async: true,
- WriteLoopTO: 5*1000*2 + 1,
+ Url: "http://" + addr + "/stream",
+ Ctx: ctx,
+ NoResponse: true,
+ SaveToPipe: &pio.IOpipe{R: i, W: o},
+ Async: true,
+ WriteLoopTO: 5*1000*2 + 1,
}); e != nil {
t.Log(e)
}
}
}()
r.Reqf(Rval{
- Url: "http://" + addr + "/1min",
- SaveToPipeWriter: wc,
- Async: true,
+ Url: "http://" + addr + "/1min",
+ SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+ Async: true,
})
if r.Wait() != nil {
t.Fatal()
r.Cancel()
}()
r.Reqf(Rval{
- Url: "http://" + addr + "/1min",
- SaveToPipeWriter: wc,
- Async: true,
+ Url: "http://" + addr + "/1min",
+ SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+ Async: true,
})
if !IsCancel(r.Wait()) {
t.Fatal("read from block response")
}()
r.Reqf(Rval{
Url: "http://" + addr + "/1min",
- SaveToPipeWriter: wc,
+ SaveToPipe: wc,
Async: true,
})
if !IsCancel(r.Wait()) {
close(c)
}()
r.Reqf(Rval{
- Url: "http://" + addr + "/br",
- SaveToPipeWriter: wc,
- Async: true,
+ Url: "http://" + addr + "/br",
+ SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+ Async: true,
})
<-c
}
close(c)
}()
r.Reqf(Rval{
- Url: "http://" + addr + "/gzip",
- SaveToPipeWriter: wc,
- Async: true,
+ Url: "http://" + addr + "/gzip",
+ SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+ Async: true,
})
<-c
}
close(c)
}()
r.Reqf(Rval{
- Url: "http://" + addr + "/flate",
- SaveToPipeWriter: wc,
+ Url: "http://" + addr + "/flate",
+ SaveToPipe: &pio.IOpipe{R: rc, W: wc},
})
<-c
}
close(c)
}()
r.Reqf(Rval{
- Url: "http://" + addr + "/flate",
- SaveToPipeWriter: wc,
- Async: true,
+ Url: "http://" + addr + "/flate",
+ SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+ Async: true,
})
<-c
}
wg.Done()
}()
r.Reqf(Rval{
- Url: "http://" + addr + "/flate",
- SaveToPipeWriter: wc,
- NoResponse: true,
- Async: true,
+ Url: "http://" + addr + "/flate",
+ SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+ NoResponse: true,
+ Async: true,
})
r.Wait()
if len(r.Respon) != 0 {
m.Store("/to", func(w http.ResponseWriter, r *http.Request) {
rwc := pio.WithCtxTO(r.Context(), fmt.Sprintf("server handle %v by %v ", r.URL.Path, r.RemoteAddr), time.Second,
- []io.WriteCloser{pio.RWC{W: w.Write}}, r.Body, func(s string) {
+ []io.Writer{w}, r.Body, func(s string) {
fmt.Println(s)
if !strings.Contains(s, "write blocking after rw 2s > 1s, goruntime leak") {
t.Fatal(s)
close(c)
}()
r.Reqf(reqf.Rval{
- Url: "http://127.0.0.1:10000/to",
- SaveToPipeWriter: wc,
- WriteLoopTO: 5000,
- Async: true,
+ Url: "http://127.0.0.1:10000/to",
+ SaveToPipe: &pio.IOpipe{R: rc, W: wc},
+ WriteLoopTO: 5000,
+ Async: true,
})
<-c
}