From cd33d092fe5e4aef8708f176989deebbfbb703aa Mon Sep 17 00:00:00 2001 From: qydysky Date: Tue, 2 Apr 2024 23:34:30 +0800 Subject: [PATCH] Pull_tag_async_order --- msgq/Msgq.go | 68 +++++++++++++++++++++++++++++++++++++++++++++++ msgq/Msgq_test.go | 34 ++++++++++++++++++++++++ 2 files changed, 102 insertions(+) diff --git a/msgq/Msgq.go b/msgq/Msgq.go index bb75998..e39875f 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -27,6 +27,7 @@ type msgqItem struct { } type FuncMap map[string]func(any) (disable bool) +type FuncMapType[T any] map[string]func(T) (disable bool) // to[0]:timeout to wait to[1]:timeout to run func New(to ...time.Duration) *Msgq { @@ -323,6 +324,37 @@ func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) (can return m.register_front(&mi) } +func (m *Msgq) Pull_tag_async_order(func_map map[string]func(any) (disable bool)) (cancel func()) { + var mi = msgqItem{} + var order atomic.Pointer[orderFunc] + order.Store(&orderFunc{ + f: func() {}, + next: &orderFunc{}, + }) + var f = func(data any) bool { + if d, ok := data.(*Msgq_tag_data); ok { + if f, ok := func_map[d.Tag]; ok { + p := order.Load() + p.f = func() { + if f(d.Data) { + mi.disable.Store(true) + m.someNeedRemove.Store(true) + } + if p.next.f != nil { + p.next.f() + } + } + p.next = &orderFunc{} + order.Store(p.next) + p.f() + } + } + return false + } + mi.f = &f + return m.register_front(&mi) +} + type MsgType[T any] struct { m *Msgq } @@ -528,3 +560,39 @@ func (m *MsgType[T]) Pull_tag_async(func_map map[string]func(T) (disable bool)) mi.f = &f return m.m.register_front(&mi) } + +type orderFunc struct { + f func() + next *orderFunc +} + +func (m *MsgType[T]) Pull_tag_async_order(func_map map[string]func(T) (disable bool)) (cancel func()) { + var mi = msgqItem{} + var order atomic.Pointer[orderFunc] + order.Store(&orderFunc{ + f: func() {}, + next: &orderFunc{}, + }) + var f = func(data any) bool { + if d, ok := data.(*MsgType_tag_data[T]); ok { + if f, ok := func_map[d.Tag]; ok { + p := order.Load() + p.f = func() { + if f(*d.Data) { + mi.disable.Store(true) + m.m.someNeedRemove.Store(true) + } + if p.next.f != nil { + p.next.f() + } + } + p.next = &orderFunc{} + order.Store(p.next) + p.f() + } + } + return false + } + mi.f = &f + return m.m.register_front(&mi) +} diff --git a/msgq/Msgq_test.go b/msgq/Msgq_test.go index 3d5102c..45ef7d7 100644 --- a/msgq/Msgq_test.go +++ b/msgq/Msgq_test.go @@ -165,6 +165,40 @@ func Benchmark_1(b *testing.B) { } } +func Benchmark_3(b *testing.B) { + mq := NewType[int]() + mq.Pull_tag_async_order(FuncMapType[int]{ + `del`: func(a int) (disable bool) { + return false + }, + }) + b.ResetTimer() + for i := 0; i < b.N; i++ { + mq.Push_tag(`del`, i) + } +} + +func Test_5(t *testing.T) { + c := make(chan int, 1000) + mq := NewType[int]() + cancel := mq.Pull_tag_async_order(FuncMapType[int]{ + `del`: func(a int) (disable bool) { + c <- a + return false + }, + }) + time.Sleep(time.Millisecond * 500) + for i := 0; i < 100; i++ { + mq.Push_tag(`del`, i) + } + cancel() + for i := 0; i < 100; i++ { + if i != <-c { + t.FailNow() + } + } +} + func Test_4(t *testing.T) { mq := New() cancel := mq.Pull_tag(FuncMap{ -- 2.39.2