]> 127.0.0.1 Git - part/.git/commitdiff
Add v0.26.4
authorqydysky <qydysky@foxmail.com>
Sat, 13 May 2023 13:16:14 +0000 (21:16 +0800)
committerqydysky <qydysky@foxmail.com>
Sat, 13 May 2023 13:16:14 +0000 (21:16 +0800)
msgq/Msgq.go
msgq/Msgq_test.go

index e1eb58835f0c226c699d92abf57be6f3b2862eff..cbe8ab3d03ee4db9b7029e89b74b74e91867cabb 100644 (file)
@@ -4,14 +4,15 @@ import (
        "container/list"
        "context"
        "runtime"
-       "sync"
        "sync/atomic"
        "time"
 
        signal "github.com/qydysky/part/signal"
+       sync "github.com/qydysky/part/sync"
 )
 
 type Msgq struct {
+       to             []time.Duration
        funcs          *list.List
        someNeedRemove atomic.Int32
        lock           sync.RWMutex
@@ -25,16 +26,27 @@ func New() *Msgq {
        return m
 }
 
+func NewTo(to ...time.Duration) *Msgq {
+       m := new(Msgq)
+       m.funcs = list.New()
+       if len(to) > 0 {
+               m.to = append(m.to, to[0])
+       } else {
+               m.to = append(m.to, time.Second*30)
+       }
+       return m
+}
+
 func (m *Msgq) Register(f func(any) (disable bool)) {
-       m.lock.Lock()
+       ul := m.lock.Lock(m.to...)
        m.funcs.PushBack(f)
-       m.lock.Unlock()
+       ul()
 }
 
 func (m *Msgq) Register_front(f func(any) (disable bool)) {
-       m.lock.Lock()
+       ul := m.lock.Lock(m.to...)
        m.funcs.PushFront(f)
-       m.lock.Unlock()
+       ul()
 }
 
 func (m *Msgq) Push(msg any) {
@@ -45,22 +57,22 @@ func (m *Msgq) Push(msg any) {
 
        var removes []*list.Element
 
-       m.lock.RLock()
+       ul := m.lock.RLock(m.to...)
        for el := m.funcs.Front(); el != nil; el = el.Next() {
                if disable := el.Value.(func(any) bool)(msg); disable {
                        m.someNeedRemove.Add(1)
                        removes = append(removes, el)
                }
        }
-       m.lock.RUnlock()
+       ul()
 
        if len(removes) != 0 {
-               m.lock.Lock()
+               ul := m.lock.Lock(m.to...)
                m.someNeedRemove.Add(-int32(len(removes)))
                for i := 0; i < len(removes); i++ {
                        m.funcs.Remove(removes[i])
                }
-               m.lock.Unlock()
+               ul()
        }
 }
 
@@ -70,8 +82,8 @@ func (m *Msgq) PushLock(msg any) {
                runtime.Gosched()
        }
 
-       m.lock.Lock()
-       defer m.lock.Unlock()
+       ul := m.lock.Lock(m.to...)
+       defer ul()
 
        var removes []*list.Element
 
@@ -96,8 +108,8 @@ func (m *Msgq) ClearAll() {
                runtime.Gosched()
        }
 
-       m.lock.Lock()
-       defer m.lock.Unlock()
+       ul := m.lock.Lock(m.to...)
+       defer ul()
 
        var removes []*list.Element
 
@@ -222,6 +234,12 @@ func NewType[T any]() *MsgType[T] {
        }
 }
 
+func NewTypeTo[T any](to ...time.Duration) *MsgType[T] {
+       return &MsgType[T]{
+               m: NewTo(to...),
+       }
+}
+
 func (m *MsgType[T]) Push_tag(Tag string, Data T) {
        m.m.Push(Msgq_tag_data{
                Tag:  Tag,
index 037ea9e7e31130872af77cada4f56fa38a8b1e1b..202bf23aef2fa766df5b795605a22981ed8872e3 100644 (file)
@@ -139,6 +139,21 @@ func BenchmarkXxx(b *testing.B) {
        }
 }
 
+func TestPushLock(t *testing.T) {
+       defer func() {
+               if e := recover(); e.(string) != "timeout to wait rlock, rlc:1" {
+                       t.Fatal(e)
+               }
+       }()
+       mq := NewTo(time.Second)
+       mq.Pull_tag_only(`test`, func(a any) (disable bool) {
+               mq.PushLock_tag(`lock`, nil)
+               return false
+       })
+       mq.Push_tag(`test`, nil)
+       t.Fatal()
+}
+
 func Test_Pull_tag_chan(t *testing.T) {
        mq := New()
        ctx, cf := context.WithCancel(context.Background())