]> 127.0.0.1 Git - part/.git/commitdiff
add
authorqydysky <qydysky@foxmail.com>
Sat, 13 May 2023 11:55:28 +0000 (19:55 +0800)
committerqydysky <qydysky@foxmail.com>
Sat, 13 May 2023 11:55:28 +0000 (19:55 +0800)
msgq/Msgq.go
sync/RWMutex.go [new file with mode: 0644]
sync/RWMutex_test.go [new file with mode: 0644]

index 7e7898732ca7b9c0f2db33226328fb3ed1d58cf3..e1eb58835f0c226c699d92abf57be6f3b2862eff 100644 (file)
@@ -61,7 +61,6 @@ func (m *Msgq) Push(msg any) {
                        m.funcs.Remove(removes[i])
                }
                m.lock.Unlock()
-               removes = nil
        }
 }
 
@@ -88,7 +87,6 @@ func (m *Msgq) PushLock(msg any) {
                for i := 0; i < len(removes); i++ {
                        m.funcs.Remove(removes[i])
                }
-               removes = nil
        }
 }
 
@@ -113,7 +111,6 @@ func (m *Msgq) ClearAll() {
                for i := 0; i < len(removes); i++ {
                        m.funcs.Remove(removes[i])
                }
-               removes = nil
        }
 }
 
@@ -233,7 +230,7 @@ func (m *MsgType[T]) Push_tag(Tag string, Data T) {
 }
 
 func (m *MsgType[T]) PushLock_tag(Tag string, Data T) {
-       m.m.Push(Msgq_tag_data{
+       m.m.PushLock(Msgq_tag_data{
                Tag:  Tag,
                Data: Data,
        })
diff --git a/sync/RWMutex.go b/sync/RWMutex.go
new file mode 100644 (file)
index 0000000..587221e
--- /dev/null
@@ -0,0 +1,93 @@
+package part
+
+import (
+       "fmt"
+       "runtime"
+       "sync/atomic"
+       "time"
+)
+
+const (
+       lock  = -1
+       ulock = 0
+)
+
+type RWMutex struct {
+       rlc atomic.Int32
+       cul atomic.Int32
+       oll atomic.Int32
+}
+
+func (m *RWMutex) RLock(to ...time.Duration) (unrlock func()) {
+       if len(to) > 0 {
+               c := time.Now()
+               for m.rlc.Load() < ulock {
+                       runtime.Gosched()
+                       if time.Since(c) > to[0] {
+                               panic(fmt.Sprintf("timeout to wait rlock, rlc:%d", m.rlc.Load()))
+                       }
+               }
+       } else {
+               for m.rlc.Load() < ulock {
+                       runtime.Gosched()
+                       time.Sleep(time.Millisecond)
+               }
+       }
+       m.rlc.Add(1)
+       var callC atomic.Bool
+       return func() {
+               if !callC.CompareAndSwap(false, true) {
+                       panic("had unrlock")
+               }
+               m.rlc.Add(-1)
+       }
+}
+
+func (m *RWMutex) Lock(to ...time.Duration) (unlock func()) {
+       lockid := m.cul.Add(1)
+
+       if len(to) > 0 {
+               c := time.Now()
+               if !m.rlc.CompareAndSwap(ulock, lock) {
+                       for m.rlc.Load() > ulock {
+                               runtime.Gosched()
+                               if time.Since(c) > to[0] {
+                                       panic(fmt.Sprintf("timeout to wait rlock, rlc:%d", m.rlc.Load()))
+                               }
+                       }
+                       for lockid-1 != m.oll.Load() {
+                               runtime.Gosched()
+                               if time.Since(c) > to[0] {
+                                       panic(fmt.Sprintf("timeout to wait lock, rlc:%d", m.rlc.Load()))
+                               }
+                       }
+                       if !m.rlc.CompareAndSwap(ulock, lock) {
+                               panic("csa error, bug")
+                       }
+               }
+       } else {
+               if !m.rlc.CompareAndSwap(ulock, lock) {
+                       for m.rlc.Load() > ulock {
+                               runtime.Gosched()
+                               time.Sleep(time.Millisecond)
+                       }
+                       for lockid-1 != m.oll.Load() {
+                               runtime.Gosched()
+                               time.Sleep(time.Millisecond)
+                       }
+                       if !m.rlc.CompareAndSwap(ulock, lock) {
+                               panic("")
+                       }
+               }
+       }
+       var callC atomic.Bool
+       return func() {
+               if !callC.CompareAndSwap(false, true) {
+                       panic("had unlock")
+               }
+               if !m.rlc.CompareAndSwap(lock, ulock) {
+                       panic("")
+               }
+               m.oll.Store(lockid)
+       }
+}
diff --git a/sync/RWMutex_test.go b/sync/RWMutex_test.go
new file mode 100644 (file)
index 0000000..5178e9f
--- /dev/null
@@ -0,0 +1,43 @@
+package part
+
+import (
+       "testing"
+       "time"
+)
+
+func TestMain(t *testing.T) {
+       var rl RWMutex
+       var callL time.Time
+       var callRL time.Time
+       var callRL2 time.Time
+       var to = time.Second * 2
+
+       ul := rl.RLock(to)
+       callRL = time.Now()
+
+       go func() {
+               ull := rl.RLock(to)
+               callRL2 = time.Now()
+               ull()
+       }()
+
+       go func() {
+               ull := rl.Lock(to)
+               callL = time.Now()
+               ull()
+       }()
+
+       time.Sleep(time.Second)
+       ul()
+       rl.Lock(to)()
+
+       if time.Since(callRL) < time.Since(callRL2) {
+               t.Fatal()
+       }
+       if time.Since(callRL2) < time.Since(callL) {
+               t.Fatal()
+       }
+       if callL.IsZero() {
+               t.Fatal()
+       }
+}