}
func (m *Msgq) Pull_tag_async_only(key string, f func(any) (disable bool)) {
- var disable = false
+ var disable = signal.Init()
m.Register_front(func(data any) bool {
- if disable {
+ if !disable.Islive() {
return true
}
if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
- go func(t *bool) {
- *t = f(d.Data)
- }(&disable)
+ go func() {
+ if f(d.Data) {
+ disable.Done()
+ }
+ }()
}
return false
})
}
func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) {
- var disable = false
+ var disable = signal.Init()
m.Register_front(func(data any) bool {
- if disable {
+ if !disable.Islive() {
return true
}
if d, ok := data.(Msgq_tag_data); ok {
if f, ok := func_map[d.Tag]; ok {
- go func(t *bool) {
- *t = f(d.Data)
- }(&disable)
+ go func() {
+ if f(d.Data) {
+ disable.Done()
+ }
+ }()
}
}
return false
}
func (m *MsgType[T]) Pull_tag_async(func_map map[string]func(T) (disable bool)) {
- var disable = false
+ var disable = signal.Init()
m.m.Register_front(func(data any) bool {
- if disable {
+ if !disable.Islive() {
return true
}
if d, ok := data.(Msgq_tag_data); ok {
if f, ok := func_map[d.Tag]; ok {
- go func(t *bool) {
- *t = f(d.Data.(T))
- }(&disable)
+ go func() {
+ if f(d.Data.(T)) {
+ disable.Done()
+ }
+ }()
}
}
return false
package part
import (
- "fmt"
_ "net/http/pprof"
"testing"
"time"
}
func Test_msgq8(t *testing.T) {
- var c = make(chan string, 100)
- var cc string
msg := NewType[int]()
msg.Pull_tag_async_only(`1`, func(i int) (disable bool) {
- time.Sleep(time.Second)
- c <- fmt.Sprintf("a%d", i)
+ if i > 4 {
+ t.Fatal(i)
+ }
return i > 3
})
msg.Pull_tag_only(`1`, func(i int) (disable bool) {
- time.Sleep(time.Second)
- c <- fmt.Sprintf("s%d", i)
+ if i > 6 {
+ t.Fatal(i)
+ }
return i > 5
})
for i := 0; i < 20; i++ {
msg.Push_tag(`1`, i)
+ time.Sleep(time.Millisecond * 20)
}
time.Sleep(time.Second)
- for len(c) != 0 {
- cc += <-c
- }
- if cc != "a0s0a1s1a2s2a3s3a4s4s5s6" {
- t.Fatal()
- }
}
// func Test_msgq6(t *testing.T) {