"os"
"strconv"
"strings"
+ "sync"
"time"
flate "compress/flate"
Response *http.Response
UsedTime time.Duration
- cancel *signal.Signal
- running *signal.Signal
- responBuf *bytes.Buffer
+ cancelF func()
+ cancel *signal.Signal
+ running *signal.Signal
+
responFile *os.File
- asyncErr error
+ err error
+
+ l sync.Mutex
}
func New() *Req {
return new(Req)
}
-// func main(){
-// var _ReqfVal = ReqfVal{
-// Url:url,
-// Proxy:proxy,
-// Timeout:10,
-// Retry:2,
-// }
-// Reqf(_ReqfVal)
-// }
-
func (t *Req) Reqf(val Rval) error {
-
- if val.SaveToChan != nil && len(val.SaveToChan) == 1 && !val.Async {
- panic("must make sure chan size larger then 1 or use Async true")
- }
- if val.SaveToPipeWriter != nil && !val.Async {
- panic("SaveToPipeWriter must use Async true")
- }
+ t.l.Lock()
t.Respon = []byte{}
t.Response = nil
t.UsedTime = 0
+ t.cancelF = nil
t.cancel = signal.Init()
t.running = signal.Init()
+ t.responFile = nil
+ t.err = nil
- var returnErr error
-
- _val := val
-
- for SleepTime, Retry := _val.SleepTime, _val.Retry; Retry >= 0; Retry -= 1 {
- returnErr = t.Reqf_1(_val)
+ go func() {
select {
- case <-t.cancel.WaitC(): //cancel
- return returnErr
- default:
- if returnErr == nil {
- return nil
+ case <-t.cancel.Chan:
+ if t.cancelF != nil {
+ t.cancelF()
}
+ case <-t.running.Chan:
}
- time.Sleep(time.Duration(SleepTime) * time.Millisecond)
- }
+ }()
+ go func() {
+ beginTime := time.Now()
+ _val := val
+
+ for SleepTime, Retry := _val.SleepTime, _val.Retry; Retry >= 0; Retry -= 1 {
+ t.err = t.Reqf_1(_val)
+ if t.err == nil || IsCancel(t.err) {
+ break
+ }
+ time.Sleep(time.Duration(SleepTime) * time.Millisecond)
+ }
+
+ t.UsedTime = time.Since(beginTime)
+ t.running.Done()
+ }()
- if !val.Async || returnErr != nil {
- t.asyncErr = returnErr
+ if !val.Async {
+ t.Wait()
if val.SaveToChan != nil {
close(val.SaveToChan)
}
if val.SaveToPipeWriter != nil {
val.SaveToPipeWriter.Close()
}
- if t.responBuf != nil {
- t.Respon = t.responBuf.Bytes()
- }
- t.running.Done()
t.cancel.Done()
+ t.l.Unlock()
+ } else {
+ go func() {
+ t.Wait()
+ if val.SaveToChan != nil {
+ close(val.SaveToChan)
+ }
+ if t.responFile != nil {
+ t.responFile.Close()
+ }
+ if val.SaveToPipeWriter != nil {
+ val.SaveToPipeWriter.Close()
+ }
+ t.cancel.Done()
+ t.l.Unlock()
+ }()
}
- return returnErr
+ return t.err
}
func (t *Req) Reqf_1(val Rval) (err error) {
Header map[string]string = val.Header
)
- var beginTime time.Time = time.Now()
-
var client http.Client
if Header == nil {
if val.Timeout > 0 {
cx, cancel = context.WithTimeout(cx, time.Duration(val.Timeout)*time.Millisecond)
}
-
- go func() {
- select {
- case <-t.cancel.WaitC():
- cancel()
- case <-t.running.WaitC():
- }
- }()
+ t.cancelF = cancel
req, e := http.NewRequest(Method, val.Url, body)
if e != nil {
req.Header.Set(k, v)
}
- resp, err := client.Do(req)
- if v, ok := Header["Connection"]; ok && strings.ToLower(v) != "keep-alive" {
- defer client.CloseIdleConnections()
+ if !t.cancel.Islive() {
+ err = context.Canceled
+ return
}
- if err != nil {
- return err
+ resp, e := client.Do(req)
+
+ if e != nil {
+ err = e
+ return
+ }
+
+ if v, ok := Header["Connection"]; ok && strings.ToLower(v) != "keep-alive" {
+ defer client.CloseIdleConnections()
}
t.Response = resp
- defer func() {
- t.UsedTime = time.Since(beginTime)
- }()
if val.JustResponseCode {
return
err = errors.New(strconv.Itoa(resp.StatusCode))
}
+ var responBuf *bytes.Buffer
var ws []io.Writer
if val.SaveToPath != "" {
- t.responFile, err = os.Create(val.SaveToPath)
+ t.responFile, e = os.Create(val.SaveToPath)
if err != nil {
t.responFile.Close()
- return err
+ err = e
+ return
}
ws = append(ws, t.responFile)
}
ws = append(ws, val.SaveToPipeWriter)
}
if !val.NoResponse {
- if t.responBuf == nil {
- t.responBuf = new(bytes.Buffer)
- } else {
- t.responBuf.Reset()
+ if responBuf == nil {
+ responBuf = new(bytes.Buffer)
}
- ws = append(ws, t.responBuf)
+ ws = append(ws, responBuf)
}
w := io.MultiWriter(ws...)
resReader = resp.Body
}
- go func() {
- buf := make([]byte, 512)
-
- for {
- if n, e := resReader.Read(buf); n != 0 {
- w.Write(buf[:n])
- select {
- case val.SaveToChan <- buf[:n]:
- default:
- }
- } else if e != nil {
- if !errors.Is(e, io.EOF) {
- err = e
- }
- break
- }
+ buf := make([]byte, 512)
- if !t.cancel.Islive() {
- err = context.Canceled
- break
+ for {
+ if n, e := resReader.Read(buf); n != 0 {
+ w.Write(buf[:n])
+ select {
+ case val.SaveToChan <- buf[:n]:
+ default:
}
+ } else if e != nil {
+ if !errors.Is(e, io.EOF) {
+ err = e
+ }
+ break
}
- if val.Async {
- t.asyncErr = err
+ if !t.cancel.Islive() {
+ err = context.Canceled
+ break
}
- resp.Body.Close()
- if val.SaveToChan != nil {
- close(val.SaveToChan)
- }
- if t.responFile != nil {
- t.responFile.Close()
- }
- if val.SaveToPipeWriter != nil {
- val.SaveToPipeWriter.Close()
- }
- if t.responBuf != nil {
- t.Respon = t.responBuf.Bytes()
- }
- t.running.Done()
- }()
- if !val.Async {
- t.Wait()
}
- // if _, e := io.Copy(w, resp.Body); e != nil {
- // err = e
- // }
+
+ resp.Body.Close()
+
+ if responBuf != nil {
+ t.Respon = responBuf.Bytes()
+ }
+
return
}
func (t *Req) Wait() error {
t.running.Wait()
- return t.asyncErr
+ return t.err
}
func (t *Req) Cancel() { t.Close() }
web "github.com/qydysky/part/web"
)
-func Test_req(t *testing.T) {
- addr := "127.0.0.1:10001"
+var addr = "127.0.0.1:10001"
+
+func init() {
s := web.New(&http.Server{
Addr: addr,
WriteTimeout: time.Second * time.Duration(10),
s.Server.Shutdown(context.Background())
},
})
+}
+
+func Test_req7(t *testing.T) {
+ r := New()
+ r.Reqf(Rval{
+ Url: "http://" + addr + "/to",
+ Async: true,
+ })
+ r.Cancel()
+ if !IsCancel(r.Wait()) {
+ t.Error("async Cancel fail")
+ }
+}
+func Test_req(t *testing.T) {
r := New()
r.Reqf(Rval{
Url: "http://" + addr + "/br",
})
if !bytes.Equal(r.Respon, []byte("abc强强强强")) {
- t.Error("br fail")
+ t.Error("br fail", r.Respon)
}
r.Reqf(Rval{
Url: "http://" + addr + "/gzip",
if !bytes.Equal(r.Respon, []byte("abc强强强强")) {
t.Error("flate fail")
}
+}
+
+func Test_req2(t *testing.T) {
+ r := New()
{
e := r.Reqf(Rval{
Url: "http://" + addr + "/to",
t.Error("Timeout fail")
}
}
+}
+
+func Test_req4(t *testing.T) {
+ r := New()
{
r.Reqf(Rval{
Url: "http://" + addr + "/to",
Timeout: 1000,
Async: true,
})
- if !IsTimeout(r.Wait()) {
- t.Error("Async Timeout fail")
+ if e := r.Wait(); !IsTimeout(e) {
+ t.Error("Async Timeout fail", e)
}
}
+}
+
+func Test_req5(t *testing.T) {
+ r := New()
{
c := make(chan []byte)
r.Reqf(Rval{
}
}
if !IsTimeout(r.Wait()) {
- t.Error("async cancel fail")
+ t.Error("async IsTimeout fail")
}
}
+}
+
+func Test_req6(t *testing.T) {
+ r := New()
{
c := make(chan []byte)
r.Reqf(Rval{
t.Error("Cancel fail")
}
}
+}
+
+func Test_req3(t *testing.T) {
+ r := New()
{
rc, wc := io.Pipe()
c := make(chan struct{})
}
close(c)
}()
+ r.Reqf(Rval{
+ Url: "http://" + addr + "/flate",
+ SaveToPipeWriter: wc,
+ })
+ <-c
+ }
+ {
+ rc, wc := io.Pipe()
+ c := make(chan struct{})
+ go func() {
+ var buf []byte = make([]byte, 1<<16)
+ n, _ := rc.Read(buf)
+ d := buf[:n]
+ // d, _ := io.ReadAll(rc)
+ if !bytes.Equal(d, []byte("abc强强强强")) {
+ t.Error("flate fail")
+ }
+ close(c)
+ }()
r.Reqf(Rval{
Url: "http://" + addr + "/flate",
SaveToPipeWriter: wc,
Url: "http://" + addr + "/flate",
Async: true,
})
- if len(r.Respon) != 0 {
- t.Error("async fail")
- }
r.Wait()
if !bytes.Equal(r.Respon, []byte("abc强强强强")) {
- t.Error("async fail")
+ t.Error("async fail", r.Respon)
}
}
{
if len(r.Respon) != 0 {
t.Error("io async fail")
}
- d, _ := io.ReadAll(rc)
+ var buf []byte = make([]byte, 1<<16)
+ n, _ := rc.Read(buf)
+ d := buf[:n]
if !bytes.Equal(d, []byte("abc强强强强")) {
- t.Error("io async fail")
- }
- if !bytes.Equal(r.Respon, []byte("abc强强强强")) {
- t.Error("io async fail")
+ t.Error("io async fail", d)
}
}
}