return m.register_front(&mi)
}
-func (m *Msgq) Pull_tag_async_order(func_map map[string]func(any) (disable bool)) (cancel func()) {
- var mi = msgqItem{}
- var order atomic.Pointer[orderFunc]
- order.Store(&orderFunc{
- f: func() {},
- next: &orderFunc{},
- })
- var f = func(data any) bool {
- if d, ok := data.(*Msgq_tag_data); ok {
- if f, ok := func_map[d.Tag]; ok {
- p := order.Load()
- p.f = func() {
- if f(d.Data) {
- mi.disable.Store(true)
- m.someNeedRemove.Store(true)
- }
- if p.next.f != nil {
- p.next.f()
- }
- }
- p.next = &orderFunc{}
- order.Store(p.next)
- p.f()
- }
- }
- return false
- }
- mi.f = &f
- return m.register_front(&mi)
-}
-
type MsgType[T any] struct {
m *Msgq
}
mi.f = &f
return m.m.register_front(&mi)
}
-
-type orderFunc struct {
- f func()
- next *orderFunc
-}
-
-func (m *MsgType[T]) Pull_tag_async_order(func_map map[string]func(T) (disable bool)) (cancel func()) {
- var mi = msgqItem{}
- var order atomic.Pointer[orderFunc]
- order.Store(&orderFunc{
- f: func() {},
- next: &orderFunc{},
- })
- var f = func(data any) bool {
- if d, ok := data.(*MsgType_tag_data[T]); ok {
- if f, ok := func_map[d.Tag]; ok {
- p := order.Load()
- p.f = func() {
- if f(*d.Data) {
- mi.disable.Store(true)
- m.m.someNeedRemove.Store(true)
- }
- if p.next.f != nil {
- p.next.f()
- }
- }
- p.next = &orderFunc{}
- order.Store(p.next)
- p.f()
- }
- }
- return false
- }
- mi.f = &f
- return m.m.register_front(&mi)
-}
}
}
-func Benchmark_3(b *testing.B) {
- mq := NewType[int]()
- mq.Pull_tag_async_order(FuncMapType[int]{
- `del`: func(a int) (disable bool) {
- return false
- },
- })
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- mq.Push_tag(`del`, i)
- }
-}
-
-func Test_5(t *testing.T) {
- c := make(chan int, 1000)
- mq := NewType[int]()
- cancel := mq.Pull_tag_async_order(FuncMapType[int]{
- `del`: func(a int) (disable bool) {
- c <- a
- return false
- },
- })
- time.Sleep(time.Millisecond * 500)
- for i := 0; i < 100; i++ {
- mq.Push_tag(`del`, i)
- }
- cancel()
- for i := 0; i < 100; i++ {
- if i != <-c {
- t.FailNow()
- }
- }
-}
-
func Test_4(t *testing.T) {
mq := New()
cancel := mq.Pull_tag(FuncMap{