]> 127.0.0.1 Git - part/.git/commitdiff
add v0.28.0+202307280d08058
authorqydysky <qydysky@foxmail.com>
Fri, 28 Jul 2023 07:23:07 +0000 (15:23 +0800)
committerqydysky <qydysky@foxmail.com>
Fri, 28 Jul 2023 07:23:07 +0000 (15:23 +0800)
msgq/Msgq.go

index 80dc6e5f02298aa644b589ec7d4c0c67f6f04736..c60b1c2926bb136ef59f3d961ca75c44ab0d3987 100644 (file)
@@ -8,7 +8,6 @@ import (
        "time"
        "unsafe"
 
-       signal "github.com/qydysky/part/signal"
        psync "github.com/qydysky/part/sync"
 )
 
@@ -16,12 +15,15 @@ type Msgq struct {
        to    []time.Duration
        funcs *list.List
 
-       call       atomic.Int64
-       removeList []*list.Element
-       removelock psync.RWMutex
+       someNeedRemove atomic.Bool
+       lock           psync.RWMutex
+       runTag         psync.Map
+}
 
-       lock   psync.RWMutex
-       runTag psync.Map
+type msgqItem struct {
+       running atomic.Int32
+       disable atomic.Bool
+       f       *func(any) (disable bool)
 }
 
 type FuncMap map[string]func(any) (disable bool)
@@ -32,93 +34,101 @@ func New() *Msgq {
        return m
 }
 
-func NewTo(waitTo time.Duration, runTo ...time.Duration) *Msgq {
-       fmt.Println("Warn: NewTo is slow, consider New")
+// to[0]:timeout to wait to[1]:timeout to run
+func NewTo(to ...time.Duration) *Msgq {
        m := new(Msgq)
        m.funcs = list.New()
-       m.to = append([]time.Duration{waitTo}, runTo...)
+       m.to = to
        return m
 }
 
+func (m *Msgq) register(mp *msgqItem) {
+       ul := m.lock.Lock()
+       m.funcs.PushBack(mp)
+       ul()
+}
+
 func (m *Msgq) Register(f func(any) (disable bool)) {
-       ul := m.lock.Lock(m.to...)
-       m.funcs.PushBack(f)
+       m.register(&msgqItem{
+               f: &f,
+       })
+}
+
+func (m *Msgq) register_front(mp *msgqItem) {
+       ul := m.lock.Lock()
+       m.funcs.PushFront(mp)
        ul()
 }
 
 func (m *Msgq) Register_front(f func(any) (disable bool)) {
-       ul := m.lock.Lock(m.to...)
-       m.funcs.PushFront(f)
-       ul()
+       m.register_front(&msgqItem{
+               f: &f,
+       })
 }
 
 func (m *Msgq) Push(msg any) {
-       isfirst := m.call.Add(1)
-
        ul := m.lock.RLock(m.to...)
+
        for el := m.funcs.Front(); el != nil; el = el.Next() {
-               if disable := el.Value.(func(any) bool)(msg); disable {
-                       rul := m.removelock.Lock()
-                       m.removeList = append(m.removeList, el)
-                       rul()
+               mi := el.Value.(*msgqItem)
+               if mi.disable.Load() {
+                       continue
                }
-       }
-       ul()
-
-       if isfirst == 1 {
-               rul := m.removelock.Lock()
-               for i := 0; i < len(m.removeList); i++ {
-                       m.funcs.Remove(m.removeList[i])
+               mi.running.Add(1)
+               if disable := (*mi.f)(msg); disable {
+                       mi.disable.Store(true)
+                       m.someNeedRemove.Store(true)
                }
-               rul()
+               mi.running.Add(-1)
        }
 
-       m.call.Add(-1)
+       ul()
+       if m.someNeedRemove.Load() {
+               m.removeDisable()
+       }
 }
 
 func (m *Msgq) PushLock(msg any) {
-       isfirst := m.call.Add(1)
-
        ul := m.lock.Lock(m.to...)
-       defer ul()
 
        for el := m.funcs.Front(); el != nil; el = el.Next() {
-               if disable := el.Value.(func(any) bool)(msg); disable {
-                       rul := m.removelock.Lock()
-                       m.removeList = append(m.removeList, el)
-                       rul()
+               mi := el.Value.(*msgqItem)
+               if mi.disable.Load() {
+                       continue
                }
-       }
-
-       if isfirst == 1 {
-               rul := m.removelock.Lock()
-               for i := 0; i < len(m.removeList); i++ {
-                       m.funcs.Remove(m.removeList[i])
+               mi.running.Add(1)
+               if disable := (*mi.f)(msg); disable {
+                       mi.disable.Store(true)
+                       m.someNeedRemove.Store(true)
                }
-               rul()
+               mi.running.Add(-1)
        }
 
-       m.call.Add(-1)
+       ul()
+       if m.someNeedRemove.Load() {
+               m.removeDisable()
+       }
 }
 
 func (m *Msgq) ClearAll() {
-       isfirst := m.call.Add(1)
-
-       rul := m.removelock.Lock()
        for el := m.funcs.Front(); el != nil; el = el.Next() {
-               m.removeList = append(m.removeList, el)
+               mi := el.Value.(*msgqItem)
+               mi.disable.Store(true)
+               m.someNeedRemove.Store(true)
        }
-       rul()
+}
+
+func (m *Msgq) removeDisable() {
+       ul := m.lock.Lock(m.to...)
+       defer ul()
 
-       if isfirst == 1 {
-               rul := m.removelock.Lock()
-               for i := 0; i < len(m.removeList); i++ {
-                       m.funcs.Remove(m.removeList[i])
+       m.someNeedRemove.Store(false)
+       for el := m.funcs.Front(); el != nil; el = el.Next() {
+               mi := el.Value.(*msgqItem)
+               if mi.disable.Load() && mi.running.Load() == 0 {
+                       m.funcs.Remove(el)
                }
-               rul()
        }
-
-       m.call.Add(-1)
 }
 
 type Msgq_tag_data struct {
@@ -147,10 +157,36 @@ func (m *Msgq) Push_tag(Tag string, Data any) {
                        m.runTag.Delete(ptr)
                }()
        }
-       m.Push(Msgq_tag_data{
-               Tag:  Tag,
-               Data: Data,
-       })
+       {
+               /*
+                       m.m.Push(Msgq_tag_data{
+                               Tag:  Tag,
+                               Data: Data,
+                       })
+               */
+               ul := m.lock.RLock(m.to...)
+
+               for el := m.funcs.Front(); el != nil; el = el.Next() {
+                       mi := el.Value.(*msgqItem)
+                       if mi.disable.Load() {
+                               continue
+                       }
+                       mi.running.Add(1)
+                       if disable := (*mi.f)(&Msgq_tag_data{
+                               Tag:  Tag,
+                               Data: Data,
+                       }); disable {
+                               mi.disable.Store(true)
+                               m.someNeedRemove.Store(true)
+                       }
+                       mi.running.Add(-1)
+               }
+
+               ul()
+               if m.someNeedRemove.Load() {
+                       m.removeDisable()
+               }
+       }
 }
 
 // 不能放置在由Push_tag、PushLock_tag调用的同步Pull中
@@ -174,16 +210,42 @@ func (m *Msgq) PushLock_tag(Tag string, Data any) {
                        m.runTag.Delete(ptr)
                }()
        }
-       m.PushLock(Msgq_tag_data{
-               Tag:  Tag,
-               Data: Data,
-       })
+       {
+               /*
+                       m.m.PushLock(Msgq_tag_data{
+                               Tag:  Tag,
+                               Data: Data,
+                       })
+               */
+               ul := m.lock.Lock(m.to...)
+
+               for el := m.funcs.Front(); el != nil; el = el.Next() {
+                       mi := el.Value.(*msgqItem)
+                       if mi.disable.Load() {
+                               continue
+                       }
+                       mi.running.Add(1)
+                       if disable := (*mi.f)(&Msgq_tag_data{
+                               Tag:  Tag,
+                               Data: Data,
+                       }); disable {
+                               mi.disable.Store(true)
+                               m.someNeedRemove.Store(true)
+                       }
+                       mi.running.Add(-1)
+               }
+
+               ul()
+               if m.someNeedRemove.Load() {
+                       m.removeDisable()
+               }
+       }
 }
 
 func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) <-chan any {
        var ch = make(chan any, size)
-       m.Register(func(data any) bool {
-               if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
+       var f1 = func(data any) bool {
+               if d, ok := data.(*Msgq_tag_data); ok && d.Tag == key {
                        select {
                        case <-ctx.Done():
                                close(ch)
@@ -196,168 +258,105 @@ func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) <-chan a
                        }
                }
                return false
+       }
+       m.register_front(&msgqItem{
+               f: &f1,
        })
        return ch
 }
 
 func (m *Msgq) Pull_tag_only(key string, f func(any) (disable bool)) {
-       m.Register(func(data any) (disable bool) {
-               if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
+       var f1 = func(data any) (disable bool) {
+               if d, ok := data.(*Msgq_tag_data); ok && d.Tag == key {
                        return f(d.Data)
                }
                return false
+       }
+       m.register_front(&msgqItem{
+               f: &f1,
        })
 }
 
 func (m *Msgq) Pull_tag(func_map map[string]func(any) (disable bool)) {
-       m.Register(func(data any) (disable bool) {
-               if d, ok := data.(Msgq_tag_data); ok {
+       var f1 = func(data any) (disable bool) {
+               if d, ok := data.(*Msgq_tag_data); ok {
                        if f, ok := func_map[d.Tag]; ok {
                                return f(d.Data)
                        }
                }
                return false
+       }
+       m.register_front(&msgqItem{
+               f: &f1,
        })
 }
 
 func (m *Msgq) Pull_tag_async_only(key string, f func(any) (disable bool)) {
-       var disable = signal.Init()
-
-       m.Register_front(func(data any) bool {
-               if !disable.Islive() {
-                       return true
-               }
-               if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
+       var mi = msgqItem{}
+       var f1 = func(data any) bool {
+               if d, ok := data.(*Msgq_tag_data); ok {
                        go func() {
                                if f(d.Data) {
-                                       disable.Done()
+                                       mi.disable.Store(true)
+                                       m.someNeedRemove.Store(true)
                                }
                        }()
                }
                return false
-       })
+       }
+       mi.f = &f1
+       m.register_front(&mi)
 }
 
 func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) {
-       var disable = signal.Init()
-
-       m.Register_front(func(data any) bool {
-               if !disable.Islive() {
-                       return true
-               }
-               if d, ok := data.(Msgq_tag_data); ok {
+       var mi = msgqItem{}
+       var f = func(data any) bool {
+               if d, ok := data.(*Msgq_tag_data); ok {
                        if f, ok := func_map[d.Tag]; ok {
                                go func() {
                                        if f(d.Data) {
-                                               disable.Done()
+                                               mi.disable.Store(true)
+                                               m.someNeedRemove.Store(true)
                                        }
                                }()
                        }
                }
                return false
-       })
+       }
+       mi.f = &f
+       m.register_front(&mi)
 }
 
 type MsgType[T any] struct {
-       to    []time.Duration
-       funcs *list.List
-
-       call       atomic.Int64
-       removeList []*list.Element
-       removelock psync.RWMutex
-
-       lock   psync.RWMutex
-       runTag psync.Map
+       m *Msgq
 }
 
 type MsgType_tag_data[T any] struct {
        Tag  string
-       Data T
+       Data *T
 }
 
 func NewType[T any]() *MsgType[T] {
-       m := new(MsgType[T])
-       m.funcs = list.New()
-       return m
-}
-
-func NewTypeTo[T any](waitTo time.Duration, runTo ...time.Duration) *MsgType[T] {
-       fmt.Println("Warn: NewTypeTo[T any] is slow, consider NewType[T any]")
-       m := new(MsgType[T])
-       m.funcs = list.New()
-       m.to = append([]time.Duration{waitTo}, runTo...)
-       return m
-}
-
-func (m *MsgType[T]) push(msg MsgType_tag_data[T]) {
-       isfirst := m.call.Add(1)
-
-       ul := m.lock.RLock(m.to...)
-       for el := m.funcs.Front(); el != nil; el = el.Next() {
-               if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable {
-                       rul := m.removelock.Lock()
-                       m.removeList = append(m.removeList, el)
-                       rul()
-               }
-       }
-       ul()
-
-       if isfirst == 1 {
-               rul := m.removelock.Lock()
-               for i := 0; i < len(m.removeList); i++ {
-                       m.funcs.Remove(m.removeList[i])
-               }
-               rul()
-       }
-
-       m.call.Add(-1)
+       return &MsgType[T]{m: New()}
 }
 
-func (m *MsgType[T]) pushLock(msg MsgType_tag_data[T]) {
-       isfirst := m.call.Add(1)
-
-       ul := m.lock.Lock(m.to...)
-       defer ul()
-
-       for el := m.funcs.Front(); el != nil; el = el.Next() {
-               if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable {
-                       rul := m.removelock.Lock()
-                       m.removeList = append(m.removeList, el)
-                       rul()
-               }
-       }
-
-       if isfirst == 1 {
-               rul := m.removelock.Lock()
-               for i := 0; i < len(m.removeList); i++ {
-                       m.funcs.Remove(m.removeList[i])
-               }
-               rul()
-       }
-
-       m.call.Add(-1)
+// to[0]:timeout to wait to[1]:timeout to run
+func NewTypeTo[T any](to ...time.Duration) *MsgType[T] {
+       return &MsgType[T]{m: NewTo(to...)}
 }
 
-func (m *MsgType[T]) register(f func(MsgType_tag_data[T]) (disable bool)) {
-       ul := m.lock.Lock(m.to...)
-       m.funcs.PushBack(f)
-       ul()
-}
-
-func (m *MsgType[T]) register_front(f func(MsgType_tag_data[T]) (disable bool)) {
-       ul := m.lock.Lock(m.to...)
-       m.funcs.PushFront(f)
-       ul()
+func (m *MsgType[T]) ClearAll() {
+       m.m.ClearAll()
 }
 
 // 不能放置在由PushLock_tag调用的同步Pull中
 func (m *MsgType[T]) Push_tag(Tag string, Data T) {
-       if len(m.to) > 0 {
+       if len(m.m.to) > 0 {
                ptr := uintptr(unsafe.Pointer(&Data))
-               m.runTag.Store(ptr, "[T]Push_tag(`"+Tag+"`,...)")
+               m.m.runTag.Store(ptr, "Push_tag(`"+Tag+"`,...)")
                defer func() {
                        if e := recover(); e != nil {
-                               m.runTag.Range(func(key, value any) bool {
+                               m.m.runTag.Range(func(key, value any) bool {
                                        if key == ptr {
                                                fmt.Printf("%v panic > %v\n", value, e)
                                        } else {
@@ -365,26 +364,52 @@ func (m *MsgType[T]) Push_tag(Tag string, Data T) {
                                        }
                                        return true
                                })
-                               m.runTag.ClearAll()
+                               m.m.runTag.ClearAll()
                                panic(e)
                        }
-                       m.runTag.Delete(ptr)
+                       m.m.runTag.Delete(ptr)
                }()
        }
-       m.push(MsgType_tag_data[T]{
-               Tag:  Tag,
-               Data: Data,
-       })
+       {
+               /*
+                       m.m.Push(Msgq_tag_data{
+                               Tag:  Tag,
+                               Data: Data,
+                       })
+               */
+               ul := m.m.lock.RLock(m.m.to...)
+
+               for el := m.m.funcs.Front(); el != nil; el = el.Next() {
+                       mi := el.Value.(*msgqItem)
+                       if mi.disable.Load() {
+                               continue
+                       }
+                       mi.running.Add(1)
+                       if disable := (*mi.f)(&MsgType_tag_data[T]{
+                               Tag:  Tag,
+                               Data: &Data,
+                       }); disable {
+                               mi.disable.Store(true)
+                               m.m.someNeedRemove.Store(true)
+                       }
+                       mi.running.Add(-1)
+               }
+
+               ul()
+               if m.m.someNeedRemove.Load() {
+                       m.m.removeDisable()
+               }
+       }
 }
 
 // 不能放置在由Push_tag、PushLock_tag调用的同步Pull中
 func (m *MsgType[T]) PushLock_tag(Tag string, Data T) {
-       if len(m.to) > 0 {
+       if len(m.m.to) > 0 {
                ptr := uintptr(unsafe.Pointer(&Data))
-               m.runTag.Store(ptr, "[T]PushLock_tag(`"+Tag+"`,...)")
+               m.m.runTag.Store(ptr, "PushLock_tag(`"+Tag+"`,...)")
                defer func() {
                        if e := recover(); e != nil {
-                               m.runTag.Range(func(key, value any) bool {
+                               m.m.runTag.Range(func(key, value any) bool {
                                        if key == ptr {
                                                fmt.Printf("%v panic > %v\n", value, e)
                                        } else {
@@ -392,108 +417,129 @@ func (m *MsgType[T]) PushLock_tag(Tag string, Data T) {
                                        }
                                        return true
                                })
-                               m.runTag.ClearAll()
+                               m.m.runTag.ClearAll()
                                panic(e)
                        }
-                       m.runTag.Delete(ptr)
+                       m.m.runTag.Delete(ptr)
                }()
        }
-       m.pushLock(MsgType_tag_data[T]{
-               Tag:  Tag,
-               Data: Data,
-       })
-}
-
-func (m *MsgType[T]) ClearAll() {
-       isfirst := m.call.Add(1)
-
-       rul := m.removelock.Lock()
-       for el := m.funcs.Front(); el != nil; el = el.Next() {
-               m.removeList = append(m.removeList, el)
-       }
-       rul()
+       {
+               /*
+                       m.m.PushLock(Msgq_tag_data{
+                               Tag:  Tag,
+                               Data: Data,
+                       })
+               */
+               ul := m.m.lock.Lock(m.m.to...)
+
+               for el := m.m.funcs.Front(); el != nil; el = el.Next() {
+                       mi := el.Value.(*msgqItem)
+                       if mi.disable.Load() {
+                               continue
+                       }
+                       mi.running.Add(1)
+                       if disable := (*mi.f)(&MsgType_tag_data[T]{
+                               Tag:  Tag,
+                               Data: &Data,
+                       }); disable {
+                               mi.disable.Store(true)
+                               m.m.someNeedRemove.Store(true)
+                       }
+                       mi.running.Add(-1)
+               }
 
-       if isfirst == 1 {
-               rul := m.removelock.Lock()
-               for i := 0; i < len(m.removeList); i++ {
-                       m.funcs.Remove(m.removeList[i])
+               ul()
+               if m.m.someNeedRemove.Load() {
+                       m.m.removeDisable()
                }
-               rul()
        }
-
-       m.call.Add(-1)
 }
 
 func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) <-chan T {
        var ch = make(chan T, size)
-       m.register(func(data MsgType_tag_data[T]) bool {
-               if data.Tag == key {
-                       select {
-                       case <-ctx.Done():
-                               close(ch)
-                               return true
-                       default:
-                               for len(ch) != 0 {
-                                       <-ch
+       var f = func(data any) bool {
+               if data1, ok := data.(*MsgType_tag_data[T]); ok {
+                       if data1.Tag == key {
+                               select {
+                               case <-ctx.Done():
+                                       close(ch)
+                                       return true
+                               default:
+                                       for len(ch) != 0 {
+                                               <-ch
+                                       }
+                                       ch <- *data1.Data
                                }
-                               ch <- data.Data
                        }
                }
                return false
+       }
+       m.m.register(&msgqItem{
+               f: &f,
        })
        return ch
 }
 
 func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) {
-       m.register(func(data MsgType_tag_data[T]) (disable bool) {
-               if data.Tag == key {
-                       return f(data.Data)
+       var f1 = func(data any) (disable bool) {
+               if data1, ok := data.(*MsgType_tag_data[T]); ok {
+                       if data1.Tag == key {
+                               return f(*data1.Data)
+                       }
                }
                return false
+       }
+       m.m.register(&msgqItem{
+               f: &f1,
        })
 }
 
 func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) {
-       m.register(func(data MsgType_tag_data[T]) (disable bool) {
-               if f, ok := func_map[data.Tag]; ok {
-                       return f(data.Data)
+       var f = func(data any) (disable bool) {
+               if data1, ok := data.(*MsgType_tag_data[T]); ok {
+                       if f, ok := func_map[data1.Tag]; ok {
+                               return f(*data1.Data)
+                       }
                }
                return false
+       }
+       m.m.register(&msgqItem{
+               f: &f,
        })
 }
 
 func (m *MsgType[T]) Pull_tag_async_only(key string, f func(T) (disable bool)) {
-       var disable = signal.Init()
-
-       m.register_front(func(data MsgType_tag_data[T]) bool {
-               if !disable.Islive() {
-                       return true
-               }
-               if data.Tag == key {
+       var mi = msgqItem{}
+       var f1 = func(data any) bool {
+               if d, ok := data.(*MsgType_tag_data[T]); ok {
                        go func() {
-                               if f(data.Data) {
-                                       disable.Done()
+                               if f(*d.Data) {
+                                       mi.disable.Store(true)
+                                       m.m.someNeedRemove.Store(true)
                                }
                        }()
                }
                return false
-       })
+       }
+       mi.f = &f1
+       m.m.register_front(&mi)
 }
 
 func (m *MsgType[T]) Pull_tag_async(func_map map[string]func(T) (disable bool)) {
-       var disable = signal.Init()
-
-       m.register_front(func(data MsgType_tag_data[T]) bool {
-               if !disable.Islive() {
-                       return true
-               }
-               if f, ok := func_map[data.Tag]; ok {
-                       go func() {
-                               if f(data.Data) {
-                                       disable.Done()
-                               }
-                       }()
+       var mi = msgqItem{}
+       var f = func(data any) bool {
+               if d, ok := data.(*MsgType_tag_data[T]); ok {
+                       if f, ok := func_map[d.Tag]; ok {
+                               go func() {
+                                       if f(*d.Data) {
+                                               mi.disable.Store(true)
+                                               m.m.someNeedRemove.Store(true)
+                                       }
+                               }()
+                       }
                }
                return false
-       })
+       }
+       mi.f = &f
+       m.m.register_front(&mi)
 }