data_list *list.List
wait_push chan struct{}
max_data_mun int
+ ticker *time.Ticker
sig uint64
sync.RWMutex
}
(*m).wait_push = make(chan struct{},10)
(*m).data_list = list.New()
(*m).max_data_mun = want_max_data_mun
+ (*m).ticker = time.NewTicker(time.Duration(25)*time.Millisecond)
return m
}
sig:m.get_sig(),
})
if m.data_list.Len() > m.max_data_mun {m.data_list.Remove(m.data_list.Front())}
- for len(m.wait_push) == 0 {m.wait_push <- struct{}{}}
+
+ var pull_num int
+ for len(m.wait_push) == 0 {
+ pull_num += 1
+ m.wait_push <- struct{}{}
+ }
+ if pull_num < 1 {<- m.ticker.C}
select {
case <- m.wait_push:
- case <- time.After(time.Millisecond):
+ case <- m.ticker.C:
}
}
for old_sig == m.Sig() {
select {
case <- m.wait_push:
- case <- time.After(time.Millisecond):
+ case <- m.ticker.C:
}
}
m.RLock()
defer m.RUnlock()
+ if int(m.Sig() - old_sig) > m.max_data_mun {return nil,m.Sig()}
+
for el := m.data_list.Front();el != nil;el = el.Next() {
if old_sig < el.Value.(Msgq_item).sig {
data = el.Value.(Msgq_item).data
sig = el.Value.(Msgq_item).sig
- break
+ return
}
}
return
)
for {
data,sig = m.Pull(sig)
- if d,ok := data.(Msgq_tag_data);!ok {
- continue
+ if d,ok := data.(Msgq_tag_data);!ok{
+ if f,ok := func_map[`Error`];ok{
+ if f(d.Data) {break}
+ }
} else {
if f,ok := func_map[d.Tag];ok{
if f(d.Data) {break}
var fin_turn = 0
t.Log(`start`)
time.Sleep(time.Second)
- for fin_turn < 10000000 {
+ for fin_turn < 1000000 {
mq.Push_tag(`A1`,fin_turn)
if fin_turn != <-mun_c {t.Error(fin_turn)}
fin_turn += 1
}
func Test_msgq4(t *testing.T) {
- mq := New(5)
+ // mq := New(30)
+ mq := New(3)//out of list
- mun_c := make(chan bool,100)
+ mun_c1 := make(chan bool,100)
+ mun_c2 := make(chan bool,100)
+ mun_c3 := make(chan bool,100)
mq.Pull_tag(map[string]func(interface{})(bool){
`A1`:func(data interface{})(bool){
if v,ok := data.(string);!ok || v != `a11`{t.Error(`1`)}
- mun_c <- true
+ mun_c1 <- true
+ return false
+ },
+ `A2`:func(data interface{})(bool){
+ if v,ok := data.(string);!ok || v != `a11`{t.Error(`2`)}
+ mun_c2 <- true
+ return false
+ },
+ `Error`:func(data interface{})(bool){
+ if data == nil {t.Error(`out of list`)}
return false
},
})
mq.Pull_tag(map[string]func(interface{})(bool){
`A1`:func(data interface{})(bool){
if v,ok := data.(string);!ok || v != `a11`{t.Error(`2`)}
- mun_c <- true
+ mun_c3 <- true
+ return false
+ },
+ `Error`:func(data interface{})(bool){
+ if data == nil {t.Error(`out of list`)}
+ return false
+ },
+ })
+
+ var fin_turn = 0
+ t.Log(`start`)
+ time.Sleep(time.Second)
+ for fin_turn < 5 {
+ go mq.Push_tag(`A1`,`a11`)
+ go mq.Push_tag(`A1`,`a11`)
+ go mq.Push_tag(`A1`,`a11`)
+ // mq.Push_tag(`A4`,`a11`)
+ go mq.Push_tag(`A1`,`a11`)
+ mq.Push_tag(`A1`,`a11`)
+ mq.Push_tag(`A2`,`a11`)
+ // mq.Push_tag(`A4`,`a11`)
+ <-mun_c2
+ <-mun_c1
+ // <-mun_c3
+ fin_turn += 1
+ fmt.Print("\r",fin_turn)
+ }
+ t.Log(`fin`)
+}
+
+func Test_msgq5(t *testing.T) {
+ mq := New(30)
+
+ mun_c1 := make(chan bool,100)
+ mun_c2 := make(chan bool,100)
+ go mq.Pull_tag(map[string]func(interface{})(bool){
+ `A1`:func(data interface{})(bool){
+ time.Sleep(time.Second)//will block
+ return false
+ },
+ `A2`:func(data interface{})(bool){
+ if v,ok := data.(string);!ok || v != `a11`{t.Error(`2`)}
+ mun_c2 <- true
+ return false
+ },
+ `Error`:func(data interface{})(bool){
+ if data == nil {t.Error(`out of list`)}
return false
},
})
mq.Pull_tag(map[string]func(interface{})(bool){
`A1`:func(data interface{})(bool){
- if v,ok := data.(string);!ok || v != `a11`{t.Error(`3`)}
- mun_c <- true
+ if v,ok := data.(string);!ok || v != `a11`{t.Error(`1`)}
+ mun_c1 <- true
+ return false
+ },
+ `A2`:func(data interface{})(bool){
+ if v,ok := data.(string);!ok || v != `a11`{t.Error(`2`)}
+ return false
+ },
+ `Error`:func(data interface{})(bool){
+ if data == nil {t.Error(`out of list`)}
return false
},
})
-
+
var fin_turn = 0
t.Log(`start`)
time.Sleep(time.Second)
- for fin_turn < 1000000 {
+ for fin_turn < 10 {
mq.Push_tag(`A1`,`a11`)
- <-mun_c
- <-mun_c
- <-mun_c
+ mq.Push_tag(`A2`,`a11`)
+ <-mun_c1
+ <-mun_c2
fin_turn += 1
fmt.Print("\r",fin_turn)
}