]> 127.0.0.1 Git - part/.git/commitdiff
fix v0.21.6
authorqydysky <32743305+qydysky@users.noreply.github.com>
Wed, 18 Jan 2023 16:46:36 +0000 (00:46 +0800)
committerqydysky <32743305+qydysky@users.noreply.github.com>
Wed, 18 Jan 2023 16:46:36 +0000 (00:46 +0800)
log/Log.go
msgq/Msgq.go
msgq/Msgq_test.go
websocket/Server.go

index 4c9cd403949f53089b2bb3cd899b79bccdf489db..cefc117a849d6b4d50772b61ff12f3292a54f240 100644 (file)
@@ -2,151 +2,164 @@ package part
 
 import (
        "io"
-       "time"
+       "log"
        "os"
-    "log"
+       "time"
 
-    p "github.com/qydysky/part"
-    m "github.com/qydysky/part/msgq"
-    s "github.com/qydysky/part/signal"
+       p "github.com/qydysky/part"
+       m "github.com/qydysky/part/msgq"
+       s "github.com/qydysky/part/signal"
 )
 
 var (
-    On = struct{}{}
+       On = struct{}{}
 )
 
 type Log_interface struct {
-    MQ *m.Msgq
-    Config
+       MQ *m.Msgq
+       Config
 }
 
 type Config struct {
-    File string
-    Stdout bool
-    Prefix_string map[string]struct{}
-    Base_string []interface{}
+       File          string
+       Stdout        bool
+       Prefix_string map[string]struct{}
+       Base_string   []interface{}
 }
 
 type Msg_item struct {
-    Prefix string
-    Msg_obj []interface{}
-    Config
+       Prefix  string
+       Msg_obj []interface{}
+       Config
 }
 
-//New 初始化
+// New 初始化
 func New(c Config) (o *Log_interface) {
 
-    o = &Log_interface{
-        Config:c,
-    }
-    if c.File != `` {p.File().NewPath(c.File)}
-
-    o.MQ = m.New(100)
-    o.MQ.Pull_tag(map[string]func(interface{})(bool){
-        `block`:func(data interface{})(bool){
-            if v,ok := data.(*s.Signal);ok{v.Done()}
-            return false
-        },
-        `L`:func(data interface{})(bool){
-            msg := data.(Msg_item)
-            var showObj = []io.Writer{}
-            if msg.Stdout {showObj = append(showObj, os.Stdout)} 
-            if msg.File != `` {
-                file, err := os.OpenFile(msg.File, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
-                if err == nil {
-                    showObj = append(showObj, file)
-                    defer file.Close()
-                }else{log.Println(err)}
-            }
-            log.New(io.MultiWriter(showObj...),
-            msg.Prefix,
-            log.Ldate|log.Ltime).Println(msg.Msg_obj...)
-            return false
-        },
-    })
-    {//启动阻塞
-        b := s.Init()
-        for b.Islive() {
-            o.MQ.Push_tag(`block`,b)
-            time.Sleep(time.Duration(20)*time.Millisecond)
-        }
-    }    
-    return
+       o = &Log_interface{
+               Config: c,
+       }
+       if c.File != `` {
+               p.File().NewPath(c.File)
+       }
+
+       o.MQ = m.New()
+       o.MQ.Pull_tag(map[string]func(interface{}) bool{
+               `block`: func(data interface{}) bool {
+                       if v, ok := data.(*s.Signal); ok {
+                               v.Done()
+                       }
+                       return false
+               },
+               `L`: func(data interface{}) bool {
+                       msg := data.(Msg_item)
+                       var showObj = []io.Writer{}
+                       if msg.Stdout {
+                               showObj = append(showObj, os.Stdout)
+                       }
+                       if msg.File != `` {
+                               file, err := os.OpenFile(msg.File, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
+                               if err == nil {
+                                       showObj = append(showObj, file)
+                                       defer file.Close()
+                               } else {
+                                       log.Println(err)
+                               }
+                       }
+                       log.New(io.MultiWriter(showObj...),
+                               msg.Prefix,
+                               log.Ldate|log.Ltime).Println(msg.Msg_obj...)
+                       return false
+               },
+       })
+       { //启动阻塞
+               b := s.Init()
+               for b.Islive() {
+                       o.MQ.Push_tag(`block`, b)
+                       time.Sleep(time.Duration(20) * time.Millisecond)
+               }
+       }
+       return
 }
 
-//
-func Copy(i *Log_interface)(o *Log_interface){
-    o = &Log_interface{
-        Config:(*i).Config,
-        MQ:(*i).MQ,
-    }
-    {//启动阻塞
-        b := s.Init()
-        for b.Islive() {
-            o.MQ.Push_tag(`block`,b)
-            time.Sleep(time.Duration(20)*time.Millisecond)
-        }
-    }
-    return
+func Copy(i *Log_interface) (o *Log_interface) {
+       o = &Log_interface{
+               Config: (*i).Config,
+               MQ:     (*i).MQ,
+       }
+       { //启动阻塞
+               b := s.Init()
+               for b.Islive() {
+                       o.MQ.Push_tag(`block`, b)
+                       time.Sleep(time.Duration(20) * time.Millisecond)
+               }
+       }
+       return
 }
 
-//Level 设置之后日志等级
+// Level 设置之后日志等级
 func (I *Log_interface) Level(log map[string]struct{}) (O *Log_interface) {
-    O = Copy(I)
-    for k,_ := range O.Prefix_string {
-        if _,ok := log[k];!ok{delete(O.Prefix_string,k)}
-    }
-    return
+       O = Copy(I)
+       for k, _ := range O.Prefix_string {
+               if _, ok := log[k]; !ok {
+                       delete(O.Prefix_string, k)
+               }
+       }
+       return
 }
 
-//Open 日志不显示
+// Open 日志不显示
 func (I *Log_interface) Log_show_control(show bool) (O *Log_interface) {
-    O = Copy(I)
-    //
-    O.Block(100)
-    O.Stdout = show
-    return
+       O = Copy(I)
+       //
+       O.Block(100)
+       O.Stdout = show
+       return
 }
 
-//Open 日志输出至文件
+// Open 日志输出至文件
 func (I *Log_interface) Log_to_file(fileP string) (O *Log_interface) {
-    O=I
-    //
-    O.Block(100)
-    O.File = fileP
-    if O.File != `` {p.File().NewPath(O.File)}
-    return
+       O = I
+       //
+       O.Block(100)
+       O.File = fileP
+       if O.File != `` {
+               p.File().NewPath(O.File)
+       }
+       return
 }
 
-//Block 阻塞直到本轮日志输出完毕
+// Block 阻塞直到本轮日志输出完毕
 func (I *Log_interface) Block(timeout int) (O *Log_interface) {
-    O=I
-    b := s.Init()
-    O.MQ.Push_tag(`block`,b)
-    b.Wait()
-    return
+       O = I
+       b := s.Init()
+       O.MQ.Push_tag(`block`, b)
+       b.Wait()
+       return
 }
 
-//日志等级
-//Base 追加到后续输出
+// 日志等级
+// Base 追加到后续输出
 func (I *Log_interface) Base(i ...interface{}) (O *Log_interface) {
-    O=Copy(I)
-    O.Base_string = i
-    return
+       O = Copy(I)
+       O.Base_string = i
+       return
 }
 func (I *Log_interface) Base_add(i ...interface{}) (O *Log_interface) {
-    O=Copy(I)
-    O.Base_string = append(O.Base_string, i...)
-    return
+       O = Copy(I)
+       O.Base_string = append(O.Base_string, i...)
+       return
 }
 func (I *Log_interface) L(prefix string, i ...interface{}) (O *Log_interface) {
-    O=I
-    if _,ok := O.Prefix_string[prefix];!ok{return}
-
-    O.MQ.Push_tag(`L`,Msg_item{
-        Prefix:prefix,
-        Msg_obj:append(O.Base_string, i),
-        Config:O.Config,
-    })
-    return
-}
\ No newline at end of file
+       O = I
+       if _, ok := O.Prefix_string[prefix]; !ok {
+               return
+       }
+
+       O.MQ.Push_tag(`L`, Msg_item{
+               Prefix:  prefix,
+               Msg_obj: append(O.Base_string, i),
+               Config:  O.Config,
+       })
+       return
+}
index d9736d576cbe9332775f7cc700b440bfe683a4dd..b30eca41d682e41a1c841de318469568424e20e6 100644 (file)
 package part
 
 import (
-       "sync"
-       "time"
-       "runtime"
        "container/list"
+       "sync"
 )
 
 type Msgq struct {
-       data_list *list.List
-       wait_push chan struct{}
-       max_data_mun int
-       ticker *time.Ticker
-       sig uint64
+       funcs *list.List
        sync.RWMutex
 }
 
-type Msgq_item struct {
-       data interface{}
-       sig uint64
-}
-
-type FuncMap map[string]func(interface{})(bool)
-
-func New(want_max_data_mun int) (*Msgq) {
+func New() *Msgq {
        m := new(Msgq)
-       (*m).wait_push = make(chan struct{},10)
-       (*m).data_list = list.New()
-       (*m).max_data_mun = want_max_data_mun
-       (*m).ticker = time.NewTicker(time.Duration(25)*time.Millisecond)
+       m.funcs = list.New()
        return m
 }
 
-func (m *Msgq) Push(msg interface{}) {
+func (m *Msgq) Register(f func(any) (disable bool)) {
        m.Lock()
-       defer m.Unlock()
-       m.data_list.PushBack(Msgq_item{
-               data:msg,
-               sig:m.get_sig(),
-       })
-       if m.data_list.Len() > m.max_data_mun {m.data_list.Remove(m.data_list.Front())}
-
-       var pull_num int
-       for len(m.wait_push) == 0 {
-               pull_num += 1
-               m.wait_push <- struct{}{}
-       }
-       if pull_num < 1 {<- m.ticker.C}
-       runtime.Gosched()
-       select {
-       case <- m.wait_push:
-       case <- m.ticker.C:
-       }
+       m.funcs.PushBack(f)
+       m.Unlock()
 }
 
-func (m *Msgq) Pull(old_sig uint64) (data interface{},sig uint64) {
-       for old_sig == m.Sig() {
-               select {
-               case <- m.wait_push:
-               case <- m.ticker.C:
-               }
-       }
+func (m *Msgq) Push(msg any) {
        m.RLock()
-       defer m.RUnlock()
-
-       if int(m.Sig() - old_sig) > m.max_data_mun {return nil,m.Sig()}
-
-       for el := m.data_list.Front();el != nil;el = el.Next() {
-               if old_sig < el.Value.(Msgq_item).sig {
-                       data = el.Value.(Msgq_item).data
-                       sig = el.Value.(Msgq_item).sig
-                       return
+       for el := m.funcs.Front(); el != nil; el = el.Next() {
+               if disable := el.Value.(func(any) bool)(msg); disable {
+                       m.funcs.Remove(el)
                }
        }
-       return
-}
-
-func (m *Msgq) Sig() (sig uint64) {
-       if el := m.data_list.Back();el == nil {
-               sig = 0
-       } else {
-               sig = el.Value.(Msgq_item).sig
-       }
-       return
-}
-
-func (m *Msgq) get_sig() (sig uint64) {
-       m.sig += 1
-       return m.sig
+       m.RUnlock()
 }
 
 type Msgq_tag_data struct {
-       Tag string
+       Tag  string
        Data interface{}
 }
 
-func (m *Msgq) Push_tag(Tag string,Data interface{}) {
+func (m *Msgq) Push_tag(Tag string, Data interface{}) {
        m.Push(Msgq_tag_data{
-               Tag:Tag,
-               Data:Data,
+               Tag:  Tag,
+               Data: Data,
        })
 }
 
-func (m *Msgq) Pull_tag(func_map map[string]func(interface{})(bool)) {
-       go func(){
-               var (
-                       sig = m.Sig()
-                       data interface{}
-               )
-               for {
-                       data,sig = m.Pull(sig)
-                       if d,ok := data.(Msgq_tag_data);!ok{
-                               if f,ok := func_map[`Error`];ok{
-                                       if f(d.Data) {break}
-                               }
-                       } else {
-                               if f,ok := func_map[d.Tag];ok{
-                                       if f(d.Data) {break}
-                               }
+func (m *Msgq) Pull_tag(func_map map[string]func(any) (disable bool)) {
+       m.Register(func(data any) (disable bool) {
+               if d, ok := data.(Msgq_tag_data); !ok {
+                       if f, ok := func_map[`Error`]; ok {
+                               return f(d.Data)
+                       }
+               } else {
+                       if f, ok := func_map[d.Tag]; ok {
+                               return f(d.Data)
                        }
                }
-       }()
-}
\ No newline at end of file
+               return false
+       })
+}
index b2952d3345810f53d7e1db888431dea0b971948b..08846c490ce2afd6fc2e7b7ad6faf5b69dfbb9e3 100644 (file)
@@ -2,126 +2,123 @@ package part
 
 import (
        "fmt"
-       "net/http"
        _ "net/http/pprof"
        "testing"
        "time"
-
-       sys "github.com/qydysky/part/sys"
 )
 
 type test_item struct {
        data string
 }
 
-func Test_msgq(t *testing.T) {
+// func Test_msgq(t *testing.T) {
 
-       mq := New(5)
-       mun := 100000
-       mun_c := make(chan bool, mun)
-       mun_s := make(chan bool, mun)
+//     mq := New(5)
+//     mun := 100000
+//     mun_c := make(chan bool, mun)
+//     mun_s := make(chan bool, mun)
 
-       var e int
+//     var e int
 
-       sig := mq.Sig()
-       for i := 0; i < mun; i++ {
-               go func() {
-                       mun_c <- true
-                       data, t0 := mq.Pull(sig)
-                       if o, ok := data.(string); o != `mmm` || !ok {
-                               e = 1
-                       }
-                       data1, _ := mq.Pull(t0)
-                       if o, ok := data1.(string); o != `mm1` || !ok {
-                               e = 2
-                       }
-                       mun_s <- true
-               }()
-       }
+//     sig := mq.Sig()
+//     for i := 0; i < mun; i++ {
+//             go func() {
+//                     mun_c <- true
+//                     data, t0 := mq.Pull(sig)
+//                     if o, ok := data.(string); o != `mmm` || !ok {
+//                             e = 1
+//                     }
+//                     data1, _ := mq.Pull(t0)
+//                     if o, ok := data1.(string); o != `mm1` || !ok {
+//                             e = 2
+//                     }
+//                     mun_s <- true
+//             }()
+//     }
 
-       for len(mun_c) != mun {
-               t.Log(`>`, len(mun_c))
-               sys.Sys().Timeoutf(1)
-       }
-       t.Log(`>`, len(mun_c))
+//     for len(mun_c) != mun {
+//             t.Log(`>`, len(mun_c))
+//             sys.Sys().Timeoutf(1)
+//     }
+//     t.Log(`>`, len(mun_c))
 
-       t.Log(`push mmm`)
-       mq.Push(`mmm`)
-       t.Log(`push mm1`)
-       mq.Push(`mm1`)
+//     t.Log(`push mmm`)
+//     mq.Push(`mmm`)
+//     t.Log(`push mm1`)
+//     mq.Push(`mm1`)
 
-       for len(mun_s) != mun {
-               t.Log(`<`, len(mun_s))
-               sys.Sys().Timeoutf(1)
-       }
-       t.Log(`<`, len(mun_s))
+//     for len(mun_s) != mun {
+//             t.Log(`<`, len(mun_s))
+//             sys.Sys().Timeoutf(1)
+//     }
+//     t.Log(`<`, len(mun_s))
 
-       if e != 0 {
-               t.Error(e)
-       }
-}
+//     if e != 0 {
+//             t.Error(e)
+//     }
+// }
 
-func Test_msgq2(t *testing.T) {
-       mq := New(5)
+// func Test_msgq2(t *testing.T) {
+//     mq := New(5)
 
-       mun_c := make(chan bool, 100)
-       go func() {
-               var (
-                       sig  = mq.Sig()
-                       data interface{}
-               )
-               for {
-                       data, sig = mq.Pull(sig)
-                       if data.(test_item).data != `aa1` {
-                               t.Error(`1`)
-                       }
-                       mun_c <- true
-               }
-       }()
-       go func() {
-               var (
-                       sig  = mq.Sig()
-                       data interface{}
-               )
-               for {
-                       data, sig = mq.Pull(sig)
-                       if data.(test_item).data != `aa1` {
-                               t.Error(`2`)
-                       }
-                       mun_c <- true
-               }
-       }()
-       go func() {
-               var (
-                       sig  = mq.Sig()
-                       data interface{}
-               )
-               for {
-                       data, sig = mq.Pull(sig)
-                       if data.(test_item).data != `aa1` {
-                               t.Error(`3`)
-                       }
-                       mun_c <- true
-               }
-       }()
-       var fin_turn = 0
-       t.Log(`start`)
-       time.Sleep(time.Second)
-       for fin_turn < 1000000 {
-               mq.Push(test_item{
-                       data: `aa1`,
-               })
-               <-mun_c
-               <-mun_c
-               <-mun_c
-               fin_turn += 1
-               fmt.Print("\r", fin_turn)
-       }
-       t.Log(`fin`)
-}
+//     mun_c := make(chan bool, 100)
+//     go func() {
+//             var (
+//                     sig  = mq.Sig()
+//                     data interface{}
+//             )
+//             for {
+//                     data, sig = mq.Pull(sig)
+//                     if data.(test_item).data != `aa1` {
+//                             t.Error(`1`)
+//                     }
+//                     mun_c <- true
+//             }
+//     }()
+//     go func() {
+//             var (
+//                     sig  = mq.Sig()
+//                     data interface{}
+//             )
+//             for {
+//                     data, sig = mq.Pull(sig)
+//                     if data.(test_item).data != `aa1` {
+//                             t.Error(`2`)
+//                     }
+//                     mun_c <- true
+//             }
+//     }()
+//     go func() {
+//             var (
+//                     sig  = mq.Sig()
+//                     data interface{}
+//             )
+//             for {
+//                     data, sig = mq.Pull(sig)
+//                     if data.(test_item).data != `aa1` {
+//                             t.Error(`3`)
+//                     }
+//                     mun_c <- true
+//             }
+//     }()
+//     var fin_turn = 0
+//     t.Log(`start`)
+//     time.Sleep(time.Second)
+//     for fin_turn < 1000000 {
+//             mq.Push(test_item{
+//                     data: `aa1`,
+//             })
+//             <-mun_c
+//             <-mun_c
+//             <-mun_c
+//             fin_turn += 1
+//             fmt.Print("\r", fin_turn)
+//     }
+//     t.Log(`fin`)
+// }
 
 func Test_msgq3(t *testing.T) {
-       mq := New(100)
+       mq := New()
 
        mun_c := make(chan int, 100)
        mq.Pull_tag(map[string]func(interface{}) bool{
@@ -149,7 +146,7 @@ func Test_msgq3(t *testing.T) {
 
 func Test_msgq4(t *testing.T) {
        // mq := New(30)
-       mq := New(3) //out of list
+       mq := New() //out of list
 
        mun_c1 := make(chan bool, 100)
        mun_c2 := make(chan bool, 100)
@@ -214,7 +211,7 @@ func Test_msgq4(t *testing.T) {
 }
 
 func Test_msgq5(t *testing.T) {
-       mq := New(30)
+       mq := New()
 
        mun_c1 := make(chan bool, 100)
        mun_c2 := make(chan bool, 100)
@@ -273,37 +270,34 @@ func Test_msgq5(t *testing.T) {
        t.Log(`fin`)
 }
 
-func Test_msgq6(t *testing.T) {
-       mq := New(30)
-       go func() {
-               http.ListenAndServe("0.0.0.0:8899", nil)
-       }()
-       go mq.Pull_tag(map[string]func(interface{}) bool{
-               `A1`: func(data interface{}) bool {
-                       return false
-               },
-               `A2`: func(data interface{}) bool {
-                       if v, ok := data.(string); !ok || v != `a11` {
-                               t.Error(`2`)
-                       }
-                       return false
-               },
-               `Error`: func(data interface{}) bool {
-                       if data == nil {
-                               t.Error(`out of list`)
-                       }
-                       return false
-               },
-       })
+// func Test_msgq6(t *testing.T) {
+//     mq := New()
+//     go mq.Pull_tag(map[string]func(interface{}) bool{
+//             `A1`: func(data interface{}) bool {
+//                     return false
+//             },
+//             `A2`: func(data interface{}) bool {
+//                     if v, ok := data.(string); !ok || v != `a11` {
+//                             t.Error(`2`)
+//                     }
+//                     return false
+//             },
+//             `Error`: func(data interface{}) bool {
+//                     if data == nil {
+//                             t.Error(`out of list`)
+//                     }
+//                     return false
+//             },
+//     })
 
-       var fin_turn = 0
-       t.Log(`start`)
-       for fin_turn < 1000 {
-               time.Sleep(time.Second)
-               time.Sleep(time.Second)
-               mq.Push_tag(`A1`, `a11`)
-               fin_turn += 1
-               fmt.Print("\r", fin_turn)
-       }
-       t.Log(`fin`)
-}
+//     var fin_turn = 0
+//     t.Log(`start`)
+//     for fin_turn < 1000 {
+//             time.Sleep(time.Second)
+//             time.Sleep(time.Second)
+//             mq.Push_tag(`A1`, `a11`)
+//             fin_turn += 1
+//             fmt.Print("\r", fin_turn)
+//     }
+//     t.Log(`fin`)
+// }
index f9178bfc47c93ad168897f575cfb836cc4c4a369..738938a4fc20725b1552ec7d1c781b744532af56 100644 (file)
@@ -27,7 +27,7 @@ type uinterface struct { //内部消息
 
 func New_server() *Server {
        return &Server{
-               ws_mq:    mq.New(200),                                             //收发通道
+               ws_mq:    mq.New(),                                                //收发通道
                userpool: idpool.New(func() interface{} { return new(struct{}) }), //浏览器标签页池
        }
 }
@@ -129,23 +129,23 @@ func (t *Server) WS(w http.ResponseWriter, r *http.Request) (o chan uintptr) {
 //                             return false
 //                     }
 
-//                     ws_mq.Push_tag(`send`,Uinterface{//just reply
-//                             Id:tmp.Id,
-//                             Data:tmp.Data,
-//                     })
-//                     //or
-//                     ws_mq.Push_tag(`send`,Uinterface{//just reply
-//                             Id:0,//send to all
-//                             Data:tmp.Data,
-//                     })
-//             }
-//             return false
-//     },
-//     `error`:func(data interface{})(bool){
-//             log.Println(data)
-//             return false
-//     },
-// })
+//                             ws_mq.Push_tag(`send`,Uinterface{//just reply
+//                                     Id:tmp.Id,
+//                                     Data:tmp.Data,
+//                             })
+//                             //or
+//                             ws_mq.Push_tag(`send`,Uinterface{//just reply
+//                                     Id:0,//send to all
+//                                     Data:tmp.Data,
+//                             })
+//                     }
+//                     return false
+//             },
+//             `error`:func(data interface{})(bool){
+//                     log.Println(data)
+//                     return false
+//             },
+//     })
 func (t *Server) Interface() *mq.Msgq {
        return t.ws_mq
 }