]> 127.0.0.1 Git - part/.git/commitdiff
add v0.28.0+202307300daa8fa
authorqydysky <qydysky@foxmail.com>
Sun, 30 Jul 2023 06:02:08 +0000 (14:02 +0800)
committerqydysky <qydysky@foxmail.com>
Sun, 30 Jul 2023 06:02:08 +0000 (14:02 +0800)
msgq/Msgq.go
msgq/Msgq_test.go

index 0173e1aad1971c3f05e646b43a7e6e21063320a5..07f15836473c517a1623038f9d23ba6b37fc1e41 100644 (file)
@@ -36,28 +36,46 @@ func New(to ...time.Duration) *Msgq {
        return m
 }
 
-func (m *Msgq) register(mp *msgqItem) {
+func (m *Msgq) register(mp *msgqItem) (cancel func()) {
        ul := m.lock.Lock()
        m.funcs.PushBack(mp)
        ul()
+       return func() {
+               mp.disable.Store(true)
+               m.someNeedRemove.Store(true)
+       }
 }
 
-func (m *Msgq) Register(f func(any) (disable bool)) {
-       m.register(&msgqItem{
+func (m *Msgq) Register(f func(any) (disable bool)) (cancel func()) {
+       mp := &msgqItem{
                f: &f,
-       })
+       }
+       m.register(mp)
+       return func() {
+               mp.disable.Store(true)
+               m.someNeedRemove.Store(true)
+       }
 }
 
-func (m *Msgq) register_front(mp *msgqItem) {
+func (m *Msgq) register_front(mp *msgqItem) (cancel func()) {
        ul := m.lock.Lock()
        m.funcs.PushFront(mp)
        ul()
+       return func() {
+               mp.disable.Store(true)
+               m.someNeedRemove.Store(true)
+       }
 }
 
-func (m *Msgq) Register_front(f func(any) (disable bool)) {
-       m.register_front(&msgqItem{
+func (m *Msgq) Register_front(f func(any) (disable bool)) (cancel func()) {
+       mp := &msgqItem{
                f: &f,
-       })
+       }
+       m.register_front(mp)
+       return func() {
+               mp.disable.Store(true)
+               m.someNeedRemove.Store(true)
+       }
 }
 
 func (m *Msgq) Push(msg any) {
@@ -220,42 +238,43 @@ func (m *Msgq) PushLock_tag(Tag string, Data any) {
        }
 }
 
-func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) <-chan any {
-       var ch = make(chan any, size)
+func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) (cancel func(), ch <-chan any) {
+       var c = make(chan any, size)
        var f1 = func(data any) bool {
                if d, ok := data.(*Msgq_tag_data); ok && d.Tag == key {
                        select {
                        case <-ctx.Done():
-                               close(ch)
+                               close(c)
                                return true
                        default:
                                for len(ch) != 0 {
                                        <-ch
                                }
-                               ch <- d.Data
+                               c <- d.Data
                        }
                }
                return false
        }
-       m.register_front(&msgqItem{
+       cancel = m.register_front(&msgqItem{
                f: &f1,
        })
-       return ch
+       ch = c
+       return
 }
 
-func (m *Msgq) Pull_tag_only(key string, f func(any) (disable bool)) {
+func (m *Msgq) Pull_tag_only(key string, f func(any) (disable bool)) (cancel func()) {
        var f1 = func(data any) (disable bool) {
                if d, ok := data.(*Msgq_tag_data); ok && d.Tag == key {
                        return f(d.Data)
                }
                return false
        }
-       m.register_front(&msgqItem{
+       return m.register_front(&msgqItem{
                f: &f1,
        })
 }
 
-func (m *Msgq) Pull_tag(func_map map[string]func(any) (disable bool)) {
+func (m *Msgq) Pull_tag(func_map map[string]func(any) (disable bool)) (cancel func()) {
        var f1 = func(data any) (disable bool) {
                if d, ok := data.(*Msgq_tag_data); ok {
                        if f, ok := func_map[d.Tag]; ok {
@@ -264,12 +283,12 @@ func (m *Msgq) Pull_tag(func_map map[string]func(any) (disable bool)) {
                }
                return false
        }
-       m.register_front(&msgqItem{
+       return m.register_front(&msgqItem{
                f: &f1,
        })
 }
 
-func (m *Msgq) Pull_tag_async_only(key string, f func(any) (disable bool)) {
+func (m *Msgq) Pull_tag_async_only(key string, f func(any) (disable bool)) (cancel func()) {
        var mi = msgqItem{}
        var f1 = func(data any) bool {
                if d, ok := data.(*Msgq_tag_data); ok {
@@ -283,10 +302,10 @@ func (m *Msgq) Pull_tag_async_only(key string, f func(any) (disable bool)) {
                return false
        }
        mi.f = &f1
-       m.register_front(&mi)
+       return m.register_front(&mi)
 }
 
-func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) {
+func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) (cancel func()) {
        var mi = msgqItem{}
        var f = func(data any) bool {
                if d, ok := data.(*Msgq_tag_data); ok {
@@ -302,7 +321,7 @@ func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) {
                return false
        }
        mi.f = &f
-       m.register_front(&mi)
+       return m.register_front(&mi)
 }
 
 type MsgType[T any] struct {
@@ -421,32 +440,33 @@ func (m *MsgType[T]) PushLock_tag(Tag string, Data T) {
        }
 }
 
-func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) <-chan T {
-       var ch = make(chan T, size)
+func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) (cancel func(), ch <-chan T) {
+       var c = make(chan T, size)
        var f = func(data any) bool {
                if data1, ok := data.(*MsgType_tag_data[T]); ok {
                        if data1.Tag == key {
                                select {
                                case <-ctx.Done():
-                                       close(ch)
+                                       close(c)
                                        return true
                                default:
                                        for len(ch) != 0 {
                                                <-ch
                                        }
-                                       ch <- *data1.Data
+                                       c <- *data1.Data
                                }
                        }
                }
                return false
        }
-       m.m.register(&msgqItem{
+       cancel = m.m.register(&msgqItem{
                f: &f,
        })
-       return ch
+       ch = c
+       return
 }
 
-func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) {
+func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) (cancel func()) {
        var f1 = func(data any) (disable bool) {
                if data1, ok := data.(*MsgType_tag_data[T]); ok {
                        if data1.Tag == key {
@@ -455,12 +475,12 @@ func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) {
                }
                return false
        }
-       m.m.register(&msgqItem{
+       return m.m.register(&msgqItem{
                f: &f1,
        })
 }
 
-func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) {
+func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) (cancel func()) {
        var f = func(data any) (disable bool) {
                if data1, ok := data.(*MsgType_tag_data[T]); ok {
                        if f, ok := func_map[data1.Tag]; ok {
@@ -469,12 +489,12 @@ func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) {
                }
                return false
        }
-       m.m.register(&msgqItem{
+       return m.m.register(&msgqItem{
                f: &f,
        })
 }
 
-func (m *MsgType[T]) Pull_tag_async_only(key string, f func(T) (disable bool)) {
+func (m *MsgType[T]) Pull_tag_async_only(key string, f func(T) (disable bool)) (cancel func()) {
        var mi = msgqItem{}
        var f1 = func(data any) bool {
                if d, ok := data.(*MsgType_tag_data[T]); ok {
@@ -488,10 +508,10 @@ func (m *MsgType[T]) Pull_tag_async_only(key string, f func(T) (disable bool)) {
                return false
        }
        mi.f = &f1
-       m.m.register_front(&mi)
+       return m.m.register_front(&mi)
 }
 
-func (m *MsgType[T]) Pull_tag_async(func_map map[string]func(T) (disable bool)) {
+func (m *MsgType[T]) Pull_tag_async(func_map map[string]func(T) (disable bool)) (cancel func()) {
        var mi = msgqItem{}
        var f = func(data any) bool {
                if d, ok := data.(*MsgType_tag_data[T]); ok {
@@ -507,5 +527,5 @@ func (m *MsgType[T]) Pull_tag_async(func_map map[string]func(T) (disable bool))
                return false
        }
        mi.f = &f
-       m.m.register_front(&mi)
+       return m.m.register_front(&mi)
 }
index 1cf194b2a92b2ab95f03107f1546711444e8d86a..5fb2664e0bf7fbb69648b5c0e0cdb5691ba0f084 100644 (file)
@@ -165,6 +165,19 @@ func Benchmark_1(b *testing.B) {
        }
 }
 
+func Test_2(t *testing.T) {
+       mq := New()
+       cancel := mq.Pull_tag(FuncMap{
+               `del`: func(a any) (disable bool) {
+                       t.Fatal()
+                       return false
+               },
+       })
+       time.Sleep(time.Millisecond * 500)
+       cancel()
+       mq.PushLock_tag(`del`, nil)
+}
+
 func Test_1(t *testing.T) {
        mq := New(time.Millisecond*5, time.Millisecond*10)
        go mq.Push_tag(`del`, nil)
@@ -211,7 +224,7 @@ func Test_3(t *testing.T) {
 func Test_Pull_tag_chan(t *testing.T) {
        mq := New()
        ctx, cf := context.WithCancel(context.Background())
-       ch := mq.Pull_tag_chan(`a`, 2, ctx)
+       _, ch := mq.Pull_tag_chan(`a`, 2, ctx)
        for i := 0; i < 5; i++ {
                mq.Push_tag(`a`, i)
        }