filiter "github.com/qydysky/front/filiter"
component2 "github.com/qydysky/part/component2"
pctx "github.com/qydysky/part/ctx"
+ pio "github.com/qydysky/part/io"
pslice "github.com/qydysky/part/slice"
pweb "github.com/qydysky/part/web"
)
Pub string `json:"pub"`
Key string `json:"key"`
} `json:"tls"`
- RetryBlocks struct {
- Size string `json:"size"`
- size int `json:"-"`
- Num int `json:"num"`
- } `json:"retryBlocks"`
+ RetryBlocks RetryBlocks `json:"retryBlocks"`
RetryBlocksI pslice.BlocksI[byte] `json:"-"`
MatchRule string `json:"matchRule"`
FdPath string `json:"fdPath"`
reqId atomic.Uint32 `json:"-"`
}
+type RetryBlocks struct {
+ Size string `json:"size"`
+ size int `json:"-"`
+ Num int `json:"num"`
+}
+
func (t *Config) Run(ctx context.Context, logger Logger) {
ctx, done := pctx.WithWait(ctx, 0, time.Minute)
defer func() {
}
t.BlocksI = pslice.NewBlocks[byte](16*1024, t.CopyBlocks)
}
- if size, err := humanize.ParseBytes(t.RetryBlocks.Size); err != nil || size < humanize.MByte {
- t.RetryBlocks.size = humanize.MByte
- } else {
+ if size, err := humanize.ParseBytes(t.RetryBlocks.Size); err == nil && size > 0 {
t.RetryBlocks.size = int(size)
+ } else {
+ t.RetryBlocks.size = humanize.MByte
}
if t.RetryBlocks.size > 0 && t.RetryBlocks.Num > 0 {
t.RetryBlocksI = pslice.NewBlocks[byte](t.RetryBlocks.size, t.RetryBlocks.Num)
// repack
var (
- reqBuf []byte
- reqBufUsed bool
- reqBufAllRead bool
+ reqBuf []byte
+ reqBufUsed bool
+ reqAllRead bool
+ delayBody io.ReadCloser
)
if t.RetryBlocksI != nil && r.Body != nil {
if contentLength := r.Header.Get("Content-Length"); contentLength != "" {
- if _, e := strconv.Atoi(contentLength); e == nil {
+ if n, e := strconv.Atoi(contentLength); e == nil && n < t.RetryBlocks.size {
var putBack func()
var e error
reqBuf, putBack, e = t.RetryBlocksI.Get()
w.WriteHeader(http.StatusBadRequest)
return
}
- reqBufAllRead = true
+ reqAllRead = true
+ break
+ } else if n == 0 {
break
}
}
reqBuf = reqBuf[:offset]
- if !reqBufAllRead {
+ if !reqAllRead {
+ delayBody = r.Body
logger.Warn(`W:`, fmt.Sprintf(logFormat, reqId, r.RemoteAddr, route.config.Addr, routePath, "Err", ErrReqReBodyFull))
}
} else {
backP.lock.Unlock()
if reqBufUsed {
- if !reqBufAllRead {
- r.Body = io.NopCloser(io.MultiReader(bytes.NewBuffer(reqBuf), r.Body))
+ if !reqAllRead {
+ r.Body = pio.RWC{
+ R: io.MultiReader(bytes.NewBuffer(reqBuf), delayBody).Read,
+ C: delayBody.Close,
+ }
reqBufUsed = false
} else {
r.Body = io.NopCloser(bytes.NewBuffer(reqBuf))
package front
import (
+ "bytes"
"context"
+ "io"
"net/http"
"testing"
"time"
"github.com/qydysky/front/filiter"
plog "github.com/qydysky/part/log"
reqf "github.com/qydysky/part/reqf"
+ pweb "github.com/qydysky/part/web"
)
var logger = plog.New(plog.Config{
t.Fail()
}
}
+
+func Test_Res(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ pweb.New(&http.Server{
+ Addr: "127.0.0.1:19001",
+ }).Handle(map[string]func(http.ResponseWriter, *http.Request){
+ `/`: func(w http.ResponseWriter, r *http.Request) {
+ io.Copy(w, r.Body)
+ },
+ })
+
+ conf := &Config{
+ RetryBlocks: RetryBlocks{
+ Num: 10,
+ Size: "3B",
+ },
+ Addr: "127.0.0.1:19000",
+ Routes: []Route{
+ {
+ Path: []string{"/"},
+ PathAdd: true,
+ Backs: []Back{
+ {
+ Name: "1",
+ To: "://127.0.0.1:19001",
+ Weight: 1,
+ },
+ },
+ },
+ },
+ }
+
+ go conf.Run(ctx, logger)
+
+ time.Sleep(time.Second)
+
+ reqb := []byte("1234")
+ resb := make([]byte, 5)
+
+ pipe := reqf.NewRawReqRes()
+ r := reqf.New()
+ if e := r.Reqf(reqf.Rval{
+ Url: "http://127.0.0.1:19000/",
+ RawPipe: pipe,
+ Async: true,
+ }); e != nil {
+ t.Fatal()
+ }
+ pipe.ReqWrite(reqb)
+ pipe.ReqClose()
+ n, _ := pipe.ResRead(resb)
+ resb = resb[:n]
+ if !bytes.Equal(resb, reqb) {
+ t.Fatal(resb)
+ }
+}