}
func (t RWC) Write(p []byte) (n int, err error) {
- return t.W(p)
+ if t.W != nil {
+ return t.W(p)
+ }
+ return 0, nil
}
func (t RWC) Read(p []byte) (n int, err error) {
- return t.R(p)
+ if t.R != nil {
+ return t.R(p)
+ }
+ return 0, nil
}
func (t RWC) Close() error {
- return t.C()
+ if t.C != nil {
+ return t.C()
+ }
+ return nil
}
-func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w io.Writer, r io.Reader, panicf ...func(s string)) io.ReadWriteCloser {
+// close reader by yourself
+// call Close() after writer fin
+func WithCtxTO(ctx context.Context, callTree string, to time.Duration, w []io.WriteCloser, r io.Reader, panicf ...func(s string)) io.ReadWriteCloser {
var chanw atomic.Int64
chanw.Store(time.Now().Unix())
if len(panicf) == 0 {
for {
select {
case <-ctx.Done():
- if old := chanw.Load(); old == -1 {
- return
- } else if now := time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
- if old != 0 {
- panicf[0](fmt.Sprintf("rw blocking while close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
- }
+ // avoid write block
+ for i := 0; i < len(w); i++ {
+ w[i].Close()
+ }
+ if old, now := chanw.Load(), time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
+ panicf[0](fmt.Sprintf("rw blocking while close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
} else {
time.AfterFunc(to, func() {
- if old := chanw.Load(); old == -1 {
- return
- } else if now := time.Now(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
- panicf[0](fmt.Sprintf("rw blocking after close %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
+ if chanw.Load() != -1 {
+ panicf[0](fmt.Sprintf("rw blocking after close %v, goruntime leak \n%v", to, callTree))
}
})
}
return
case now := <-timer.C:
- if old := chanw.Load(); old == -1 {
- return
- } else if old > 0 && now.Unix()-old > int64(to.Seconds()) {
+ if old := chanw.Load(); old > 0 && now.Unix()-old > int64(to.Seconds()) {
panicf[0](fmt.Sprintf("rw blocking after rw %vs > %v, goruntime leak \n%v", now.Unix()-old, to, callTree))
return
}
case <-ctx.Done():
err = context.Canceled
default:
- if n, err = w.Write(p); n != 0 {
- chanw.Store(time.Now().Unix())
+ for i := 0; i < len(w); i++ {
+ if n, err = w[i].Write(p); n != 0 {
+ chanw.Store(time.Now().Unix())
+ }
}
}
return
package part
import (
- "testing"
"io"
+ "testing"
)
+func Test_rwc(t *testing.T) {
+ rwc := RWC{R: func(p []byte) (n int, err error) { return 1, nil }}
+ rwc.Close()
+}
+
func Test_RW2Chan(t *testing.T) {
{
- r,w := io.Pipe()
- _,rw := RW2Chan(nil,w)
-
- go func(){
- rw<-[]byte{0x01}
+ r, w := io.Pipe()
+ _, rw := RW2Chan(nil, w)
+
+ go func() {
+ rw <- []byte{0x01}
}()
buf := make([]byte, 1<<16)
- n,_:=r.Read(buf)
- if buf[:n][0] != 1 {t.Error(`no`)}
+ n, _ := r.Read(buf)
+ if buf[:n][0] != 1 {
+ t.Error(`no`)
+ }
}
-
+
{
- r,w := io.Pipe()
- rc,_ := RW2Chan(r,nil)
-
- go func(){
+ r, w := io.Pipe()
+ rc, _ := RW2Chan(r, nil)
+
+ go func() {
w.Write([]byte{0x09})
}()
- if b:=<-rc;b[0] != 9 {t.Error(`no2`)}
+ if b := <-rc; b[0] != 9 {
+ t.Error(`no2`)
+ }
}
-
+
{
- r,w := io.Pipe()
- rc,rw := RW2Chan(r,w)
-
- go func(){
+ r, w := io.Pipe()
+ rc, rw := RW2Chan(r, w)
+
+ go func() {
rw <- []byte{0x07}
}()
- if b:=<-rc;b[0] != 7 {t.Error(`no3`)}
+ if b := <-rc; b[0] != 7 {
+ t.Error(`no3`)
+ }
}
}
Ctx context.Context
SaveToPath string
- SaveToChan chan []byte
SaveToPipeWriter *io.PipeWriter
Header map[string]string
UsedTime time.Duration
cancelP atomic.Pointer[context.CancelFunc]
- ctx context.Context
state atomic.Int32
responFile *os.File
err = fmt.Errorf("%d %s", resp.StatusCode, http.StatusText(resp.StatusCode))
}
- var ws []io.Writer
+ var ws []io.WriteCloser
if val.SaveToPath != "" {
t.responFile, e = os.Create(val.SaveToPath)
if e != nil {
ws = append(ws, t.responFile)
}
if val.SaveToPipeWriter != nil {
- ws = append(ws, val.SaveToPipeWriter)
+ ws = append(ws, pio.RWC{W: val.SaveToPipeWriter.Write, C: func() error { return val.SaveToPipeWriter.CloseWithError(context.Canceled) }})
}
if !val.NoResponse {
//will clear t.Respon
t.responBuf.Reset()
- ws = append(ws, t.responBuf)
+ ws = append(ws, pio.RWC{W: t.responBuf.Write, C: func() error { return nil }})
}
- w := io.MultiWriter(ws...)
-
var resReadCloser = resp.Body
if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 {
switch compress_type[0] {
if writeLoopTO == 0 {
writeLoopTO = 1000
}
- rwc := pio.WithCtxTO(req.Context(), t.callTree, time.Duration(int(time.Millisecond)*writeLoopTO), w, resReadCloser)
+
+ rwc := pio.WithCtxTO(req.Context(), t.callTree, time.Duration(int(time.Millisecond)*writeLoopTO), ws, resReadCloser)
defer rwc.Close()
for buf := make([]byte, 2048); true; {
if n, e := rwc.Read(buf); n != 0 {
- if n, e := rwc.Write(buf[:n]); n != 0 {
- if val.SaveToChan != nil {
- val.SaveToChan <- buf[:n]
- }
- } else if e != nil {
+ if n, e := rwc.Write(buf[:n]); n == 0 && e != nil {
if !errors.Is(e, io.EOF) {
err = errors.Join(err, ErrWriteRes, e)
}
}
func (t *Req) clean(val *Rval) {
- if val.SaveToChan != nil {
- close(val.SaveToChan)
- }
if t.responFile != nil {
t.responFile.Close()
}
"bytes"
"context"
"encoding/json"
+ "errors"
"io"
"net/http"
"strconv"
flusher.Flush()
}
},
+ `/stream`: func(w http.ResponseWriter, r *http.Request) {
+ flusher, flushSupport := w.(http.Flusher)
+ if flushSupport {
+ flusher.Flush()
+ }
+ for {
+ select {
+ case <-r.Context().Done():
+ println("server req ctx done")
+ return
+ default:
+ w.Write([]byte{'0'})
+ flusher.Flush()
+ }
+ }
+ },
`/exit`: func(_ http.ResponseWriter, _ *http.Request) {
s.Server.Shutdown(context.Background())
},
time.Sleep(time.Second)
}
+func Test14(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ i, o := io.Pipe()
+
+ r := New()
+ if e := r.Reqf(Rval{
+ Url: "http://" + addr + "/stream",
+ Ctx: ctx,
+ NoResponse: true,
+ SaveToPipeWriter: o,
+ Async: true,
+ WriteLoopTO: 5*1000*2 + 1,
+ }); e != nil {
+ t.Log(e)
+ }
+
+ start := time.Now()
+
+ t.Log("Do", time.Since(start))
+
+ go func() {
+ buf := make([]byte, 1<<8)
+ for {
+ if n, e := i.Read(buf); n != 0 {
+ if time.Since(start) > time.Second {
+ cancel()
+ t.Log("Cancel", time.Since(start))
+ break
+ }
+ // do nothing
+ continue
+ } else if e != nil {
+ t.Log(e)
+ break
+ }
+ }
+ }()
+
+ if !errors.Is(r.Wait(), context.Canceled) {
+ t.Fatal()
+ }
+ t.Log("Do finished", time.Since(start))
+}
+
func Test_req13(t *testing.T) {
r := New()
e := r.Reqf(Rval{
}
}
-func Test_req5(t *testing.T) {
- r := New()
- {
- c := make(chan []byte)
- r.Reqf(Rval{
- Url: "http://" + addr + "/to",
- Timeout: 1000,
- Async: true,
- SaveToChan: c,
- })
- for {
- buf := <-c
- if len(buf) == 0 {
- break
- }
- }
- if !IsTimeout(r.Wait()) {
- t.Error("async IsTimeout fail")
- }
- }
-}
+// func Test_req5(t *testing.T) {
+// r := New()
+// {
+// c := make(chan []byte)
+// r.Reqf(Rval{
+// Url: "http://" + addr + "/to",
+// Timeout: 1000,
+// Async: true,
+// SaveToChan: c,
+// })
+// for {
+// buf := <-c
+// if len(buf) == 0 {
+// break
+// }
+// }
+// if !IsTimeout(r.Wait()) {
+// t.Error("async IsTimeout fail")
+// }
+// }
+// }
-func Test_req6(t *testing.T) {
- r := New()
- {
- c := make(chan []byte)
- r.Reqf(Rval{
- Url: "http://" + addr + "/no",
- Async: true,
- SaveToChan: c,
- })
- b := []byte{}
- for {
- buf := <-c
- if len(buf) == 0 {
- break
- }
- b = append(b, buf...)
- }
- if !bytes.Equal(b, []byte("abc强强强强")) {
- t.Error("chan fail")
- }
- }
-}
+// func Test_req6(t *testing.T) {
+// r := New()
+// {
+// c := make(chan []byte)
+// r.Reqf(Rval{
+// Url: "http://" + addr + "/no",
+// Async: true,
+// SaveToChan: c,
+// })
+// b := []byte{}
+// for {
+// buf := <-c
+// if len(buf) == 0 {
+// break
+// }
+// b = append(b, buf...)
+// }
+// if !bytes.Equal(b, []byte("abc强强强强")) {
+// t.Error("chan fail")
+// }
+// }
+// }
func Test_req11(t *testing.T) {
r := New()
import (
"bytes"
- "context"
"encoding/json"
"fmt"
"io"
defer s.Shutdown()
m.Store("/to", func(w http.ResponseWriter, r *http.Request) {
-
- rwc := pio.WithCtxTO(context.Background(), fmt.Sprintf("server handle %v by %v ", r.URL.Path, r.RemoteAddr), time.Second, w, r.Body, func(s string) {
- fmt.Println(s)
- if !strings.Contains(s, "write blocking after rw 2s > 1s, goruntime leak") {
- t.Fatal(s)
- }
- })
+ rwc := pio.WithCtxTO(r.Context(), fmt.Sprintf("server handle %v by %v ", r.URL.Path, r.RemoteAddr), time.Second,
+ []io.WriteCloser{pio.RWC{W: w.Write}}, r.Body, func(s string) {
+ fmt.Println(s)
+ if !strings.Contains(s, "write blocking after rw 2s > 1s, goruntime leak") {
+ t.Fatal(s)
+ }
+ })
defer rwc.Close()
type d struct {