From: qydysky Date: Thu, 19 Nov 2020 03:29:22 +0000 (+0800) Subject: 92 X-Git-Tag: v0.3.0 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=a6f2c34760950ece75b4ca2f801b0065eb8f6a4f;p=part%2F.git 92 --- diff --git a/Signal.go b/Signal.go index 88868e4..ae3c6e6 100644 --- a/Signal.go +++ b/Signal.go @@ -6,7 +6,7 @@ type Signal struct{ func (i *Signal) Init() (o *Signal) { o = i - o.Chan = make(chan struct{}) + if !i.Islive() {o.Chan = make(chan struct{})} return } diff --git a/msgq/Msgq.go b/msgq/Msgq.go index c5fabf1..fc87905 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -1,30 +1,61 @@ package part +import ( + // "fmt" + "sync" + "time" + p "github.com/qydysky/part" +) + +const push_mute = 25 + type msgq struct { - d interface{} - i chan struct{} - o chan struct{} + data msgq_item + wait_push p.Signal + push_mute int + sync.RWMutex } -func New() (*msgq) { - l := new(msgq) - (*l).i = make(chan struct{}) - (*l).o = make(chan struct{}) - close((*l).i) - return l +type msgq_item struct { + data interface{} + sig time.Time +} + +func New(want_push_mute ...int) (*msgq) { + m := new(msgq) + (*m).wait_push.Init() + (*m).data.sig = time.Now() + (*m).push_mute = push_mute + //太小的禁言时间,将可能出现丢失push的情况 + if len(want_push_mute) != 0 && want_push_mute[0] >= 0{(*m).push_mute = want_push_mute[0]} + return m } func (m *msgq) Push(msg interface{}) { - <- m.i - m.i = make(chan struct{}) - m.d = msg - close(m.o) - m.o = make(chan struct{}) - close(m.i) + m.Lock() + defer m.Unlock() + + m.wait_push.Done() + + m.data = msgq_item{ + data:msg, + sig:time.Now(), + } + + if m.push_mute != 0 {p.Sys().MTimeoutf(m.push_mute)} } -func (m *msgq) Pull() (o interface{}) { - <- m.i - <- m.o - return m.d +func (m *msgq) Pull(sig ...time.Time) (interface{},time.Time) { + if m.wait_push.Islive() || len(sig) != 0 && sig[0] == m.Sig() { + m.wait_push.Init().Wait() + } + //若不提供sig用以识别,将可能出现重复取得的情况 + + m.RLock() + defer m.RUnlock() + return m.data.data,m.Sig() } + +func (m *msgq) Sig() (time.Time) { + return m.data.sig +} \ No newline at end of file diff --git a/msgq/Msgq_test.go b/msgq/Msgq_test.go index 3740ac5..e2c25a9 100644 --- a/msgq/Msgq_test.go +++ b/msgq/Msgq_test.go @@ -6,23 +6,43 @@ import ( ) func Test_msgq(t *testing.T) { - mq := New() - k := 0 - var e bool - for i:=0;i<1e5;i++{ + + + mq := New(25) + mun := 1000000 + mun_c := make(chan bool,mun) + mun_s := make(chan bool,mun) + + var e int + + sig := mq.Sig() + for i:=0;i`,k) - k = 0 + for len(mun_c) != mun { + t.Log(`>`,len(mun_c)) + p.Sys().Timeoutf(1) + } + t.Log(`>`,len(mun_c)) + + t.Log(`push mmm`) mq.Push(`mmm`) + t.Log(`push mm1`) + mq.Push(`mm1`) + + for len(mun_s) != mun { + t.Log(`<`,len(mun_s)) + p.Sys().Timeoutf(1) + } + t.Log(`<`,len(mun_s)) - p.Sys().Timeoutf(1) - t.Log(`<`,k) - if e {t.Error("f")} + if e!=0 {t.Error(e)} }