"context"
"fmt"
"runtime"
+ "sync"
"sync/atomic"
"time"
"unsafe"
signal "github.com/qydysky/part/signal"
- sync "github.com/qydysky/part/sync"
+ psync "github.com/qydysky/part/sync"
)
type Msgq struct {
funcs *list.List
someNeedRemove atomic.Int32
lock sync.RWMutex
- runTag sync.Map
+ runTag psync.Map
}
type FuncMap map[string]func(any) (disable bool)
}
func (m *Msgq) Register(f func(any) (disable bool)) {
- ul := m.lock.Lock(m.to...)()
+ m.lock.Lock()
m.funcs.PushBack(f)
- ul()
+ m.lock.Unlock()
}
func (m *Msgq) Register_front(f func(any) (disable bool)) {
- ul := m.lock.Lock(m.to...)()
+ m.lock.Lock()
m.funcs.PushFront(f)
- ul()
+ m.lock.Unlock()
}
func (m *Msgq) Push(msg any) {
var removes []*list.Element
- ul := m.lock.RLock(m.to...)()
+ m.lock.RLock()
for el := m.funcs.Front(); el != nil; el = el.Next() {
if disable := el.Value.(func(any) bool)(msg); disable {
m.someNeedRemove.Add(1)
removes = append(removes, el)
}
}
- ul()
+ m.lock.RUnlock()
if len(removes) != 0 {
- ul := m.lock.Lock(m.to...)()
+ m.lock.Lock()
m.someNeedRemove.Add(-int32(len(removes)))
for i := 0; i < len(removes); i++ {
m.funcs.Remove(removes[i])
}
- ul()
+ m.lock.Unlock()
}
}
runtime.Gosched()
}
- ul := m.lock.Lock(m.to...)()
- defer ul()
+ m.lock.Lock()
+ defer m.lock.Unlock()
var removes []*list.Element
runtime.Gosched()
}
- ul := m.lock.Lock(m.to...)()
- defer ul()
+ m.lock.Lock()
+ defer m.lock.Unlock()
var removes []*list.Element
funcs *list.List
someNeedRemove atomic.Int32
lock sync.RWMutex
- runTag sync.Map
+ runTag psync.Map
}
type MsgType_tag_data[T any] struct {
var removes []*list.Element
- ul := m.lock.RLock(m.to...)()
+ m.lock.RLock()
for el := m.funcs.Front(); el != nil; el = el.Next() {
if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable {
m.someNeedRemove.Add(1)
removes = append(removes, el)
}
}
- ul()
+ m.lock.RUnlock()
if len(removes) != 0 {
- ul := m.lock.Lock(m.to...)()
+ m.lock.Lock()
m.someNeedRemove.Add(-int32(len(removes)))
for i := 0; i < len(removes); i++ {
m.funcs.Remove(removes[i])
}
- ul()
+ m.lock.Unlock()
}
}
runtime.Gosched()
}
- ul := m.lock.Lock(m.to...)()
- defer ul()
+ m.lock.Lock()
+ defer m.lock.Unlock()
var removes []*list.Element
}
func (m *MsgType[T]) register(f func(MsgType_tag_data[T]) (disable bool)) {
- ul := m.lock.Lock(m.to...)()
+ m.lock.Lock()
m.funcs.PushBack(f)
- ul()
+ m.lock.Unlock()
}
func (m *MsgType[T]) register_front(f func(MsgType_tag_data[T]) (disable bool)) {
- ul := m.lock.Lock(m.to...)()
+ m.lock.Lock()
m.funcs.PushFront(f)
- ul()
+ m.lock.Unlock()
}
func (m *MsgType[T]) Push_tag(Tag string, Data T) {
runtime.Gosched()
}
- ul := m.lock.Lock(m.to...)()
- defer ul()
+ m.lock.Lock()
+ defer m.lock.Unlock()
var removes []*list.Element
+++ /dev/null
-package part
-
-import (
- "fmt"
- "runtime"
- "sync/atomic"
- "time"
-)
-
-const (
- lock = -1
- ulock = 0
-)
-
-type RWMutex struct {
- rlc atomic.Int32
- wantRead atomic.Bool
- wantWrite atomic.Bool
-}
-
-// RLock() 必须在 lock期间操作的变量所定义的goroutime 中调用
-func (m *RWMutex) RLock(to ...time.Duration) (lockf func() (unlockf func())) {
- m.wantRead.Store(true)
- return func() (unlockf func()) {
- var callC atomic.Bool
- if len(to) > 0 {
- var calls []string
- for i := 1; true; i++ {
- if pc, file, line, ok := runtime.Caller(i); !ok {
- break
- } else {
- calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line))
- }
- }
- c := time.Now()
- for m.rlc.Load() < ulock || m.wantWrite.Load() {
- if time.Since(c) > to[0] {
- panic(fmt.Sprintf("timeout to wait lock, rlc:%d", m.rlc.Load()))
- }
- runtime.Gosched()
- }
- c = time.Now()
- go func() {
- for !callC.Load() {
- if time.Since(c) > to[0] {
- panicS := fmt.Sprintf("timeout to run rlock %v > %v\n", time.Since(c), to[0])
- for i := 0; i < len(calls); i++ {
- panicS += fmt.Sprintf("%s\n", calls[i])
- }
- panic(panicS)
- }
- runtime.Gosched()
- }
- }()
- } else {
- for m.rlc.Load() < ulock || m.wantWrite.Load() {
- runtime.Gosched()
- }
- }
- m.rlc.Add(1)
- return func() {
- if !callC.CompareAndSwap(false, true) {
- panic("had unrlock")
- }
- if m.rlc.Add(-1) == ulock {
- m.wantRead.Store(false)
- }
- }
- }
-}
-
-// Lock() 必须在 lock期间操作的变量所定义的goroutime 中调用
-func (m *RWMutex) Lock(to ...time.Duration) (lockf func() (unlockf func())) {
- m.wantWrite.Store(true)
- return func() (unlock func()) {
- var callC atomic.Bool
- if len(to) > 0 {
- var calls []string
- for i := 1; true; i++ {
- if pc, file, line, ok := runtime.Caller(i); !ok {
- break
- } else {
- calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line))
- }
- }
- c := time.Now()
- for m.rlc.Load() != ulock || m.wantRead.Load() {
- if time.Since(c) > to[0] {
- panic(fmt.Sprintf("timeout to wait rlock, rlc:%d", m.rlc.Load()))
- }
- runtime.Gosched()
- }
- c = time.Now()
- go func() {
- for !callC.Load() {
- if time.Since(c) > to[0] {
- panicS := fmt.Sprintf("timeout to run lock %v > %v\n", time.Since(c), to[0])
- for i := 0; i < len(calls); i++ {
- panicS += fmt.Sprintf("call by %s\n", calls[i])
- }
- panic(panicS)
- }
- runtime.Gosched()
- }
- }()
- } else {
- for m.rlc.Load() != ulock || m.wantRead.Load() {
- runtime.Gosched()
- }
- }
- m.rlc.Add(-1)
- return func() {
- if !callC.CompareAndSwap(false, true) {
- panic("had unlock")
- }
- if m.rlc.Add(1) == ulock {
- m.wantWrite.Store(false)
- }
- }
- }
-}
+++ /dev/null
-package part
-
-import (
- "testing"
- "time"
-)
-
-func TestMain(t *testing.T) {
- var rl RWMutex
- var callL time.Time
- var callRL time.Time
- var callRL2 time.Time
- var to = time.Second * 2
-
- ul := rl.RLock(to)()
- callRL = time.Now()
-
- rlock := rl.RLock(to)
- go func() {
- unlock := rlock()
- callRL2 = time.Now()
- unlock()
- }()
-
- lock := rl.Lock(to)
- go func() {
- ull := lock()
- callL = time.Now()
- ull()
- }()
-
- time.Sleep(time.Second)
- ul()
- rl.Lock(to)()()
-
- if time.Since(callRL) < time.Since(callRL2) {
- t.Fatal()
- }
- if time.Since(callRL2) < time.Since(callL) {
- t.Fatal()
- }
- if callL.IsZero() {
- t.Fatal()
- }
-}
-
-func BenchmarkRlock(b *testing.B) {
- var lock1 RWMutex
- for i := 0; i < b.N; i++ {
- lock1.Lock(time.Second)()()
- }
-}