"time"
"unsafe"
- signal "github.com/qydysky/part/signal"
psync "github.com/qydysky/part/sync"
)
to []time.Duration
funcs *list.List
- call atomic.Int64
- removeList []*list.Element
- removelock psync.RWMutex
+ someNeedRemove atomic.Bool
+ lock psync.RWMutex
+ runTag psync.Map
+}
- lock psync.RWMutex
- runTag psync.Map
+type msgqItem struct {
+ running atomic.Int32
+ disable atomic.Bool
+ f *func(any) (disable bool)
}
type FuncMap map[string]func(any) (disable bool)
return m
}
-func NewTo(waitTo time.Duration, runTo ...time.Duration) *Msgq {
- fmt.Println("Warn: NewTo is slow, consider New")
+// to[0]:timeout to wait to[1]:timeout to run
+func NewTo(to ...time.Duration) *Msgq {
m := new(Msgq)
m.funcs = list.New()
- m.to = append([]time.Duration{waitTo}, runTo...)
+ m.to = to
return m
}
+func (m *Msgq) register(mp *msgqItem) {
+ ul := m.lock.Lock()
+ m.funcs.PushBack(mp)
+ ul()
+}
+
func (m *Msgq) Register(f func(any) (disable bool)) {
- ul := m.lock.Lock(m.to...)
- m.funcs.PushBack(f)
+ m.register(&msgqItem{
+ f: &f,
+ })
+}
+
+func (m *Msgq) register_front(mp *msgqItem) {
+ ul := m.lock.Lock()
+ m.funcs.PushFront(mp)
ul()
}
func (m *Msgq) Register_front(f func(any) (disable bool)) {
- ul := m.lock.Lock(m.to...)
- m.funcs.PushFront(f)
- ul()
+ m.register_front(&msgqItem{
+ f: &f,
+ })
}
func (m *Msgq) Push(msg any) {
- isfirst := m.call.Add(1)
-
ul := m.lock.RLock(m.to...)
+
for el := m.funcs.Front(); el != nil; el = el.Next() {
- if disable := el.Value.(func(any) bool)(msg); disable {
- rul := m.removelock.Lock()
- m.removeList = append(m.removeList, el)
- rul()
+ mi := el.Value.(*msgqItem)
+ if mi.disable.Load() {
+ continue
}
- }
- ul()
-
- if isfirst == 1 {
- rul := m.removelock.Lock()
- for i := 0; i < len(m.removeList); i++ {
- m.funcs.Remove(m.removeList[i])
+ mi.running.Add(1)
+ if disable := (*mi.f)(msg); disable {
+ mi.disable.Store(true)
+ m.someNeedRemove.Store(true)
}
- rul()
+ mi.running.Add(-1)
}
- m.call.Add(-1)
+ ul()
+ if m.someNeedRemove.Load() {
+ m.removeDisable()
+ }
}
func (m *Msgq) PushLock(msg any) {
- isfirst := m.call.Add(1)
-
ul := m.lock.Lock(m.to...)
- defer ul()
for el := m.funcs.Front(); el != nil; el = el.Next() {
- if disable := el.Value.(func(any) bool)(msg); disable {
- rul := m.removelock.Lock()
- m.removeList = append(m.removeList, el)
- rul()
+ mi := el.Value.(*msgqItem)
+ if mi.disable.Load() {
+ continue
}
- }
-
- if isfirst == 1 {
- rul := m.removelock.Lock()
- for i := 0; i < len(m.removeList); i++ {
- m.funcs.Remove(m.removeList[i])
+ mi.running.Add(1)
+ if disable := (*mi.f)(msg); disable {
+ mi.disable.Store(true)
+ m.someNeedRemove.Store(true)
}
- rul()
+ mi.running.Add(-1)
}
- m.call.Add(-1)
+ ul()
+ if m.someNeedRemove.Load() {
+ m.removeDisable()
+ }
}
func (m *Msgq) ClearAll() {
- isfirst := m.call.Add(1)
-
- rul := m.removelock.Lock()
for el := m.funcs.Front(); el != nil; el = el.Next() {
- m.removeList = append(m.removeList, el)
+ mi := el.Value.(*msgqItem)
+ mi.disable.Store(true)
+ m.someNeedRemove.Store(true)
}
- rul()
+}
+
+func (m *Msgq) removeDisable() {
+ ul := m.lock.Lock(m.to...)
+ defer ul()
- if isfirst == 1 {
- rul := m.removelock.Lock()
- for i := 0; i < len(m.removeList); i++ {
- m.funcs.Remove(m.removeList[i])
+ m.someNeedRemove.Store(false)
+ for el := m.funcs.Front(); el != nil; el = el.Next() {
+ mi := el.Value.(*msgqItem)
+ if mi.disable.Load() && mi.running.Load() == 0 {
+ m.funcs.Remove(el)
}
- rul()
}
-
- m.call.Add(-1)
}
type Msgq_tag_data struct {
m.runTag.Delete(ptr)
}()
}
- m.Push(Msgq_tag_data{
- Tag: Tag,
- Data: Data,
- })
+ {
+ /*
+ m.m.Push(Msgq_tag_data{
+ Tag: Tag,
+ Data: Data,
+ })
+ */
+ ul := m.lock.RLock(m.to...)
+
+ for el := m.funcs.Front(); el != nil; el = el.Next() {
+ mi := el.Value.(*msgqItem)
+ if mi.disable.Load() {
+ continue
+ }
+ mi.running.Add(1)
+ if disable := (*mi.f)(&Msgq_tag_data{
+ Tag: Tag,
+ Data: Data,
+ }); disable {
+ mi.disable.Store(true)
+ m.someNeedRemove.Store(true)
+ }
+ mi.running.Add(-1)
+ }
+
+ ul()
+ if m.someNeedRemove.Load() {
+ m.removeDisable()
+ }
+ }
}
// 不能放置在由Push_tag、PushLock_tag调用的同步Pull中
m.runTag.Delete(ptr)
}()
}
- m.PushLock(Msgq_tag_data{
- Tag: Tag,
- Data: Data,
- })
+ {
+ /*
+ m.m.PushLock(Msgq_tag_data{
+ Tag: Tag,
+ Data: Data,
+ })
+ */
+ ul := m.lock.Lock(m.to...)
+
+ for el := m.funcs.Front(); el != nil; el = el.Next() {
+ mi := el.Value.(*msgqItem)
+ if mi.disable.Load() {
+ continue
+ }
+ mi.running.Add(1)
+ if disable := (*mi.f)(&Msgq_tag_data{
+ Tag: Tag,
+ Data: Data,
+ }); disable {
+ mi.disable.Store(true)
+ m.someNeedRemove.Store(true)
+ }
+ mi.running.Add(-1)
+ }
+
+ ul()
+ if m.someNeedRemove.Load() {
+ m.removeDisable()
+ }
+ }
}
func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) <-chan any {
var ch = make(chan any, size)
- m.Register(func(data any) bool {
- if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
+ var f1 = func(data any) bool {
+ if d, ok := data.(*Msgq_tag_data); ok && d.Tag == key {
select {
case <-ctx.Done():
close(ch)
}
}
return false
+ }
+ m.register_front(&msgqItem{
+ f: &f1,
})
return ch
}
func (m *Msgq) Pull_tag_only(key string, f func(any) (disable bool)) {
- m.Register(func(data any) (disable bool) {
- if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
+ var f1 = func(data any) (disable bool) {
+ if d, ok := data.(*Msgq_tag_data); ok && d.Tag == key {
return f(d.Data)
}
return false
+ }
+ m.register_front(&msgqItem{
+ f: &f1,
})
}
func (m *Msgq) Pull_tag(func_map map[string]func(any) (disable bool)) {
- m.Register(func(data any) (disable bool) {
- if d, ok := data.(Msgq_tag_data); ok {
+ var f1 = func(data any) (disable bool) {
+ if d, ok := data.(*Msgq_tag_data); ok {
if f, ok := func_map[d.Tag]; ok {
return f(d.Data)
}
}
return false
+ }
+ m.register_front(&msgqItem{
+ f: &f1,
})
}
func (m *Msgq) Pull_tag_async_only(key string, f func(any) (disable bool)) {
- var disable = signal.Init()
-
- m.Register_front(func(data any) bool {
- if !disable.Islive() {
- return true
- }
- if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
+ var mi = msgqItem{}
+ var f1 = func(data any) bool {
+ if d, ok := data.(*Msgq_tag_data); ok {
go func() {
if f(d.Data) {
- disable.Done()
+ mi.disable.Store(true)
+ m.someNeedRemove.Store(true)
}
}()
}
return false
- })
+ }
+ mi.f = &f1
+ m.register_front(&mi)
}
func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) {
- var disable = signal.Init()
-
- m.Register_front(func(data any) bool {
- if !disable.Islive() {
- return true
- }
- if d, ok := data.(Msgq_tag_data); ok {
+ var mi = msgqItem{}
+ var f = func(data any) bool {
+ if d, ok := data.(*Msgq_tag_data); ok {
if f, ok := func_map[d.Tag]; ok {
go func() {
if f(d.Data) {
- disable.Done()
+ mi.disable.Store(true)
+ m.someNeedRemove.Store(true)
}
}()
}
}
return false
- })
+ }
+ mi.f = &f
+ m.register_front(&mi)
}
type MsgType[T any] struct {
- to []time.Duration
- funcs *list.List
-
- call atomic.Int64
- removeList []*list.Element
- removelock psync.RWMutex
-
- lock psync.RWMutex
- runTag psync.Map
+ m *Msgq
}
type MsgType_tag_data[T any] struct {
Tag string
- Data T
+ Data *T
}
func NewType[T any]() *MsgType[T] {
- m := new(MsgType[T])
- m.funcs = list.New()
- return m
-}
-
-func NewTypeTo[T any](waitTo time.Duration, runTo ...time.Duration) *MsgType[T] {
- fmt.Println("Warn: NewTypeTo[T any] is slow, consider NewType[T any]")
- m := new(MsgType[T])
- m.funcs = list.New()
- m.to = append([]time.Duration{waitTo}, runTo...)
- return m
-}
-
-func (m *MsgType[T]) push(msg MsgType_tag_data[T]) {
- isfirst := m.call.Add(1)
-
- ul := m.lock.RLock(m.to...)
- for el := m.funcs.Front(); el != nil; el = el.Next() {
- if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable {
- rul := m.removelock.Lock()
- m.removeList = append(m.removeList, el)
- rul()
- }
- }
- ul()
-
- if isfirst == 1 {
- rul := m.removelock.Lock()
- for i := 0; i < len(m.removeList); i++ {
- m.funcs.Remove(m.removeList[i])
- }
- rul()
- }
-
- m.call.Add(-1)
+ return &MsgType[T]{m: New()}
}
-func (m *MsgType[T]) pushLock(msg MsgType_tag_data[T]) {
- isfirst := m.call.Add(1)
-
- ul := m.lock.Lock(m.to...)
- defer ul()
-
- for el := m.funcs.Front(); el != nil; el = el.Next() {
- if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable {
- rul := m.removelock.Lock()
- m.removeList = append(m.removeList, el)
- rul()
- }
- }
-
- if isfirst == 1 {
- rul := m.removelock.Lock()
- for i := 0; i < len(m.removeList); i++ {
- m.funcs.Remove(m.removeList[i])
- }
- rul()
- }
-
- m.call.Add(-1)
+// to[0]:timeout to wait to[1]:timeout to run
+func NewTypeTo[T any](to ...time.Duration) *MsgType[T] {
+ return &MsgType[T]{m: NewTo(to...)}
}
-func (m *MsgType[T]) register(f func(MsgType_tag_data[T]) (disable bool)) {
- ul := m.lock.Lock(m.to...)
- m.funcs.PushBack(f)
- ul()
-}
-
-func (m *MsgType[T]) register_front(f func(MsgType_tag_data[T]) (disable bool)) {
- ul := m.lock.Lock(m.to...)
- m.funcs.PushFront(f)
- ul()
+func (m *MsgType[T]) ClearAll() {
+ m.m.ClearAll()
}
// 不能放置在由PushLock_tag调用的同步Pull中
func (m *MsgType[T]) Push_tag(Tag string, Data T) {
- if len(m.to) > 0 {
+ if len(m.m.to) > 0 {
ptr := uintptr(unsafe.Pointer(&Data))
- m.runTag.Store(ptr, "[T]Push_tag(`"+Tag+"`,...)")
+ m.m.runTag.Store(ptr, "Push_tag(`"+Tag+"`,...)")
defer func() {
if e := recover(); e != nil {
- m.runTag.Range(func(key, value any) bool {
+ m.m.runTag.Range(func(key, value any) bool {
if key == ptr {
fmt.Printf("%v panic > %v\n", value, e)
} else {
}
return true
})
- m.runTag.ClearAll()
+ m.m.runTag.ClearAll()
panic(e)
}
- m.runTag.Delete(ptr)
+ m.m.runTag.Delete(ptr)
}()
}
- m.push(MsgType_tag_data[T]{
- Tag: Tag,
- Data: Data,
- })
+ {
+ /*
+ m.m.Push(Msgq_tag_data{
+ Tag: Tag,
+ Data: Data,
+ })
+ */
+ ul := m.m.lock.RLock(m.m.to...)
+
+ for el := m.m.funcs.Front(); el != nil; el = el.Next() {
+ mi := el.Value.(*msgqItem)
+ if mi.disable.Load() {
+ continue
+ }
+ mi.running.Add(1)
+ if disable := (*mi.f)(&MsgType_tag_data[T]{
+ Tag: Tag,
+ Data: &Data,
+ }); disable {
+ mi.disable.Store(true)
+ m.m.someNeedRemove.Store(true)
+ }
+ mi.running.Add(-1)
+ }
+
+ ul()
+ if m.m.someNeedRemove.Load() {
+ m.m.removeDisable()
+ }
+ }
}
// 不能放置在由Push_tag、PushLock_tag调用的同步Pull中
func (m *MsgType[T]) PushLock_tag(Tag string, Data T) {
- if len(m.to) > 0 {
+ if len(m.m.to) > 0 {
ptr := uintptr(unsafe.Pointer(&Data))
- m.runTag.Store(ptr, "[T]PushLock_tag(`"+Tag+"`,...)")
+ m.m.runTag.Store(ptr, "PushLock_tag(`"+Tag+"`,...)")
defer func() {
if e := recover(); e != nil {
- m.runTag.Range(func(key, value any) bool {
+ m.m.runTag.Range(func(key, value any) bool {
if key == ptr {
fmt.Printf("%v panic > %v\n", value, e)
} else {
}
return true
})
- m.runTag.ClearAll()
+ m.m.runTag.ClearAll()
panic(e)
}
- m.runTag.Delete(ptr)
+ m.m.runTag.Delete(ptr)
}()
}
- m.pushLock(MsgType_tag_data[T]{
- Tag: Tag,
- Data: Data,
- })
-}
-
-func (m *MsgType[T]) ClearAll() {
- isfirst := m.call.Add(1)
-
- rul := m.removelock.Lock()
- for el := m.funcs.Front(); el != nil; el = el.Next() {
- m.removeList = append(m.removeList, el)
- }
- rul()
+ {
+ /*
+ m.m.PushLock(Msgq_tag_data{
+ Tag: Tag,
+ Data: Data,
+ })
+ */
+ ul := m.m.lock.Lock(m.m.to...)
+
+ for el := m.m.funcs.Front(); el != nil; el = el.Next() {
+ mi := el.Value.(*msgqItem)
+ if mi.disable.Load() {
+ continue
+ }
+ mi.running.Add(1)
+ if disable := (*mi.f)(&MsgType_tag_data[T]{
+ Tag: Tag,
+ Data: &Data,
+ }); disable {
+ mi.disable.Store(true)
+ m.m.someNeedRemove.Store(true)
+ }
+ mi.running.Add(-1)
+ }
- if isfirst == 1 {
- rul := m.removelock.Lock()
- for i := 0; i < len(m.removeList); i++ {
- m.funcs.Remove(m.removeList[i])
+ ul()
+ if m.m.someNeedRemove.Load() {
+ m.m.removeDisable()
}
- rul()
}
-
- m.call.Add(-1)
}
func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) <-chan T {
var ch = make(chan T, size)
- m.register(func(data MsgType_tag_data[T]) bool {
- if data.Tag == key {
- select {
- case <-ctx.Done():
- close(ch)
- return true
- default:
- for len(ch) != 0 {
- <-ch
+ var f = func(data any) bool {
+ if data1, ok := data.(*MsgType_tag_data[T]); ok {
+ if data1.Tag == key {
+ select {
+ case <-ctx.Done():
+ close(ch)
+ return true
+ default:
+ for len(ch) != 0 {
+ <-ch
+ }
+ ch <- *data1.Data
}
- ch <- data.Data
}
}
return false
+ }
+ m.m.register(&msgqItem{
+ f: &f,
})
return ch
}
func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) {
- m.register(func(data MsgType_tag_data[T]) (disable bool) {
- if data.Tag == key {
- return f(data.Data)
+ var f1 = func(data any) (disable bool) {
+ if data1, ok := data.(*MsgType_tag_data[T]); ok {
+ if data1.Tag == key {
+ return f(*data1.Data)
+ }
}
return false
+ }
+ m.m.register(&msgqItem{
+ f: &f1,
})
}
func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) {
- m.register(func(data MsgType_tag_data[T]) (disable bool) {
- if f, ok := func_map[data.Tag]; ok {
- return f(data.Data)
+ var f = func(data any) (disable bool) {
+ if data1, ok := data.(*MsgType_tag_data[T]); ok {
+ if f, ok := func_map[data1.Tag]; ok {
+ return f(*data1.Data)
+ }
}
return false
+ }
+ m.m.register(&msgqItem{
+ f: &f,
})
}
func (m *MsgType[T]) Pull_tag_async_only(key string, f func(T) (disable bool)) {
- var disable = signal.Init()
-
- m.register_front(func(data MsgType_tag_data[T]) bool {
- if !disable.Islive() {
- return true
- }
- if data.Tag == key {
+ var mi = msgqItem{}
+ var f1 = func(data any) bool {
+ if d, ok := data.(*MsgType_tag_data[T]); ok {
go func() {
- if f(data.Data) {
- disable.Done()
+ if f(*d.Data) {
+ mi.disable.Store(true)
+ m.m.someNeedRemove.Store(true)
}
}()
}
return false
- })
+ }
+ mi.f = &f1
+ m.m.register_front(&mi)
}
func (m *MsgType[T]) Pull_tag_async(func_map map[string]func(T) (disable bool)) {
- var disable = signal.Init()
-
- m.register_front(func(data MsgType_tag_data[T]) bool {
- if !disable.Islive() {
- return true
- }
- if f, ok := func_map[data.Tag]; ok {
- go func() {
- if f(data.Data) {
- disable.Done()
- }
- }()
+ var mi = msgqItem{}
+ var f = func(data any) bool {
+ if d, ok := data.(*MsgType_tag_data[T]); ok {
+ if f, ok := func_map[d.Tag]; ok {
+ go func() {
+ if f(*d.Data) {
+ mi.disable.Store(true)
+ m.m.someNeedRemove.Store(true)
+ }
+ }()
+ }
}
return false
- })
+ }
+ mi.f = &f
+ m.m.register_front(&mi)
}