From: qydysky Date: Sun, 30 Jul 2023 06:02:08 +0000 (+0800) Subject: add X-Git-Tag: v0.28.0+202307300daa8fa X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=0daa8fa8f74ac6047b6587238cbedcc27ce12255;p=part%2F.git add --- diff --git a/msgq/Msgq.go b/msgq/Msgq.go index 0173e1a..07f1583 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -36,28 +36,46 @@ func New(to ...time.Duration) *Msgq { return m } -func (m *Msgq) register(mp *msgqItem) { +func (m *Msgq) register(mp *msgqItem) (cancel func()) { ul := m.lock.Lock() m.funcs.PushBack(mp) ul() + return func() { + mp.disable.Store(true) + m.someNeedRemove.Store(true) + } } -func (m *Msgq) Register(f func(any) (disable bool)) { - m.register(&msgqItem{ +func (m *Msgq) Register(f func(any) (disable bool)) (cancel func()) { + mp := &msgqItem{ f: &f, - }) + } + m.register(mp) + return func() { + mp.disable.Store(true) + m.someNeedRemove.Store(true) + } } -func (m *Msgq) register_front(mp *msgqItem) { +func (m *Msgq) register_front(mp *msgqItem) (cancel func()) { ul := m.lock.Lock() m.funcs.PushFront(mp) ul() + return func() { + mp.disable.Store(true) + m.someNeedRemove.Store(true) + } } -func (m *Msgq) Register_front(f func(any) (disable bool)) { - m.register_front(&msgqItem{ +func (m *Msgq) Register_front(f func(any) (disable bool)) (cancel func()) { + mp := &msgqItem{ f: &f, - }) + } + m.register_front(mp) + return func() { + mp.disable.Store(true) + m.someNeedRemove.Store(true) + } } func (m *Msgq) Push(msg any) { @@ -220,42 +238,43 @@ func (m *Msgq) PushLock_tag(Tag string, Data any) { } } -func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) <-chan any { - var ch = make(chan any, size) +func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) (cancel func(), ch <-chan any) { + var c = make(chan any, size) var f1 = func(data any) bool { if d, ok := data.(*Msgq_tag_data); ok && d.Tag == key { select { case <-ctx.Done(): - close(ch) + close(c) return true default: for len(ch) != 0 { <-ch } - ch <- d.Data + c <- d.Data } } return false } - m.register_front(&msgqItem{ + cancel = m.register_front(&msgqItem{ f: &f1, }) - return ch + ch = c + return } -func (m *Msgq) Pull_tag_only(key string, f func(any) (disable bool)) { +func (m *Msgq) Pull_tag_only(key string, f func(any) (disable bool)) (cancel func()) { var f1 = func(data any) (disable bool) { if d, ok := data.(*Msgq_tag_data); ok && d.Tag == key { return f(d.Data) } return false } - m.register_front(&msgqItem{ + return m.register_front(&msgqItem{ f: &f1, }) } -func (m *Msgq) Pull_tag(func_map map[string]func(any) (disable bool)) { +func (m *Msgq) Pull_tag(func_map map[string]func(any) (disable bool)) (cancel func()) { var f1 = func(data any) (disable bool) { if d, ok := data.(*Msgq_tag_data); ok { if f, ok := func_map[d.Tag]; ok { @@ -264,12 +283,12 @@ func (m *Msgq) Pull_tag(func_map map[string]func(any) (disable bool)) { } return false } - m.register_front(&msgqItem{ + return m.register_front(&msgqItem{ f: &f1, }) } -func (m *Msgq) Pull_tag_async_only(key string, f func(any) (disable bool)) { +func (m *Msgq) Pull_tag_async_only(key string, f func(any) (disable bool)) (cancel func()) { var mi = msgqItem{} var f1 = func(data any) bool { if d, ok := data.(*Msgq_tag_data); ok { @@ -283,10 +302,10 @@ func (m *Msgq) Pull_tag_async_only(key string, f func(any) (disable bool)) { return false } mi.f = &f1 - m.register_front(&mi) + return m.register_front(&mi) } -func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) { +func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) (cancel func()) { var mi = msgqItem{} var f = func(data any) bool { if d, ok := data.(*Msgq_tag_data); ok { @@ -302,7 +321,7 @@ func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) { return false } mi.f = &f - m.register_front(&mi) + return m.register_front(&mi) } type MsgType[T any] struct { @@ -421,32 +440,33 @@ func (m *MsgType[T]) PushLock_tag(Tag string, Data T) { } } -func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) <-chan T { - var ch = make(chan T, size) +func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) (cancel func(), ch <-chan T) { + var c = make(chan T, size) var f = func(data any) bool { if data1, ok := data.(*MsgType_tag_data[T]); ok { if data1.Tag == key { select { case <-ctx.Done(): - close(ch) + close(c) return true default: for len(ch) != 0 { <-ch } - ch <- *data1.Data + c <- *data1.Data } } } return false } - m.m.register(&msgqItem{ + cancel = m.m.register(&msgqItem{ f: &f, }) - return ch + ch = c + return } -func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) { +func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) (cancel func()) { var f1 = func(data any) (disable bool) { if data1, ok := data.(*MsgType_tag_data[T]); ok { if data1.Tag == key { @@ -455,12 +475,12 @@ func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) { } return false } - m.m.register(&msgqItem{ + return m.m.register(&msgqItem{ f: &f1, }) } -func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) { +func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) (cancel func()) { var f = func(data any) (disable bool) { if data1, ok := data.(*MsgType_tag_data[T]); ok { if f, ok := func_map[data1.Tag]; ok { @@ -469,12 +489,12 @@ func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) { } return false } - m.m.register(&msgqItem{ + return m.m.register(&msgqItem{ f: &f, }) } -func (m *MsgType[T]) Pull_tag_async_only(key string, f func(T) (disable bool)) { +func (m *MsgType[T]) Pull_tag_async_only(key string, f func(T) (disable bool)) (cancel func()) { var mi = msgqItem{} var f1 = func(data any) bool { if d, ok := data.(*MsgType_tag_data[T]); ok { @@ -488,10 +508,10 @@ func (m *MsgType[T]) Pull_tag_async_only(key string, f func(T) (disable bool)) { return false } mi.f = &f1 - m.m.register_front(&mi) + return m.m.register_front(&mi) } -func (m *MsgType[T]) Pull_tag_async(func_map map[string]func(T) (disable bool)) { +func (m *MsgType[T]) Pull_tag_async(func_map map[string]func(T) (disable bool)) (cancel func()) { var mi = msgqItem{} var f = func(data any) bool { if d, ok := data.(*MsgType_tag_data[T]); ok { @@ -507,5 +527,5 @@ func (m *MsgType[T]) Pull_tag_async(func_map map[string]func(T) (disable bool)) return false } mi.f = &f - m.m.register_front(&mi) + return m.m.register_front(&mi) } diff --git a/msgq/Msgq_test.go b/msgq/Msgq_test.go index 1cf194b..5fb2664 100644 --- a/msgq/Msgq_test.go +++ b/msgq/Msgq_test.go @@ -165,6 +165,19 @@ func Benchmark_1(b *testing.B) { } } +func Test_2(t *testing.T) { + mq := New() + cancel := mq.Pull_tag(FuncMap{ + `del`: func(a any) (disable bool) { + t.Fatal() + return false + }, + }) + time.Sleep(time.Millisecond * 500) + cancel() + mq.PushLock_tag(`del`, nil) +} + func Test_1(t *testing.T) { mq := New(time.Millisecond*5, time.Millisecond*10) go mq.Push_tag(`del`, nil) @@ -211,7 +224,7 @@ func Test_3(t *testing.T) { func Test_Pull_tag_chan(t *testing.T) { mq := New() ctx, cf := context.WithCancel(context.Background()) - ch := mq.Pull_tag_chan(`a`, 2, ctx) + _, ch := mq.Pull_tag_chan(`a`, 2, ctx) for i := 0; i < 5; i++ { mq.Push_tag(`a`, i) }