]> 127.0.0.1 Git - part/.git/commitdiff
97mq优化
authorqydysky <qydysky@foxmail.com>
Tue, 5 Jan 2021 16:00:37 +0000 (00:00 +0800)
committerqydysky <qydysky@foxmail.com>
Tue, 5 Jan 2021 16:00:37 +0000 (00:00 +0800)
msgq/Msgq.go
msgq/Msgq_test.go

index 1fcd332c6eaf711242f932c544abb114fec078f7..38512862bf0e136678613024bb1b5422dd3d6cdf 100644 (file)
@@ -10,6 +10,7 @@ type Msgq struct {
        data_list *list.List
        wait_push chan struct{}
        max_data_mun int
+       ticker *time.Ticker
        sig uint64
        sync.RWMutex
 }
@@ -24,6 +25,7 @@ func New(want_max_data_mun int) (*Msgq) {
        (*m).wait_push = make(chan struct{},10)
        (*m).data_list = list.New()
        (*m).max_data_mun = want_max_data_mun
+       (*m).ticker = time.NewTicker(time.Duration(25)*time.Millisecond)
        return m
 }
 
@@ -35,10 +37,16 @@ func (m *Msgq) Push(msg interface{}) {
                sig:m.get_sig(),
        })
        if m.data_list.Len() > m.max_data_mun {m.data_list.Remove(m.data_list.Front())}
-       for len(m.wait_push) == 0 {m.wait_push <- struct{}{}}
+
+       var pull_num int
+       for len(m.wait_push) == 0 {
+               pull_num += 1
+               m.wait_push <- struct{}{}
+       }
+       if pull_num < 1 {<- m.ticker.C}
        select {
        case <- m.wait_push:
-       case <- time.After(time.Millisecond):
+       case <- m.ticker.C:
        }
 }
 
@@ -46,17 +54,19 @@ func (m *Msgq) Pull(old_sig uint64) (data interface{},sig uint64) {
        for old_sig == m.Sig() {
                select {
                case <- m.wait_push:
-               case <- time.After(time.Millisecond):
+               case <- m.ticker.C:
                }
        }
        m.RLock()
        defer m.RUnlock()
 
+       if int(m.Sig() - old_sig) > m.max_data_mun {return nil,m.Sig()}
+
        for el := m.data_list.Front();el != nil;el = el.Next() {
                if old_sig < el.Value.(Msgq_item).sig {
                        data = el.Value.(Msgq_item).data
                        sig = el.Value.(Msgq_item).sig
-                       break
+                       return
                }
        }
        return
@@ -96,8 +106,10 @@ func (m *Msgq) Pull_tag(func_map map[string]func(interface{})(bool)) {
                )
                for {
                        data,sig = m.Pull(sig)
-                       if d,ok := data.(Msgq_tag_data);!ok {
-                               continue
+                       if d,ok := data.(Msgq_tag_data);!ok{
+                               if f,ok := func_map[`Error`];ok{
+                                       if f(d.Data) {break}
+                               }
                        } else {
                                if f,ok := func_map[d.Tag];ok{
                                        if f(d.Data) {break}
index cd9ab784348da5b98f9a90f80c42115305244b13..98157e1ab8cd7063d693e5f7be7c48aedc8a6ac4 100644 (file)
@@ -120,7 +120,7 @@ func Test_msgq3(t *testing.T) {
        var fin_turn = 0
        t.Log(`start`)
        time.Sleep(time.Second)
-       for fin_turn < 10000000 {
+       for fin_turn < 1000000 {
                mq.Push_tag(`A1`,fin_turn)
                if fin_turn != <-mun_c {t.Error(fin_turn)}
                fin_turn += 1
@@ -130,39 +130,105 @@ func Test_msgq3(t *testing.T) {
 }
 
 func Test_msgq4(t *testing.T) {
-       mq := New(5)
+       // mq := New(30)
+       mq := New(3)//out of list
 
-       mun_c := make(chan bool,100)
+       mun_c1 := make(chan bool,100)
+       mun_c2 := make(chan bool,100)
+       mun_c3 := make(chan bool,100)
        mq.Pull_tag(map[string]func(interface{})(bool){
                `A1`:func(data interface{})(bool){
                        if v,ok := data.(string);!ok || v != `a11`{t.Error(`1`)}
-                       mun_c <- true
+                       mun_c1 <- true
+                       return false
+               },
+               `A2`:func(data interface{})(bool){
+                       if v,ok := data.(string);!ok || v != `a11`{t.Error(`2`)}
+                       mun_c2 <- true
+                       return false
+               },
+               `Error`:func(data interface{})(bool){
+                       if data == nil {t.Error(`out of list`)}
                        return false
                },
        })
        mq.Pull_tag(map[string]func(interface{})(bool){
                `A1`:func(data interface{})(bool){
                        if v,ok := data.(string);!ok || v != `a11`{t.Error(`2`)}
-                       mun_c <- true
+                       mun_c3 <- true
+                       return false
+               },
+               `Error`:func(data interface{})(bool){
+                       if data == nil {t.Error(`out of list`)}
+                       return false
+               },
+       })
+
+       var fin_turn = 0
+       t.Log(`start`)
+       time.Sleep(time.Second)
+       for fin_turn < 5 {
+               go mq.Push_tag(`A1`,`a11`)
+               go mq.Push_tag(`A1`,`a11`)
+               go mq.Push_tag(`A1`,`a11`)
+               // mq.Push_tag(`A4`,`a11`)
+               go mq.Push_tag(`A1`,`a11`)
+               mq.Push_tag(`A1`,`a11`)
+               mq.Push_tag(`A2`,`a11`)
+               // mq.Push_tag(`A4`,`a11`)
+               <-mun_c2                
+               <-mun_c1
+               // <-mun_c3
+               fin_turn += 1
+               fmt.Print("\r",fin_turn)
+       }
+       t.Log(`fin`)
+}
+
+func Test_msgq5(t *testing.T) {
+       mq := New(30)
+
+       mun_c1 := make(chan bool,100)
+       mun_c2 := make(chan bool,100)
+       go mq.Pull_tag(map[string]func(interface{})(bool){
+               `A1`:func(data interface{})(bool){
+                       time.Sleep(time.Second)//will block
+                       return false
+               },
+               `A2`:func(data interface{})(bool){
+                       if v,ok := data.(string);!ok || v != `a11`{t.Error(`2`)}
+                       mun_c2 <- true
+                       return false
+               },
+               `Error`:func(data interface{})(bool){
+                       if data == nil {t.Error(`out of list`)}
                        return false
                },
        })
        mq.Pull_tag(map[string]func(interface{})(bool){
                `A1`:func(data interface{})(bool){
-                       if v,ok := data.(string);!ok || v != `a11`{t.Error(`3`)}
-                       mun_c <- true
+                       if v,ok := data.(string);!ok || v != `a11`{t.Error(`1`)}
+                       mun_c1 <- true
+                       return false
+               },
+               `A2`:func(data interface{})(bool){
+                       if v,ok := data.(string);!ok || v != `a11`{t.Error(`2`)}
+                       return false
+               },
+               `Error`:func(data interface{})(bool){
+                       if data == nil {t.Error(`out of list`)}
                        return false
                },
        })
-
+       
        var fin_turn = 0
        t.Log(`start`)
        time.Sleep(time.Second)
-       for fin_turn < 1000000 {
+       for fin_turn < 10 {
                mq.Push_tag(`A1`,`a11`)
-               <-mun_c
-               <-mun_c
-               <-mun_c
+               mq.Push_tag(`A2`,`a11`)
+               <-mun_c1
+               <-mun_c2
                fin_turn += 1
                fmt.Print("\r",fin_turn)
        }