]> 127.0.0.1 Git - part/.git/commitdiff
add v0.27.6
authorqydysky <qydysky@foxmail.com>
Wed, 17 May 2023 14:14:30 +0000 (22:14 +0800)
committerqydysky <qydysky@foxmail.com>
Wed, 17 May 2023 14:14:30 +0000 (22:14 +0800)
msgq/Msgq.go
msgq/Msgq_test.go
sync/RWMutex.go [new file with mode: 0644]
sync/RWMutex_test.go [new file with mode: 0644]

index 719d834356589b36f5781b9ff69f2cc2417e938a..80dc6e5f02298aa644b589ec7d4c0c67f6f04736 100644 (file)
@@ -4,8 +4,6 @@ import (
        "container/list"
        "context"
        "fmt"
-       "runtime"
-       "sync"
        "sync/atomic"
        "time"
        "unsafe"
@@ -15,11 +13,15 @@ import (
 )
 
 type Msgq struct {
-       to             []time.Duration
-       funcs          *list.List
-       someNeedRemove atomic.Int32
-       lock           sync.RWMutex
-       runTag         psync.Map
+       to    []time.Duration
+       funcs *list.List
+
+       call       atomic.Int64
+       removeList []*list.Element
+       removelock psync.RWMutex
+
+       lock   psync.RWMutex
+       runTag psync.Map
 }
 
 type FuncMap map[string]func(any) (disable bool)
@@ -30,101 +32,93 @@ func New() *Msgq {
        return m
 }
 
-func NewTo(to ...time.Duration) *Msgq {
+func NewTo(waitTo time.Duration, runTo ...time.Duration) *Msgq {
        fmt.Println("Warn: NewTo is slow, consider New")
        m := new(Msgq)
        m.funcs = list.New()
-       m.to = to
+       m.to = append([]time.Duration{waitTo}, runTo...)
        return m
 }
 
 func (m *Msgq) Register(f func(any) (disable bool)) {
-       m.lock.Lock()
+       ul := m.lock.Lock(m.to...)
        m.funcs.PushBack(f)
-       m.lock.Unlock()
+       ul()
 }
 
 func (m *Msgq) Register_front(f func(any) (disable bool)) {
-       m.lock.Lock()
+       ul := m.lock.Lock(m.to...)
        m.funcs.PushFront(f)
-       m.lock.Unlock()
+       ul()
 }
 
 func (m *Msgq) Push(msg any) {
-       for m.someNeedRemove.Load() != 0 {
-               time.Sleep(time.Millisecond)
-               runtime.Gosched()
-       }
+       isfirst := m.call.Add(1)
 
-       var removes []*list.Element
-
-       m.lock.RLock()
+       ul := m.lock.RLock(m.to...)
        for el := m.funcs.Front(); el != nil; el = el.Next() {
                if disable := el.Value.(func(any) bool)(msg); disable {
-                       m.someNeedRemove.Add(1)
-                       removes = append(removes, el)
+                       rul := m.removelock.Lock()
+                       m.removeList = append(m.removeList, el)
+                       rul()
                }
        }
-       m.lock.RUnlock()
+       ul()
 
-       if len(removes) != 0 {
-               m.lock.Lock()
-               m.someNeedRemove.Add(-int32(len(removes)))
-               for i := 0; i < len(removes); i++ {
-                       m.funcs.Remove(removes[i])
+       if isfirst == 1 {
+               rul := m.removelock.Lock()
+               for i := 0; i < len(m.removeList); i++ {
+                       m.funcs.Remove(m.removeList[i])
                }
-               m.lock.Unlock()
+               rul()
        }
+
+       m.call.Add(-1)
 }
 
 func (m *Msgq) PushLock(msg any) {
-       for m.someNeedRemove.Load() != 0 {
-               time.Sleep(time.Millisecond)
-               runtime.Gosched()
-       }
+       isfirst := m.call.Add(1)
 
-       m.lock.Lock()
-       defer m.lock.Unlock()
-
-       var removes []*list.Element
+       ul := m.lock.Lock(m.to...)
+       defer ul()
 
        for el := m.funcs.Front(); el != nil; el = el.Next() {
                if disable := el.Value.(func(any) bool)(msg); disable {
-                       m.someNeedRemove.Add(1)
-                       removes = append(removes, el)
+                       rul := m.removelock.Lock()
+                       m.removeList = append(m.removeList, el)
+                       rul()
                }
        }
 
-       if len(removes) != 0 {
-               m.someNeedRemove.Add(-int32(len(removes)))
-               for i := 0; i < len(removes); i++ {
-                       m.funcs.Remove(removes[i])
+       if isfirst == 1 {
+               rul := m.removelock.Lock()
+               for i := 0; i < len(m.removeList); i++ {
+                       m.funcs.Remove(m.removeList[i])
                }
+               rul()
        }
+
+       m.call.Add(-1)
 }
 
 func (m *Msgq) ClearAll() {
-       for m.someNeedRemove.Load() != 0 {
-               time.Sleep(time.Millisecond)
-               runtime.Gosched()
-       }
-
-       m.lock.Lock()
-       defer m.lock.Unlock()
-
-       var removes []*list.Element
+       isfirst := m.call.Add(1)
 
+       rul := m.removelock.Lock()
        for el := m.funcs.Front(); el != nil; el = el.Next() {
-               m.someNeedRemove.Add(1)
-               removes = append(removes, el)
+               m.removeList = append(m.removeList, el)
        }
+       rul()
 
-       if len(removes) != 0 {
-               m.someNeedRemove.Add(-int32(len(removes)))
-               for i := 0; i < len(removes); i++ {
-                       m.funcs.Remove(removes[i])
+       if isfirst == 1 {
+               rul := m.removelock.Lock()
+               for i := 0; i < len(m.removeList); i++ {
+                       m.funcs.Remove(m.removeList[i])
                }
+               rul()
        }
+
+       m.call.Add(-1)
 }
 
 type Msgq_tag_data struct {
@@ -132,6 +126,7 @@ type Msgq_tag_data struct {
        Data any
 }
 
+// 不能放置在由PushLock_tag调用的同步Pull中
 func (m *Msgq) Push_tag(Tag string, Data any) {
        if len(m.to) > 0 {
                ptr := uintptr(unsafe.Pointer(&Data))
@@ -158,6 +153,7 @@ func (m *Msgq) Push_tag(Tag string, Data any) {
        })
 }
 
+// 不能放置在由Push_tag、PushLock_tag调用的同步Pull中
 func (m *Msgq) PushLock_tag(Tag string, Data any) {
        if len(m.to) > 0 {
                ptr := uintptr(unsafe.Pointer(&Data))
@@ -263,11 +259,15 @@ func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) {
 }
 
 type MsgType[T any] struct {
-       to             []time.Duration
-       funcs          *list.List
-       someNeedRemove atomic.Int32
-       lock           sync.RWMutex
-       runTag         psync.Map
+       to    []time.Duration
+       funcs *list.List
+
+       call       atomic.Int64
+       removeList []*list.Element
+       removelock psync.RWMutex
+
+       lock   psync.RWMutex
+       runTag psync.Map
 }
 
 type MsgType_tag_data[T any] struct {
@@ -281,79 +281,76 @@ func NewType[T any]() *MsgType[T] {
        return m
 }
 
-func NewTypeTo[T any](to ...time.Duration) *MsgType[T] {
+func NewTypeTo[T any](waitTo time.Duration, runTo ...time.Duration) *MsgType[T] {
        fmt.Println("Warn: NewTypeTo[T any] is slow, consider NewType[T any]")
        m := new(MsgType[T])
        m.funcs = list.New()
-       m.to = to
+       m.to = append([]time.Duration{waitTo}, runTo...)
        return m
 }
 
 func (m *MsgType[T]) push(msg MsgType_tag_data[T]) {
-       for m.someNeedRemove.Load() != 0 {
-               time.Sleep(time.Millisecond)
-               runtime.Gosched()
-       }
+       isfirst := m.call.Add(1)
 
-       var removes []*list.Element
-
-       m.lock.RLock()
+       ul := m.lock.RLock(m.to...)
        for el := m.funcs.Front(); el != nil; el = el.Next() {
                if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable {
-                       m.someNeedRemove.Add(1)
-                       removes = append(removes, el)
+                       rul := m.removelock.Lock()
+                       m.removeList = append(m.removeList, el)
+                       rul()
                }
        }
-       m.lock.RUnlock()
+       ul()
 
-       if len(removes) != 0 {
-               m.lock.Lock()
-               m.someNeedRemove.Add(-int32(len(removes)))
-               for i := 0; i < len(removes); i++ {
-                       m.funcs.Remove(removes[i])
+       if isfirst == 1 {
+               rul := m.removelock.Lock()
+               for i := 0; i < len(m.removeList); i++ {
+                       m.funcs.Remove(m.removeList[i])
                }
-               m.lock.Unlock()
+               rul()
        }
+
+       m.call.Add(-1)
 }
 
 func (m *MsgType[T]) pushLock(msg MsgType_tag_data[T]) {
-       for m.someNeedRemove.Load() != 0 {
-               time.Sleep(time.Millisecond)
-               runtime.Gosched()
-       }
+       isfirst := m.call.Add(1)
 
-       m.lock.Lock()
-       defer m.lock.Unlock()
-
-       var removes []*list.Element
+       ul := m.lock.Lock(m.to...)
+       defer ul()
 
        for el := m.funcs.Front(); el != nil; el = el.Next() {
                if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable {
-                       m.someNeedRemove.Add(1)
-                       removes = append(removes, el)
+                       rul := m.removelock.Lock()
+                       m.removeList = append(m.removeList, el)
+                       rul()
                }
        }
 
-       if len(removes) != 0 {
-               m.someNeedRemove.Add(-int32(len(removes)))
-               for i := 0; i < len(removes); i++ {
-                       m.funcs.Remove(removes[i])
+       if isfirst == 1 {
+               rul := m.removelock.Lock()
+               for i := 0; i < len(m.removeList); i++ {
+                       m.funcs.Remove(m.removeList[i])
                }
+               rul()
        }
+
+       m.call.Add(-1)
 }
 
 func (m *MsgType[T]) register(f func(MsgType_tag_data[T]) (disable bool)) {
-       m.lock.Lock()
+       ul := m.lock.Lock(m.to...)
        m.funcs.PushBack(f)
-       m.lock.Unlock()
+       ul()
 }
 
 func (m *MsgType[T]) register_front(f func(MsgType_tag_data[T]) (disable bool)) {
-       m.lock.Lock()
+       ul := m.lock.Lock(m.to...)
        m.funcs.PushFront(f)
-       m.lock.Unlock()
+       ul()
 }
 
+// 不能放置在由PushLock_tag调用的同步Pull中
 func (m *MsgType[T]) Push_tag(Tag string, Data T) {
        if len(m.to) > 0 {
                ptr := uintptr(unsafe.Pointer(&Data))
@@ -380,6 +377,7 @@ func (m *MsgType[T]) Push_tag(Tag string, Data T) {
        })
 }
 
+// 不能放置在由Push_tag、PushLock_tag调用的同步Pull中
 func (m *MsgType[T]) PushLock_tag(Tag string, Data T) {
        if len(m.to) > 0 {
                ptr := uintptr(unsafe.Pointer(&Data))
@@ -407,27 +405,23 @@ func (m *MsgType[T]) PushLock_tag(Tag string, Data T) {
 }
 
 func (m *MsgType[T]) ClearAll() {
-       for m.someNeedRemove.Load() != 0 {
-               time.Sleep(time.Millisecond)
-               runtime.Gosched()
-       }
-
-       m.lock.Lock()
-       defer m.lock.Unlock()
-
-       var removes []*list.Element
+       isfirst := m.call.Add(1)
 
+       rul := m.removelock.Lock()
        for el := m.funcs.Front(); el != nil; el = el.Next() {
-               m.someNeedRemove.Add(1)
-               removes = append(removes, el)
+               m.removeList = append(m.removeList, el)
        }
+       rul()
 
-       if len(removes) != 0 {
-               m.someNeedRemove.Add(-int32(len(removes)))
-               for i := 0; i < len(removes); i++ {
-                       m.funcs.Remove(removes[i])
+       if isfirst == 1 {
+               rul := m.removelock.Lock()
+               for i := 0; i < len(m.removeList); i++ {
+                       m.funcs.Remove(m.removeList[i])
                }
+               rul()
        }
+
+       m.call.Add(-1)
 }
 
 func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) <-chan T {
index 7955cf983d66a69ac4089a339a1b2a88106d2251..ee8ee30b3b44f163cedf5f5c71153dee7c71d029 100644 (file)
@@ -164,6 +164,23 @@ func Benchmark_1(b *testing.B) {
        }
 }
 
+func Test_RemoveInPush(t *testing.T) {
+       mq := NewTo(time.Second, time.Second*3)
+       mq.Pull_tag(FuncMap{
+               `r1`: func(a any) (disable bool) {
+                       mq.ClearAll()
+                       return false
+               },
+               `r2`: func(a any) (disable bool) {
+                       return true
+               },
+       })
+       mq.PushLock_tag(`r1`, nil)
+       if mq.funcs.Len() != 0 {
+               t.Fatal()
+       }
+}
+
 func Test_Pull_tag_chan(t *testing.T) {
        mq := New()
        ctx, cf := context.WithCancel(context.Background())
diff --git a/sync/RWMutex.go b/sync/RWMutex.go
new file mode 100644 (file)
index 0000000..ccda57d
--- /dev/null
@@ -0,0 +1,119 @@
+package part
+
+import (
+       "fmt"
+       "runtime"
+       "sync/atomic"
+       "time"
+)
+
+const (
+       lock  = -1
+       ulock = 0
+)
+
+type RWMutex struct {
+       rlc       atomic.Int32
+       wantRead  atomic.Bool
+       wantWrite atomic.Bool
+}
+
+// to[0]: wait lock timeout to[1]: run lock timeout
+//
+// 不要在Rlock内设置变量,有DATA RACE风险
+func (m *RWMutex) RLock(to ...time.Duration) (unlockf func()) {
+       m.wantRead.Store(true)
+       var callC atomic.Bool
+       if len(to) > 0 {
+               var calls []string
+               if len(to) > 1 {
+                       for i := 1; true; i++ {
+                               if pc, file, line, ok := runtime.Caller(i); !ok {
+                                       break
+                               } else {
+                                       calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line))
+                               }
+                       }
+               }
+               c := time.Now()
+               for m.rlc.Load() < ulock || m.wantWrite.Load() {
+                       if time.Since(c) > to[0] {
+                               panic(fmt.Sprintf("timeout to wait lock while rlocking, rlc:%d", m.rlc.Load()))
+                       }
+                       runtime.Gosched()
+               }
+               if len(to) > 1 {
+                       time.AfterFunc(to[1], func() {
+                               if !callC.Load() {
+                                       panicS := fmt.Sprintf("timeout to run rlock %v > %v\n", time.Since(c), to[0])
+                                       for i := 0; i < len(calls); i++ {
+                                               panicS += fmt.Sprintf("call by %s\n", calls[i])
+                                       }
+                                       panic(panicS)
+                               }
+                       })
+               }
+       } else {
+               for m.rlc.Load() < ulock || m.wantWrite.Load() {
+                       runtime.Gosched()
+               }
+       }
+       m.rlc.Add(1)
+       return func() {
+               if !callC.CompareAndSwap(false, true) {
+                       panic("had unrlock")
+               }
+               if m.rlc.Add(-1) == ulock {
+                       m.wantRead.Store(false)
+               }
+       }
+}
+
+// to[0]: wait lock timeout to[1]: run lock timeout
+func (m *RWMutex) Lock(to ...time.Duration) (unlockf func()) {
+       m.wantWrite.Store(true)
+       var callC atomic.Bool
+       if len(to) > 0 {
+               var calls []string
+               if len(to) > 1 {
+                       for i := 1; true; i++ {
+                               if pc, file, line, ok := runtime.Caller(i); !ok {
+                                       break
+                               } else {
+                                       calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line))
+                               }
+                       }
+               }
+               c := time.Now()
+               for m.rlc.Load() != ulock || m.wantRead.Load() {
+                       if time.Since(c) > to[0] {
+                               panic(fmt.Sprintf("timeout to wait rlock while locking, rlc:%d", m.rlc.Load()))
+                       }
+                       runtime.Gosched()
+               }
+               if len(to) > 1 {
+                       time.AfterFunc(to[1], func() {
+                               if !callC.Load() {
+                                       panicS := fmt.Sprintf("timeout to run lock %v > %v\n", time.Since(c), to[0])
+                                       for i := 0; i < len(calls); i++ {
+                                               panicS += fmt.Sprintf("call by %s\n", calls[i])
+                                       }
+                                       panic(panicS)
+                               }
+                       })
+               }
+       } else {
+               for m.rlc.Load() != ulock || m.wantRead.Load() {
+                       runtime.Gosched()
+               }
+       }
+       m.rlc.Add(-1)
+       return func() {
+               if !callC.CompareAndSwap(false, true) {
+                       panic("had unlock")
+               }
+               if m.rlc.Add(1) == ulock {
+                       m.wantWrite.Store(false)
+               }
+       }
+}
diff --git a/sync/RWMutex_test.go b/sync/RWMutex_test.go
new file mode 100644 (file)
index 0000000..67aaaa8
--- /dev/null
@@ -0,0 +1,16 @@
+package part
+
+import (
+       "testing"
+       "time"
+)
+
+func TestMain(t *testing.T) {
+}
+
+func BenchmarkRlock(b *testing.B) {
+       var lock1 RWMutex
+       for i := 0; i < b.N; i++ {
+               lock1.RLock(time.Second, time.Second)()
+       }
+}