From 126d9ac64db682d1242bbc0abc5359bb29a4b10f Mon Sep 17 00:00:00 2001 From: qydysky Date: Wed, 4 Oct 2023 01:53:01 +0800 Subject: [PATCH] 1 --- reqf/RawReqRes.go | 94 +++++++++++++++++++++++++++++++++++++ reqf/Reqf.go | 115 ++++++++++++++++++++++++++++------------------ reqf/Reqf_test.go | 35 ++++++++++++++ 3 files changed, 199 insertions(+), 45 deletions(-) create mode 100644 reqf/RawReqRes.go diff --git a/reqf/RawReqRes.go b/reqf/RawReqRes.go new file mode 100644 index 0000000..9bbdcd6 --- /dev/null +++ b/reqf/RawReqRes.go @@ -0,0 +1,94 @@ +package part + +import ( + "io" + + pio "github.com/qydysky/part/io" +) + +type RawReqRes struct { + req *pio.IOpipe + res *pio.IOpipe + reqC chan struct{} + resC chan struct{} +} + +func NewRawReqRes() *RawReqRes { + return &RawReqRes{req: pio.NewPipe(), res: pio.NewPipe(), reqC: make(chan struct{}), resC: make(chan struct{})} +} + +func (t RawReqRes) ReqClose() error { + select { + case <-t.reqC: + return nil + default: + close(t.reqC) + return t.req.Close() + } +} + +func (t RawReqRes) ReqCloseWithError(e error) error { + select { + case <-t.reqC: + return nil + default: + close(t.reqC) + return t.req.CloseWithError(e) + } +} + +func (t RawReqRes) ResClose() error { + select { + case <-t.resC: + return nil + default: + close(t.resC) + return t.res.Close() + } +} + +func (t RawReqRes) ResCloseWithError(e error) error { + select { + case <-t.resC: + return nil + default: + close(t.resC) + return t.res.CloseWithError(e) + } +} + +func (t RawReqRes) Write(p []byte) (n int, err error) { + select { + case <-t.reqC: + return t.res.Write(p) + default: + return 0, io.EOF + } +} + +func (t RawReqRes) Read(p []byte) (n int, err error) { + select { + case <-t.reqC: + return 0, io.EOF + default: + return t.req.Read(p) + } +} + +func (t RawReqRes) ReqWrite(p []byte) (n int, err error) { + select { + case <-t.reqC: + return 0, io.EOF + default: + return t.req.Write(p) + } +} + +func (t RawReqRes) ResRead(p []byte) (n int, err error) { + select { + case <-t.reqC: + return t.res.Read(p) + default: + return 0, io.EOF + } +} diff --git a/reqf/Reqf.go b/reqf/Reqf.go index 77d15a7..3dacd12 100644 --- a/reqf/Reqf.go +++ b/reqf/Reqf.go @@ -26,6 +26,7 @@ import ( ) type Rval struct { + Method string Url string PostStr string Proxy string @@ -47,6 +48,8 @@ type Rval struct { // 为避免write阻塞导致panic,请使用此项目io包中的NewPipe(),或在ctx done时,自行关闭pipe writer reader SaveToPipe *pio.IOpipe + RawPipe *RawReqRes + Header map[string]string } @@ -59,11 +62,14 @@ const ( var ( ErrEmptyUrl = errors.New("ErrEmptyUrl") + ErrMustAsync = errors.New("ErrMustAsync") + ErrCantRetry = errors.New("ErrCantRetry") ErrNewRequest = errors.New("ErrNewRequest") ErrClientDo = errors.New("ErrClientDo") ErrResponFileCreate = errors.New("ErrResponFileCreate") ErrWriteRes = errors.New("ErrWriteRes") ErrReadRes = errors.New("ErrReadRes") + ErrPostStrOrRawPipe = errors.New("ErrPostStrOrRawPipe") ) type Req struct { @@ -96,56 +102,43 @@ func (t *Req) Reqf(val Rval) error { pctx, cancelF := t.prepare(&val) t.cancelP.Store(&cancelF) - // 同步 if !val.Async { - beginTime := time.Now() - - for i := 0; i <= val.Retry; i++ { - ctx, cancel := t.prepareRes(pctx, &val) - t.err = t.Reqf_1(ctx, val) - cancel() - if t.err == nil || IsCancel(t.err) { - break - } - if val.SleepTime != 0 { - time.Sleep(time.Duration(val.SleepTime * int(time.Millisecond))) - } - } - - cancelF() - t.updateUseDur(beginTime) - t.clean(&val) - t.state.Store(free) - t.l.Unlock() - return t.err + // 同步 + return t.reqfM(pctx, cancelF, val) + } else { + //异步 + go func() { + _ = t.reqfM(pctx, cancelF, val) + }() } - //异步 - go func() { - beginTime := time.Now() + return nil +} - for i := 0; i <= val.Retry; i++ { - ctx, cancel := t.prepareRes(pctx, &val) - t.err = t.Reqf_1(ctx, val) - cancel() - if t.err == nil || IsCancel(t.err) { - break - } - if val.SleepTime != 0 { - time.Sleep(time.Duration(val.SleepTime * int(time.Millisecond))) - } +func (t *Req) reqfM(ctx context.Context, cancel context.CancelFunc, val Rval) error { + beginTime := time.Now() + + for i := 0; i <= val.Retry; i++ { + ctx, cancel := t.prepareRes(ctx, &val) + t.err = t.reqf(ctx, val) + cancel() + if t.err == nil || IsCancel(t.err) { + break } + if val.SleepTime != 0 { + time.Sleep(time.Duration(val.SleepTime * int(time.Millisecond))) + } + } - cancelF() - t.updateUseDur(beginTime) - t.clean(&val) - t.state.Store(free) - t.l.Unlock() - }() - return nil + cancel() + t.updateUseDur(beginTime) + t.clean(&val) + t.state.Store(free) + t.l.Unlock() + return t.err } -func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) { +func (t *Req) reqf(ctx context.Context, val Rval) (err error) { var ( Header map[string]string = val.Header client http.Client @@ -173,17 +166,27 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) { return ErrEmptyUrl } - Method := "GET" var body io.Reader + if len(val.PostStr) > 0 && val.RawPipe != nil { + return ErrPostStrOrRawPipe + } + if val.Retry != 0 && val.RawPipe != nil { + return ErrCantRetry + } + if val.SaveToPipe != nil && !val.Async { + return ErrMustAsync + } + if val.RawPipe != nil { + body = val.RawPipe + } if len(val.PostStr) > 0 { - Method = "POST" body = strings.NewReader(val.PostStr) if _, ok := Header["Content-Type"]; !ok { Header["Content-Type"] = "application/x-www-form-urlencoded" } } - req, e := http.NewRequestWithContext(ctx, Method, val.Url, body) + req, e := http.NewRequestWithContext(ctx, val.Method, val.Url, body) if e != nil { return errors.Join(ErrNewRequest, e) } @@ -244,6 +247,9 @@ func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) { if val.SaveToPipe != nil { ws = append(ws, val.SaveToPipe) } + if val.RawPipe != nil { + ws = append(ws, val.RawPipe) + } if !val.NoResponse { ws = append(ws, t.responBuf) } @@ -354,6 +360,21 @@ func (t *Req) prepare(val *Rval) (ctx context.Context, cancel context.CancelFunc } }() } + if val.RawPipe != nil { + go func() { + <-ctx.Done() + if e := val.RawPipe.ResCloseWithError(context.Canceled); e != nil { + println(e) + } + }() + } + + if val.Method == "" { + val.Method = "GET" + if len(val.PostStr) > 0 { + val.Method = "POST" + } + } return } @@ -365,6 +386,10 @@ func (t *Req) clean(val *Rval) { if val.SaveToPipe != nil { val.SaveToPipe.Close() } + if val.RawPipe != nil { + val.RawPipe.ReqClose() + val.RawPipe.ResClose() + } } func (t *Req) updateUseDur(u time.Time) { diff --git a/reqf/Reqf_test.go b/reqf/Reqf_test.go index 6e5c055..b5d58ad 100644 --- a/reqf/Reqf_test.go +++ b/reqf/Reqf_test.go @@ -29,6 +29,9 @@ func init() { code, _ := strconv.Atoi(r.URL.Query().Get(`code`)) w.WriteHeader(code) }, + `/reply`: func(w http.ResponseWriter, r *http.Request) { + io.Copy(w, r.Body) + }, `/no`: func(w http.ResponseWriter, _ *http.Request) { w.Write([]byte("abc强强强强")) }, @@ -476,3 +479,35 @@ func Test_req3(t *testing.T) { wg.Wait() } } + +func Test_req5(t *testing.T) { + r := New() + r.Reqf(Rval{ + Url: "http://" + addr + "/reply", + PostStr: "123", + }) + if !bytes.Equal(r.Respon, []byte("123")) { + t.Fatal() + } + + raw := NewRawReqRes() + buf := []byte("123") + r.Reqf(Rval{ + Url: "http://" + addr + "/reply", + Async: true, + RawPipe: raw, + NoResponse: true, + }) + if _, e := raw.ReqWrite(buf); e != nil { + t.Fatal(e) + } + raw.ReqClose() + clear(buf) + if _, e := raw.ResRead(buf); e != nil && !errors.Is(e, io.EOF) { + t.Fatal(e) + } + if !bytes.Equal([]byte("123"), buf) { + t.Log(r.Respon, buf) + t.Fatal() + } +} -- 2.39.2