]> 127.0.0.1 Git - part/.git/commitdiff
Pull_tag_async_order v0.28.20240402154002
authorqydysky <qydysky@foxmail.com>
Tue, 2 Apr 2024 15:34:30 +0000 (23:34 +0800)
committerqydysky <qydysky@foxmail.com>
Tue, 2 Apr 2024 15:34:30 +0000 (23:34 +0800)
msgq/Msgq.go
msgq/Msgq_test.go

index bb7599894f5349c5e1c24da120913a306a3532a4..e39875f912c59cd443b26b8ddfcd33138223b7b1 100644 (file)
@@ -27,6 +27,7 @@ type msgqItem struct {
 }
 
 type FuncMap map[string]func(any) (disable bool)
+type FuncMapType[T any] map[string]func(T) (disable bool)
 
 // to[0]:timeout to wait to[1]:timeout to run
 func New(to ...time.Duration) *Msgq {
@@ -323,6 +324,37 @@ func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) (can
        return m.register_front(&mi)
 }
 
+func (m *Msgq) Pull_tag_async_order(func_map map[string]func(any) (disable bool)) (cancel func()) {
+       var mi = msgqItem{}
+       var order atomic.Pointer[orderFunc]
+       order.Store(&orderFunc{
+               f:    func() {},
+               next: &orderFunc{},
+       })
+       var f = func(data any) bool {
+               if d, ok := data.(*Msgq_tag_data); ok {
+                       if f, ok := func_map[d.Tag]; ok {
+                               p := order.Load()
+                               p.f = func() {
+                                       if f(d.Data) {
+                                               mi.disable.Store(true)
+                                               m.someNeedRemove.Store(true)
+                                       }
+                                       if p.next.f != nil {
+                                               p.next.f()
+                                       }
+                               }
+                               p.next = &orderFunc{}
+                               order.Store(p.next)
+                               p.f()
+                       }
+               }
+               return false
+       }
+       mi.f = &f
+       return m.register_front(&mi)
+}
+
 type MsgType[T any] struct {
        m *Msgq
 }
@@ -528,3 +560,39 @@ func (m *MsgType[T]) Pull_tag_async(func_map map[string]func(T) (disable bool))
        mi.f = &f
        return m.m.register_front(&mi)
 }
+
+type orderFunc struct {
+       f    func()
+       next *orderFunc
+}
+
+func (m *MsgType[T]) Pull_tag_async_order(func_map map[string]func(T) (disable bool)) (cancel func()) {
+       var mi = msgqItem{}
+       var order atomic.Pointer[orderFunc]
+       order.Store(&orderFunc{
+               f:    func() {},
+               next: &orderFunc{},
+       })
+       var f = func(data any) bool {
+               if d, ok := data.(*MsgType_tag_data[T]); ok {
+                       if f, ok := func_map[d.Tag]; ok {
+                               p := order.Load()
+                               p.f = func() {
+                                       if f(*d.Data) {
+                                               mi.disable.Store(true)
+                                               m.m.someNeedRemove.Store(true)
+                                       }
+                                       if p.next.f != nil {
+                                               p.next.f()
+                                       }
+                               }
+                               p.next = &orderFunc{}
+                               order.Store(p.next)
+                               p.f()
+                       }
+               }
+               return false
+       }
+       mi.f = &f
+       return m.m.register_front(&mi)
+}
index 3d5102c65d0fefcd9ffa2685eca9bcb7ab843323..45ef7d78fe7a166797806af954a93d2e2580101b 100644 (file)
@@ -165,6 +165,40 @@ func Benchmark_1(b *testing.B) {
        }
 }
 
+func Benchmark_3(b *testing.B) {
+       mq := NewType[int]()
+       mq.Pull_tag_async_order(FuncMapType[int]{
+               `del`: func(a int) (disable bool) {
+                       return false
+               },
+       })
+       b.ResetTimer()
+       for i := 0; i < b.N; i++ {
+               mq.Push_tag(`del`, i)
+       }
+}
+
+func Test_5(t *testing.T) {
+       c := make(chan int, 1000)
+       mq := NewType[int]()
+       cancel := mq.Pull_tag_async_order(FuncMapType[int]{
+               `del`: func(a int) (disable bool) {
+                       c <- a
+                       return false
+               },
+       })
+       time.Sleep(time.Millisecond * 500)
+       for i := 0; i < 100; i++ {
+               mq.Push_tag(`del`, i)
+       }
+       cancel()
+       for i := 0; i < 100; i++ {
+               if i != <-c {
+                       t.FailNow()
+               }
+       }
+}
+
 func Test_4(t *testing.T) {
        mq := New()
        cancel := mq.Pull_tag(FuncMap{