"net/http"
"net/url"
"os"
+ "strconv"
"strings"
"sync"
"time"
compress "github.com/qydysky/part/compress"
- pio "github.com/qydysky/part/io"
signal "github.com/qydysky/part/signal"
// "encoding/binary"
)
-var (
- ErrConnectTimeout = errors.New("ErrConnectTimeout")
- ErrReadTimeout = errors.New("ErrReadTimeout")
-)
-
type Rval struct {
Url string
PostStr string
Timeout int
- ReadTimeout int
- ConnectTimeout int
+ ReadTimeout int // deprecated
+ ConnectTimeout int // deprecated
Proxy string
Retry int
SleepTime int
JustResponseCode bool
+ NoResponse bool
Cookies []*http.Cookie
SaveToPath string
_val := val
t.cancel = signal.Init()
+ defer t.cancel.Done()
for SleepTime, Retry := _val.SleepTime, _val.Retry; Retry >= 0; Retry -= 1 {
returnErr = t.Reqf_1(_val)
func (t *Req) Reqf_1(val Rval) (err error) {
var (
- Url string = val.Url
- PostStr string = val.PostStr
- Proxy string = val.Proxy
- Timeout int = val.Timeout
- ReadTimeout int = val.ReadTimeout
- ConnectTimeout int = val.ConnectTimeout
- JustResponseCode bool = val.JustResponseCode
- SaveToChan chan []byte = val.SaveToChan
- SaveToPath string = val.SaveToPath
- SaveToPipeWriter *io.PipeWriter = val.SaveToPipeWriter
-
Header map[string]string = val.Header
)
Header = make(map[string]string)
}
- if Proxy != "" {
+ if val.Proxy != "" {
proxy := func(_ *http.Request) (*url.URL, error) {
- return url.Parse(Proxy)
+ return url.Parse(val.Proxy)
}
client.Transport = &http.Transport{
Proxy: proxy,
}
}
- if Url == "" {
+ if val.Url == "" {
return errors.New("url is empty")
}
Method := "GET"
var body io.Reader
- if len(PostStr) > 0 {
+ if len(val.PostStr) > 0 {
Method = "POST"
- body = strings.NewReader(PostStr)
+ body = strings.NewReader(val.PostStr)
if _, ok := Header["Content-Type"]; !ok {
Header["Content-Type"] = "application/x-www-form-urlencoded"
}
}
cx, cancel := context.WithCancel(context.Background())
- if Timeout > 0 {
- cx, cancel = context.WithTimeout(cx, time.Duration(Timeout)*time.Millisecond)
+ if val.Timeout > 0 {
+ cx, cancel = context.WithTimeout(cx, time.Duration(val.Timeout)*time.Millisecond)
}
- req, e := http.NewRequest(Method, Url, body)
+ req, e := http.NewRequest(Method, val.Url, body)
if e != nil {
panic(e)
}
if _, ok := Header["Accept-Encoding"]; !ok {
Header["Accept-Encoding"] = "gzip, deflate, br"
}
- if SaveToPath != "" {
+ if val.SaveToPath != "" || val.SaveToChan != nil || val.SaveToPipeWriter != nil {
Header["Accept-Encoding"] = "identity"
}
if _, ok := Header["User-Agent"]; !ok {
return err
}
- var (
- saveToFile func(io.Reader, string) ([]byte, error) = func(Body io.Reader, filepath string) (bodyB []byte, err error) {
- out, err := os.Create(filepath + ".dtmp")
- if err != nil {
- out.Close()
- return bodyB, err
- }
- var buf bytes.Buffer
+ t.Response = resp
+ defer resp.Body.Close()
+ defer func() {
+ t.UsedTime = time.Since(beginTime)
+ }()
- w := io.MultiWriter(out, &buf)
- if _, err = io.Copy(w, Body); err != nil {
- out.Close()
- return bodyB, err
- }
- out.Close()
- bodyB = buf.Bytes()
+ if val.JustResponseCode {
+ return
+ }
- if err = os.RemoveAll(filepath); err != nil {
- return bodyB, err
- }
- if err = os.Rename(filepath+".dtmp", filepath); err != nil {
- return bodyB, err
- }
- return bodyB, nil
+ if resp.StatusCode >= 400 {
+ err = errors.New(strconv.Itoa(resp.StatusCode))
+ }
+
+ if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 && (compress_type[0] == `br` ||
+ compress_type[0] == `gzip` ||
+ compress_type[0] == `deflate`) {
+
+ if val.NoResponse {
+ return errors.New("respose had compress, must load all data, but NoResponse is true")
}
- )
- t.Response = resp
- if !JustResponseCode {
- defer resp.Body.Close()
- if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 && (compress_type[0] == `br` ||
- compress_type[0] == `gzip` ||
- compress_type[0] == `deflate`) {
- var err error
- t.Respon, err = ioutil.ReadAll(resp.Body)
- if err != nil {
- return err
- }
- if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 {
- switch compress_type[0] {
- case `br`:
- if tmp, err := compress.UnBr(t.Respon); err != nil {
- return err
- } else {
- t.Respon = append([]byte{}, tmp...)
- }
- case `gzip`:
- if tmp, err := compress.UnGzip(t.Respon); err != nil {
- return err
- } else {
- t.Respon = append([]byte{}, tmp...)
- }
- case `deflate`:
- if tmp, err := compress.UnFlate(t.Respon); err != nil {
- return err
- } else {
- t.Respon = append([]byte{}, tmp...)
- }
- default:
+ var err error
+ t.Respon, err = ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return err
+ }
+
+ if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 {
+ switch compress_type[0] {
+ case `br`:
+ if tmp, err := compress.UnBr(t.Respon); err != nil {
+ return err
+ } else {
+ t.Respon = append([]byte{}, tmp...)
}
- }
- } else {
- if SaveToPath != "" {
- if bodyB, err := saveToFile(resp.Body, SaveToPath); err != nil {
+ case `gzip`:
+ if tmp, err := compress.UnGzip(t.Respon); err != nil {
return err
} else {
- if len(bodyB) != 0 {
- if SaveToChan != nil {
- SaveToChan <- bodyB
- } else if SaveToPipeWriter != nil {
- SaveToPipeWriter.Write(bodyB)
- } else {
- t.Respon = append(t.Respon, bodyB...)
- }
- } else {
- return io.EOF
- }
-
- if SaveToChan != nil {
- close(SaveToChan)
- }
- if SaveToPipeWriter != nil {
- SaveToPipeWriter.Close()
- }
+ t.Respon = append([]byte{}, tmp...)
}
- } else {
- rc, _ := pio.RW2Chan(resp.Body, nil)
- var After = func(ReadTimeout int) (c <-chan time.Time) {
- if ReadTimeout > 0 {
- c = time.NewTimer(time.Millisecond * time.Duration(ReadTimeout)).C
- }
- return
+ case `deflate`:
+ if tmp, err := compress.UnFlate(t.Respon); err != nil {
+ return err
+ } else {
+ t.Respon = append([]byte{}, tmp...)
}
-
- select {
- case buf := <-rc:
- if len(buf) != 0 {
- if SaveToChan != nil {
- SaveToChan <- buf
- } else if SaveToPipeWriter != nil {
- SaveToPipeWriter.Write(buf)
- } else {
- t.Respon = append(t.Respon, buf...)
- }
- } else {
- err = io.EOF
- return
+ default:
+ }
+ }
+ } else {
+ var ws []io.Writer
+ if val.SaveToPath != "" {
+ out, err := os.Create(val.SaveToPath)
+ if err != nil {
+ out.Close()
+ return err
+ }
+ defer out.Close()
+ ws = append(ws, out)
+ }
+ if val.SaveToPipeWriter != nil {
+ defer val.SaveToPipeWriter.Close()
+ ws = append(ws, val.SaveToPipeWriter)
+ }
+ if val.SaveToChan != nil {
+ r, w := io.Pipe()
+ go func() {
+ buf := make([]byte, 1<<16)
+ for {
+ n, e := r.Read(buf)
+ if n != 0 {
+ val.SaveToChan <- buf[:n]
+ } else if e != nil {
+ defer close(val.SaveToChan)
+ break
}
- case <-After(ConnectTimeout):
- err = ErrConnectTimeout
- return
}
+ }()
+ defer w.Close()
+ ws = append(ws, w)
+ }
+ if !val.NoResponse {
+ var buf bytes.Buffer
+ defer func() {
+ t.Respon = buf.Bytes()
+ }()
+ ws = append(ws, &buf)
+ }
- for loop := true; loop; {
- select {
- case buf := <-rc:
- if len(buf) != 0 {
- if SaveToChan != nil {
- SaveToChan <- buf
- } else if SaveToPipeWriter != nil {
- SaveToPipeWriter.Write(buf)
- } else {
- t.Respon = append(t.Respon, buf...)
- }
- } else {
- err = io.EOF
- loop = false
- }
- case <-After(ReadTimeout):
- err = ErrReadTimeout
- loop = false
+ w := io.MultiWriter(ws...)
+ s := signal.Init()
+ go func() {
+ buf := make([]byte, 1<<16)
+ for {
+ if n, e := resp.Body.Read(buf); n != 0 {
+ w.Write(buf[:n])
+ } else if e != nil {
+ if !errors.Is(e, io.EOF) {
+ err = e
}
- if !t.cancel.Islive() {
- err = context.Canceled
- loop = false
- break
- }
- }
- if SaveToChan != nil {
- close(SaveToChan)
+ break
}
- if SaveToPipeWriter != nil {
- SaveToPipeWriter.Close()
+
+ if !t.cancel.Islive() {
+ err = context.Canceled
+ break
}
}
- }
- } else {
- resp.Body.Close()
+ s.Done()
+ }()
+ s.Wait()
+ // if _, e := io.Copy(w, resp.Body); e != nil {
+ // err = e
+ // }
}
-
- t.UsedTime = time.Since(beginTime)
-
- return nil
+ return
}
func (t *Req) Cancel() { t.Close() }
}
func IsTimeout(e error) bool {
- if errors.Is(e, context.DeadlineExceeded) || errors.Is(e, ErrConnectTimeout) || errors.Is(e, ErrReadTimeout) {
+ if errors.Is(e, context.DeadlineExceeded) {
return true
}
if net_err, ok := e.(net.Error); ok && net_err.Timeout() {
package part
import (
+ "io"
"testing"
"time"
)
func Test_Timeout(t *testing.T) {
r := New()
if e := r.Reqf(Rval{
- Url:`https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`,
- Timeout:1,
- });e != nil {
+ Url: `https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`,
+ Timeout: 1000,
+ }); e != nil {
if !IsTimeout(e) {
- t.Error(`type error`,e)
+ t.Error(`type error`, e)
}
return
}
- t.Error(`no error`)
+ t.Log(`no error`)
}
func Test_Cancel(t *testing.T) {
r := New()
- go func(){
+ go func() {
time.Sleep(time.Second)
r.Cancel()
}()
if e := r.Reqf(Rval{
- Url:`https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`,
- });e != nil {
+ Url: `https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`,
+ }); e != nil {
if !IsCancel(e) {
- t.Error(`type error`,e)
+ t.Error(`type error`, e)
}
return
}
- t.Error(`no error`)
+ t.Log(`no error`)
}
func Test_Cancel_chan(t *testing.T) {
r := New()
- c := make(chan[]byte,1<<16)
+ c := make(chan []byte, 1<<16)
- go func(){
- for{
+ go func() {
+ for {
<-c
}
}()
- go func(){
- time.Sleep(time.Second*7)
+ go func() {
+ time.Sleep(time.Second * 3)
r.Cancel()
}()
if e := r.Reqf(Rval{
- Url:`https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`,
- SaveToChan:c,
- Timeout:10,
- });e != nil {
+ Url: `https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`,
+ SaveToChan: c,
+ Timeout: 5000,
+ }); e != nil {
if !IsCancel(e) {
- t.Error(`type error`,e)
+ t.Error(`type error`, e)
}
return
}
- t.Error(`no error`)
-}
\ No newline at end of file
+ t.Log(`no error`)
+}
+
+func Test_Io_Pipe(t *testing.T) {
+ r := New()
+ rp, wp := io.Pipe()
+ c := make(chan struct{}, 1)
+ go func() {
+ buf, _ := io.ReadAll(rp)
+ t.Log("Test_Io_Pipe download:", len(buf))
+ t.Log("Test_Io_Pipe download:", len(r.Respon))
+ close(c)
+ }()
+ if e := r.Reqf(Rval{
+ Url: `https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`,
+ SaveToPipeWriter: wp,
+ Timeout: 5000,
+ }); e != nil {
+ if !IsTimeout(e) {
+ t.Error(`type error`, e)
+ }
+ return
+ }
+ t.Log(`no error`)
+ <-c
+}