From: qydysky Date: Sun, 13 Dec 2020 03:11:29 +0000 (+0800) Subject: 92 X-Git-Tag: v0.3.2 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=ca3253dc72adacb298aeb7374e4685d416ac2ab6;p=part%2F.git 92 --- diff --git a/msgq/Msgq.go b/msgq/Msgq.go index fc87905..032d5b1 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -1,61 +1,67 @@ package part import ( - // "fmt" "sync" - "time" - p "github.com/qydysky/part" + "container/list" ) -const push_mute = 25 - type msgq struct { - data msgq_item - wait_push p.Signal - push_mute int + data_list *list.List + wait_push chan bool + max_data_mun int + sig uint64 sync.RWMutex } type msgq_item struct { data interface{} - sig time.Time + sig uint64 } -func New(want_push_mute ...int) (*msgq) { +func New(want_max_data_mun int) (*msgq) { m := new(msgq) - (*m).wait_push.Init() - (*m).data.sig = time.Now() - (*m).push_mute = push_mute - //太小的禁言时间,将可能出现丢失push的情况 - if len(want_push_mute) != 0 && want_push_mute[0] >= 0{(*m).push_mute = want_push_mute[0]} + (*m).wait_push = make(chan bool,100) + (*m).data_list = list.New() + (*m).max_data_mun = want_max_data_mun return m } func (m *msgq) Push(msg interface{}) { m.Lock() defer m.Unlock() - - m.wait_push.Done() - - m.data = msgq_item{ + m.data_list.PushBack(msgq_item{ data:msg, - sig:time.Now(), - } - - if m.push_mute != 0 {p.Sys().MTimeoutf(m.push_mute)} + sig:m.get_sig(), + }) + if m.data_list.Len() > m.max_data_mun {m.data_list.Remove(m.data_list.Front())} + for len(m.wait_push) == 0 {m.wait_push <- true} + <- m.wait_push } -func (m *msgq) Pull(sig ...time.Time) (interface{},time.Time) { - if m.wait_push.Islive() || len(sig) != 0 && sig[0] == m.Sig() { - m.wait_push.Init().Wait() - } - //若不提供sig用以识别,将可能出现重复取得的情况 - +func (m *msgq) Pull(old_sig uint64) (data interface{},sig uint64) { + if old_sig == m.Sig() || m.data_list.Len() == 0 {<- m.wait_push} m.RLock() defer m.RUnlock() - return m.data.data,m.Sig() + for el := m.data_list.Front();el != nil;el = el.Next() { + if old_sig < el.Value.(msgq_item).sig { + data = el.Value.(msgq_item).data + sig = el.Value.(msgq_item).sig + break + } + } + return +} + +func (m *msgq) Sig() (sig uint64) { + if el := m.data_list.Back();el == nil { + sig = m.get_sig() + } else { + sig = el.Value.(msgq_item).sig + } + return } -func (m *msgq) Sig() (time.Time) { - return m.data.sig +func (m *msgq) get_sig() (sig uint64) { + m.sig += 1 + return m.sig } \ No newline at end of file diff --git a/msgq/Msgq_test.go b/msgq/Msgq_test.go index e2c25a9..9e80275 100644 --- a/msgq/Msgq_test.go +++ b/msgq/Msgq_test.go @@ -1,14 +1,20 @@ package part import ( + "time" + "fmt" "testing" p "github.com/qydysky/part" ) +type test_item struct { + data string +} + func Test_msgq(t *testing.T) { - mq := New(25) + mq := New(5) mun := 1000000 mun_c := make(chan bool,mun) mun_s := make(chan bool,mun) @@ -46,3 +52,44 @@ func Test_msgq(t *testing.T) { if e!=0 {t.Error(e)} } + +func Test_msgq2(t *testing.T) { + mq := New(5) + + mun_c := make(chan bool,100) + go func(){ + var ( + sig = mq.Sig() + data interface{} + ) + for { + data,sig = mq.Pull(sig) + if data.(test_item).data != `aa1` {t.Error(`1`)} + mun_c <- true + } + }() + go func(){ + var ( + sig = mq.Sig() + data interface{} + ) + for { + data,sig = mq.Pull(sig) + if data.(test_item).data != `aa1` {t.Error(`2`)} + mun_c <- true + } + }() + var fin_turn = 0 + t.Log(`start`) + time.Sleep(time.Second) + for fin_turn < 10000 { + mq.Push(test_item{ + data:`aa1`, + }) + <-mun_c + <-mun_c + fin_turn += 1 + fmt.Print("\r",fin_turn) + } + t.Log(`fin`) +} diff --git a/msgq/go.mod b/msgq/go.mod new file mode 100644 index 0000000..4fa99f7 --- /dev/null +++ b/msgq/go.mod @@ -0,0 +1,7 @@ +module github.com/qydysky/part/msgq + +go 1.15 + +require github.com/qydysky/part v0.3.1 + +replace github.com/qydysky/part => ../ diff --git a/msgq/go.sum b/msgq/go.sum new file mode 100644 index 0000000..2dee042 --- /dev/null +++ b/msgq/go.sum @@ -0,0 +1,39 @@ +github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= +github.com/andybalholm/brotli v1.0.1 h1:KqhlKozYbRtJvsPrrEeXcO+N2l6NYT5A2QAFmSULpEc= +github.com/andybalholm/brotli v1.0.1/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= +github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= +github.com/klauspost/compress v1.10.10 h1:a/y8CglcM7gLGYmlbP/stPE5sR3hbhFRUjCBfd/0B3I= +github.com/klauspost/compress v1.10.10/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE= +github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= +github.com/miekg/dns v1.1.31 h1:sJFOl9BgwbYAWOGEwr61FU28pqsBNdpRBnhGXtO06Oo= +github.com/miekg/dns v1.1.31/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM= +github.com/qydysky/part v0.3.1 h1:pyRSZQCWPsMSmn3XcEGFdwfhTdMczDomHi2+pfoiBdU= +github.com/qydysky/part v0.3.1/go.mod h1:93s9ohLtzULet5ZPEUUWrT9BELC30oDZgRpgGSiDye4= +github.com/shirou/gopsutil v2.20.7+incompatible h1:Ymv4OD12d6zm+2yONe39VSmp2XooJe8za7ngOLW/o/w= +github.com/shirou/gopsutil v2.20.7+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/thedevsaddam/gojsonq v2.3.0+incompatible h1:i2lFTvGY4LvoZ2VUzedsFlRiyaWcJm3Uh6cQ9+HyQA8= +github.com/thedevsaddam/gojsonq v2.3.0+incompatible/go.mod h1:RBcQaITThgJAAYKH7FNp2onYodRz8URfsuEGpAch0NA= +github.com/thedevsaddam/gojsonq/v2 v2.5.2 h1:CoMVaYyKFsVj6TjU6APqAhAvC07hTI6IQen8PHzHYY0= +github.com/thedevsaddam/gojsonq/v2 v2.5.2/go.mod h1:bv6Xa7kWy82uT0LnXPE2SzGqTj33TAEeR560MdJkiXs= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig= +golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU= +golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200810151505-1b9f1253b3ed h1:WBkVNH1zd9jg/dK4HCM4lNANnmd12EHC9z+LmcCG4ns= +golang.org/x/sys v0.0.0-20200810151505-1b9f1253b3ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/Signal.go b/signal/Signal.go similarity index 75% rename from Signal.go rename to signal/Signal.go index ae3c6e6..60ae4d8 100644 --- a/Signal.go +++ b/signal/Signal.go @@ -4,10 +4,8 @@ type Signal struct{ Chan chan struct{} } -func (i *Signal) Init() (o *Signal) { - o = i - if !i.Islive() {o.Chan = make(chan struct{})} - return +func Init() (*Signal) { + return &Signal{Chan:make(chan struct{})} } func (i *Signal) Wait() { @@ -19,6 +17,7 @@ func (i *Signal) Done() { } func (i *Signal) Islive() (islive bool) { + if i == nil {return} select { case <-i.Chan:; default: diff --git a/signal/Signal_test.go b/signal/Signal_test.go new file mode 100644 index 0000000..c0ccfb1 --- /dev/null +++ b/signal/Signal_test.go @@ -0,0 +1,16 @@ +package part + +import ( + "testing" +) + +func Test_signal(t *testing.T) { + var s *Signal + t.Log(s.Islive()) + s.Done() + t.Log(s.Islive()) + s = Init() + t.Log(s.Islive()) + s.Done() + t.Log(s.Islive()) +}