"runtime"
"sync/atomic"
"time"
+ "unsafe"
signal "github.com/qydysky/part/signal"
sync "github.com/qydysky/part/sync"
funcs *list.List
someNeedRemove atomic.Int32
lock sync.RWMutex
+ runTag sync.Map
}
type FuncMap map[string]func(any) (disable bool)
return m
}
-func NewTo(to ...time.Duration) *Msgq {
+func NewTo(to time.Duration) *Msgq {
m := new(Msgq)
m.funcs = list.New()
- if len(to) > 0 {
- m.to = append(m.to, to[0])
+ if to != 0 {
+ m.to = append(m.to, to)
} else {
m.to = append(m.to, time.Second*30)
}
}
func (m *Msgq) Push_tag(Tag string, Data any) {
- defer func() {
- if e := recover(); e != nil {
- panic(fmt.Sprintf("Push_tag(%s,%v) > %v", Tag, Data, e))
- }
- }()
+ if len(m.to) > 0 {
+ ptr := uintptr(unsafe.Pointer(&Data))
+ m.runTag.Store(ptr, "Push_tag(`"+Tag+"`,...)")
+ defer func() {
+ if e := recover(); e != nil {
+ m.runTag.Range(func(key, value any) bool {
+ if key == ptr {
+ fmt.Printf("%v panic > %v\n", value, e)
+ } else {
+ fmt.Printf("%v running\n", value)
+ }
+ return true
+ })
+ m.runTag.ClearAll()
+ panic(e)
+ }
+ m.runTag.Delete(ptr)
+ }()
+ }
m.Push(Msgq_tag_data{
Tag: Tag,
Data: Data,
}
func (m *Msgq) PushLock_tag(Tag string, Data any) {
- defer func() {
- if e := recover(); e != nil {
- panic(fmt.Sprintf("PushLock_tag(%s,%v) > %v", Tag, Data, e))
- }
- }()
+ if len(m.to) > 0 {
+ ptr := uintptr(unsafe.Pointer(&Data))
+ m.runTag.Store(ptr, "PushLock_tag(`"+Tag+"`,...)")
+ defer func() {
+ if e := recover(); e != nil {
+ m.runTag.Range(func(key, value any) bool {
+ if key == ptr {
+ fmt.Printf("%v panic > %v\n", value, e)
+ } else {
+ fmt.Printf("%v running\n", value)
+ }
+ return true
+ })
+ m.runTag.ClearAll()
+ panic(e)
+ }
+ m.runTag.Delete(ptr)
+ }()
+ }
m.PushLock(Msgq_tag_data{
Tag: Tag,
Data: Data,
}
}
-func NewTypeTo[T any](to ...time.Duration) *MsgType[T] {
+func NewTypeTo[T any](to time.Duration) *MsgType[T] {
return &MsgType[T]{
- m: NewTo(to...),
+ m: NewTo(to),
}
}
func (m *MsgType[T]) Push_tag(Tag string, Data T) {
- defer func() {
- if e := recover(); e != nil {
- panic(fmt.Sprintf("Push_tag(%s,%v) > %v", Tag, Data, e))
- }
- }()
+ if len(m.m.to) > 0 {
+ ptr := uintptr(unsafe.Pointer(&Data))
+ m.m.runTag.Store(ptr, "[T]Push_tag(`"+Tag+"`,...)")
+ defer func() {
+ if e := recover(); e != nil {
+ m.m.runTag.Range(func(key, value any) bool {
+ if key == ptr {
+ fmt.Printf("%v panic > %v\n", value, e)
+ } else {
+ fmt.Printf("%v running\n", value)
+ }
+ return true
+ })
+ m.m.runTag.ClearAll()
+ panic(e)
+ }
+ m.m.runTag.Delete(ptr)
+ }()
+ }
m.m.Push(Msgq_tag_data{
Tag: Tag,
Data: Data,
}
func (m *MsgType[T]) PushLock_tag(Tag string, Data T) {
- defer func() {
- if e := recover(); e != nil {
- panic(fmt.Sprintf("PushLock_tag(%s,%v) > %v", Tag, Data, e))
- }
- }()
+ if len(m.m.to) > 0 {
+ ptr := uintptr(unsafe.Pointer(&Data))
+ m.m.runTag.Store(ptr, "[T]PushLock_tag(`"+Tag+"`,...)")
+ defer func() {
+ if e := recover(); e != nil {
+ m.m.runTag.Range(func(key, value any) bool {
+ if key == ptr {
+ fmt.Printf("%v panic > %v\n", value, e)
+ } else {
+ fmt.Printf("%v running\n", value)
+ }
+ return true
+ })
+ m.m.runTag.ClearAll()
+ panic(e)
+ }
+ m.m.runTag.Delete(ptr)
+ }()
+ }
m.m.PushLock(Msgq_tag_data{
Tag: Tag,
Data: Data,
func TestPushLock(t *testing.T) {
defer func() {
- if e := recover(); e.(string) != "Push_tag(test,<nil>) > PushLock_tag(lock,<nil>) > timeout to wait rlock, rlc:1" {
+ if e := recover(); e.(string) != "timeout to wait rlock, rlc:1" {
t.Fatal(e)
}
}()
t.Fatal()
}
+func Benchmark_1(b *testing.B) {
+ mq := NewTo(time.Second)
+ mq.Pull_tag_only(`test`, func(a any) (disable bool) {
+ return false
+ })
+ for i := 0; i < b.N; i++ {
+ mq.PushLock_tag(`test`, i)
+ }
+}
+
func Test_Pull_tag_chan(t *testing.T) {
mq := New()
ctx, cf := context.WithCancel(context.Background())