From 42add8da806afd48a6a528a4d2f0cf8fd60a92f8 Mon Sep 17 00:00:00 2001 From: qydysky Date: Sun, 14 May 2023 23:36:54 +0800 Subject: [PATCH] add --- msgq/Msgq.go | 24 +++--- msgq/Msgq_test.go | 4 +- sync/RWMutex.go | 177 ++++++++++++++++++++++--------------------- sync/RWMutex_test.go | 12 +-- websocket/Client.go | 4 +- 5 files changed, 115 insertions(+), 106 deletions(-) diff --git a/msgq/Msgq.go b/msgq/Msgq.go index 117545e..d254edd 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -42,13 +42,13 @@ func NewTo(to time.Duration) *Msgq { } func (m *Msgq) Register(f func(any) (disable bool)) { - ul := m.lock.Lock(m.to...) + ul := m.lock.Lock(m.to...)() m.funcs.PushBack(f) ul() } func (m *Msgq) Register_front(f func(any) (disable bool)) { - ul := m.lock.Lock(m.to...) + ul := m.lock.Lock(m.to...)() m.funcs.PushFront(f) ul() } @@ -61,7 +61,7 @@ func (m *Msgq) Push(msg any) { var removes []*list.Element - ul := m.lock.RLock(m.to...) + ul := m.lock.RLock(m.to...)() for el := m.funcs.Front(); el != nil; el = el.Next() { if disable := el.Value.(func(any) bool)(msg); disable { m.someNeedRemove.Add(1) @@ -71,7 +71,7 @@ func (m *Msgq) Push(msg any) { ul() if len(removes) != 0 { - ul := m.lock.Lock(m.to...) + ul := m.lock.Lock(m.to...)() m.someNeedRemove.Add(-int32(len(removes))) for i := 0; i < len(removes); i++ { m.funcs.Remove(removes[i]) @@ -86,7 +86,7 @@ func (m *Msgq) PushLock(msg any) { runtime.Gosched() } - ul := m.lock.Lock(m.to...) + ul := m.lock.Lock(m.to...)() defer ul() var removes []*list.Element @@ -112,7 +112,7 @@ func (m *Msgq) ClearAll() { runtime.Gosched() } - ul := m.lock.Lock(m.to...) + ul := m.lock.Lock(m.to...)() defer ul() var removes []*list.Element @@ -304,7 +304,7 @@ func (m *MsgType[T]) push(msg MsgType_tag_data[T]) { var removes []*list.Element - ul := m.lock.RLock(m.to...) + ul := m.lock.RLock(m.to...)() for el := m.funcs.Front(); el != nil; el = el.Next() { if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable { m.someNeedRemove.Add(1) @@ -314,7 +314,7 @@ func (m *MsgType[T]) push(msg MsgType_tag_data[T]) { ul() if len(removes) != 0 { - ul := m.lock.Lock(m.to...) + ul := m.lock.Lock(m.to...)() m.someNeedRemove.Add(-int32(len(removes))) for i := 0; i < len(removes); i++ { m.funcs.Remove(removes[i]) @@ -329,7 +329,7 @@ func (m *MsgType[T]) pushLock(msg MsgType_tag_data[T]) { runtime.Gosched() } - ul := m.lock.Lock(m.to...) + ul := m.lock.Lock(m.to...)() defer ul() var removes []*list.Element @@ -350,13 +350,13 @@ func (m *MsgType[T]) pushLock(msg MsgType_tag_data[T]) { } func (m *MsgType[T]) register(f func(MsgType_tag_data[T]) (disable bool)) { - ul := m.lock.Lock(m.to...) + ul := m.lock.Lock(m.to...)() m.funcs.PushBack(f) ul() } func (m *MsgType[T]) register_front(f func(MsgType_tag_data[T]) (disable bool)) { - ul := m.lock.Lock(m.to...) + ul := m.lock.Lock(m.to...)() m.funcs.PushFront(f) ul() } @@ -419,7 +419,7 @@ func (m *MsgType[T]) ClearAll() { runtime.Gosched() } - ul := m.lock.Lock(m.to...) + ul := m.lock.Lock(m.to...)() defer ul() var removes []*list.Element diff --git a/msgq/Msgq_test.go b/msgq/Msgq_test.go index 033ffda..931857e 100644 --- a/msgq/Msgq_test.go +++ b/msgq/Msgq_test.go @@ -2,6 +2,7 @@ package part import ( "context" + "fmt" _ "net/http/pprof" "sync" "testing" @@ -155,7 +156,7 @@ func BenchmarkXxx(b *testing.B) { // } func Benchmark_1(b *testing.B) { - mq := NewTo(time.Second) + mq := New() mq.Pull_tag_only(`test`, func(a any) (disable bool) { return false }) @@ -358,6 +359,7 @@ func Test_msgq3(t *testing.T) { time.Sleep(time.Second) for fin_turn := 0; fin_turn < 1000000; fin_turn += 1 { + fmt.Printf("\r%d", fin_turn) mq.Push_tag(`A1`, fin_turn) if fin_turn != <-mun_c { t.Fatal(fin_turn) diff --git a/sync/RWMutex.go b/sync/RWMutex.go index 5c1add8..b3c17f1 100644 --- a/sync/RWMutex.go +++ b/sync/RWMutex.go @@ -18,113 +18,116 @@ type RWMutex struct { oll atomic.Int32 } -func (m *RWMutex) RLock(to ...time.Duration) (unrlock func()) { - var callC atomic.Bool - if len(to) > 0 { - var calls []string - for i := 1; true; i++ { - if pc, file, line, ok := runtime.Caller(i); !ok { - break - } else { - calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line)) - } - } - c := time.Now() - for m.rlc.Load() < ulock || m.cul.Load() != m.oll.Load() { - if time.Since(c) > to[0] { - panic(fmt.Sprintf("timeout to wait lock, rlc:%d", m.rlc.Load())) +// RLock() 必须在 lock期间操作的变量所定义的goroutime 中调用 +func (m *RWMutex) RLock(to ...time.Duration) (lockf func() (unlockf func())) { + lockid := m.cul.Add(1) + return func() (unlockf func()) { + var callC atomic.Bool + if len(to) > 0 { + var calls []string + for i := 1; true; i++ { + if pc, file, line, ok := runtime.Caller(i); !ok { + break + } else { + calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line)) + } } - runtime.Gosched() - } - c = time.Now() - go func() { - for !callC.Load() { + c := time.Now() + for m.rlc.Load() < ulock { if time.Since(c) > to[0] { - panicS := fmt.Sprintf("timeout to run rlock %v > %v\n", time.Since(c), to[0]) - for i := 0; i < len(calls); i++ { - panicS += fmt.Sprintf("%s\n", calls[i]) + panic(fmt.Sprintf("timeout to wait lock, rlc:%d", m.rlc.Load())) + } + runtime.Gosched() + } + if lockid > m.oll.Load() { + m.oll.Store(lockid) + } + c = time.Now() + go func() { + for !callC.Load() { + if time.Since(c) > to[0] { + panicS := fmt.Sprintf("timeout to run rlock %v > %v\n", time.Since(c), to[0]) + for i := 0; i < len(calls); i++ { + panicS += fmt.Sprintf("%s\n", calls[i]) + } + panic(panicS) } - panic(panicS) + runtime.Gosched() } + }() + } else { + for m.rlc.Load() < ulock { + time.Sleep(time.Millisecond) runtime.Gosched() } - }() - } else { - for m.rlc.Load() < ulock || m.cul.Load() != m.oll.Load() { - time.Sleep(time.Millisecond) - runtime.Gosched() + if lockid > m.oll.Load() { + m.oll.Store(lockid) + } } - } - m.rlc.Add(1) - return func() { - if !callC.CompareAndSwap(false, true) { - panic("had unrlock") + m.rlc.Add(1) + return func() { + if !callC.CompareAndSwap(false, true) { + panic("had unrlock") + } + m.rlc.Add(-1) } - m.rlc.Add(-1) } } -func (m *RWMutex) Lock(to ...time.Duration) (unlock func()) { +// Lock() 必须在 lock期间操作的变量所定义的goroutime 中调用 +func (m *RWMutex) Lock(to ...time.Duration) (lockf func() (unlockf func())) { lockid := m.cul.Add(1) - var callC atomic.Bool - if len(to) > 0 { - var calls []string - for i := 1; true; i++ { - if pc, file, line, ok := runtime.Caller(i); !ok { - break - } else { - calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line)) + return func() (unlock func()) { + var callC atomic.Bool + if len(to) > 0 { + var calls []string + for i := 1; true; i++ { + if pc, file, line, ok := runtime.Caller(i); !ok { + break + } else { + calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line)) + } } - } - c := time.Now() - for m.rlc.Load() > ulock { - if time.Since(c) > to[0] { - panic(fmt.Sprintf("timeout to wait rlock, rlc:%d", m.rlc.Load())) + c := time.Now() + for m.rlc.Load() > ulock || lockid-1 != m.oll.Load() { + if time.Since(c) > to[0] { + panic(fmt.Sprintf("timeout to wait rlock, rlc:%d", m.rlc.Load())) + } + runtime.Gosched() } - runtime.Gosched() - } - for lockid-1 != m.oll.Load() { - if time.Since(c) > to[0] { - panic(fmt.Sprintf("timeout to wait lock, rlc:%d", m.rlc.Load())) + if !m.rlc.CompareAndSwap(ulock, lock) { + panic(fmt.Sprintf("csa error, rlc:%d", m.rlc.Load())) } - runtime.Gosched() - } - if !m.rlc.CompareAndSwap(ulock, lock) { - panic(fmt.Sprintf("csa error, rlc:%d", m.rlc.Load())) - } - c = time.Now() - go func() { - for !callC.Load() { - if time.Since(c) > to[0] { - panicS := fmt.Sprintf("timeout to run lock %v > %v\n", time.Since(c), to[0]) - for i := 0; i < len(calls); i++ { - panicS += fmt.Sprintf("call by %s\n", calls[i]) + c = time.Now() + go func() { + for !callC.Load() { + if time.Since(c) > to[0] { + panicS := fmt.Sprintf("timeout to run lock %v > %v\n", time.Since(c), to[0]) + for i := 0; i < len(calls); i++ { + panicS += fmt.Sprintf("call by %s\n", calls[i]) + } + panic(panicS) } - panic(panicS) + runtime.Gosched() } + }() + } else { + for m.rlc.Load() > ulock || lockid-1 != m.oll.Load() { + time.Sleep(time.Millisecond) runtime.Gosched() } - }() - } else { - for m.rlc.Load() > ulock { - time.Sleep(time.Millisecond) - runtime.Gosched() - } - for lockid-1 != m.oll.Load() { - time.Sleep(time.Millisecond) - runtime.Gosched() - } - if !m.rlc.CompareAndSwap(ulock, lock) { - panic(fmt.Sprintf("csa error, rlc:%d", m.rlc.Load())) - } - } - return func() { - if !callC.CompareAndSwap(false, true) { - panic("had unlock") + if !m.rlc.CompareAndSwap(ulock, lock) { + panic(fmt.Sprintf("csa error, rlc:%d", m.rlc.Load())) + } } - if !m.rlc.CompareAndSwap(lock, ulock) { - panic("") + return func() { + if !callC.CompareAndSwap(false, true) { + panic("had unlock") + } + if !m.rlc.CompareAndSwap(lock, ulock) { + panic("") + } + m.oll.Store(lockid) } - m.oll.Store(lockid) } } diff --git a/sync/RWMutex_test.go b/sync/RWMutex_test.go index 5178e9f..843253a 100644 --- a/sync/RWMutex_test.go +++ b/sync/RWMutex_test.go @@ -12,24 +12,26 @@ func TestMain(t *testing.T) { var callRL2 time.Time var to = time.Second * 2 - ul := rl.RLock(to) + ul := rl.RLock(to)() callRL = time.Now() + rlock := rl.RLock(to) go func() { - ull := rl.RLock(to) + unlock := rlock() callRL2 = time.Now() - ull() + unlock() }() + lock := rl.Lock(to) go func() { - ull := rl.Lock(to) + ull := lock() callL = time.Now() ull() }() time.Sleep(time.Second) ul() - rl.Lock(to)() + rl.Lock(to)()() if time.Since(callRL) < time.Since(callRL2) { t.Fatal() diff --git a/websocket/Client.go b/websocket/Client.go index 323ec72..7b3c7d7 100644 --- a/websocket/Client.go +++ b/websocket/Client.go @@ -87,6 +87,8 @@ func New_client(config *Client) (*Client, error) { // fmt.Println(string(wm.Msg)) // return false // }) +// +// 事件 send rec close exit func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) { tmp_Header := make(http.Header) for k, v := range o.Header { @@ -125,7 +127,7 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) { // rec go func() { defer func() { - o.msg.PushLock_tag(`close`, nil) + o.msg.PushLock_tag(`exit`, nil) o.msg.ClearAll() o.l.Lock() o.close = true -- 2.39.2