From b57506163d5b3998e56c6fbb826762de6cd534db Mon Sep 17 00:00:00 2001 From: qydysky <32743305+qydysky@users.noreply.github.com> Date: Sat, 25 Feb 2023 23:36:23 +0800 Subject: [PATCH] improve --- msgq/Msgq.go | 92 +++++++++++++++++++++++++++++++++++++++++++++++ msgq/Msgq_test.go | 44 +++++++++++++++++++++++ 2 files changed, 136 insertions(+) diff --git a/msgq/Msgq.go b/msgq/Msgq.go index e864492..572f718 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -28,6 +28,12 @@ func (m *Msgq) Register(f func(any) (disable bool)) { m.lock.Unlock() } +func (m *Msgq) Register_front(f func(any) (disable bool)) { + m.lock.Lock() + m.funcs.PushFront(f) + m.lock.Unlock() +} + func (m *Msgq) Push(msg any) { for m.someNeedRemove.Load() != 0 { time.Sleep(time.Millisecond) @@ -68,6 +74,15 @@ func (m *Msgq) Push_tag(Tag string, Data any) { }) } +func (m *Msgq) Pull_tag_only(key string, f func(any) (disable bool)) { + m.Register(func(data any) (disable bool) { + if d, ok := data.(Msgq_tag_data); ok && d.Tag == key { + return f(d.Data) + } + return false + }) +} + func (m *Msgq) Pull_tag(func_map map[string]func(any) (disable bool)) { m.Register(func(data any) (disable bool) { if d, ok := data.(Msgq_tag_data); ok { @@ -79,6 +94,40 @@ func (m *Msgq) Pull_tag(func_map map[string]func(any) (disable bool)) { }) } +func (m *Msgq) Pull_tag_async_only(key string, f func(any) (disable bool)) { + var disable = false + + m.Register_front(func(data any) bool { + if disable { + return true + } + if d, ok := data.(Msgq_tag_data); ok && d.Tag == key { + go func(t *bool) { + *t = f(d.Data) + }(&disable) + } + return false + }) +} + +func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) { + var disable = false + + m.Register_front(func(data any) bool { + if disable { + return true + } + if d, ok := data.(Msgq_tag_data); ok { + if f, ok := func_map[d.Tag]; ok { + go func(t *bool) { + *t = f(d.Data) + }(&disable) + } + } + return false + }) +} + type MsgType[T any] struct { m *Msgq } @@ -96,6 +145,15 @@ func (m *MsgType[T]) Push_tag(Tag string, Data T) { }) } +func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) { + m.m.Register(func(data any) (disable bool) { + if d, ok := data.(Msgq_tag_data); ok && d.Tag == key { + return f(d.Data.(T)) + } + return false + }) +} + func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) { m.m.Register(func(data any) (disable bool) { if d, ok := data.(Msgq_tag_data); ok { @@ -106,3 +164,37 @@ func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) { return false }) } + +func (m *MsgType[T]) Pull_tag_async_only(key string, f func(T) (disable bool)) { + var disable = false + + m.m.Register_front(func(data any) bool { + if disable { + return true + } + if d, ok := data.(Msgq_tag_data); ok && d.Tag == key { + go func(t *bool) { + *t = f(d.Data.(T)) + }(&disable) + } + return false + }) +} + +func (m *MsgType[T]) Pull_tag_async(func_map map[string]func(T) (disable bool)) { + var disable = false + + m.m.Register_front(func(data any) bool { + if disable { + return true + } + if d, ok := data.(Msgq_tag_data); ok { + if f, ok := func_map[d.Tag]; ok { + go func(t *bool) { + *t = f(d.Data.(T)) + }(&disable) + } + } + return false + }) +} diff --git a/msgq/Msgq_test.go b/msgq/Msgq_test.go index 4f51eef..116ffcd 100644 --- a/msgq/Msgq_test.go +++ b/msgq/Msgq_test.go @@ -348,6 +348,50 @@ func Test_msgq6(t *testing.T) { time.Sleep(time.Second) } +func Test_msgq7(t *testing.T) { + msg := NewType[int]() + msg.Pull_tag_async_only(`1`, func(i int) (disable bool) { + time.Sleep(time.Second) + t.Log(`async1`) + return i > 10 + }) + msg.Pull_tag_async_only(`1`, func(i int) (disable bool) { + time.Sleep(time.Second * 2) + t.Log(`async2`) + return i > 10 + }) + msg.Pull_tag_only(`1`, func(i int) (disable bool) { + time.Sleep(time.Second * 2) + t.Log(`sync1`) + return i > 10 + }) + msg.Pull_tag_only(`1`, func(i int) (disable bool) { + time.Sleep(time.Second * 2) + t.Log(`sync2`) + return i > 10 + }) + msg.Push_tag(`1`, 0) + time.Sleep(time.Second * 10) +} + +func Test_msgq8(t *testing.T) { + msg := NewType[int]() + msg.Pull_tag_async_only(`1`, func(i int) (disable bool) { + time.Sleep(time.Second) + t.Logf("async %d", i) + return i > 3 + }) + msg.Pull_tag_only(`1`, func(i int) (disable bool) { + time.Sleep(time.Second) + t.Logf("sync %d", i) + return i > 5 + }) + for i := 0; i < 20; i++ { + msg.Push_tag(`1`, i) + } + time.Sleep(time.Second * 10) +} + // func Test_msgq6(t *testing.T) { // mq := New() // go mq.Pull_tag(map[string]func(interface{}) bool{ -- 2.39.2