From: qydysky Date: Tue, 16 May 2023 17:45:07 +0000 (+0800) Subject: add X-Git-Tag: v0.27.3 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=6365faf258f790417eddd3b88c46a7da6194a686;p=part%2F.git add --- diff --git a/log/Log.go b/log/Log.go index 7ab8ed2..b943f3e 100644 --- a/log/Log.go +++ b/log/Log.go @@ -4,6 +4,7 @@ import ( "io" "log" "os" + "time" f "github.com/qydysky/part/file" m "github.com/qydysky/part/msgq" @@ -19,6 +20,7 @@ type Log_interface struct { } type Config struct { + To time.Duration File string Stdout bool @@ -42,7 +44,12 @@ func New(c Config) (o *Log_interface) { f.New(c.File, 0, true).Create() } - o.MQ = m.NewType[Msg_item]() + if o.To != 0 { + o.MQ = m.NewTypeTo[Msg_item](o.To) + } else { + o.MQ = m.NewType[Msg_item]() + } + o.MQ.Pull_tag_only(`L`, func(msg Msg_item) bool { var showObj = []io.Writer{} if msg.Stdout { @@ -72,8 +79,6 @@ func Copy(i *Log_interface) (o *Log_interface) { Config: (*i).Config, MQ: (*i).MQ, } - //启动阻塞 - o.MQ.PushLock_tag(`block`, Msg_item{}) return } diff --git a/msgq/Msgq.go b/msgq/Msgq.go index d254edd..2548bb5 100644 --- a/msgq/Msgq.go +++ b/msgq/Msgq.go @@ -29,15 +29,11 @@ func New() *Msgq { return m } -func NewTo(to time.Duration) *Msgq { +func NewTo(to ...time.Duration) *Msgq { fmt.Println("Warn: NewTo is slow, consider New") m := new(Msgq) m.funcs = list.New() - if to != 0 { - m.to = append(m.to, to) - } else { - m.to = append(m.to, time.Second*30) - } + m.to = to return m } @@ -284,15 +280,11 @@ func NewType[T any]() *MsgType[T] { return m } -func NewTypeTo[T any](to time.Duration) *MsgType[T] { +func NewTypeTo[T any](to ...time.Duration) *MsgType[T] { fmt.Println("Warn: NewTypeTo[T any] is slow, consider NewType[T any]") m := new(MsgType[T]) m.funcs = list.New() - if to != 0 { - m.to = append(m.to, to) - } else { - m.to = append(m.to, time.Second*30) - } + m.to = to return m } diff --git a/sync/RWMutex.go b/sync/RWMutex.go index c6f8812..828fb2e 100644 --- a/sync/RWMutex.go +++ b/sync/RWMutex.go @@ -13,14 +13,15 @@ const ( ) type RWMutex struct { - rlc atomic.Int32 - cul atomic.Int32 - oll atomic.Int32 + rlc atomic.Int32 + cul atomic.Int32 + oll atomic.Int32 + wantRead atomic.Bool } // RLock() 必须在 lock期间操作的变量所定义的goroutime 中调用 func (m *RWMutex) RLock(to ...time.Duration) (lockf func() (unlockf func())) { - lockid := m.cul.Add(1) + m.wantRead.Store(true) return func() (unlockf func()) { var callC atomic.Bool if len(to) > 0 { @@ -39,9 +40,6 @@ func (m *RWMutex) RLock(to ...time.Duration) (lockf func() (unlockf func())) { } runtime.Gosched() } - if lockid > m.oll.Load() { - m.oll.Store(lockid) - } c = time.Now() go func() { for !callC.Load() { @@ -57,19 +55,17 @@ func (m *RWMutex) RLock(to ...time.Duration) (lockf func() (unlockf func())) { }() } else { for m.rlc.Load() < ulock { - time.Sleep(time.Millisecond) runtime.Gosched() } - if lockid > m.oll.Load() { - m.oll.Store(lockid) - } } m.rlc.Add(1) return func() { if !callC.CompareAndSwap(false, true) { panic("had unrlock") } - m.rlc.Add(-1) + if m.rlc.Add(-1) == ulock { + m.wantRead.Store(false) + } } } } @@ -89,7 +85,7 @@ func (m *RWMutex) Lock(to ...time.Duration) (lockf func() (unlockf func())) { } } c := time.Now() - for m.rlc.Load() > ulock { + for m.rlc.Load() > ulock || m.wantRead.Load() { if time.Since(c) > to[0] { panic(fmt.Sprintf("timeout to wait rlock, rlc:%d", m.rlc.Load())) } @@ -97,7 +93,7 @@ func (m *RWMutex) Lock(to ...time.Duration) (lockf func() (unlockf func())) { } for lockid-1 != m.oll.Load() { if time.Since(c) > to[0] { - panic(fmt.Sprintf("timeout to wait lock, rlc:%d", m.rlc.Load())) + panic(fmt.Sprintf("timeout to wait lock, lockid:%d <> %d", lockid, m.oll.Load())) } runtime.Gosched() } @@ -118,8 +114,10 @@ func (m *RWMutex) Lock(to ...time.Duration) (lockf func() (unlockf func())) { } }() } else { - for m.rlc.Load() > ulock || lockid-1 != m.oll.Load() { - time.Sleep(time.Millisecond) + for m.rlc.Load() > ulock || m.wantRead.Load() { + runtime.Gosched() + } + for lockid-1 != m.oll.Load() { runtime.Gosched() } if !m.rlc.CompareAndSwap(ulock, lock) { @@ -131,7 +129,7 @@ func (m *RWMutex) Lock(to ...time.Duration) (lockf func() (unlockf func())) { panic("had unlock") } if !m.rlc.CompareAndSwap(lock, ulock) { - panic("") + panic(fmt.Sprintf("csa error, rlc:%d", m.rlc.Load())) } m.oll.Store(lockid) } diff --git a/sync/RWMutex_test.go b/sync/RWMutex_test.go index 843253a..c64775b 100644 --- a/sync/RWMutex_test.go +++ b/sync/RWMutex_test.go @@ -43,3 +43,10 @@ func TestMain(t *testing.T) { t.Fatal() } } + +func BenchmarkRlock(b *testing.B) { + var lock1 RWMutex + for i := 0; i < b.N; i++ { + lock1.Lock(time.Second)()() + } +}