]> 127.0.0.1 Git - part/.git/commitdiff
Fix data race
authorqydysky <32743305+qydysky@users.noreply.github.com>
Sat, 11 Mar 2023 15:41:44 +0000 (23:41 +0800)
committerqydysky <32743305+qydysky@users.noreply.github.com>
Sat, 11 Mar 2023 15:41:44 +0000 (23:41 +0800)
reqf/Reqf.go
signal/Signal.go
signal/Signal_test.go

index 3f7caa4f0a305bb85b4226e4f5c97a10c693bcf0..5a7b2feb3194c26dd25ae8364d82d53592ce9772 100644 (file)
@@ -48,7 +48,7 @@ type Req struct {
        Response  *http.Response
        UsedTime  time.Duration
 
-       cancelFs []func()
+       cancelFs chan func()
        cancel   *signal.Signal
        running  *signal.Signal
        isLive   *signal.Signal
@@ -56,7 +56,8 @@ type Req struct {
        responFile *os.File
        err        error
 
-       l sync.Mutex
+       init sync.RWMutex
+       l    sync.Mutex
 }
 
 func New() *Req {
@@ -64,7 +65,9 @@ func New() *Req {
 }
 
 func (t *Req) Reqf(val Rval) error {
+       t.isLive.Wait()
        t.l.Lock()
+       t.init.Lock()
 
        t.Respon = t.Respon[:0]
        if t.responBuf == nil {
@@ -72,20 +75,27 @@ func (t *Req) Reqf(val Rval) error {
        }
        t.Response = nil
        t.UsedTime = 0
-       t.cancelFs = []func(){}
+       t.cancelFs = make(chan func(), 5)
+       t.isLive = signal.Init()
        t.cancel = signal.Init()
        t.running = signal.Init()
-       t.isLive = signal.Init()
        t.responFile = nil
        t.err = nil
 
+       t.init.Unlock()
+
        go func() {
+               cancel, cancelFin := t.cancel.WaitC()
+               defer cancelFin()
+               running, runningFin := t.running.WaitC()
+               defer runningFin()
+
                select {
-               case <-t.cancel.Chan:
-                       for i := 0; i < len(t.cancelFs); i++ {
-                               t.cancelFs[i]()
+               case <-cancel:
+                       for len(t.cancelFs) != 0 {
+                               (<-t.cancelFs)()
                        }
-               case <-t.running.Chan:
+               case <-running:
                }
        }()
        go func() {
@@ -115,10 +125,11 @@ func (t *Req) Reqf(val Rval) error {
                if val.SaveToPipeWriter != nil {
                        val.SaveToPipeWriter.Close()
                }
-               t.running.Done()
                t.cancel.Done()
+               t.running.Done()
                t.l.Unlock()
                t.isLive.Done()
+               return t.err
        } else {
                go func() {
                        t.Wait()
@@ -131,13 +142,13 @@ func (t *Req) Reqf(val Rval) error {
                        if val.SaveToPipeWriter != nil {
                                val.SaveToPipeWriter.Close()
                        }
-                       t.running.Done()
                        t.cancel.Done()
+                       t.running.Done()
                        t.l.Unlock()
                        t.isLive.Done()
                }()
        }
-       return t.err
+       return nil
 }
 
 func (t *Req) Reqf_1(val Rval) (err error) {
@@ -183,7 +194,7 @@ func (t *Req) Reqf_1(val Rval) (err error) {
        if val.Timeout > 0 {
                cx, cancel = context.WithTimeout(cx, time.Duration(val.Timeout)*time.Millisecond)
        }
-       t.cancelFs = append(t.cancelFs, cancel)
+       t.cancelFs <- cancel
 
        req, e := http.NewRequest(Method, val.Url, body)
        if e != nil {
@@ -250,11 +261,11 @@ func (t *Req) Reqf_1(val Rval) (err error) {
                        return
                }
                ws = append(ws, t.responFile)
-               t.cancelFs = append(t.cancelFs, func() { t.responFile.Close() })
+               t.cancelFs <- func() { t.responFile.Close() }
        }
        if val.SaveToPipeWriter != nil {
                ws = append(ws, val.SaveToPipeWriter)
-               t.cancelFs = append(t.cancelFs, func() { val.SaveToPipeWriter.Close() })
+               t.cancelFs <- func() { val.SaveToPipeWriter.Close() }
        }
        if !val.NoResponse {
                t.responBuf.Reset()
@@ -278,7 +289,7 @@ func (t *Req) Reqf_1(val Rval) (err error) {
        } else {
                resReader = resp.Body
        }
-       t.cancelFs = append(t.cancelFs, func() { resp.Body.Close() })
+       t.cancelFs <- func() { resp.Body.Close() }
 
        buf := make([]byte, 512)
 
@@ -311,6 +322,9 @@ func (t *Req) Reqf_1(val Rval) (err error) {
 }
 
 func (t *Req) Wait() error {
+       t.init.RLock()
+       defer t.init.RUnlock()
+
        t.running.Wait()
        return t.err
 }
@@ -318,6 +332,9 @@ func (t *Req) Wait() error {
 func (t *Req) Cancel() { t.Close() }
 
 func (t *Req) Close() {
+       t.init.RLock()
+       defer t.init.RUnlock()
+
        if !t.cancel.Islive() {
                return
        }
@@ -325,6 +342,9 @@ func (t *Req) Close() {
 }
 
 func (t *Req) IsLive() bool {
+       t.init.RLock()
+       defer t.init.RUnlock()
+
        return t.isLive.Islive()
 }
 
index 2721d99d4bc103ee2d67fd99d31aee836a2e39c8..e3cb8fc90191ec056ec9b8ccf7532f680556e342 100644 (file)
@@ -1,33 +1,56 @@
 package part
 
-type Signal struct{
-       Chan chan struct{}
+import (
+       "runtime"
+       "sync/atomic"
+)
+
+type Signal struct {
+       c         chan struct{}
+       waitCount atomic.Int32
 }
 
-func Init() (*Signal) {
-       return &Signal{Chan:make(chan struct{})}
+func Init() *Signal {
+       return &Signal{c: make(chan struct{})}
 }
 
 func (i *Signal) Wait() {
-       if i.Islive() {<-i.Chan}
+       if i.Islive() {
+               i.waitCount.Add(1)
+               <-i.c
+               i.waitCount.Add(-1)
+       }
 }
 
-func (i *Signal) WaitC() (<-chan struct{}) {
-       if i.Islive() {return i.Chan}
-       return nil
+// unsafe. fin() need
+func (i *Signal) WaitC() (c chan struct{}, fin func()) {
+       if i.Islive() {
+               i.waitCount.Add(1)
+               return i.c, func() { i.waitCount.Add(-1) }
+       }
+       return nil, func() {}
 }
 
 func (i *Signal) Done() {
-       if i.Islive() {close(i.Chan)}
+       if i.Islive() {
+               close(i.c)
+               for !i.waitCount.CompareAndSwap(0, -1) {
+                       runtime.Gosched()
+               }
+       }
 }
 
 func (i *Signal) Islive() (islive bool) {
-       if i == nil {return}
+       if i == nil {
+               return
+       }
        select {
-       case <-i.Chan:;//close
-       default://still alive
-               if i.Chan == nil {break}//not make yet
-               islive = true//has made
+       case <-i.c: //close
+       default: //still alive
+               if i.c == nil {
+                       break
+               } //not make yet
+               islive = true //has made
        }
        return
-}
\ No newline at end of file
+}
index c0ccfb1f002bac316dedd78f3b283561ace7d718..3b64f29a99ba03a3d1cb139a427c28816150bcad 100644 (file)
@@ -6,6 +6,7 @@ import (
 
 func Test_signal(t *testing.T) {
        var s *Signal
+       s.Wait()
        t.Log(s.Islive())
        s.Done()
        t.Log(s.Islive())
@@ -14,3 +15,28 @@ func Test_signal(t *testing.T) {
        s.Done()
        t.Log(s.Islive())
 }
+
+func Test_signal2(t *testing.T) {
+       s := Init()
+       go s.Done()
+       s.Wait()
+}
+
+func Test_signal3(t *testing.T) {
+       var s *Signal
+       go func() {
+               if s != nil {
+                       s.Islive()
+               }
+       }()
+       s = Init()
+}
+
+func BenchmarkXxx(b *testing.B) {
+       b.ResetTimer()
+       for i := 0; i < b.N; i++ {
+               s := Init()
+               go s.Done()
+               s.Wait()
+       }
+}