From faca30a041dee3e95425a194dca1e98a9352156d Mon Sep 17 00:00:00 2001 From: qydysky Date: Tue, 23 May 2023 00:07:17 +0800 Subject: [PATCH] fix --- msgq/Msgq_test.go | 14 +++++++++++++- sync/RWMutex.go | 26 +++++++++++++------------- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/msgq/Msgq_test.go b/msgq/Msgq_test.go index ee8ee30..3d614bd 100644 --- a/msgq/Msgq_test.go +++ b/msgq/Msgq_test.go @@ -159,8 +159,9 @@ func Benchmark_1(b *testing.B) { mq.Pull_tag_only(`test`, func(a any) (disable bool) { return false }) + b.ResetTimer() for i := 0; i < b.N; i++ { - mq.PushLock_tag(`test`, i) + mq.Push_tag(`test`, i) } } @@ -181,6 +182,17 @@ func Test_RemoveInPush(t *testing.T) { } } +func Test_3(t *testing.T) { + mq := NewTo(time.Millisecond, time.Millisecond*3) + go mq.Push_tag(`sss`, nil) + mq.Pull_tag(FuncMap{ + `test`: func(a any) (disable bool) { + return false + }, + }) + time.Sleep(time.Millisecond * 500) +} + func Test_Pull_tag_chan(t *testing.T) { mq := New() ctx, cf := context.WithCancel(context.Background()) diff --git a/sync/RWMutex.go b/sync/RWMutex.go index f79bd5b..4943c5d 100644 --- a/sync/RWMutex.go +++ b/sync/RWMutex.go @@ -10,19 +10,19 @@ import ( const ( lock = -1 ulock = 0 + rlock = 0 ) type RWMutex struct { - rlc atomic.Int32 - wantRead atomic.Bool - wantWrite atomic.Bool + rlc atomic.Int32 + want atomic.Int32 } // to[0]: wait lock timeout to[1]: run lock timeout // // 不要在Rlock内设置变量,有DATA RACE风险 func (m *RWMutex) RLock(to ...time.Duration) (unlockf func()) { - m.wantRead.Store(true) + getWant := m.want.CompareAndSwap(ulock, rlock) var callC atomic.Bool if len(to) > 0 { var calls []string @@ -36,9 +36,9 @@ func (m *RWMutex) RLock(to ...time.Duration) (unlockf func()) { } } c := time.Now() - for m.rlc.Load() < ulock || m.wantWrite.Load() { + for m.rlc.Load() < ulock || !getWant && !m.want.CompareAndSwap(ulock, rlock) { if time.Since(c) > to[0] { - panic(fmt.Sprintf("timeout to wait lock while rlocking, rlc:%d", m.rlc.Load())) + panic(fmt.Sprintf("timeout to wait lock while rlocking, rlc:%d, want:%d, getWant:%v", m.rlc.Load(), m.want.Load(), getWant)) } runtime.Gosched() } @@ -54,7 +54,7 @@ func (m *RWMutex) RLock(to ...time.Duration) (unlockf func()) { }) } } else { - for m.rlc.Load() < ulock || m.wantWrite.Load() { + for m.rlc.Load() < ulock || !getWant && m.want.CompareAndSwap(ulock, rlock) { runtime.Gosched() } } @@ -64,14 +64,14 @@ func (m *RWMutex) RLock(to ...time.Duration) (unlockf func()) { panic("had unrlock") } if m.rlc.Add(-1) == ulock { - m.wantRead.Store(false) + m.want.CompareAndSwap(rlock, ulock) } } } // to[0]: wait lock timeout to[1]: run lock timeout func (m *RWMutex) Lock(to ...time.Duration) (unlockf func()) { - m.wantWrite.Store(true) + getWant := m.want.CompareAndSwap(ulock, lock) var callC atomic.Bool if len(to) > 0 { var calls []string @@ -85,9 +85,9 @@ func (m *RWMutex) Lock(to ...time.Duration) (unlockf func()) { } } c := time.Now() - for m.rlc.Load() != ulock || m.wantRead.Load() { + for m.rlc.Load() != ulock || !getWant && !m.want.CompareAndSwap(ulock, lock) { if time.Since(c) > to[0] { - panic(fmt.Sprintf("timeout to wait rlock while locking, rlc:%d", m.rlc.Load())) + panic(fmt.Sprintf("timeout to wait rlock while locking, rlc:%d, want:%v, getWant:%v", m.rlc.Load(), m.want.Load(), getWant)) } runtime.Gosched() } @@ -103,7 +103,7 @@ func (m *RWMutex) Lock(to ...time.Duration) (unlockf func()) { }) } } else { - for m.rlc.Load() != ulock || m.wantRead.Load() { + for m.rlc.Load() != ulock || !getWant && m.want.CompareAndSwap(ulock, lock) { runtime.Gosched() } } @@ -113,7 +113,7 @@ func (m *RWMutex) Lock(to ...time.Duration) (unlockf func()) { panic("had unlock") } if m.rlc.Add(1) == ulock { - m.wantWrite.Store(false) + m.want.CompareAndSwap(lock, ulock) } } } -- 2.39.2