m.lock.Unlock()
}
+func (m *Msgq) Register_front(f func(any) (disable bool)) {
+ m.lock.Lock()
+ m.funcs.PushFront(f)
+ m.lock.Unlock()
+}
+
func (m *Msgq) Push(msg any) {
for m.someNeedRemove.Load() != 0 {
time.Sleep(time.Millisecond)
})
}
+func (m *Msgq) Pull_tag_only(key string, f func(any) (disable bool)) {
+ m.Register(func(data any) (disable bool) {
+ if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
+ return f(d.Data)
+ }
+ return false
+ })
+}
+
func (m *Msgq) Pull_tag(func_map map[string]func(any) (disable bool)) {
m.Register(func(data any) (disable bool) {
if d, ok := data.(Msgq_tag_data); ok {
})
}
+func (m *Msgq) Pull_tag_async_only(key string, f func(any) (disable bool)) {
+ var disable = false
+
+ m.Register_front(func(data any) bool {
+ if disable {
+ return true
+ }
+ if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
+ go func(t *bool) {
+ *t = f(d.Data)
+ }(&disable)
+ }
+ return false
+ })
+}
+
+func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) {
+ var disable = false
+
+ m.Register_front(func(data any) bool {
+ if disable {
+ return true
+ }
+ if d, ok := data.(Msgq_tag_data); ok {
+ if f, ok := func_map[d.Tag]; ok {
+ go func(t *bool) {
+ *t = f(d.Data)
+ }(&disable)
+ }
+ }
+ return false
+ })
+}
+
type MsgType[T any] struct {
m *Msgq
}
})
}
+func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) {
+ m.m.Register(func(data any) (disable bool) {
+ if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
+ return f(d.Data.(T))
+ }
+ return false
+ })
+}
+
func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) {
m.m.Register(func(data any) (disable bool) {
if d, ok := data.(Msgq_tag_data); ok {
return false
})
}
+
+func (m *MsgType[T]) Pull_tag_async_only(key string, f func(T) (disable bool)) {
+ var disable = false
+
+ m.m.Register_front(func(data any) bool {
+ if disable {
+ return true
+ }
+ if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
+ go func(t *bool) {
+ *t = f(d.Data.(T))
+ }(&disable)
+ }
+ return false
+ })
+}
+
+func (m *MsgType[T]) Pull_tag_async(func_map map[string]func(T) (disable bool)) {
+ var disable = false
+
+ m.m.Register_front(func(data any) bool {
+ if disable {
+ return true
+ }
+ if d, ok := data.(Msgq_tag_data); ok {
+ if f, ok := func_map[d.Tag]; ok {
+ go func(t *bool) {
+ *t = f(d.Data.(T))
+ }(&disable)
+ }
+ }
+ return false
+ })
+}
time.Sleep(time.Second)
}
+func Test_msgq7(t *testing.T) {
+ msg := NewType[int]()
+ msg.Pull_tag_async_only(`1`, func(i int) (disable bool) {
+ time.Sleep(time.Second)
+ t.Log(`async1`)
+ return i > 10
+ })
+ msg.Pull_tag_async_only(`1`, func(i int) (disable bool) {
+ time.Sleep(time.Second * 2)
+ t.Log(`async2`)
+ return i > 10
+ })
+ msg.Pull_tag_only(`1`, func(i int) (disable bool) {
+ time.Sleep(time.Second * 2)
+ t.Log(`sync1`)
+ return i > 10
+ })
+ msg.Pull_tag_only(`1`, func(i int) (disable bool) {
+ time.Sleep(time.Second * 2)
+ t.Log(`sync2`)
+ return i > 10
+ })
+ msg.Push_tag(`1`, 0)
+ time.Sleep(time.Second * 10)
+}
+
+func Test_msgq8(t *testing.T) {
+ msg := NewType[int]()
+ msg.Pull_tag_async_only(`1`, func(i int) (disable bool) {
+ time.Sleep(time.Second)
+ t.Logf("async %d", i)
+ return i > 3
+ })
+ msg.Pull_tag_only(`1`, func(i int) (disable bool) {
+ time.Sleep(time.Second)
+ t.Logf("sync %d", i)
+ return i > 5
+ })
+ for i := 0; i < 20; i++ {
+ msg.Push_tag(`1`, i)
+ }
+ time.Sleep(time.Second * 10)
+}
+
// func Test_msgq6(t *testing.T) {
// mq := New()
// go mq.Pull_tag(map[string]func(interface{}) bool{