--- /dev/null
+package part
+
+import (
+ "io"
+
+ pio "github.com/qydysky/part/io"
+)
+
+type RawReqRes struct {
+ req *pio.IOpipe
+ res *pio.IOpipe
+ reqC chan struct{}
+ resC chan struct{}
+}
+
+func NewRawReqRes() *RawReqRes {
+ return &RawReqRes{req: pio.NewPipe(), res: pio.NewPipe(), reqC: make(chan struct{}), resC: make(chan struct{})}
+}
+
+func (t RawReqRes) ReqClose() error {
+ select {
+ case <-t.reqC:
+ return nil
+ default:
+ close(t.reqC)
+ return t.req.Close()
+ }
+}
+
+func (t RawReqRes) ReqCloseWithError(e error) error {
+ select {
+ case <-t.reqC:
+ return nil
+ default:
+ close(t.reqC)
+ return t.req.CloseWithError(e)
+ }
+}
+
+func (t RawReqRes) ResClose() error {
+ select {
+ case <-t.resC:
+ return nil
+ default:
+ close(t.resC)
+ return t.res.Close()
+ }
+}
+
+func (t RawReqRes) ResCloseWithError(e error) error {
+ select {
+ case <-t.resC:
+ return nil
+ default:
+ close(t.resC)
+ return t.res.CloseWithError(e)
+ }
+}
+
+func (t RawReqRes) Write(p []byte) (n int, err error) {
+ select {
+ case <-t.reqC:
+ return t.res.Write(p)
+ default:
+ return 0, io.EOF
+ }
+}
+
+func (t RawReqRes) Read(p []byte) (n int, err error) {
+ select {
+ case <-t.reqC:
+ return 0, io.EOF
+ default:
+ return t.req.Read(p)
+ }
+}
+
+func (t RawReqRes) ReqWrite(p []byte) (n int, err error) {
+ select {
+ case <-t.reqC:
+ return 0, io.EOF
+ default:
+ return t.req.Write(p)
+ }
+}
+
+func (t RawReqRes) ResRead(p []byte) (n int, err error) {
+ select {
+ case <-t.reqC:
+ return t.res.Read(p)
+ default:
+ return 0, io.EOF
+ }
+}
)
type Rval struct {
+ Method string
Url string
PostStr string
Proxy string
// 为避免write阻塞导致panic,请使用此项目io包中的NewPipe(),或在ctx done时,自行关闭pipe writer reader
SaveToPipe *pio.IOpipe
+ RawPipe *RawReqRes
+
Header map[string]string
}
var (
ErrEmptyUrl = errors.New("ErrEmptyUrl")
+ ErrMustAsync = errors.New("ErrMustAsync")
+ ErrCantRetry = errors.New("ErrCantRetry")
ErrNewRequest = errors.New("ErrNewRequest")
ErrClientDo = errors.New("ErrClientDo")
ErrResponFileCreate = errors.New("ErrResponFileCreate")
ErrWriteRes = errors.New("ErrWriteRes")
ErrReadRes = errors.New("ErrReadRes")
+ ErrPostStrOrRawPipe = errors.New("ErrPostStrOrRawPipe")
)
type Req struct {
pctx, cancelF := t.prepare(&val)
t.cancelP.Store(&cancelF)
- // 同步
if !val.Async {
- beginTime := time.Now()
-
- for i := 0; i <= val.Retry; i++ {
- ctx, cancel := t.prepareRes(pctx, &val)
- t.err = t.Reqf_1(ctx, val)
- cancel()
- if t.err == nil || IsCancel(t.err) {
- break
- }
- if val.SleepTime != 0 {
- time.Sleep(time.Duration(val.SleepTime * int(time.Millisecond)))
- }
- }
-
- cancelF()
- t.updateUseDur(beginTime)
- t.clean(&val)
- t.state.Store(free)
- t.l.Unlock()
- return t.err
+ // 同步
+ return t.reqfM(pctx, cancelF, val)
+ } else {
+ //异步
+ go func() {
+ _ = t.reqfM(pctx, cancelF, val)
+ }()
}
- //异步
- go func() {
- beginTime := time.Now()
+ return nil
+}
- for i := 0; i <= val.Retry; i++ {
- ctx, cancel := t.prepareRes(pctx, &val)
- t.err = t.Reqf_1(ctx, val)
- cancel()
- if t.err == nil || IsCancel(t.err) {
- break
- }
- if val.SleepTime != 0 {
- time.Sleep(time.Duration(val.SleepTime * int(time.Millisecond)))
- }
+func (t *Req) reqfM(ctx context.Context, cancel context.CancelFunc, val Rval) error {
+ beginTime := time.Now()
+
+ for i := 0; i <= val.Retry; i++ {
+ ctx, cancel := t.prepareRes(ctx, &val)
+ t.err = t.reqf(ctx, val)
+ cancel()
+ if t.err == nil || IsCancel(t.err) {
+ break
}
+ if val.SleepTime != 0 {
+ time.Sleep(time.Duration(val.SleepTime * int(time.Millisecond)))
+ }
+ }
- cancelF()
- t.updateUseDur(beginTime)
- t.clean(&val)
- t.state.Store(free)
- t.l.Unlock()
- }()
- return nil
+ cancel()
+ t.updateUseDur(beginTime)
+ t.clean(&val)
+ t.state.Store(free)
+ t.l.Unlock()
+ return t.err
}
-func (t *Req) Reqf_1(ctx context.Context, val Rval) (err error) {
+func (t *Req) reqf(ctx context.Context, val Rval) (err error) {
var (
Header map[string]string = val.Header
client http.Client
return ErrEmptyUrl
}
- Method := "GET"
var body io.Reader
+ if len(val.PostStr) > 0 && val.RawPipe != nil {
+ return ErrPostStrOrRawPipe
+ }
+ if val.Retry != 0 && val.RawPipe != nil {
+ return ErrCantRetry
+ }
+ if val.SaveToPipe != nil && !val.Async {
+ return ErrMustAsync
+ }
+ if val.RawPipe != nil {
+ body = val.RawPipe
+ }
if len(val.PostStr) > 0 {
- Method = "POST"
body = strings.NewReader(val.PostStr)
if _, ok := Header["Content-Type"]; !ok {
Header["Content-Type"] = "application/x-www-form-urlencoded"
}
}
- req, e := http.NewRequestWithContext(ctx, Method, val.Url, body)
+ req, e := http.NewRequestWithContext(ctx, val.Method, val.Url, body)
if e != nil {
return errors.Join(ErrNewRequest, e)
}
if val.SaveToPipe != nil {
ws = append(ws, val.SaveToPipe)
}
+ if val.RawPipe != nil {
+ ws = append(ws, val.RawPipe)
+ }
if !val.NoResponse {
ws = append(ws, t.responBuf)
}
}
}()
}
+ if val.RawPipe != nil {
+ go func() {
+ <-ctx.Done()
+ if e := val.RawPipe.ResCloseWithError(context.Canceled); e != nil {
+ println(e)
+ }
+ }()
+ }
+
+ if val.Method == "" {
+ val.Method = "GET"
+ if len(val.PostStr) > 0 {
+ val.Method = "POST"
+ }
+ }
return
}
if val.SaveToPipe != nil {
val.SaveToPipe.Close()
}
+ if val.RawPipe != nil {
+ val.RawPipe.ReqClose()
+ val.RawPipe.ResClose()
+ }
}
func (t *Req) updateUseDur(u time.Time) {
code, _ := strconv.Atoi(r.URL.Query().Get(`code`))
w.WriteHeader(code)
},
+ `/reply`: func(w http.ResponseWriter, r *http.Request) {
+ io.Copy(w, r.Body)
+ },
`/no`: func(w http.ResponseWriter, _ *http.Request) {
w.Write([]byte("abc强强强强"))
},
wg.Wait()
}
}
+
+func Test_req5(t *testing.T) {
+ r := New()
+ r.Reqf(Rval{
+ Url: "http://" + addr + "/reply",
+ PostStr: "123",
+ })
+ if !bytes.Equal(r.Respon, []byte("123")) {
+ t.Fatal()
+ }
+
+ raw := NewRawReqRes()
+ buf := []byte("123")
+ r.Reqf(Rval{
+ Url: "http://" + addr + "/reply",
+ Async: true,
+ RawPipe: raw,
+ NoResponse: true,
+ })
+ if _, e := raw.ReqWrite(buf); e != nil {
+ t.Fatal(e)
+ }
+ raw.ReqClose()
+ clear(buf)
+ if _, e := raw.ResRead(buf); e != nil && !errors.Is(e, io.EOF) {
+ t.Fatal(e)
+ }
+ if !bytes.Equal([]byte("123"), buf) {
+ t.Log(r.Respon, buf)
+ t.Fatal()
+ }
+}