From 508c706d691b89f6f1fed8acdb4b6ee2e8c60e57 Mon Sep 17 00:00:00 2001 From: qydysky Date: Wed, 6 Jan 2021 00:00:37 +0800 Subject: [PATCH] =?utf8?q?97mq=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- msgq/Msgq.go | 24 +++++++++---- msgq/Msgq_test.go | 90 ++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 96 insertions(+), 18 deletions(-) diff --git a/msgq/Msgq.go b/msgq/Msgq.go index 1fcd332..3851286 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -10,6 +10,7 @@ type Msgq struct { data_list *list.List wait_push chan struct{} max_data_mun int + ticker *time.Ticker sig uint64 sync.RWMutex } @@ -24,6 +25,7 @@ func New(want_max_data_mun int) (*Msgq) { (*m).wait_push = make(chan struct{},10) (*m).data_list = list.New() (*m).max_data_mun = want_max_data_mun + (*m).ticker = time.NewTicker(time.Duration(25)*time.Millisecond) return m } @@ -35,10 +37,16 @@ func (m *Msgq) Push(msg interface{}) { sig:m.get_sig(), }) if m.data_list.Len() > m.max_data_mun {m.data_list.Remove(m.data_list.Front())} - for len(m.wait_push) == 0 {m.wait_push <- struct{}{}} + + var pull_num int + for len(m.wait_push) == 0 { + pull_num += 1 + m.wait_push <- struct{}{} + } + if pull_num < 1 {<- m.ticker.C} select { case <- m.wait_push: - case <- time.After(time.Millisecond): + case <- m.ticker.C: } } @@ -46,17 +54,19 @@ func (m *Msgq) Pull(old_sig uint64) (data interface{},sig uint64) { for old_sig == m.Sig() { select { case <- m.wait_push: - case <- time.After(time.Millisecond): + case <- m.ticker.C: } } m.RLock() defer m.RUnlock() + if int(m.Sig() - old_sig) > m.max_data_mun {return nil,m.Sig()} + for el := m.data_list.Front();el != nil;el = el.Next() { if old_sig < el.Value.(Msgq_item).sig { data = el.Value.(Msgq_item).data sig = el.Value.(Msgq_item).sig - break + return } } return @@ -96,8 +106,10 @@ func (m *Msgq) Pull_tag(func_map map[string]func(interface{})(bool)) { ) for { data,sig = m.Pull(sig) - if d,ok := data.(Msgq_tag_data);!ok { - continue + if d,ok := data.(Msgq_tag_data);!ok{ + if f,ok := func_map[`Error`];ok{ + if f(d.Data) {break} + } } else { if f,ok := func_map[d.Tag];ok{ if f(d.Data) {break} diff --git a/msgq/Msgq_test.go b/msgq/Msgq_test.go index cd9ab78..98157e1 100644 --- a/msgq/Msgq_test.go +++ b/msgq/Msgq_test.go @@ -120,7 +120,7 @@ func Test_msgq3(t *testing.T) { var fin_turn = 0 t.Log(`start`) time.Sleep(time.Second) - for fin_turn < 10000000 { + for fin_turn < 1000000 { mq.Push_tag(`A1`,fin_turn) if fin_turn != <-mun_c {t.Error(fin_turn)} fin_turn += 1 @@ -130,39 +130,105 @@ func Test_msgq3(t *testing.T) { } func Test_msgq4(t *testing.T) { - mq := New(5) + // mq := New(30) + mq := New(3)//out of list - mun_c := make(chan bool,100) + mun_c1 := make(chan bool,100) + mun_c2 := make(chan bool,100) + mun_c3 := make(chan bool,100) mq.Pull_tag(map[string]func(interface{})(bool){ `A1`:func(data interface{})(bool){ if v,ok := data.(string);!ok || v != `a11`{t.Error(`1`)} - mun_c <- true + mun_c1 <- true + return false + }, + `A2`:func(data interface{})(bool){ + if v,ok := data.(string);!ok || v != `a11`{t.Error(`2`)} + mun_c2 <- true + return false + }, + `Error`:func(data interface{})(bool){ + if data == nil {t.Error(`out of list`)} return false }, }) mq.Pull_tag(map[string]func(interface{})(bool){ `A1`:func(data interface{})(bool){ if v,ok := data.(string);!ok || v != `a11`{t.Error(`2`)} - mun_c <- true + mun_c3 <- true + return false + }, + `Error`:func(data interface{})(bool){ + if data == nil {t.Error(`out of list`)} + return false + }, + }) + + var fin_turn = 0 + t.Log(`start`) + time.Sleep(time.Second) + for fin_turn < 5 { + go mq.Push_tag(`A1`,`a11`) + go mq.Push_tag(`A1`,`a11`) + go mq.Push_tag(`A1`,`a11`) + // mq.Push_tag(`A4`,`a11`) + go mq.Push_tag(`A1`,`a11`) + mq.Push_tag(`A1`,`a11`) + mq.Push_tag(`A2`,`a11`) + // mq.Push_tag(`A4`,`a11`) + <-mun_c2 + <-mun_c1 + // <-mun_c3 + fin_turn += 1 + fmt.Print("\r",fin_turn) + } + t.Log(`fin`) +} + +func Test_msgq5(t *testing.T) { + mq := New(30) + + mun_c1 := make(chan bool,100) + mun_c2 := make(chan bool,100) + go mq.Pull_tag(map[string]func(interface{})(bool){ + `A1`:func(data interface{})(bool){ + time.Sleep(time.Second)//will block + return false + }, + `A2`:func(data interface{})(bool){ + if v,ok := data.(string);!ok || v != `a11`{t.Error(`2`)} + mun_c2 <- true + return false + }, + `Error`:func(data interface{})(bool){ + if data == nil {t.Error(`out of list`)} return false }, }) mq.Pull_tag(map[string]func(interface{})(bool){ `A1`:func(data interface{})(bool){ - if v,ok := data.(string);!ok || v != `a11`{t.Error(`3`)} - mun_c <- true + if v,ok := data.(string);!ok || v != `a11`{t.Error(`1`)} + mun_c1 <- true + return false + }, + `A2`:func(data interface{})(bool){ + if v,ok := data.(string);!ok || v != `a11`{t.Error(`2`)} + return false + }, + `Error`:func(data interface{})(bool){ + if data == nil {t.Error(`out of list`)} return false }, }) - + var fin_turn = 0 t.Log(`start`) time.Sleep(time.Second) - for fin_turn < 1000000 { + for fin_turn < 10 { mq.Push_tag(`A1`,`a11`) - <-mun_c - <-mun_c - <-mun_c + mq.Push_tag(`A2`,`a11`) + <-mun_c1 + <-mun_c2 fin_turn += 1 fmt.Print("\r",fin_turn) } -- 2.39.2