From cadb3f8e70763d9f81c675070cec55f0e7235c1f Mon Sep 17 00:00:00 2001 From: qydysky Date: Sun, 14 May 2023 01:31:21 +0800 Subject: [PATCH] add --- msgq/Msgq.go | 108 +++++++++++++++++++++++++++++++++++----------- msgq/Msgq_test.go | 12 +++++- sync/Map.go | 8 ++++ 3 files changed, 102 insertions(+), 26 deletions(-) diff --git a/msgq/Msgq.go b/msgq/Msgq.go index 6297be2..5f1c3a0 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -7,6 +7,7 @@ import ( "runtime" "sync/atomic" "time" + "unsafe" signal "github.com/qydysky/part/signal" sync "github.com/qydysky/part/sync" @@ -17,6 +18,7 @@ type Msgq struct { funcs *list.List someNeedRemove atomic.Int32 lock sync.RWMutex + runTag sync.Map } type FuncMap map[string]func(any) (disable bool) @@ -27,11 +29,11 @@ func New() *Msgq { return m } -func NewTo(to ...time.Duration) *Msgq { +func NewTo(to time.Duration) *Msgq { m := new(Msgq) m.funcs = list.New() - if len(to) > 0 { - m.to = append(m.to, to[0]) + if to != 0 { + m.to = append(m.to, to) } else { m.to = append(m.to, time.Second*30) } @@ -133,11 +135,25 @@ type Msgq_tag_data struct { } func (m *Msgq) Push_tag(Tag string, Data any) { - defer func() { - if e := recover(); e != nil { - panic(fmt.Sprintf("Push_tag(%s,%v) > %v", Tag, Data, e)) - } - }() + if len(m.to) > 0 { + ptr := uintptr(unsafe.Pointer(&Data)) + m.runTag.Store(ptr, "Push_tag(`"+Tag+"`,...)") + defer func() { + if e := recover(); e != nil { + m.runTag.Range(func(key, value any) bool { + if key == ptr { + fmt.Printf("%v panic > %v\n", value, e) + } else { + fmt.Printf("%v running\n", value) + } + return true + }) + m.runTag.ClearAll() + panic(e) + } + m.runTag.Delete(ptr) + }() + } m.Push(Msgq_tag_data{ Tag: Tag, Data: Data, @@ -145,11 +161,25 @@ func (m *Msgq) Push_tag(Tag string, Data any) { } func (m *Msgq) PushLock_tag(Tag string, Data any) { - defer func() { - if e := recover(); e != nil { - panic(fmt.Sprintf("PushLock_tag(%s,%v) > %v", Tag, Data, e)) - } - }() + if len(m.to) > 0 { + ptr := uintptr(unsafe.Pointer(&Data)) + m.runTag.Store(ptr, "PushLock_tag(`"+Tag+"`,...)") + defer func() { + if e := recover(); e != nil { + m.runTag.Range(func(key, value any) bool { + if key == ptr { + fmt.Printf("%v panic > %v\n", value, e) + } else { + fmt.Printf("%v running\n", value) + } + return true + }) + m.runTag.ClearAll() + panic(e) + } + m.runTag.Delete(ptr) + }() + } m.PushLock(Msgq_tag_data{ Tag: Tag, Data: Data, @@ -245,18 +275,32 @@ func NewType[T any]() *MsgType[T] { } } -func NewTypeTo[T any](to ...time.Duration) *MsgType[T] { +func NewTypeTo[T any](to time.Duration) *MsgType[T] { return &MsgType[T]{ - m: NewTo(to...), + m: NewTo(to), } } func (m *MsgType[T]) Push_tag(Tag string, Data T) { - defer func() { - if e := recover(); e != nil { - panic(fmt.Sprintf("Push_tag(%s,%v) > %v", Tag, Data, e)) - } - }() + if len(m.m.to) > 0 { + ptr := uintptr(unsafe.Pointer(&Data)) + m.m.runTag.Store(ptr, "[T]Push_tag(`"+Tag+"`,...)") + defer func() { + if e := recover(); e != nil { + m.m.runTag.Range(func(key, value any) bool { + if key == ptr { + fmt.Printf("%v panic > %v\n", value, e) + } else { + fmt.Printf("%v running\n", value) + } + return true + }) + m.m.runTag.ClearAll() + panic(e) + } + m.m.runTag.Delete(ptr) + }() + } m.m.Push(Msgq_tag_data{ Tag: Tag, Data: Data, @@ -264,11 +308,25 @@ func (m *MsgType[T]) Push_tag(Tag string, Data T) { } func (m *MsgType[T]) PushLock_tag(Tag string, Data T) { - defer func() { - if e := recover(); e != nil { - panic(fmt.Sprintf("PushLock_tag(%s,%v) > %v", Tag, Data, e)) - } - }() + if len(m.m.to) > 0 { + ptr := uintptr(unsafe.Pointer(&Data)) + m.m.runTag.Store(ptr, "[T]PushLock_tag(`"+Tag+"`,...)") + defer func() { + if e := recover(); e != nil { + m.m.runTag.Range(func(key, value any) bool { + if key == ptr { + fmt.Printf("%v panic > %v\n", value, e) + } else { + fmt.Printf("%v running\n", value) + } + return true + }) + m.m.runTag.ClearAll() + panic(e) + } + m.m.runTag.Delete(ptr) + }() + } m.m.PushLock(Msgq_tag_data{ Tag: Tag, Data: Data, diff --git a/msgq/Msgq_test.go b/msgq/Msgq_test.go index 655cdb4..2a4f322 100644 --- a/msgq/Msgq_test.go +++ b/msgq/Msgq_test.go @@ -141,7 +141,7 @@ func BenchmarkXxx(b *testing.B) { func TestPushLock(t *testing.T) { defer func() { - if e := recover(); e.(string) != "Push_tag(test,) > PushLock_tag(lock,) > timeout to wait rlock, rlc:1" { + if e := recover(); e.(string) != "timeout to wait rlock, rlc:1" { t.Fatal(e) } }() @@ -154,6 +154,16 @@ func TestPushLock(t *testing.T) { t.Fatal() } +func Benchmark_1(b *testing.B) { + mq := NewTo(time.Second) + mq.Pull_tag_only(`test`, func(a any) (disable bool) { + return false + }) + for i := 0; i < b.N; i++ { + mq.PushLock_tag(`test`, i) + } +} + func Test_Pull_tag_chan(t *testing.T) { mq := New() ctx, cf := context.WithCancel(context.Background()) diff --git a/sync/Map.go b/sync/Map.go index 7553fcf..12b5d31 100644 --- a/sync/Map.go +++ b/sync/Map.go @@ -34,6 +34,14 @@ func (t *Map) Delete(k any) { } } +func (t *Map) ClearAll() { + t.m.Range(func(key, _ any) bool { + t.m.Delete(key) + return true + }) + t.size.Store(0) +} + func (t *Map) Len() int { return int(t.size.Load()) } -- 2.39.2