}
}
+func (m *Msgq) PushLock(msg any) {
+ for m.someNeedRemove.Load() != 0 {
+ time.Sleep(time.Millisecond)
+ runtime.Gosched()
+ }
+
+ m.lock.Lock()
+ defer m.lock.Unlock()
+
+ var removes []*list.Element
+
+ for el := m.funcs.Front(); el != nil; el = el.Next() {
+ if disable := el.Value.(func(any) bool)(msg); disable {
+ m.someNeedRemove.Add(1)
+ removes = append(removes, el)
+ }
+ }
+
+ if len(removes) != 0 {
+ m.someNeedRemove.Add(-int32(len(removes)))
+ for i := 0; i < len(removes); i++ {
+ m.funcs.Remove(removes[i])
+ }
+ removes = nil
+ }
+}
+
type Msgq_tag_data struct {
Tag string
Data any
})
}
+func (m *Msgq) PushLock_tag(Tag string, Data any) {
+ m.PushLock(Msgq_tag_data{
+ Tag: Tag,
+ Data: Data,
+ })
+}
+
func (m *Msgq) Pull_tag_only(key string, f func(any) (disable bool)) {
m.Register(func(data any) (disable bool) {
if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
})
}
+func (m *MsgType[T]) PushLock_tag(Tag string, Data T) {
+ m.m.Push(Msgq_tag_data{
+ Tag: Tag,
+ Data: Data,
+ })
+}
+
func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) {
m.m.Register(func(data any) (disable bool) {
if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
import (
_ "net/http/pprof"
+ "sync"
"testing"
"time"
)
-type test_item struct {
- data string
-}
+// type test_item struct {
+// data string
+// }
// func Test_msgq(t *testing.T) {
// go func() {
// var (
// sig = mq.Sig()
-// data interface{}
+// data any
// )
// for {
// data, sig = mq.Pull(sig)
// go func() {
// var (
// sig = mq.Sig()
-// data interface{}
+// data any
// )
// for {
// data, sig = mq.Pull(sig)
// go func() {
// var (
// sig = mq.Sig()
-// data interface{}
+// data any
// )
// for {
// data, sig = mq.Pull(sig)
}
}
+func Test_msgq1(t *testing.T) {
+ mq := New()
+ c := make(chan time.Time, 10)
+ mq.Pull_tag(map[string]func(any) bool{
+ `A1`: func(data any) bool {
+ if v, ok := data.(time.Time); ok {
+ c <- v
+ time.Sleep(time.Second)
+ }
+ return false
+ },
+ })
+
+ {
+ var w sync.WaitGroup
+ w.Add(2)
+ go func() {
+ mq.Push_tag(`A1`, time.Now())
+ w.Done()
+ }()
+ go func() {
+ time.Sleep(time.Millisecond * 100)
+ mq.Push_tag(`A1`, time.Now())
+ w.Done()
+ }()
+ w.Wait()
+ if t1 := time.Now().Add(-time.Second).Add(-time.Millisecond * 100).Sub(<-c).Milliseconds(); t1 > 50 {
+ t.Fatal(t1)
+ }
+ if t1 := time.Now().Add(-time.Second).Sub(<-c).Milliseconds(); t1 > 50 {
+ t.Fatal(t1)
+ }
+ }
+
+ {
+ var w sync.WaitGroup
+ w.Add(2)
+ go func() {
+ mq.PushLock_tag(`A1`, time.Now())
+ w.Done()
+ }()
+ go func() {
+ time.Sleep(time.Millisecond * 100)
+ mq.PushLock_tag(`A1`, time.Now())
+ w.Done()
+ }()
+ w.Wait()
+ if t1 := time.Now().Add(-2 * time.Second).Sub(<-c).Milliseconds(); t1 > 50 {
+ t.Fatal(t1)
+ }
+ if t1 := time.Now().Add(-2 * time.Second).Add(-time.Millisecond * 100).Sub(<-c).Milliseconds(); t1 > 50 {
+ t.Fatal(t1)
+ }
+ }
+}
+
func Test_msgq2(t *testing.T) {
mq := New()
mq := New()
mun_c := make(chan int, 100)
- mq.Pull_tag(map[string]func(interface{}) bool{
- `A1`: func(data interface{}) bool {
+ mq.Pull_tag(map[string]func(any) bool{
+ `A1`: func(data any) bool {
if v, ok := data.(int); ok {
mun_c <- v
}
mun_c1 := make(chan bool, 100)
mun_c2 := make(chan bool, 100)
mun_c3 := make(chan bool, 100)
- mq.Pull_tag(map[string]func(interface{}) bool{
- `A1`: func(data interface{}) bool {
+ mq.Pull_tag(map[string]func(any) bool{
+ `A1`: func(data any) bool {
if v, ok := data.(string); !ok || v != `a11` {
t.Error(`1`)
}
mun_c1 <- true
return false
},
- `A2`: func(data interface{}) bool {
+ `A2`: func(data any) bool {
if v, ok := data.(string); !ok || v != `a11` {
t.Error(`2`)
}
mun_c2 <- true
return false
},
- `Error`: func(data interface{}) bool {
+ `Error`: func(data any) bool {
if data == nil {
t.Error(`out of list`)
}
return false
},
})
- mq.Pull_tag(map[string]func(interface{}) bool{
- `A1`: func(data interface{}) bool {
+ mq.Pull_tag(map[string]func(any) bool{
+ `A1`: func(data any) bool {
if v, ok := data.(string); !ok || v != `a11` {
t.Error(`2`)
}
mun_c3 <- true
return false
},
- `Error`: func(data interface{}) bool {
+ `Error`: func(data any) bool {
if data == nil {
t.Error(`out of list`)
}
mun_c1 := make(chan bool, 100)
mun_c2 := make(chan bool, 100)
- go mq.Pull_tag(map[string]func(interface{}) bool{
- `A1`: func(data interface{}) bool {
+ go mq.Pull_tag(map[string]func(any) bool{
+ `A1`: func(_ any) bool {
time.Sleep(time.Second) //will block
return false
},
- `A2`: func(data interface{}) bool {
+ `A2`: func(data any) bool {
if v, ok := data.(string); !ok || v != `a11` {
t.Error(`2`)
}
mun_c2 <- true
return false
},
- `Error`: func(data interface{}) bool {
+ `Error`: func(data any) bool {
if data == nil {
t.Error(`out of list`)
}
return false
},
})
- mq.Pull_tag(map[string]func(interface{}) bool{
- `A1`: func(data interface{}) bool {
+ mq.Pull_tag(map[string]func(any) bool{
+ `A1`: func(data any) bool {
if v, ok := data.(string); !ok || v != `a11` {
t.Error(`1`)
}
mun_c1 <- true
return false
},
- `A2`: func(data interface{}) bool {
+ `A2`: func(data any) bool {
if v, ok := data.(string); !ok || v != `a11` {
t.Error(`2`)
}
return false
},
- `Error`: func(data interface{}) bool {
+ `Error`: func(data any) bool {
if data == nil {
t.Error(`out of list`)
}
// func Test_msgq6(t *testing.T) {
// mq := New()
-// go mq.Pull_tag(map[string]func(interface{}) bool{
-// `A1`: func(data interface{}) bool {
+// go mq.Pull_tag(map[string]func(any) bool{
+// `A1`: func(data any) bool {
// return false
// },
-// `A2`: func(data interface{}) bool {
+// `A2`: func(data any) bool {
// if v, ok := data.(string); !ok || v != `a11` {
// t.Error(`2`)
// }
// return false
// },
-// `Error`: func(data interface{}) bool {
+// `Error`: func(data any) bool {
// if data == nil {
// t.Error(`out of list`)
// }