import (
"container/list"
"context"
+ "errors"
"fmt"
+ lmt "fmt"
+ "go/build"
+ "runtime"
+ "strings"
"sync/atomic"
"time"
- "unsafe"
psync "github.com/qydysky/part/sync"
)
+var ErrRunTO = errors.New(`ErrRunTO`)
+
type Msgq struct {
to []time.Duration
funcs *list.List
someNeedRemove atomic.Bool
+ allNeedRemove atomic.Bool
lock psync.RWMutex
- runTag psync.Map
+ PanicFunc func(any)
}
type msgqItem struct {
- running atomic.Int32
disable atomic.Bool
f *func(any) (disable bool)
}
m := new(Msgq)
m.funcs = list.New()
m.to = to
+ if len(m.to) > 0 {
+ m.lock.RecLock(10, to[0], to[0])
+ }
return m
}
-func (m *Msgq) register(mp *msgqItem) (cancel func()) {
- ul := m.lock.Lock()
- m.funcs.PushBack(mp)
- ul()
- return func() {
- mp.disable.Store(true)
- m.someNeedRemove.Store(true)
- }
+func (m *Msgq) TOPanicFunc(f func(any)) {
+ m.PanicFunc = f
+ m.lock.PanicFunc = f
}
-func (m *Msgq) Register(f func(any) (disable bool)) (cancel func()) {
- mp := &msgqItem{
- f: &f,
+func (m *Msgq) register(mp *msgqItem, f func(v any) *list.Element) (cancel func()) {
+ defer m.lock.Lock()()
+
+ if m.allNeedRemove.Load() {
+ m.removeDisable(m.someNeedRemove.CompareAndSwap(false, true), true)
}
- m.register(mp)
+
+ f(mp)
return func() {
mp.disable.Store(true)
- m.someNeedRemove.Store(true)
+ m.removeDisable(m.someNeedRemove.CompareAndSwap(false, true), false)
}
}
-func (m *Msgq) register_front(mp *msgqItem) (cancel func()) {
- ul := m.lock.Lock()
- m.funcs.PushFront(mp)
- ul()
- return func() {
- mp.disable.Store(true)
- m.someNeedRemove.Store(true)
- }
+func (m *Msgq) Register(f func(any) (disable bool)) (cancel func()) {
+ return m.register(&msgqItem{
+ f: &f,
+ }, m.funcs.PushBack)
}
-func (m *Msgq) Register_front(f func(any) (disable bool)) (cancel func()) {
- mp := &msgqItem{
+func (m *Msgq) RegisterFront(f func(any) (disable bool)) (cancel func()) {
+ return m.register(&msgqItem{
f: &f,
- }
- m.register_front(mp)
- return func() {
- mp.disable.Store(true)
- m.someNeedRemove.Store(true)
- }
+ }, m.funcs.PushFront)
}
-func (m *Msgq) Push(msg any) {
- ul := m.lock.RLock(m.to...)
- defer ul(m.removeDisable(true)...)
+func (m *Msgq) push(msg any, isLock bool) {
+ if isLock {
+ defer m.lock.Lock()()
+ } else {
+ defer m.lock.RLock()()
+ }
- for el := m.funcs.Front(); el != nil; el = el.Next() {
- mi := el.Value.(*msgqItem)
- if mi.disable.Load() {
- continue
+ if m.allNeedRemove.Load() {
+ m.removeDisable(true, isLock)
+ if !isLock {
+ return
}
- mi.running.Add(1)
- if disable := (*mi.f)(msg); disable {
+ }
+
+ for el := m.funcs.Front(); el != nil; el = el.Next() {
+ if mi := el.Value.(*msgqItem); !mi.disable.Load() && (*mi.f)(msg) {
mi.disable.Store(true)
- m.someNeedRemove.Store(true)
+ m.removeDisable(m.someNeedRemove.CompareAndSwap(false, true), isLock)
}
- mi.running.Add(-1)
}
}
+// 不能在由PushLock*调用的Pull中以同步方式使用
+func (m *Msgq) Push(msg any) {
+ m.push(msg, false)
+}
+
+// 不能在由Push*调用的Pull中以同步方式使用
func (m *Msgq) PushLock(msg any) {
- ul := m.lock.Lock(m.to...)
- defer ul(m.removeDisable(false)...)
+ m.push(msg, true)
+}
- for el := m.funcs.Front(); el != nil; el = el.Next() {
- mi := el.Value.(*msgqItem)
- if mi.disable.Load() {
- continue
+func (m *Msgq) ClearAll() {
+ m.allNeedRemove.Store(true)
+}
+
+func (m *Msgq) removeDisable(sig bool, isLock bool) {
+ if sig {
+ f := func() {
+ if !isLock {
+ defer m.lock.Lock()()
+ }
+ all := m.allNeedRemove.Swap(false)
+ for el := m.funcs.Front(); el != nil; el = el.Next() {
+ mi := el.Value.(*msgqItem)
+ if all || mi.disable.Load() {
+ m.funcs.Remove(el)
+ }
+ }
+ m.someNeedRemove.Store(false)
}
- mi.running.Add(1)
- if disable := (*mi.f)(msg); disable {
- mi.disable.Store(true)
- m.someNeedRemove.Store(true)
+ if isLock {
+ f()
+ } else {
+ go f()
}
- mi.running.Add(-1)
}
}
-func (m *Msgq) ClearAll() {
- for el := m.funcs.Front(); el != nil; el = el.Next() {
- mi := el.Value.(*msgqItem)
- mi.disable.Store(true)
- m.someNeedRemove.Store(true)
+func (m *Msgq) panicFunc(s any) {
+ if m.PanicFunc != nil {
+ m.PanicFunc(s)
+ } else {
+ panic(s)
}
}
-func (m *Msgq) removeDisable(rlock bool) (ls []func(ulocked bool) (doUlock bool)) {
- if rlock {
- return []func(ulocked bool) (doUlock bool){
- func(ulocked bool) (doUlock bool) {
- return m.someNeedRemove.CompareAndSwap(true, false)
- },
- func(ulocked bool) (doUlock bool) {
- if ulocked {
- defer m.lock.Lock()()
- for el := m.funcs.Front(); el != nil; el = el.Next() {
- mi := el.Value.(*msgqItem)
- if mi.disable.Load() && mi.running.Load() == 0 {
- m.funcs.Remove(el)
- }
- }
- }
- return
- },
- }
- } else {
- return []func(ulocked bool) (doUlock bool){
- func(ulocked bool) (doUlock bool) {
- if m.someNeedRemove.CompareAndSwap(true, false) {
- for el := m.funcs.Front(); el != nil; el = el.Next() {
- mi := el.Value.(*msgqItem)
- if mi.disable.Load() && mi.running.Load() == 0 {
- m.funcs.Remove(el)
- }
- }
- }
- return
- },
+func (m *Msgq) PushingTO(info string, callTree *string) (fin func()) {
+ if len(m.to) > 1 {
+ to := time.AfterFunc(m.to[1], func() {
+ m.panicFunc(errors.Join(ErrRunTO, lmt.Errorf("%v:%v", info, *callTree)))
+ })
+ return func() {
+ to.Stop()
}
}
+ return func() {}
}
type Msgq_tag_data struct {
Data any
}
-// 不能放置在由PushLock_tag调用的同步Pull中
+// 不能在由PushLock*调用的Pull中以同步方式使用
func (m *Msgq) Push_tag(Tag string, Data any) {
- if len(m.to) > 0 {
- ptr := uintptr(unsafe.Pointer(&Data))
- m.runTag.Store(ptr, "Push_tag(`"+Tag+"`,...)")
- defer func() {
- if e := recover(); e != nil {
- m.runTag.Range(func(key, value any) bool {
- if key == ptr {
- fmt.Printf("%v panic > %v\n", value, e)
- } else {
- fmt.Printf("%v running\n", value)
- }
- return true
- })
- m.runTag.ClearAll()
- panic(e)
- }
- m.runTag.Delete(ptr)
- }()
- }
- {
- /*
- m.m.Push(Msgq_tag_data{
- Tag: Tag,
- Data: Data,
- })
- */
- defer m.lock.RLock(m.to...)(m.removeDisable(true)...)
-
- for el := m.funcs.Front(); el != nil; el = el.Next() {
- mi := el.Value.(*msgqItem)
- if mi.disable.Load() {
- continue
- }
- mi.running.Add(1)
- if disable := (*mi.f)(&Msgq_tag_data{
- Tag: Tag,
- Data: Data,
- }); disable {
- mi.disable.Store(true)
- m.someNeedRemove.Store(true)
- }
- mi.running.Add(-1)
- }
- }
+ defer m.PushingTO(lmt.Sprintf("\nPush_tag(`%v`)", Tag), getCall(1))()
+ m.Push(&Msgq_tag_data{
+ Tag: Tag,
+ Data: Data,
+ })
}
-// 不能放置在由Push_tag、PushLock_tag调用的同步Pull中
+// 不能在由Push*调用的Pull中以同步方式使用
func (m *Msgq) PushLock_tag(Tag string, Data any) {
- if len(m.to) > 0 {
- ptr := uintptr(unsafe.Pointer(&Data))
- m.runTag.Store(ptr, "PushLock_tag(`"+Tag+"`,...)")
- defer func() {
- if e := recover(); e != nil {
- m.runTag.Range(func(key, value any) bool {
- if key == ptr {
- fmt.Printf("%v panic > %v\n", value, e)
- } else {
- fmt.Printf("%v running\n", value)
- }
- return true
- })
- m.runTag.ClearAll()
- panic(e)
- }
- m.runTag.Delete(ptr)
- }()
- }
- {
- /*
- m.m.PushLock(Msgq_tag_data{
- Tag: Tag,
- Data: Data,
- })
- */
- ul := m.lock.Lock(m.to...)
- defer ul(m.removeDisable(false)...)
-
- for el := m.funcs.Front(); el != nil; el = el.Next() {
- mi := el.Value.(*msgqItem)
- if mi.disable.Load() {
- continue
- }
- mi.running.Add(1)
- if disable := (*mi.f)(&Msgq_tag_data{
- Tag: Tag,
- Data: Data,
- }); disable {
- mi.disable.Store(true)
- m.someNeedRemove.Store(true)
- }
- mi.running.Add(-1)
- }
- }
+ defer m.PushingTO(lmt.Sprintf("\nPushLock_tag(`%v`)", Tag), getCall(1))()
+ m.PushLock(&Msgq_tag_data{
+ Tag: Tag,
+ Data: Data,
+ })
}
func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) (cancel func(), ch <-chan any) {
var c = make(chan any, size)
- var f1 = func(data any) bool {
+ return m.Register(func(data any) bool {
if d, ok := data.(*Msgq_tag_data); ok && d.Tag == key {
select {
case <-ctx.Done():
}
}
return false
- }
- cancel = m.register_front(&msgqItem{
- f: &f1,
- })
- ch = c
- return
+ }), c
}
func (m *Msgq) Pull_tag_only(key string, f func(any) (disable bool)) (cancel func()) {
- var f1 = func(data any) (disable bool) {
+ return m.Register(func(data any) (disable bool) {
if d, ok := data.(*Msgq_tag_data); ok && d.Tag == key {
return f(d.Data)
}
return false
- }
- return m.register_front(&msgqItem{
- f: &f1,
})
}
func (m *Msgq) Pull_tag(func_map map[string]func(any) (disable bool)) (cancel func()) {
- var f1 = func(data any) (disable bool) {
+ return m.Register(func(data any) (disable bool) {
if d, ok := data.(*Msgq_tag_data); ok {
if f, ok := func_map[d.Tag]; ok {
return f(d.Data)
}
}
return false
- }
- return m.register_front(&msgqItem{
- f: &f1,
})
}
func (m *Msgq) Pull_tag_async_only(key string, f func(any) (disable bool)) (cancel func()) {
- var mi = msgqItem{}
- var f1 = func(data any) bool {
- if d, ok := data.(*Msgq_tag_data); ok {
+ var disable bool
+ return m.RegisterFront(func(data any) bool {
+ if d, ok := data.(*Msgq_tag_data); !disable && ok && d.Tag == key {
go func() {
- if f(d.Data) {
- mi.disable.Store(true)
- m.someNeedRemove.Store(true)
- }
+ disable = f(d.Data)
}()
}
- return false
- }
- mi.f = &f1
- return m.register_front(&mi)
+ return disable
+ })
}
func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) (cancel func()) {
- var mi = msgqItem{}
- var f = func(data any) bool {
+ var disable bool
+ return m.RegisterFront(func(data any) bool {
if d, ok := data.(*Msgq_tag_data); ok {
if f, ok := func_map[d.Tag]; ok {
go func() {
- if f(d.Data) {
- mi.disable.Store(true)
- m.someNeedRemove.Store(true)
- }
+ disable = f(d.Data)
}()
}
}
- return false
- }
- mi.f = &f
- return m.register_front(&mi)
+ return disable
+ })
}
type MsgType[T any] struct {
m.m.ClearAll()
}
-// 不能放置在由PushLock_tag调用的同步Pull中
+// 不能在由PushLock*调用的Pull中以同步方式使用
func (m *MsgType[T]) Push_tag(Tag string, Data T) {
- if len(m.m.to) > 0 {
- ptr := uintptr(unsafe.Pointer(&Data))
- m.m.runTag.Store(ptr, "Push_tag(`"+Tag+"`,...)")
- defer func() {
- if e := recover(); e != nil {
- m.m.runTag.Range(func(key, value any) bool {
- if key == ptr {
- fmt.Printf("%v panic > %v\n", value, e)
- } else {
- fmt.Printf("%v running\n", value)
- }
- return true
- })
- m.m.runTag.ClearAll()
- panic(e)
- }
- m.m.runTag.Delete(ptr)
- }()
- }
- {
- /*
- m.m.Push(Msgq_tag_data{
- Tag: Tag,
- Data: Data,
- })
- */
- ul := m.m.lock.RLock(m.m.to...)
- defer ul(m.m.removeDisable(true)...)
-
- for el := m.m.funcs.Front(); el != nil; el = el.Next() {
- mi := el.Value.(*msgqItem)
- if mi.disable.Load() {
- continue
- }
- mi.running.Add(1)
- if disable := (*mi.f)(&MsgType_tag_data[T]{
- Tag: Tag,
- Data: &Data,
- }); disable {
- mi.disable.Store(true)
- m.m.someNeedRemove.Store(true)
- }
- mi.running.Add(-1)
- }
- }
+ m.m.Push(&MsgType_tag_data[T]{
+ Tag: Tag,
+ Data: &Data,
+ })
}
-// 不能放置在由Push_tag、PushLock_tag调用的同步Pull中
+// 不能在由Push*调用的Pull中以同步方式使用
func (m *MsgType[T]) PushLock_tag(Tag string, Data T) {
- if len(m.m.to) > 0 {
- ptr := uintptr(unsafe.Pointer(&Data))
- m.m.runTag.Store(ptr, "PushLock_tag(`"+Tag+"`,...)")
- defer func() {
- if e := recover(); e != nil {
- m.m.runTag.Range(func(key, value any) bool {
- if key == ptr {
- fmt.Printf("%v panic > %v\n", value, e)
- } else {
- fmt.Printf("%v running\n", value)
- }
- return true
- })
- m.m.runTag.ClearAll()
- panic(e)
- }
- m.m.runTag.Delete(ptr)
- }()
- }
- {
- /*
- m.m.PushLock(Msgq_tag_data{
- Tag: Tag,
- Data: Data,
- })
- */
- ul := m.m.lock.Lock(m.m.to...)
- defer ul(m.m.removeDisable(false)...)
-
- for el := m.m.funcs.Front(); el != nil; el = el.Next() {
- mi := el.Value.(*msgqItem)
- if mi.disable.Load() {
- continue
- }
- mi.running.Add(1)
- if disable := (*mi.f)(&MsgType_tag_data[T]{
- Tag: Tag,
- Data: &Data,
- }); disable {
- mi.disable.Store(true)
- m.m.someNeedRemove.Store(true)
- }
- mi.running.Add(-1)
- }
- }
+ m.m.PushLock(&MsgType_tag_data[T]{
+ Tag: Tag,
+ Data: &Data,
+ })
}
func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) (cancel func(), ch <-chan T) {
var c = make(chan T, size)
- var f = func(data any) bool {
+ return m.m.Register(func(data any) bool {
if data1, ok := data.(*MsgType_tag_data[T]); ok {
if data1.Tag == key {
select {
}
}
return false
- }
- cancel = m.m.register(&msgqItem{
- f: &f,
- })
- ch = c
- return
+ }), c
}
func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) (cancel func()) {
- var f1 = func(data any) (disable bool) {
- if data1, ok := data.(*MsgType_tag_data[T]); ok {
- if data1.Tag == key {
- return f(*data1.Data)
- }
+ return m.m.Register(func(data any) (disable bool) {
+ if data1, ok := data.(*MsgType_tag_data[T]); ok && data1.Tag == key {
+ return f(*data1.Data)
}
return false
- }
- return m.m.register(&msgqItem{
- f: &f1,
})
}
func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) (cancel func()) {
- var f = func(data any) (disable bool) {
+ return m.m.Register(func(data any) (disable bool) {
if data1, ok := data.(*MsgType_tag_data[T]); ok {
if f, ok := func_map[data1.Tag]; ok {
return f(*data1.Data)
}
}
return false
- }
- return m.m.register(&msgqItem{
- f: &f,
})
}
func (m *MsgType[T]) Pull_tag_async_only(key string, f func(T) (disable bool)) (cancel func()) {
- var mi = msgqItem{}
- var f1 = func(data any) bool {
- if d, ok := data.(*MsgType_tag_data[T]); ok {
+ var disabled atomic.Bool
+ return m.m.RegisterFront(func(data any) bool {
+ if d, ok := data.(*MsgType_tag_data[T]); !disabled.Load() && ok && d.Tag == key {
go func() {
- if f(*d.Data) {
- mi.disable.Store(true)
- m.m.someNeedRemove.Store(true)
+ if !disabled.Load() {
+ disabled.Store(f(*d.Data))
}
}()
}
- return false
- }
- mi.f = &f1
- return m.m.register_front(&mi)
+ return disabled.Load()
+ })
}
func (m *MsgType[T]) Pull_tag_async(func_map map[string]func(T) (disable bool)) (cancel func()) {
- var mi = msgqItem{}
- var f = func(data any) bool {
- if d, ok := data.(*MsgType_tag_data[T]); ok {
+ var disabled atomic.Bool
+ return m.m.RegisterFront(func(data any) bool {
+ if d, ok := data.(*MsgType_tag_data[T]); !disabled.Load() && ok {
if f, ok := func_map[d.Tag]; ok {
go func() {
- if f(*d.Data) {
- mi.disable.Store(true)
- m.m.someNeedRemove.Store(true)
+ if !disabled.Load() {
+ disabled.Store(f(*d.Data))
}
}()
}
}
- return false
+ return disabled.Load()
+ })
+}
+
+func getCall(i int) (calls *string) {
+ var cs string
+ for i += 1; true; i++ {
+ if pc, file, line, ok := runtime.Caller(i); !ok || strings.HasPrefix(file, build.Default.GOROOT) {
+ break
+ } else {
+ cs += fmt.Sprintf("\ncall by %s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line)
+ }
+ }
+ if cs == "" {
+ cs += fmt.Sprintln("\ncall by goroutine")
}
- mi.f = &f
- return m.m.register_front(&mi)
+ return &cs
}
import (
"context"
+ "errors"
+ "fmt"
_ "net/http/pprof"
"sync"
"testing"
"time"
+
+ psync "github.com/qydysky/part/sync"
)
// type test_item struct {
}
}
-// func TestPushLock(t *testing.T) {
-// defer func() {
-// if e := recover(); e != nil {
-// t.Fatal(e)
-// }
-// }()
-// mq := NewTo(time.Second)
-// mq.Pull_tag_only(`test`, func(a any) (disable bool) {
-// mq.PushLock_tag(`lock`, nil)
-// return false
-// })
-// mq.Push_tag(`test`, nil)
-// t.Fatal()
-// }
+func TestTORun(t *testing.T) {
+ t.Parallel()
+ panicC := make(chan any, 10)
+ mq := New(time.Second, time.Second)
+ mq.TOPanicFunc(func(a any) {
+ panicC <- a
+ })
+ mq.Pull_tag_only(`test`, func(a any) (disable bool) {
+ time.Sleep(time.Second * 10)
+ return false
+ })
+ go mq.Push_tag(`test`, nil)
+ e := <-panicC
+ if !errors.Is(e.(error), ErrRunTO) {
+ t.Fatal(e)
+ }
+}
+
+func TestPushLock(t *testing.T) {
+ t.Parallel()
+ panicC := make(chan any, 10)
+ mq := New(time.Second, time.Second*2)
+ mq.TOPanicFunc(func(a any) {
+ panicC <- a
+ })
+ mq.Pull_tag_only(`test`, func(a any) (disable bool) {
+ mq.PushLock_tag(`lock`, nil)
+ return false
+ })
+ go mq.Push_tag(`test`, nil)
+ e := <-panicC
+ if !errors.Is(e.(error), psync.ErrTimeoutToLock) {
+ t.Fatal(e)
+ }
+}
func Benchmark_1(b *testing.B) {
mq := New()
}
func Test_4(t *testing.T) {
+ t.Parallel()
mq := New()
cancel := mq.Pull_tag(FuncMap{
`del`: func(a any) (disable bool) {
}
func Test_2(t *testing.T) {
+ t.Parallel()
mq := New()
cancel := mq.Pull_tag(FuncMap{
`del`: func(a any) (disable bool) {
mq.PushLock_tag(`del`, nil)
}
+func Test_5(t *testing.T) {
+ t.Parallel()
+ mq := New(time.Second, time.Second*2)
+ mq.Pull_tag(FuncMap{
+ `del`: func(a any) (disable bool) {
+ t.Log(1)
+ mq.Push_tag(`del1`, nil)
+ t.Log(3)
+ return false
+ },
+ `del1`: func(a any) (disable bool) {
+ t.Log(2)
+ return true
+ },
+ })
+ mq.Push_tag(`del`, 1)
+ mq.Push_tag(`del1`, 1)
+}
+
func Test_1(t *testing.T) {
+ t.Parallel()
mq := New(time.Millisecond*5, time.Millisecond*10)
go mq.Push_tag(`del`, nil)
mq.Pull_tag(FuncMap{
}
func Test_RemoveInPush(t *testing.T) {
+ t.Parallel()
mq := New(time.Second, time.Second*3)
mq.Pull_tag(FuncMap{
`r1`: func(a any) (disable bool) {
mq.ClearAll()
- return false
+ return true
},
`r2`: func(a any) (disable bool) {
return true
}
func Test_3(t *testing.T) {
+ t.Parallel()
mq := New(time.Millisecond*5, time.Millisecond*10)
go mq.Push_tag(`sss`, nil)
mq.Pull_tag(FuncMap{
}
func Test_Pull_tag_chan(t *testing.T) {
+ t.Parallel()
mq := New()
ctx, cf := context.WithCancel(context.Background())
_, ch := mq.Pull_tag_chan(`a`, 2, ctx)
}
func Test_Pull_tag_chan2(t *testing.T) {
+ t.Parallel()
mq := New()
mq.Pull_tag_chan(`a`, 1, context.Background())
}
func Test_msgq1(t *testing.T) {
+ t.Parallel()
mq := New()
c := make(chan time.Time, 10)
mq.Pull_tag(map[string]func(any) bool{
}
func Test_msgq9(t *testing.T) {
+ t.Parallel()
var mq = NewType[int]()
var c = make(chan int, 10)
mq.Pull_tag_only(`c`, func(i int) (disable bool) {
mq.PushLock_tag(`c`, 1)
mq.ClearAll()
mq.PushLock_tag(`c`, 2)
- if len(c) != 1 || <-c != 1 {
- t.Fatal()
+ t.Log(mq.m.funcs.Len())
+ if l, i := len(c), <-c; l != 1 || i != 1 {
+ t.Fatal(l, i)
}
+ fmt.Println(mq.m.allNeedRemove.Load())
+
mq.Pull_tag(map[string]func(int) (disable bool){
`c`: func(i int) (disable bool) {
+ fmt.Println(1)
c <- i
return false
},
},
})
+ fmt.Println(mq.m.funcs.Len())
+
mq.PushLock_tag(`c`, 1)
mq.ClearAll()
mq.PushLock_tag(`c`, 2)
- if len(c) != 1 || <-c != 1 {
- t.Fatal()
+ if l, i := len(c), <-c; l != 1 || i != 1 {
+ t.Fatal(l, i)
}
}
func Test_msgq2(t *testing.T) {
+ t.Parallel()
mq := New()
mq.Pull_tag(map[string]func(any) bool{
}
func Test_msgq3(t *testing.T) {
+ t.Parallel()
mq := New()
mun_c := make(chan int, 100)
}
func Test_msgq4(t *testing.T) {
+ t.Parallel()
// mq := New(30)
mq := New(time.Second, time.Second) //out of list
}
func Test_msgq5(t *testing.T) {
+ t.Parallel()
mq := New()
mun_c1 := make(chan bool, 100)
}
func Test_msgq6(t *testing.T) {
+ t.Parallel()
msg := NewType[int]()
msg.Pull_tag(map[string]func(int) (disable bool){
`1`: func(b int) (disable bool) {
}
func Test_msgq7(t *testing.T) {
+ t.Parallel()
var c = make(chan string, 100)
msg := NewType[int]()
msg.Pull_tag_async_only(`1`, func(i int) (disable bool) {
return i > 10
})
msg.Push_tag(`1`, 0)
- if <-c != "1" {
- t.Fatal()
+ if i := <-c; i != "1" {
+ t.Fatal(i)
}
if <-c != "2" {
t.Fatal()
}
func Test_msgq8(t *testing.T) {
+ t.Parallel()
msg := NewType[int]()
msg.Pull_tag_async_only(`1`, func(i int) (disable bool) {
if i > 4 {
import (
"errors"
"fmt"
+ "go/build"
"runtime"
"strings"
"sync"
"time"
+ "weak"
)
// const (
// )
var (
- ErrTimeoutToLock = errors.New("ErrTimeoutToLock")
- ErrTimeoutToULock = errors.New("ErrTimeoutToULock")
+ ErrTimeoutToLock = errors.New("ErrTimeoutToLock")
+ ErrTimeoutToULock = errors.New("ErrTimeoutToULock")
+ ErrTimeoutToRLock = errors.New("ErrTimeoutToRLock")
+ ErrTimeoutToURLock = errors.New("ErrTimeoutToURLock")
)
type RWMutex struct {
rlc sync.RWMutex
+ rlccl sync.Mutex
+ rlcc []weak.Pointer[string]
+ to []time.Duration
PanicFunc func(any)
}
// call inTimeCall() in time or panic(callTree)
func (m *RWMutex) tof(to time.Duration, e error) (inTimeCall func() (called bool)) {
- callTree := getCall(2)
+ callTree := getCall(1)
return time.AfterFunc(to, func() {
- m.panicFunc(errors.Join(e, errors.New(callTree)))
+ runtime.GC()
+ e = errors.Join(e, errors.New(*callTree))
+ m.rlccl.Lock()
+ for i := 0; i < len(m.rlcc); i++ {
+ if s := m.rlcc[i].Value(); s != nil {
+ e = errors.Join(e, errors.New("\nlocking:"+*s))
+ }
+ }
+ m.rlccl.Unlock()
+ m.panicFunc(e)
}).Stop
}
+func (m *RWMutex) RecLock(max int, to ...time.Duration) {
+ defer m.Lock()()
+ m.rlcc = make([]weak.Pointer[string], max)
+ m.to = to
+}
+
+func (m *RWMutex) addcl(s *string) *string {
+ m.rlccl.Lock()
+ m.rlcc[copy(m.rlcc, m.rlcc[1:])] = weak.Make(s)
+ m.rlccl.Unlock()
+ return s
+}
+
// to[0]: wait lock timeout
//
// to[1]: wait ulock timeout
//
// 不要在Rlock内设置变量,有DATA RACE风险
func (m *RWMutex) RLock(to ...time.Duration) (unlockf func(ulockfs ...func(ulocked bool) (doUlock bool))) {
+ to = append(to, m.to...)
if len(to) > 0 {
- defer m.tof(to[0], ErrTimeoutToLock)()
+ defer m.tof(to[0], ErrTimeoutToRLock)()
}
m.rlc.RLock()
+ var ct *string
+ if m.rlcc != nil {
+ ct = m.addcl(getCall(0))
+ }
return func(ulockfs ...func(ulocked bool) (doUlock bool)) {
inTimeCall := func() (called bool) { return true }
if len(to) > 1 {
- inTimeCall = m.tof(to[1], ErrTimeoutToULock)
+ inTimeCall = m.tof(to[1], ErrTimeoutToURLock)
}
ul := false
for i := 0; i < len(ulockfs); i++ {
m.rlc.RUnlock()
inTimeCall()
}
+ _ = ct
}
}
//
// to[1]: wait ulock timeout
func (m *RWMutex) Lock(to ...time.Duration) (unlockf func(ulockfs ...func(ulocked bool) (doUlock bool))) {
+ to = append(to, m.to...)
if len(to) > 0 {
defer m.tof(to[0], ErrTimeoutToLock)()
}
m.rlc.Lock()
+ var ct *string
+ if m.rlcc != nil {
+ ct = m.addcl(getCall(0))
+ }
return func(ulockfs ...func(ulocked bool) (doUlock bool)) {
inTimeCall := func() (called bool) { return true }
m.rlc.Unlock()
inTimeCall()
}
+ _ = ct
}
}
-func getCall(i int) (calls string) {
+func getCall(i int) (calls *string) {
+ var cs string
for i += 1; true; i++ {
- if pc, file, line, ok := runtime.Caller(i); !ok || strings.HasPrefix(file, runtime.GOROOT()) {
+ if pc, file, line, ok := runtime.Caller(i); !ok || strings.HasPrefix(file, build.Default.GOROOT) {
break
} else {
- calls += fmt.Sprintf("\ncall by %s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line)
+ cs += fmt.Sprintf("\ncall by %s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line)
}
}
- return
+ return &cs
}
// ulock rlock rlock
func Test1(t *testing.T) {
+ t.Parallel()
var l RWMutex
//check(&l, 0)
ul := l.RLock()
}
func Test4(t *testing.T) {
+ t.Parallel()
var l RWMutex
ul := l.RLock()
ul(func(ulocked bool) (doUlock bool) {
}
func Test5(t *testing.T) {
+ t.Parallel()
var l = RWMutex{PanicFunc: func(a any) {
- if !errors.Is(a.(error), ErrTimeoutToULock) {
+ if !errors.Is(a.(error), ErrTimeoutToURLock) {
t.Fatal(a)
}
}}
}
func Test8(t *testing.T) {
+ t.Parallel()
var l = RWMutex{PanicFunc: func(a any) {
- if !errors.Is(a.(error), ErrTimeoutToLock) {
+ if !errors.Is(a.(error), ErrTimeoutToRLock) {
panic(a)
}
}}
// ulock rlock lock
func Test2(t *testing.T) {
+ t.Parallel()
var l RWMutex
ul := l.RLock()
//check(&l, 2)
// ulock lock rlock
func Test3(t *testing.T) {
+ t.Parallel()
var l RWMutex
ul := l.Lock()
//check(&l, -1)
}
func Test6(t *testing.T) {
+ t.Parallel()
var c = make(chan int, 3)
var l RWMutex
ul := l.RLock()
}
func Test7(t *testing.T) {
+ t.Parallel()
var c = make(chan int, 2)
var l RWMutex
ul := l.RLock()
}
func Test9(t *testing.T) {
+ t.Parallel()
n := time.Now()
var l RWMutex
for i := 0; i < 1000; i++ {
}
func Test10(t *testing.T) {
+ t.Parallel()
n := time.Now()
var l sync.RWMutex
- for i := 0; i < 300; i++ {
+ for i := 0; i < 30000; i++ {
l.RLock()
go l.RUnlock()
}
t.Log(time.Since(n))
+
+ n = time.Now()
+ var l2 RWMutex
+ for i := 0; i < 30000; i++ {
+ go l2.RLock()()
+ }
+ t.Log(time.Since(n))
+}
+
+func Test11(t *testing.T) {
+ t.Parallel()
+ var l RWMutex
+
+ l.RLock()(func(ulocked bool) (doUlock bool) {
+ return true
+ }, func(ulocked bool) (doUlock bool) {
+ if ulocked {
+ defer l.Lock()()
+ }
+ return
+ })
+ l.RLock()(func(ulocked bool) (doUlock bool) {
+ return false
+ }, func(ulocked bool) (doUlock bool) {
+ if ulocked {
+ defer l.Lock()()
+ }
+ return
+ })
}
func Panic_Test8(t *testing.T) {
+ t.Parallel()
var l RWMutex
ul := l.Lock(time.Second, time.Second)
ul(func(ulocked bool) (doUlock bool) {
}
// ulock rlock rlock
-func Panic_Test4(t *testing.T) {
+func Test4Panic_(t *testing.T) {
+ t.Parallel()
var l RWMutex
+ l.RecLock(10, time.Second, time.Second)
//check(&l, 0)
ul := l.RLock(time.Second, time.Second)
//check(&l, 1)
ul1 := l.RLock(time.Second, time.Second)
+ //check(&l, 1)
+ ul2 := l.RLock(time.Second, time.Second)
//check(&l, 2)
time.Sleep(time.Millisecond * 1500)
ul()
//check(&l, 1)
ul1()
+ ul2()
//check(&l, 0)
time.Sleep(time.Second * 3)
}
// ulock rlock lock
-func Panic_Test5(t *testing.T) {
+func Test5Panic_(t *testing.T) {
+ t.Parallel()
var l RWMutex
+ l.RecLock(10, time.Second, time.Second)
+
+ l.RLock()()
ul := l.RLock()
//check(&l, 1)
time.AfterFunc(time.Millisecond*1500, func() {
ul()
})
c := time.Now()
- ul1 := l.Lock(time.Second)
+ ul1 := l.Lock(time.Second * 2)
//check(&l, 0)
if time.Since(c) < time.Second {
t.Fail()
*/
func BenchmarkRlock(b *testing.B) {
var lock1 RWMutex
+ lock1.RecLock(5, time.Second, time.Second)
for i := 0; i < b.N; i++ {
lock1.RLock()()
}
var lock1 sync.RWMutex
for i := 0; i < b.N; i++ {
lock1.RLock()
+ _ = 1
lock1.RUnlock()
}
}