"container/list"
"context"
"runtime"
- "sync"
"sync/atomic"
"time"
signal "github.com/qydysky/part/signal"
+ sync "github.com/qydysky/part/sync"
)
type Msgq struct {
+ to []time.Duration
funcs *list.List
someNeedRemove atomic.Int32
lock sync.RWMutex
return m
}
+func NewTo(to ...time.Duration) *Msgq {
+ m := new(Msgq)
+ m.funcs = list.New()
+ if len(to) > 0 {
+ m.to = append(m.to, to[0])
+ } else {
+ m.to = append(m.to, time.Second*30)
+ }
+ return m
+}
+
func (m *Msgq) Register(f func(any) (disable bool)) {
- m.lock.Lock()
+ ul := m.lock.Lock(m.to...)
m.funcs.PushBack(f)
- m.lock.Unlock()
+ ul()
}
func (m *Msgq) Register_front(f func(any) (disable bool)) {
- m.lock.Lock()
+ ul := m.lock.Lock(m.to...)
m.funcs.PushFront(f)
- m.lock.Unlock()
+ ul()
}
func (m *Msgq) Push(msg any) {
var removes []*list.Element
- m.lock.RLock()
+ ul := m.lock.RLock(m.to...)
for el := m.funcs.Front(); el != nil; el = el.Next() {
if disable := el.Value.(func(any) bool)(msg); disable {
m.someNeedRemove.Add(1)
removes = append(removes, el)
}
}
- m.lock.RUnlock()
+ ul()
if len(removes) != 0 {
- m.lock.Lock()
+ ul := m.lock.Lock(m.to...)
m.someNeedRemove.Add(-int32(len(removes)))
for i := 0; i < len(removes); i++ {
m.funcs.Remove(removes[i])
}
- m.lock.Unlock()
+ ul()
}
}
runtime.Gosched()
}
- m.lock.Lock()
- defer m.lock.Unlock()
+ ul := m.lock.Lock(m.to...)
+ defer ul()
var removes []*list.Element
runtime.Gosched()
}
- m.lock.Lock()
- defer m.lock.Unlock()
+ ul := m.lock.Lock(m.to...)
+ defer ul()
var removes []*list.Element
}
}
+func NewTypeTo[T any](to ...time.Duration) *MsgType[T] {
+ return &MsgType[T]{
+ m: NewTo(to...),
+ }
+}
+
func (m *MsgType[T]) Push_tag(Tag string, Data T) {
m.m.Push(Msgq_tag_data{
Tag: Tag,
}
}
+func TestPushLock(t *testing.T) {
+ defer func() {
+ if e := recover(); e.(string) != "timeout to wait rlock, rlc:1" {
+ t.Fatal(e)
+ }
+ }()
+ mq := NewTo(time.Second)
+ mq.Pull_tag_only(`test`, func(a any) (disable bool) {
+ mq.PushLock_tag(`lock`, nil)
+ return false
+ })
+ mq.Push_tag(`test`, nil)
+ t.Fatal()
+}
+
func Test_Pull_tag_chan(t *testing.T) {
mq := New()
ctx, cf := context.WithCancel(context.Background())