From: qydysky <32743305+qydysky@users.noreply.github.com> Date: Thu, 18 Aug 2022 13:27:53 +0000 (+0800) Subject: update X-Git-Tag: v0.9.10 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=a66ea6e2919105af82ba7310d466a2cbff1309e0;p=part%2F.git update --- diff --git a/get/Get_test.go b/get/Get_test.go index 84655d5..7b8d5a7 100644 --- a/get/Get_test.go +++ b/get/Get_test.go @@ -2,26 +2,33 @@ package part import ( "testing" - p "github.com/qydysky/part" + + reqf "github.com/qydysky/part/reqf" ) func Test_get(t *testing.T) { - g := Get(p.Rval{ - Url:"https://www.baidu.com/", + g := Get(reqf.Rval{ + Url: "https://www.baidu.com/", }) g.S(`
`) - for _,v := range g.RS { + for _, v := range g.RS { t.Log(v) } -} \ No newline at end of file +} diff --git a/reqf/Reqf.go b/reqf/Reqf.go index 52eeb94..da0dc06 100644 --- a/reqf/Reqf.go +++ b/reqf/Reqf.go @@ -10,31 +10,27 @@ import ( "net/http" "net/url" "os" + "strconv" "strings" "sync" "time" compress "github.com/qydysky/part/compress" - pio "github.com/qydysky/part/io" signal "github.com/qydysky/part/signal" // "encoding/binary" ) -var ( - ErrConnectTimeout = errors.New("ErrConnectTimeout") - ErrReadTimeout = errors.New("ErrReadTimeout") -) - type Rval struct { Url string PostStr string Timeout int - ReadTimeout int - ConnectTimeout int + ReadTimeout int // deprecated + ConnectTimeout int // deprecated Proxy string Retry int SleepTime int JustResponseCode bool + NoResponse bool Cookies []*http.Cookie SaveToPath string @@ -80,6 +76,7 @@ func (t *Req) Reqf(val Rval) error { _val := val t.cancel = signal.Init() + defer t.cancel.Done() for SleepTime, Retry := _val.SleepTime, _val.Retry; Retry >= 0; Retry -= 1 { returnErr = t.Reqf_1(_val) @@ -100,17 +97,6 @@ func (t *Req) Reqf(val Rval) error { func (t *Req) Reqf_1(val Rval) (err error) { var ( - Url string = val.Url - PostStr string = val.PostStr - Proxy string = val.Proxy - Timeout int = val.Timeout - ReadTimeout int = val.ReadTimeout - ConnectTimeout int = val.ConnectTimeout - JustResponseCode bool = val.JustResponseCode - SaveToChan chan []byte = val.SaveToChan - SaveToPath string = val.SaveToPath - SaveToPipeWriter *io.PipeWriter = val.SaveToPipeWriter - Header map[string]string = val.Header ) @@ -122,9 +108,9 @@ func (t *Req) Reqf_1(val Rval) (err error) { Header = make(map[string]string) } - if Proxy != "" { + if val.Proxy != "" { proxy := func(_ *http.Request) (*url.URL, error) { - return url.Parse(Proxy) + return url.Parse(val.Proxy) } client.Transport = &http.Transport{ Proxy: proxy, @@ -136,25 +122,25 @@ func (t *Req) Reqf_1(val Rval) (err error) { } } - if Url == "" { + if val.Url == "" { return errors.New("url is empty") } Method := "GET" var body io.Reader - if len(PostStr) > 0 { + if len(val.PostStr) > 0 { Method = "POST" - body = strings.NewReader(PostStr) + body = strings.NewReader(val.PostStr) if _, ok := Header["Content-Type"]; !ok { Header["Content-Type"] = "application/x-www-form-urlencoded" } } cx, cancel := context.WithCancel(context.Background()) - if Timeout > 0 { - cx, cancel = context.WithTimeout(cx, time.Duration(Timeout)*time.Millisecond) + if val.Timeout > 0 { + cx, cancel = context.WithTimeout(cx, time.Duration(val.Timeout)*time.Millisecond) } - req, e := http.NewRequest(Method, Url, body) + req, e := http.NewRequest(Method, val.Url, body) if e != nil { panic(e) } @@ -183,7 +169,7 @@ func (t *Req) Reqf_1(val Rval) (err error) { if _, ok := Header["Accept-Encoding"]; !ok { Header["Accept-Encoding"] = "gzip, deflate, br" } - if SaveToPath != "" { + if val.SaveToPath != "" || val.SaveToChan != nil || val.SaveToPipeWriter != nil { Header["Accept-Encoding"] = "identity" } if _, ok := Header["User-Agent"]; !ok { @@ -203,159 +189,124 @@ func (t *Req) Reqf_1(val Rval) (err error) { return err } - var ( - saveToFile func(io.Reader, string) ([]byte, error) = func(Body io.Reader, filepath string) (bodyB []byte, err error) { - out, err := os.Create(filepath + ".dtmp") - if err != nil { - out.Close() - return bodyB, err - } - var buf bytes.Buffer + t.Response = resp + defer resp.Body.Close() + defer func() { + t.UsedTime = time.Since(beginTime) + }() - w := io.MultiWriter(out, &buf) - if _, err = io.Copy(w, Body); err != nil { - out.Close() - return bodyB, err - } - out.Close() - bodyB = buf.Bytes() + if val.JustResponseCode { + return + } - if err = os.RemoveAll(filepath); err != nil { - return bodyB, err - } - if err = os.Rename(filepath+".dtmp", filepath); err != nil { - return bodyB, err - } - return bodyB, nil + if resp.StatusCode >= 400 { + err = errors.New(strconv.Itoa(resp.StatusCode)) + } + + if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 && (compress_type[0] == `br` || + compress_type[0] == `gzip` || + compress_type[0] == `deflate`) { + + if val.NoResponse { + return errors.New("respose had compress, must load all data, but NoResponse is true") } - ) - t.Response = resp - if !JustResponseCode { - defer resp.Body.Close() - if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 && (compress_type[0] == `br` || - compress_type[0] == `gzip` || - compress_type[0] == `deflate`) { - var err error - t.Respon, err = ioutil.ReadAll(resp.Body) - if err != nil { - return err - } - if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 { - switch compress_type[0] { - case `br`: - if tmp, err := compress.UnBr(t.Respon); err != nil { - return err - } else { - t.Respon = append([]byte{}, tmp...) - } - case `gzip`: - if tmp, err := compress.UnGzip(t.Respon); err != nil { - return err - } else { - t.Respon = append([]byte{}, tmp...) - } - case `deflate`: - if tmp, err := compress.UnFlate(t.Respon); err != nil { - return err - } else { - t.Respon = append([]byte{}, tmp...) - } - default: + var err error + t.Respon, err = ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + if compress_type := resp.Header[`Content-Encoding`]; len(compress_type) != 0 { + switch compress_type[0] { + case `br`: + if tmp, err := compress.UnBr(t.Respon); err != nil { + return err + } else { + t.Respon = append([]byte{}, tmp...) } - } - } else { - if SaveToPath != "" { - if bodyB, err := saveToFile(resp.Body, SaveToPath); err != nil { + case `gzip`: + if tmp, err := compress.UnGzip(t.Respon); err != nil { return err } else { - if len(bodyB) != 0 { - if SaveToChan != nil { - SaveToChan <- bodyB - } else if SaveToPipeWriter != nil { - SaveToPipeWriter.Write(bodyB) - } else { - t.Respon = append(t.Respon, bodyB...) - } - } else { - return io.EOF - } - - if SaveToChan != nil { - close(SaveToChan) - } - if SaveToPipeWriter != nil { - SaveToPipeWriter.Close() - } + t.Respon = append([]byte{}, tmp...) } - } else { - rc, _ := pio.RW2Chan(resp.Body, nil) - var After = func(ReadTimeout int) (c <-chan time.Time) { - if ReadTimeout > 0 { - c = time.NewTimer(time.Millisecond * time.Duration(ReadTimeout)).C - } - return + case `deflate`: + if tmp, err := compress.UnFlate(t.Respon); err != nil { + return err + } else { + t.Respon = append([]byte{}, tmp...) } - - select { - case buf := <-rc: - if len(buf) != 0 { - if SaveToChan != nil { - SaveToChan <- buf - } else if SaveToPipeWriter != nil { - SaveToPipeWriter.Write(buf) - } else { - t.Respon = append(t.Respon, buf...) - } - } else { - err = io.EOF - return + default: + } + } + } else { + var ws []io.Writer + if val.SaveToPath != "" { + out, err := os.Create(val.SaveToPath) + if err != nil { + out.Close() + return err + } + defer out.Close() + ws = append(ws, out) + } + if val.SaveToPipeWriter != nil { + defer val.SaveToPipeWriter.Close() + ws = append(ws, val.SaveToPipeWriter) + } + if val.SaveToChan != nil { + r, w := io.Pipe() + go func() { + buf := make([]byte, 1<<16) + for { + n, e := r.Read(buf) + if n != 0 { + val.SaveToChan <- buf[:n] + } else if e != nil { + defer close(val.SaveToChan) + break } - case <-After(ConnectTimeout): - err = ErrConnectTimeout - return } + }() + defer w.Close() + ws = append(ws, w) + } + if !val.NoResponse { + var buf bytes.Buffer + defer func() { + t.Respon = buf.Bytes() + }() + ws = append(ws, &buf) + } - for loop := true; loop; { - select { - case buf := <-rc: - if len(buf) != 0 { - if SaveToChan != nil { - SaveToChan <- buf - } else if SaveToPipeWriter != nil { - SaveToPipeWriter.Write(buf) - } else { - t.Respon = append(t.Respon, buf...) - } - } else { - err = io.EOF - loop = false - } - case <-After(ReadTimeout): - err = ErrReadTimeout - loop = false + w := io.MultiWriter(ws...) + s := signal.Init() + go func() { + buf := make([]byte, 1<<16) + for { + if n, e := resp.Body.Read(buf); n != 0 { + w.Write(buf[:n]) + } else if e != nil { + if !errors.Is(e, io.EOF) { + err = e } - if !t.cancel.Islive() { - err = context.Canceled - loop = false - break - } - } - if SaveToChan != nil { - close(SaveToChan) + break } - if SaveToPipeWriter != nil { - SaveToPipeWriter.Close() + + if !t.cancel.Islive() { + err = context.Canceled + break } } - } - } else { - resp.Body.Close() + s.Done() + }() + s.Wait() + // if _, e := io.Copy(w, resp.Body); e != nil { + // err = e + // } } - - t.UsedTime = time.Since(beginTime) - - return nil + return } func (t *Req) Cancel() { t.Close() } @@ -368,7 +319,7 @@ func (t *Req) Close() { } func IsTimeout(e error) bool { - if errors.Is(e, context.DeadlineExceeded) || errors.Is(e, ErrConnectTimeout) || errors.Is(e, ErrReadTimeout) { + if errors.Is(e, context.DeadlineExceeded) { return true } if net_err, ok := e.(net.Error); ok && net_err.Timeout() { diff --git a/reqf/Reqf_test.go b/reqf/Reqf_test.go index fb70b49..9010b5d 100644 --- a/reqf/Reqf_test.go +++ b/reqf/Reqf_test.go @@ -1,6 +1,7 @@ package part import ( + "io" "testing" "time" ) @@ -8,61 +9,85 @@ import ( func Test_Timeout(t *testing.T) { r := New() if e := r.Reqf(Rval{ - Url:`https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`, - Timeout:1, - });e != nil { + Url: `https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`, + Timeout: 1000, + }); e != nil { if !IsTimeout(e) { - t.Error(`type error`,e) + t.Error(`type error`, e) } return } - t.Error(`no error`) + t.Log(`no error`) } func Test_Cancel(t *testing.T) { r := New() - go func(){ + go func() { time.Sleep(time.Second) r.Cancel() }() if e := r.Reqf(Rval{ - Url:`https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`, - });e != nil { + Url: `https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`, + }); e != nil { if !IsCancel(e) { - t.Error(`type error`,e) + t.Error(`type error`, e) } return } - t.Error(`no error`) + t.Log(`no error`) } func Test_Cancel_chan(t *testing.T) { r := New() - c := make(chan[]byte,1<<16) + c := make(chan []byte, 1<<16) - go func(){ - for{ + go func() { + for { <-c } }() - go func(){ - time.Sleep(time.Second*7) + go func() { + time.Sleep(time.Second * 3) r.Cancel() }() if e := r.Reqf(Rval{ - Url:`https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`, - SaveToChan:c, - Timeout:10, - });e != nil { + Url: `https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`, + SaveToChan: c, + Timeout: 5000, + }); e != nil { if !IsCancel(e) { - t.Error(`type error`,e) + t.Error(`type error`, e) } return } - t.Error(`no error`) -} \ No newline at end of file + t.Log(`no error`) +} + +func Test_Io_Pipe(t *testing.T) { + r := New() + rp, wp := io.Pipe() + c := make(chan struct{}, 1) + go func() { + buf, _ := io.ReadAll(rp) + t.Log("Test_Io_Pipe download:", len(buf)) + t.Log("Test_Io_Pipe download:", len(r.Respon)) + close(c) + }() + if e := r.Reqf(Rval{ + Url: `https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.9.0-amd64-netinst.iso`, + SaveToPipeWriter: wp, + Timeout: 5000, + }); e != nil { + if !IsTimeout(e) { + t.Error(`type error`, e) + } + return + } + t.Log(`no error`) + <-c +} diff --git a/tmplKV/tmplKV_test.go b/tmplKV/tmplKV_test.go index ce09e48..7050827 100644 --- a/tmplKV/tmplKV_test.go +++ b/tmplKV/tmplKV_test.go @@ -1,21 +1,27 @@ package part import ( - "time" "testing" + "time" ) func Test_tmplKV(t *testing.T) { s := New_tmplKV() - s.Set("a",`a`,1) - if !s.Check("a",`a`) {t.Error(`no match1`)} - s.Set("a",`b`,-1) - if !s.Check("a",`b`) {t.Error(`no match2`)} - time.Sleep(time.Second*time.Duration(1)) - if v,ok := s.Get("a");!ok { - t.Error(`no TO1`) - }else if vv,ok := v.(string);!ok{ - t.Error(`no string`) + s.Set("a", `a`, 1) + if !s.Check("a", `a`) { + t.Error(`no match1`) + } + s.Set("a", `b`, -1) + if !s.Check("a", `b`) { + t.Error(`no match2`) + } + time.Sleep(time.Second * time.Duration(1)) + // if v,ok := s.Get("a");!ok { + // t.Error(`no TO1`) + // }else if vv,ok := v.(string);!ok{ + // t.Error(`no string`) + // } + if v, ok := s.GetV("a").(string); !ok || v != `a` { + t.Error(`no 2`) } - if v,ok := s.GetV("a").(string);!ok || v != `a` {t.Error(`no 2`)} } diff --git a/tmplKV/tmplV.go b/tmplKV/tmplV.go index 2acc30e..76d9634 100644 --- a/tmplKV/tmplV.go +++ b/tmplKV/tmplV.go @@ -43,7 +43,7 @@ func New_tmplV(SumInDruation, Druation int64) *tmplV { func (s *tmplV) Set(contect string) (key uintptr) { - if s.SumInDruation >= 0 && s.pool.Len() >= uint(s.SumInDruation) { //ä¸ä¸ºæ é&&è¾¾å°éé¢ éæºæ¿ä»£ + if s.SumInDruation >= 0 && s.pool.Len() >= s.SumInDruation { //ä¸ä¸ºæ é&&è¾¾å°éé¢ éæºæ¿ä»£ s.Lock() for key, item := range s.kvt_map { s.kvt_map[key] = tmplV_item{