import (
"container/list"
+ "runtime"
"sync"
+ "sync/atomic"
+ "time"
)
type Msgq struct {
- funcs *list.List
+ funcs *list.List
+ someNeedRemove atomic.Int32
sync.RWMutex
}
}
func (m *Msgq) Push(msg any) {
+ for m.someNeedRemove.Load() != 0 {
+ time.Sleep(time.Millisecond)
+ runtime.Gosched()
+ }
+
+ var removes []*list.Element
+
m.RLock()
for el := m.funcs.Front(); el != nil; el = el.Next() {
if disable := el.Value.(func(any) bool)(msg); disable {
- m.funcs.Remove(el)
+ m.someNeedRemove.Add(1)
+ removes = append(removes, el)
}
}
m.RUnlock()
+
+ if len(removes) != 0 {
+ m.Lock()
+ m.someNeedRemove.Add(-int32(len(removes)))
+ for i := 0; i < len(removes); i++ {
+ m.funcs.Remove(removes[i])
+ }
+ m.Unlock()
+ removes = nil
+ }
}
type Msgq_tag_data struct {
Tag string
- Data interface{}
+ Data any
}
-func (m *Msgq) Push_tag(Tag string, Data interface{}) {
+func (m *Msgq) Push_tag(Tag string, Data any) {
m.Push(Msgq_tag_data{
Tag: Tag,
Data: Data,
func (m *Msgq) Pull_tag(func_map map[string]func(any) (disable bool)) {
m.Register(func(data any) (disable bool) {
- if d, ok := data.(Msgq_tag_data); !ok {
- if f, ok := func_map[`Error`]; ok {
- return f(d.Data)
- }
- } else {
+ if d, ok := data.(Msgq_tag_data); ok {
if f, ok := func_map[d.Tag]; ok {
return f(d.Data)
}
// t.Log(`fin`)
// }
+func BenchmarkXxx(b *testing.B) {
+ mq := New()
+ mq.Pull_tag(map[string]func(any) bool{
+ `1`: func(_ any) bool {
+ return false
+ },
+ })
+ mq.Pull_tag(map[string]func(any) bool{
+ `2`: func(_ any) bool {
+ return false
+ },
+ })
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ mq.Push_tag(`1`, nil)
+ if i == b.N/2 {
+ mq.Push_tag(`2`, nil)
+ }
+ }
+}
+
+func Test_msgq2(t *testing.T) {
+ mq := New()
+
+ mq.Pull_tag(map[string]func(any) bool{
+ `A1`: func(data any) bool {
+ if v, ok := data.(bool); ok {
+ return v
+ }
+ return false
+ },
+ `A3`: func(data any) bool {
+ if v, ok := data.(int); ok && v > 50 {
+ t.Fatal()
+ }
+ return false
+ },
+ })
+
+ mq.Pull_tag(map[string]func(any) bool{
+ `A2`: func(data any) bool {
+ if v, ok := data.(bool); ok {
+ return v
+ }
+ return false
+ },
+ `A3`: func(data any) bool {
+ // if v, ok := data.(int); ok {
+ // fmt.Println(`A2A3`, v)
+ // }
+ return false
+ },
+ })
+
+ for i := 0; i < 1000; i++ {
+ if i == 50 {
+ mq.Push_tag(`A1`, true)
+ }
+ mq.Push_tag(`A3`, i)
+ }
+ t.Log(`fin`)
+}
+
func Test_msgq3(t *testing.T) {
mq := New()