type Msgq struct {
funcs *list.List
someNeedRemove atomic.Int32
- sync.RWMutex
+ lock sync.RWMutex
}
type FuncMap map[string]func(any) (disable bool)
}
func (m *Msgq) Register(f func(any) (disable bool)) {
- m.Lock()
+ m.lock.Lock()
m.funcs.PushBack(f)
- m.Unlock()
+ m.lock.Unlock()
}
func (m *Msgq) Push(msg any) {
var removes []*list.Element
- m.RLock()
+ m.lock.RLock()
for el := m.funcs.Front(); el != nil; el = el.Next() {
if disable := el.Value.(func(any) bool)(msg); disable {
m.someNeedRemove.Add(1)
removes = append(removes, el)
}
}
- m.RUnlock()
+ m.lock.RUnlock()
if len(removes) != 0 {
- m.Lock()
+ m.lock.Lock()
m.someNeedRemove.Add(-int32(len(removes)))
for i := 0; i < len(removes); i++ {
m.funcs.Remove(removes[i])
}
- m.Unlock()
+ m.lock.Unlock()
removes = nil
}
}
return false
})
}
+
+type MsgType[T any] struct {
+ m *Msgq
+}
+
+func (m *MsgType[T]) Push_tag(Tag string, Data T) {
+ if m.m == nil {
+ m.m = New()
+ }
+ m.m.Push(Msgq_tag_data{
+ Tag: Tag,
+ Data: Data,
+ })
+}
+
+func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) {
+ if m.m == nil {
+ m.m = New()
+ }
+ m.m.Register(func(data any) (disable bool) {
+ if d, ok := data.(Msgq_tag_data); ok {
+ if f, ok := func_map[d.Tag]; ok {
+ return f(d.Data.(T))
+ }
+ }
+ return false
+ })
+}
t.Log(`fin`)
}
+func Test_msgq6(t *testing.T) {
+ msg := MsgType[int]{}
+ msg.Pull_tag(map[string]func(int) (disable bool){
+ `1`: func(b int) (disable bool) {
+ if b != 0 {
+ t.Fatal()
+ }
+ t.Log(b)
+ return false
+ },
+ })
+ msg.Push_tag(`1`, 0)
+ time.Sleep(time.Second)
+}
+
// func Test_msgq6(t *testing.T) {
// mq := New()
// go mq.Pull_tag(map[string]func(interface{}) bool{