From: qydysky Date: Sun, 14 May 2023 08:55:12 +0000 (+0800) Subject: Add X-Git-Tag: v0.27.0~2 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=b22d189cc7a113ae5908ac708a5ac899b708f840;p=part%2F.git Add --- diff --git a/log/Log.go b/log/Log.go index cefc117..7ab8ed2 100644 --- a/log/Log.go +++ b/log/Log.go @@ -4,11 +4,9 @@ import ( "io" "log" "os" - "time" - p "github.com/qydysky/part" + f "github.com/qydysky/part/file" m "github.com/qydysky/part/msgq" - s "github.com/qydysky/part/signal" ) var ( @@ -16,20 +14,21 @@ var ( ) type Log_interface struct { - MQ *m.Msgq + MQ *m.MsgType[Msg_item] Config } type Config struct { - File string - Stdout bool + File string + Stdout bool + Prefix_string map[string]struct{} - Base_string []interface{} + Base_string []any } type Msg_item struct { Prefix string - Msg_obj []interface{} + Msg_obj []any Config } @@ -40,45 +39,31 @@ func New(c Config) (o *Log_interface) { Config: c, } if c.File != `` { - p.File().NewPath(c.File) + f.New(c.File, 0, true).Create() } - o.MQ = m.New() - o.MQ.Pull_tag(map[string]func(interface{}) bool{ - `block`: func(data interface{}) bool { - if v, ok := data.(*s.Signal); ok { - v.Done() - } - return false - }, - `L`: func(data interface{}) bool { - msg := data.(Msg_item) - var showObj = []io.Writer{} - if msg.Stdout { - showObj = append(showObj, os.Stdout) - } - if msg.File != `` { - file, err := os.OpenFile(msg.File, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) - if err == nil { - showObj = append(showObj, file) - defer file.Close() - } else { - log.Println(err) - } + o.MQ = m.NewType[Msg_item]() + o.MQ.Pull_tag_only(`L`, func(msg Msg_item) bool { + var showObj = []io.Writer{} + if msg.Stdout { + showObj = append(showObj, os.Stdout) + } + if msg.File != `` { + file, err := os.OpenFile(msg.File, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err == nil { + showObj = append(showObj, file) + defer file.Close() + } else { + log.Println(err) } - log.New(io.MultiWriter(showObj...), - msg.Prefix, - log.Ldate|log.Ltime).Println(msg.Msg_obj...) - return false - }, - }) - { //启动阻塞 - b := s.Init() - for b.Islive() { - o.MQ.Push_tag(`block`, b) - time.Sleep(time.Duration(20) * time.Millisecond) } - } + log.New(io.MultiWriter(showObj...), + msg.Prefix, + log.Ldate|log.Ltime).Println(msg.Msg_obj...) + return false + }) + //启动阻塞 + o.MQ.PushLock_tag(`block`, Msg_item{}) return } @@ -87,20 +72,15 @@ func Copy(i *Log_interface) (o *Log_interface) { Config: (*i).Config, MQ: (*i).MQ, } - { //启动阻塞 - b := s.Init() - for b.Islive() { - o.MQ.Push_tag(`block`, b) - time.Sleep(time.Duration(20) * time.Millisecond) - } - } + //启动阻塞 + o.MQ.PushLock_tag(`block`, Msg_item{}) return } // Level 设置之后日志等级 func (I *Log_interface) Level(log map[string]struct{}) (O *Log_interface) { O = Copy(I) - for k, _ := range O.Prefix_string { + for k := range O.Prefix_string { if _, ok := log[k]; !ok { delete(O.Prefix_string, k) } @@ -117,40 +97,52 @@ func (I *Log_interface) Log_show_control(show bool) (O *Log_interface) { return } +func (I *Log_interface) LShow(show bool) (O *Log_interface) { + return I.Log_show_control(show) +} + // Open 日志输出至文件 func (I *Log_interface) Log_to_file(fileP string) (O *Log_interface) { O = I // O.Block(100) - O.File = fileP if O.File != `` { - p.File().NewPath(O.File) + O.File = fileP + f.New(O.File, 0, true).Create() + } else { + O.File = `` } return } +func (I *Log_interface) LFile(fileP string) (O *Log_interface) { + return I.Log_to_file(fileP) +} + // Block 阻塞直到本轮日志输出完毕 -func (I *Log_interface) Block(timeout int) (O *Log_interface) { +func (I *Log_interface) Block(ms int) (O *Log_interface) { O = I - b := s.Init() - O.MQ.Push_tag(`block`, b) - b.Wait() + O.MQ.PushLock_tag(`block`, Msg_item{}) return } +func (I *Log_interface) Close() { + I.MQ.ClearAll() +} + // 日志等级 // Base 追加到后续输出 -func (I *Log_interface) Base(i ...interface{}) (O *Log_interface) { +func (I *Log_interface) Base(i ...any) (O *Log_interface) { O = Copy(I) O.Base_string = i return } -func (I *Log_interface) Base_add(i ...interface{}) (O *Log_interface) { +func (I *Log_interface) Base_add(i ...any) (O *Log_interface) { O = Copy(I) O.Base_string = append(O.Base_string, i...) return } -func (I *Log_interface) L(prefix string, i ...interface{}) (O *Log_interface) { +func (I *Log_interface) L(prefix string, i ...any) (O *Log_interface) { O = I if _, ok := O.Prefix_string[prefix]; !ok { return diff --git a/log/Log_test.go b/log/Log_test.go index fe00760..0c79679 100644 --- a/log/Log_test.go +++ b/log/Log_test.go @@ -1,66 +1,55 @@ package part import ( - // "fmt" - "runtime" - "time" - "testing" - + // "fmt" + + "testing" + "time" + "net/http" _ "net/http/pprof" ) -type test_item struct { - data string -} - func Test_1(t *testing.T) { - n := New(Config{ - File:`1.log`, - Stdout:true, - Prefix_string:map[string]struct{}{`T:`:On,`I:`:On,`W:`:On,`E:`:On}, - }) - - n.L(`T:`,`s`).L(`I:`,`s`).Block(1000) - n.Log_to_file(`2.log`).L(`W:`,`s`).L(`E:`,`s`) - - { - n1 := n.Base(`>1`) - n1.L(`T:`,`s`).L(`I:`,`s`) - { - n2 := n1.Base_add(`>2`) - n2.L(`T:`,`s`).L(`I:`,`s`) - } - } - - n.Level(map[string]struct{}{`W:`:On}).L(`T:`,`s`).L(`I:`,`s`).L(`W:`,`s`).L(`E:`,`s`) - n.Block(1000) + n := New(Config{ + File: `1.log`, + Stdout: true, + Prefix_string: map[string]struct{}{`T:`: On, `I:`: On, `W:`: On, `E:`: On}, + }) + + n.L(`T:`, `s`).L(`I:`, `s`).Block(1000) + n.Log_to_file(`2.log`).L(`W:`, `s`).L(`E:`, `s`) + + { + n1 := n.Base(`>1`) + n1.L(`T:`, `s`).L(`I:`, `s`) + { + n2 := n1.Base_add(`>2`) + n2.L(`T:`, `s`).L(`I:`, `s`) + } + } + + n.Level(map[string]struct{}{`W:`: On}).L(`T:`, `s`).L(`I:`, `s`).L(`W:`, `s`).L(`E:`, `s`) + n.Block(1000) } - var n *Log_interface func Test_2(t *testing.T) { - n = New(Config{ - File:`1.log`, - Stdout:true, - Prefix_string:map[string]struct{}{`T:`:On,`I:`:On,`W:`:On,`E:`:On}, - }) + n = New(Config{ + File: `1.log`, + Stdout: true, + Prefix_string: map[string]struct{}{`T:`: On, `I:`: On, `W:`: On, `E:`: On}, + }) go func() { http.ListenAndServe("0.0.0.0:8899", nil) - }() - // n = nil - for { - n:=n.Base_add(`>1`) - n.L(`T:`,`s`) - time.Sleep(time.Second*time.Duration(1)) - // n=nil - } - n.L(`T:`,`s`) - runtime.GC() - time.Sleep(time.Second*time.Duration(1000)) - // fmt.Printf("%p %p\n",n.MQ,n1.MQ) - // n1 = nil - // fmt.Println(n) -} \ No newline at end of file + }() + // n = nil + for { + n := n.Base_add(`>1`) + n.L(`T:`, `s`) + time.Sleep(time.Second * time.Duration(1)) + // n=nil + } +} diff --git a/msgq/Msgq.go b/msgq/Msgq.go index 5f1c3a0..117545e 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -30,6 +30,7 @@ func New() *Msgq { } func NewTo(to time.Duration) *Msgq { + fmt.Println("Warn: NewTo is slow, consider New") m := new(Msgq) m.funcs = list.New() if to != 0 { @@ -195,11 +196,10 @@ func (m *Msgq) Pull_tag_chan(key string, size int, ctx context.Context) <-chan a close(ch) return true default: - if len(ch) == size { + for len(ch) != 0 { <-ch } ch <- d.Data - return false } } return false @@ -266,28 +266,108 @@ func (m *Msgq) Pull_tag_async(func_map map[string]func(any) (disable bool)) { } type MsgType[T any] struct { - m *Msgq + to []time.Duration + funcs *list.List + someNeedRemove atomic.Int32 + lock sync.RWMutex + runTag sync.Map +} + +type MsgType_tag_data[T any] struct { + Tag string + Data T } func NewType[T any]() *MsgType[T] { - return &MsgType[T]{ - m: New(), - } + m := new(MsgType[T]) + m.funcs = list.New() + return m } func NewTypeTo[T any](to time.Duration) *MsgType[T] { - return &MsgType[T]{ - m: NewTo(to), + fmt.Println("Warn: NewTypeTo[T any] is slow, consider NewType[T any]") + m := new(MsgType[T]) + m.funcs = list.New() + if to != 0 { + m.to = append(m.to, to) + } else { + m.to = append(m.to, time.Second*30) + } + return m +} + +func (m *MsgType[T]) push(msg MsgType_tag_data[T]) { + for m.someNeedRemove.Load() != 0 { + time.Sleep(time.Millisecond) + runtime.Gosched() + } + + var removes []*list.Element + + ul := m.lock.RLock(m.to...) + for el := m.funcs.Front(); el != nil; el = el.Next() { + if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable { + m.someNeedRemove.Add(1) + removes = append(removes, el) + } + } + ul() + + if len(removes) != 0 { + ul := m.lock.Lock(m.to...) + m.someNeedRemove.Add(-int32(len(removes))) + for i := 0; i < len(removes); i++ { + m.funcs.Remove(removes[i]) + } + ul() + } +} + +func (m *MsgType[T]) pushLock(msg MsgType_tag_data[T]) { + for m.someNeedRemove.Load() != 0 { + time.Sleep(time.Millisecond) + runtime.Gosched() + } + + ul := m.lock.Lock(m.to...) + defer ul() + + var removes []*list.Element + + for el := m.funcs.Front(); el != nil; el = el.Next() { + if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable { + m.someNeedRemove.Add(1) + removes = append(removes, el) + } + } + + if len(removes) != 0 { + m.someNeedRemove.Add(-int32(len(removes))) + for i := 0; i < len(removes); i++ { + m.funcs.Remove(removes[i]) + } } } +func (m *MsgType[T]) register(f func(MsgType_tag_data[T]) (disable bool)) { + ul := m.lock.Lock(m.to...) + m.funcs.PushBack(f) + ul() +} + +func (m *MsgType[T]) register_front(f func(MsgType_tag_data[T]) (disable bool)) { + ul := m.lock.Lock(m.to...) + m.funcs.PushFront(f) + ul() +} + func (m *MsgType[T]) Push_tag(Tag string, Data T) { - if len(m.m.to) > 0 { + if len(m.to) > 0 { ptr := uintptr(unsafe.Pointer(&Data)) - m.m.runTag.Store(ptr, "[T]Push_tag(`"+Tag+"`,...)") + m.runTag.Store(ptr, "[T]Push_tag(`"+Tag+"`,...)") defer func() { if e := recover(); e != nil { - m.m.runTag.Range(func(key, value any) bool { + m.runTag.Range(func(key, value any) bool { if key == ptr { fmt.Printf("%v panic > %v\n", value, e) } else { @@ -295,25 +375,25 @@ func (m *MsgType[T]) Push_tag(Tag string, Data T) { } return true }) - m.m.runTag.ClearAll() + m.runTag.ClearAll() panic(e) } - m.m.runTag.Delete(ptr) + m.runTag.Delete(ptr) }() } - m.m.Push(Msgq_tag_data{ + m.push(MsgType_tag_data[T]{ Tag: Tag, Data: Data, }) } func (m *MsgType[T]) PushLock_tag(Tag string, Data T) { - if len(m.m.to) > 0 { + if len(m.to) > 0 { ptr := uintptr(unsafe.Pointer(&Data)) - m.m.runTag.Store(ptr, "[T]PushLock_tag(`"+Tag+"`,...)") + m.runTag.Store(ptr, "[T]PushLock_tag(`"+Tag+"`,...)") defer func() { if e := recover(); e != nil { - m.m.runTag.Range(func(key, value any) bool { + m.runTag.Range(func(key, value any) bool { if key == ptr { fmt.Printf("%v panic > %v\n", value, e) } else { @@ -321,36 +401,55 @@ func (m *MsgType[T]) PushLock_tag(Tag string, Data T) { } return true }) - m.m.runTag.ClearAll() + m.runTag.ClearAll() panic(e) } - m.m.runTag.Delete(ptr) + m.runTag.Delete(ptr) }() } - m.m.PushLock(Msgq_tag_data{ + m.pushLock(MsgType_tag_data[T]{ Tag: Tag, Data: Data, }) } func (m *MsgType[T]) ClearAll() { - m.m.ClearAll() + for m.someNeedRemove.Load() != 0 { + time.Sleep(time.Millisecond) + runtime.Gosched() + } + + ul := m.lock.Lock(m.to...) + defer ul() + + var removes []*list.Element + + for el := m.funcs.Front(); el != nil; el = el.Next() { + m.someNeedRemove.Add(1) + removes = append(removes, el) + } + + if len(removes) != 0 { + m.someNeedRemove.Add(-int32(len(removes))) + for i := 0; i < len(removes); i++ { + m.funcs.Remove(removes[i]) + } + } } func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) <-chan T { var ch = make(chan T, size) - m.m.Register(func(data any) bool { - if d, ok := data.(Msgq_tag_data); ok && d.Tag == key { + m.register(func(data MsgType_tag_data[T]) bool { + if data.Tag == key { select { case <-ctx.Done(): close(ch) return true default: - if len(ch) == size { + for len(ch) != 0 { <-ch } - ch <- d.Data.(T) - return false + ch <- data.Data } } return false @@ -359,20 +458,18 @@ func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) <- } func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) { - m.m.Register(func(data any) (disable bool) { - if d, ok := data.(Msgq_tag_data); ok && d.Tag == key { - return f(d.Data.(T)) + m.register(func(data MsgType_tag_data[T]) (disable bool) { + if data.Tag == key { + return f(data.Data) } return false }) } func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) { - m.m.Register(func(data any) (disable bool) { - if d, ok := data.(Msgq_tag_data); ok { - if f, ok := func_map[d.Tag]; ok { - return f(d.Data.(T)) - } + m.register(func(data MsgType_tag_data[T]) (disable bool) { + if f, ok := func_map[data.Tag]; ok { + return f(data.Data) } return false }) @@ -381,13 +478,13 @@ func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) { func (m *MsgType[T]) Pull_tag_async_only(key string, f func(T) (disable bool)) { var disable = signal.Init() - m.m.Register_front(func(data any) bool { + m.register_front(func(data MsgType_tag_data[T]) bool { if !disable.Islive() { return true } - if d, ok := data.(Msgq_tag_data); ok && d.Tag == key { + if data.Tag == key { go func() { - if f(d.Data.(T)) { + if f(data.Data) { disable.Done() } }() @@ -399,18 +496,16 @@ func (m *MsgType[T]) Pull_tag_async_only(key string, f func(T) (disable bool)) { func (m *MsgType[T]) Pull_tag_async(func_map map[string]func(T) (disable bool)) { var disable = signal.Init() - m.m.Register_front(func(data any) bool { + m.register_front(func(data MsgType_tag_data[T]) bool { if !disable.Islive() { return true } - if d, ok := data.(Msgq_tag_data); ok { - if f, ok := func_map[d.Tag]; ok { - go func() { - if f(d.Data.(T)) { - disable.Done() - } - }() - } + if f, ok := func_map[data.Tag]; ok { + go func() { + if f(data.Data) { + disable.Done() + } + }() } return false }) diff --git a/msgq/Msgq_test.go b/msgq/Msgq_test.go index 2a4f322..7cf06b5 100644 --- a/msgq/Msgq_test.go +++ b/msgq/Msgq_test.go @@ -141,7 +141,7 @@ func BenchmarkXxx(b *testing.B) { func TestPushLock(t *testing.T) { defer func() { - if e := recover(); e.(string) != "timeout to wait rlock, rlc:1" { + if e := recover(); e != nil { t.Fatal(e) } }() @@ -171,6 +171,9 @@ func Test_Pull_tag_chan(t *testing.T) { for i := 0; i < 5; i++ { mq.Push_tag(`a`, i) } + if len(ch) != 1 { + t.Fatal() + } var o = 0 for s := true; s; { select { @@ -180,7 +183,7 @@ func Test_Pull_tag_chan(t *testing.T) { s = false } } - if o != 7 { + if o != 4 { t.Fatal() } select { @@ -200,6 +203,16 @@ func Test_Pull_tag_chan(t *testing.T) { } } +func Test_Pull_tag_chan2(t *testing.T) { + mq := New() + + mq.Pull_tag_chan(`a`, 1, context.Background()) + go func() { + mq.PushLock_tag(`a`, nil) + mq.PushLock_tag(`a`, nil) + }() +} + func Test_msgq1(t *testing.T) { mq := New() c := make(chan time.Time, 10) diff --git a/reqf/Reqf.go b/reqf/Reqf.go index d7d73ac..3cdc79d 100644 --- a/reqf/Reqf.go +++ b/reqf/Reqf.go @@ -106,6 +106,9 @@ func (t *Req) Reqf(val Rval) error { for len(t.cancelFs) != 0 { <-t.cancelFs } + t.Respon = t.Respon[:0] + t.responBuf.Reset() + t.err = t.Reqf_1(_val) if t.err == nil || IsCancel(t.err) { break diff --git a/sync/RWMutex.go b/sync/RWMutex.go index 48bda30..5c1add8 100644 --- a/sync/RWMutex.go +++ b/sync/RWMutex.go @@ -19,22 +19,43 @@ type RWMutex struct { } func (m *RWMutex) RLock(to ...time.Duration) (unrlock func()) { + var callC atomic.Bool if len(to) > 0 { + var calls []string + for i := 1; true; i++ { + if pc, file, line, ok := runtime.Caller(i); !ok { + break + } else { + calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line)) + } + } c := time.Now() - for m.rlc.Load() < ulock { + for m.rlc.Load() < ulock || m.cul.Load() != m.oll.Load() { if time.Since(c) > to[0] { panic(fmt.Sprintf("timeout to wait lock, rlc:%d", m.rlc.Load())) } runtime.Gosched() } + c = time.Now() + go func() { + for !callC.Load() { + if time.Since(c) > to[0] { + panicS := fmt.Sprintf("timeout to run rlock %v > %v\n", time.Since(c), to[0]) + for i := 0; i < len(calls); i++ { + panicS += fmt.Sprintf("%s\n", calls[i]) + } + panic(panicS) + } + runtime.Gosched() + } + }() } else { - for m.rlc.Load() < ulock { + for m.rlc.Load() < ulock || m.cul.Load() != m.oll.Load() { time.Sleep(time.Millisecond) runtime.Gosched() } } m.rlc.Add(1) - var callC atomic.Bool return func() { if !callC.CompareAndSwap(false, true) { panic("had unrlock") @@ -45,7 +66,16 @@ func (m *RWMutex) RLock(to ...time.Duration) (unrlock func()) { func (m *RWMutex) Lock(to ...time.Duration) (unlock func()) { lockid := m.cul.Add(1) + var callC atomic.Bool if len(to) > 0 { + var calls []string + for i := 1; true; i++ { + if pc, file, line, ok := runtime.Caller(i); !ok { + break + } else { + calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line)) + } + } c := time.Now() for m.rlc.Load() > ulock { if time.Since(c) > to[0] { @@ -60,8 +90,21 @@ func (m *RWMutex) Lock(to ...time.Duration) (unlock func()) { runtime.Gosched() } if !m.rlc.CompareAndSwap(ulock, lock) { - panic("csa error, bug") + panic(fmt.Sprintf("csa error, rlc:%d", m.rlc.Load())) } + c = time.Now() + go func() { + for !callC.Load() { + if time.Since(c) > to[0] { + panicS := fmt.Sprintf("timeout to run lock %v > %v\n", time.Since(c), to[0]) + for i := 0; i < len(calls); i++ { + panicS += fmt.Sprintf("call by %s\n", calls[i]) + } + panic(panicS) + } + runtime.Gosched() + } + }() } else { for m.rlc.Load() > ulock { time.Sleep(time.Millisecond) @@ -72,10 +115,9 @@ func (m *RWMutex) Lock(to ...time.Duration) (unlock func()) { runtime.Gosched() } if !m.rlc.CompareAndSwap(ulock, lock) { - panic("csa error, bug") + panic(fmt.Sprintf("csa error, rlc:%d", m.rlc.Load())) } } - var callC atomic.Bool return func() { if !callC.CompareAndSwap(false, true) { panic("had unlock") diff --git a/websocket/Client.go b/websocket/Client.go index 4d3d646..bad06e4 100644 --- a/websocket/Client.go +++ b/websocket/Client.go @@ -176,6 +176,7 @@ func (o *Client) Handle() (*msgq.MsgType[*WsMsg], error) { if wm.Type == 0 { wm.Type = websocket.TextMessage } + c.SetWriteDeadline(time.Now().Add(time.Duration(o.TO * int(time.Millisecond)))) if err := c.WriteMessage(wm.Type, wm.Msg); err != nil { o.error(err) o.msg.ClearAll()