"container/list"
"context"
"fmt"
- "runtime"
- "sync"
"sync/atomic"
"time"
"unsafe"
)
type Msgq struct {
- to []time.Duration
- funcs *list.List
- someNeedRemove atomic.Int32
- lock sync.RWMutex
- runTag psync.Map
+ to []time.Duration
+ funcs *list.List
+
+ call atomic.Int64
+ removeList []*list.Element
+ removelock psync.RWMutex
+
+ lock psync.RWMutex
+ runTag psync.Map
}
type FuncMap map[string]func(any) (disable bool)
return m
}
-func NewTo(to ...time.Duration) *Msgq {
+func NewTo(waitTo time.Duration, runTo ...time.Duration) *Msgq {
fmt.Println("Warn: NewTo is slow, consider New")
m := new(Msgq)
m.funcs = list.New()
- m.to = to
+ m.to = append([]time.Duration{waitTo}, runTo...)
return m
}
func (m *Msgq) Register(f func(any) (disable bool)) {
- m.lock.Lock()
+ ul := m.lock.Lock(m.to...)
m.funcs.PushBack(f)
- m.lock.Unlock()
+ ul()
}
func (m *Msgq) Register_front(f func(any) (disable bool)) {
- m.lock.Lock()
+ ul := m.lock.Lock(m.to...)
m.funcs.PushFront(f)
- m.lock.Unlock()
+ ul()
}
func (m *Msgq) Push(msg any) {
- for m.someNeedRemove.Load() != 0 {
- time.Sleep(time.Millisecond)
- runtime.Gosched()
- }
+ isfirst := m.call.Add(1)
- var removes []*list.Element
-
- m.lock.RLock()
+ ul := m.lock.RLock(m.to...)
for el := m.funcs.Front(); el != nil; el = el.Next() {
if disable := el.Value.(func(any) bool)(msg); disable {
- m.someNeedRemove.Add(1)
- removes = append(removes, el)
+ rul := m.removelock.Lock()
+ m.removeList = append(m.removeList, el)
+ rul()
}
}
- m.lock.RUnlock()
+ ul()
- if len(removes) != 0 {
- m.lock.Lock()
- m.someNeedRemove.Add(-int32(len(removes)))
- for i := 0; i < len(removes); i++ {
- m.funcs.Remove(removes[i])
+ if isfirst == 1 {
+ rul := m.removelock.Lock()
+ for i := 0; i < len(m.removeList); i++ {
+ m.funcs.Remove(m.removeList[i])
}
- m.lock.Unlock()
+ rul()
}
+
+ m.call.Add(-1)
}
func (m *Msgq) PushLock(msg any) {
- for m.someNeedRemove.Load() != 0 {
- time.Sleep(time.Millisecond)
- runtime.Gosched()
- }
+ isfirst := m.call.Add(1)
- m.lock.Lock()
- defer m.lock.Unlock()
-
- var removes []*list.Element
+ ul := m.lock.Lock(m.to...)
+ defer ul()
for el := m.funcs.Front(); el != nil; el = el.Next() {
if disable := el.Value.(func(any) bool)(msg); disable {
- m.someNeedRemove.Add(1)
- removes = append(removes, el)
+ rul := m.removelock.Lock()
+ m.removeList = append(m.removeList, el)
+ rul()
}
}
- if len(removes) != 0 {
- m.someNeedRemove.Add(-int32(len(removes)))
- for i := 0; i < len(removes); i++ {
- m.funcs.Remove(removes[i])
+ if isfirst == 1 {
+ rul := m.removelock.Lock()
+ for i := 0; i < len(m.removeList); i++ {
+ m.funcs.Remove(m.removeList[i])
}
+ rul()
}
+
+ m.call.Add(-1)
}
func (m *Msgq) ClearAll() {
- for m.someNeedRemove.Load() != 0 {
- time.Sleep(time.Millisecond)
- runtime.Gosched()
- }
-
- m.lock.Lock()
- defer m.lock.Unlock()
-
- var removes []*list.Element
+ isfirst := m.call.Add(1)
+ rul := m.removelock.Lock()
for el := m.funcs.Front(); el != nil; el = el.Next() {
- m.someNeedRemove.Add(1)
- removes = append(removes, el)
+ m.removeList = append(m.removeList, el)
}
+ rul()
- if len(removes) != 0 {
- m.someNeedRemove.Add(-int32(len(removes)))
- for i := 0; i < len(removes); i++ {
- m.funcs.Remove(removes[i])
+ if isfirst == 1 {
+ rul := m.removelock.Lock()
+ for i := 0; i < len(m.removeList); i++ {
+ m.funcs.Remove(m.removeList[i])
}
+ rul()
}
+
+ m.call.Add(-1)
}
type Msgq_tag_data struct {
Data any
}
+// 不能放置在由PushLock_tag调用的同步Pull中
func (m *Msgq) Push_tag(Tag string, Data any) {
if len(m.to) > 0 {
ptr := uintptr(unsafe.Pointer(&Data))
})
}
+// 不能放置在由Push_tag、PushLock_tag调用的同步Pull中
func (m *Msgq) PushLock_tag(Tag string, Data any) {
if len(m.to) > 0 {
ptr := uintptr(unsafe.Pointer(&Data))
}
type MsgType[T any] struct {
- to []time.Duration
- funcs *list.List
- someNeedRemove atomic.Int32
- lock sync.RWMutex
- runTag psync.Map
+ to []time.Duration
+ funcs *list.List
+
+ call atomic.Int64
+ removeList []*list.Element
+ removelock psync.RWMutex
+
+ lock psync.RWMutex
+ runTag psync.Map
}
type MsgType_tag_data[T any] struct {
return m
}
-func NewTypeTo[T any](to ...time.Duration) *MsgType[T] {
+func NewTypeTo[T any](waitTo time.Duration, runTo ...time.Duration) *MsgType[T] {
fmt.Println("Warn: NewTypeTo[T any] is slow, consider NewType[T any]")
m := new(MsgType[T])
m.funcs = list.New()
- m.to = to
+ m.to = append([]time.Duration{waitTo}, runTo...)
return m
}
func (m *MsgType[T]) push(msg MsgType_tag_data[T]) {
- for m.someNeedRemove.Load() != 0 {
- time.Sleep(time.Millisecond)
- runtime.Gosched()
- }
+ isfirst := m.call.Add(1)
- var removes []*list.Element
-
- m.lock.RLock()
+ ul := m.lock.RLock(m.to...)
for el := m.funcs.Front(); el != nil; el = el.Next() {
if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable {
- m.someNeedRemove.Add(1)
- removes = append(removes, el)
+ rul := m.removelock.Lock()
+ m.removeList = append(m.removeList, el)
+ rul()
}
}
- m.lock.RUnlock()
+ ul()
- if len(removes) != 0 {
- m.lock.Lock()
- m.someNeedRemove.Add(-int32(len(removes)))
- for i := 0; i < len(removes); i++ {
- m.funcs.Remove(removes[i])
+ if isfirst == 1 {
+ rul := m.removelock.Lock()
+ for i := 0; i < len(m.removeList); i++ {
+ m.funcs.Remove(m.removeList[i])
}
- m.lock.Unlock()
+ rul()
}
+
+ m.call.Add(-1)
}
func (m *MsgType[T]) pushLock(msg MsgType_tag_data[T]) {
- for m.someNeedRemove.Load() != 0 {
- time.Sleep(time.Millisecond)
- runtime.Gosched()
- }
+ isfirst := m.call.Add(1)
- m.lock.Lock()
- defer m.lock.Unlock()
-
- var removes []*list.Element
+ ul := m.lock.Lock(m.to...)
+ defer ul()
for el := m.funcs.Front(); el != nil; el = el.Next() {
if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable {
- m.someNeedRemove.Add(1)
- removes = append(removes, el)
+ rul := m.removelock.Lock()
+ m.removeList = append(m.removeList, el)
+ rul()
}
}
- if len(removes) != 0 {
- m.someNeedRemove.Add(-int32(len(removes)))
- for i := 0; i < len(removes); i++ {
- m.funcs.Remove(removes[i])
+ if isfirst == 1 {
+ rul := m.removelock.Lock()
+ for i := 0; i < len(m.removeList); i++ {
+ m.funcs.Remove(m.removeList[i])
}
+ rul()
}
+
+ m.call.Add(-1)
}
func (m *MsgType[T]) register(f func(MsgType_tag_data[T]) (disable bool)) {
- m.lock.Lock()
+ ul := m.lock.Lock(m.to...)
m.funcs.PushBack(f)
- m.lock.Unlock()
+ ul()
}
func (m *MsgType[T]) register_front(f func(MsgType_tag_data[T]) (disable bool)) {
- m.lock.Lock()
+ ul := m.lock.Lock(m.to...)
m.funcs.PushFront(f)
- m.lock.Unlock()
+ ul()
}
+// 不能放置在由PushLock_tag调用的同步Pull中
func (m *MsgType[T]) Push_tag(Tag string, Data T) {
if len(m.to) > 0 {
ptr := uintptr(unsafe.Pointer(&Data))
})
}
+// 不能放置在由Push_tag、PushLock_tag调用的同步Pull中
func (m *MsgType[T]) PushLock_tag(Tag string, Data T) {
if len(m.to) > 0 {
ptr := uintptr(unsafe.Pointer(&Data))
}
func (m *MsgType[T]) ClearAll() {
- for m.someNeedRemove.Load() != 0 {
- time.Sleep(time.Millisecond)
- runtime.Gosched()
- }
-
- m.lock.Lock()
- defer m.lock.Unlock()
-
- var removes []*list.Element
+ isfirst := m.call.Add(1)
+ rul := m.removelock.Lock()
for el := m.funcs.Front(); el != nil; el = el.Next() {
- m.someNeedRemove.Add(1)
- removes = append(removes, el)
+ m.removeList = append(m.removeList, el)
}
+ rul()
- if len(removes) != 0 {
- m.someNeedRemove.Add(-int32(len(removes)))
- for i := 0; i < len(removes); i++ {
- m.funcs.Remove(removes[i])
+ if isfirst == 1 {
+ rul := m.removelock.Lock()
+ for i := 0; i < len(m.removeList); i++ {
+ m.funcs.Remove(m.removeList[i])
}
+ rul()
}
+
+ m.call.Add(-1)
}
func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) <-chan T {
--- /dev/null
+package part
+
+import (
+ "fmt"
+ "runtime"
+ "sync/atomic"
+ "time"
+)
+
+const (
+ lock = -1
+ ulock = 0
+)
+
+type RWMutex struct {
+ rlc atomic.Int32
+ wantRead atomic.Bool
+ wantWrite atomic.Bool
+}
+
+// to[0]: wait lock timeout to[1]: run lock timeout
+//
+// 不要在Rlock内设置变量,有DATA RACE风险
+func (m *RWMutex) RLock(to ...time.Duration) (unlockf func()) {
+ m.wantRead.Store(true)
+ var callC atomic.Bool
+ if len(to) > 0 {
+ var calls []string
+ if len(to) > 1 {
+ for i := 1; true; i++ {
+ if pc, file, line, ok := runtime.Caller(i); !ok {
+ break
+ } else {
+ calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line))
+ }
+ }
+ }
+ c := time.Now()
+ for m.rlc.Load() < ulock || m.wantWrite.Load() {
+ if time.Since(c) > to[0] {
+ panic(fmt.Sprintf("timeout to wait lock while rlocking, rlc:%d", m.rlc.Load()))
+ }
+ runtime.Gosched()
+ }
+ if len(to) > 1 {
+ time.AfterFunc(to[1], func() {
+ if !callC.Load() {
+ panicS := fmt.Sprintf("timeout to run rlock %v > %v\n", time.Since(c), to[0])
+ for i := 0; i < len(calls); i++ {
+ panicS += fmt.Sprintf("call by %s\n", calls[i])
+ }
+ panic(panicS)
+ }
+ })
+ }
+ } else {
+ for m.rlc.Load() < ulock || m.wantWrite.Load() {
+ runtime.Gosched()
+ }
+ }
+ m.rlc.Add(1)
+ return func() {
+ if !callC.CompareAndSwap(false, true) {
+ panic("had unrlock")
+ }
+ if m.rlc.Add(-1) == ulock {
+ m.wantRead.Store(false)
+ }
+ }
+}
+
+// to[0]: wait lock timeout to[1]: run lock timeout
+func (m *RWMutex) Lock(to ...time.Duration) (unlockf func()) {
+ m.wantWrite.Store(true)
+ var callC atomic.Bool
+ if len(to) > 0 {
+ var calls []string
+ if len(to) > 1 {
+ for i := 1; true; i++ {
+ if pc, file, line, ok := runtime.Caller(i); !ok {
+ break
+ } else {
+ calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line))
+ }
+ }
+ }
+ c := time.Now()
+ for m.rlc.Load() != ulock || m.wantRead.Load() {
+ if time.Since(c) > to[0] {
+ panic(fmt.Sprintf("timeout to wait rlock while locking, rlc:%d", m.rlc.Load()))
+ }
+ runtime.Gosched()
+ }
+ if len(to) > 1 {
+ time.AfterFunc(to[1], func() {
+ if !callC.Load() {
+ panicS := fmt.Sprintf("timeout to run lock %v > %v\n", time.Since(c), to[0])
+ for i := 0; i < len(calls); i++ {
+ panicS += fmt.Sprintf("call by %s\n", calls[i])
+ }
+ panic(panicS)
+ }
+ })
+ }
+ } else {
+ for m.rlc.Load() != ulock || m.wantRead.Load() {
+ runtime.Gosched()
+ }
+ }
+ m.rlc.Add(-1)
+ return func() {
+ if !callC.CompareAndSwap(false, true) {
+ panic("had unlock")
+ }
+ if m.rlc.Add(1) == ulock {
+ m.wantWrite.Store(false)
+ }
+ }
+}