From: qydysky Date: Sun, 13 Dec 2020 12:08:21 +0000 (+0800) Subject: 93 X-Git-Tag: v0.3.3 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=f36e49c32bba15c6f7bec9e89200876ba9223ef6;p=part%2F.git 93 --- diff --git a/msgq/Msgq.go b/msgq/Msgq.go index 032d5b1..413cc41 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -2,6 +2,7 @@ package part import ( "sync" + "time" "container/list" ) @@ -35,13 +36,22 @@ func (m *msgq) Push(msg interface{}) { }) if m.data_list.Len() > m.max_data_mun {m.data_list.Remove(m.data_list.Front())} for len(m.wait_push) == 0 {m.wait_push <- true} - <- m.wait_push + select { + case <- m.wait_push: + case <- time.After(time.Millisecond): + } } func (m *msgq) Pull(old_sig uint64) (data interface{},sig uint64) { - if old_sig == m.Sig() || m.data_list.Len() == 0 {<- m.wait_push} + for old_sig == m.Sig() { + select { + case <- m.wait_push: + case <- time.After(time.Millisecond): + } + } m.RLock() defer m.RUnlock() + for el := m.data_list.Front();el != nil;el = el.Next() { if old_sig < el.Value.(msgq_item).sig { data = el.Value.(msgq_item).data @@ -54,7 +64,7 @@ func (m *msgq) Pull(old_sig uint64) (data interface{},sig uint64) { func (m *msgq) Sig() (sig uint64) { if el := m.data_list.Back();el == nil { - sig = m.get_sig() + sig = 0 } else { sig = el.Value.(msgq_item).sig } diff --git a/msgq/Msgq_test.go b/msgq/Msgq_test.go index 9e80275..c918d23 100644 --- a/msgq/Msgq_test.go +++ b/msgq/Msgq_test.go @@ -15,7 +15,7 @@ func Test_msgq(t *testing.T) { mq := New(5) - mun := 1000000 + mun := 100000 mun_c := make(chan bool,mun) mun_s := make(chan bool,mun) @@ -79,15 +79,27 @@ func Test_msgq2(t *testing.T) { mun_c <- true } }() + go func(){ + var ( + sig = mq.Sig() + data interface{} + ) + for { + data,sig = mq.Pull(sig) + if data.(test_item).data != `aa1` {t.Error(`3`)} + mun_c <- true + } + }() var fin_turn = 0 t.Log(`start`) time.Sleep(time.Second) - for fin_turn < 10000 { + for fin_turn < 1000000 { mq.Push(test_item{ data:`aa1`, }) <-mun_c <-mun_c + <-mun_c fin_turn += 1 fmt.Print("\r",fin_turn) }