From f9abe91fdae4143a18a8bf340ead7c5bbfa2cb15 Mon Sep 17 00:00:00 2001 From: qydysky Date: Sun, 16 Feb 2025 01:24:12 +0800 Subject: [PATCH] 1 (#25) * 1 * 1 * 1 * 1 --- .github/workflows/test1.yml | 6 +- go.mod | 2 +- msgq/Msgq.go | 507 +++++++++++------------------------- msgq/Msgq_test.go | 108 ++++++-- sync/RWMutex.go | 62 ++++- sync/RWMutex_test.go | 63 ++++- 6 files changed, 359 insertions(+), 389 deletions(-) diff --git a/.github/workflows/test1.yml b/.github/workflows/test1.yml index d5401a4..e68d549 100644 --- a/.github/workflows/test1.yml +++ b/.github/workflows/test1.yml @@ -13,7 +13,7 @@ jobs: - name: Set up Go 1.x uses: actions/setup-go@v5 with: - go-version: '1.23' + go-version: '1.24' - name: Check out code into the Go module directory uses: actions/checkout@v4 @@ -52,7 +52,7 @@ jobs: - name: Set up Go 1.x uses: actions/setup-go@v5 with: - go-version: '1.23' + go-version: '1.24' - name: Check out code into the Go module directory uses: actions/checkout@v4 @@ -91,7 +91,7 @@ jobs: - name: Set up Go 1.x uses: actions/setup-go@v5 with: - go-version: '1.23' + go-version: '1.24' - name: Check out code into the Go module directory uses: actions/checkout@v4 diff --git a/go.mod b/go.mod index ed461cc..80c67f6 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/qydysky/part -go 1.23 +go 1.24 require ( github.com/gorilla/websocket v1.5.3 diff --git a/msgq/Msgq.go b/msgq/Msgq.go index d3c8e76..4423b0f 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -3,25 +3,31 @@ package part import ( "container/list" "context" + "errors" "fmt" + lmt "fmt" + "go/build" + "runtime" + "strings" "sync/atomic" "time" - "unsafe" psync "github.com/qydysky/part/sync" ) +var ErrRunTO = errors.New(`ErrRunTO`) + type Msgq struct { to []time.Duration funcs *list.List someNeedRemove atomic.Bool + allNeedRemove atomic.Bool lock psync.RWMutex - runTag psync.Map + PanicFunc func(any) } type msgqItem struct { - running atomic.Int32 disable atomic.Bool f *func(any) (disable bool) } @@ -34,129 +40,120 @@ func New(to ...time.Duration) *Msgq { m := new(Msgq) m.funcs = list.New() m.to = to + if len(m.to) > 0 { + m.lock.RecLock(10, to[0], to[0]) + } return m } -func (m *Msgq) register(mp *msgqItem) (cancel func()) { - ul := m.lock.Lock() - m.funcs.PushBack(mp) - ul() - return func() { - mp.disable.Store(true) - m.someNeedRemove.Store(true) - } +func (m *Msgq) TOPanicFunc(f func(any)) { + m.PanicFunc = f + m.lock.PanicFunc = f } -func (m *Msgq) Register(f func(any) (disable bool)) (cancel func()) { - mp := &msgqItem{ - f: &f, +func (m *Msgq) register(mp *msgqItem, f func(v any) *list.Element) (cancel func()) { + defer m.lock.Lock()() + + if m.allNeedRemove.Load() { + m.removeDisable(m.someNeedRemove.CompareAndSwap(false, true), true) } - m.register(mp) + + f(mp) return func() { mp.disable.Store(true) - m.someNeedRemove.Store(true) + m.removeDisable(m.someNeedRemove.CompareAndSwap(false, true), false) } } -func (m *Msgq) register_front(mp *msgqItem) (cancel func()) { - ul := m.lock.Lock() - m.funcs.PushFront(mp) - ul() - return func() { - mp.disable.Store(true) - m.someNeedRemove.Store(true) - } +func (m *Msgq) Register(f func(any) (disable bool)) (cancel func()) { + return m.register(&msgqItem{ + f: &f, + }, m.funcs.PushBack) } -func (m *Msgq) Register_front(f func(any) (disable bool)) (cancel func()) { - mp := &msgqItem{ +func (m *Msgq) RegisterFront(f func(any) (disable bool)) (cancel func()) { + return m.register(&msgqItem{ f: &f, - } - m.register_front(mp) - return func() { - mp.disable.Store(true) - m.someNeedRemove.Store(true) - } + }, m.funcs.PushFront) } -func (m *Msgq) Push(msg any) { - ul := m.lock.RLock(m.to...) - defer ul(m.removeDisable(true)...) +func (m *Msgq) push(msg any, isLock bool) { + if isLock { + defer m.lock.Lock()() + } else { + defer m.lock.RLock()() + } - for el := m.funcs.Front(); el != nil; el = el.Next() { - mi := el.Value.(*msgqItem) - if mi.disable.Load() { - continue + if m.allNeedRemove.Load() { + m.removeDisable(true, isLock) + if !isLock { + return } - mi.running.Add(1) - if disable := (*mi.f)(msg); disable { + } + + for el := m.funcs.Front(); el != nil; el = el.Next() { + if mi := el.Value.(*msgqItem); !mi.disable.Load() && (*mi.f)(msg) { mi.disable.Store(true) - m.someNeedRemove.Store(true) + m.removeDisable(m.someNeedRemove.CompareAndSwap(false, true), isLock) } - mi.running.Add(-1) } } +// 不能在由PushLock*调用的Pull中以同步方式使用 +func (m *Msgq) Push(msg any) { + m.push(msg, false) +} + +// 不能在由Push*调用的Pull中以同步方式使用 func (m *Msgq) PushLock(msg any) { - ul := m.lock.Lock(m.to...) - defer ul(m.removeDisable(false)...) + m.push(msg, true) +} - for el := m.funcs.Front(); el != nil; el = el.Next() { - mi := el.Value.(*msgqItem) - if mi.disable.Load() { - continue +func (m *Msgq) ClearAll() { + m.allNeedRemove.Store(true) +} + +func (m *Msgq) removeDisable(sig bool, isLock bool) { + if sig { + f := func() { + if !isLock { + defer m.lock.Lock()() + } + all := m.allNeedRemove.Swap(false) + for el := m.funcs.Front(); el != nil; el = el.Next() { + mi := el.Value.(*msgqItem) + if all || mi.disable.Load() { + m.funcs.Remove(el) + } + } + m.someNeedRemove.Store(false) } - mi.running.Add(1) - if disable := (*mi.f)(msg); disable { - mi.disable.Store(true) - m.someNeedRemove.Store(true) + if isLock { + f() + } else { + go f() } - mi.running.Add(-1) } } -func (m *Msgq) ClearAll() { - for el := m.funcs.Front(); el != nil; el = el.Next() { - mi := el.Value.(*msgqItem) - mi.disable.Store(true) - m.someNeedRemove.Store(true) +func (m *Msgq) panicFunc(s any) { + if m.PanicFunc != nil { + m.PanicFunc(s) + } else { + panic(s) } } -func (m *Msgq) removeDisable(rlock bool) (ls []func(ulocked bool) (doUlock bool)) { - if rlock { - return []func(ulocked bool) (doUlock bool){ - func(ulocked bool) (doUlock bool) { - return m.someNeedRemove.CompareAndSwap(true, false) - }, - func(ulocked bool) (doUlock bool) { - if ulocked { - defer m.lock.Lock()() - for el := m.funcs.Front(); el != nil; el = el.Next() { - mi := el.Value.(*msgqItem) - if mi.disable.Load() && mi.running.Load() == 0 { - m.funcs.Remove(el) - } - } - } - return - }, - } - } else { - return []func(ulocked bool) (doUlock bool){ - func(ulocked bool) (doUlock bool) { - if m.someNeedRemove.CompareAndSwap(true, false) { - for el := m.funcs.Front(); el != nil; el = el.Next() { - mi := el.Value.(*msgqItem) - if mi.disable.Load() && mi.running.Load() == 0 { - m.funcs.Remove(el) - } - } - } - return - }, +func (m *Msgq) PushingTO(info string, callTree *string) (fin func()) { + if len(m.to) > 1 { + to := time.AfterFunc(m.to[1], func() { + m.panicFunc(errors.Join(ErrRunTO, lmt.Errorf("%v:%v", info, *callTree))) + }) + return func() { + to.Stop() } } + return func() {} } type Msgq_tag_data struct { @@ -164,106 +161,27 @@ type Msgq_tag_data struct { Data any } -// 不能放置在由PushLock_tag调用的同步Pull中 +// 不能在由PushLock*调用的Pull中以同步方式使用 func (m *Msgq) Push_tag(Tag string, Data any) { - if len(m.to) > 0 { - ptr := uintptr(unsafe.Pointer(&Data)) - m.runTag.Store(ptr, "Push_tag(`"+Tag+"`,...)") - defer func() { - if e := recover(); e != nil { - m.runTag.Range(func(key, value any) bool { - if key == ptr { - fmt.Printf("%v panic > %v\n", value, e) - } else { - fmt.Printf("%v running\n", value) - } - return true - }) - m.runTag.ClearAll() - panic(e) - } - m.runTag.Delete(ptr) - }() - } - { - /* - m.m.Push(Msgq_tag_data{ - Tag: Tag, - Data: Data, - }) - */ - defer m.lock.RLock(m.to...)(m.removeDisable(true)...) - - for el := m.funcs.Front(); el != nil; el = el.Next() { - mi := el.Value.(*msgqItem) - if mi.disable.Load() { - continue - } - mi.running.Add(1) - if disable := (*mi.f)(&Msgq_tag_data{ - Tag: Tag, - Data: Data, - }); disable { - mi.disable.Store(true) - m.someNeedRemove.Store(true) - } - mi.running.Add(-1) - } - } + defer m.PushingTO(lmt.Sprintf("\nPush_tag(`%v`)", Tag), getCall(1))() + m.Push(&Msgq_tag_data{ + Tag: Tag, + Data: Data, + }) } -// 不能放置在由Push_tag、PushLock_tag调用的同步Pull中 +// 不能在由Push*调用的Pull中以同步方式使用 func (m *Msgq) PushLock_tag(Tag string, Data any) { - if len(m.to) > 0 { - ptr := uintptr(unsafe.Pointer(&Data)) - m.runTag.Store(ptr, "PushLock_tag(`"+Tag+"`,...)") - defer func() { - if e := recover(); e != nil { - m.runTag.Range(func(key, value any) bool { - if key == ptr { - fmt.Printf("%v panic > %v\n", value, e) - } else { - fmt.Printf("%v running\n", value) - } - return true - }) - m.runTag.ClearAll() - panic(e) - } - m.runTag.Delete(ptr) - }() - } - { - /* - m.m.PushLock(Msgq_tag_data{ - Tag: Tag, - Data: Data, - }) - */ - ul := m.lock.Lock(m.to...) - defer ul(m.removeDisable(false)...) - - for el := m.funcs.Front(); el != nil; el = el.Next() { - mi := el.Value.(*msgqItem) - if mi.disable.Load() { - continue - } - mi.running.Add(1) - if disable := (*mi.f)(&Msgq_tag_data{ - Tag: Tag, - Data: Data, - }); disable { - mi.disable.Store(true) - m.someNeedRemove.Store(true) - } - mi.running.Add(-1) - } - } + defer m.PushingTO(lmt.Sprintf("\nPushLock_tag(`%v`)", Tag), getCall(1))() + m.PushLock(&Msgq_tag_data{ + Tag: Tag, + Data: Data, + }) } func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) (cancel func(), ch <-chan any) { var c = make(chan any, size) - var f1 = func(data any) bool { + return m.Register(func(data any) bool { if d, ok := data.(*Msgq_tag_data); ok && d.Tag == key { select { case <-ctx.Done(): @@ -282,74 +200,53 @@ func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) (cancel } } return false - } - cancel = m.register_front(&msgqItem{ - f: &f1, - }) - ch = c - return + }), c } func (m *Msgq) Pull_tag_only(key string, f func(any) (disable bool)) (cancel func()) { - var f1 = func(data any) (disable bool) { + return m.Register(func(data any) (disable bool) { if d, ok := data.(*Msgq_tag_data); ok && d.Tag == key { return f(d.Data) } return false - } - return m.register_front(&msgqItem{ - f: &f1, }) } func (m *Msgq) Pull_tag(func_map map[string]func(any) (disable bool)) (cancel func()) { - var f1 = func(data any) (disable bool) { + return m.Register(func(data any) (disable bool) { if d, ok := data.(*Msgq_tag_data); ok { if f, ok := func_map[d.Tag]; ok { return f(d.Data) } } return false - } - return m.register_front(&msgqItem{ - f: &f1, }) } func (m *Msgq) Pull_tag_async_only(key string, f func(any) (disable bool)) (cancel func()) { - var mi = msgqItem{} - var f1 = func(data any) bool { - if d, ok := data.(*Msgq_tag_data); ok { + var disable bool + return m.RegisterFront(func(data any) bool { + if d, ok := data.(*Msgq_tag_data); !disable && ok && d.Tag == key { go func() { - if f(d.Data) { - mi.disable.Store(true) - m.someNeedRemove.Store(true) - } + disable = f(d.Data) }() } - return false - } - mi.f = &f1 - return m.register_front(&mi) + return disable + }) } func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) (cancel func()) { - var mi = msgqItem{} - var f = func(data any) bool { + var disable bool + return m.RegisterFront(func(data any) bool { if d, ok := data.(*Msgq_tag_data); ok { if f, ok := func_map[d.Tag]; ok { go func() { - if f(d.Data) { - mi.disable.Store(true) - m.someNeedRemove.Store(true) - } + disable = f(d.Data) }() } } - return false - } - mi.f = &f - return m.register_front(&mi) + return disable + }) } type MsgType[T any] struct { @@ -370,107 +267,25 @@ func (m *MsgType[T]) ClearAll() { m.m.ClearAll() } -// 不能放置在由PushLock_tag调用的同步Pull中 +// 不能在由PushLock*调用的Pull中以同步方式使用 func (m *MsgType[T]) Push_tag(Tag string, Data T) { - if len(m.m.to) > 0 { - ptr := uintptr(unsafe.Pointer(&Data)) - m.m.runTag.Store(ptr, "Push_tag(`"+Tag+"`,...)") - defer func() { - if e := recover(); e != nil { - m.m.runTag.Range(func(key, value any) bool { - if key == ptr { - fmt.Printf("%v panic > %v\n", value, e) - } else { - fmt.Printf("%v running\n", value) - } - return true - }) - m.m.runTag.ClearAll() - panic(e) - } - m.m.runTag.Delete(ptr) - }() - } - { - /* - m.m.Push(Msgq_tag_data{ - Tag: Tag, - Data: Data, - }) - */ - ul := m.m.lock.RLock(m.m.to...) - defer ul(m.m.removeDisable(true)...) - - for el := m.m.funcs.Front(); el != nil; el = el.Next() { - mi := el.Value.(*msgqItem) - if mi.disable.Load() { - continue - } - mi.running.Add(1) - if disable := (*mi.f)(&MsgType_tag_data[T]{ - Tag: Tag, - Data: &Data, - }); disable { - mi.disable.Store(true) - m.m.someNeedRemove.Store(true) - } - mi.running.Add(-1) - } - } + m.m.Push(&MsgType_tag_data[T]{ + Tag: Tag, + Data: &Data, + }) } -// 不能放置在由Push_tag、PushLock_tag调用的同步Pull中 +// 不能在由Push*调用的Pull中以同步方式使用 func (m *MsgType[T]) PushLock_tag(Tag string, Data T) { - if len(m.m.to) > 0 { - ptr := uintptr(unsafe.Pointer(&Data)) - m.m.runTag.Store(ptr, "PushLock_tag(`"+Tag+"`,...)") - defer func() { - if e := recover(); e != nil { - m.m.runTag.Range(func(key, value any) bool { - if key == ptr { - fmt.Printf("%v panic > %v\n", value, e) - } else { - fmt.Printf("%v running\n", value) - } - return true - }) - m.m.runTag.ClearAll() - panic(e) - } - m.m.runTag.Delete(ptr) - }() - } - { - /* - m.m.PushLock(Msgq_tag_data{ - Tag: Tag, - Data: Data, - }) - */ - ul := m.m.lock.Lock(m.m.to...) - defer ul(m.m.removeDisable(false)...) - - for el := m.m.funcs.Front(); el != nil; el = el.Next() { - mi := el.Value.(*msgqItem) - if mi.disable.Load() { - continue - } - mi.running.Add(1) - if disable := (*mi.f)(&MsgType_tag_data[T]{ - Tag: Tag, - Data: &Data, - }); disable { - mi.disable.Store(true) - m.m.someNeedRemove.Store(true) - } - mi.running.Add(-1) - } - } + m.m.PushLock(&MsgType_tag_data[T]{ + Tag: Tag, + Data: &Data, + }) } func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) (cancel func(), ch <-chan T) { var c = make(chan T, size) - var f = func(data any) bool { + return m.m.Register(func(data any) bool { if data1, ok := data.(*MsgType_tag_data[T]); ok { if data1.Tag == key { select { @@ -491,74 +306,70 @@ func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) (c } } return false - } - cancel = m.m.register(&msgqItem{ - f: &f, - }) - ch = c - return + }), c } func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) (cancel func()) { - var f1 = func(data any) (disable bool) { - if data1, ok := data.(*MsgType_tag_data[T]); ok { - if data1.Tag == key { - return f(*data1.Data) - } + return m.m.Register(func(data any) (disable bool) { + if data1, ok := data.(*MsgType_tag_data[T]); ok && data1.Tag == key { + return f(*data1.Data) } return false - } - return m.m.register(&msgqItem{ - f: &f1, }) } func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) (cancel func()) { - var f = func(data any) (disable bool) { + return m.m.Register(func(data any) (disable bool) { if data1, ok := data.(*MsgType_tag_data[T]); ok { if f, ok := func_map[data1.Tag]; ok { return f(*data1.Data) } } return false - } - return m.m.register(&msgqItem{ - f: &f, }) } func (m *MsgType[T]) Pull_tag_async_only(key string, f func(T) (disable bool)) (cancel func()) { - var mi = msgqItem{} - var f1 = func(data any) bool { - if d, ok := data.(*MsgType_tag_data[T]); ok { + var disabled atomic.Bool + return m.m.RegisterFront(func(data any) bool { + if d, ok := data.(*MsgType_tag_data[T]); !disabled.Load() && ok && d.Tag == key { go func() { - if f(*d.Data) { - mi.disable.Store(true) - m.m.someNeedRemove.Store(true) + if !disabled.Load() { + disabled.Store(f(*d.Data)) } }() } - return false - } - mi.f = &f1 - return m.m.register_front(&mi) + return disabled.Load() + }) } func (m *MsgType[T]) Pull_tag_async(func_map map[string]func(T) (disable bool)) (cancel func()) { - var mi = msgqItem{} - var f = func(data any) bool { - if d, ok := data.(*MsgType_tag_data[T]); ok { + var disabled atomic.Bool + return m.m.RegisterFront(func(data any) bool { + if d, ok := data.(*MsgType_tag_data[T]); !disabled.Load() && ok { if f, ok := func_map[d.Tag]; ok { go func() { - if f(*d.Data) { - mi.disable.Store(true) - m.m.someNeedRemove.Store(true) + if !disabled.Load() { + disabled.Store(f(*d.Data)) } }() } } - return false + return disabled.Load() + }) +} + +func getCall(i int) (calls *string) { + var cs string + for i += 1; true; i++ { + if pc, file, line, ok := runtime.Caller(i); !ok || strings.HasPrefix(file, build.Default.GOROOT) { + break + } else { + cs += fmt.Sprintf("\ncall by %s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line) + } + } + if cs == "" { + cs += fmt.Sprintln("\ncall by goroutine") } - mi.f = &f - return m.m.register_front(&mi) + return &cs } diff --git a/msgq/Msgq_test.go b/msgq/Msgq_test.go index 3d5102c..5883c9e 100644 --- a/msgq/Msgq_test.go +++ b/msgq/Msgq_test.go @@ -2,10 +2,14 @@ package part import ( "context" + "errors" + "fmt" _ "net/http/pprof" "sync" "testing" "time" + + psync "github.com/qydysky/part/sync" ) // type test_item struct { @@ -139,20 +143,41 @@ func BenchmarkXxx(b *testing.B) { } } -// func TestPushLock(t *testing.T) { -// defer func() { -// if e := recover(); e != nil { -// t.Fatal(e) -// } -// }() -// mq := NewTo(time.Second) -// mq.Pull_tag_only(`test`, func(a any) (disable bool) { -// mq.PushLock_tag(`lock`, nil) -// return false -// }) -// mq.Push_tag(`test`, nil) -// t.Fatal() -// } +func TestTORun(t *testing.T) { + t.Parallel() + panicC := make(chan any, 10) + mq := New(time.Second, time.Second) + mq.TOPanicFunc(func(a any) { + panicC <- a + }) + mq.Pull_tag_only(`test`, func(a any) (disable bool) { + time.Sleep(time.Second * 10) + return false + }) + go mq.Push_tag(`test`, nil) + e := <-panicC + if !errors.Is(e.(error), ErrRunTO) { + t.Fatal(e) + } +} + +func TestPushLock(t *testing.T) { + t.Parallel() + panicC := make(chan any, 10) + mq := New(time.Second, time.Second*2) + mq.TOPanicFunc(func(a any) { + panicC <- a + }) + mq.Pull_tag_only(`test`, func(a any) (disable bool) { + mq.PushLock_tag(`lock`, nil) + return false + }) + go mq.Push_tag(`test`, nil) + e := <-panicC + if !errors.Is(e.(error), psync.ErrTimeoutToLock) { + t.Fatal(e) + } +} func Benchmark_1(b *testing.B) { mq := New() @@ -166,6 +191,7 @@ func Benchmark_1(b *testing.B) { } func Test_4(t *testing.T) { + t.Parallel() mq := New() cancel := mq.Pull_tag(FuncMap{ `del`: func(a any) (disable bool) { @@ -178,6 +204,7 @@ func Test_4(t *testing.T) { } func Test_2(t *testing.T) { + t.Parallel() mq := New() cancel := mq.Pull_tag(FuncMap{ `del`: func(a any) (disable bool) { @@ -190,7 +217,27 @@ func Test_2(t *testing.T) { mq.PushLock_tag(`del`, nil) } +func Test_5(t *testing.T) { + t.Parallel() + mq := New(time.Second, time.Second*2) + mq.Pull_tag(FuncMap{ + `del`: func(a any) (disable bool) { + t.Log(1) + mq.Push_tag(`del1`, nil) + t.Log(3) + return false + }, + `del1`: func(a any) (disable bool) { + t.Log(2) + return true + }, + }) + mq.Push_tag(`del`, 1) + mq.Push_tag(`del1`, 1) +} + func Test_1(t *testing.T) { + t.Parallel() mq := New(time.Millisecond*5, time.Millisecond*10) go mq.Push_tag(`del`, nil) mq.Pull_tag(FuncMap{ @@ -206,11 +253,12 @@ func Test_1(t *testing.T) { } func Test_RemoveInPush(t *testing.T) { + t.Parallel() mq := New(time.Second, time.Second*3) mq.Pull_tag(FuncMap{ `r1`: func(a any) (disable bool) { mq.ClearAll() - return false + return true }, `r2`: func(a any) (disable bool) { return true @@ -223,6 +271,7 @@ func Test_RemoveInPush(t *testing.T) { } func Test_3(t *testing.T) { + t.Parallel() mq := New(time.Millisecond*5, time.Millisecond*10) go mq.Push_tag(`sss`, nil) mq.Pull_tag(FuncMap{ @@ -234,6 +283,7 @@ func Test_3(t *testing.T) { } func Test_Pull_tag_chan(t *testing.T) { + t.Parallel() mq := New() ctx, cf := context.WithCancel(context.Background()) _, ch := mq.Pull_tag_chan(`a`, 2, ctx) @@ -273,6 +323,7 @@ func Test_Pull_tag_chan(t *testing.T) { } func Test_Pull_tag_chan2(t *testing.T) { + t.Parallel() mq := New() mq.Pull_tag_chan(`a`, 1, context.Background()) @@ -283,6 +334,7 @@ func Test_Pull_tag_chan2(t *testing.T) { } func Test_msgq1(t *testing.T) { + t.Parallel() mq := New() c := make(chan time.Time, 10) mq.Pull_tag(map[string]func(any) bool{ @@ -339,6 +391,7 @@ func Test_msgq1(t *testing.T) { } func Test_msgq9(t *testing.T) { + t.Parallel() var mq = NewType[int]() var c = make(chan int, 10) mq.Pull_tag_only(`c`, func(i int) (disable bool) { @@ -348,12 +401,16 @@ func Test_msgq9(t *testing.T) { mq.PushLock_tag(`c`, 1) mq.ClearAll() mq.PushLock_tag(`c`, 2) - if len(c) != 1 || <-c != 1 { - t.Fatal() + t.Log(mq.m.funcs.Len()) + if l, i := len(c), <-c; l != 1 || i != 1 { + t.Fatal(l, i) } + fmt.Println(mq.m.allNeedRemove.Load()) + mq.Pull_tag(map[string]func(int) (disable bool){ `c`: func(i int) (disable bool) { + fmt.Println(1) c <- i return false }, @@ -363,15 +420,18 @@ func Test_msgq9(t *testing.T) { }, }) + fmt.Println(mq.m.funcs.Len()) + mq.PushLock_tag(`c`, 1) mq.ClearAll() mq.PushLock_tag(`c`, 2) - if len(c) != 1 || <-c != 1 { - t.Fatal() + if l, i := len(c), <-c; l != 1 || i != 1 { + t.Fatal(l, i) } } func Test_msgq2(t *testing.T) { + t.Parallel() mq := New() mq.Pull_tag(map[string]func(any) bool{ @@ -413,6 +473,7 @@ func Test_msgq2(t *testing.T) { } func Test_msgq3(t *testing.T) { + t.Parallel() mq := New() mun_c := make(chan int, 100) @@ -436,6 +497,7 @@ func Test_msgq3(t *testing.T) { } func Test_msgq4(t *testing.T) { + t.Parallel() // mq := New(30) mq := New(time.Second, time.Second) //out of list @@ -491,6 +553,7 @@ func Test_msgq4(t *testing.T) { } func Test_msgq5(t *testing.T) { + t.Parallel() mq := New() mun_c1 := make(chan bool, 100) @@ -548,6 +611,7 @@ func Test_msgq5(t *testing.T) { } func Test_msgq6(t *testing.T) { + t.Parallel() msg := NewType[int]() msg.Pull_tag(map[string]func(int) (disable bool){ `1`: func(b int) (disable bool) { @@ -562,6 +626,7 @@ func Test_msgq6(t *testing.T) { } func Test_msgq7(t *testing.T) { + t.Parallel() var c = make(chan string, 100) msg := NewType[int]() msg.Pull_tag_async_only(`1`, func(i int) (disable bool) { @@ -585,8 +650,8 @@ func Test_msgq7(t *testing.T) { return i > 10 }) msg.Push_tag(`1`, 0) - if <-c != "1" { - t.Fatal() + if i := <-c; i != "1" { + t.Fatal(i) } if <-c != "2" { t.Fatal() @@ -600,6 +665,7 @@ func Test_msgq7(t *testing.T) { } func Test_msgq8(t *testing.T) { + t.Parallel() msg := NewType[int]() msg.Pull_tag_async_only(`1`, func(i int) (disable bool) { if i > 4 { diff --git a/sync/RWMutex.go b/sync/RWMutex.go index 2794493..6aecf50 100644 --- a/sync/RWMutex.go +++ b/sync/RWMutex.go @@ -3,10 +3,12 @@ package part import ( "errors" "fmt" + "go/build" "runtime" "strings" "sync" "time" + "weak" ) // const ( @@ -16,12 +18,17 @@ import ( // ) var ( - ErrTimeoutToLock = errors.New("ErrTimeoutToLock") - ErrTimeoutToULock = errors.New("ErrTimeoutToULock") + ErrTimeoutToLock = errors.New("ErrTimeoutToLock") + ErrTimeoutToULock = errors.New("ErrTimeoutToULock") + ErrTimeoutToRLock = errors.New("ErrTimeoutToRLock") + ErrTimeoutToURLock = errors.New("ErrTimeoutToURLock") ) type RWMutex struct { rlc sync.RWMutex + rlccl sync.Mutex + rlcc []weak.Pointer[string] + to []time.Duration PanicFunc func(any) } @@ -75,28 +82,55 @@ func (m *RWMutex) panicFunc(s any) { // call inTimeCall() in time or panic(callTree) func (m *RWMutex) tof(to time.Duration, e error) (inTimeCall func() (called bool)) { - callTree := getCall(2) + callTree := getCall(1) return time.AfterFunc(to, func() { - m.panicFunc(errors.Join(e, errors.New(callTree))) + runtime.GC() + e = errors.Join(e, errors.New(*callTree)) + m.rlccl.Lock() + for i := 0; i < len(m.rlcc); i++ { + if s := m.rlcc[i].Value(); s != nil { + e = errors.Join(e, errors.New("\nlocking:"+*s)) + } + } + m.rlccl.Unlock() + m.panicFunc(e) }).Stop } +func (m *RWMutex) RecLock(max int, to ...time.Duration) { + defer m.Lock()() + m.rlcc = make([]weak.Pointer[string], max) + m.to = to +} + +func (m *RWMutex) addcl(s *string) *string { + m.rlccl.Lock() + m.rlcc[copy(m.rlcc, m.rlcc[1:])] = weak.Make(s) + m.rlccl.Unlock() + return s +} + // to[0]: wait lock timeout // // to[1]: wait ulock timeout // // 不要在Rlock内设置变量,有DATA RACE风险 func (m *RWMutex) RLock(to ...time.Duration) (unlockf func(ulockfs ...func(ulocked bool) (doUlock bool))) { + to = append(to, m.to...) if len(to) > 0 { - defer m.tof(to[0], ErrTimeoutToLock)() + defer m.tof(to[0], ErrTimeoutToRLock)() } m.rlc.RLock() + var ct *string + if m.rlcc != nil { + ct = m.addcl(getCall(0)) + } return func(ulockfs ...func(ulocked bool) (doUlock bool)) { inTimeCall := func() (called bool) { return true } if len(to) > 1 { - inTimeCall = m.tof(to[1], ErrTimeoutToULock) + inTimeCall = m.tof(to[1], ErrTimeoutToURLock) } ul := false for i := 0; i < len(ulockfs); i++ { @@ -110,6 +144,7 @@ func (m *RWMutex) RLock(to ...time.Duration) (unlockf func(ulockfs ...func(ulock m.rlc.RUnlock() inTimeCall() } + _ = ct } } @@ -117,11 +152,16 @@ func (m *RWMutex) RLock(to ...time.Duration) (unlockf func(ulockfs ...func(ulock // // to[1]: wait ulock timeout func (m *RWMutex) Lock(to ...time.Duration) (unlockf func(ulockfs ...func(ulocked bool) (doUlock bool))) { + to = append(to, m.to...) if len(to) > 0 { defer m.tof(to[0], ErrTimeoutToLock)() } m.rlc.Lock() + var ct *string + if m.rlcc != nil { + ct = m.addcl(getCall(0)) + } return func(ulockfs ...func(ulocked bool) (doUlock bool)) { inTimeCall := func() (called bool) { return true } @@ -140,16 +180,18 @@ func (m *RWMutex) Lock(to ...time.Duration) (unlockf func(ulockfs ...func(ulocke m.rlc.Unlock() inTimeCall() } + _ = ct } } -func getCall(i int) (calls string) { +func getCall(i int) (calls *string) { + var cs string for i += 1; true; i++ { - if pc, file, line, ok := runtime.Caller(i); !ok || strings.HasPrefix(file, runtime.GOROOT()) { + if pc, file, line, ok := runtime.Caller(i); !ok || strings.HasPrefix(file, build.Default.GOROOT) { break } else { - calls += fmt.Sprintf("\ncall by %s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line) + cs += fmt.Sprintf("\ncall by %s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line) } } - return + return &cs } diff --git a/sync/RWMutex_test.go b/sync/RWMutex_test.go index fb437d9..f2a1dce 100644 --- a/sync/RWMutex_test.go +++ b/sync/RWMutex_test.go @@ -27,6 +27,7 @@ func Test0(t *testing.T) { // ulock rlock rlock func Test1(t *testing.T) { + t.Parallel() var l RWMutex //check(&l, 0) ul := l.RLock() @@ -40,6 +41,7 @@ func Test1(t *testing.T) { } func Test4(t *testing.T) { + t.Parallel() var l RWMutex ul := l.RLock() ul(func(ulocked bool) (doUlock bool) { @@ -50,8 +52,9 @@ func Test4(t *testing.T) { } func Test5(t *testing.T) { + t.Parallel() var l = RWMutex{PanicFunc: func(a any) { - if !errors.Is(a.(error), ErrTimeoutToULock) { + if !errors.Is(a.(error), ErrTimeoutToURLock) { t.Fatal(a) } }} @@ -63,8 +66,9 @@ func Test5(t *testing.T) { } func Test8(t *testing.T) { + t.Parallel() var l = RWMutex{PanicFunc: func(a any) { - if !errors.Is(a.(error), ErrTimeoutToLock) { + if !errors.Is(a.(error), ErrTimeoutToRLock) { panic(a) } }} @@ -76,6 +80,7 @@ func Test8(t *testing.T) { // ulock rlock lock func Test2(t *testing.T) { + t.Parallel() var l RWMutex ul := l.RLock() //check(&l, 2) @@ -95,6 +100,7 @@ func Test2(t *testing.T) { // ulock lock rlock func Test3(t *testing.T) { + t.Parallel() var l RWMutex ul := l.Lock() //check(&l, -1) @@ -113,6 +119,7 @@ func Test3(t *testing.T) { } func Test6(t *testing.T) { + t.Parallel() var c = make(chan int, 3) var l RWMutex ul := l.RLock() @@ -139,6 +146,7 @@ func Test6(t *testing.T) { } func Test7(t *testing.T) { + t.Parallel() var c = make(chan int, 2) var l RWMutex ul := l.RLock() @@ -160,6 +168,7 @@ func Test7(t *testing.T) { } func Test9(t *testing.T) { + t.Parallel() n := time.Now() var l RWMutex for i := 0; i < 1000; i++ { @@ -169,16 +178,47 @@ func Test9(t *testing.T) { } func Test10(t *testing.T) { + t.Parallel() n := time.Now() var l sync.RWMutex - for i := 0; i < 300; i++ { + for i := 0; i < 30000; i++ { l.RLock() go l.RUnlock() } t.Log(time.Since(n)) + + n = time.Now() + var l2 RWMutex + for i := 0; i < 30000; i++ { + go l2.RLock()() + } + t.Log(time.Since(n)) +} + +func Test11(t *testing.T) { + t.Parallel() + var l RWMutex + + l.RLock()(func(ulocked bool) (doUlock bool) { + return true + }, func(ulocked bool) (doUlock bool) { + if ulocked { + defer l.Lock()() + } + return + }) + l.RLock()(func(ulocked bool) (doUlock bool) { + return false + }, func(ulocked bool) (doUlock bool) { + if ulocked { + defer l.Lock()() + } + return + }) } func Panic_Test8(t *testing.T) { + t.Parallel() var l RWMutex ul := l.Lock(time.Second, time.Second) ul(func(ulocked bool) (doUlock bool) { @@ -188,24 +228,33 @@ func Panic_Test8(t *testing.T) { } // ulock rlock rlock -func Panic_Test4(t *testing.T) { +func Test4Panic_(t *testing.T) { + t.Parallel() var l RWMutex + l.RecLock(10, time.Second, time.Second) //check(&l, 0) ul := l.RLock(time.Second, time.Second) //check(&l, 1) ul1 := l.RLock(time.Second, time.Second) + //check(&l, 1) + ul2 := l.RLock(time.Second, time.Second) //check(&l, 2) time.Sleep(time.Millisecond * 1500) ul() //check(&l, 1) ul1() + ul2() //check(&l, 0) time.Sleep(time.Second * 3) } // ulock rlock lock -func Panic_Test5(t *testing.T) { +func Test5Panic_(t *testing.T) { + t.Parallel() var l RWMutex + l.RecLock(10, time.Second, time.Second) + + l.RLock()() ul := l.RLock() //check(&l, 1) time.AfterFunc(time.Millisecond*1500, func() { @@ -213,7 +262,7 @@ func Panic_Test5(t *testing.T) { ul() }) c := time.Now() - ul1 := l.Lock(time.Second) + ul1 := l.Lock(time.Second * 2) //check(&l, 0) if time.Since(c) < time.Second { t.Fail() @@ -247,6 +296,7 @@ PASS */ func BenchmarkRlock(b *testing.B) { var lock1 RWMutex + lock1.RecLock(5, time.Second, time.Second) for i := 0; i < b.N; i++ { lock1.RLock()() } @@ -268,6 +318,7 @@ func BenchmarkRlock1(b *testing.B) { var lock1 sync.RWMutex for i := 0; i < b.N; i++ { lock1.RLock() + _ = 1 lock1.RUnlock() } } -- 2.39.2