From: qydysky <32743305+qydysky@users.noreply.github.com> Date: Tue, 21 Mar 2023 15:08:41 +0000 (+0800) Subject: add X-Git-Tag: v0.24.4 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=e7ef708c096b5522c2d489f7d25dcac673cda715;p=part%2F.git add --- diff --git a/msgq/Msgq.go b/msgq/Msgq.go index 6821f1c..8054e51 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -64,6 +64,33 @@ func (m *Msgq) Push(msg any) { } } +func (m *Msgq) PushLock(msg any) { + for m.someNeedRemove.Load() != 0 { + time.Sleep(time.Millisecond) + runtime.Gosched() + } + + m.lock.Lock() + defer m.lock.Unlock() + + var removes []*list.Element + + for el := m.funcs.Front(); el != nil; el = el.Next() { + if disable := el.Value.(func(any) bool)(msg); disable { + m.someNeedRemove.Add(1) + removes = append(removes, el) + } + } + + if len(removes) != 0 { + m.someNeedRemove.Add(-int32(len(removes))) + for i := 0; i < len(removes); i++ { + m.funcs.Remove(removes[i]) + } + removes = nil + } +} + type Msgq_tag_data struct { Tag string Data any @@ -76,6 +103,13 @@ func (m *Msgq) Push_tag(Tag string, Data any) { }) } +func (m *Msgq) PushLock_tag(Tag string, Data any) { + m.PushLock(Msgq_tag_data{ + Tag: Tag, + Data: Data, + }) +} + func (m *Msgq) Pull_tag_only(key string, f func(any) (disable bool)) { m.Register(func(data any) (disable bool) { if d, ok := data.(Msgq_tag_data); ok && d.Tag == key { @@ -151,6 +185,13 @@ func (m *MsgType[T]) Push_tag(Tag string, Data T) { }) } +func (m *MsgType[T]) PushLock_tag(Tag string, Data T) { + m.m.Push(Msgq_tag_data{ + Tag: Tag, + Data: Data, + }) +} + func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) { m.m.Register(func(data any) (disable bool) { if d, ok := data.(Msgq_tag_data); ok && d.Tag == key { diff --git a/msgq/Msgq_test.go b/msgq/Msgq_test.go index 2dab262..7920b8d 100644 --- a/msgq/Msgq_test.go +++ b/msgq/Msgq_test.go @@ -2,13 +2,14 @@ package part import ( _ "net/http/pprof" + "sync" "testing" "time" ) -type test_item struct { - data string -} +// type test_item struct { +// data string +// } // func Test_msgq(t *testing.T) { @@ -64,7 +65,7 @@ type test_item struct { // go func() { // var ( // sig = mq.Sig() -// data interface{} +// data any // ) // for { // data, sig = mq.Pull(sig) @@ -77,7 +78,7 @@ type test_item struct { // go func() { // var ( // sig = mq.Sig() -// data interface{} +// data any // ) // for { // data, sig = mq.Pull(sig) @@ -90,7 +91,7 @@ type test_item struct { // go func() { // var ( // sig = mq.Sig() -// data interface{} +// data any // ) // for { // data, sig = mq.Pull(sig) @@ -137,6 +138,62 @@ func BenchmarkXxx(b *testing.B) { } } +func Test_msgq1(t *testing.T) { + mq := New() + c := make(chan time.Time, 10) + mq.Pull_tag(map[string]func(any) bool{ + `A1`: func(data any) bool { + if v, ok := data.(time.Time); ok { + c <- v + time.Sleep(time.Second) + } + return false + }, + }) + + { + var w sync.WaitGroup + w.Add(2) + go func() { + mq.Push_tag(`A1`, time.Now()) + w.Done() + }() + go func() { + time.Sleep(time.Millisecond * 100) + mq.Push_tag(`A1`, time.Now()) + w.Done() + }() + w.Wait() + if t1 := time.Now().Add(-time.Second).Add(-time.Millisecond * 100).Sub(<-c).Milliseconds(); t1 > 50 { + t.Fatal(t1) + } + if t1 := time.Now().Add(-time.Second).Sub(<-c).Milliseconds(); t1 > 50 { + t.Fatal(t1) + } + } + + { + var w sync.WaitGroup + w.Add(2) + go func() { + mq.PushLock_tag(`A1`, time.Now()) + w.Done() + }() + go func() { + time.Sleep(time.Millisecond * 100) + mq.PushLock_tag(`A1`, time.Now()) + w.Done() + }() + w.Wait() + if t1 := time.Now().Add(-2 * time.Second).Sub(<-c).Milliseconds(); t1 > 50 { + t.Fatal(t1) + } + if t1 := time.Now().Add(-2 * time.Second).Add(-time.Millisecond * 100).Sub(<-c).Milliseconds(); t1 > 50 { + t.Fatal(t1) + } + } +} + func Test_msgq2(t *testing.T) { mq := New() @@ -182,8 +239,8 @@ func Test_msgq3(t *testing.T) { mq := New() mun_c := make(chan int, 100) - mq.Pull_tag(map[string]func(interface{}) bool{ - `A1`: func(data interface{}) bool { + mq.Pull_tag(map[string]func(any) bool{ + `A1`: func(data any) bool { if v, ok := data.(int); ok { mun_c <- v } @@ -207,37 +264,37 @@ func Test_msgq4(t *testing.T) { mun_c1 := make(chan bool, 100) mun_c2 := make(chan bool, 100) mun_c3 := make(chan bool, 100) - mq.Pull_tag(map[string]func(interface{}) bool{ - `A1`: func(data interface{}) bool { + mq.Pull_tag(map[string]func(any) bool{ + `A1`: func(data any) bool { if v, ok := data.(string); !ok || v != `a11` { t.Error(`1`) } mun_c1 <- true return false }, - `A2`: func(data interface{}) bool { + `A2`: func(data any) bool { if v, ok := data.(string); !ok || v != `a11` { t.Error(`2`) } mun_c2 <- true return false }, - `Error`: func(data interface{}) bool { + `Error`: func(data any) bool { if data == nil { t.Error(`out of list`) } return false }, }) - mq.Pull_tag(map[string]func(interface{}) bool{ - `A1`: func(data interface{}) bool { + mq.Pull_tag(map[string]func(any) bool{ + `A1`: func(data any) bool { if v, ok := data.(string); !ok || v != `a11` { t.Error(`2`) } mun_c3 <- true return false }, - `Error`: func(data interface{}) bool { + `Error`: func(data any) bool { if data == nil { t.Error(`out of list`) } @@ -268,40 +325,40 @@ func Test_msgq5(t *testing.T) { mun_c1 := make(chan bool, 100) mun_c2 := make(chan bool, 100) - go mq.Pull_tag(map[string]func(interface{}) bool{ - `A1`: func(data interface{}) bool { + go mq.Pull_tag(map[string]func(any) bool{ + `A1`: func(_ any) bool { time.Sleep(time.Second) //will block return false }, - `A2`: func(data interface{}) bool { + `A2`: func(data any) bool { if v, ok := data.(string); !ok || v != `a11` { t.Error(`2`) } mun_c2 <- true return false }, - `Error`: func(data interface{}) bool { + `Error`: func(data any) bool { if data == nil { t.Error(`out of list`) } return false }, }) - mq.Pull_tag(map[string]func(interface{}) bool{ - `A1`: func(data interface{}) bool { + mq.Pull_tag(map[string]func(any) bool{ + `A1`: func(data any) bool { if v, ok := data.(string); !ok || v != `a11` { t.Error(`1`) } mun_c1 <- true return false }, - `A2`: func(data interface{}) bool { + `A2`: func(data any) bool { if v, ok := data.(string); !ok || v != `a11` { t.Error(`2`) } return false }, - `Error`: func(data interface{}) bool { + `Error`: func(data any) bool { if data == nil { t.Error(`out of list`) } @@ -395,17 +452,17 @@ func Test_msgq8(t *testing.T) { // func Test_msgq6(t *testing.T) { // mq := New() -// go mq.Pull_tag(map[string]func(interface{}) bool{ -// `A1`: func(data interface{}) bool { +// go mq.Pull_tag(map[string]func(any) bool{ +// `A1`: func(data any) bool { // return false // }, -// `A2`: func(data interface{}) bool { +// `A2`: func(data any) bool { // if v, ok := data.(string); !ok || v != `a11` { // t.Error(`2`) // } // return false // }, -// `Error`: func(data interface{}) bool { +// `Error`: func(data any) bool { // if data == nil { // t.Error(`out of list`) // }