import (
"io"
- "time"
+ "log"
"os"
- "log"
+ "time"
- p "github.com/qydysky/part"
- m "github.com/qydysky/part/msgq"
- s "github.com/qydysky/part/signal"
+ p "github.com/qydysky/part"
+ m "github.com/qydysky/part/msgq"
+ s "github.com/qydysky/part/signal"
)
var (
- On = struct{}{}
+ On = struct{}{}
)
type Log_interface struct {
- MQ *m.Msgq
- Config
+ MQ *m.Msgq
+ Config
}
type Config struct {
- File string
- Stdout bool
- Prefix_string map[string]struct{}
- Base_string []interface{}
+ File string
+ Stdout bool
+ Prefix_string map[string]struct{}
+ Base_string []interface{}
}
type Msg_item struct {
- Prefix string
- Msg_obj []interface{}
- Config
+ Prefix string
+ Msg_obj []interface{}
+ Config
}
-//New 初始化
+// New 初始化
func New(c Config) (o *Log_interface) {
- o = &Log_interface{
- Config:c,
- }
- if c.File != `` {p.File().NewPath(c.File)}
-
- o.MQ = m.New(100)
- o.MQ.Pull_tag(map[string]func(interface{})(bool){
- `block`:func(data interface{})(bool){
- if v,ok := data.(*s.Signal);ok{v.Done()}
- return false
- },
- `L`:func(data interface{})(bool){
- msg := data.(Msg_item)
- var showObj = []io.Writer{}
- if msg.Stdout {showObj = append(showObj, os.Stdout)}
- if msg.File != `` {
- file, err := os.OpenFile(msg.File, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
- if err == nil {
- showObj = append(showObj, file)
- defer file.Close()
- }else{log.Println(err)}
- }
- log.New(io.MultiWriter(showObj...),
- msg.Prefix,
- log.Ldate|log.Ltime).Println(msg.Msg_obj...)
- return false
- },
- })
- {//启动阻塞
- b := s.Init()
- for b.Islive() {
- o.MQ.Push_tag(`block`,b)
- time.Sleep(time.Duration(20)*time.Millisecond)
- }
- }
- return
+ o = &Log_interface{
+ Config: c,
+ }
+ if c.File != `` {
+ p.File().NewPath(c.File)
+ }
+
+ o.MQ = m.New()
+ o.MQ.Pull_tag(map[string]func(interface{}) bool{
+ `block`: func(data interface{}) bool {
+ if v, ok := data.(*s.Signal); ok {
+ v.Done()
+ }
+ return false
+ },
+ `L`: func(data interface{}) bool {
+ msg := data.(Msg_item)
+ var showObj = []io.Writer{}
+ if msg.Stdout {
+ showObj = append(showObj, os.Stdout)
+ }
+ if msg.File != `` {
+ file, err := os.OpenFile(msg.File, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
+ if err == nil {
+ showObj = append(showObj, file)
+ defer file.Close()
+ } else {
+ log.Println(err)
+ }
+ }
+ log.New(io.MultiWriter(showObj...),
+ msg.Prefix,
+ log.Ldate|log.Ltime).Println(msg.Msg_obj...)
+ return false
+ },
+ })
+ { //启动阻塞
+ b := s.Init()
+ for b.Islive() {
+ o.MQ.Push_tag(`block`, b)
+ time.Sleep(time.Duration(20) * time.Millisecond)
+ }
+ }
+ return
}
-//
-func Copy(i *Log_interface)(o *Log_interface){
- o = &Log_interface{
- Config:(*i).Config,
- MQ:(*i).MQ,
- }
- {//启动阻塞
- b := s.Init()
- for b.Islive() {
- o.MQ.Push_tag(`block`,b)
- time.Sleep(time.Duration(20)*time.Millisecond)
- }
- }
- return
+func Copy(i *Log_interface) (o *Log_interface) {
+ o = &Log_interface{
+ Config: (*i).Config,
+ MQ: (*i).MQ,
+ }
+ { //启动阻塞
+ b := s.Init()
+ for b.Islive() {
+ o.MQ.Push_tag(`block`, b)
+ time.Sleep(time.Duration(20) * time.Millisecond)
+ }
+ }
+ return
}
-//Level 设置之后日志等级
+// Level 设置之后日志等级
func (I *Log_interface) Level(log map[string]struct{}) (O *Log_interface) {
- O = Copy(I)
- for k,_ := range O.Prefix_string {
- if _,ok := log[k];!ok{delete(O.Prefix_string,k)}
- }
- return
+ O = Copy(I)
+ for k, _ := range O.Prefix_string {
+ if _, ok := log[k]; !ok {
+ delete(O.Prefix_string, k)
+ }
+ }
+ return
}
-//Open 日志不显示
+// Open 日志不显示
func (I *Log_interface) Log_show_control(show bool) (O *Log_interface) {
- O = Copy(I)
- //
- O.Block(100)
- O.Stdout = show
- return
+ O = Copy(I)
+ //
+ O.Block(100)
+ O.Stdout = show
+ return
}
-//Open 日志输出至文件
+// Open 日志输出至文件
func (I *Log_interface) Log_to_file(fileP string) (O *Log_interface) {
- O=I
- //
- O.Block(100)
- O.File = fileP
- if O.File != `` {p.File().NewPath(O.File)}
- return
+ O = I
+ //
+ O.Block(100)
+ O.File = fileP
+ if O.File != `` {
+ p.File().NewPath(O.File)
+ }
+ return
}
-//Block 阻塞直到本轮日志输出完毕
+// Block 阻塞直到本轮日志输出完毕
func (I *Log_interface) Block(timeout int) (O *Log_interface) {
- O=I
- b := s.Init()
- O.MQ.Push_tag(`block`,b)
- b.Wait()
- return
+ O = I
+ b := s.Init()
+ O.MQ.Push_tag(`block`, b)
+ b.Wait()
+ return
}
-//日志等级
-//Base 追加到后续输出
+// 日志等级
+// Base 追加到后续输出
func (I *Log_interface) Base(i ...interface{}) (O *Log_interface) {
- O=Copy(I)
- O.Base_string = i
- return
+ O = Copy(I)
+ O.Base_string = i
+ return
}
func (I *Log_interface) Base_add(i ...interface{}) (O *Log_interface) {
- O=Copy(I)
- O.Base_string = append(O.Base_string, i...)
- return
+ O = Copy(I)
+ O.Base_string = append(O.Base_string, i...)
+ return
}
func (I *Log_interface) L(prefix string, i ...interface{}) (O *Log_interface) {
- O=I
- if _,ok := O.Prefix_string[prefix];!ok{return}
-
- O.MQ.Push_tag(`L`,Msg_item{
- Prefix:prefix,
- Msg_obj:append(O.Base_string, i),
- Config:O.Config,
- })
- return
-}
\ No newline at end of file
+ O = I
+ if _, ok := O.Prefix_string[prefix]; !ok {
+ return
+ }
+
+ O.MQ.Push_tag(`L`, Msg_item{
+ Prefix: prefix,
+ Msg_obj: append(O.Base_string, i),
+ Config: O.Config,
+ })
+ return
+}
package part
import (
- "sync"
- "time"
- "runtime"
"container/list"
+ "sync"
)
type Msgq struct {
- data_list *list.List
- wait_push chan struct{}
- max_data_mun int
- ticker *time.Ticker
- sig uint64
+ funcs *list.List
sync.RWMutex
}
-type Msgq_item struct {
- data interface{}
- sig uint64
-}
-
-type FuncMap map[string]func(interface{})(bool)
-
-func New(want_max_data_mun int) (*Msgq) {
+func New() *Msgq {
m := new(Msgq)
- (*m).wait_push = make(chan struct{},10)
- (*m).data_list = list.New()
- (*m).max_data_mun = want_max_data_mun
- (*m).ticker = time.NewTicker(time.Duration(25)*time.Millisecond)
+ m.funcs = list.New()
return m
}
-func (m *Msgq) Push(msg interface{}) {
+func (m *Msgq) Register(f func(any) (disable bool)) {
m.Lock()
- defer m.Unlock()
- m.data_list.PushBack(Msgq_item{
- data:msg,
- sig:m.get_sig(),
- })
- if m.data_list.Len() > m.max_data_mun {m.data_list.Remove(m.data_list.Front())}
-
- var pull_num int
- for len(m.wait_push) == 0 {
- pull_num += 1
- m.wait_push <- struct{}{}
- }
- if pull_num < 1 {<- m.ticker.C}
- runtime.Gosched()
- select {
- case <- m.wait_push:
- case <- m.ticker.C:
- }
+ m.funcs.PushBack(f)
+ m.Unlock()
}
-func (m *Msgq) Pull(old_sig uint64) (data interface{},sig uint64) {
- for old_sig == m.Sig() {
- select {
- case <- m.wait_push:
- case <- m.ticker.C:
- }
- }
+func (m *Msgq) Push(msg any) {
m.RLock()
- defer m.RUnlock()
-
- if int(m.Sig() - old_sig) > m.max_data_mun {return nil,m.Sig()}
-
- for el := m.data_list.Front();el != nil;el = el.Next() {
- if old_sig < el.Value.(Msgq_item).sig {
- data = el.Value.(Msgq_item).data
- sig = el.Value.(Msgq_item).sig
- return
+ for el := m.funcs.Front(); el != nil; el = el.Next() {
+ if disable := el.Value.(func(any) bool)(msg); disable {
+ m.funcs.Remove(el)
}
}
- return
-}
-
-func (m *Msgq) Sig() (sig uint64) {
- if el := m.data_list.Back();el == nil {
- sig = 0
- } else {
- sig = el.Value.(Msgq_item).sig
- }
- return
-}
-
-func (m *Msgq) get_sig() (sig uint64) {
- m.sig += 1
- return m.sig
+ m.RUnlock()
}
type Msgq_tag_data struct {
- Tag string
+ Tag string
Data interface{}
}
-func (m *Msgq) Push_tag(Tag string,Data interface{}) {
+func (m *Msgq) Push_tag(Tag string, Data interface{}) {
m.Push(Msgq_tag_data{
- Tag:Tag,
- Data:Data,
+ Tag: Tag,
+ Data: Data,
})
}
-func (m *Msgq) Pull_tag(func_map map[string]func(interface{})(bool)) {
- go func(){
- var (
- sig = m.Sig()
- data interface{}
- )
- for {
- data,sig = m.Pull(sig)
- if d,ok := data.(Msgq_tag_data);!ok{
- if f,ok := func_map[`Error`];ok{
- if f(d.Data) {break}
- }
- } else {
- if f,ok := func_map[d.Tag];ok{
- if f(d.Data) {break}
- }
+func (m *Msgq) Pull_tag(func_map map[string]func(any) (disable bool)) {
+ m.Register(func(data any) (disable bool) {
+ if d, ok := data.(Msgq_tag_data); !ok {
+ if f, ok := func_map[`Error`]; ok {
+ return f(d.Data)
+ }
+ } else {
+ if f, ok := func_map[d.Tag]; ok {
+ return f(d.Data)
}
}
- }()
-}
\ No newline at end of file
+ return false
+ })
+}
import (
"fmt"
- "net/http"
_ "net/http/pprof"
"testing"
"time"
-
- sys "github.com/qydysky/part/sys"
)
type test_item struct {
data string
}
-func Test_msgq(t *testing.T) {
+// func Test_msgq(t *testing.T) {
- mq := New(5)
- mun := 100000
- mun_c := make(chan bool, mun)
- mun_s := make(chan bool, mun)
+// mq := New(5)
+// mun := 100000
+// mun_c := make(chan bool, mun)
+// mun_s := make(chan bool, mun)
- var e int
+// var e int
- sig := mq.Sig()
- for i := 0; i < mun; i++ {
- go func() {
- mun_c <- true
- data, t0 := mq.Pull(sig)
- if o, ok := data.(string); o != `mmm` || !ok {
- e = 1
- }
- data1, _ := mq.Pull(t0)
- if o, ok := data1.(string); o != `mm1` || !ok {
- e = 2
- }
- mun_s <- true
- }()
- }
+// sig := mq.Sig()
+// for i := 0; i < mun; i++ {
+// go func() {
+// mun_c <- true
+// data, t0 := mq.Pull(sig)
+// if o, ok := data.(string); o != `mmm` || !ok {
+// e = 1
+// }
+// data1, _ := mq.Pull(t0)
+// if o, ok := data1.(string); o != `mm1` || !ok {
+// e = 2
+// }
+// mun_s <- true
+// }()
+// }
- for len(mun_c) != mun {
- t.Log(`>`, len(mun_c))
- sys.Sys().Timeoutf(1)
- }
- t.Log(`>`, len(mun_c))
+// for len(mun_c) != mun {
+// t.Log(`>`, len(mun_c))
+// sys.Sys().Timeoutf(1)
+// }
+// t.Log(`>`, len(mun_c))
- t.Log(`push mmm`)
- mq.Push(`mmm`)
- t.Log(`push mm1`)
- mq.Push(`mm1`)
+// t.Log(`push mmm`)
+// mq.Push(`mmm`)
+// t.Log(`push mm1`)
+// mq.Push(`mm1`)
- for len(mun_s) != mun {
- t.Log(`<`, len(mun_s))
- sys.Sys().Timeoutf(1)
- }
- t.Log(`<`, len(mun_s))
+// for len(mun_s) != mun {
+// t.Log(`<`, len(mun_s))
+// sys.Sys().Timeoutf(1)
+// }
+// t.Log(`<`, len(mun_s))
- if e != 0 {
- t.Error(e)
- }
-}
+// if e != 0 {
+// t.Error(e)
+// }
+// }
-func Test_msgq2(t *testing.T) {
- mq := New(5)
+// func Test_msgq2(t *testing.T) {
+// mq := New(5)
- mun_c := make(chan bool, 100)
- go func() {
- var (
- sig = mq.Sig()
- data interface{}
- )
- for {
- data, sig = mq.Pull(sig)
- if data.(test_item).data != `aa1` {
- t.Error(`1`)
- }
- mun_c <- true
- }
- }()
- go func() {
- var (
- sig = mq.Sig()
- data interface{}
- )
- for {
- data, sig = mq.Pull(sig)
- if data.(test_item).data != `aa1` {
- t.Error(`2`)
- }
- mun_c <- true
- }
- }()
- go func() {
- var (
- sig = mq.Sig()
- data interface{}
- )
- for {
- data, sig = mq.Pull(sig)
- if data.(test_item).data != `aa1` {
- t.Error(`3`)
- }
- mun_c <- true
- }
- }()
- var fin_turn = 0
- t.Log(`start`)
- time.Sleep(time.Second)
- for fin_turn < 1000000 {
- mq.Push(test_item{
- data: `aa1`,
- })
- <-mun_c
- <-mun_c
- <-mun_c
- fin_turn += 1
- fmt.Print("\r", fin_turn)
- }
- t.Log(`fin`)
-}
+// mun_c := make(chan bool, 100)
+// go func() {
+// var (
+// sig = mq.Sig()
+// data interface{}
+// )
+// for {
+// data, sig = mq.Pull(sig)
+// if data.(test_item).data != `aa1` {
+// t.Error(`1`)
+// }
+// mun_c <- true
+// }
+// }()
+// go func() {
+// var (
+// sig = mq.Sig()
+// data interface{}
+// )
+// for {
+// data, sig = mq.Pull(sig)
+// if data.(test_item).data != `aa1` {
+// t.Error(`2`)
+// }
+// mun_c <- true
+// }
+// }()
+// go func() {
+// var (
+// sig = mq.Sig()
+// data interface{}
+// )
+// for {
+// data, sig = mq.Pull(sig)
+// if data.(test_item).data != `aa1` {
+// t.Error(`3`)
+// }
+// mun_c <- true
+// }
+// }()
+// var fin_turn = 0
+// t.Log(`start`)
+// time.Sleep(time.Second)
+// for fin_turn < 1000000 {
+// mq.Push(test_item{
+// data: `aa1`,
+// })
+// <-mun_c
+// <-mun_c
+// <-mun_c
+// fin_turn += 1
+// fmt.Print("\r", fin_turn)
+// }
+// t.Log(`fin`)
+// }
func Test_msgq3(t *testing.T) {
- mq := New(100)
+ mq := New()
mun_c := make(chan int, 100)
mq.Pull_tag(map[string]func(interface{}) bool{
func Test_msgq4(t *testing.T) {
// mq := New(30)
- mq := New(3) //out of list
+ mq := New() //out of list
mun_c1 := make(chan bool, 100)
mun_c2 := make(chan bool, 100)
}
func Test_msgq5(t *testing.T) {
- mq := New(30)
+ mq := New()
mun_c1 := make(chan bool, 100)
mun_c2 := make(chan bool, 100)
t.Log(`fin`)
}
-func Test_msgq6(t *testing.T) {
- mq := New(30)
- go func() {
- http.ListenAndServe("0.0.0.0:8899", nil)
- }()
- go mq.Pull_tag(map[string]func(interface{}) bool{
- `A1`: func(data interface{}) bool {
- return false
- },
- `A2`: func(data interface{}) bool {
- if v, ok := data.(string); !ok || v != `a11` {
- t.Error(`2`)
- }
- return false
- },
- `Error`: func(data interface{}) bool {
- if data == nil {
- t.Error(`out of list`)
- }
- return false
- },
- })
+// func Test_msgq6(t *testing.T) {
+// mq := New()
+// go mq.Pull_tag(map[string]func(interface{}) bool{
+// `A1`: func(data interface{}) bool {
+// return false
+// },
+// `A2`: func(data interface{}) bool {
+// if v, ok := data.(string); !ok || v != `a11` {
+// t.Error(`2`)
+// }
+// return false
+// },
+// `Error`: func(data interface{}) bool {
+// if data == nil {
+// t.Error(`out of list`)
+// }
+// return false
+// },
+// })
- var fin_turn = 0
- t.Log(`start`)
- for fin_turn < 1000 {
- time.Sleep(time.Second)
- time.Sleep(time.Second)
- mq.Push_tag(`A1`, `a11`)
- fin_turn += 1
- fmt.Print("\r", fin_turn)
- }
- t.Log(`fin`)
-}
+// var fin_turn = 0
+// t.Log(`start`)
+// for fin_turn < 1000 {
+// time.Sleep(time.Second)
+// time.Sleep(time.Second)
+// mq.Push_tag(`A1`, `a11`)
+// fin_turn += 1
+// fmt.Print("\r", fin_turn)
+// }
+// t.Log(`fin`)
+// }
func New_server() *Server {
return &Server{
- ws_mq: mq.New(200), //收发通道
+ ws_mq: mq.New(), //收发通道
userpool: idpool.New(func() interface{} { return new(struct{}) }), //浏览器标签页池
}
}
// return false
// }
-// ws_mq.Push_tag(`send`,Uinterface{//just reply
-// Id:tmp.Id,
-// Data:tmp.Data,
-// })
-// //or
-// ws_mq.Push_tag(`send`,Uinterface{//just reply
-// Id:0,//send to all
-// Data:tmp.Data,
-// })
-// }
-// return false
-// },
-// `error`:func(data interface{})(bool){
-// log.Println(data)
-// return false
-// },
-// })
+// ws_mq.Push_tag(`send`,Uinterface{//just reply
+// Id:tmp.Id,
+// Data:tmp.Data,
+// })
+// //or
+// ws_mq.Push_tag(`send`,Uinterface{//just reply
+// Id:0,//send to all
+// Data:tmp.Data,
+// })
+// }
+// return false
+// },
+// `error`:func(data interface{})(bool){
+// log.Println(data)
+// return false
+// },
+// })
func (t *Server) Interface() *mq.Msgq {
return t.ws_mq
}