package part
+import (
+ // "fmt"
+ "sync"
+ "time"
+ p "github.com/qydysky/part"
+)
+
+const push_mute = 25
+
type msgq struct {
- d interface{}
- i chan struct{}
- o chan struct{}
+ data msgq_item
+ wait_push p.Signal
+ push_mute int
+ sync.RWMutex
}
-func New() (*msgq) {
- l := new(msgq)
- (*l).i = make(chan struct{})
- (*l).o = make(chan struct{})
- close((*l).i)
- return l
+type msgq_item struct {
+ data interface{}
+ sig time.Time
+}
+
+func New(want_push_mute ...int) (*msgq) {
+ m := new(msgq)
+ (*m).wait_push.Init()
+ (*m).data.sig = time.Now()
+ (*m).push_mute = push_mute
+ //太小的禁言时间,将可能出现丢失push的情况
+ if len(want_push_mute) != 0 && want_push_mute[0] >= 0{(*m).push_mute = want_push_mute[0]}
+ return m
}
func (m *msgq) Push(msg interface{}) {
- <- m.i
- m.i = make(chan struct{})
- m.d = msg
- close(m.o)
- m.o = make(chan struct{})
- close(m.i)
+ m.Lock()
+ defer m.Unlock()
+
+ m.wait_push.Done()
+
+ m.data = msgq_item{
+ data:msg,
+ sig:time.Now(),
+ }
+
+ if m.push_mute != 0 {p.Sys().MTimeoutf(m.push_mute)}
}
-func (m *msgq) Pull() (o interface{}) {
- <- m.i
- <- m.o
- return m.d
+func (m *msgq) Pull(sig ...time.Time) (interface{},time.Time) {
+ if m.wait_push.Islive() || len(sig) != 0 && sig[0] == m.Sig() {
+ m.wait_push.Init().Wait()
+ }
+ //若不提供sig用以识别,将可能出现重复取得的情况
+
+ m.RLock()
+ defer m.RUnlock()
+ return m.data.data,m.Sig()
}
+
+func (m *msgq) Sig() (time.Time) {
+ return m.data.sig
+}
\ No newline at end of file
)
func Test_msgq(t *testing.T) {
- mq := New()
- k := 0
- var e bool
- for i:=0;i<1e5;i++{
+
+
+ mq := New(25)
+ mun := 1000000
+ mun_c := make(chan bool,mun)
+ mun_s := make(chan bool,mun)
+
+ var e int
+
+ sig := mq.Sig()
+ for i:=0;i<mun;i++{
go func(){
- k += 1
- if o,ok:=mq.Pull().(string);o != `mmm`||!ok {e = true}
- k += 1
+ mun_c <- true
+ data,t0 := mq.Pull(sig)
+ if o,ok:=data.(string);o != `mmm`||!ok {e = 1}
+ data1,_ := mq.Pull(t0)
+ if o,ok:=data1.(string);o != `mm1`||!ok {e = 2}
+ mun_s <- true
}()
}
- p.Sys().Timeoutf(2)
- t.Log(`>`,k)
- k = 0
+ for len(mun_c) != mun {
+ t.Log(`>`,len(mun_c))
+ p.Sys().Timeoutf(1)
+ }
+ t.Log(`>`,len(mun_c))
+
+ t.Log(`push mmm`)
mq.Push(`mmm`)
+ t.Log(`push mm1`)
+ mq.Push(`mm1`)
+
+ for len(mun_s) != mun {
+ t.Log(`<`,len(mun_s))
+ p.Sys().Timeoutf(1)
+ }
+ t.Log(`<`,len(mun_s))
- p.Sys().Timeoutf(1)
- t.Log(`<`,k)
- if e {t.Error("f")}
+ if e!=0 {t.Error(e)}
}