From 0d080581e03d9a6427581965c06ebfde778cb465 Mon Sep 17 00:00:00 2001 From: qydysky Date: Fri, 28 Jul 2023 15:23:07 +0800 Subject: [PATCH] add --- msgq/Msgq.go | 526 ++++++++++++++++++++++++++++----------------------- 1 file changed, 286 insertions(+), 240 deletions(-) diff --git a/msgq/Msgq.go b/msgq/Msgq.go index 80dc6e5..c60b1c2 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -8,7 +8,6 @@ import ( "time" "unsafe" - signal "github.com/qydysky/part/signal" psync "github.com/qydysky/part/sync" ) @@ -16,12 +15,15 @@ type Msgq struct { to []time.Duration funcs *list.List - call atomic.Int64 - removeList []*list.Element - removelock psync.RWMutex + someNeedRemove atomic.Bool + lock psync.RWMutex + runTag psync.Map +} - lock psync.RWMutex - runTag psync.Map +type msgqItem struct { + running atomic.Int32 + disable atomic.Bool + f *func(any) (disable bool) } type FuncMap map[string]func(any) (disable bool) @@ -32,93 +34,101 @@ func New() *Msgq { return m } -func NewTo(waitTo time.Duration, runTo ...time.Duration) *Msgq { - fmt.Println("Warn: NewTo is slow, consider New") +// to[0]:timeout to wait to[1]:timeout to run +func NewTo(to ...time.Duration) *Msgq { m := new(Msgq) m.funcs = list.New() - m.to = append([]time.Duration{waitTo}, runTo...) + m.to = to return m } +func (m *Msgq) register(mp *msgqItem) { + ul := m.lock.Lock() + m.funcs.PushBack(mp) + ul() +} + func (m *Msgq) Register(f func(any) (disable bool)) { - ul := m.lock.Lock(m.to...) - m.funcs.PushBack(f) + m.register(&msgqItem{ + f: &f, + }) +} + +func (m *Msgq) register_front(mp *msgqItem) { + ul := m.lock.Lock() + m.funcs.PushFront(mp) ul() } func (m *Msgq) Register_front(f func(any) (disable bool)) { - ul := m.lock.Lock(m.to...) - m.funcs.PushFront(f) - ul() + m.register_front(&msgqItem{ + f: &f, + }) } func (m *Msgq) Push(msg any) { - isfirst := m.call.Add(1) - ul := m.lock.RLock(m.to...) + for el := m.funcs.Front(); el != nil; el = el.Next() { - if disable := el.Value.(func(any) bool)(msg); disable { - rul := m.removelock.Lock() - m.removeList = append(m.removeList, el) - rul() + mi := el.Value.(*msgqItem) + if mi.disable.Load() { + continue } - } - ul() - - if isfirst == 1 { - rul := m.removelock.Lock() - for i := 0; i < len(m.removeList); i++ { - m.funcs.Remove(m.removeList[i]) + mi.running.Add(1) + if disable := (*mi.f)(msg); disable { + mi.disable.Store(true) + m.someNeedRemove.Store(true) } - rul() + mi.running.Add(-1) } - m.call.Add(-1) + ul() + if m.someNeedRemove.Load() { + m.removeDisable() + } } func (m *Msgq) PushLock(msg any) { - isfirst := m.call.Add(1) - ul := m.lock.Lock(m.to...) - defer ul() for el := m.funcs.Front(); el != nil; el = el.Next() { - if disable := el.Value.(func(any) bool)(msg); disable { - rul := m.removelock.Lock() - m.removeList = append(m.removeList, el) - rul() + mi := el.Value.(*msgqItem) + if mi.disable.Load() { + continue } - } - - if isfirst == 1 { - rul := m.removelock.Lock() - for i := 0; i < len(m.removeList); i++ { - m.funcs.Remove(m.removeList[i]) + mi.running.Add(1) + if disable := (*mi.f)(msg); disable { + mi.disable.Store(true) + m.someNeedRemove.Store(true) } - rul() + mi.running.Add(-1) } - m.call.Add(-1) + ul() + if m.someNeedRemove.Load() { + m.removeDisable() + } } func (m *Msgq) ClearAll() { - isfirst := m.call.Add(1) - - rul := m.removelock.Lock() for el := m.funcs.Front(); el != nil; el = el.Next() { - m.removeList = append(m.removeList, el) + mi := el.Value.(*msgqItem) + mi.disable.Store(true) + m.someNeedRemove.Store(true) } - rul() +} + +func (m *Msgq) removeDisable() { + ul := m.lock.Lock(m.to...) + defer ul() - if isfirst == 1 { - rul := m.removelock.Lock() - for i := 0; i < len(m.removeList); i++ { - m.funcs.Remove(m.removeList[i]) + m.someNeedRemove.Store(false) + for el := m.funcs.Front(); el != nil; el = el.Next() { + mi := el.Value.(*msgqItem) + if mi.disable.Load() && mi.running.Load() == 0 { + m.funcs.Remove(el) } - rul() } - - m.call.Add(-1) } type Msgq_tag_data struct { @@ -147,10 +157,36 @@ func (m *Msgq) Push_tag(Tag string, Data any) { m.runTag.Delete(ptr) }() } - m.Push(Msgq_tag_data{ - Tag: Tag, - Data: Data, - }) + { + /* + m.m.Push(Msgq_tag_data{ + Tag: Tag, + Data: Data, + }) + */ + ul := m.lock.RLock(m.to...) + + for el := m.funcs.Front(); el != nil; el = el.Next() { + mi := el.Value.(*msgqItem) + if mi.disable.Load() { + continue + } + mi.running.Add(1) + if disable := (*mi.f)(&Msgq_tag_data{ + Tag: Tag, + Data: Data, + }); disable { + mi.disable.Store(true) + m.someNeedRemove.Store(true) + } + mi.running.Add(-1) + } + + ul() + if m.someNeedRemove.Load() { + m.removeDisable() + } + } } // 不能放置在由Push_tag、PushLock_tag调用的同步Pull中 @@ -174,16 +210,42 @@ func (m *Msgq) PushLock_tag(Tag string, Data any) { m.runTag.Delete(ptr) }() } - m.PushLock(Msgq_tag_data{ - Tag: Tag, - Data: Data, - }) + { + /* + m.m.PushLock(Msgq_tag_data{ + Tag: Tag, + Data: Data, + }) + */ + ul := m.lock.Lock(m.to...) + + for el := m.funcs.Front(); el != nil; el = el.Next() { + mi := el.Value.(*msgqItem) + if mi.disable.Load() { + continue + } + mi.running.Add(1) + if disable := (*mi.f)(&Msgq_tag_data{ + Tag: Tag, + Data: Data, + }); disable { + mi.disable.Store(true) + m.someNeedRemove.Store(true) + } + mi.running.Add(-1) + } + + ul() + if m.someNeedRemove.Load() { + m.removeDisable() + } + } } func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) <-chan any { var ch = make(chan any, size) - m.Register(func(data any) bool { - if d, ok := data.(Msgq_tag_data); ok && d.Tag == key { + var f1 = func(data any) bool { + if d, ok := data.(*Msgq_tag_data); ok && d.Tag == key { select { case <-ctx.Done(): close(ch) @@ -196,168 +258,105 @@ func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) <-chan a } } return false + } + m.register_front(&msgqItem{ + f: &f1, }) return ch } func (m *Msgq) Pull_tag_only(key string, f func(any) (disable bool)) { - m.Register(func(data any) (disable bool) { - if d, ok := data.(Msgq_tag_data); ok && d.Tag == key { + var f1 = func(data any) (disable bool) { + if d, ok := data.(*Msgq_tag_data); ok && d.Tag == key { return f(d.Data) } return false + } + m.register_front(&msgqItem{ + f: &f1, }) } func (m *Msgq) Pull_tag(func_map map[string]func(any) (disable bool)) { - m.Register(func(data any) (disable bool) { - if d, ok := data.(Msgq_tag_data); ok { + var f1 = func(data any) (disable bool) { + if d, ok := data.(*Msgq_tag_data); ok { if f, ok := func_map[d.Tag]; ok { return f(d.Data) } } return false + } + m.register_front(&msgqItem{ + f: &f1, }) } func (m *Msgq) Pull_tag_async_only(key string, f func(any) (disable bool)) { - var disable = signal.Init() - - m.Register_front(func(data any) bool { - if !disable.Islive() { - return true - } - if d, ok := data.(Msgq_tag_data); ok && d.Tag == key { + var mi = msgqItem{} + var f1 = func(data any) bool { + if d, ok := data.(*Msgq_tag_data); ok { go func() { if f(d.Data) { - disable.Done() + mi.disable.Store(true) + m.someNeedRemove.Store(true) } }() } return false - }) + } + mi.f = &f1 + m.register_front(&mi) } func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) { - var disable = signal.Init() - - m.Register_front(func(data any) bool { - if !disable.Islive() { - return true - } - if d, ok := data.(Msgq_tag_data); ok { + var mi = msgqItem{} + var f = func(data any) bool { + if d, ok := data.(*Msgq_tag_data); ok { if f, ok := func_map[d.Tag]; ok { go func() { if f(d.Data) { - disable.Done() + mi.disable.Store(true) + m.someNeedRemove.Store(true) } }() } } return false - }) + } + mi.f = &f + m.register_front(&mi) } type MsgType[T any] struct { - to []time.Duration - funcs *list.List - - call atomic.Int64 - removeList []*list.Element - removelock psync.RWMutex - - lock psync.RWMutex - runTag psync.Map + m *Msgq } type MsgType_tag_data[T any] struct { Tag string - Data T + Data *T } func NewType[T any]() *MsgType[T] { - m := new(MsgType[T]) - m.funcs = list.New() - return m -} - -func NewTypeTo[T any](waitTo time.Duration, runTo ...time.Duration) *MsgType[T] { - fmt.Println("Warn: NewTypeTo[T any] is slow, consider NewType[T any]") - m := new(MsgType[T]) - m.funcs = list.New() - m.to = append([]time.Duration{waitTo}, runTo...) - return m -} - -func (m *MsgType[T]) push(msg MsgType_tag_data[T]) { - isfirst := m.call.Add(1) - - ul := m.lock.RLock(m.to...) - for el := m.funcs.Front(); el != nil; el = el.Next() { - if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable { - rul := m.removelock.Lock() - m.removeList = append(m.removeList, el) - rul() - } - } - ul() - - if isfirst == 1 { - rul := m.removelock.Lock() - for i := 0; i < len(m.removeList); i++ { - m.funcs.Remove(m.removeList[i]) - } - rul() - } - - m.call.Add(-1) + return &MsgType[T]{m: New()} } -func (m *MsgType[T]) pushLock(msg MsgType_tag_data[T]) { - isfirst := m.call.Add(1) - - ul := m.lock.Lock(m.to...) - defer ul() - - for el := m.funcs.Front(); el != nil; el = el.Next() { - if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable { - rul := m.removelock.Lock() - m.removeList = append(m.removeList, el) - rul() - } - } - - if isfirst == 1 { - rul := m.removelock.Lock() - for i := 0; i < len(m.removeList); i++ { - m.funcs.Remove(m.removeList[i]) - } - rul() - } - - m.call.Add(-1) +// to[0]:timeout to wait to[1]:timeout to run +func NewTypeTo[T any](to ...time.Duration) *MsgType[T] { + return &MsgType[T]{m: NewTo(to...)} } -func (m *MsgType[T]) register(f func(MsgType_tag_data[T]) (disable bool)) { - ul := m.lock.Lock(m.to...) - m.funcs.PushBack(f) - ul() -} - -func (m *MsgType[T]) register_front(f func(MsgType_tag_data[T]) (disable bool)) { - ul := m.lock.Lock(m.to...) - m.funcs.PushFront(f) - ul() +func (m *MsgType[T]) ClearAll() { + m.m.ClearAll() } // 不能放置在由PushLock_tag调用的同步Pull中 func (m *MsgType[T]) Push_tag(Tag string, Data T) { - if len(m.to) > 0 { + if len(m.m.to) > 0 { ptr := uintptr(unsafe.Pointer(&Data)) - m.runTag.Store(ptr, "[T]Push_tag(`"+Tag+"`,...)") + m.m.runTag.Store(ptr, "Push_tag(`"+Tag+"`,...)") defer func() { if e := recover(); e != nil { - m.runTag.Range(func(key, value any) bool { + m.m.runTag.Range(func(key, value any) bool { if key == ptr { fmt.Printf("%v panic > %v\n", value, e) } else { @@ -365,26 +364,52 @@ func (m *MsgType[T]) Push_tag(Tag string, Data T) { } return true }) - m.runTag.ClearAll() + m.m.runTag.ClearAll() panic(e) } - m.runTag.Delete(ptr) + m.m.runTag.Delete(ptr) }() } - m.push(MsgType_tag_data[T]{ - Tag: Tag, - Data: Data, - }) + { + /* + m.m.Push(Msgq_tag_data{ + Tag: Tag, + Data: Data, + }) + */ + ul := m.m.lock.RLock(m.m.to...) + + for el := m.m.funcs.Front(); el != nil; el = el.Next() { + mi := el.Value.(*msgqItem) + if mi.disable.Load() { + continue + } + mi.running.Add(1) + if disable := (*mi.f)(&MsgType_tag_data[T]{ + Tag: Tag, + Data: &Data, + }); disable { + mi.disable.Store(true) + m.m.someNeedRemove.Store(true) + } + mi.running.Add(-1) + } + + ul() + if m.m.someNeedRemove.Load() { + m.m.removeDisable() + } + } } // 不能放置在由Push_tag、PushLock_tag调用的同步Pull中 func (m *MsgType[T]) PushLock_tag(Tag string, Data T) { - if len(m.to) > 0 { + if len(m.m.to) > 0 { ptr := uintptr(unsafe.Pointer(&Data)) - m.runTag.Store(ptr, "[T]PushLock_tag(`"+Tag+"`,...)") + m.m.runTag.Store(ptr, "PushLock_tag(`"+Tag+"`,...)") defer func() { if e := recover(); e != nil { - m.runTag.Range(func(key, value any) bool { + m.m.runTag.Range(func(key, value any) bool { if key == ptr { fmt.Printf("%v panic > %v\n", value, e) } else { @@ -392,108 +417,129 @@ func (m *MsgType[T]) PushLock_tag(Tag string, Data T) { } return true }) - m.runTag.ClearAll() + m.m.runTag.ClearAll() panic(e) } - m.runTag.Delete(ptr) + m.m.runTag.Delete(ptr) }() } - m.pushLock(MsgType_tag_data[T]{ - Tag: Tag, - Data: Data, - }) -} - -func (m *MsgType[T]) ClearAll() { - isfirst := m.call.Add(1) - - rul := m.removelock.Lock() - for el := m.funcs.Front(); el != nil; el = el.Next() { - m.removeList = append(m.removeList, el) - } - rul() + { + /* + m.m.PushLock(Msgq_tag_data{ + Tag: Tag, + Data: Data, + }) + */ + ul := m.m.lock.Lock(m.m.to...) + + for el := m.m.funcs.Front(); el != nil; el = el.Next() { + mi := el.Value.(*msgqItem) + if mi.disable.Load() { + continue + } + mi.running.Add(1) + if disable := (*mi.f)(&MsgType_tag_data[T]{ + Tag: Tag, + Data: &Data, + }); disable { + mi.disable.Store(true) + m.m.someNeedRemove.Store(true) + } + mi.running.Add(-1) + } - if isfirst == 1 { - rul := m.removelock.Lock() - for i := 0; i < len(m.removeList); i++ { - m.funcs.Remove(m.removeList[i]) + ul() + if m.m.someNeedRemove.Load() { + m.m.removeDisable() } - rul() } - - m.call.Add(-1) } func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) <-chan T { var ch = make(chan T, size) - m.register(func(data MsgType_tag_data[T]) bool { - if data.Tag == key { - select { - case <-ctx.Done(): - close(ch) - return true - default: - for len(ch) != 0 { - <-ch + var f = func(data any) bool { + if data1, ok := data.(*MsgType_tag_data[T]); ok { + if data1.Tag == key { + select { + case <-ctx.Done(): + close(ch) + return true + default: + for len(ch) != 0 { + <-ch + } + ch <- *data1.Data } - ch <- data.Data } } return false + } + m.m.register(&msgqItem{ + f: &f, }) return ch } func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) { - m.register(func(data MsgType_tag_data[T]) (disable bool) { - if data.Tag == key { - return f(data.Data) + var f1 = func(data any) (disable bool) { + if data1, ok := data.(*MsgType_tag_data[T]); ok { + if data1.Tag == key { + return f(*data1.Data) + } } return false + } + m.m.register(&msgqItem{ + f: &f1, }) } func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) { - m.register(func(data MsgType_tag_data[T]) (disable bool) { - if f, ok := func_map[data.Tag]; ok { - return f(data.Data) + var f = func(data any) (disable bool) { + if data1, ok := data.(*MsgType_tag_data[T]); ok { + if f, ok := func_map[data1.Tag]; ok { + return f(*data1.Data) + } } return false + } + m.m.register(&msgqItem{ + f: &f, }) } func (m *MsgType[T]) Pull_tag_async_only(key string, f func(T) (disable bool)) { - var disable = signal.Init() - - m.register_front(func(data MsgType_tag_data[T]) bool { - if !disable.Islive() { - return true - } - if data.Tag == key { + var mi = msgqItem{} + var f1 = func(data any) bool { + if d, ok := data.(*MsgType_tag_data[T]); ok { go func() { - if f(data.Data) { - disable.Done() + if f(*d.Data) { + mi.disable.Store(true) + m.m.someNeedRemove.Store(true) } }() } return false - }) + } + mi.f = &f1 + m.m.register_front(&mi) } func (m *MsgType[T]) Pull_tag_async(func_map map[string]func(T) (disable bool)) { - var disable = signal.Init() - - m.register_front(func(data MsgType_tag_data[T]) bool { - if !disable.Islive() { - return true - } - if f, ok := func_map[data.Tag]; ok { - go func() { - if f(data.Data) { - disable.Done() - } - }() + var mi = msgqItem{} + var f = func(data any) bool { + if d, ok := data.(*MsgType_tag_data[T]); ok { + if f, ok := func_map[d.Tag]; ok { + go func() { + if f(*d.Data) { + mi.disable.Store(true) + m.m.someNeedRemove.Store(true) + } + }() + } } return false - }) + } + mi.f = &f + m.m.register_front(&mi) } -- 2.39.2