"io"
"log"
"os"
+ "time"
f "github.com/qydysky/part/file"
m "github.com/qydysky/part/msgq"
}
type Config struct {
+ To time.Duration
File string
Stdout bool
f.New(c.File, 0, true).Create()
}
- o.MQ = m.NewType[Msg_item]()
+ if o.To != 0 {
+ o.MQ = m.NewTypeTo[Msg_item](o.To)
+ } else {
+ o.MQ = m.NewType[Msg_item]()
+ }
+
o.MQ.Pull_tag_only(`L`, func(msg Msg_item) bool {
var showObj = []io.Writer{}
if msg.Stdout {
Config: (*i).Config,
MQ: (*i).MQ,
}
- //启动阻塞
- o.MQ.PushLock_tag(`block`, Msg_item{})
return
}
return m
}
-func NewTo(to time.Duration) *Msgq {
+func NewTo(to ...time.Duration) *Msgq {
fmt.Println("Warn: NewTo is slow, consider New")
m := new(Msgq)
m.funcs = list.New()
- if to != 0 {
- m.to = append(m.to, to)
- } else {
- m.to = append(m.to, time.Second*30)
- }
+ m.to = to
return m
}
return m
}
-func NewTypeTo[T any](to time.Duration) *MsgType[T] {
+func NewTypeTo[T any](to ...time.Duration) *MsgType[T] {
fmt.Println("Warn: NewTypeTo[T any] is slow, consider NewType[T any]")
m := new(MsgType[T])
m.funcs = list.New()
- if to != 0 {
- m.to = append(m.to, to)
- } else {
- m.to = append(m.to, time.Second*30)
- }
+ m.to = to
return m
}
)
type RWMutex struct {
- rlc atomic.Int32
- cul atomic.Int32
- oll atomic.Int32
+ rlc atomic.Int32
+ cul atomic.Int32
+ oll atomic.Int32
+ wantRead atomic.Bool
}
// RLock() 必须在 lock期间操作的变量所定义的goroutime 中调用
func (m *RWMutex) RLock(to ...time.Duration) (lockf func() (unlockf func())) {
- lockid := m.cul.Add(1)
+ m.wantRead.Store(true)
return func() (unlockf func()) {
var callC atomic.Bool
if len(to) > 0 {
}
runtime.Gosched()
}
- if lockid > m.oll.Load() {
- m.oll.Store(lockid)
- }
c = time.Now()
go func() {
for !callC.Load() {
}()
} else {
for m.rlc.Load() < ulock {
- time.Sleep(time.Millisecond)
runtime.Gosched()
}
- if lockid > m.oll.Load() {
- m.oll.Store(lockid)
- }
}
m.rlc.Add(1)
return func() {
if !callC.CompareAndSwap(false, true) {
panic("had unrlock")
}
- m.rlc.Add(-1)
+ if m.rlc.Add(-1) == ulock {
+ m.wantRead.Store(false)
+ }
}
}
}
}
}
c := time.Now()
- for m.rlc.Load() > ulock {
+ for m.rlc.Load() > ulock || m.wantRead.Load() {
if time.Since(c) > to[0] {
panic(fmt.Sprintf("timeout to wait rlock, rlc:%d", m.rlc.Load()))
}
}
for lockid-1 != m.oll.Load() {
if time.Since(c) > to[0] {
- panic(fmt.Sprintf("timeout to wait lock, rlc:%d", m.rlc.Load()))
+ panic(fmt.Sprintf("timeout to wait lock, lockid:%d <> %d", lockid, m.oll.Load()))
}
runtime.Gosched()
}
}
}()
} else {
- for m.rlc.Load() > ulock || lockid-1 != m.oll.Load() {
- time.Sleep(time.Millisecond)
+ for m.rlc.Load() > ulock || m.wantRead.Load() {
+ runtime.Gosched()
+ }
+ for lockid-1 != m.oll.Load() {
runtime.Gosched()
}
if !m.rlc.CompareAndSwap(ulock, lock) {
panic("had unlock")
}
if !m.rlc.CompareAndSwap(lock, ulock) {
- panic("")
+ panic(fmt.Sprintf("csa error, rlc:%d", m.rlc.Load()))
}
m.oll.Store(lockid)
}