From 8e3488022974f2c713a33d60b1a84fac671b2360 Mon Sep 17 00:00:00 2001 From: qydysky <32743305+qydysky@users.noreply.github.com> Date: Thu, 19 Jan 2023 17:25:41 +0800 Subject: [PATCH] fix --- msgq/Msgq.go | 36 ++++++++++++++++++++------- msgq/Msgq_test.go | 63 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 9 deletions(-) diff --git a/msgq/Msgq.go b/msgq/Msgq.go index 77d0418..368ebf5 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -2,11 +2,15 @@ package part import ( "container/list" + "runtime" "sync" + "sync/atomic" + "time" ) type Msgq struct { - funcs *list.List + funcs *list.List + someNeedRemove atomic.Int32 sync.RWMutex } @@ -25,21 +29,39 @@ func (m *Msgq) Register(f func(any) (disable bool)) { } func (m *Msgq) Push(msg any) { + for m.someNeedRemove.Load() != 0 { + time.Sleep(time.Millisecond) + runtime.Gosched() + } + + var removes []*list.Element + m.RLock() for el := m.funcs.Front(); el != nil; el = el.Next() { if disable := el.Value.(func(any) bool)(msg); disable { - m.funcs.Remove(el) + m.someNeedRemove.Add(1) + removes = append(removes, el) } } m.RUnlock() + + if len(removes) != 0 { + m.Lock() + m.someNeedRemove.Add(-int32(len(removes))) + for i := 0; i < len(removes); i++ { + m.funcs.Remove(removes[i]) + } + m.Unlock() + removes = nil + } } type Msgq_tag_data struct { Tag string - Data interface{} + Data any } -func (m *Msgq) Push_tag(Tag string, Data interface{}) { +func (m *Msgq) Push_tag(Tag string, Data any) { m.Push(Msgq_tag_data{ Tag: Tag, Data: Data, @@ -48,11 +70,7 @@ func (m *Msgq) Push_tag(Tag string, Data interface{}) { func (m *Msgq) Pull_tag(func_map map[string]func(any) (disable bool)) { m.Register(func(data any) (disable bool) { - if d, ok := data.(Msgq_tag_data); !ok { - if f, ok := func_map[`Error`]; ok { - return f(d.Data) - } - } else { + if d, ok := data.(Msgq_tag_data); ok { if f, ok := func_map[d.Tag]; ok { return f(d.Data) } diff --git a/msgq/Msgq_test.go b/msgq/Msgq_test.go index 08846c4..8548baa 100644 --- a/msgq/Msgq_test.go +++ b/msgq/Msgq_test.go @@ -117,6 +117,69 @@ type test_item struct { // t.Log(`fin`) // } +func BenchmarkXxx(b *testing.B) { + mq := New() + mq.Pull_tag(map[string]func(any) bool{ + `1`: func(_ any) bool { + return false + }, + }) + mq.Pull_tag(map[string]func(any) bool{ + `2`: func(_ any) bool { + return false + }, + }) + b.ResetTimer() + for i := 0; i < b.N; i++ { + mq.Push_tag(`1`, nil) + if i == b.N/2 { + mq.Push_tag(`2`, nil) + } + } +} + +func Test_msgq2(t *testing.T) { + mq := New() + + mq.Pull_tag(map[string]func(any) bool{ + `A1`: func(data any) bool { + if v, ok := data.(bool); ok { + return v + } + return false + }, + `A3`: func(data any) bool { + if v, ok := data.(int); ok && v > 50 { + t.Fatal() + } + return false + }, + }) + + mq.Pull_tag(map[string]func(any) bool{ + `A2`: func(data any) bool { + if v, ok := data.(bool); ok { + return v + } + return false + }, + `A3`: func(data any) bool { + // if v, ok := data.(int); ok { + // fmt.Println(`A2A3`, v) + // } + return false + }, + }) + + for i := 0; i < 1000; i++ { + if i == 50 { + mq.Push_tag(`A1`, true) + } + mq.Push_tag(`A3`, i) + } + t.Log(`fin`) +} + func Test_msgq3(t *testing.T) { mq := New() -- 2.39.2