From: qydysky Date: Sat, 13 May 2023 11:55:28 +0000 (+0800) Subject: add X-Git-Tag: v0.26.3~1 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=62a2d31eb3d72f006d0a40fba7d105552cb9c90b;p=part%2F.git add --- diff --git a/msgq/Msgq.go b/msgq/Msgq.go index 7e78987..e1eb588 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -61,7 +61,6 @@ func (m *Msgq) Push(msg any) { m.funcs.Remove(removes[i]) } m.lock.Unlock() - removes = nil } } @@ -88,7 +87,6 @@ func (m *Msgq) PushLock(msg any) { for i := 0; i < len(removes); i++ { m.funcs.Remove(removes[i]) } - removes = nil } } @@ -113,7 +111,6 @@ func (m *Msgq) ClearAll() { for i := 0; i < len(removes); i++ { m.funcs.Remove(removes[i]) } - removes = nil } } @@ -233,7 +230,7 @@ func (m *MsgType[T]) Push_tag(Tag string, Data T) { } func (m *MsgType[T]) PushLock_tag(Tag string, Data T) { - m.m.Push(Msgq_tag_data{ + m.m.PushLock(Msgq_tag_data{ Tag: Tag, Data: Data, }) diff --git a/sync/RWMutex.go b/sync/RWMutex.go new file mode 100644 index 0000000..587221e --- /dev/null +++ b/sync/RWMutex.go @@ -0,0 +1,93 @@ +package part + +import ( + "fmt" + "runtime" + "sync/atomic" + "time" +) + +const ( + lock = -1 + ulock = 0 +) + +type RWMutex struct { + rlc atomic.Int32 + cul atomic.Int32 + oll atomic.Int32 +} + +func (m *RWMutex) RLock(to ...time.Duration) (unrlock func()) { + if len(to) > 0 { + c := time.Now() + for m.rlc.Load() < ulock { + runtime.Gosched() + if time.Since(c) > to[0] { + panic(fmt.Sprintf("timeout to wait rlock, rlc:%d", m.rlc.Load())) + } + } + } else { + for m.rlc.Load() < ulock { + runtime.Gosched() + time.Sleep(time.Millisecond) + } + } + m.rlc.Add(1) + var callC atomic.Bool + return func() { + if !callC.CompareAndSwap(false, true) { + panic("had unrlock") + } + m.rlc.Add(-1) + } +} + +func (m *RWMutex) Lock(to ...time.Duration) (unlock func()) { + lockid := m.cul.Add(1) + + if len(to) > 0 { + c := time.Now() + if !m.rlc.CompareAndSwap(ulock, lock) { + for m.rlc.Load() > ulock { + runtime.Gosched() + if time.Since(c) > to[0] { + panic(fmt.Sprintf("timeout to wait rlock, rlc:%d", m.rlc.Load())) + } + } + for lockid-1 != m.oll.Load() { + runtime.Gosched() + if time.Since(c) > to[0] { + panic(fmt.Sprintf("timeout to wait lock, rlc:%d", m.rlc.Load())) + } + } + if !m.rlc.CompareAndSwap(ulock, lock) { + panic("csa error, bug") + } + } + } else { + if !m.rlc.CompareAndSwap(ulock, lock) { + for m.rlc.Load() > ulock { + runtime.Gosched() + time.Sleep(time.Millisecond) + } + for lockid-1 != m.oll.Load() { + runtime.Gosched() + time.Sleep(time.Millisecond) + } + if !m.rlc.CompareAndSwap(ulock, lock) { + panic("") + } + } + } + var callC atomic.Bool + return func() { + if !callC.CompareAndSwap(false, true) { + panic("had unlock") + } + if !m.rlc.CompareAndSwap(lock, ulock) { + panic("") + } + m.oll.Store(lockid) + } +} diff --git a/sync/RWMutex_test.go b/sync/RWMutex_test.go new file mode 100644 index 0000000..5178e9f --- /dev/null +++ b/sync/RWMutex_test.go @@ -0,0 +1,43 @@ +package part + +import ( + "testing" + "time" +) + +func TestMain(t *testing.T) { + var rl RWMutex + var callL time.Time + var callRL time.Time + var callRL2 time.Time + var to = time.Second * 2 + + ul := rl.RLock(to) + callRL = time.Now() + + go func() { + ull := rl.RLock(to) + callRL2 = time.Now() + ull() + }() + + go func() { + ull := rl.Lock(to) + callL = time.Now() + ull() + }() + + time.Sleep(time.Second) + ul() + rl.Lock(to)() + + if time.Since(callRL) < time.Since(callRL2) { + t.Fatal() + } + if time.Since(callRL2) < time.Since(callL) { + t.Fatal() + } + if callL.IsZero() { + t.Fatal() + } +}