]> 127.0.0.1 Git - front/.git/commitdiff
1 v0.1.20250322011029
authorqydysky <qydysky@foxmail.com>
Sat, 22 Mar 2025 01:08:35 +0000 (09:08 +0800)
committerqydysky <qydysky@foxmail.com>
Sat, 22 Mar 2025 01:08:35 +0000 (09:08 +0800)
config.go
config_test.go
local.go

index 3ebcb88f07a5b3304d006692e1eb3fec75b59845..b73c63776b2368756dcf27ce4b3b292fd5944624 100755 (executable)
--- a/config.go
+++ b/config.go
@@ -23,6 +23,7 @@ import (
        filiter "github.com/qydysky/front/filiter"
        component2 "github.com/qydysky/part/component2"
        pctx "github.com/qydysky/part/ctx"
+       pio "github.com/qydysky/part/io"
        pslice "github.com/qydysky/part/slice"
        pweb "github.com/qydysky/part/web"
 )
@@ -34,11 +35,7 @@ type Config struct {
                Pub string `json:"pub"`
                Key string `json:"key"`
        } `json:"tls"`
-       RetryBlocks struct {
-               Size string `json:"size"`
-               size int    `json:"-"`
-               Num  int    `json:"num"`
-       } `json:"retryBlocks"`
+       RetryBlocks  RetryBlocks          `json:"retryBlocks"`
        RetryBlocksI pslice.BlocksI[byte] `json:"-"`
        MatchRule    string               `json:"matchRule"`
        FdPath       string               `json:"fdPath"`
@@ -53,6 +50,12 @@ type Config struct {
        reqId     atomic.Uint32 `json:"-"`
 }
 
+type RetryBlocks struct {
+       Size string `json:"size"`
+       size int    `json:"-"`
+       Num  int    `json:"num"`
+}
+
 func (t *Config) Run(ctx context.Context, logger Logger) {
        ctx, done := pctx.WithWait(ctx, 0, time.Minute)
        defer func() {
@@ -85,10 +88,10 @@ func (t *Config) Run(ctx context.Context, logger Logger) {
                }
                t.BlocksI = pslice.NewBlocks[byte](16*1024, t.CopyBlocks)
        }
-       if size, err := humanize.ParseBytes(t.RetryBlocks.Size); err != nil || size < humanize.MByte {
-               t.RetryBlocks.size = humanize.MByte
-       } else {
+       if size, err := humanize.ParseBytes(t.RetryBlocks.Size); err == nil && size > 0 {
                t.RetryBlocks.size = int(size)
+       } else {
+               t.RetryBlocks.size = humanize.MByte
        }
        if t.RetryBlocks.size > 0 && t.RetryBlocks.Num > 0 {
                t.RetryBlocksI = pslice.NewBlocks[byte](t.RetryBlocks.size, t.RetryBlocks.Num)
@@ -249,13 +252,14 @@ func (t *Config) SwapSign(ctx context.Context, logger Logger) {
 
                                // repack
                                var (
-                                       reqBuf        []byte
-                                       reqBufUsed    bool
-                                       reqBufAllRead bool
+                                       reqBuf     []byte
+                                       reqBufUsed bool
+                                       reqAllRead bool
+                                       delayBody  io.ReadCloser
                                )
                                if t.RetryBlocksI != nil && r.Body != nil {
                                        if contentLength := r.Header.Get("Content-Length"); contentLength != "" {
-                                               if _, e := strconv.Atoi(contentLength); e == nil {
+                                               if n, e := strconv.Atoi(contentLength); e == nil && n < t.RetryBlocks.size {
                                                        var putBack func()
                                                        var e error
                                                        reqBuf, putBack, e = t.RetryBlocksI.Get()
@@ -274,12 +278,15 @@ func (t *Config) SwapSign(ctx context.Context, logger Logger) {
                                                                                        w.WriteHeader(http.StatusBadRequest)
                                                                                        return
                                                                                }
-                                                                               reqBufAllRead = true
+                                                                               reqAllRead = true
+                                                                               break
+                                                                       } else if n == 0 {
                                                                                break
                                                                        }
                                                                }
                                                                reqBuf = reqBuf[:offset]
-                                                               if !reqBufAllRead {
+                                                               if !reqAllRead {
+                                                                       delayBody = r.Body
                                                                        logger.Warn(`W:`, fmt.Sprintf(logFormat, reqId, r.RemoteAddr, route.config.Addr, routePath, "Err", ErrReqReBodyFull))
                                                                }
                                                        } else {
@@ -299,8 +306,11 @@ func (t *Config) SwapSign(ctx context.Context, logger Logger) {
                                        backP.lock.Unlock()
 
                                        if reqBufUsed {
-                                               if !reqBufAllRead {
-                                                       r.Body = io.NopCloser(io.MultiReader(bytes.NewBuffer(reqBuf), r.Body))
+                                               if !reqAllRead {
+                                                       r.Body = pio.RWC{
+                                                               R: io.MultiReader(bytes.NewBuffer(reqBuf), delayBody).Read,
+                                                               C: delayBody.Close,
+                                                       }
                                                        reqBufUsed = false
                                                } else {
                                                        r.Body = io.NopCloser(bytes.NewBuffer(reqBuf))
index 7afa772766b61544beffe6345261131ca145375b..9f081af36af3c7cd6cc368f5825cc389354b9cbc 100644 (file)
@@ -1,7 +1,9 @@
 package front
 
 import (
+       "bytes"
        "context"
+       "io"
        "net/http"
        "testing"
        "time"
@@ -9,6 +11,7 @@ import (
        "github.com/qydysky/front/filiter"
        plog "github.com/qydysky/part/log"
        reqf "github.com/qydysky/part/reqf"
+       pweb "github.com/qydysky/part/web"
 )
 
 var logger = plog.New(plog.Config{
@@ -134,3 +137,61 @@ func Test_Back(t *testing.T) {
                t.Fail()
        }
 }
+
+func Test_Res(t *testing.T) {
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       pweb.New(&http.Server{
+               Addr: "127.0.0.1:19001",
+       }).Handle(map[string]func(http.ResponseWriter, *http.Request){
+               `/`: func(w http.ResponseWriter, r *http.Request) {
+                       io.Copy(w, r.Body)
+               },
+       })
+
+       conf := &Config{
+               RetryBlocks: RetryBlocks{
+                       Num:  10,
+                       Size: "3B",
+               },
+               Addr: "127.0.0.1:19000",
+               Routes: []Route{
+                       {
+                               Path:    []string{"/"},
+                               PathAdd: true,
+                               Backs: []Back{
+                                       {
+                                               Name:   "1",
+                                               To:     "://127.0.0.1:19001",
+                                               Weight: 1,
+                                       },
+                               },
+                       },
+               },
+       }
+
+       go conf.Run(ctx, logger)
+
+       time.Sleep(time.Second)
+
+       reqb := []byte("1234")
+       resb := make([]byte, 5)
+
+       pipe := reqf.NewRawReqRes()
+       r := reqf.New()
+       if e := r.Reqf(reqf.Rval{
+               Url:     "http://127.0.0.1:19000/",
+               RawPipe: pipe,
+               Async:   true,
+       }); e != nil {
+               t.Fatal()
+       }
+       pipe.ReqWrite(reqb)
+       pipe.ReqClose()
+       n, _ := pipe.ResRead(resb)
+       resb = resb[:n]
+       if !bytes.Equal(resb, reqb) {
+               t.Fatal(resb)
+       }
+}
index 85eab757a07d61f699dbcfaf2f847bb3852f5f4f..8c35fd5a7087713e04c5e11a913692136f83be92 100644 (file)
--- a/local.go
+++ b/local.go
@@ -28,7 +28,7 @@ type localDealer struct{}
 func (localDealer) Deal(ctx context.Context, reqId uint32, w http.ResponseWriter, r *http.Request, routePath string, chosenBack *Back, logger Logger, blocksi pslice.BlocksI[byte]) error {
        var (
                opT       = time.Now()
-               logFormat = "%v %v%v > %v local %v %v %v"
+               logFormat = "%v %v %v%v > %v local %v %v %v"
        )
 
        path := chosenBack.To