]> 127.0.0.1 Git - part/.git/commitdiff
Add
authorqydysky <qydysky@foxmail.com>
Sun, 14 May 2023 08:55:12 +0000 (16:55 +0800)
committerqydysky <qydysky@foxmail.com>
Sun, 14 May 2023 08:55:12 +0000 (16:55 +0800)
log/Log.go
log/Log_test.go
msgq/Msgq.go
msgq/Msgq_test.go
reqf/Reqf.go
sync/RWMutex.go
websocket/Client.go

index cefc117a849d6b4d50772b61ff12f3292a54f240..7ab8ed25ddeaab47bbb1e60e7374125ae31b2d60 100644 (file)
@@ -4,11 +4,9 @@ import (
        "io"
        "log"
        "os"
-       "time"
 
-       p "github.com/qydysky/part"
+       f "github.com/qydysky/part/file"
        m "github.com/qydysky/part/msgq"
-       s "github.com/qydysky/part/signal"
 )
 
 var (
@@ -16,20 +14,21 @@ var (
 )
 
 type Log_interface struct {
-       MQ *m.Msgq
+       MQ *m.MsgType[Msg_item]
        Config
 }
 
 type Config struct {
-       File          string
-       Stdout        bool
+       File   string
+       Stdout bool
+
        Prefix_string map[string]struct{}
-       Base_string   []interface{}
+       Base_string   []any
 }
 
 type Msg_item struct {
        Prefix  string
-       Msg_obj []interface{}
+       Msg_obj []any
        Config
 }
 
@@ -40,45 +39,31 @@ func New(c Config) (o *Log_interface) {
                Config: c,
        }
        if c.File != `` {
-               p.File().NewPath(c.File)
+               f.New(c.File, 0, true).Create()
        }
 
-       o.MQ = m.New()
-       o.MQ.Pull_tag(map[string]func(interface{}) bool{
-               `block`: func(data interface{}) bool {
-                       if v, ok := data.(*s.Signal); ok {
-                               v.Done()
-                       }
-                       return false
-               },
-               `L`: func(data interface{}) bool {
-                       msg := data.(Msg_item)
-                       var showObj = []io.Writer{}
-                       if msg.Stdout {
-                               showObj = append(showObj, os.Stdout)
-                       }
-                       if msg.File != `` {
-                               file, err := os.OpenFile(msg.File, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
-                               if err == nil {
-                                       showObj = append(showObj, file)
-                                       defer file.Close()
-                               } else {
-                                       log.Println(err)
-                               }
+       o.MQ = m.NewType[Msg_item]()
+       o.MQ.Pull_tag_only(`L`, func(msg Msg_item) bool {
+               var showObj = []io.Writer{}
+               if msg.Stdout {
+                       showObj = append(showObj, os.Stdout)
+               }
+               if msg.File != `` {
+                       file, err := os.OpenFile(msg.File, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
+                       if err == nil {
+                               showObj = append(showObj, file)
+                               defer file.Close()
+                       } else {
+                               log.Println(err)
                        }
-                       log.New(io.MultiWriter(showObj...),
-                               msg.Prefix,
-                               log.Ldate|log.Ltime).Println(msg.Msg_obj...)
-                       return false
-               },
-       })
-       { //启动阻塞
-               b := s.Init()
-               for b.Islive() {
-                       o.MQ.Push_tag(`block`, b)
-                       time.Sleep(time.Duration(20) * time.Millisecond)
                }
-       }
+               log.New(io.MultiWriter(showObj...),
+                       msg.Prefix,
+                       log.Ldate|log.Ltime).Println(msg.Msg_obj...)
+               return false
+       })
+       //启动阻塞
+       o.MQ.PushLock_tag(`block`, Msg_item{})
        return
 }
 
@@ -87,20 +72,15 @@ func Copy(i *Log_interface) (o *Log_interface) {
                Config: (*i).Config,
                MQ:     (*i).MQ,
        }
-       { //启动阻塞
-               b := s.Init()
-               for b.Islive() {
-                       o.MQ.Push_tag(`block`, b)
-                       time.Sleep(time.Duration(20) * time.Millisecond)
-               }
-       }
+       //启动阻塞
+       o.MQ.PushLock_tag(`block`, Msg_item{})
        return
 }
 
 // Level 设置之后日志等级
 func (I *Log_interface) Level(log map[string]struct{}) (O *Log_interface) {
        O = Copy(I)
-       for k, _ := range O.Prefix_string {
+       for k := range O.Prefix_string {
                if _, ok := log[k]; !ok {
                        delete(O.Prefix_string, k)
                }
@@ -117,40 +97,52 @@ func (I *Log_interface) Log_show_control(show bool) (O *Log_interface) {
        return
 }
 
+func (I *Log_interface) LShow(show bool) (O *Log_interface) {
+       return I.Log_show_control(show)
+}
+
 // Open 日志输出至文件
 func (I *Log_interface) Log_to_file(fileP string) (O *Log_interface) {
        O = I
        //
        O.Block(100)
-       O.File = fileP
        if O.File != `` {
-               p.File().NewPath(O.File)
+               O.File = fileP
+               f.New(O.File, 0, true).Create()
+       } else {
+               O.File = ``
        }
        return
 }
 
+func (I *Log_interface) LFile(fileP string) (O *Log_interface) {
+       return I.Log_to_file(fileP)
+}
+
 // Block 阻塞直到本轮日志输出完毕
-func (I *Log_interface) Block(timeout int) (O *Log_interface) {
+func (I *Log_interface) Block(ms int) (O *Log_interface) {
        O = I
-       b := s.Init()
-       O.MQ.Push_tag(`block`, b)
-       b.Wait()
+       O.MQ.PushLock_tag(`block`, Msg_item{})
        return
 }
 
+func (I *Log_interface) Close() {
+       I.MQ.ClearAll()
+}
+
 // 日志等级
 // Base 追加到后续输出
-func (I *Log_interface) Base(i ...interface{}) (O *Log_interface) {
+func (I *Log_interface) Base(i ...any) (O *Log_interface) {
        O = Copy(I)
        O.Base_string = i
        return
 }
-func (I *Log_interface) Base_add(i ...interface{}) (O *Log_interface) {
+func (I *Log_interface) Base_add(i ...any) (O *Log_interface) {
        O = Copy(I)
        O.Base_string = append(O.Base_string, i...)
        return
 }
-func (I *Log_interface) L(prefix string, i ...interface{}) (O *Log_interface) {
+func (I *Log_interface) L(prefix string, i ...any) (O *Log_interface) {
        O = I
        if _, ok := O.Prefix_string[prefix]; !ok {
                return
index fe00760d835407f231550f621cb1733e424313bd..0c79679988482e29a7c6ce04319a93da510dccb5 100644 (file)
@@ -1,66 +1,55 @@
 package part
 
 import (
-    // "fmt"
-    "runtime"
-    "time"
-    "testing"
-    
+       // "fmt"
+
+       "testing"
+       "time"
+
        "net/http"
        _ "net/http/pprof"
 )
 
-type test_item struct {
-       data string
-}
-
 func Test_1(t *testing.T) {
-    n := New(Config{
-        File:`1.log`,
-        Stdout:true,
-        Prefix_string:map[string]struct{}{`T:`:On,`I:`:On,`W:`:On,`E:`:On},
-    })
-
-    n.L(`T:`,`s`).L(`I:`,`s`).Block(1000)
-    n.Log_to_file(`2.log`).L(`W:`,`s`).L(`E:`,`s`)
-
-    {
-        n1 := n.Base(`>1`)
-        n1.L(`T:`,`s`).L(`I:`,`s`)
-        {
-            n2 := n1.Base_add(`>2`)
-            n2.L(`T:`,`s`).L(`I:`,`s`)
-        }
-    }
-
-    n.Level(map[string]struct{}{`W:`:On}).L(`T:`,`s`).L(`I:`,`s`).L(`W:`,`s`).L(`E:`,`s`)
-    n.Block(1000)
+       n := New(Config{
+               File:          `1.log`,
+               Stdout:        true,
+               Prefix_string: map[string]struct{}{`T:`: On, `I:`: On, `W:`: On, `E:`: On},
+       })
+
+       n.L(`T:`, `s`).L(`I:`, `s`).Block(1000)
+       n.Log_to_file(`2.log`).L(`W:`, `s`).L(`E:`, `s`)
+
+       {
+               n1 := n.Base(`>1`)
+               n1.L(`T:`, `s`).L(`I:`, `s`)
+               {
+                       n2 := n1.Base_add(`>2`)
+                       n2.L(`T:`, `s`).L(`I:`, `s`)
+               }
+       }
+
+       n.Level(map[string]struct{}{`W:`: On}).L(`T:`, `s`).L(`I:`, `s`).L(`W:`, `s`).L(`E:`, `s`)
+       n.Block(1000)
 }
 
-
 var n *Log_interface
 
 func Test_2(t *testing.T) {
-    n = New(Config{
-        File:`1.log`,
-        Stdout:true,
-        Prefix_string:map[string]struct{}{`T:`:On,`I:`:On,`W:`:On,`E:`:On},
-    })
+       n = New(Config{
+               File:          `1.log`,
+               Stdout:        true,
+               Prefix_string: map[string]struct{}{`T:`: On, `I:`: On, `W:`: On, `E:`: On},
+       })
 
        go func() {
                http.ListenAndServe("0.0.0.0:8899", nil)
-    }()
-    // n = nil
-    for {
-        n:=n.Base_add(`>1`)
-        n.L(`T:`,`s`)
-        time.Sleep(time.Second*time.Duration(1))
-        // n=nil
-    }
-    n.L(`T:`,`s`)
-    runtime.GC()
-    time.Sleep(time.Second*time.Duration(1000))
-    // fmt.Printf("%p %p\n",n.MQ,n1.MQ)
-    // n1 = nil
-    // fmt.Println(n)
-}
\ No newline at end of file
+       }()
+       // n = nil
+       for {
+               n := n.Base_add(`>1`)
+               n.L(`T:`, `s`)
+               time.Sleep(time.Second * time.Duration(1))
+               // n=nil
+       }
+}
index 5f1c3a0502eebb9861db9e80d76f54ed9324ac88..117545e92198d770a7a55d722ca93f0f59ffcca0 100644 (file)
@@ -30,6 +30,7 @@ func New() *Msgq {
 }
 
 func NewTo(to time.Duration) *Msgq {
+       fmt.Println("Warn: NewTo is slow, consider New")
        m := new(Msgq)
        m.funcs = list.New()
        if to != 0 {
@@ -195,11 +196,10 @@ func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) <-chan a
                                close(ch)
                                return true
                        default:
-                               if len(ch) == size {
+                               for len(ch) != 0 {
                                        <-ch
                                }
                                ch <- d.Data
-                               return false
                        }
                }
                return false
@@ -266,28 +266,108 @@ func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) {
 }
 
 type MsgType[T any] struct {
-       m *Msgq
+       to             []time.Duration
+       funcs          *list.List
+       someNeedRemove atomic.Int32
+       lock           sync.RWMutex
+       runTag         sync.Map
+}
+
+type MsgType_tag_data[T any] struct {
+       Tag  string
+       Data T
 }
 
 func NewType[T any]() *MsgType[T] {
-       return &MsgType[T]{
-               m: New(),
-       }
+       m := new(MsgType[T])
+       m.funcs = list.New()
+       return m
 }
 
 func NewTypeTo[T any](to time.Duration) *MsgType[T] {
-       return &MsgType[T]{
-               m: NewTo(to),
+       fmt.Println("Warn: NewTypeTo[T any] is slow, consider NewType[T any]")
+       m := new(MsgType[T])
+       m.funcs = list.New()
+       if to != 0 {
+               m.to = append(m.to, to)
+       } else {
+               m.to = append(m.to, time.Second*30)
+       }
+       return m
+}
+
+func (m *MsgType[T]) push(msg MsgType_tag_data[T]) {
+       for m.someNeedRemove.Load() != 0 {
+               time.Sleep(time.Millisecond)
+               runtime.Gosched()
+       }
+
+       var removes []*list.Element
+
+       ul := m.lock.RLock(m.to...)
+       for el := m.funcs.Front(); el != nil; el = el.Next() {
+               if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable {
+                       m.someNeedRemove.Add(1)
+                       removes = append(removes, el)
+               }
+       }
+       ul()
+
+       if len(removes) != 0 {
+               ul := m.lock.Lock(m.to...)
+               m.someNeedRemove.Add(-int32(len(removes)))
+               for i := 0; i < len(removes); i++ {
+                       m.funcs.Remove(removes[i])
+               }
+               ul()
+       }
+}
+
+func (m *MsgType[T]) pushLock(msg MsgType_tag_data[T]) {
+       for m.someNeedRemove.Load() != 0 {
+               time.Sleep(time.Millisecond)
+               runtime.Gosched()
+       }
+
+       ul := m.lock.Lock(m.to...)
+       defer ul()
+
+       var removes []*list.Element
+
+       for el := m.funcs.Front(); el != nil; el = el.Next() {
+               if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable {
+                       m.someNeedRemove.Add(1)
+                       removes = append(removes, el)
+               }
+       }
+
+       if len(removes) != 0 {
+               m.someNeedRemove.Add(-int32(len(removes)))
+               for i := 0; i < len(removes); i++ {
+                       m.funcs.Remove(removes[i])
+               }
        }
 }
 
+func (m *MsgType[T]) register(f func(MsgType_tag_data[T]) (disable bool)) {
+       ul := m.lock.Lock(m.to...)
+       m.funcs.PushBack(f)
+       ul()
+}
+
+func (m *MsgType[T]) register_front(f func(MsgType_tag_data[T]) (disable bool)) {
+       ul := m.lock.Lock(m.to...)
+       m.funcs.PushFront(f)
+       ul()
+}
+
 func (m *MsgType[T]) Push_tag(Tag string, Data T) {
-       if len(m.m.to) > 0 {
+       if len(m.to) > 0 {
                ptr := uintptr(unsafe.Pointer(&Data))
-               m.m.runTag.Store(ptr, "[T]Push_tag(`"+Tag+"`,...)")
+               m.runTag.Store(ptr, "[T]Push_tag(`"+Tag+"`,...)")
                defer func() {
                        if e := recover(); e != nil {
-                               m.m.runTag.Range(func(key, value any) bool {
+                               m.runTag.Range(func(key, value any) bool {
                                        if key == ptr {
                                                fmt.Printf("%v panic > %v\n", value, e)
                                        } else {
@@ -295,25 +375,25 @@ func (m *MsgType[T]) Push_tag(Tag string, Data T) {
                                        }
                                        return true
                                })
-                               m.m.runTag.ClearAll()
+                               m.runTag.ClearAll()
                                panic(e)
                        }
-                       m.m.runTag.Delete(ptr)
+                       m.runTag.Delete(ptr)
                }()
        }
-       m.m.Push(Msgq_tag_data{
+       m.push(MsgType_tag_data[T]{
                Tag:  Tag,
                Data: Data,
        })
 }
 
 func (m *MsgType[T]) PushLock_tag(Tag string, Data T) {
-       if len(m.m.to) > 0 {
+       if len(m.to) > 0 {
                ptr := uintptr(unsafe.Pointer(&Data))
-               m.m.runTag.Store(ptr, "[T]PushLock_tag(`"+Tag+"`,...)")
+               m.runTag.Store(ptr, "[T]PushLock_tag(`"+Tag+"`,...)")
                defer func() {
                        if e := recover(); e != nil {
-                               m.m.runTag.Range(func(key, value any) bool {
+                               m.runTag.Range(func(key, value any) bool {
                                        if key == ptr {
                                                fmt.Printf("%v panic > %v\n", value, e)
                                        } else {
@@ -321,36 +401,55 @@ func (m *MsgType[T]) PushLock_tag(Tag string, Data T) {
                                        }
                                        return true
                                })
-                               m.m.runTag.ClearAll()
+                               m.runTag.ClearAll()
                                panic(e)
                        }
-                       m.m.runTag.Delete(ptr)
+                       m.runTag.Delete(ptr)
                }()
        }
-       m.m.PushLock(Msgq_tag_data{
+       m.pushLock(MsgType_tag_data[T]{
                Tag:  Tag,
                Data: Data,
        })
 }
 
 func (m *MsgType[T]) ClearAll() {
-       m.m.ClearAll()
+       for m.someNeedRemove.Load() != 0 {
+               time.Sleep(time.Millisecond)
+               runtime.Gosched()
+       }
+
+       ul := m.lock.Lock(m.to...)
+       defer ul()
+
+       var removes []*list.Element
+
+       for el := m.funcs.Front(); el != nil; el = el.Next() {
+               m.someNeedRemove.Add(1)
+               removes = append(removes, el)
+       }
+
+       if len(removes) != 0 {
+               m.someNeedRemove.Add(-int32(len(removes)))
+               for i := 0; i < len(removes); i++ {
+                       m.funcs.Remove(removes[i])
+               }
+       }
 }
 
 func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) <-chan T {
        var ch = make(chan T, size)
-       m.m.Register(func(data any) bool {
-               if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
+       m.register(func(data MsgType_tag_data[T]) bool {
+               if data.Tag == key {
                        select {
                        case <-ctx.Done():
                                close(ch)
                                return true
                        default:
-                               if len(ch) == size {
+                               for len(ch) != 0 {
                                        <-ch
                                }
-                               ch <- d.Data.(T)
-                               return false
+                               ch <- data.Data
                        }
                }
                return false
@@ -359,20 +458,18 @@ func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) <-
 }
 
 func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) {
-       m.m.Register(func(data any) (disable bool) {
-               if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
-                       return f(d.Data.(T))
+       m.register(func(data MsgType_tag_data[T]) (disable bool) {
+               if data.Tag == key {
+                       return f(data.Data)
                }
                return false
        })
 }
 
 func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) {
-       m.m.Register(func(data any) (disable bool) {
-               if d, ok := data.(Msgq_tag_data); ok {
-                       if f, ok := func_map[d.Tag]; ok {
-                               return f(d.Data.(T))
-                       }
+       m.register(func(data MsgType_tag_data[T]) (disable bool) {
+               if f, ok := func_map[data.Tag]; ok {
+                       return f(data.Data)
                }
                return false
        })
@@ -381,13 +478,13 @@ func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) {
 func (m *MsgType[T]) Pull_tag_async_only(key string, f func(T) (disable bool)) {
        var disable = signal.Init()
 
-       m.m.Register_front(func(data any) bool {
+       m.register_front(func(data MsgType_tag_data[T]) bool {
                if !disable.Islive() {
                        return true
                }
-               if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
+               if data.Tag == key {
                        go func() {
-                               if f(d.Data.(T)) {
+                               if f(data.Data) {
                                        disable.Done()
                                }
                        }()
@@ -399,18 +496,16 @@ func (m *MsgType[T]) Pull_tag_async_only(key string, f func(T) (disable bool)) {
 func (m *MsgType[T]) Pull_tag_async(func_map map[string]func(T) (disable bool)) {
        var disable = signal.Init()
 
-       m.m.Register_front(func(data any) bool {
+       m.register_front(func(data MsgType_tag_data[T]) bool {
                if !disable.Islive() {
                        return true
                }
-               if d, ok := data.(Msgq_tag_data); ok {
-                       if f, ok := func_map[d.Tag]; ok {
-                               go func() {
-                                       if f(d.Data.(T)) {
-                                               disable.Done()
-                                       }
-                               }()
-                       }
+               if f, ok := func_map[data.Tag]; ok {
+                       go func() {
+                               if f(data.Data) {
+                                       disable.Done()
+                               }
+                       }()
                }
                return false
        })
index 2a4f3221b3dc07e018993bf0704ea7411389925e..7cf06b5af51e32fc1be07990ee7ca1c6b2caace7 100644 (file)
@@ -141,7 +141,7 @@ func BenchmarkXxx(b *testing.B) {
 
 func TestPushLock(t *testing.T) {
        defer func() {
-               if e := recover(); e.(string) != "timeout to wait rlock, rlc:1" {
+               if e := recover(); e != nil {
                        t.Fatal(e)
                }
        }()
@@ -171,6 +171,9 @@ func Test_Pull_tag_chan(t *testing.T) {
        for i := 0; i < 5; i++ {
                mq.Push_tag(`a`, i)
        }
+       if len(ch) != 1 {
+               t.Fatal()
+       }
        var o = 0
        for s := true; s; {
                select {
@@ -180,7 +183,7 @@ func Test_Pull_tag_chan(t *testing.T) {
                        s = false
                }
        }
-       if o != 7 {
+       if o != 4 {
                t.Fatal()
        }
        select {
@@ -200,6 +203,16 @@ func Test_Pull_tag_chan(t *testing.T) {
        }
 }
 
+func Test_Pull_tag_chan2(t *testing.T) {
+       mq := New()
+
+       mq.Pull_tag_chan(`a`, 1, context.Background())
+       go func() {
+               mq.PushLock_tag(`a`, nil)
+               mq.PushLock_tag(`a`, nil)
+       }()
+}
+
 func Test_msgq1(t *testing.T) {
        mq := New()
        c := make(chan time.Time, 10)
index d7d73acaafa96b3abc4712f9e4c2b9c16394a0f8..3cdc79de05118f678f466fe3e42a9ab86b53f69a 100644 (file)
@@ -106,6 +106,9 @@ func (t *Req) Reqf(val Rval) error {
                        for len(t.cancelFs) != 0 {
                                <-t.cancelFs
                        }
+                       t.Respon = t.Respon[:0]
+                       t.responBuf.Reset()
+
                        t.err = t.Reqf_1(_val)
                        if t.err == nil || IsCancel(t.err) {
                                break
index 48bda30f5601666dc247cd278105ed5b13c61919..5c1add80d5a95375fe759f788e6fbfb2e55ff373 100644 (file)
@@ -19,22 +19,43 @@ type RWMutex struct {
 }
 
 func (m *RWMutex) RLock(to ...time.Duration) (unrlock func()) {
+       var callC atomic.Bool
        if len(to) > 0 {
+               var calls []string
+               for i := 1; true; i++ {
+                       if pc, file, line, ok := runtime.Caller(i); !ok {
+                               break
+                       } else {
+                               calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line))
+                       }
+               }
                c := time.Now()
-               for m.rlc.Load() < ulock {
+               for m.rlc.Load() < ulock || m.cul.Load() != m.oll.Load() {
                        if time.Since(c) > to[0] {
                                panic(fmt.Sprintf("timeout to wait lock, rlc:%d", m.rlc.Load()))
                        }
                        runtime.Gosched()
                }
+               c = time.Now()
+               go func() {
+                       for !callC.Load() {
+                               if time.Since(c) > to[0] {
+                                       panicS := fmt.Sprintf("timeout to run rlock %v > %v\n", time.Since(c), to[0])
+                                       for i := 0; i < len(calls); i++ {
+                                               panicS += fmt.Sprintf("%s\n", calls[i])
+                                       }
+                                       panic(panicS)
+                               }
+                               runtime.Gosched()
+                       }
+               }()
        } else {
-               for m.rlc.Load() < ulock {
+               for m.rlc.Load() < ulock || m.cul.Load() != m.oll.Load() {
                        time.Sleep(time.Millisecond)
                        runtime.Gosched()
                }
        }
        m.rlc.Add(1)
-       var callC atomic.Bool
        return func() {
                if !callC.CompareAndSwap(false, true) {
                        panic("had unrlock")
@@ -45,7 +66,16 @@ func (m *RWMutex) RLock(to ...time.Duration) (unrlock func()) {
 
 func (m *RWMutex) Lock(to ...time.Duration) (unlock func()) {
        lockid := m.cul.Add(1)
+       var callC atomic.Bool
        if len(to) > 0 {
+               var calls []string
+               for i := 1; true; i++ {
+                       if pc, file, line, ok := runtime.Caller(i); !ok {
+                               break
+                       } else {
+                               calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line))
+                       }
+               }
                c := time.Now()
                for m.rlc.Load() > ulock {
                        if time.Since(c) > to[0] {
@@ -60,8 +90,21 @@ func (m *RWMutex) Lock(to ...time.Duration) (unlock func()) {
                        runtime.Gosched()
                }
                if !m.rlc.CompareAndSwap(ulock, lock) {
-                       panic("csa error, bug")
+                       panic(fmt.Sprintf("csa error, rlc:%d", m.rlc.Load()))
                }
+               c = time.Now()
+               go func() {
+                       for !callC.Load() {
+                               if time.Since(c) > to[0] {
+                                       panicS := fmt.Sprintf("timeout to run lock %v > %v\n", time.Since(c), to[0])
+                                       for i := 0; i < len(calls); i++ {
+                                               panicS += fmt.Sprintf("call by %s\n", calls[i])
+                                       }
+                                       panic(panicS)
+                               }
+                               runtime.Gosched()
+                       }
+               }()
        } else {
                for m.rlc.Load() > ulock {
                        time.Sleep(time.Millisecond)
@@ -72,10 +115,9 @@ func (m *RWMutex) Lock(to ...time.Duration) (unlock func()) {
                        runtime.Gosched()
                }
                if !m.rlc.CompareAndSwap(ulock, lock) {
-                       panic("csa error, bug")
+                       panic(fmt.Sprintf("csa error, rlc:%d", m.rlc.Load()))
                }
        }
-       var callC atomic.Bool
        return func() {
                if !callC.CompareAndSwap(false, true) {
                        panic("had unlock")
index 4d3d646fa814b1cf0efc44d031cee7ac05e1e29e..bad06e468e358b8f4bef9dcbe5a5f18d3c24e659 100644 (file)
@@ -176,6 +176,7 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) {
                if wm.Type == 0 {
                        wm.Type = websocket.TextMessage
                }
+               c.SetWriteDeadline(time.Now().Add(time.Duration(o.TO * int(time.Millisecond))))
                if err := c.WriteMessage(wm.Type, wm.Msg); err != nil {
                        o.error(err)
                        o.msg.ClearAll()