From: qydysky Date: Sat, 13 May 2023 13:16:14 +0000 (+0800) Subject: Add X-Git-Tag: v0.26.4 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=a90a021f0183a9566395da413c6d94864bb1f136;p=part%2F.git Add --- diff --git a/msgq/Msgq.go b/msgq/Msgq.go index e1eb588..cbe8ab3 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -4,14 +4,15 @@ import ( "container/list" "context" "runtime" - "sync" "sync/atomic" "time" signal "github.com/qydysky/part/signal" + sync "github.com/qydysky/part/sync" ) type Msgq struct { + to []time.Duration funcs *list.List someNeedRemove atomic.Int32 lock sync.RWMutex @@ -25,16 +26,27 @@ func New() *Msgq { return m } +func NewTo(to ...time.Duration) *Msgq { + m := new(Msgq) + m.funcs = list.New() + if len(to) > 0 { + m.to = append(m.to, to[0]) + } else { + m.to = append(m.to, time.Second*30) + } + return m +} + func (m *Msgq) Register(f func(any) (disable bool)) { - m.lock.Lock() + ul := m.lock.Lock(m.to...) m.funcs.PushBack(f) - m.lock.Unlock() + ul() } func (m *Msgq) Register_front(f func(any) (disable bool)) { - m.lock.Lock() + ul := m.lock.Lock(m.to...) m.funcs.PushFront(f) - m.lock.Unlock() + ul() } func (m *Msgq) Push(msg any) { @@ -45,22 +57,22 @@ func (m *Msgq) Push(msg any) { var removes []*list.Element - m.lock.RLock() + ul := m.lock.RLock(m.to...) for el := m.funcs.Front(); el != nil; el = el.Next() { if disable := el.Value.(func(any) bool)(msg); disable { m.someNeedRemove.Add(1) removes = append(removes, el) } } - m.lock.RUnlock() + ul() if len(removes) != 0 { - m.lock.Lock() + ul := m.lock.Lock(m.to...) m.someNeedRemove.Add(-int32(len(removes))) for i := 0; i < len(removes); i++ { m.funcs.Remove(removes[i]) } - m.lock.Unlock() + ul() } } @@ -70,8 +82,8 @@ func (m *Msgq) PushLock(msg any) { runtime.Gosched() } - m.lock.Lock() - defer m.lock.Unlock() + ul := m.lock.Lock(m.to...) + defer ul() var removes []*list.Element @@ -96,8 +108,8 @@ func (m *Msgq) ClearAll() { runtime.Gosched() } - m.lock.Lock() - defer m.lock.Unlock() + ul := m.lock.Lock(m.to...) + defer ul() var removes []*list.Element @@ -222,6 +234,12 @@ func NewType[T any]() *MsgType[T] { } } +func NewTypeTo[T any](to ...time.Duration) *MsgType[T] { + return &MsgType[T]{ + m: NewTo(to...), + } +} + func (m *MsgType[T]) Push_tag(Tag string, Data T) { m.m.Push(Msgq_tag_data{ Tag: Tag, diff --git a/msgq/Msgq_test.go b/msgq/Msgq_test.go index 037ea9e..202bf23 100644 --- a/msgq/Msgq_test.go +++ b/msgq/Msgq_test.go @@ -139,6 +139,21 @@ func BenchmarkXxx(b *testing.B) { } } +func TestPushLock(t *testing.T) { + defer func() { + if e := recover(); e.(string) != "timeout to wait rlock, rlc:1" { + t.Fatal(e) + } + }() + mq := NewTo(time.Second) + mq.Pull_tag_only(`test`, func(a any) (disable bool) { + mq.PushLock_tag(`lock`, nil) + return false + }) + mq.Push_tag(`test`, nil) + t.Fatal() +} + func Test_Pull_tag_chan(t *testing.T) { mq := New() ctx, cf := context.WithCancel(context.Background())