"os"
"strconv"
"strings"
- "sync"
"time"
flate "compress/flate"
SleepTime int
JustResponseCode bool
NoResponse bool
+ Async bool
Cookies []*http.Cookie
SaveToPath string
- SaveToChan chan []byte // deprecated
+ SaveToChan chan []byte
SaveToPipeWriter *io.PipeWriter
Header map[string]string
Response *http.Response
UsedTime time.Duration
- cancel *signal.Signal
- sync.Mutex
+ cancel *signal.Signal
+ running *signal.Signal
+ responBuf *bytes.Buffer
+ responFile *os.File
}
func New() *Req {
// }
func (t *Req) Reqf(val Rval) error {
- t.Lock()
- defer t.Unlock()
+ if val.SaveToChan != nil && len(val.SaveToChan) == 1 && !val.Async {
+ panic("must make sure chan size larger then 1 or use Async true")
+ }
+ if val.SaveToPipeWriter != nil && !val.Async {
+ panic("SaveToPipeWriter must use Async true")
+ }
t.Respon = []byte{}
t.Response = nil
_val := val
- t.cancel = signal.Init()
- defer t.cancel.Done()
-
for SleepTime, Retry := _val.SleepTime, _val.Retry; Retry >= 0; Retry -= 1 {
returnErr = t.Reqf_1(_val)
select {
}
func (t *Req) Reqf_1(val Rval) (err error) {
-
var (
Header map[string]string = val.Header
)
var ws []io.Writer
if val.SaveToPath != "" {
- out, err := os.Create(val.SaveToPath)
+ t.responFile, err = os.Create(val.SaveToPath)
if err != nil {
- out.Close()
+ t.responFile.Close()
return err
}
- defer out.Close()
- ws = append(ws, out)
+ ws = append(ws, t.responFile)
}
if val.SaveToPipeWriter != nil {
- defer val.SaveToPipeWriter.Close()
ws = append(ws, val.SaveToPipeWriter)
}
- // if val.SaveToChan != nil {
- // r, w := io.Pipe()
- // go func() {
- // buf := make([]byte, 1<<16)
- // for {
- // n, e := r.Read(buf)
- // if n != 0 {
- // val.SaveToChan <- buf[:n]
- // } else if e != nil {
- // defer close(val.SaveToChan)
- // break
- // }
- // }
- // }()
- // defer w.Close()
- // ws = append(ws, w)
- // }
if !val.NoResponse {
- var buf bytes.Buffer
- defer func() {
- t.Respon = buf.Bytes()
- }()
- ws = append(ws, &buf)
+ t.responBuf = new(bytes.Buffer)
+ ws = append(ws, t.responBuf)
}
w := io.MultiWriter(ws...)
- s := signal.Init()
var resReader io.Reader
if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 {
} else {
resReader = resp.Body
}
- defer resp.Body.Close()
+ t.running = signal.Init()
+ t.cancel = signal.Init()
go func() {
- buf := make([]byte, 1<<16)
+ buf := make([]byte, 512)
for {
if n, e := resReader.Read(buf); n != 0 {
w.Write(buf[:n])
+ select {
+ case val.SaveToChan <- buf[:n]:
+ default:
+ }
} else if e != nil {
if !errors.Is(e, io.EOF) {
err = e
break
}
}
- s.Done()
+ resp.Body.Close()
+ if val.SaveToChan != nil {
+ close(val.SaveToChan)
+ }
+ if t.responFile != nil {
+ t.responFile.Close()
+ }
+ if val.SaveToPipeWriter != nil {
+ val.SaveToPipeWriter.Close()
+ }
+ if t.responBuf != nil {
+ t.Respon = t.responBuf.Bytes()
+ }
+ t.cancel.Done()
+ t.running.Done()
}()
- s.Wait()
+ if !val.Async {
+ t.Wait()
+ }
// if _, e := io.Copy(w, resp.Body); e != nil {
// err = e
// }
return
}
+func (t *Req) Wait() {
+ t.running.Wait()
+}
+
func (t *Req) Cancel() { t.Close() }
func (t *Req) Close() {
web "github.com/qydysky/part/web"
)
-func Test_Timeout(t *testing.T) {
- r := New()
- if e := r.Reqf(Rval{
- Url: `https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`,
- Timeout: 1000,
- }); e != nil {
- if !IsTimeout(e) {
- t.Error(`type error`, e)
- }
- return
- }
- t.Log(`no error`)
-}
-
-func Test_Cancel(t *testing.T) {
- r := New()
-
- go func() {
- time.Sleep(time.Second)
- r.Cancel()
- }()
-
- if e := r.Reqf(Rval{
- Url: `https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`,
- }); e != nil {
- if !IsCancel(e) {
- t.Error(`type error`, e)
- }
- return
- }
- t.Log(`no error`)
-}
-
-func Test_Cancel_chan(t *testing.T) {
- r := New()
-
- c := make(chan []byte, 1<<16)
-
- go func() {
- for {
- <-c
- }
- }()
-
- go func() {
- time.Sleep(time.Second * 3)
- r.Cancel()
- }()
-
- if e := r.Reqf(Rval{
- Url: `https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`,
- SaveToChan: c,
- Timeout: 5000,
- }); e != nil {
- if !IsCancel(e) {
- t.Error(`type error`, e)
- }
- return
- }
- t.Log(`no error`)
-}
-
-func Test_Io_Pipe(t *testing.T) {
- r := New()
- rp, wp := io.Pipe()
- c := make(chan struct{}, 1)
- go func() {
- buf, _ := io.ReadAll(rp)
- t.Log("Test_Io_Pipe download:", len(buf))
- t.Log("Test_Io_Pipe download:", len(r.Respon))
- close(c)
- }()
- if e := r.Reqf(Rval{
- Url: `https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`,
- SaveToPipeWriter: wp,
- Timeout: 5000,
- }); e != nil {
- if !IsTimeout(e) {
- t.Error(`type error`, e)
- }
- return
- }
- t.Log(`no error`)
- <-c
-}
-
-func Test_compress(t *testing.T) {
+func Test_req(t *testing.T) {
addr := "127.0.0.1:10001"
s := web.New(&http.Server{
Addr: addr,
WriteTimeout: time.Second * time.Duration(10),
})
s.Handle(map[string]func(http.ResponseWriter, *http.Request){
+ `/no`: func(w http.ResponseWriter, _ *http.Request) {
+ w.Write([]byte("abc强强强强"))
+ },
`/br`: func(w http.ResponseWriter, _ *http.Request) {
d, _ := compress.InBr([]byte("abc强强强强"), 6)
w.Header().Set("Content-Encoding", "br")
w.Header().Set("Content-Encoding", "gzip")
w.Write(d)
},
+ `/to`: func(w http.ResponseWriter, _ *http.Request) {
+ time.Sleep(time.Minute)
+ d, _ := compress.InGzip([]byte("abc强强强强"), -1)
+ w.Header().Set("Content-Encoding", "gzip")
+ w.Write(d)
+ },
`/exit`: func(_ http.ResponseWriter, _ *http.Request) {
s.Server.Shutdown(context.Background())
},
if !bytes.Equal(r.Respon, []byte("abc强强强强")) {
t.Error("flate fail")
}
-
+ {
+ c := make(chan []byte)
+ r.Reqf(Rval{
+ Url: "http://" + addr + "/no",
+ Async: true,
+ SaveToChan: c,
+ })
+ b := []byte{}
+ for {
+ buf := <-c
+ if len(buf) == 0 {
+ break
+ }
+ b = append(b, buf...)
+ }
+ if !bytes.Equal(b, []byte("abc强强强强")) {
+ t.Error("chan fail")
+ }
+ }
+ {
+ e := r.Reqf(Rval{
+ Url: "http://" + addr + "/to",
+ Timeout: 1000,
+ })
+ if !IsTimeout(e) {
+ t.Error("Timeout fail")
+ }
+ }
+ {
+ timer := time.NewTimer(time.Second)
+ go func() {
+ <-timer.C
+ r.Cancel()
+ }()
+ e := r.Reqf(Rval{
+ Url: "http://" + addr + "/to",
+ })
+ if !IsCancel(e) {
+ t.Error("Cancel fail")
+ }
+ }
{
rc, wc := io.Pipe()
c := make(chan struct{})
r.Reqf(Rval{
Url: "http://" + addr + "/br",
SaveToPipeWriter: wc,
+ Async: true,
})
<-c
}
r.Reqf(Rval{
Url: "http://" + addr + "/gzip",
SaveToPipeWriter: wc,
+ Async: true,
})
<-c
}
r.Reqf(Rval{
Url: "http://" + addr + "/flate",
SaveToPipeWriter: wc,
+ Async: true,
})
<-c
}
+ {
+ r.Reqf(Rval{
+ Url: "http://" + addr + "/flate",
+ Async: true,
+ })
+ if len(r.Respon) != 0 {
+ t.Error("async fail")
+ }
+ r.Wait()
+ if !bytes.Equal(r.Respon, []byte("abc强强强强")) {
+ t.Error("async fail")
+ }
+ }
+ {
+ rc, wc := io.Pipe()
+ r.Reqf(Rval{
+ Url: "http://" + addr + "/flate",
+ SaveToPipeWriter: wc,
+ NoResponse: true,
+ Async: true,
+ })
+ if len(r.Respon) != 0 {
+ t.Error("io async fail")
+ }
+ d, _ := io.ReadAll(rc)
+ if !bytes.Equal(d, []byte("abc强强强强")) {
+ t.Error("io async fail")
+ }
+ if !bytes.Equal(r.Respon, []byte("abc强强强强")) {
+ t.Error("io async fail")
+ }
+ }
}