From: qydysky Date: Sat, 22 Mar 2025 01:08:35 +0000 (+0800) Subject: 1 X-Git-Tag: v0.1.20250322011029 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=54847ee326efa3430198961889346a3b89874b82;p=front%2F.git 1 --- diff --git a/config.go b/config.go index 3ebcb88..b73c637 100755 --- a/config.go +++ b/config.go @@ -23,6 +23,7 @@ import ( filiter "github.com/qydysky/front/filiter" component2 "github.com/qydysky/part/component2" pctx "github.com/qydysky/part/ctx" + pio "github.com/qydysky/part/io" pslice "github.com/qydysky/part/slice" pweb "github.com/qydysky/part/web" ) @@ -34,11 +35,7 @@ type Config struct { Pub string `json:"pub"` Key string `json:"key"` } `json:"tls"` - RetryBlocks struct { - Size string `json:"size"` - size int `json:"-"` - Num int `json:"num"` - } `json:"retryBlocks"` + RetryBlocks RetryBlocks `json:"retryBlocks"` RetryBlocksI pslice.BlocksI[byte] `json:"-"` MatchRule string `json:"matchRule"` FdPath string `json:"fdPath"` @@ -53,6 +50,12 @@ type Config struct { reqId atomic.Uint32 `json:"-"` } +type RetryBlocks struct { + Size string `json:"size"` + size int `json:"-"` + Num int `json:"num"` +} + func (t *Config) Run(ctx context.Context, logger Logger) { ctx, done := pctx.WithWait(ctx, 0, time.Minute) defer func() { @@ -85,10 +88,10 @@ func (t *Config) Run(ctx context.Context, logger Logger) { } t.BlocksI = pslice.NewBlocks[byte](16*1024, t.CopyBlocks) } - if size, err := humanize.ParseBytes(t.RetryBlocks.Size); err != nil || size < humanize.MByte { - t.RetryBlocks.size = humanize.MByte - } else { + if size, err := humanize.ParseBytes(t.RetryBlocks.Size); err == nil && size > 0 { t.RetryBlocks.size = int(size) + } else { + t.RetryBlocks.size = humanize.MByte } if t.RetryBlocks.size > 0 && t.RetryBlocks.Num > 0 { t.RetryBlocksI = pslice.NewBlocks[byte](t.RetryBlocks.size, t.RetryBlocks.Num) @@ -249,13 +252,14 @@ func (t *Config) SwapSign(ctx context.Context, logger Logger) { // repack var ( - reqBuf []byte - reqBufUsed bool - reqBufAllRead bool + reqBuf []byte + reqBufUsed bool + reqAllRead bool + delayBody io.ReadCloser ) if t.RetryBlocksI != nil && r.Body != nil { if contentLength := r.Header.Get("Content-Length"); contentLength != "" { - if _, e := strconv.Atoi(contentLength); e == nil { + if n, e := strconv.Atoi(contentLength); e == nil && n < t.RetryBlocks.size { var putBack func() var e error reqBuf, putBack, e = t.RetryBlocksI.Get() @@ -274,12 +278,15 @@ func (t *Config) SwapSign(ctx context.Context, logger Logger) { w.WriteHeader(http.StatusBadRequest) return } - reqBufAllRead = true + reqAllRead = true + break + } else if n == 0 { break } } reqBuf = reqBuf[:offset] - if !reqBufAllRead { + if !reqAllRead { + delayBody = r.Body logger.Warn(`W:`, fmt.Sprintf(logFormat, reqId, r.RemoteAddr, route.config.Addr, routePath, "Err", ErrReqReBodyFull)) } } else { @@ -299,8 +306,11 @@ func (t *Config) SwapSign(ctx context.Context, logger Logger) { backP.lock.Unlock() if reqBufUsed { - if !reqBufAllRead { - r.Body = io.NopCloser(io.MultiReader(bytes.NewBuffer(reqBuf), r.Body)) + if !reqAllRead { + r.Body = pio.RWC{ + R: io.MultiReader(bytes.NewBuffer(reqBuf), delayBody).Read, + C: delayBody.Close, + } reqBufUsed = false } else { r.Body = io.NopCloser(bytes.NewBuffer(reqBuf)) diff --git a/config_test.go b/config_test.go index 7afa772..9f081af 100644 --- a/config_test.go +++ b/config_test.go @@ -1,7 +1,9 @@ package front import ( + "bytes" "context" + "io" "net/http" "testing" "time" @@ -9,6 +11,7 @@ import ( "github.com/qydysky/front/filiter" plog "github.com/qydysky/part/log" reqf "github.com/qydysky/part/reqf" + pweb "github.com/qydysky/part/web" ) var logger = plog.New(plog.Config{ @@ -134,3 +137,61 @@ func Test_Back(t *testing.T) { t.Fail() } } + +func Test_Res(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + pweb.New(&http.Server{ + Addr: "127.0.0.1:19001", + }).Handle(map[string]func(http.ResponseWriter, *http.Request){ + `/`: func(w http.ResponseWriter, r *http.Request) { + io.Copy(w, r.Body) + }, + }) + + conf := &Config{ + RetryBlocks: RetryBlocks{ + Num: 10, + Size: "3B", + }, + Addr: "127.0.0.1:19000", + Routes: []Route{ + { + Path: []string{"/"}, + PathAdd: true, + Backs: []Back{ + { + Name: "1", + To: "://127.0.0.1:19001", + Weight: 1, + }, + }, + }, + }, + } + + go conf.Run(ctx, logger) + + time.Sleep(time.Second) + + reqb := []byte("1234") + resb := make([]byte, 5) + + pipe := reqf.NewRawReqRes() + r := reqf.New() + if e := r.Reqf(reqf.Rval{ + Url: "http://127.0.0.1:19000/", + RawPipe: pipe, + Async: true, + }); e != nil { + t.Fatal() + } + pipe.ReqWrite(reqb) + pipe.ReqClose() + n, _ := pipe.ResRead(resb) + resb = resb[:n] + if !bytes.Equal(resb, reqb) { + t.Fatal(resb) + } +} diff --git a/local.go b/local.go index 85eab75..8c35fd5 100644 --- a/local.go +++ b/local.go @@ -28,7 +28,7 @@ type localDealer struct{} func (localDealer) Deal(ctx context.Context, reqId uint32, w http.ResponseWriter, r *http.Request, routePath string, chosenBack *Back, logger Logger, blocksi pslice.BlocksI[byte]) error { var ( opT = time.Now() - logFormat = "%v %v%v > %v local %v %v %v" + logFormat = "%v %v %v%v > %v local %v %v %v" ) path := chosenBack.To