From 2a7aa62defb47cec83d5384b2c28543c1224853d Mon Sep 17 00:00:00 2001 From: qydysky Date: Wed, 10 May 2023 01:34:10 +0800 Subject: [PATCH] Add --- msgq/Msgq.go | 43 +++++++++++++++++++++++++++++++++++++++++++ msgq/Msgq_test.go | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/msgq/Msgq.go b/msgq/Msgq.go index a6b8e3e..11a9682 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -2,6 +2,7 @@ package part import ( "container/list" + "context" "runtime" "sync" "sync/atomic" @@ -135,6 +136,27 @@ func (m *Msgq) PushLock_tag(Tag string, Data any) { }) } +func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) chan any { + var ch = make(chan any, size) + m.Register(func(data any) bool { + if d, ok := data.(Msgq_tag_data); ok && d.Tag == key { + select { + case <-ctx.Done(): + close(ch) + return true + default: + if len(ch) == size { + <-ch + } + ch <- d.Data + return false + } + } + return false + }) + return ch +} + func (m *Msgq) Pull_tag_only(key string, f func(any) (disable bool)) { m.Register(func(data any) (disable bool) { if d, ok := data.(Msgq_tag_data); ok && d.Tag == key { @@ -221,6 +243,27 @@ func (m *MsgType[T]) ClearAll() { m.m.ClearAll() } +func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) chan T { + var ch = make(chan T, size) + m.m.Register(func(data any) bool { + if d, ok := data.(Msgq_tag_data); ok && d.Tag == key { + select { + case <-ctx.Done(): + close(ch) + return true + default: + if len(ch) == size { + <-ch + } + ch <- d.Data.(T) + return false + } + } + return false + }) + return ch +} + func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) { m.m.Register(func(data any) (disable bool) { if d, ok := data.(Msgq_tag_data); ok && d.Tag == key { diff --git a/msgq/Msgq_test.go b/msgq/Msgq_test.go index 4e2f991..037ea9e 100644 --- a/msgq/Msgq_test.go +++ b/msgq/Msgq_test.go @@ -1,6 +1,7 @@ package part import ( + "context" _ "net/http/pprof" "sync" "testing" @@ -138,6 +139,42 @@ func BenchmarkXxx(b *testing.B) { } } +func Test_Pull_tag_chan(t *testing.T) { + mq := New() + ctx, cf := context.WithCancel(context.Background()) + ch := mq.Pull_tag_chan(`a`, 2, ctx) + for i := 0; i < 5; i++ { + mq.Push_tag(`a`, i) + } + var o = 0 + for s := true; s; { + select { + case i := <-ch: + o += i.(int) + default: + s = false + } + } + if o != 7 { + t.Fatal() + } + select { + case <-ch: + t.Fatal() + default: + } + cf() + mq.Push_tag(`a`, 1) + select { + case i := <-ch: + if i != nil { + t.Fatal() + } + default: + t.Fatal() + } +} + func Test_msgq1(t *testing.T) { mq := New() c := make(chan time.Time, 10) -- 2.39.2