From: qydysky Date: Fri, 28 Jul 2023 09:15:09 +0000 (+0800) Subject: add X-Git-Tag: v0.28.0+20230728804fca1~1 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=602c8c80727e2bead5e7fd4f6407279efab1013b;p=part%2F.git add --- diff --git a/msgq/Msgq.go b/msgq/Msgq.go index c60b1c2..b0a1403 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -28,14 +28,8 @@ type msgqItem struct { type FuncMap map[string]func(any) (disable bool) -func New() *Msgq { - m := new(Msgq) - m.funcs = list.New() - return m -} - // to[0]:timeout to wait to[1]:timeout to run -func NewTo(to ...time.Duration) *Msgq { +func New(to ...time.Duration) *Msgq { m := new(Msgq) m.funcs = list.New() m.to = to @@ -67,7 +61,9 @@ func (m *Msgq) Register_front(f func(any) (disable bool)) { } func (m *Msgq) Push(msg any) { + defer m.removeDisable() ul := m.lock.RLock(m.to...) + defer ul() for el := m.funcs.Front(); el != nil; el = el.Next() { mi := el.Value.(*msgqItem) @@ -81,15 +77,12 @@ func (m *Msgq) Push(msg any) { } mi.running.Add(-1) } - - ul() - if m.someNeedRemove.Load() { - m.removeDisable() - } } func (m *Msgq) PushLock(msg any) { + defer m.removeDisable() ul := m.lock.Lock(m.to...) + defer ul() for el := m.funcs.Front(); el != nil; el = el.Next() { mi := el.Value.(*msgqItem) @@ -103,11 +96,6 @@ func (m *Msgq) PushLock(msg any) { } mi.running.Add(-1) } - - ul() - if m.someNeedRemove.Load() { - m.removeDisable() - } } func (m *Msgq) ClearAll() { @@ -119,10 +107,13 @@ func (m *Msgq) ClearAll() { } func (m *Msgq) removeDisable() { + if !m.someNeedRemove.CompareAndSwap(true, false) { + return + } + ul := m.lock.Lock(m.to...) defer ul() - m.someNeedRemove.Store(false) for el := m.funcs.Front(); el != nil; el = el.Next() { mi := el.Value.(*msgqItem) if mi.disable.Load() && mi.running.Load() == 0 { @@ -164,7 +155,9 @@ func (m *Msgq) Push_tag(Tag string, Data any) { Data: Data, }) */ + defer m.removeDisable() ul := m.lock.RLock(m.to...) + defer ul() for el := m.funcs.Front(); el != nil; el = el.Next() { mi := el.Value.(*msgqItem) @@ -181,11 +174,6 @@ func (m *Msgq) Push_tag(Tag string, Data any) { } mi.running.Add(-1) } - - ul() - if m.someNeedRemove.Load() { - m.removeDisable() - } } } @@ -217,7 +205,9 @@ func (m *Msgq) PushLock_tag(Tag string, Data any) { Data: Data, }) */ + defer m.removeDisable() ul := m.lock.Lock(m.to...) + defer ul() for el := m.funcs.Front(); el != nil; el = el.Next() { mi := el.Value.(*msgqItem) @@ -234,11 +224,6 @@ func (m *Msgq) PushLock_tag(Tag string, Data any) { } mi.running.Add(-1) } - - ul() - if m.someNeedRemove.Load() { - m.removeDisable() - } } } @@ -336,13 +321,9 @@ type MsgType_tag_data[T any] struct { Data *T } -func NewType[T any]() *MsgType[T] { - return &MsgType[T]{m: New()} -} - // to[0]:timeout to wait to[1]:timeout to run -func NewTypeTo[T any](to ...time.Duration) *MsgType[T] { - return &MsgType[T]{m: NewTo(to...)} +func NewType[T any](to ...time.Duration) *MsgType[T] { + return &MsgType[T]{m: New(to...)} } func (m *MsgType[T]) ClearAll() { @@ -377,7 +358,9 @@ func (m *MsgType[T]) Push_tag(Tag string, Data T) { Data: Data, }) */ + defer m.m.removeDisable() ul := m.m.lock.RLock(m.m.to...) + defer ul() for el := m.m.funcs.Front(); el != nil; el = el.Next() { mi := el.Value.(*msgqItem) @@ -394,11 +377,6 @@ func (m *MsgType[T]) Push_tag(Tag string, Data T) { } mi.running.Add(-1) } - - ul() - if m.m.someNeedRemove.Load() { - m.m.removeDisable() - } } } @@ -430,7 +408,9 @@ func (m *MsgType[T]) PushLock_tag(Tag string, Data T) { Data: Data, }) */ + defer m.m.removeDisable() ul := m.m.lock.Lock(m.m.to...) + defer ul() for el := m.m.funcs.Front(); el != nil; el = el.Next() { mi := el.Value.(*msgqItem) @@ -447,11 +427,6 @@ func (m *MsgType[T]) PushLock_tag(Tag string, Data T) { } mi.running.Add(-1) } - - ul() - if m.m.someNeedRemove.Load() { - m.m.removeDisable() - } } } diff --git a/msgq/Msgq_test.go b/msgq/Msgq_test.go index 67f9f9b..0fd9005 100644 --- a/msgq/Msgq_test.go +++ b/msgq/Msgq_test.go @@ -166,7 +166,7 @@ func Benchmark_1(b *testing.B) { } func Test_RemoveInPush(t *testing.T) { - mq := NewTo(time.Second, time.Second*3) + mq := New(time.Second, time.Second*3) mq.Pull_tag(FuncMap{ `r1`: func(a any) (disable bool) { mq.ClearAll() @@ -183,7 +183,7 @@ func Test_RemoveInPush(t *testing.T) { } func Test_3(t *testing.T) { - mq := NewTo(time.Millisecond*5, time.Millisecond*10) + mq := New(time.Millisecond*5, time.Millisecond*10) go mq.Push_tag(`sss`, nil) mq.Pull_tag(FuncMap{ `test`: func(a any) (disable bool) {