]> 127.0.0.1 Git - part/.git/commitdiff
92 v0.3.0
authorqydysky <qydysky@foxmail.com>
Thu, 19 Nov 2020 03:29:22 +0000 (11:29 +0800)
committerqydysky <qydysky@foxmail.com>
Thu, 19 Nov 2020 03:29:22 +0000 (11:29 +0800)
Signal.go
msgq/Msgq.go
msgq/Msgq_test.go

index 88868e49406bf5a8329a58538051ef6667eef613..ae3c6e6b7e4b5294d69cab53354b643db5c4d7bf 100644 (file)
--- a/Signal.go
+++ b/Signal.go
@@ -6,7 +6,7 @@ type Signal struct{
 
 func (i *Signal) Init() (o *Signal) {
        o = i
-       o.Chan = make(chan struct{})
+       if !i.Islive() {o.Chan = make(chan struct{})}
        return
 }
 
index c5fabf1cfedc3a334de05350fa8541ce9b95ad80..fc87905c6ce977e9955df39541b0952433da6d98 100644 (file)
@@ -1,30 +1,61 @@
 package part
 
+import (
+       // "fmt"
+       "sync"
+       "time"
+       p "github.com/qydysky/part"
+)
+
+const push_mute = 25
+
 type msgq struct {
-       d interface{}
-       i chan struct{}
-       o chan struct{}
+       data msgq_item
+       wait_push p.Signal
+       push_mute int
+       sync.RWMutex
 }
 
-func New() (*msgq) {
-       l := new(msgq)
-       (*l).i = make(chan struct{})
-       (*l).o = make(chan struct{})
-       close((*l).i)
-       return l
+type msgq_item struct {
+       data interface{}
+       sig time.Time
+}
+
+func New(want_push_mute ...int) (*msgq) {
+       m := new(msgq)
+       (*m).wait_push.Init()
+       (*m).data.sig = time.Now()
+       (*m).push_mute = push_mute
+       //太小的禁言时间,将可能出现丢失push的情况
+       if len(want_push_mute) != 0 && want_push_mute[0] >= 0{(*m).push_mute = want_push_mute[0]}
+       return m
 }
 
 func (m *msgq) Push(msg interface{}) {
-       <- m.i
-       m.i = make(chan struct{})
-       m.d = msg
-       close(m.o)
-       m.o = make(chan struct{})
-       close(m.i)
+       m.Lock()
+       defer m.Unlock()
+
+       m.wait_push.Done()
+
+       m.data = msgq_item{
+               data:msg,
+               sig:time.Now(),
+       }
+
+       if m.push_mute != 0 {p.Sys().MTimeoutf(m.push_mute)}
 }
 
-func (m *msgq) Pull() (o interface{}) {
-       <- m.i
-       <- m.o
-       return m.d
+func (m *msgq) Pull(sig ...time.Time) (interface{},time.Time) {
+       if m.wait_push.Islive() || len(sig) != 0 && sig[0] == m.Sig() {
+               m.wait_push.Init().Wait()
+       }
+       //若不提供sig用以识别,将可能出现重复取得的情况
+
+       m.RLock()
+       defer m.RUnlock()
+       return m.data.data,m.Sig()
 }
+
+func (m *msgq) Sig() (time.Time) {
+       return m.data.sig
+}
\ No newline at end of file
index 3740ac58d176885be75cbaec17fea2a0c617d282..e2c25a945af54463247047b581d7d85c4a2d7da9 100644 (file)
@@ -6,23 +6,43 @@ import (
 )
 
 func Test_msgq(t *testing.T) {
-       mq := New()
-       k := 0
-       var e bool
-       for i:=0;i<1e5;i++{
+
+
+       mq := New(25)
+       mun := 1000000
+       mun_c := make(chan bool,mun)
+       mun_s := make(chan bool,mun)
+
+       var e int
+
+       sig := mq.Sig()
+       for i:=0;i<mun;i++{
                go func(){
-                       k += 1
-                       if o,ok:=mq.Pull().(string);o != `mmm`||!ok {e = true}
-                       k += 1
+                       mun_c <- true
+                       data,t0 := mq.Pull(sig)
+                       if o,ok:=data.(string);o != `mmm`||!ok {e = 1}
+                       data1,_ := mq.Pull(t0)
+                       if o,ok:=data1.(string);o != `mm1`||!ok {e = 2}
+                       mun_s <- true
                }()
        }
-       p.Sys().Timeoutf(2)
-       t.Log(`>`,k)
-       k = 0
 
+       for len(mun_c) != mun {
+               t.Log(`>`,len(mun_c))
+               p.Sys().Timeoutf(1)
+       }
+       t.Log(`>`,len(mun_c))
+
+       t.Log(`push mmm`)
        mq.Push(`mmm`)
+       t.Log(`push mm1`)
+       mq.Push(`mm1`)
+
+       for len(mun_s) != mun {
+               t.Log(`<`,len(mun_s))
+               p.Sys().Timeoutf(1)
+       }
+       t.Log(`<`,len(mun_s))
 
-       p.Sys().Timeoutf(1)
-       t.Log(`<`,k)
-       if e {t.Error("f")}
+       if e!=0 {t.Error(e)}
 }