From 378800f6a9cf53a9c3217ba640a5daaf63020896 Mon Sep 17 00:00:00 2001 From: qydysky <32743305+qydysky@users.noreply.github.com> Date: Thu, 19 Jan 2023 00:46:36 +0800 Subject: [PATCH] fix --- log/Log.go | 231 ++++++++++++++++++++------------------ msgq/Msgq.go | 120 +++++--------------- msgq/Msgq_test.go | 264 ++++++++++++++++++++++---------------------- websocket/Server.go | 36 +++--- 4 files changed, 297 insertions(+), 354 deletions(-) diff --git a/log/Log.go b/log/Log.go index 4c9cd40..cefc117 100644 --- a/log/Log.go +++ b/log/Log.go @@ -2,151 +2,164 @@ package part import ( "io" - "time" + "log" "os" - "log" + "time" - p "github.com/qydysky/part" - m "github.com/qydysky/part/msgq" - s "github.com/qydysky/part/signal" + p "github.com/qydysky/part" + m "github.com/qydysky/part/msgq" + s "github.com/qydysky/part/signal" ) var ( - On = struct{}{} + On = struct{}{} ) type Log_interface struct { - MQ *m.Msgq - Config + MQ *m.Msgq + Config } type Config struct { - File string - Stdout bool - Prefix_string map[string]struct{} - Base_string []interface{} + File string + Stdout bool + Prefix_string map[string]struct{} + Base_string []interface{} } type Msg_item struct { - Prefix string - Msg_obj []interface{} - Config + Prefix string + Msg_obj []interface{} + Config } -//New 初始化 +// New 初始化 func New(c Config) (o *Log_interface) { - o = &Log_interface{ - Config:c, - } - if c.File != `` {p.File().NewPath(c.File)} - - o.MQ = m.New(100) - o.MQ.Pull_tag(map[string]func(interface{})(bool){ - `block`:func(data interface{})(bool){ - if v,ok := data.(*s.Signal);ok{v.Done()} - return false - }, - `L`:func(data interface{})(bool){ - msg := data.(Msg_item) - var showObj = []io.Writer{} - if msg.Stdout {showObj = append(showObj, os.Stdout)} - if msg.File != `` { - file, err := os.OpenFile(msg.File, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) - if err == nil { - showObj = append(showObj, file) - defer file.Close() - }else{log.Println(err)} - } - log.New(io.MultiWriter(showObj...), - msg.Prefix, - log.Ldate|log.Ltime).Println(msg.Msg_obj...) - return false - }, - }) - {//启动阻塞 - b := s.Init() - for b.Islive() { - o.MQ.Push_tag(`block`,b) - time.Sleep(time.Duration(20)*time.Millisecond) - } - } - return + o = &Log_interface{ + Config: c, + } + if c.File != `` { + p.File().NewPath(c.File) + } + + o.MQ = m.New() + o.MQ.Pull_tag(map[string]func(interface{}) bool{ + `block`: func(data interface{}) bool { + if v, ok := data.(*s.Signal); ok { + v.Done() + } + return false + }, + `L`: func(data interface{}) bool { + msg := data.(Msg_item) + var showObj = []io.Writer{} + if msg.Stdout { + showObj = append(showObj, os.Stdout) + } + if msg.File != `` { + file, err := os.OpenFile(msg.File, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err == nil { + showObj = append(showObj, file) + defer file.Close() + } else { + log.Println(err) + } + } + log.New(io.MultiWriter(showObj...), + msg.Prefix, + log.Ldate|log.Ltime).Println(msg.Msg_obj...) + return false + }, + }) + { //启动阻塞 + b := s.Init() + for b.Islive() { + o.MQ.Push_tag(`block`, b) + time.Sleep(time.Duration(20) * time.Millisecond) + } + } + return } -// -func Copy(i *Log_interface)(o *Log_interface){ - o = &Log_interface{ - Config:(*i).Config, - MQ:(*i).MQ, - } - {//启动阻塞 - b := s.Init() - for b.Islive() { - o.MQ.Push_tag(`block`,b) - time.Sleep(time.Duration(20)*time.Millisecond) - } - } - return +func Copy(i *Log_interface) (o *Log_interface) { + o = &Log_interface{ + Config: (*i).Config, + MQ: (*i).MQ, + } + { //启动阻塞 + b := s.Init() + for b.Islive() { + o.MQ.Push_tag(`block`, b) + time.Sleep(time.Duration(20) * time.Millisecond) + } + } + return } -//Level 设置之后日志等级 +// Level 设置之后日志等级 func (I *Log_interface) Level(log map[string]struct{}) (O *Log_interface) { - O = Copy(I) - for k,_ := range O.Prefix_string { - if _,ok := log[k];!ok{delete(O.Prefix_string,k)} - } - return + O = Copy(I) + for k, _ := range O.Prefix_string { + if _, ok := log[k]; !ok { + delete(O.Prefix_string, k) + } + } + return } -//Open 日志不显示 +// Open 日志不显示 func (I *Log_interface) Log_show_control(show bool) (O *Log_interface) { - O = Copy(I) - // - O.Block(100) - O.Stdout = show - return + O = Copy(I) + // + O.Block(100) + O.Stdout = show + return } -//Open 日志输出至文件 +// Open 日志输出至文件 func (I *Log_interface) Log_to_file(fileP string) (O *Log_interface) { - O=I - // - O.Block(100) - O.File = fileP - if O.File != `` {p.File().NewPath(O.File)} - return + O = I + // + O.Block(100) + O.File = fileP + if O.File != `` { + p.File().NewPath(O.File) + } + return } -//Block 阻塞直到本轮日志输出完毕 +// Block 阻塞直到本轮日志输出完毕 func (I *Log_interface) Block(timeout int) (O *Log_interface) { - O=I - b := s.Init() - O.MQ.Push_tag(`block`,b) - b.Wait() - return + O = I + b := s.Init() + O.MQ.Push_tag(`block`, b) + b.Wait() + return } -//日志等级 -//Base 追加到后续输出 +// 日志等级 +// Base 追加到后续输出 func (I *Log_interface) Base(i ...interface{}) (O *Log_interface) { - O=Copy(I) - O.Base_string = i - return + O = Copy(I) + O.Base_string = i + return } func (I *Log_interface) Base_add(i ...interface{}) (O *Log_interface) { - O=Copy(I) - O.Base_string = append(O.Base_string, i...) - return + O = Copy(I) + O.Base_string = append(O.Base_string, i...) + return } func (I *Log_interface) L(prefix string, i ...interface{}) (O *Log_interface) { - O=I - if _,ok := O.Prefix_string[prefix];!ok{return} - - O.MQ.Push_tag(`L`,Msg_item{ - Prefix:prefix, - Msg_obj:append(O.Base_string, i), - Config:O.Config, - }) - return -} \ No newline at end of file + O = I + if _, ok := O.Prefix_string[prefix]; !ok { + return + } + + O.MQ.Push_tag(`L`, Msg_item{ + Prefix: prefix, + Msg_obj: append(O.Base_string, i), + Config: O.Config, + }) + return +} diff --git a/msgq/Msgq.go b/msgq/Msgq.go index d9736d5..b30eca4 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -1,124 +1,60 @@ package part import ( - "sync" - "time" - "runtime" "container/list" + "sync" ) type Msgq struct { - data_list *list.List - wait_push chan struct{} - max_data_mun int - ticker *time.Ticker - sig uint64 + funcs *list.List sync.RWMutex } -type Msgq_item struct { - data interface{} - sig uint64 -} - -type FuncMap map[string]func(interface{})(bool) - -func New(want_max_data_mun int) (*Msgq) { +func New() *Msgq { m := new(Msgq) - (*m).wait_push = make(chan struct{},10) - (*m).data_list = list.New() - (*m).max_data_mun = want_max_data_mun - (*m).ticker = time.NewTicker(time.Duration(25)*time.Millisecond) + m.funcs = list.New() return m } -func (m *Msgq) Push(msg interface{}) { +func (m *Msgq) Register(f func(any) (disable bool)) { m.Lock() - defer m.Unlock() - m.data_list.PushBack(Msgq_item{ - data:msg, - sig:m.get_sig(), - }) - if m.data_list.Len() > m.max_data_mun {m.data_list.Remove(m.data_list.Front())} - - var pull_num int - for len(m.wait_push) == 0 { - pull_num += 1 - m.wait_push <- struct{}{} - } - if pull_num < 1 {<- m.ticker.C} - runtime.Gosched() - select { - case <- m.wait_push: - case <- m.ticker.C: - } + m.funcs.PushBack(f) + m.Unlock() } -func (m *Msgq) Pull(old_sig uint64) (data interface{},sig uint64) { - for old_sig == m.Sig() { - select { - case <- m.wait_push: - case <- m.ticker.C: - } - } +func (m *Msgq) Push(msg any) { m.RLock() - defer m.RUnlock() - - if int(m.Sig() - old_sig) > m.max_data_mun {return nil,m.Sig()} - - for el := m.data_list.Front();el != nil;el = el.Next() { - if old_sig < el.Value.(Msgq_item).sig { - data = el.Value.(Msgq_item).data - sig = el.Value.(Msgq_item).sig - return + for el := m.funcs.Front(); el != nil; el = el.Next() { + if disable := el.Value.(func(any) bool)(msg); disable { + m.funcs.Remove(el) } } - return -} - -func (m *Msgq) Sig() (sig uint64) { - if el := m.data_list.Back();el == nil { - sig = 0 - } else { - sig = el.Value.(Msgq_item).sig - } - return -} - -func (m *Msgq) get_sig() (sig uint64) { - m.sig += 1 - return m.sig + m.RUnlock() } type Msgq_tag_data struct { - Tag string + Tag string Data interface{} } -func (m *Msgq) Push_tag(Tag string,Data interface{}) { +func (m *Msgq) Push_tag(Tag string, Data interface{}) { m.Push(Msgq_tag_data{ - Tag:Tag, - Data:Data, + Tag: Tag, + Data: Data, }) } -func (m *Msgq) Pull_tag(func_map map[string]func(interface{})(bool)) { - go func(){ - var ( - sig = m.Sig() - data interface{} - ) - for { - data,sig = m.Pull(sig) - if d,ok := data.(Msgq_tag_data);!ok{ - if f,ok := func_map[`Error`];ok{ - if f(d.Data) {break} - } - } else { - if f,ok := func_map[d.Tag];ok{ - if f(d.Data) {break} - } +func (m *Msgq) Pull_tag(func_map map[string]func(any) (disable bool)) { + m.Register(func(data any) (disable bool) { + if d, ok := data.(Msgq_tag_data); !ok { + if f, ok := func_map[`Error`]; ok { + return f(d.Data) + } + } else { + if f, ok := func_map[d.Tag]; ok { + return f(d.Data) } } - }() -} \ No newline at end of file + return false + }) +} diff --git a/msgq/Msgq_test.go b/msgq/Msgq_test.go index b2952d3..08846c4 100644 --- a/msgq/Msgq_test.go +++ b/msgq/Msgq_test.go @@ -2,126 +2,123 @@ package part import ( "fmt" - "net/http" _ "net/http/pprof" "testing" "time" - - sys "github.com/qydysky/part/sys" ) type test_item struct { data string } -func Test_msgq(t *testing.T) { +// func Test_msgq(t *testing.T) { - mq := New(5) - mun := 100000 - mun_c := make(chan bool, mun) - mun_s := make(chan bool, mun) +// mq := New(5) +// mun := 100000 +// mun_c := make(chan bool, mun) +// mun_s := make(chan bool, mun) - var e int +// var e int - sig := mq.Sig() - for i := 0; i < mun; i++ { - go func() { - mun_c <- true - data, t0 := mq.Pull(sig) - if o, ok := data.(string); o != `mmm` || !ok { - e = 1 - } - data1, _ := mq.Pull(t0) - if o, ok := data1.(string); o != `mm1` || !ok { - e = 2 - } - mun_s <- true - }() - } +// sig := mq.Sig() +// for i := 0; i < mun; i++ { +// go func() { +// mun_c <- true +// data, t0 := mq.Pull(sig) +// if o, ok := data.(string); o != `mmm` || !ok { +// e = 1 +// } +// data1, _ := mq.Pull(t0) +// if o, ok := data1.(string); o != `mm1` || !ok { +// e = 2 +// } +// mun_s <- true +// }() +// } - for len(mun_c) != mun { - t.Log(`>`, len(mun_c)) - sys.Sys().Timeoutf(1) - } - t.Log(`>`, len(mun_c)) +// for len(mun_c) != mun { +// t.Log(`>`, len(mun_c)) +// sys.Sys().Timeoutf(1) +// } +// t.Log(`>`, len(mun_c)) - t.Log(`push mmm`) - mq.Push(`mmm`) - t.Log(`push mm1`) - mq.Push(`mm1`) +// t.Log(`push mmm`) +// mq.Push(`mmm`) +// t.Log(`push mm1`) +// mq.Push(`mm1`) - for len(mun_s) != mun { - t.Log(`<`, len(mun_s)) - sys.Sys().Timeoutf(1) - } - t.Log(`<`, len(mun_s)) +// for len(mun_s) != mun { +// t.Log(`<`, len(mun_s)) +// sys.Sys().Timeoutf(1) +// } +// t.Log(`<`, len(mun_s)) - if e != 0 { - t.Error(e) - } -} +// if e != 0 { +// t.Error(e) +// } +// } -func Test_msgq2(t *testing.T) { - mq := New(5) +// func Test_msgq2(t *testing.T) { +// mq := New(5) - mun_c := make(chan bool, 100) - go func() { - var ( - sig = mq.Sig() - data interface{} - ) - for { - data, sig = mq.Pull(sig) - if data.(test_item).data != `aa1` { - t.Error(`1`) - } - mun_c <- true - } - }() - go func() { - var ( - sig = mq.Sig() - data interface{} - ) - for { - data, sig = mq.Pull(sig) - if data.(test_item).data != `aa1` { - t.Error(`2`) - } - mun_c <- true - } - }() - go func() { - var ( - sig = mq.Sig() - data interface{} - ) - for { - data, sig = mq.Pull(sig) - if data.(test_item).data != `aa1` { - t.Error(`3`) - } - mun_c <- true - } - }() - var fin_turn = 0 - t.Log(`start`) - time.Sleep(time.Second) - for fin_turn < 1000000 { - mq.Push(test_item{ - data: `aa1`, - }) - <-mun_c - <-mun_c - <-mun_c - fin_turn += 1 - fmt.Print("\r", fin_turn) - } - t.Log(`fin`) -} +// mun_c := make(chan bool, 100) +// go func() { +// var ( +// sig = mq.Sig() +// data interface{} +// ) +// for { +// data, sig = mq.Pull(sig) +// if data.(test_item).data != `aa1` { +// t.Error(`1`) +// } +// mun_c <- true +// } +// }() +// go func() { +// var ( +// sig = mq.Sig() +// data interface{} +// ) +// for { +// data, sig = mq.Pull(sig) +// if data.(test_item).data != `aa1` { +// t.Error(`2`) +// } +// mun_c <- true +// } +// }() +// go func() { +// var ( +// sig = mq.Sig() +// data interface{} +// ) +// for { +// data, sig = mq.Pull(sig) +// if data.(test_item).data != `aa1` { +// t.Error(`3`) +// } +// mun_c <- true +// } +// }() +// var fin_turn = 0 +// t.Log(`start`) +// time.Sleep(time.Second) +// for fin_turn < 1000000 { +// mq.Push(test_item{ +// data: `aa1`, +// }) +// <-mun_c +// <-mun_c +// <-mun_c +// fin_turn += 1 +// fmt.Print("\r", fin_turn) +// } +// t.Log(`fin`) +// } func Test_msgq3(t *testing.T) { - mq := New(100) + mq := New() mun_c := make(chan int, 100) mq.Pull_tag(map[string]func(interface{}) bool{ @@ -149,7 +146,7 @@ func Test_msgq3(t *testing.T) { func Test_msgq4(t *testing.T) { // mq := New(30) - mq := New(3) //out of list + mq := New() //out of list mun_c1 := make(chan bool, 100) mun_c2 := make(chan bool, 100) @@ -214,7 +211,7 @@ func Test_msgq4(t *testing.T) { } func Test_msgq5(t *testing.T) { - mq := New(30) + mq := New() mun_c1 := make(chan bool, 100) mun_c2 := make(chan bool, 100) @@ -273,37 +270,34 @@ func Test_msgq5(t *testing.T) { t.Log(`fin`) } -func Test_msgq6(t *testing.T) { - mq := New(30) - go func() { - http.ListenAndServe("0.0.0.0:8899", nil) - }() - go mq.Pull_tag(map[string]func(interface{}) bool{ - `A1`: func(data interface{}) bool { - return false - }, - `A2`: func(data interface{}) bool { - if v, ok := data.(string); !ok || v != `a11` { - t.Error(`2`) - } - return false - }, - `Error`: func(data interface{}) bool { - if data == nil { - t.Error(`out of list`) - } - return false - }, - }) +// func Test_msgq6(t *testing.T) { +// mq := New() +// go mq.Pull_tag(map[string]func(interface{}) bool{ +// `A1`: func(data interface{}) bool { +// return false +// }, +// `A2`: func(data interface{}) bool { +// if v, ok := data.(string); !ok || v != `a11` { +// t.Error(`2`) +// } +// return false +// }, +// `Error`: func(data interface{}) bool { +// if data == nil { +// t.Error(`out of list`) +// } +// return false +// }, +// }) - var fin_turn = 0 - t.Log(`start`) - for fin_turn < 1000 { - time.Sleep(time.Second) - time.Sleep(time.Second) - mq.Push_tag(`A1`, `a11`) - fin_turn += 1 - fmt.Print("\r", fin_turn) - } - t.Log(`fin`) -} +// var fin_turn = 0 +// t.Log(`start`) +// for fin_turn < 1000 { +// time.Sleep(time.Second) +// time.Sleep(time.Second) +// mq.Push_tag(`A1`, `a11`) +// fin_turn += 1 +// fmt.Print("\r", fin_turn) +// } +// t.Log(`fin`) +// } diff --git a/websocket/Server.go b/websocket/Server.go index f9178bf..738938a 100644 --- a/websocket/Server.go +++ b/websocket/Server.go @@ -27,7 +27,7 @@ type uinterface struct { //内部消息 func New_server() *Server { return &Server{ - ws_mq: mq.New(200), //收发通道 + ws_mq: mq.New(), //收发通道 userpool: idpool.New(func() interface{} { return new(struct{}) }), //浏览器标签页池 } } @@ -129,23 +129,23 @@ func (t *Server) WS(w http.ResponseWriter, r *http.Request) (o chan uintptr) { // return false // } -// ws_mq.Push_tag(`send`,Uinterface{//just reply -// Id:tmp.Id, -// Data:tmp.Data, -// }) -// //or -// ws_mq.Push_tag(`send`,Uinterface{//just reply -// Id:0,//send to all -// Data:tmp.Data, -// }) -// } -// return false -// }, -// `error`:func(data interface{})(bool){ -// log.Println(data) -// return false -// }, -// }) +// ws_mq.Push_tag(`send`,Uinterface{//just reply +// Id:tmp.Id, +// Data:tmp.Data, +// }) +// //or +// ws_mq.Push_tag(`send`,Uinterface{//just reply +// Id:0,//send to all +// Data:tmp.Data, +// }) +// } +// return false +// }, +// `error`:func(data interface{})(bool){ +// log.Println(data) +// return false +// }, +// }) func (t *Server) Interface() *mq.Msgq { return t.ws_mq } -- 2.39.2