package part
import (
- // "fmt"
"sync"
- "time"
- p "github.com/qydysky/part"
+ "container/list"
)
-const push_mute = 25
-
type msgq struct {
- data msgq_item
- wait_push p.Signal
- push_mute int
+ data_list *list.List
+ wait_push chan bool
+ max_data_mun int
+ sig uint64
sync.RWMutex
}
type msgq_item struct {
data interface{}
- sig time.Time
+ sig uint64
}
-func New(want_push_mute ...int) (*msgq) {
+func New(want_max_data_mun int) (*msgq) {
m := new(msgq)
- (*m).wait_push.Init()
- (*m).data.sig = time.Now()
- (*m).push_mute = push_mute
- //太小的禁言时间,将可能出现丢失push的情况
- if len(want_push_mute) != 0 && want_push_mute[0] >= 0{(*m).push_mute = want_push_mute[0]}
+ (*m).wait_push = make(chan bool,100)
+ (*m).data_list = list.New()
+ (*m).max_data_mun = want_max_data_mun
return m
}
func (m *msgq) Push(msg interface{}) {
m.Lock()
defer m.Unlock()
-
- m.wait_push.Done()
-
- m.data = msgq_item{
+ m.data_list.PushBack(msgq_item{
data:msg,
- sig:time.Now(),
- }
-
- if m.push_mute != 0 {p.Sys().MTimeoutf(m.push_mute)}
+ sig:m.get_sig(),
+ })
+ if m.data_list.Len() > m.max_data_mun {m.data_list.Remove(m.data_list.Front())}
+ for len(m.wait_push) == 0 {m.wait_push <- true}
+ <- m.wait_push
}
-func (m *msgq) Pull(sig ...time.Time) (interface{},time.Time) {
- if m.wait_push.Islive() || len(sig) != 0 && sig[0] == m.Sig() {
- m.wait_push.Init().Wait()
- }
- //若不提供sig用以识别,将可能出现重复取得的情况
-
+func (m *msgq) Pull(old_sig uint64) (data interface{},sig uint64) {
+ if old_sig == m.Sig() || m.data_list.Len() == 0 {<- m.wait_push}
m.RLock()
defer m.RUnlock()
- return m.data.data,m.Sig()
+ for el := m.data_list.Front();el != nil;el = el.Next() {
+ if old_sig < el.Value.(msgq_item).sig {
+ data = el.Value.(msgq_item).data
+ sig = el.Value.(msgq_item).sig
+ break
+ }
+ }
+ return
+}
+
+func (m *msgq) Sig() (sig uint64) {
+ if el := m.data_list.Back();el == nil {
+ sig = m.get_sig()
+ } else {
+ sig = el.Value.(msgq_item).sig
+ }
+ return
}
-func (m *msgq) Sig() (time.Time) {
- return m.data.sig
+func (m *msgq) get_sig() (sig uint64) {
+ m.sig += 1
+ return m.sig
}
\ No newline at end of file
package part
import (
+ "time"
+ "fmt"
"testing"
p "github.com/qydysky/part"
)
+type test_item struct {
+ data string
+}
+
func Test_msgq(t *testing.T) {
- mq := New(25)
+ mq := New(5)
mun := 1000000
mun_c := make(chan bool,mun)
mun_s := make(chan bool,mun)
if e!=0 {t.Error(e)}
}
+
+func Test_msgq2(t *testing.T) {
+ mq := New(5)
+
+ mun_c := make(chan bool,100)
+ go func(){
+ var (
+ sig = mq.Sig()
+ data interface{}
+ )
+ for {
+ data,sig = mq.Pull(sig)
+ if data.(test_item).data != `aa1` {t.Error(`1`)}
+ mun_c <- true
+ }
+ }()
+ go func(){
+ var (
+ sig = mq.Sig()
+ data interface{}
+ )
+ for {
+ data,sig = mq.Pull(sig)
+ if data.(test_item).data != `aa1` {t.Error(`2`)}
+ mun_c <- true
+ }
+ }()
+ var fin_turn = 0
+ t.Log(`start`)
+ time.Sleep(time.Second)
+ for fin_turn < 10000 {
+ mq.Push(test_item{
+ data:`aa1`,
+ })
+ <-mun_c
+ <-mun_c
+ fin_turn += 1
+ fmt.Print("\r",fin_turn)
+ }
+ t.Log(`fin`)
+}
--- /dev/null
+module github.com/qydysky/part/msgq
+
+go 1.15
+
+require github.com/qydysky/part v0.3.1
+
+replace github.com/qydysky/part => ../
--- /dev/null
+github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
+github.com/andybalholm/brotli v1.0.1 h1:KqhlKozYbRtJvsPrrEeXcO+N2l6NYT5A2QAFmSULpEc=
+github.com/andybalholm/brotli v1.0.1/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
+github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
+github.com/klauspost/compress v1.10.10 h1:a/y8CglcM7gLGYmlbP/stPE5sR3hbhFRUjCBfd/0B3I=
+github.com/klauspost/compress v1.10.10/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
+github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE=
+github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
+github.com/miekg/dns v1.1.31 h1:sJFOl9BgwbYAWOGEwr61FU28pqsBNdpRBnhGXtO06Oo=
+github.com/miekg/dns v1.1.31/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM=
+github.com/qydysky/part v0.3.1 h1:pyRSZQCWPsMSmn3XcEGFdwfhTdMczDomHi2+pfoiBdU=
+github.com/qydysky/part v0.3.1/go.mod h1:93s9ohLtzULet5ZPEUUWrT9BELC30oDZgRpgGSiDye4=
+github.com/shirou/gopsutil v2.20.7+incompatible h1:Ymv4OD12d6zm+2yONe39VSmp2XooJe8za7ngOLW/o/w=
+github.com/shirou/gopsutil v2.20.7+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
+github.com/thedevsaddam/gojsonq v2.3.0+incompatible h1:i2lFTvGY4LvoZ2VUzedsFlRiyaWcJm3Uh6cQ9+HyQA8=
+github.com/thedevsaddam/gojsonq v2.3.0+incompatible/go.mod h1:RBcQaITThgJAAYKH7FNp2onYodRz8URfsuEGpAch0NA=
+github.com/thedevsaddam/gojsonq/v2 v2.5.2 h1:CoMVaYyKFsVj6TjU6APqAhAvC07hTI6IQen8PHzHYY0=
+github.com/thedevsaddam/gojsonq/v2 v2.5.2/go.mod h1:bv6Xa7kWy82uT0LnXPE2SzGqTj33TAEeR560MdJkiXs=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig=
+golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU=
+golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200810151505-1b9f1253b3ed h1:WBkVNH1zd9jg/dK4HCM4lNANnmd12EHC9z+LmcCG4ns=
+golang.org/x/sys v0.0.0-20200810151505-1b9f1253b3ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Chan chan struct{}
}
-func (i *Signal) Init() (o *Signal) {
- o = i
- if !i.Islive() {o.Chan = make(chan struct{})}
- return
+func Init() (*Signal) {
+ return &Signal{Chan:make(chan struct{})}
}
func (i *Signal) Wait() {
}
func (i *Signal) Islive() (islive bool) {
+ if i == nil {return}
select {
case <-i.Chan:;
default:
--- /dev/null
+package part
+
+import (
+ "testing"
+)
+
+func Test_signal(t *testing.T) {
+ var s *Signal
+ t.Log(s.Islive())
+ s.Done()
+ t.Log(s.Islive())
+ s = Init()
+ t.Log(s.Islive())
+ s.Done()
+ t.Log(s.Islive())
+}