]> 127.0.0.1 Git - part/.git/commitdiff
1 (#25) v0.28.20250215172420
authorqydysky <qydysky@foxmail.com>
Sat, 15 Feb 2025 17:24:12 +0000 (01:24 +0800)
committerGitHub <noreply@github.com>
Sat, 15 Feb 2025 17:24:12 +0000 (01:24 +0800)
* 1

* 1

* 1

* 1

.github/workflows/test1.yml
go.mod
msgq/Msgq.go
msgq/Msgq_test.go
sync/RWMutex.go
sync/RWMutex_test.go

index d5401a4433f40632d5bea1e84b96b9aba1a24c48..e68d54901f5c7169e025307e9c0dddc54f7404c4 100644 (file)
@@ -13,7 +13,7 @@ jobs:
     - name: Set up Go 1.x
       uses: actions/setup-go@v5
       with:
-        go-version: '1.23'
+        go-version: '1.24'
 
     - name: Check out code into the Go module directory
       uses: actions/checkout@v4
@@ -52,7 +52,7 @@ jobs:
     - name: Set up Go 1.x
       uses: actions/setup-go@v5
       with:
-        go-version: '1.23'
+        go-version: '1.24'
 
     - name: Check out code into the Go module directory
       uses: actions/checkout@v4
@@ -91,7 +91,7 @@ jobs:
     - name: Set up Go 1.x
       uses: actions/setup-go@v5
       with:
-        go-version: '1.23'
+        go-version: '1.24'
 
     - name: Check out code into the Go module directory
       uses: actions/checkout@v4
diff --git a/go.mod b/go.mod
index ed461ccb5420b39547e34ed2b26319d0079735f6..80c67f6e5194c07a876d42057f1fb0f8d75ca173 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,6 @@
 module github.com/qydysky/part
 
-go 1.23
+go 1.24
 
 require (
        github.com/gorilla/websocket v1.5.3
index d3c8e769be4122cf25e79102d1bcacbe6464ca4c..4423b0fe233155b4e42e8ff47dc76a7024bdc2c5 100644 (file)
@@ -3,25 +3,31 @@ package part
 import (
        "container/list"
        "context"
+       "errors"
        "fmt"
+       lmt "fmt"
+       "go/build"
+       "runtime"
+       "strings"
        "sync/atomic"
        "time"
-       "unsafe"
 
        psync "github.com/qydysky/part/sync"
 )
 
+var ErrRunTO = errors.New(`ErrRunTO`)
+
 type Msgq struct {
        to    []time.Duration
        funcs *list.List
 
        someNeedRemove atomic.Bool
+       allNeedRemove  atomic.Bool
        lock           psync.RWMutex
-       runTag         psync.Map
+       PanicFunc      func(any)
 }
 
 type msgqItem struct {
-       running atomic.Int32
        disable atomic.Bool
        f       *func(any) (disable bool)
 }
@@ -34,129 +40,120 @@ func New(to ...time.Duration) *Msgq {
        m := new(Msgq)
        m.funcs = list.New()
        m.to = to
+       if len(m.to) > 0 {
+               m.lock.RecLock(10, to[0], to[0])
+       }
        return m
 }
 
-func (m *Msgq) register(mp *msgqItem) (cancel func()) {
-       ul := m.lock.Lock()
-       m.funcs.PushBack(mp)
-       ul()
-       return func() {
-               mp.disable.Store(true)
-               m.someNeedRemove.Store(true)
-       }
+func (m *Msgq) TOPanicFunc(f func(any)) {
+       m.PanicFunc = f
+       m.lock.PanicFunc = f
 }
 
-func (m *Msgq) Register(f func(any) (disable bool)) (cancel func()) {
-       mp := &msgqItem{
-               f: &f,
+func (m *Msgq) register(mp *msgqItem, f func(v any) *list.Element) (cancel func()) {
+       defer m.lock.Lock()()
+
+       if m.allNeedRemove.Load() {
+               m.removeDisable(m.someNeedRemove.CompareAndSwap(false, true), true)
        }
-       m.register(mp)
+
+       f(mp)
        return func() {
                mp.disable.Store(true)
-               m.someNeedRemove.Store(true)
+               m.removeDisable(m.someNeedRemove.CompareAndSwap(false, true), false)
        }
 }
 
-func (m *Msgq) register_front(mp *msgqItem) (cancel func()) {
-       ul := m.lock.Lock()
-       m.funcs.PushFront(mp)
-       ul()
-       return func() {
-               mp.disable.Store(true)
-               m.someNeedRemove.Store(true)
-       }
+func (m *Msgq) Register(f func(any) (disable bool)) (cancel func()) {
+       return m.register(&msgqItem{
+               f: &f,
+       }, m.funcs.PushBack)
 }
 
-func (m *Msgq) Register_front(f func(any) (disable bool)) (cancel func()) {
-       mp := &msgqItem{
+func (m *Msgq) RegisterFront(f func(any) (disable bool)) (cancel func()) {
+       return m.register(&msgqItem{
                f: &f,
-       }
-       m.register_front(mp)
-       return func() {
-               mp.disable.Store(true)
-               m.someNeedRemove.Store(true)
-       }
+       }, m.funcs.PushFront)
 }
 
-func (m *Msgq) Push(msg any) {
-       ul := m.lock.RLock(m.to...)
-       defer ul(m.removeDisable(true)...)
+func (m *Msgq) push(msg any, isLock bool) {
+       if isLock {
+               defer m.lock.Lock()()
+       } else {
+               defer m.lock.RLock()()
+       }
 
-       for el := m.funcs.Front(); el != nil; el = el.Next() {
-               mi := el.Value.(*msgqItem)
-               if mi.disable.Load() {
-                       continue
+       if m.allNeedRemove.Load() {
+               m.removeDisable(true, isLock)
+               if !isLock {
+                       return
                }
-               mi.running.Add(1)
-               if disable := (*mi.f)(msg); disable {
+       }
+
+       for el := m.funcs.Front(); el != nil; el = el.Next() {
+               if mi := el.Value.(*msgqItem); !mi.disable.Load() && (*mi.f)(msg) {
                        mi.disable.Store(true)
-                       m.someNeedRemove.Store(true)
+                       m.removeDisable(m.someNeedRemove.CompareAndSwap(false, true), isLock)
                }
-               mi.running.Add(-1)
        }
 }
 
+// 不能在由PushLock*调用的Pull中以同步方式使用
+func (m *Msgq) Push(msg any) {
+       m.push(msg, false)
+}
+
+// 不能在由Push*调用的Pull中以同步方式使用
 func (m *Msgq) PushLock(msg any) {
-       ul := m.lock.Lock(m.to...)
-       defer ul(m.removeDisable(false)...)
+       m.push(msg, true)
+}
 
-       for el := m.funcs.Front(); el != nil; el = el.Next() {
-               mi := el.Value.(*msgqItem)
-               if mi.disable.Load() {
-                       continue
+func (m *Msgq) ClearAll() {
+       m.allNeedRemove.Store(true)
+}
+
+func (m *Msgq) removeDisable(sig bool, isLock bool) {
+       if sig {
+               f := func() {
+                       if !isLock {
+                               defer m.lock.Lock()()
+                       }
+                       all := m.allNeedRemove.Swap(false)
+                       for el := m.funcs.Front(); el != nil; el = el.Next() {
+                               mi := el.Value.(*msgqItem)
+                               if all || mi.disable.Load() {
+                                       m.funcs.Remove(el)
+                               }
+                       }
+                       m.someNeedRemove.Store(false)
                }
-               mi.running.Add(1)
-               if disable := (*mi.f)(msg); disable {
-                       mi.disable.Store(true)
-                       m.someNeedRemove.Store(true)
+               if isLock {
+                       f()
+               } else {
+                       go f()
                }
-               mi.running.Add(-1)
        }
 }
 
-func (m *Msgq) ClearAll() {
-       for el := m.funcs.Front(); el != nil; el = el.Next() {
-               mi := el.Value.(*msgqItem)
-               mi.disable.Store(true)
-               m.someNeedRemove.Store(true)
+func (m *Msgq) panicFunc(s any) {
+       if m.PanicFunc != nil {
+               m.PanicFunc(s)
+       } else {
+               panic(s)
        }
 }
 
-func (m *Msgq) removeDisable(rlock bool) (ls []func(ulocked bool) (doUlock bool)) {
-       if rlock {
-               return []func(ulocked bool) (doUlock bool){
-                       func(ulocked bool) (doUlock bool) {
-                               return m.someNeedRemove.CompareAndSwap(true, false)
-                       },
-                       func(ulocked bool) (doUlock bool) {
-                               if ulocked {
-                                       defer m.lock.Lock()()
-                                       for el := m.funcs.Front(); el != nil; el = el.Next() {
-                                               mi := el.Value.(*msgqItem)
-                                               if mi.disable.Load() && mi.running.Load() == 0 {
-                                                       m.funcs.Remove(el)
-                                               }
-                                       }
-                               }
-                               return
-                       },
-               }
-       } else {
-               return []func(ulocked bool) (doUlock bool){
-                       func(ulocked bool) (doUlock bool) {
-                               if m.someNeedRemove.CompareAndSwap(true, false) {
-                                       for el := m.funcs.Front(); el != nil; el = el.Next() {
-                                               mi := el.Value.(*msgqItem)
-                                               if mi.disable.Load() && mi.running.Load() == 0 {
-                                                       m.funcs.Remove(el)
-                                               }
-                                       }
-                               }
-                               return
-                       },
+func (m *Msgq) PushingTO(info string, callTree *string) (fin func()) {
+       if len(m.to) > 1 {
+               to := time.AfterFunc(m.to[1], func() {
+                       m.panicFunc(errors.Join(ErrRunTO, lmt.Errorf("%v:%v", info, *callTree)))
+               })
+               return func() {
+                       to.Stop()
                }
        }
+       return func() {}
 }
 
 type Msgq_tag_data struct {
@@ -164,106 +161,27 @@ type Msgq_tag_data struct {
        Data any
 }
 
-// 不能放置在由PushLock_tag调用的同步Pull中
+// 不能在由PushLock*调用的Pull中以同步方式使用
 func (m *Msgq) Push_tag(Tag string, Data any) {
-       if len(m.to) > 0 {
-               ptr := uintptr(unsafe.Pointer(&Data))
-               m.runTag.Store(ptr, "Push_tag(`"+Tag+"`,...)")
-               defer func() {
-                       if e := recover(); e != nil {
-                               m.runTag.Range(func(key, value any) bool {
-                                       if key == ptr {
-                                               fmt.Printf("%v panic > %v\n", value, e)
-                                       } else {
-                                               fmt.Printf("%v running\n", value)
-                                       }
-                                       return true
-                               })
-                               m.runTag.ClearAll()
-                               panic(e)
-                       }
-                       m.runTag.Delete(ptr)
-               }()
-       }
-       {
-               /*
-                       m.m.Push(Msgq_tag_data{
-                               Tag:  Tag,
-                               Data: Data,
-                       })
-               */
-               defer m.lock.RLock(m.to...)(m.removeDisable(true)...)
-
-               for el := m.funcs.Front(); el != nil; el = el.Next() {
-                       mi := el.Value.(*msgqItem)
-                       if mi.disable.Load() {
-                               continue
-                       }
-                       mi.running.Add(1)
-                       if disable := (*mi.f)(&Msgq_tag_data{
-                               Tag:  Tag,
-                               Data: Data,
-                       }); disable {
-                               mi.disable.Store(true)
-                               m.someNeedRemove.Store(true)
-                       }
-                       mi.running.Add(-1)
-               }
-       }
+       defer m.PushingTO(lmt.Sprintf("\nPush_tag(`%v`)", Tag), getCall(1))()
+       m.Push(&Msgq_tag_data{
+               Tag:  Tag,
+               Data: Data,
+       })
 }
 
-// 不能放置在由Push_tag、PushLock_tag调用的同步Pull中
+// 不能在由Push*调用的Pull中以同步方式使用
 func (m *Msgq) PushLock_tag(Tag string, Data any) {
-       if len(m.to) > 0 {
-               ptr := uintptr(unsafe.Pointer(&Data))
-               m.runTag.Store(ptr, "PushLock_tag(`"+Tag+"`,...)")
-               defer func() {
-                       if e := recover(); e != nil {
-                               m.runTag.Range(func(key, value any) bool {
-                                       if key == ptr {
-                                               fmt.Printf("%v panic > %v\n", value, e)
-                                       } else {
-                                               fmt.Printf("%v running\n", value)
-                                       }
-                                       return true
-                               })
-                               m.runTag.ClearAll()
-                               panic(e)
-                       }
-                       m.runTag.Delete(ptr)
-               }()
-       }
-       {
-               /*
-                       m.m.PushLock(Msgq_tag_data{
-                               Tag:  Tag,
-                               Data: Data,
-                       })
-               */
-               ul := m.lock.Lock(m.to...)
-               defer ul(m.removeDisable(false)...)
-
-               for el := m.funcs.Front(); el != nil; el = el.Next() {
-                       mi := el.Value.(*msgqItem)
-                       if mi.disable.Load() {
-                               continue
-                       }
-                       mi.running.Add(1)
-                       if disable := (*mi.f)(&Msgq_tag_data{
-                               Tag:  Tag,
-                               Data: Data,
-                       }); disable {
-                               mi.disable.Store(true)
-                               m.someNeedRemove.Store(true)
-                       }
-                       mi.running.Add(-1)
-               }
-       }
+       defer m.PushingTO(lmt.Sprintf("\nPushLock_tag(`%v`)", Tag), getCall(1))()
+       m.PushLock(&Msgq_tag_data{
+               Tag:  Tag,
+               Data: Data,
+       })
 }
 
 func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) (cancel func(), ch <-chan any) {
        var c = make(chan any, size)
-       var f1 = func(data any) bool {
+       return m.Register(func(data any) bool {
                if d, ok := data.(*Msgq_tag_data); ok && d.Tag == key {
                        select {
                        case <-ctx.Done():
@@ -282,74 +200,53 @@ func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) (cancel
                        }
                }
                return false
-       }
-       cancel = m.register_front(&msgqItem{
-               f: &f1,
-       })
-       ch = c
-       return
+       }), c
 }
 
 func (m *Msgq) Pull_tag_only(key string, f func(any) (disable bool)) (cancel func()) {
-       var f1 = func(data any) (disable bool) {
+       return m.Register(func(data any) (disable bool) {
                if d, ok := data.(*Msgq_tag_data); ok && d.Tag == key {
                        return f(d.Data)
                }
                return false
-       }
-       return m.register_front(&msgqItem{
-               f: &f1,
        })
 }
 
 func (m *Msgq) Pull_tag(func_map map[string]func(any) (disable bool)) (cancel func()) {
-       var f1 = func(data any) (disable bool) {
+       return m.Register(func(data any) (disable bool) {
                if d, ok := data.(*Msgq_tag_data); ok {
                        if f, ok := func_map[d.Tag]; ok {
                                return f(d.Data)
                        }
                }
                return false
-       }
-       return m.register_front(&msgqItem{
-               f: &f1,
        })
 }
 
 func (m *Msgq) Pull_tag_async_only(key string, f func(any) (disable bool)) (cancel func()) {
-       var mi = msgqItem{}
-       var f1 = func(data any) bool {
-               if d, ok := data.(*Msgq_tag_data); ok {
+       var disable bool
+       return m.RegisterFront(func(data any) bool {
+               if d, ok := data.(*Msgq_tag_data); !disable && ok && d.Tag == key {
                        go func() {
-                               if f(d.Data) {
-                                       mi.disable.Store(true)
-                                       m.someNeedRemove.Store(true)
-                               }
+                               disable = f(d.Data)
                        }()
                }
-               return false
-       }
-       mi.f = &f1
-       return m.register_front(&mi)
+               return disable
+       })
 }
 
 func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) (cancel func()) {
-       var mi = msgqItem{}
-       var f = func(data any) bool {
+       var disable bool
+       return m.RegisterFront(func(data any) bool {
                if d, ok := data.(*Msgq_tag_data); ok {
                        if f, ok := func_map[d.Tag]; ok {
                                go func() {
-                                       if f(d.Data) {
-                                               mi.disable.Store(true)
-                                               m.someNeedRemove.Store(true)
-                                       }
+                                       disable = f(d.Data)
                                }()
                        }
                }
-               return false
-       }
-       mi.f = &f
-       return m.register_front(&mi)
+               return disable
+       })
 }
 
 type MsgType[T any] struct {
@@ -370,107 +267,25 @@ func (m *MsgType[T]) ClearAll() {
        m.m.ClearAll()
 }
 
-// 不能放置在由PushLock_tag调用的同步Pull中
+// 不能在由PushLock*调用的Pull中以同步方式使用
 func (m *MsgType[T]) Push_tag(Tag string, Data T) {
-       if len(m.m.to) > 0 {
-               ptr := uintptr(unsafe.Pointer(&Data))
-               m.m.runTag.Store(ptr, "Push_tag(`"+Tag+"`,...)")
-               defer func() {
-                       if e := recover(); e != nil {
-                               m.m.runTag.Range(func(key, value any) bool {
-                                       if key == ptr {
-                                               fmt.Printf("%v panic > %v\n", value, e)
-                                       } else {
-                                               fmt.Printf("%v running\n", value)
-                                       }
-                                       return true
-                               })
-                               m.m.runTag.ClearAll()
-                               panic(e)
-                       }
-                       m.m.runTag.Delete(ptr)
-               }()
-       }
-       {
-               /*
-                       m.m.Push(Msgq_tag_data{
-                               Tag:  Tag,
-                               Data: Data,
-                       })
-               */
-               ul := m.m.lock.RLock(m.m.to...)
-               defer ul(m.m.removeDisable(true)...)
-
-               for el := m.m.funcs.Front(); el != nil; el = el.Next() {
-                       mi := el.Value.(*msgqItem)
-                       if mi.disable.Load() {
-                               continue
-                       }
-                       mi.running.Add(1)
-                       if disable := (*mi.f)(&MsgType_tag_data[T]{
-                               Tag:  Tag,
-                               Data: &Data,
-                       }); disable {
-                               mi.disable.Store(true)
-                               m.m.someNeedRemove.Store(true)
-                       }
-                       mi.running.Add(-1)
-               }
-       }
+       m.m.Push(&MsgType_tag_data[T]{
+               Tag:  Tag,
+               Data: &Data,
+       })
 }
 
-// 不能放置在由Push_tag、PushLock_tag调用的同步Pull中
+// 不能在由Push*调用的Pull中以同步方式使用
 func (m *MsgType[T]) PushLock_tag(Tag string, Data T) {
-       if len(m.m.to) > 0 {
-               ptr := uintptr(unsafe.Pointer(&Data))
-               m.m.runTag.Store(ptr, "PushLock_tag(`"+Tag+"`,...)")
-               defer func() {
-                       if e := recover(); e != nil {
-                               m.m.runTag.Range(func(key, value any) bool {
-                                       if key == ptr {
-                                               fmt.Printf("%v panic > %v\n", value, e)
-                                       } else {
-                                               fmt.Printf("%v running\n", value)
-                                       }
-                                       return true
-                               })
-                               m.m.runTag.ClearAll()
-                               panic(e)
-                       }
-                       m.m.runTag.Delete(ptr)
-               }()
-       }
-       {
-               /*
-                       m.m.PushLock(Msgq_tag_data{
-                               Tag:  Tag,
-                               Data: Data,
-                       })
-               */
-               ul := m.m.lock.Lock(m.m.to...)
-               defer ul(m.m.removeDisable(false)...)
-
-               for el := m.m.funcs.Front(); el != nil; el = el.Next() {
-                       mi := el.Value.(*msgqItem)
-                       if mi.disable.Load() {
-                               continue
-                       }
-                       mi.running.Add(1)
-                       if disable := (*mi.f)(&MsgType_tag_data[T]{
-                               Tag:  Tag,
-                               Data: &Data,
-                       }); disable {
-                               mi.disable.Store(true)
-                               m.m.someNeedRemove.Store(true)
-                       }
-                       mi.running.Add(-1)
-               }
-       }
+       m.m.PushLock(&MsgType_tag_data[T]{
+               Tag:  Tag,
+               Data: &Data,
+       })
 }
 
 func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) (cancel func(), ch <-chan T) {
        var c = make(chan T, size)
-       var f = func(data any) bool {
+       return m.m.Register(func(data any) bool {
                if data1, ok := data.(*MsgType_tag_data[T]); ok {
                        if data1.Tag == key {
                                select {
@@ -491,74 +306,70 @@ func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) (c
                        }
                }
                return false
-       }
-       cancel = m.m.register(&msgqItem{
-               f: &f,
-       })
-       ch = c
-       return
+       }), c
 }
 
 func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) (cancel func()) {
-       var f1 = func(data any) (disable bool) {
-               if data1, ok := data.(*MsgType_tag_data[T]); ok {
-                       if data1.Tag == key {
-                               return f(*data1.Data)
-                       }
+       return m.m.Register(func(data any) (disable bool) {
+               if data1, ok := data.(*MsgType_tag_data[T]); ok && data1.Tag == key {
+                       return f(*data1.Data)
                }
                return false
-       }
-       return m.m.register(&msgqItem{
-               f: &f1,
        })
 }
 
 func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) (cancel func()) {
-       var f = func(data any) (disable bool) {
+       return m.m.Register(func(data any) (disable bool) {
                if data1, ok := data.(*MsgType_tag_data[T]); ok {
                        if f, ok := func_map[data1.Tag]; ok {
                                return f(*data1.Data)
                        }
                }
                return false
-       }
-       return m.m.register(&msgqItem{
-               f: &f,
        })
 }
 
 func (m *MsgType[T]) Pull_tag_async_only(key string, f func(T) (disable bool)) (cancel func()) {
-       var mi = msgqItem{}
-       var f1 = func(data any) bool {
-               if d, ok := data.(*MsgType_tag_data[T]); ok {
+       var disabled atomic.Bool
+       return m.m.RegisterFront(func(data any) bool {
+               if d, ok := data.(*MsgType_tag_data[T]); !disabled.Load() && ok && d.Tag == key {
                        go func() {
-                               if f(*d.Data) {
-                                       mi.disable.Store(true)
-                                       m.m.someNeedRemove.Store(true)
+                               if !disabled.Load() {
+                                       disabled.Store(f(*d.Data))
                                }
                        }()
                }
-               return false
-       }
-       mi.f = &f1
-       return m.m.register_front(&mi)
+               return disabled.Load()
+       })
 }
 
 func (m *MsgType[T]) Pull_tag_async(func_map map[string]func(T) (disable bool)) (cancel func()) {
-       var mi = msgqItem{}
-       var f = func(data any) bool {
-               if d, ok := data.(*MsgType_tag_data[T]); ok {
+       var disabled atomic.Bool
+       return m.m.RegisterFront(func(data any) bool {
+               if d, ok := data.(*MsgType_tag_data[T]); !disabled.Load() && ok {
                        if f, ok := func_map[d.Tag]; ok {
                                go func() {
-                                       if f(*d.Data) {
-                                               mi.disable.Store(true)
-                                               m.m.someNeedRemove.Store(true)
+                                       if !disabled.Load() {
+                                               disabled.Store(f(*d.Data))
                                        }
                                }()
                        }
                }
-               return false
+               return disabled.Load()
+       })
+}
+
+func getCall(i int) (calls *string) {
+       var cs string
+       for i += 1; true; i++ {
+               if pc, file, line, ok := runtime.Caller(i); !ok || strings.HasPrefix(file, build.Default.GOROOT) {
+                       break
+               } else {
+                       cs += fmt.Sprintf("\ncall by %s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line)
+               }
+       }
+       if cs == "" {
+               cs += fmt.Sprintln("\ncall by goroutine")
        }
-       mi.f = &f
-       return m.m.register_front(&mi)
+       return &cs
 }
index 3d5102c65d0fefcd9ffa2685eca9bcb7ab843323..5883c9ea1fb4c10779f34766449e61e64a52954b 100644 (file)
@@ -2,10 +2,14 @@ package part
 
 import (
        "context"
+       "errors"
+       "fmt"
        _ "net/http/pprof"
        "sync"
        "testing"
        "time"
+
+       psync "github.com/qydysky/part/sync"
 )
 
 // type test_item struct {
@@ -139,20 +143,41 @@ func BenchmarkXxx(b *testing.B) {
        }
 }
 
-// func TestPushLock(t *testing.T) {
-//     defer func() {
-//             if e := recover(); e != nil {
-//                     t.Fatal(e)
-//             }
-//     }()
-//     mq := NewTo(time.Second)
-//     mq.Pull_tag_only(`test`, func(a any) (disable bool) {
-//             mq.PushLock_tag(`lock`, nil)
-//             return false
-//     })
-//     mq.Push_tag(`test`, nil)
-//     t.Fatal()
-// }
+func TestTORun(t *testing.T) {
+       t.Parallel()
+       panicC := make(chan any, 10)
+       mq := New(time.Second, time.Second)
+       mq.TOPanicFunc(func(a any) {
+               panicC <- a
+       })
+       mq.Pull_tag_only(`test`, func(a any) (disable bool) {
+               time.Sleep(time.Second * 10)
+               return false
+       })
+       go mq.Push_tag(`test`, nil)
+       e := <-panicC
+       if !errors.Is(e.(error), ErrRunTO) {
+               t.Fatal(e)
+       }
+}
+
+func TestPushLock(t *testing.T) {
+       t.Parallel()
+       panicC := make(chan any, 10)
+       mq := New(time.Second, time.Second*2)
+       mq.TOPanicFunc(func(a any) {
+               panicC <- a
+       })
+       mq.Pull_tag_only(`test`, func(a any) (disable bool) {
+               mq.PushLock_tag(`lock`, nil)
+               return false
+       })
+       go mq.Push_tag(`test`, nil)
+       e := <-panicC
+       if !errors.Is(e.(error), psync.ErrTimeoutToLock) {
+               t.Fatal(e)
+       }
+}
 
 func Benchmark_1(b *testing.B) {
        mq := New()
@@ -166,6 +191,7 @@ func Benchmark_1(b *testing.B) {
 }
 
 func Test_4(t *testing.T) {
+       t.Parallel()
        mq := New()
        cancel := mq.Pull_tag(FuncMap{
                `del`: func(a any) (disable bool) {
@@ -178,6 +204,7 @@ func Test_4(t *testing.T) {
 }
 
 func Test_2(t *testing.T) {
+       t.Parallel()
        mq := New()
        cancel := mq.Pull_tag(FuncMap{
                `del`: func(a any) (disable bool) {
@@ -190,7 +217,27 @@ func Test_2(t *testing.T) {
        mq.PushLock_tag(`del`, nil)
 }
 
+func Test_5(t *testing.T) {
+       t.Parallel()
+       mq := New(time.Second, time.Second*2)
+       mq.Pull_tag(FuncMap{
+               `del`: func(a any) (disable bool) {
+                       t.Log(1)
+                       mq.Push_tag(`del1`, nil)
+                       t.Log(3)
+                       return false
+               },
+               `del1`: func(a any) (disable bool) {
+                       t.Log(2)
+                       return true
+               },
+       })
+       mq.Push_tag(`del`, 1)
+       mq.Push_tag(`del1`, 1)
+}
+
 func Test_1(t *testing.T) {
+       t.Parallel()
        mq := New(time.Millisecond*5, time.Millisecond*10)
        go mq.Push_tag(`del`, nil)
        mq.Pull_tag(FuncMap{
@@ -206,11 +253,12 @@ func Test_1(t *testing.T) {
 }
 
 func Test_RemoveInPush(t *testing.T) {
+       t.Parallel()
        mq := New(time.Second, time.Second*3)
        mq.Pull_tag(FuncMap{
                `r1`: func(a any) (disable bool) {
                        mq.ClearAll()
-                       return false
+                       return true
                },
                `r2`: func(a any) (disable bool) {
                        return true
@@ -223,6 +271,7 @@ func Test_RemoveInPush(t *testing.T) {
 }
 
 func Test_3(t *testing.T) {
+       t.Parallel()
        mq := New(time.Millisecond*5, time.Millisecond*10)
        go mq.Push_tag(`sss`, nil)
        mq.Pull_tag(FuncMap{
@@ -234,6 +283,7 @@ func Test_3(t *testing.T) {
 }
 
 func Test_Pull_tag_chan(t *testing.T) {
+       t.Parallel()
        mq := New()
        ctx, cf := context.WithCancel(context.Background())
        _, ch := mq.Pull_tag_chan(`a`, 2, ctx)
@@ -273,6 +323,7 @@ func Test_Pull_tag_chan(t *testing.T) {
 }
 
 func Test_Pull_tag_chan2(t *testing.T) {
+       t.Parallel()
        mq := New()
 
        mq.Pull_tag_chan(`a`, 1, context.Background())
@@ -283,6 +334,7 @@ func Test_Pull_tag_chan2(t *testing.T) {
 }
 
 func Test_msgq1(t *testing.T) {
+       t.Parallel()
        mq := New()
        c := make(chan time.Time, 10)
        mq.Pull_tag(map[string]func(any) bool{
@@ -339,6 +391,7 @@ func Test_msgq1(t *testing.T) {
 }
 
 func Test_msgq9(t *testing.T) {
+       t.Parallel()
        var mq = NewType[int]()
        var c = make(chan int, 10)
        mq.Pull_tag_only(`c`, func(i int) (disable bool) {
@@ -348,12 +401,16 @@ func Test_msgq9(t *testing.T) {
        mq.PushLock_tag(`c`, 1)
        mq.ClearAll()
        mq.PushLock_tag(`c`, 2)
-       if len(c) != 1 || <-c != 1 {
-               t.Fatal()
+       t.Log(mq.m.funcs.Len())
+       if l, i := len(c), <-c; l != 1 || i != 1 {
+               t.Fatal(l, i)
        }
 
+       fmt.Println(mq.m.allNeedRemove.Load())
+
        mq.Pull_tag(map[string]func(int) (disable bool){
                `c`: func(i int) (disable bool) {
+                       fmt.Println(1)
                        c <- i
                        return false
                },
@@ -363,15 +420,18 @@ func Test_msgq9(t *testing.T) {
                },
        })
 
+       fmt.Println(mq.m.funcs.Len())
+
        mq.PushLock_tag(`c`, 1)
        mq.ClearAll()
        mq.PushLock_tag(`c`, 2)
-       if len(c) != 1 || <-c != 1 {
-               t.Fatal()
+       if l, i := len(c), <-c; l != 1 || i != 1 {
+               t.Fatal(l, i)
        }
 }
 
 func Test_msgq2(t *testing.T) {
+       t.Parallel()
        mq := New()
 
        mq.Pull_tag(map[string]func(any) bool{
@@ -413,6 +473,7 @@ func Test_msgq2(t *testing.T) {
 }
 
 func Test_msgq3(t *testing.T) {
+       t.Parallel()
        mq := New()
 
        mun_c := make(chan int, 100)
@@ -436,6 +497,7 @@ func Test_msgq3(t *testing.T) {
 }
 
 func Test_msgq4(t *testing.T) {
+       t.Parallel()
        // mq := New(30)
        mq := New(time.Second, time.Second) //out of list
 
@@ -491,6 +553,7 @@ func Test_msgq4(t *testing.T) {
 }
 
 func Test_msgq5(t *testing.T) {
+       t.Parallel()
        mq := New()
 
        mun_c1 := make(chan bool, 100)
@@ -548,6 +611,7 @@ func Test_msgq5(t *testing.T) {
 }
 
 func Test_msgq6(t *testing.T) {
+       t.Parallel()
        msg := NewType[int]()
        msg.Pull_tag(map[string]func(int) (disable bool){
                `1`: func(b int) (disable bool) {
@@ -562,6 +626,7 @@ func Test_msgq6(t *testing.T) {
 }
 
 func Test_msgq7(t *testing.T) {
+       t.Parallel()
        var c = make(chan string, 100)
        msg := NewType[int]()
        msg.Pull_tag_async_only(`1`, func(i int) (disable bool) {
@@ -585,8 +650,8 @@ func Test_msgq7(t *testing.T) {
                return i > 10
        })
        msg.Push_tag(`1`, 0)
-       if <-c != "1" {
-               t.Fatal()
+       if i := <-c; i != "1" {
+               t.Fatal(i)
        }
        if <-c != "2" {
                t.Fatal()
@@ -600,6 +665,7 @@ func Test_msgq7(t *testing.T) {
 }
 
 func Test_msgq8(t *testing.T) {
+       t.Parallel()
        msg := NewType[int]()
        msg.Pull_tag_async_only(`1`, func(i int) (disable bool) {
                if i > 4 {
index 2794493a5899a345115c038099497e4b946ec9d9..6aecf50dcd34f06a3c34b8467556875ecfefebe1 100644 (file)
@@ -3,10 +3,12 @@ package part
 import (
        "errors"
        "fmt"
+       "go/build"
        "runtime"
        "strings"
        "sync"
        "time"
+       "weak"
 )
 
 // const (
@@ -16,12 +18,17 @@ import (
 // )
 
 var (
-       ErrTimeoutToLock  = errors.New("ErrTimeoutToLock")
-       ErrTimeoutToULock = errors.New("ErrTimeoutToULock")
+       ErrTimeoutToLock   = errors.New("ErrTimeoutToLock")
+       ErrTimeoutToULock  = errors.New("ErrTimeoutToULock")
+       ErrTimeoutToRLock  = errors.New("ErrTimeoutToRLock")
+       ErrTimeoutToURLock = errors.New("ErrTimeoutToURLock")
 )
 
 type RWMutex struct {
        rlc       sync.RWMutex
+       rlccl     sync.Mutex
+       rlcc      []weak.Pointer[string]
+       to        []time.Duration
        PanicFunc func(any)
 }
 
@@ -75,28 +82,55 @@ func (m *RWMutex) panicFunc(s any) {
 
 // call inTimeCall() in time or panic(callTree)
 func (m *RWMutex) tof(to time.Duration, e error) (inTimeCall func() (called bool)) {
-       callTree := getCall(2)
+       callTree := getCall(1)
        return time.AfterFunc(to, func() {
-               m.panicFunc(errors.Join(e, errors.New(callTree)))
+               runtime.GC()
+               e = errors.Join(e, errors.New(*callTree))
+               m.rlccl.Lock()
+               for i := 0; i < len(m.rlcc); i++ {
+                       if s := m.rlcc[i].Value(); s != nil {
+                               e = errors.Join(e, errors.New("\nlocking:"+*s))
+                       }
+               }
+               m.rlccl.Unlock()
+               m.panicFunc(e)
        }).Stop
 }
 
+func (m *RWMutex) RecLock(max int, to ...time.Duration) {
+       defer m.Lock()()
+       m.rlcc = make([]weak.Pointer[string], max)
+       m.to = to
+}
+
+func (m *RWMutex) addcl(s *string) *string {
+       m.rlccl.Lock()
+       m.rlcc[copy(m.rlcc, m.rlcc[1:])] = weak.Make(s)
+       m.rlccl.Unlock()
+       return s
+}
+
 // to[0]: wait lock timeout
 //
 // to[1]: wait ulock timeout
 //
 // 不要在Rlock内设置变量,有DATA RACE风险
 func (m *RWMutex) RLock(to ...time.Duration) (unlockf func(ulockfs ...func(ulocked bool) (doUlock bool))) {
+       to = append(to, m.to...)
        if len(to) > 0 {
-               defer m.tof(to[0], ErrTimeoutToLock)()
+               defer m.tof(to[0], ErrTimeoutToRLock)()
        }
 
        m.rlc.RLock()
+       var ct *string
+       if m.rlcc != nil {
+               ct = m.addcl(getCall(0))
+       }
 
        return func(ulockfs ...func(ulocked bool) (doUlock bool)) {
                inTimeCall := func() (called bool) { return true }
                if len(to) > 1 {
-                       inTimeCall = m.tof(to[1], ErrTimeoutToULock)
+                       inTimeCall = m.tof(to[1], ErrTimeoutToURLock)
                }
                ul := false
                for i := 0; i < len(ulockfs); i++ {
@@ -110,6 +144,7 @@ func (m *RWMutex) RLock(to ...time.Duration) (unlockf func(ulockfs ...func(ulock
                        m.rlc.RUnlock()
                        inTimeCall()
                }
+               _ = ct
        }
 }
 
@@ -117,11 +152,16 @@ func (m *RWMutex) RLock(to ...time.Duration) (unlockf func(ulockfs ...func(ulock
 //
 // to[1]: wait ulock timeout
 func (m *RWMutex) Lock(to ...time.Duration) (unlockf func(ulockfs ...func(ulocked bool) (doUlock bool))) {
+       to = append(to, m.to...)
        if len(to) > 0 {
                defer m.tof(to[0], ErrTimeoutToLock)()
        }
 
        m.rlc.Lock()
+       var ct *string
+       if m.rlcc != nil {
+               ct = m.addcl(getCall(0))
+       }
 
        return func(ulockfs ...func(ulocked bool) (doUlock bool)) {
                inTimeCall := func() (called bool) { return true }
@@ -140,16 +180,18 @@ func (m *RWMutex) Lock(to ...time.Duration) (unlockf func(ulockfs ...func(ulocke
                        m.rlc.Unlock()
                        inTimeCall()
                }
+               _ = ct
        }
 }
 
-func getCall(i int) (calls string) {
+func getCall(i int) (calls *string) {
+       var cs string
        for i += 1; true; i++ {
-               if pc, file, line, ok := runtime.Caller(i); !ok || strings.HasPrefix(file, runtime.GOROOT()) {
+               if pc, file, line, ok := runtime.Caller(i); !ok || strings.HasPrefix(file, build.Default.GOROOT) {
                        break
                } else {
-                       calls += fmt.Sprintf("\ncall by %s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line)
+                       cs += fmt.Sprintf("\ncall by %s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line)
                }
        }
-       return
+       return &cs
 }
index fb437d9201f9078f26a186d9239e1c70d7907648..f2a1dced228c5a8e77e1c29503390dc9139c9076 100644 (file)
@@ -27,6 +27,7 @@ func Test0(t *testing.T) {
 
 // ulock rlock rlock
 func Test1(t *testing.T) {
+       t.Parallel()
        var l RWMutex
        //check(&l, 0)
        ul := l.RLock()
@@ -40,6 +41,7 @@ func Test1(t *testing.T) {
 }
 
 func Test4(t *testing.T) {
+       t.Parallel()
        var l RWMutex
        ul := l.RLock()
        ul(func(ulocked bool) (doUlock bool) {
@@ -50,8 +52,9 @@ func Test4(t *testing.T) {
 }
 
 func Test5(t *testing.T) {
+       t.Parallel()
        var l = RWMutex{PanicFunc: func(a any) {
-               if !errors.Is(a.(error), ErrTimeoutToULock) {
+               if !errors.Is(a.(error), ErrTimeoutToURLock) {
                        t.Fatal(a)
                }
        }}
@@ -63,8 +66,9 @@ func Test5(t *testing.T) {
 }
 
 func Test8(t *testing.T) {
+       t.Parallel()
        var l = RWMutex{PanicFunc: func(a any) {
-               if !errors.Is(a.(error), ErrTimeoutToLock) {
+               if !errors.Is(a.(error), ErrTimeoutToRLock) {
                        panic(a)
                }
        }}
@@ -76,6 +80,7 @@ func Test8(t *testing.T) {
 
 // ulock rlock lock
 func Test2(t *testing.T) {
+       t.Parallel()
        var l RWMutex
        ul := l.RLock()
        //check(&l, 2)
@@ -95,6 +100,7 @@ func Test2(t *testing.T) {
 
 // ulock lock rlock
 func Test3(t *testing.T) {
+       t.Parallel()
        var l RWMutex
        ul := l.Lock()
        //check(&l, -1)
@@ -113,6 +119,7 @@ func Test3(t *testing.T) {
 }
 
 func Test6(t *testing.T) {
+       t.Parallel()
        var c = make(chan int, 3)
        var l RWMutex
        ul := l.RLock()
@@ -139,6 +146,7 @@ func Test6(t *testing.T) {
 }
 
 func Test7(t *testing.T) {
+       t.Parallel()
        var c = make(chan int, 2)
        var l RWMutex
        ul := l.RLock()
@@ -160,6 +168,7 @@ func Test7(t *testing.T) {
 }
 
 func Test9(t *testing.T) {
+       t.Parallel()
        n := time.Now()
        var l RWMutex
        for i := 0; i < 1000; i++ {
@@ -169,16 +178,47 @@ func Test9(t *testing.T) {
 }
 
 func Test10(t *testing.T) {
+       t.Parallel()
        n := time.Now()
        var l sync.RWMutex
-       for i := 0; i < 300; i++ {
+       for i := 0; i < 30000; i++ {
                l.RLock()
                go l.RUnlock()
        }
        t.Log(time.Since(n))
+
+       n = time.Now()
+       var l2 RWMutex
+       for i := 0; i < 30000; i++ {
+               go l2.RLock()()
+       }
+       t.Log(time.Since(n))
+}
+
+func Test11(t *testing.T) {
+       t.Parallel()
+       var l RWMutex
+
+       l.RLock()(func(ulocked bool) (doUlock bool) {
+               return true
+       }, func(ulocked bool) (doUlock bool) {
+               if ulocked {
+                       defer l.Lock()()
+               }
+               return
+       })
+       l.RLock()(func(ulocked bool) (doUlock bool) {
+               return false
+       }, func(ulocked bool) (doUlock bool) {
+               if ulocked {
+                       defer l.Lock()()
+               }
+               return
+       })
 }
 
 func Panic_Test8(t *testing.T) {
+       t.Parallel()
        var l RWMutex
        ul := l.Lock(time.Second, time.Second)
        ul(func(ulocked bool) (doUlock bool) {
@@ -188,24 +228,33 @@ func Panic_Test8(t *testing.T) {
 }
 
 // ulock rlock rlock
-func Panic_Test4(t *testing.T) {
+func Test4Panic_(t *testing.T) {
+       t.Parallel()
        var l RWMutex
+       l.RecLock(10, time.Second, time.Second)
        //check(&l, 0)
        ul := l.RLock(time.Second, time.Second)
        //check(&l, 1)
        ul1 := l.RLock(time.Second, time.Second)
+       //check(&l, 1)
+       ul2 := l.RLock(time.Second, time.Second)
        //check(&l, 2)
        time.Sleep(time.Millisecond * 1500)
        ul()
        //check(&l, 1)
        ul1()
+       ul2()
        //check(&l, 0)
        time.Sleep(time.Second * 3)
 }
 
 // ulock rlock lock
-func Panic_Test5(t *testing.T) {
+func Test5Panic_(t *testing.T) {
+       t.Parallel()
        var l RWMutex
+       l.RecLock(10, time.Second, time.Second)
+
+       l.RLock()()
        ul := l.RLock()
        //check(&l, 1)
        time.AfterFunc(time.Millisecond*1500, func() {
@@ -213,7 +262,7 @@ func Panic_Test5(t *testing.T) {
                ul()
        })
        c := time.Now()
-       ul1 := l.Lock(time.Second)
+       ul1 := l.Lock(time.Second * 2)
        //check(&l, 0)
        if time.Since(c) < time.Second {
                t.Fail()
@@ -247,6 +296,7 @@ PASS
 */
 func BenchmarkRlock(b *testing.B) {
        var lock1 RWMutex
+       lock1.RecLock(5, time.Second, time.Second)
        for i := 0; i < b.N; i++ {
                lock1.RLock()()
        }
@@ -268,6 +318,7 @@ func BenchmarkRlock1(b *testing.B) {
        var lock1 sync.RWMutex
        for i := 0; i < b.N; i++ {
                lock1.RLock()
+               _ = 1
                lock1.RUnlock()
        }
 }