package part
import (
+ "context"
+ "fmt"
"io"
+ "sync/atomic"
+ "time"
)
-//no close rc any time
-//you can close wc, r, w.
+// no close rc any time
+// you can close wc, r, w.
func RW2Chan(r io.ReadCloser, w io.WriteCloser) (rc, wc chan []byte) {
if r != nil {
rc = make(chan []byte, 10)
}
return
}
+
+type RWC struct {
+ R func(p []byte) (n int, err error)
+ W func(p []byte) (n int, err error)
+ C func() error
+}
+
+func (t RWC) Write(p []byte) (n int, err error) {
+ return t.W(p)
+}
+func (t RWC) Read(p []byte) (n int, err error) {
+ return t.R(p)
+}
+func (t RWC) Close() error {
+ return t.C()
+}
+
+func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w io.Writer, r io.Reader, panicf ...func(s string)) io.ReadWriteCloser {
+ var chanw atomic.Int64
+ chanw.Store(time.Now().Unix())
+ if len(panicf) == 0 {
+ panicf = append(panicf, func(s string) { panic(s) })
+ }
+
+ go func() {
+ var timer = time.NewTicker(to)
+ defer timer.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ if old, now := chanw.Load(), time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
+ if old != 0 {
+ panicf[0](fmt.Sprintf("write blocking while close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
+ }
+ } else if old < 0 {
+ return
+ } else {
+ time.AfterFunc(to, func() {
+ if old, now := chanw.Load(), time.Now(); old != 0 && now.Unix()-old > int64(to.Seconds()) {
+ panicf[0](fmt.Sprintf("write blocking after close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
+ }
+ })
+ }
+ return
+ case now := <-timer.C:
+ if old := chanw.Load(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
+ panicf[0](fmt.Sprintf("write blocking after rw %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
+ return
+ } else if old < 0 {
+ return
+ }
+ }
+ }
+ }()
+
+ return RWC{
+ func(p []byte) (n int, err error) {
+ if n, err = r.Read(p); n != 0 {
+ select {
+ case <-ctx.Done():
+ default:
+ chanw.Store(time.Now().Unix())
+ }
+ }
+ return
+ },
+ func(p []byte) (n int, err error) {
+ if n, err = w.Write(p); n != 0 {
+ chanw.Store(time.Now().Unix())
+ }
+ return
+ },
+ func() error {
+ chanw.Store(-1)
+ return nil
+ },
+ }
+}
gzip "compress/gzip"
br "github.com/andybalholm/brotli"
+ pio "github.com/qydysky/part/io"
s "github.com/qydysky/part/strings"
// "encoding/binary"
)
ErrResponFileCreate = errors.New("ErrResponFileCreate")
ErrWriteRes = errors.New("ErrWriteRes")
ErrReadRes = errors.New("ErrReadRes")
- ErrWriteAfterWrite = errors.New("ErrWriteAfterWrite")
)
type Req struct {
if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 {
switch compress_type[0] {
case `br`:
- resReadCloser = rwc{r: br.NewReader(resp.Body).Read}
+ resReadCloser = pio.RWC{R: br.NewReader(resp.Body).Read}
case `gzip`:
resReadCloser, _ = gzip.NewReader(resp.Body)
case `deflate`:
if writeLoopTO == 0 {
writeLoopTO = 1000
}
- rwc := t.withCtxTO(ctx, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser)
+ rwc := pio.WithCtxTO(ctx, t.callTree, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser)
defer rwc.Close()
for buf := make([]byte, 2048); true; {
t.UsedTime = time.Since(u)
}
-type rwc struct {
- r func(p []byte) (n int, err error)
- w func(p []byte) (n int, err error)
- c func() error
-}
-
-func (t rwc) Write(p []byte) (n int, err error) {
- return t.w(p)
-}
-func (t rwc) Read(p []byte) (n int, err error) {
- return t.r(p)
-}
-func (t rwc) Close() error {
- return t.c()
-}
-
-func (t *Req) withCtxTO(ctx context.Context, to time.Duration, w io.Writer, r io.Reader) io.ReadWriteCloser {
- var chanw atomic.Int64
-
- go func(callTree string) {
- var timer = time.NewTicker(to)
- defer timer.Stop()
- for {
- select {
- case <-ctx.Done():
- if old, now := chanw.Load(), time.Now(); old != 0 && now.Unix()-old > int64(to.Seconds()) {
- if chanw.Load() != 0 {
- panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", now.Unix()-old, callTree))
- }
- } else {
- time.AfterFunc(to, func() {
- if chanw.Load() != 0 {
- panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", now.Unix()-old, callTree))
- }
- })
- }
- return
- case now := <-timer.C:
- if old := chanw.Load(); old != 0 && now.Unix()-old > int64(to.Seconds()) {
- panic(fmt.Sprintf("write blocking after %v, goruntime leak \n%v", now.Unix()-old, callTree))
- }
- }
- }
- }(t.callTree)
-
- return rwc{
- func(p []byte) (n int, err error) {
- if n, err = r.Read(p); n != 0 {
- select {
- case <-ctx.Done():
- default:
- chanw.Store(time.Now().Unix())
- }
- }
- return
- },
- func(p []byte) (n int, err error) {
- if n, err = w.Write(p); n != 0 {
- if chanw.Swap(0) == 0 {
- panic(ErrWriteAfterWrite)
- }
- }
- return
- },
- func() error {
- return nil
- },
- }
-}
-
func IsTimeout(e error) bool {
if errors.Is(e, context.DeadlineExceeded) {
return true
import (
"bytes"
+ "context"
"encoding/json"
+ "fmt"
+ "io"
"net/http"
+ "strings"
"testing"
"time"
+ pio "github.com/qydysky/part/io"
reqf "github.com/qydysky/part/reqf"
)
}
}
+func Test_ClientBlock(t *testing.T) {
+ var m WebPath
+ m.Store("/", func(w http.ResponseWriter, _ *http.Request) {
+ w.Write([]byte("1"))
+ })
+ s := NewSyncMap(&http.Server{
+ Addr: "127.0.0.1:10000",
+ WriteTimeout: time.Millisecond,
+ }, &m)
+ defer s.Shutdown()
+
+ m.Store("/to", func(w http.ResponseWriter, r *http.Request) {
+
+ rwc := pio.WithCtxTO(context.Background(), fmt.Sprintf("server handle %v by %v ", r.URL.Path, r.RemoteAddr), time.Second, w, r.Body, func(s string) {
+ fmt.Println(s)
+ if !strings.Contains(s, "write blocking after rw 2s > 1s, goruntime leak") {
+ t.Fatal(s)
+ }
+ })
+ defer rwc.Close()
+
+ type d struct {
+ A string `json:"a"`
+ B []string `json:"b"`
+ C map[string]int `json:"c"`
+ }
+
+ var t = ResStruct{0, "ok", d{"0", []string{"0"}, map[string]int{"0": 1}}}
+ data, e := json.Marshal(t)
+ if e != nil {
+ t.Code = -1
+ t.Data = nil
+ t.Message = e.Error()
+ data, _ = json.Marshal(t)
+ }
+ w.Header().Set("Content-Type", "application/json")
+ _, _ = w.Write(data)
+ })
+
+ time.Sleep(time.Second)
+
+ r := reqf.New()
+ {
+ rc, wc := io.Pipe()
+ c := make(chan struct{})
+ go func() {
+ time.Sleep(time.Second * 3)
+ d, _ := io.ReadAll(rc)
+ fmt.Println(string(d))
+ fmt.Println(r.Response.Status)
+ close(c)
+ }()
+ r.Reqf(reqf.Rval{
+ Url: "http://127.0.0.1:10000/to",
+ SaveToPipeWriter: wc,
+ WriteLoopTO: 5000,
+ Async: true,
+ })
+ <-c
+ }
+}
+
func BenchmarkXxx(b *testing.B) {
var m WebPath
type d struct {