From: qydysky Date: Tue, 16 May 2023 20:23:02 +0000 (+0800) Subject: aaa X-Git-Tag: v0.27.5 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=056077c711c5832b2c6ea9a2b184e5c8aeb63ac7;p=part%2F.git aaa --- diff --git a/msgq/Msgq.go b/msgq/Msgq.go index 2548bb5..719d834 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -5,12 +5,13 @@ import ( "context" "fmt" "runtime" + "sync" "sync/atomic" "time" "unsafe" signal "github.com/qydysky/part/signal" - sync "github.com/qydysky/part/sync" + psync "github.com/qydysky/part/sync" ) type Msgq struct { @@ -18,7 +19,7 @@ type Msgq struct { funcs *list.List someNeedRemove atomic.Int32 lock sync.RWMutex - runTag sync.Map + runTag psync.Map } type FuncMap map[string]func(any) (disable bool) @@ -38,15 +39,15 @@ func NewTo(to ...time.Duration) *Msgq { } func (m *Msgq) Register(f func(any) (disable bool)) { - ul := m.lock.Lock(m.to...)() + m.lock.Lock() m.funcs.PushBack(f) - ul() + m.lock.Unlock() } func (m *Msgq) Register_front(f func(any) (disable bool)) { - ul := m.lock.Lock(m.to...)() + m.lock.Lock() m.funcs.PushFront(f) - ul() + m.lock.Unlock() } func (m *Msgq) Push(msg any) { @@ -57,22 +58,22 @@ func (m *Msgq) Push(msg any) { var removes []*list.Element - ul := m.lock.RLock(m.to...)() + m.lock.RLock() for el := m.funcs.Front(); el != nil; el = el.Next() { if disable := el.Value.(func(any) bool)(msg); disable { m.someNeedRemove.Add(1) removes = append(removes, el) } } - ul() + m.lock.RUnlock() if len(removes) != 0 { - ul := m.lock.Lock(m.to...)() + m.lock.Lock() m.someNeedRemove.Add(-int32(len(removes))) for i := 0; i < len(removes); i++ { m.funcs.Remove(removes[i]) } - ul() + m.lock.Unlock() } } @@ -82,8 +83,8 @@ func (m *Msgq) PushLock(msg any) { runtime.Gosched() } - ul := m.lock.Lock(m.to...)() - defer ul() + m.lock.Lock() + defer m.lock.Unlock() var removes []*list.Element @@ -108,8 +109,8 @@ func (m *Msgq) ClearAll() { runtime.Gosched() } - ul := m.lock.Lock(m.to...)() - defer ul() + m.lock.Lock() + defer m.lock.Unlock() var removes []*list.Element @@ -266,7 +267,7 @@ type MsgType[T any] struct { funcs *list.List someNeedRemove atomic.Int32 lock sync.RWMutex - runTag sync.Map + runTag psync.Map } type MsgType_tag_data[T any] struct { @@ -296,22 +297,22 @@ func (m *MsgType[T]) push(msg MsgType_tag_data[T]) { var removes []*list.Element - ul := m.lock.RLock(m.to...)() + m.lock.RLock() for el := m.funcs.Front(); el != nil; el = el.Next() { if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable { m.someNeedRemove.Add(1) removes = append(removes, el) } } - ul() + m.lock.RUnlock() if len(removes) != 0 { - ul := m.lock.Lock(m.to...)() + m.lock.Lock() m.someNeedRemove.Add(-int32(len(removes))) for i := 0; i < len(removes); i++ { m.funcs.Remove(removes[i]) } - ul() + m.lock.Unlock() } } @@ -321,8 +322,8 @@ func (m *MsgType[T]) pushLock(msg MsgType_tag_data[T]) { runtime.Gosched() } - ul := m.lock.Lock(m.to...)() - defer ul() + m.lock.Lock() + defer m.lock.Unlock() var removes []*list.Element @@ -342,15 +343,15 @@ func (m *MsgType[T]) pushLock(msg MsgType_tag_data[T]) { } func (m *MsgType[T]) register(f func(MsgType_tag_data[T]) (disable bool)) { - ul := m.lock.Lock(m.to...)() + m.lock.Lock() m.funcs.PushBack(f) - ul() + m.lock.Unlock() } func (m *MsgType[T]) register_front(f func(MsgType_tag_data[T]) (disable bool)) { - ul := m.lock.Lock(m.to...)() + m.lock.Lock() m.funcs.PushFront(f) - ul() + m.lock.Unlock() } func (m *MsgType[T]) Push_tag(Tag string, Data T) { @@ -411,8 +412,8 @@ func (m *MsgType[T]) ClearAll() { runtime.Gosched() } - ul := m.lock.Lock(m.to...)() - defer ul() + m.lock.Lock() + defer m.lock.Unlock() var removes []*list.Element diff --git a/sync/RWMutex.go b/sync/RWMutex.go deleted file mode 100644 index 0b71740..0000000 --- a/sync/RWMutex.go +++ /dev/null @@ -1,121 +0,0 @@ -package part - -import ( - "fmt" - "runtime" - "sync/atomic" - "time" -) - -const ( - lock = -1 - ulock = 0 -) - -type RWMutex struct { - rlc atomic.Int32 - wantRead atomic.Bool - wantWrite atomic.Bool -} - -// RLock() 必须在 lock期间操作的变量所定义的goroutime 中调用 -func (m *RWMutex) RLock(to ...time.Duration) (lockf func() (unlockf func())) { - m.wantRead.Store(true) - return func() (unlockf func()) { - var callC atomic.Bool - if len(to) > 0 { - var calls []string - for i := 1; true; i++ { - if pc, file, line, ok := runtime.Caller(i); !ok { - break - } else { - calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line)) - } - } - c := time.Now() - for m.rlc.Load() < ulock || m.wantWrite.Load() { - if time.Since(c) > to[0] { - panic(fmt.Sprintf("timeout to wait lock, rlc:%d", m.rlc.Load())) - } - runtime.Gosched() - } - c = time.Now() - go func() { - for !callC.Load() { - if time.Since(c) > to[0] { - panicS := fmt.Sprintf("timeout to run rlock %v > %v\n", time.Since(c), to[0]) - for i := 0; i < len(calls); i++ { - panicS += fmt.Sprintf("%s\n", calls[i]) - } - panic(panicS) - } - runtime.Gosched() - } - }() - } else { - for m.rlc.Load() < ulock || m.wantWrite.Load() { - runtime.Gosched() - } - } - m.rlc.Add(1) - return func() { - if !callC.CompareAndSwap(false, true) { - panic("had unrlock") - } - if m.rlc.Add(-1) == ulock { - m.wantRead.Store(false) - } - } - } -} - -// Lock() 必须在 lock期间操作的变量所定义的goroutime 中调用 -func (m *RWMutex) Lock(to ...time.Duration) (lockf func() (unlockf func())) { - m.wantWrite.Store(true) - return func() (unlock func()) { - var callC atomic.Bool - if len(to) > 0 { - var calls []string - for i := 1; true; i++ { - if pc, file, line, ok := runtime.Caller(i); !ok { - break - } else { - calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line)) - } - } - c := time.Now() - for m.rlc.Load() != ulock || m.wantRead.Load() { - if time.Since(c) > to[0] { - panic(fmt.Sprintf("timeout to wait rlock, rlc:%d", m.rlc.Load())) - } - runtime.Gosched() - } - c = time.Now() - go func() { - for !callC.Load() { - if time.Since(c) > to[0] { - panicS := fmt.Sprintf("timeout to run lock %v > %v\n", time.Since(c), to[0]) - for i := 0; i < len(calls); i++ { - panicS += fmt.Sprintf("call by %s\n", calls[i]) - } - panic(panicS) - } - runtime.Gosched() - } - }() - } else { - for m.rlc.Load() != ulock || m.wantRead.Load() { - runtime.Gosched() - } - } - m.rlc.Add(-1) - return func() { - if !callC.CompareAndSwap(false, true) { - panic("had unlock") - } - if m.rlc.Add(1) == ulock { - m.wantWrite.Store(false) - } - } - } -} diff --git a/sync/RWMutex_test.go b/sync/RWMutex_test.go deleted file mode 100644 index c64775b..0000000 --- a/sync/RWMutex_test.go +++ /dev/null @@ -1,52 +0,0 @@ -package part - -import ( - "testing" - "time" -) - -func TestMain(t *testing.T) { - var rl RWMutex - var callL time.Time - var callRL time.Time - var callRL2 time.Time - var to = time.Second * 2 - - ul := rl.RLock(to)() - callRL = time.Now() - - rlock := rl.RLock(to) - go func() { - unlock := rlock() - callRL2 = time.Now() - unlock() - }() - - lock := rl.Lock(to) - go func() { - ull := lock() - callL = time.Now() - ull() - }() - - time.Sleep(time.Second) - ul() - rl.Lock(to)()() - - if time.Since(callRL) < time.Since(callRL2) { - t.Fatal() - } - if time.Since(callRL2) < time.Since(callL) { - t.Fatal() - } - if callL.IsZero() { - t.Fatal() - } -} - -func BenchmarkRlock(b *testing.B) { - var lock1 RWMutex - for i := 0; i < b.N; i++ { - lock1.Lock(time.Second)()() - } -}