]> 127.0.0.1 Git - part/.git/commitdiff
Add v0.26.1
authorqydysky <qydysky@foxmail.com>
Tue, 9 May 2023 17:34:10 +0000 (01:34 +0800)
committerqydysky <qydysky@foxmail.com>
Tue, 9 May 2023 17:34:10 +0000 (01:34 +0800)
msgq/Msgq.go
msgq/Msgq_test.go

index a6b8e3ee4624c2145a3eee69c54aa4ba0d9ad2fb..11a96821c932714087e7fcceac70cac66314645e 100644 (file)
@@ -2,6 +2,7 @@ package part
 
 import (
        "container/list"
+       "context"
        "runtime"
        "sync"
        "sync/atomic"
@@ -135,6 +136,27 @@ func (m *Msgq) PushLock_tag(Tag string, Data any) {
        })
 }
 
+func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) chan any {
+       var ch = make(chan any, size)
+       m.Register(func(data any) bool {
+               if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
+                       select {
+                       case <-ctx.Done():
+                               close(ch)
+                               return true
+                       default:
+                               if len(ch) == size {
+                                       <-ch
+                               }
+                               ch <- d.Data
+                               return false
+                       }
+               }
+               return false
+       })
+       return ch
+}
+
 func (m *Msgq) Pull_tag_only(key string, f func(any) (disable bool)) {
        m.Register(func(data any) (disable bool) {
                if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
@@ -221,6 +243,27 @@ func (m *MsgType[T]) ClearAll() {
        m.m.ClearAll()
 }
 
+func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) chan T {
+       var ch = make(chan T, size)
+       m.m.Register(func(data any) bool {
+               if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
+                       select {
+                       case <-ctx.Done():
+                               close(ch)
+                               return true
+                       default:
+                               if len(ch) == size {
+                                       <-ch
+                               }
+                               ch <- d.Data.(T)
+                               return false
+                       }
+               }
+               return false
+       })
+       return ch
+}
+
 func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) {
        m.m.Register(func(data any) (disable bool) {
                if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
index 4e2f991216396b92cf15f9f8020f42b5eadc9753..037ea9e7e31130872af77cada4f56fa38a8b1e1b 100644 (file)
@@ -1,6 +1,7 @@
 package part
 
 import (
+       "context"
        _ "net/http/pprof"
        "sync"
        "testing"
@@ -138,6 +139,42 @@ func BenchmarkXxx(b *testing.B) {
        }
 }
 
+func Test_Pull_tag_chan(t *testing.T) {
+       mq := New()
+       ctx, cf := context.WithCancel(context.Background())
+       ch := mq.Pull_tag_chan(`a`, 2, ctx)
+       for i := 0; i < 5; i++ {
+               mq.Push_tag(`a`, i)
+       }
+       var o = 0
+       for s := true; s; {
+               select {
+               case i := <-ch:
+                       o += i.(int)
+               default:
+                       s = false
+               }
+       }
+       if o != 7 {
+               t.Fatal()
+       }
+       select {
+       case <-ch:
+               t.Fatal()
+       default:
+       }
+       cf()
+       mq.Push_tag(`a`, 1)
+       select {
+       case i := <-ch:
+               if i != nil {
+                       t.Fatal()
+               }
+       default:
+               t.Fatal()
+       }
+}
+
 func Test_msgq1(t *testing.T) {
        mq := New()
        c := make(chan time.Time, 10)