Response *http.Response
UsedTime time.Duration
- cancelFs []func()
+ cancelFs chan func()
cancel *signal.Signal
running *signal.Signal
isLive *signal.Signal
responFile *os.File
err error
- l sync.Mutex
+ init sync.RWMutex
+ l sync.Mutex
}
func New() *Req {
}
func (t *Req) Reqf(val Rval) error {
+ t.isLive.Wait()
t.l.Lock()
+ t.init.Lock()
t.Respon = t.Respon[:0]
if t.responBuf == nil {
}
t.Response = nil
t.UsedTime = 0
- t.cancelFs = []func(){}
+ t.cancelFs = make(chan func(), 5)
+ t.isLive = signal.Init()
t.cancel = signal.Init()
t.running = signal.Init()
- t.isLive = signal.Init()
t.responFile = nil
t.err = nil
+ t.init.Unlock()
+
go func() {
+ cancel, cancelFin := t.cancel.WaitC()
+ defer cancelFin()
+ running, runningFin := t.running.WaitC()
+ defer runningFin()
+
select {
- case <-t.cancel.Chan:
- for i := 0; i < len(t.cancelFs); i++ {
- t.cancelFs[i]()
+ case <-cancel:
+ for len(t.cancelFs) != 0 {
+ (<-t.cancelFs)()
}
- case <-t.running.Chan:
+ case <-running:
}
}()
go func() {
if val.SaveToPipeWriter != nil {
val.SaveToPipeWriter.Close()
}
- t.running.Done()
t.cancel.Done()
+ t.running.Done()
t.l.Unlock()
t.isLive.Done()
+ return t.err
} else {
go func() {
t.Wait()
if val.SaveToPipeWriter != nil {
val.SaveToPipeWriter.Close()
}
- t.running.Done()
t.cancel.Done()
+ t.running.Done()
t.l.Unlock()
t.isLive.Done()
}()
}
- return t.err
+ return nil
}
func (t *Req) Reqf_1(val Rval) (err error) {
if val.Timeout > 0 {
cx, cancel = context.WithTimeout(cx, time.Duration(val.Timeout)*time.Millisecond)
}
- t.cancelFs = append(t.cancelFs, cancel)
+ t.cancelFs <- cancel
req, e := http.NewRequest(Method, val.Url, body)
if e != nil {
return
}
ws = append(ws, t.responFile)
- t.cancelFs = append(t.cancelFs, func() { t.responFile.Close() })
+ t.cancelFs <- func() { t.responFile.Close() }
}
if val.SaveToPipeWriter != nil {
ws = append(ws, val.SaveToPipeWriter)
- t.cancelFs = append(t.cancelFs, func() { val.SaveToPipeWriter.Close() })
+ t.cancelFs <- func() { val.SaveToPipeWriter.Close() }
}
if !val.NoResponse {
t.responBuf.Reset()
} else {
resReader = resp.Body
}
- t.cancelFs = append(t.cancelFs, func() { resp.Body.Close() })
+ t.cancelFs <- func() { resp.Body.Close() }
buf := make([]byte, 512)
}
func (t *Req) Wait() error {
+ t.init.RLock()
+ defer t.init.RUnlock()
+
t.running.Wait()
return t.err
}
func (t *Req) Cancel() { t.Close() }
func (t *Req) Close() {
+ t.init.RLock()
+ defer t.init.RUnlock()
+
if !t.cancel.Islive() {
return
}
}
func (t *Req) IsLive() bool {
+ t.init.RLock()
+ defer t.init.RUnlock()
+
return t.isLive.Islive()
}
package part
-type Signal struct{
- Chan chan struct{}
+import (
+ "runtime"
+ "sync/atomic"
+)
+
+type Signal struct {
+ c chan struct{}
+ waitCount atomic.Int32
}
-func Init() (*Signal) {
- return &Signal{Chan:make(chan struct{})}
+func Init() *Signal {
+ return &Signal{c: make(chan struct{})}
}
func (i *Signal) Wait() {
- if i.Islive() {<-i.Chan}
+ if i.Islive() {
+ i.waitCount.Add(1)
+ <-i.c
+ i.waitCount.Add(-1)
+ }
}
-func (i *Signal) WaitC() (<-chan struct{}) {
- if i.Islive() {return i.Chan}
- return nil
+// unsafe. fin() need
+func (i *Signal) WaitC() (c chan struct{}, fin func()) {
+ if i.Islive() {
+ i.waitCount.Add(1)
+ return i.c, func() { i.waitCount.Add(-1) }
+ }
+ return nil, func() {}
}
func (i *Signal) Done() {
- if i.Islive() {close(i.Chan)}
+ if i.Islive() {
+ close(i.c)
+ for !i.waitCount.CompareAndSwap(0, -1) {
+ runtime.Gosched()
+ }
+ }
}
func (i *Signal) Islive() (islive bool) {
- if i == nil {return}
+ if i == nil {
+ return
+ }
select {
- case <-i.Chan:;//close
- default://still alive
- if i.Chan == nil {break}//not make yet
- islive = true//has made
+ case <-i.c: //close
+ default: //still alive
+ if i.c == nil {
+ break
+ } //not make yet
+ islive = true //has made
}
return
-}
\ No newline at end of file
+}