]> 127.0.0.1 Git - part/.git/commitdiff
add
authorqydysky <qydysky@foxmail.com>
Sun, 14 May 2023 15:36:54 +0000 (23:36 +0800)
committerqydysky <qydysky@foxmail.com>
Sun, 14 May 2023 15:36:54 +0000 (23:36 +0800)
msgq/Msgq.go
msgq/Msgq_test.go
sync/RWMutex.go
sync/RWMutex_test.go
websocket/Client.go

index 117545e92198d770a7a55d722ca93f0f59ffcca0..d254eddca4331fc24cc0905140d0df1480fd1e53 100644 (file)
@@ -42,13 +42,13 @@ func NewTo(to time.Duration) *Msgq {
 }
 
 func (m *Msgq) Register(f func(any) (disable bool)) {
-       ul := m.lock.Lock(m.to...)
+       ul := m.lock.Lock(m.to...)()
        m.funcs.PushBack(f)
        ul()
 }
 
 func (m *Msgq) Register_front(f func(any) (disable bool)) {
-       ul := m.lock.Lock(m.to...)
+       ul := m.lock.Lock(m.to...)()
        m.funcs.PushFront(f)
        ul()
 }
@@ -61,7 +61,7 @@ func (m *Msgq) Push(msg any) {
 
        var removes []*list.Element
 
-       ul := m.lock.RLock(m.to...)
+       ul := m.lock.RLock(m.to...)()
        for el := m.funcs.Front(); el != nil; el = el.Next() {
                if disable := el.Value.(func(any) bool)(msg); disable {
                        m.someNeedRemove.Add(1)
@@ -71,7 +71,7 @@ func (m *Msgq) Push(msg any) {
        ul()
 
        if len(removes) != 0 {
-               ul := m.lock.Lock(m.to...)
+               ul := m.lock.Lock(m.to...)()
                m.someNeedRemove.Add(-int32(len(removes)))
                for i := 0; i < len(removes); i++ {
                        m.funcs.Remove(removes[i])
@@ -86,7 +86,7 @@ func (m *Msgq) PushLock(msg any) {
                runtime.Gosched()
        }
 
-       ul := m.lock.Lock(m.to...)
+       ul := m.lock.Lock(m.to...)()
        defer ul()
 
        var removes []*list.Element
@@ -112,7 +112,7 @@ func (m *Msgq) ClearAll() {
                runtime.Gosched()
        }
 
-       ul := m.lock.Lock(m.to...)
+       ul := m.lock.Lock(m.to...)()
        defer ul()
 
        var removes []*list.Element
@@ -304,7 +304,7 @@ func (m *MsgType[T]) push(msg MsgType_tag_data[T]) {
 
        var removes []*list.Element
 
-       ul := m.lock.RLock(m.to...)
+       ul := m.lock.RLock(m.to...)()
        for el := m.funcs.Front(); el != nil; el = el.Next() {
                if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable {
                        m.someNeedRemove.Add(1)
@@ -314,7 +314,7 @@ func (m *MsgType[T]) push(msg MsgType_tag_data[T]) {
        ul()
 
        if len(removes) != 0 {
-               ul := m.lock.Lock(m.to...)
+               ul := m.lock.Lock(m.to...)()
                m.someNeedRemove.Add(-int32(len(removes)))
                for i := 0; i < len(removes); i++ {
                        m.funcs.Remove(removes[i])
@@ -329,7 +329,7 @@ func (m *MsgType[T]) pushLock(msg MsgType_tag_data[T]) {
                runtime.Gosched()
        }
 
-       ul := m.lock.Lock(m.to...)
+       ul := m.lock.Lock(m.to...)()
        defer ul()
 
        var removes []*list.Element
@@ -350,13 +350,13 @@ func (m *MsgType[T]) pushLock(msg MsgType_tag_data[T]) {
 }
 
 func (m *MsgType[T]) register(f func(MsgType_tag_data[T]) (disable bool)) {
-       ul := m.lock.Lock(m.to...)
+       ul := m.lock.Lock(m.to...)()
        m.funcs.PushBack(f)
        ul()
 }
 
 func (m *MsgType[T]) register_front(f func(MsgType_tag_data[T]) (disable bool)) {
-       ul := m.lock.Lock(m.to...)
+       ul := m.lock.Lock(m.to...)()
        m.funcs.PushFront(f)
        ul()
 }
@@ -419,7 +419,7 @@ func (m *MsgType[T]) ClearAll() {
                runtime.Gosched()
        }
 
-       ul := m.lock.Lock(m.to...)
+       ul := m.lock.Lock(m.to...)()
        defer ul()
 
        var removes []*list.Element
index 033ffda81ce393bf0b70901e0362eb061e81c7e5..931857ec7cce203fbb892fe22fc09ae44ba1a0b0 100644 (file)
@@ -2,6 +2,7 @@ package part
 
 import (
        "context"
+       "fmt"
        _ "net/http/pprof"
        "sync"
        "testing"
@@ -155,7 +156,7 @@ func BenchmarkXxx(b *testing.B) {
 // }
 
 func Benchmark_1(b *testing.B) {
-       mq := NewTo(time.Second)
+       mq := New()
        mq.Pull_tag_only(`test`, func(a any) (disable bool) {
                return false
        })
@@ -358,6 +359,7 @@ func Test_msgq3(t *testing.T) {
 
        time.Sleep(time.Second)
        for fin_turn := 0; fin_turn < 1000000; fin_turn += 1 {
+               fmt.Printf("\r%d", fin_turn)
                mq.Push_tag(`A1`, fin_turn)
                if fin_turn != <-mun_c {
                        t.Fatal(fin_turn)
index 5c1add80d5a95375fe759f788e6fbfb2e55ff373..b3c17f169f2661157fc7cd19451ec0d5276eb61b 100644 (file)
@@ -18,113 +18,116 @@ type RWMutex struct {
        oll atomic.Int32
 }
 
-func (m *RWMutex) RLock(to ...time.Duration) (unrlock func()) {
-       var callC atomic.Bool
-       if len(to) > 0 {
-               var calls []string
-               for i := 1; true; i++ {
-                       if pc, file, line, ok := runtime.Caller(i); !ok {
-                               break
-                       } else {
-                               calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line))
-                       }
-               }
-               c := time.Now()
-               for m.rlc.Load() < ulock || m.cul.Load() != m.oll.Load() {
-                       if time.Since(c) > to[0] {
-                               panic(fmt.Sprintf("timeout to wait lock, rlc:%d", m.rlc.Load()))
+// RLock() 必须在 lock期间操作的变量所定义的goroutime 中调用
+func (m *RWMutex) RLock(to ...time.Duration) (lockf func() (unlockf func())) {
+       lockid := m.cul.Add(1)
+       return func() (unlockf func()) {
+               var callC atomic.Bool
+               if len(to) > 0 {
+                       var calls []string
+                       for i := 1; true; i++ {
+                               if pc, file, line, ok := runtime.Caller(i); !ok {
+                                       break
+                               } else {
+                                       calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line))
+                               }
                        }
-                       runtime.Gosched()
-               }
-               c = time.Now()
-               go func() {
-                       for !callC.Load() {
+                       c := time.Now()
+                       for m.rlc.Load() < ulock {
                                if time.Since(c) > to[0] {
-                                       panicS := fmt.Sprintf("timeout to run rlock %v > %v\n", time.Since(c), to[0])
-                                       for i := 0; i < len(calls); i++ {
-                                               panicS += fmt.Sprintf("%s\n", calls[i])
+                                       panic(fmt.Sprintf("timeout to wait lock, rlc:%d", m.rlc.Load()))
+                               }
+                               runtime.Gosched()
+                       }
+                       if lockid > m.oll.Load() {
+                               m.oll.Store(lockid)
+                       }
+                       c = time.Now()
+                       go func() {
+                               for !callC.Load() {
+                                       if time.Since(c) > to[0] {
+                                               panicS := fmt.Sprintf("timeout to run rlock %v > %v\n", time.Since(c), to[0])
+                                               for i := 0; i < len(calls); i++ {
+                                                       panicS += fmt.Sprintf("%s\n", calls[i])
+                                               }
+                                               panic(panicS)
                                        }
-                                       panic(panicS)
+                                       runtime.Gosched()
                                }
+                       }()
+               } else {
+                       for m.rlc.Load() < ulock {
+                               time.Sleep(time.Millisecond)
                                runtime.Gosched()
                        }
-               }()
-       } else {
-               for m.rlc.Load() < ulock || m.cul.Load() != m.oll.Load() {
-                       time.Sleep(time.Millisecond)
-                       runtime.Gosched()
+                       if lockid > m.oll.Load() {
+                               m.oll.Store(lockid)
+                       }
                }
-       }
-       m.rlc.Add(1)
-       return func() {
-               if !callC.CompareAndSwap(false, true) {
-                       panic("had unrlock")
+               m.rlc.Add(1)
+               return func() {
+                       if !callC.CompareAndSwap(false, true) {
+                               panic("had unrlock")
+                       }
+                       m.rlc.Add(-1)
                }
-               m.rlc.Add(-1)
        }
 }
 
-func (m *RWMutex) Lock(to ...time.Duration) (unlock func()) {
+// Lock() 必须在 lock期间操作的变量所定义的goroutime 中调用
+func (m *RWMutex) Lock(to ...time.Duration) (lockf func() (unlockf func())) {
        lockid := m.cul.Add(1)
-       var callC atomic.Bool
-       if len(to) > 0 {
-               var calls []string
-               for i := 1; true; i++ {
-                       if pc, file, line, ok := runtime.Caller(i); !ok {
-                               break
-                       } else {
-                               calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line))
+       return func() (unlock func()) {
+               var callC atomic.Bool
+               if len(to) > 0 {
+                       var calls []string
+                       for i := 1; true; i++ {
+                               if pc, file, line, ok := runtime.Caller(i); !ok {
+                                       break
+                               } else {
+                                       calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line))
+                               }
                        }
-               }
-               c := time.Now()
-               for m.rlc.Load() > ulock {
-                       if time.Since(c) > to[0] {
-                               panic(fmt.Sprintf("timeout to wait rlock, rlc:%d", m.rlc.Load()))
+                       c := time.Now()
+                       for m.rlc.Load() > ulock || lockid-1 != m.oll.Load() {
+                               if time.Since(c) > to[0] {
+                                       panic(fmt.Sprintf("timeout to wait rlock, rlc:%d", m.rlc.Load()))
+                               }
+                               runtime.Gosched()
                        }
-                       runtime.Gosched()
-               }
-               for lockid-1 != m.oll.Load() {
-                       if time.Since(c) > to[0] {
-                               panic(fmt.Sprintf("timeout to wait lock, rlc:%d", m.rlc.Load()))
+                       if !m.rlc.CompareAndSwap(ulock, lock) {
+                               panic(fmt.Sprintf("csa error, rlc:%d", m.rlc.Load()))
                        }
-                       runtime.Gosched()
-               }
-               if !m.rlc.CompareAndSwap(ulock, lock) {
-                       panic(fmt.Sprintf("csa error, rlc:%d", m.rlc.Load()))
-               }
-               c = time.Now()
-               go func() {
-                       for !callC.Load() {
-                               if time.Since(c) > to[0] {
-                                       panicS := fmt.Sprintf("timeout to run lock %v > %v\n", time.Since(c), to[0])
-                                       for i := 0; i < len(calls); i++ {
-                                               panicS += fmt.Sprintf("call by %s\n", calls[i])
+                       c = time.Now()
+                       go func() {
+                               for !callC.Load() {
+                                       if time.Since(c) > to[0] {
+                                               panicS := fmt.Sprintf("timeout to run lock %v > %v\n", time.Since(c), to[0])
+                                               for i := 0; i < len(calls); i++ {
+                                                       panicS += fmt.Sprintf("call by %s\n", calls[i])
+                                               }
+                                               panic(panicS)
                                        }
-                                       panic(panicS)
+                                       runtime.Gosched()
                                }
+                       }()
+               } else {
+                       for m.rlc.Load() > ulock || lockid-1 != m.oll.Load() {
+                               time.Sleep(time.Millisecond)
                                runtime.Gosched()
                        }
-               }()
-       } else {
-               for m.rlc.Load() > ulock {
-                       time.Sleep(time.Millisecond)
-                       runtime.Gosched()
-               }
-               for lockid-1 != m.oll.Load() {
-                       time.Sleep(time.Millisecond)
-                       runtime.Gosched()
-               }
-               if !m.rlc.CompareAndSwap(ulock, lock) {
-                       panic(fmt.Sprintf("csa error, rlc:%d", m.rlc.Load()))
-               }
-       }
-       return func() {
-               if !callC.CompareAndSwap(false, true) {
-                       panic("had unlock")
+                       if !m.rlc.CompareAndSwap(ulock, lock) {
+                               panic(fmt.Sprintf("csa error, rlc:%d", m.rlc.Load()))
+                       }
                }
-               if !m.rlc.CompareAndSwap(lock, ulock) {
-                       panic("")
+               return func() {
+                       if !callC.CompareAndSwap(false, true) {
+                               panic("had unlock")
+                       }
+                       if !m.rlc.CompareAndSwap(lock, ulock) {
+                               panic("")
+                       }
+                       m.oll.Store(lockid)
                }
-               m.oll.Store(lockid)
        }
 }
index 5178e9fb8d9081b200fab8b599df2c6ce9eff40b..843253a7a6579b0b71927e655a4631d3e988bc61 100644 (file)
@@ -12,24 +12,26 @@ func TestMain(t *testing.T) {
        var callRL2 time.Time
        var to = time.Second * 2
 
-       ul := rl.RLock(to)
+       ul := rl.RLock(to)()
        callRL = time.Now()
 
+       rlock := rl.RLock(to)
        go func() {
-               ull := rl.RLock(to)
+               unlock := rlock()
                callRL2 = time.Now()
-               ull()
+               unlock()
        }()
 
+       lock := rl.Lock(to)
        go func() {
-               ull := rl.Lock(to)
+               ull := lock()
                callL = time.Now()
                ull()
        }()
 
        time.Sleep(time.Second)
        ul()
-       rl.Lock(to)()
+       rl.Lock(to)()()
 
        if time.Since(callRL) < time.Since(callRL2) {
                t.Fatal()
index 323ec722aa21832e53e83b719fd86e277dbef411..7b3c7d7db5007fcf2cf99a0b6b0f46cdfce197a3 100644 (file)
@@ -87,6 +87,8 @@ func New_client(config *Client) (*Client, error) {
 //             fmt.Println(string(wm.Msg))
 //             return false
 //     })
+//
+// 事件 send rec close exit
 func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) {
        tmp_Header := make(http.Header)
        for k, v := range o.Header {
@@ -125,7 +127,7 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) {
        // rec
        go func() {
                defer func() {
-                       o.msg.PushLock_tag(`close`, nil)
+                       o.msg.PushLock_tag(`exit`, nil)
                        o.msg.ClearAll()
                        o.l.Lock()
                        o.close = true