import (
"container/list"
+ "context"
"runtime"
"sync"
"sync/atomic"
})
}
+func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) chan any {
+ var ch = make(chan any, size)
+ m.Register(func(data any) bool {
+ if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
+ select {
+ case <-ctx.Done():
+ close(ch)
+ return true
+ default:
+ if len(ch) == size {
+ <-ch
+ }
+ ch <- d.Data
+ return false
+ }
+ }
+ return false
+ })
+ return ch
+}
+
func (m *Msgq) Pull_tag_only(key string, f func(any) (disable bool)) {
m.Register(func(data any) (disable bool) {
if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
m.m.ClearAll()
}
+func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) chan T {
+ var ch = make(chan T, size)
+ m.m.Register(func(data any) bool {
+ if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
+ select {
+ case <-ctx.Done():
+ close(ch)
+ return true
+ default:
+ if len(ch) == size {
+ <-ch
+ }
+ ch <- d.Data.(T)
+ return false
+ }
+ }
+ return false
+ })
+ return ch
+}
+
func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) {
m.m.Register(func(data any) (disable bool) {
if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
package part
import (
+ "context"
_ "net/http/pprof"
"sync"
"testing"
}
}
+func Test_Pull_tag_chan(t *testing.T) {
+ mq := New()
+ ctx, cf := context.WithCancel(context.Background())
+ ch := mq.Pull_tag_chan(`a`, 2, ctx)
+ for i := 0; i < 5; i++ {
+ mq.Push_tag(`a`, i)
+ }
+ var o = 0
+ for s := true; s; {
+ select {
+ case i := <-ch:
+ o += i.(int)
+ default:
+ s = false
+ }
+ }
+ if o != 7 {
+ t.Fatal()
+ }
+ select {
+ case <-ch:
+ t.Fatal()
+ default:
+ }
+ cf()
+ mq.Push_tag(`a`, 1)
+ select {
+ case i := <-ch:
+ if i != nil {
+ t.Fatal()
+ }
+ default:
+ t.Fatal()
+ }
+}
+
func Test_msgq1(t *testing.T) {
mq := New()
c := make(chan time.Time, 10)