}
type FuncMap map[string]func(any) (disable bool)
+type FuncMapType[T any] map[string]func(T) (disable bool)
// to[0]:timeout to wait to[1]:timeout to run
func New(to ...time.Duration) *Msgq {
return m.register_front(&mi)
}
+func (m *Msgq) Pull_tag_async_order(func_map map[string]func(any) (disable bool)) (cancel func()) {
+ var mi = msgqItem{}
+ var order atomic.Pointer[orderFunc]
+ order.Store(&orderFunc{
+ f: func() {},
+ next: &orderFunc{},
+ })
+ var f = func(data any) bool {
+ if d, ok := data.(*Msgq_tag_data); ok {
+ if f, ok := func_map[d.Tag]; ok {
+ p := order.Load()
+ p.f = func() {
+ if f(d.Data) {
+ mi.disable.Store(true)
+ m.someNeedRemove.Store(true)
+ }
+ if p.next.f != nil {
+ p.next.f()
+ }
+ }
+ p.next = &orderFunc{}
+ order.Store(p.next)
+ p.f()
+ }
+ }
+ return false
+ }
+ mi.f = &f
+ return m.register_front(&mi)
+}
+
type MsgType[T any] struct {
m *Msgq
}
mi.f = &f
return m.m.register_front(&mi)
}
+
+type orderFunc struct {
+ f func()
+ next *orderFunc
+}
+
+func (m *MsgType[T]) Pull_tag_async_order(func_map map[string]func(T) (disable bool)) (cancel func()) {
+ var mi = msgqItem{}
+ var order atomic.Pointer[orderFunc]
+ order.Store(&orderFunc{
+ f: func() {},
+ next: &orderFunc{},
+ })
+ var f = func(data any) bool {
+ if d, ok := data.(*MsgType_tag_data[T]); ok {
+ if f, ok := func_map[d.Tag]; ok {
+ p := order.Load()
+ p.f = func() {
+ if f(*d.Data) {
+ mi.disable.Store(true)
+ m.m.someNeedRemove.Store(true)
+ }
+ if p.next.f != nil {
+ p.next.f()
+ }
+ }
+ p.next = &orderFunc{}
+ order.Store(p.next)
+ p.f()
+ }
+ }
+ return false
+ }
+ mi.f = &f
+ return m.m.register_front(&mi)
+}
}
}
+func Benchmark_3(b *testing.B) {
+ mq := NewType[int]()
+ mq.Pull_tag_async_order(FuncMapType[int]{
+ `del`: func(a int) (disable bool) {
+ return false
+ },
+ })
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ mq.Push_tag(`del`, i)
+ }
+}
+
+func Test_5(t *testing.T) {
+ c := make(chan int, 1000)
+ mq := NewType[int]()
+ cancel := mq.Pull_tag_async_order(FuncMapType[int]{
+ `del`: func(a int) (disable bool) {
+ c <- a
+ return false
+ },
+ })
+ time.Sleep(time.Millisecond * 500)
+ for i := 0; i < 100; i++ {
+ mq.Push_tag(`del`, i)
+ }
+ cancel()
+ for i := 0; i < 100; i++ {
+ if i != <-c {
+ t.FailNow()
+ }
+ }
+}
+
func Test_4(t *testing.T) {
mq := New()
cancel := mq.Pull_tag(FuncMap{