From: qydysky Date: Mon, 22 May 2023 16:07:17 +0000 (+0800) Subject: fix X-Git-Tag: v0.27.9 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=faca30a041dee3e95425a194dca1e98a9352156d;p=part%2F.git fix --- diff --git a/msgq/Msgq_test.go b/msgq/Msgq_test.go index ee8ee30..3d614bd 100644 --- a/msgq/Msgq_test.go +++ b/msgq/Msgq_test.go @@ -159,8 +159,9 @@ func Benchmark_1(b *testing.B) { mq.Pull_tag_only(`test`, func(a any) (disable bool) { return false }) + b.ResetTimer() for i := 0; i < b.N; i++ { - mq.PushLock_tag(`test`, i) + mq.Push_tag(`test`, i) } } @@ -181,6 +182,17 @@ func Test_RemoveInPush(t *testing.T) { } } +func Test_3(t *testing.T) { + mq := NewTo(time.Millisecond, time.Millisecond*3) + go mq.Push_tag(`sss`, nil) + mq.Pull_tag(FuncMap{ + `test`: func(a any) (disable bool) { + return false + }, + }) + time.Sleep(time.Millisecond * 500) +} + func Test_Pull_tag_chan(t *testing.T) { mq := New() ctx, cf := context.WithCancel(context.Background()) diff --git a/sync/RWMutex.go b/sync/RWMutex.go index f79bd5b..4943c5d 100644 --- a/sync/RWMutex.go +++ b/sync/RWMutex.go @@ -10,19 +10,19 @@ import ( const ( lock = -1 ulock = 0 + rlock = 0 ) type RWMutex struct { - rlc atomic.Int32 - wantRead atomic.Bool - wantWrite atomic.Bool + rlc atomic.Int32 + want atomic.Int32 } // to[0]: wait lock timeout to[1]: run lock timeout // // 不要在Rlock内设置变量,有DATA RACE风险 func (m *RWMutex) RLock(to ...time.Duration) (unlockf func()) { - m.wantRead.Store(true) + getWant := m.want.CompareAndSwap(ulock, rlock) var callC atomic.Bool if len(to) > 0 { var calls []string @@ -36,9 +36,9 @@ func (m *RWMutex) RLock(to ...time.Duration) (unlockf func()) { } } c := time.Now() - for m.rlc.Load() < ulock || m.wantWrite.Load() { + for m.rlc.Load() < ulock || !getWant && !m.want.CompareAndSwap(ulock, rlock) { if time.Since(c) > to[0] { - panic(fmt.Sprintf("timeout to wait lock while rlocking, rlc:%d", m.rlc.Load())) + panic(fmt.Sprintf("timeout to wait lock while rlocking, rlc:%d, want:%d, getWant:%v", m.rlc.Load(), m.want.Load(), getWant)) } runtime.Gosched() } @@ -54,7 +54,7 @@ func (m *RWMutex) RLock(to ...time.Duration) (unlockf func()) { }) } } else { - for m.rlc.Load() < ulock || m.wantWrite.Load() { + for m.rlc.Load() < ulock || !getWant && m.want.CompareAndSwap(ulock, rlock) { runtime.Gosched() } } @@ -64,14 +64,14 @@ func (m *RWMutex) RLock(to ...time.Duration) (unlockf func()) { panic("had unrlock") } if m.rlc.Add(-1) == ulock { - m.wantRead.Store(false) + m.want.CompareAndSwap(rlock, ulock) } } } // to[0]: wait lock timeout to[1]: run lock timeout func (m *RWMutex) Lock(to ...time.Duration) (unlockf func()) { - m.wantWrite.Store(true) + getWant := m.want.CompareAndSwap(ulock, lock) var callC atomic.Bool if len(to) > 0 { var calls []string @@ -85,9 +85,9 @@ func (m *RWMutex) Lock(to ...time.Duration) (unlockf func()) { } } c := time.Now() - for m.rlc.Load() != ulock || m.wantRead.Load() { + for m.rlc.Load() != ulock || !getWant && !m.want.CompareAndSwap(ulock, lock) { if time.Since(c) > to[0] { - panic(fmt.Sprintf("timeout to wait rlock while locking, rlc:%d", m.rlc.Load())) + panic(fmt.Sprintf("timeout to wait rlock while locking, rlc:%d, want:%v, getWant:%v", m.rlc.Load(), m.want.Load(), getWant)) } runtime.Gosched() } @@ -103,7 +103,7 @@ func (m *RWMutex) Lock(to ...time.Duration) (unlockf func()) { }) } } else { - for m.rlc.Load() != ulock || m.wantRead.Load() { + for m.rlc.Load() != ulock || !getWant && m.want.CompareAndSwap(ulock, lock) { runtime.Gosched() } } @@ -113,7 +113,7 @@ func (m *RWMutex) Lock(to ...time.Duration) (unlockf func()) { panic("had unlock") } if m.rlc.Add(1) == ulock { - m.wantWrite.Store(false) + m.want.CompareAndSwap(lock, ulock) } } }