From: qydysky <32743305+qydysky@users.noreply.github.com> Date: Sat, 11 Mar 2023 15:41:44 +0000 (+0800) Subject: Fix data race X-Git-Tag: v0.24.0~3 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=d3f68efd3cf02553a18f954dfd9f5ad1ed47f1fd;p=part%2F.git Fix data race --- diff --git a/reqf/Reqf.go b/reqf/Reqf.go index 3f7caa4..5a7b2fe 100644 --- a/reqf/Reqf.go +++ b/reqf/Reqf.go @@ -48,7 +48,7 @@ type Req struct { Response *http.Response UsedTime time.Duration - cancelFs []func() + cancelFs chan func() cancel *signal.Signal running *signal.Signal isLive *signal.Signal @@ -56,7 +56,8 @@ type Req struct { responFile *os.File err error - l sync.Mutex + init sync.RWMutex + l sync.Mutex } func New() *Req { @@ -64,7 +65,9 @@ func New() *Req { } func (t *Req) Reqf(val Rval) error { + t.isLive.Wait() t.l.Lock() + t.init.Lock() t.Respon = t.Respon[:0] if t.responBuf == nil { @@ -72,20 +75,27 @@ func (t *Req) Reqf(val Rval) error { } t.Response = nil t.UsedTime = 0 - t.cancelFs = []func(){} + t.cancelFs = make(chan func(), 5) + t.isLive = signal.Init() t.cancel = signal.Init() t.running = signal.Init() - t.isLive = signal.Init() t.responFile = nil t.err = nil + t.init.Unlock() + go func() { + cancel, cancelFin := t.cancel.WaitC() + defer cancelFin() + running, runningFin := t.running.WaitC() + defer runningFin() + select { - case <-t.cancel.Chan: - for i := 0; i < len(t.cancelFs); i++ { - t.cancelFs[i]() + case <-cancel: + for len(t.cancelFs) != 0 { + (<-t.cancelFs)() } - case <-t.running.Chan: + case <-running: } }() go func() { @@ -115,10 +125,11 @@ func (t *Req) Reqf(val Rval) error { if val.SaveToPipeWriter != nil { val.SaveToPipeWriter.Close() } - t.running.Done() t.cancel.Done() + t.running.Done() t.l.Unlock() t.isLive.Done() + return t.err } else { go func() { t.Wait() @@ -131,13 +142,13 @@ func (t *Req) Reqf(val Rval) error { if val.SaveToPipeWriter != nil { val.SaveToPipeWriter.Close() } - t.running.Done() t.cancel.Done() + t.running.Done() t.l.Unlock() t.isLive.Done() }() } - return t.err + return nil } func (t *Req) Reqf_1(val Rval) (err error) { @@ -183,7 +194,7 @@ func (t *Req) Reqf_1(val Rval) (err error) { if val.Timeout > 0 { cx, cancel = context.WithTimeout(cx, time.Duration(val.Timeout)*time.Millisecond) } - t.cancelFs = append(t.cancelFs, cancel) + t.cancelFs <- cancel req, e := http.NewRequest(Method, val.Url, body) if e != nil { @@ -250,11 +261,11 @@ func (t *Req) Reqf_1(val Rval) (err error) { return } ws = append(ws, t.responFile) - t.cancelFs = append(t.cancelFs, func() { t.responFile.Close() }) + t.cancelFs <- func() { t.responFile.Close() } } if val.SaveToPipeWriter != nil { ws = append(ws, val.SaveToPipeWriter) - t.cancelFs = append(t.cancelFs, func() { val.SaveToPipeWriter.Close() }) + t.cancelFs <- func() { val.SaveToPipeWriter.Close() } } if !val.NoResponse { t.responBuf.Reset() @@ -278,7 +289,7 @@ func (t *Req) Reqf_1(val Rval) (err error) { } else { resReader = resp.Body } - t.cancelFs = append(t.cancelFs, func() { resp.Body.Close() }) + t.cancelFs <- func() { resp.Body.Close() } buf := make([]byte, 512) @@ -311,6 +322,9 @@ func (t *Req) Reqf_1(val Rval) (err error) { } func (t *Req) Wait() error { + t.init.RLock() + defer t.init.RUnlock() + t.running.Wait() return t.err } @@ -318,6 +332,9 @@ func (t *Req) Wait() error { func (t *Req) Cancel() { t.Close() } func (t *Req) Close() { + t.init.RLock() + defer t.init.RUnlock() + if !t.cancel.Islive() { return } @@ -325,6 +342,9 @@ func (t *Req) Close() { } func (t *Req) IsLive() bool { + t.init.RLock() + defer t.init.RUnlock() + return t.isLive.Islive() } diff --git a/signal/Signal.go b/signal/Signal.go index 2721d99..e3cb8fc 100644 --- a/signal/Signal.go +++ b/signal/Signal.go @@ -1,33 +1,56 @@ package part -type Signal struct{ - Chan chan struct{} +import ( + "runtime" + "sync/atomic" +) + +type Signal struct { + c chan struct{} + waitCount atomic.Int32 } -func Init() (*Signal) { - return &Signal{Chan:make(chan struct{})} +func Init() *Signal { + return &Signal{c: make(chan struct{})} } func (i *Signal) Wait() { - if i.Islive() {<-i.Chan} + if i.Islive() { + i.waitCount.Add(1) + <-i.c + i.waitCount.Add(-1) + } } -func (i *Signal) WaitC() (<-chan struct{}) { - if i.Islive() {return i.Chan} - return nil +// unsafe. fin() need +func (i *Signal) WaitC() (c chan struct{}, fin func()) { + if i.Islive() { + i.waitCount.Add(1) + return i.c, func() { i.waitCount.Add(-1) } + } + return nil, func() {} } func (i *Signal) Done() { - if i.Islive() {close(i.Chan)} + if i.Islive() { + close(i.c) + for !i.waitCount.CompareAndSwap(0, -1) { + runtime.Gosched() + } + } } func (i *Signal) Islive() (islive bool) { - if i == nil {return} + if i == nil { + return + } select { - case <-i.Chan:;//close - default://still alive - if i.Chan == nil {break}//not make yet - islive = true//has made + case <-i.c: //close + default: //still alive + if i.c == nil { + break + } //not make yet + islive = true //has made } return -} \ No newline at end of file +} diff --git a/signal/Signal_test.go b/signal/Signal_test.go index c0ccfb1..3b64f29 100644 --- a/signal/Signal_test.go +++ b/signal/Signal_test.go @@ -6,6 +6,7 @@ import ( func Test_signal(t *testing.T) { var s *Signal + s.Wait() t.Log(s.Islive()) s.Done() t.Log(s.Islive()) @@ -14,3 +15,28 @@ func Test_signal(t *testing.T) { s.Done() t.Log(s.Islive()) } + +func Test_signal2(t *testing.T) { + s := Init() + go s.Done() + s.Wait() +} + +func Test_signal3(t *testing.T) { + var s *Signal + go func() { + if s != nil { + s.Islive() + } + }() + s = Init() +} + +func BenchmarkXxx(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + s := Init() + go s.Done() + s.Wait() + } +}