From: qydysky Date: Wed, 17 May 2023 14:14:30 +0000 (+0800) Subject: add X-Git-Tag: v0.27.6 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=6f828914f7151aa5684c57d1ebafbe042989bc8c;p=part%2F.git add --- diff --git a/msgq/Msgq.go b/msgq/Msgq.go index 719d834..80dc6e5 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -4,8 +4,6 @@ import ( "container/list" "context" "fmt" - "runtime" - "sync" "sync/atomic" "time" "unsafe" @@ -15,11 +13,15 @@ import ( ) type Msgq struct { - to []time.Duration - funcs *list.List - someNeedRemove atomic.Int32 - lock sync.RWMutex - runTag psync.Map + to []time.Duration + funcs *list.List + + call atomic.Int64 + removeList []*list.Element + removelock psync.RWMutex + + lock psync.RWMutex + runTag psync.Map } type FuncMap map[string]func(any) (disable bool) @@ -30,101 +32,93 @@ func New() *Msgq { return m } -func NewTo(to ...time.Duration) *Msgq { +func NewTo(waitTo time.Duration, runTo ...time.Duration) *Msgq { fmt.Println("Warn: NewTo is slow, consider New") m := new(Msgq) m.funcs = list.New() - m.to = to + m.to = append([]time.Duration{waitTo}, runTo...) return m } func (m *Msgq) Register(f func(any) (disable bool)) { - m.lock.Lock() + ul := m.lock.Lock(m.to...) m.funcs.PushBack(f) - m.lock.Unlock() + ul() } func (m *Msgq) Register_front(f func(any) (disable bool)) { - m.lock.Lock() + ul := m.lock.Lock(m.to...) m.funcs.PushFront(f) - m.lock.Unlock() + ul() } func (m *Msgq) Push(msg any) { - for m.someNeedRemove.Load() != 0 { - time.Sleep(time.Millisecond) - runtime.Gosched() - } + isfirst := m.call.Add(1) - var removes []*list.Element - - m.lock.RLock() + ul := m.lock.RLock(m.to...) for el := m.funcs.Front(); el != nil; el = el.Next() { if disable := el.Value.(func(any) bool)(msg); disable { - m.someNeedRemove.Add(1) - removes = append(removes, el) + rul := m.removelock.Lock() + m.removeList = append(m.removeList, el) + rul() } } - m.lock.RUnlock() + ul() - if len(removes) != 0 { - m.lock.Lock() - m.someNeedRemove.Add(-int32(len(removes))) - for i := 0; i < len(removes); i++ { - m.funcs.Remove(removes[i]) + if isfirst == 1 { + rul := m.removelock.Lock() + for i := 0; i < len(m.removeList); i++ { + m.funcs.Remove(m.removeList[i]) } - m.lock.Unlock() + rul() } + + m.call.Add(-1) } func (m *Msgq) PushLock(msg any) { - for m.someNeedRemove.Load() != 0 { - time.Sleep(time.Millisecond) - runtime.Gosched() - } + isfirst := m.call.Add(1) - m.lock.Lock() - defer m.lock.Unlock() - - var removes []*list.Element + ul := m.lock.Lock(m.to...) + defer ul() for el := m.funcs.Front(); el != nil; el = el.Next() { if disable := el.Value.(func(any) bool)(msg); disable { - m.someNeedRemove.Add(1) - removes = append(removes, el) + rul := m.removelock.Lock() + m.removeList = append(m.removeList, el) + rul() } } - if len(removes) != 0 { - m.someNeedRemove.Add(-int32(len(removes))) - for i := 0; i < len(removes); i++ { - m.funcs.Remove(removes[i]) + if isfirst == 1 { + rul := m.removelock.Lock() + for i := 0; i < len(m.removeList); i++ { + m.funcs.Remove(m.removeList[i]) } + rul() } + + m.call.Add(-1) } func (m *Msgq) ClearAll() { - for m.someNeedRemove.Load() != 0 { - time.Sleep(time.Millisecond) - runtime.Gosched() - } - - m.lock.Lock() - defer m.lock.Unlock() - - var removes []*list.Element + isfirst := m.call.Add(1) + rul := m.removelock.Lock() for el := m.funcs.Front(); el != nil; el = el.Next() { - m.someNeedRemove.Add(1) - removes = append(removes, el) + m.removeList = append(m.removeList, el) } + rul() - if len(removes) != 0 { - m.someNeedRemove.Add(-int32(len(removes))) - for i := 0; i < len(removes); i++ { - m.funcs.Remove(removes[i]) + if isfirst == 1 { + rul := m.removelock.Lock() + for i := 0; i < len(m.removeList); i++ { + m.funcs.Remove(m.removeList[i]) } + rul() } + + m.call.Add(-1) } type Msgq_tag_data struct { @@ -132,6 +126,7 @@ type Msgq_tag_data struct { Data any } +// 不能放置在由PushLock_tag调用的同步Pull中 func (m *Msgq) Push_tag(Tag string, Data any) { if len(m.to) > 0 { ptr := uintptr(unsafe.Pointer(&Data)) @@ -158,6 +153,7 @@ func (m *Msgq) Push_tag(Tag string, Data any) { }) } +// 不能放置在由Push_tag、PushLock_tag调用的同步Pull中 func (m *Msgq) PushLock_tag(Tag string, Data any) { if len(m.to) > 0 { ptr := uintptr(unsafe.Pointer(&Data)) @@ -263,11 +259,15 @@ func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) { } type MsgType[T any] struct { - to []time.Duration - funcs *list.List - someNeedRemove atomic.Int32 - lock sync.RWMutex - runTag psync.Map + to []time.Duration + funcs *list.List + + call atomic.Int64 + removeList []*list.Element + removelock psync.RWMutex + + lock psync.RWMutex + runTag psync.Map } type MsgType_tag_data[T any] struct { @@ -281,79 +281,76 @@ func NewType[T any]() *MsgType[T] { return m } -func NewTypeTo[T any](to ...time.Duration) *MsgType[T] { +func NewTypeTo[T any](waitTo time.Duration, runTo ...time.Duration) *MsgType[T] { fmt.Println("Warn: NewTypeTo[T any] is slow, consider NewType[T any]") m := new(MsgType[T]) m.funcs = list.New() - m.to = to + m.to = append([]time.Duration{waitTo}, runTo...) return m } func (m *MsgType[T]) push(msg MsgType_tag_data[T]) { - for m.someNeedRemove.Load() != 0 { - time.Sleep(time.Millisecond) - runtime.Gosched() - } + isfirst := m.call.Add(1) - var removes []*list.Element - - m.lock.RLock() + ul := m.lock.RLock(m.to...) for el := m.funcs.Front(); el != nil; el = el.Next() { if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable { - m.someNeedRemove.Add(1) - removes = append(removes, el) + rul := m.removelock.Lock() + m.removeList = append(m.removeList, el) + rul() } } - m.lock.RUnlock() + ul() - if len(removes) != 0 { - m.lock.Lock() - m.someNeedRemove.Add(-int32(len(removes))) - for i := 0; i < len(removes); i++ { - m.funcs.Remove(removes[i]) + if isfirst == 1 { + rul := m.removelock.Lock() + for i := 0; i < len(m.removeList); i++ { + m.funcs.Remove(m.removeList[i]) } - m.lock.Unlock() + rul() } + + m.call.Add(-1) } func (m *MsgType[T]) pushLock(msg MsgType_tag_data[T]) { - for m.someNeedRemove.Load() != 0 { - time.Sleep(time.Millisecond) - runtime.Gosched() - } + isfirst := m.call.Add(1) - m.lock.Lock() - defer m.lock.Unlock() - - var removes []*list.Element + ul := m.lock.Lock(m.to...) + defer ul() for el := m.funcs.Front(); el != nil; el = el.Next() { if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable { - m.someNeedRemove.Add(1) - removes = append(removes, el) + rul := m.removelock.Lock() + m.removeList = append(m.removeList, el) + rul() } } - if len(removes) != 0 { - m.someNeedRemove.Add(-int32(len(removes))) - for i := 0; i < len(removes); i++ { - m.funcs.Remove(removes[i]) + if isfirst == 1 { + rul := m.removelock.Lock() + for i := 0; i < len(m.removeList); i++ { + m.funcs.Remove(m.removeList[i]) } + rul() } + + m.call.Add(-1) } func (m *MsgType[T]) register(f func(MsgType_tag_data[T]) (disable bool)) { - m.lock.Lock() + ul := m.lock.Lock(m.to...) m.funcs.PushBack(f) - m.lock.Unlock() + ul() } func (m *MsgType[T]) register_front(f func(MsgType_tag_data[T]) (disable bool)) { - m.lock.Lock() + ul := m.lock.Lock(m.to...) m.funcs.PushFront(f) - m.lock.Unlock() + ul() } +// 不能放置在由PushLock_tag调用的同步Pull中 func (m *MsgType[T]) Push_tag(Tag string, Data T) { if len(m.to) > 0 { ptr := uintptr(unsafe.Pointer(&Data)) @@ -380,6 +377,7 @@ func (m *MsgType[T]) Push_tag(Tag string, Data T) { }) } +// 不能放置在由Push_tag、PushLock_tag调用的同步Pull中 func (m *MsgType[T]) PushLock_tag(Tag string, Data T) { if len(m.to) > 0 { ptr := uintptr(unsafe.Pointer(&Data)) @@ -407,27 +405,23 @@ func (m *MsgType[T]) PushLock_tag(Tag string, Data T) { } func (m *MsgType[T]) ClearAll() { - for m.someNeedRemove.Load() != 0 { - time.Sleep(time.Millisecond) - runtime.Gosched() - } - - m.lock.Lock() - defer m.lock.Unlock() - - var removes []*list.Element + isfirst := m.call.Add(1) + rul := m.removelock.Lock() for el := m.funcs.Front(); el != nil; el = el.Next() { - m.someNeedRemove.Add(1) - removes = append(removes, el) + m.removeList = append(m.removeList, el) } + rul() - if len(removes) != 0 { - m.someNeedRemove.Add(-int32(len(removes))) - for i := 0; i < len(removes); i++ { - m.funcs.Remove(removes[i]) + if isfirst == 1 { + rul := m.removelock.Lock() + for i := 0; i < len(m.removeList); i++ { + m.funcs.Remove(m.removeList[i]) } + rul() } + + m.call.Add(-1) } func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) <-chan T { diff --git a/msgq/Msgq_test.go b/msgq/Msgq_test.go index 7955cf9..ee8ee30 100644 --- a/msgq/Msgq_test.go +++ b/msgq/Msgq_test.go @@ -164,6 +164,23 @@ func Benchmark_1(b *testing.B) { } } +func Test_RemoveInPush(t *testing.T) { + mq := NewTo(time.Second, time.Second*3) + mq.Pull_tag(FuncMap{ + `r1`: func(a any) (disable bool) { + mq.ClearAll() + return false + }, + `r2`: func(a any) (disable bool) { + return true + }, + }) + mq.PushLock_tag(`r1`, nil) + if mq.funcs.Len() != 0 { + t.Fatal() + } +} + func Test_Pull_tag_chan(t *testing.T) { mq := New() ctx, cf := context.WithCancel(context.Background()) diff --git a/sync/RWMutex.go b/sync/RWMutex.go new file mode 100644 index 0000000..ccda57d --- /dev/null +++ b/sync/RWMutex.go @@ -0,0 +1,119 @@ +package part + +import ( + "fmt" + "runtime" + "sync/atomic" + "time" +) + +const ( + lock = -1 + ulock = 0 +) + +type RWMutex struct { + rlc atomic.Int32 + wantRead atomic.Bool + wantWrite atomic.Bool +} + +// to[0]: wait lock timeout to[1]: run lock timeout +// +// 不要在Rlock内设置变量,有DATA RACE风险 +func (m *RWMutex) RLock(to ...time.Duration) (unlockf func()) { + m.wantRead.Store(true) + var callC atomic.Bool + if len(to) > 0 { + var calls []string + if len(to) > 1 { + for i := 1; true; i++ { + if pc, file, line, ok := runtime.Caller(i); !ok { + break + } else { + calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line)) + } + } + } + c := time.Now() + for m.rlc.Load() < ulock || m.wantWrite.Load() { + if time.Since(c) > to[0] { + panic(fmt.Sprintf("timeout to wait lock while rlocking, rlc:%d", m.rlc.Load())) + } + runtime.Gosched() + } + if len(to) > 1 { + time.AfterFunc(to[1], func() { + if !callC.Load() { + panicS := fmt.Sprintf("timeout to run rlock %v > %v\n", time.Since(c), to[0]) + for i := 0; i < len(calls); i++ { + panicS += fmt.Sprintf("call by %s\n", calls[i]) + } + panic(panicS) + } + }) + } + } else { + for m.rlc.Load() < ulock || m.wantWrite.Load() { + runtime.Gosched() + } + } + m.rlc.Add(1) + return func() { + if !callC.CompareAndSwap(false, true) { + panic("had unrlock") + } + if m.rlc.Add(-1) == ulock { + m.wantRead.Store(false) + } + } +} + +// to[0]: wait lock timeout to[1]: run lock timeout +func (m *RWMutex) Lock(to ...time.Duration) (unlockf func()) { + m.wantWrite.Store(true) + var callC atomic.Bool + if len(to) > 0 { + var calls []string + if len(to) > 1 { + for i := 1; true; i++ { + if pc, file, line, ok := runtime.Caller(i); !ok { + break + } else { + calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line)) + } + } + } + c := time.Now() + for m.rlc.Load() != ulock || m.wantRead.Load() { + if time.Since(c) > to[0] { + panic(fmt.Sprintf("timeout to wait rlock while locking, rlc:%d", m.rlc.Load())) + } + runtime.Gosched() + } + if len(to) > 1 { + time.AfterFunc(to[1], func() { + if !callC.Load() { + panicS := fmt.Sprintf("timeout to run lock %v > %v\n", time.Since(c), to[0]) + for i := 0; i < len(calls); i++ { + panicS += fmt.Sprintf("call by %s\n", calls[i]) + } + panic(panicS) + } + }) + } + } else { + for m.rlc.Load() != ulock || m.wantRead.Load() { + runtime.Gosched() + } + } + m.rlc.Add(-1) + return func() { + if !callC.CompareAndSwap(false, true) { + panic("had unlock") + } + if m.rlc.Add(1) == ulock { + m.wantWrite.Store(false) + } + } +} diff --git a/sync/RWMutex_test.go b/sync/RWMutex_test.go new file mode 100644 index 0000000..67aaaa8 --- /dev/null +++ b/sync/RWMutex_test.go @@ -0,0 +1,16 @@ +package part + +import ( + "testing" + "time" +) + +func TestMain(t *testing.T) { +} + +func BenchmarkRlock(b *testing.B) { + var lock1 RWMutex + for i := 0; i < b.N; i++ { + lock1.RLock(time.Second, time.Second)() + } +}