]> 127.0.0.1 Git - part/.git/commitdiff
add v0.26.5
authorqydysky <qydysky@foxmail.com>
Sat, 13 May 2023 17:31:21 +0000 (01:31 +0800)
committerqydysky <qydysky@foxmail.com>
Sat, 13 May 2023 17:31:21 +0000 (01:31 +0800)
msgq/Msgq.go
msgq/Msgq_test.go
sync/Map.go

index 6297be228745c2c21a75da3126abe41e98cbc056..5f1c3a0502eebb9861db9e80d76f54ed9324ac88 100644 (file)
@@ -7,6 +7,7 @@ import (
        "runtime"
        "sync/atomic"
        "time"
+       "unsafe"
 
        signal "github.com/qydysky/part/signal"
        sync "github.com/qydysky/part/sync"
@@ -17,6 +18,7 @@ type Msgq struct {
        funcs          *list.List
        someNeedRemove atomic.Int32
        lock           sync.RWMutex
+       runTag         sync.Map
 }
 
 type FuncMap map[string]func(any) (disable bool)
@@ -27,11 +29,11 @@ func New() *Msgq {
        return m
 }
 
-func NewTo(to ...time.Duration) *Msgq {
+func NewTo(to time.Duration) *Msgq {
        m := new(Msgq)
        m.funcs = list.New()
-       if len(to) > 0 {
-               m.to = append(m.to, to[0])
+       if to != 0 {
+               m.to = append(m.to, to)
        } else {
                m.to = append(m.to, time.Second*30)
        }
@@ -133,11 +135,25 @@ type Msgq_tag_data struct {
 }
 
 func (m *Msgq) Push_tag(Tag string, Data any) {
-       defer func() {
-               if e := recover(); e != nil {
-                       panic(fmt.Sprintf("Push_tag(%s,%v) > %v", Tag, Data, e))
-               }
-       }()
+       if len(m.to) > 0 {
+               ptr := uintptr(unsafe.Pointer(&Data))
+               m.runTag.Store(ptr, "Push_tag(`"+Tag+"`,...)")
+               defer func() {
+                       if e := recover(); e != nil {
+                               m.runTag.Range(func(key, value any) bool {
+                                       if key == ptr {
+                                               fmt.Printf("%v panic > %v\n", value, e)
+                                       } else {
+                                               fmt.Printf("%v running\n", value)
+                                       }
+                                       return true
+                               })
+                               m.runTag.ClearAll()
+                               panic(e)
+                       }
+                       m.runTag.Delete(ptr)
+               }()
+       }
        m.Push(Msgq_tag_data{
                Tag:  Tag,
                Data: Data,
@@ -145,11 +161,25 @@ func (m *Msgq) Push_tag(Tag string, Data any) {
 }
 
 func (m *Msgq) PushLock_tag(Tag string, Data any) {
-       defer func() {
-               if e := recover(); e != nil {
-                       panic(fmt.Sprintf("PushLock_tag(%s,%v) > %v", Tag, Data, e))
-               }
-       }()
+       if len(m.to) > 0 {
+               ptr := uintptr(unsafe.Pointer(&Data))
+               m.runTag.Store(ptr, "PushLock_tag(`"+Tag+"`,...)")
+               defer func() {
+                       if e := recover(); e != nil {
+                               m.runTag.Range(func(key, value any) bool {
+                                       if key == ptr {
+                                               fmt.Printf("%v panic > %v\n", value, e)
+                                       } else {
+                                               fmt.Printf("%v running\n", value)
+                                       }
+                                       return true
+                               })
+                               m.runTag.ClearAll()
+                               panic(e)
+                       }
+                       m.runTag.Delete(ptr)
+               }()
+       }
        m.PushLock(Msgq_tag_data{
                Tag:  Tag,
                Data: Data,
@@ -245,18 +275,32 @@ func NewType[T any]() *MsgType[T] {
        }
 }
 
-func NewTypeTo[T any](to ...time.Duration) *MsgType[T] {
+func NewTypeTo[T any](to time.Duration) *MsgType[T] {
        return &MsgType[T]{
-               m: NewTo(to...),
+               m: NewTo(to),
        }
 }
 
 func (m *MsgType[T]) Push_tag(Tag string, Data T) {
-       defer func() {
-               if e := recover(); e != nil {
-                       panic(fmt.Sprintf("Push_tag(%s,%v) > %v", Tag, Data, e))
-               }
-       }()
+       if len(m.m.to) > 0 {
+               ptr := uintptr(unsafe.Pointer(&Data))
+               m.m.runTag.Store(ptr, "[T]Push_tag(`"+Tag+"`,...)")
+               defer func() {
+                       if e := recover(); e != nil {
+                               m.m.runTag.Range(func(key, value any) bool {
+                                       if key == ptr {
+                                               fmt.Printf("%v panic > %v\n", value, e)
+                                       } else {
+                                               fmt.Printf("%v running\n", value)
+                                       }
+                                       return true
+                               })
+                               m.m.runTag.ClearAll()
+                               panic(e)
+                       }
+                       m.m.runTag.Delete(ptr)
+               }()
+       }
        m.m.Push(Msgq_tag_data{
                Tag:  Tag,
                Data: Data,
@@ -264,11 +308,25 @@ func (m *MsgType[T]) Push_tag(Tag string, Data T) {
 }
 
 func (m *MsgType[T]) PushLock_tag(Tag string, Data T) {
-       defer func() {
-               if e := recover(); e != nil {
-                       panic(fmt.Sprintf("PushLock_tag(%s,%v) > %v", Tag, Data, e))
-               }
-       }()
+       if len(m.m.to) > 0 {
+               ptr := uintptr(unsafe.Pointer(&Data))
+               m.m.runTag.Store(ptr, "[T]PushLock_tag(`"+Tag+"`,...)")
+               defer func() {
+                       if e := recover(); e != nil {
+                               m.m.runTag.Range(func(key, value any) bool {
+                                       if key == ptr {
+                                               fmt.Printf("%v panic > %v\n", value, e)
+                                       } else {
+                                               fmt.Printf("%v running\n", value)
+                                       }
+                                       return true
+                               })
+                               m.m.runTag.ClearAll()
+                               panic(e)
+                       }
+                       m.m.runTag.Delete(ptr)
+               }()
+       }
        m.m.PushLock(Msgq_tag_data{
                Tag:  Tag,
                Data: Data,
index 655cdb4fcf959bf68969207b78299133797c8428..2a4f3221b3dc07e018993bf0704ea7411389925e 100644 (file)
@@ -141,7 +141,7 @@ func BenchmarkXxx(b *testing.B) {
 
 func TestPushLock(t *testing.T) {
        defer func() {
-               if e := recover(); e.(string) != "Push_tag(test,<nil>) > PushLock_tag(lock,<nil>) > timeout to wait rlock, rlc:1" {
+               if e := recover(); e.(string) != "timeout to wait rlock, rlc:1" {
                        t.Fatal(e)
                }
        }()
@@ -154,6 +154,16 @@ func TestPushLock(t *testing.T) {
        t.Fatal()
 }
 
+func Benchmark_1(b *testing.B) {
+       mq := NewTo(time.Second)
+       mq.Pull_tag_only(`test`, func(a any) (disable bool) {
+               return false
+       })
+       for i := 0; i < b.N; i++ {
+               mq.PushLock_tag(`test`, i)
+       }
+}
+
 func Test_Pull_tag_chan(t *testing.T) {
        mq := New()
        ctx, cf := context.WithCancel(context.Background())
index 7553fcfb0b2dd0fb46c5e8a0dc9702adae630a85..12b5d313729040c14860ed8d03dc5638f63b373c 100644 (file)
@@ -34,6 +34,14 @@ func (t *Map) Delete(k any) {
        }
 }
 
+func (t *Map) ClearAll() {
+       t.m.Range(func(key, _ any) bool {
+               t.m.Delete(key)
+               return true
+       })
+       t.size.Store(0)
+}
+
 func (t *Map) Len() int {
        return int(t.size.Load())
 }