mq.Pull_tag_only(`test`, func(a any) (disable bool) {
return false
})
+ b.ResetTimer()
for i := 0; i < b.N; i++ {
- mq.PushLock_tag(`test`, i)
+ mq.Push_tag(`test`, i)
}
}
}
}
+func Test_3(t *testing.T) {
+ mq := NewTo(time.Millisecond, time.Millisecond*3)
+ go mq.Push_tag(`sss`, nil)
+ mq.Pull_tag(FuncMap{
+ `test`: func(a any) (disable bool) {
+ return false
+ },
+ })
+ time.Sleep(time.Millisecond * 500)
+}
+
func Test_Pull_tag_chan(t *testing.T) {
mq := New()
ctx, cf := context.WithCancel(context.Background())
const (
lock = -1
ulock = 0
+ rlock = 0
)
type RWMutex struct {
- rlc atomic.Int32
- wantRead atomic.Bool
- wantWrite atomic.Bool
+ rlc atomic.Int32
+ want atomic.Int32
}
// to[0]: wait lock timeout to[1]: run lock timeout
//
// 不要在Rlock内设置变量,有DATA RACE风险
func (m *RWMutex) RLock(to ...time.Duration) (unlockf func()) {
- m.wantRead.Store(true)
+ getWant := m.want.CompareAndSwap(ulock, rlock)
var callC atomic.Bool
if len(to) > 0 {
var calls []string
}
}
c := time.Now()
- for m.rlc.Load() < ulock || m.wantWrite.Load() {
+ for m.rlc.Load() < ulock || !getWant && !m.want.CompareAndSwap(ulock, rlock) {
if time.Since(c) > to[0] {
- panic(fmt.Sprintf("timeout to wait lock while rlocking, rlc:%d", m.rlc.Load()))
+ panic(fmt.Sprintf("timeout to wait lock while rlocking, rlc:%d, want:%d, getWant:%v", m.rlc.Load(), m.want.Load(), getWant))
}
runtime.Gosched()
}
})
}
} else {
- for m.rlc.Load() < ulock || m.wantWrite.Load() {
+ for m.rlc.Load() < ulock || !getWant && m.want.CompareAndSwap(ulock, rlock) {
runtime.Gosched()
}
}
panic("had unrlock")
}
if m.rlc.Add(-1) == ulock {
- m.wantRead.Store(false)
+ m.want.CompareAndSwap(rlock, ulock)
}
}
}
// to[0]: wait lock timeout to[1]: run lock timeout
func (m *RWMutex) Lock(to ...time.Duration) (unlockf func()) {
- m.wantWrite.Store(true)
+ getWant := m.want.CompareAndSwap(ulock, lock)
var callC atomic.Bool
if len(to) > 0 {
var calls []string
}
}
c := time.Now()
- for m.rlc.Load() != ulock || m.wantRead.Load() {
+ for m.rlc.Load() != ulock || !getWant && !m.want.CompareAndSwap(ulock, lock) {
if time.Since(c) > to[0] {
- panic(fmt.Sprintf("timeout to wait rlock while locking, rlc:%d", m.rlc.Load()))
+ panic(fmt.Sprintf("timeout to wait rlock while locking, rlc:%d, want:%v, getWant:%v", m.rlc.Load(), m.want.Load(), getWant))
}
runtime.Gosched()
}
})
}
} else {
- for m.rlc.Load() != ulock || m.wantRead.Load() {
+ for m.rlc.Load() != ulock || !getWant && m.want.CompareAndSwap(ulock, lock) {
runtime.Gosched()
}
}
panic("had unlock")
}
if m.rlc.Add(1) == ulock {
- m.wantWrite.Store(false)
+ m.want.CompareAndSwap(lock, ulock)
}
}
}