From 9dd1f9c4c91cbeeb4cf59a757aaa4ffaeb437d15 Mon Sep 17 00:00:00 2001 From: qydysky <32743305+qydysky@users.noreply.github.com> Date: Mon, 13 Mar 2023 00:11:49 +0800 Subject: [PATCH] =?utf8?q?Improve=20=E5=87=8F=E5=B0=91data=20race?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- CV/Var.go | 61 ++++++++++++++++++++++++++++++++++++++++++----- F/api.go | 32 ++++++++++++++++++++----- Reply/F.go | 38 +++++++++++++++++------------ Reply/stream.go | 24 ++++++++++++++----- Reply/tts.go | 2 +- Send/Send.go | 2 +- Send/Send_gift.go | 2 +- Send/Send_pm.go | 2 +- bili_danmu.go | 2 +- go.mod | 4 +++- go.sum | 4 ++++ 11 files changed, 134 insertions(+), 39 deletions(-) diff --git a/CV/Var.go b/CV/Var.go index 405becc..01286a3 100644 --- a/CV/Var.go +++ b/CV/Var.go @@ -24,6 +24,12 @@ import ( web "github.com/qydysky/part/web" ) +type StreamType struct { + Protocol_name string + Format_name string + Codec_name string +} + type Common struct { PID int //进程id Uid int //client uid @@ -96,6 +102,55 @@ func (t *LiveQn) Disable(reUpTime time.Time) { t.ReUpTime = reUpTime } +func (t *Common) IsOn(key string) bool { + v, ok := t.K_v.LoadV(key).(bool) + return ok && v +} + +func (t *Common) Copy() *Common { + var c = Common{ + PID: t.PID, + Uid: t.Uid, + Live: t.Live, + Live_qn: t.Live_qn, + Live_want_qn: t.Live_want_qn, + Roomid: t.Roomid, + Cookie: t.Cookie.Copy(), + Title: t.Title, + Uname: t.Uname, + UpUid: t.UpUid, + Rev: t.Rev, + Renqi: t.Renqi, + Watched: t.Watched, + OnlineNum: t.OnlineNum, + GuardNum: t.GuardNum, + ParentAreaID: t.ParentAreaID, + AreaID: t.AreaID, + Locked: t.Locked, + Note: t.Note, + Live_Start_Time: t.Live_Start_Time, + Liveing: t.Liveing, + Wearing_FansMedal: t.Wearing_FansMedal, + Token: t.Token, + WSURL: t.WSURL, + LIVE_BUVID: t.LIVE_BUVID, + Stream_url: t.Stream_url, + Proxy: t.Proxy, + AcceptQn: syncmap.Copy(t.AcceptQn), + Qn: syncmap.Copy(t.Qn), + StreamType: t.StreamType, + AllStreamType: syncmap.Copy(t.AllStreamType), + K_v: t.K_v.Copy(), + Log: t.Log, + Danmu_Main_mq: t.Danmu_Main_mq, + ReqPool: t.ReqPool, + SerF: t.SerF, + StartT: t.StartT, + } + + return &c +} + // 自动停用机制 func (t *Common) DisableLiveAuto(host string) { for i := 0; i < len(t.Live); i++ { @@ -129,12 +184,6 @@ func (t *Common) ValidLive() *LiveQn { return nil } -type StreamType struct { - Protocol_name string - Format_name string - Codec_name string -} - func (t *Common) Init() Common { t.PID = os.Getpid() t.StartT = time.Now() diff --git a/F/api.go b/F/api.go index cf6fcb4..db79ce5 100644 --- a/F/api.go +++ b/F/api.go @@ -8,6 +8,7 @@ import ( "os" "strconv" "strings" + "sync" "time" "github.com/dustin/go-humanize" @@ -28,14 +29,15 @@ import ( ) var apilog = c.C.Log.Base(`api`) -var api_limit = limit.New(2, 1000, 30000) //频率限制2次/s,最大等待时间30s +var api_limit = limit.New(2, "1s", "30s") //频率限制2次/s,最大等待时间30s type GetFunc struct { *c.Common + l sync.RWMutex } func Get(c *c.Common) *GetFunc { - return &GetFunc{c} + return &GetFunc{Common: c} } func (c *GetFunc) Get(key string) { @@ -213,12 +215,21 @@ func (c *GetFunc) Get(key string) { if fList, ok := api_can_get[key]; ok { for _, fItem := range fList { apilog.Log_show_control(false).L(`T: `, `Get`, key) + + c.l.Lock() missKey := fItem() + c.l.Unlock() + if len(missKey) > 0 { apilog.L(`T: `, `missKey when get`, key, missKey) for _, misskeyitem := range missKey { - if checkf, ok := check[misskeyitem]; ok && checkf() { - continue + if checkf, ok := check[misskeyitem]; ok { + c.l.RLock() + if checkf() { + c.l.RUnlock() + continue + } + c.l.RUnlock() } if misskeyitem == key { apilog.L(`W: `, `missKey equrt key`, key, missKey) @@ -226,14 +237,23 @@ func (c *GetFunc) Get(key string) { } c.Get(misskeyitem) } + + c.l.Lock() missKey := fItem() + c.l.Unlock() + if len(missKey) > 0 { apilog.L(`W: `, `missKey when get`, key, missKey) continue } } - if checkf, ok := check[key]; ok && checkf() { - break + if checkf, ok := check[key]; ok { + c.l.RLock() + if checkf() { + c.l.RUnlock() + break + } + c.l.RUnlock() } } } diff --git a/Reply/F.go b/Reply/F.go index 6019618..32a2bcc 100644 --- a/Reply/F.go +++ b/Reply/F.go @@ -45,8 +45,7 @@ var flog = c.C.Log.Base(`功能`) // 功能开关选取函数 func IsOn(s string) bool { - v, ok := c.C.K_v.LoadV(s).(bool) - return ok && v + return c.C.IsOn(s) } // 字符重复度检查 @@ -135,8 +134,11 @@ func ShowRevf() { ShowRev_start = true for { c.C.Log.Base(`功能`).L(`I: `, fmt.Sprintf("营收 ¥%.2f", c.C.Rev)) - for c.C.Rev == ShowRev_old { - sys.Sys().Timeoutf(60) + for { + if c.C.Rev != ShowRev_old { + break + } + time.Sleep(time.Minute) } ShowRev_old = c.C.Rev } @@ -293,15 +295,15 @@ func StreamOStart(roomid int) { flog.L(`W: `, `已录制 `+strconv.Itoa(roomid)+` 不能重复录制`) return } - var ( - tmp = new(M4SStream) - common = c.C - ) - common.Roomid = roomid - if e := tmp.LoadConfig(common); e != nil { + + var tmp = new(M4SStream) + + if e := tmp.LoadConfig(*c.C.Copy()); e != nil { flog.L(`E: `, e) return } + tmp.common.Roomid = roomid + //录制回调,关于ass tmp.Callback_startRec = func(ms *M4SStream) error { StartRecDanmu(ms.Current_save_path + "0.csv") @@ -345,13 +347,13 @@ func StreamOStop(roomid int) { return true }) case -1: // 所有房间 - streamO.Range(func(_, v interface{}) bool { + streamO.Range(func(k, v interface{}) bool { if v.(*M4SStream).Status.Islive() { v.(*M4SStream).Stop() } + streamO.Delete(k) return true }) - streamO = new(sync.Map) default: // 针对某房间 if v, ok := streamO.Load(roomid); ok { if v.(*M4SStream).Status.Islive() { @@ -535,7 +537,7 @@ var danmuji = Danmuji{ Buf: map[string]string{ "弹幕机在么": "在", }, - reflect_limit: limit.New(1, 4000, 8000), + reflect_limit: limit.New(1, "4s", "8s"), } func init() { //初始化反射型弹幕机 @@ -624,11 +626,12 @@ func init() { go func() { for { <-autoskip.ticker.C + autoskip.Lock() if len(autoskip.buf) == 0 { + autoskip.Unlock() continue } autoskip.now += 1 - autoskip.Lock() if autoskip.roomid != c.C.Roomid { autoskip.buf = make(map[string]Autoskip_item) autoskip.roomid = c.C.Roomid @@ -721,7 +724,7 @@ func init() { if max_num, ok := c.C.K_v.LoadV(`每秒显示弹幕数`).(float64); ok && int(max_num) >= 1 { flog.Base_add(`更少弹幕`).L(`T: `, `每秒弹幕数:`, int(max_num)) lessdanmu.max_num = int(max_num) - lessdanmu.limit = limit.New(int(max_num), 1000, 0) //timeout right now + lessdanmu.limit = limit.New(int(max_num), "1s", "0s") //timeout right now } } @@ -813,6 +816,7 @@ func Lessdanmuf(s string) (show bool) { type Shortdanmu struct { lastdanmu []rune + l sync.Mutex } var shortdanmu = Shortdanmu{} @@ -821,6 +825,10 @@ func Shortdanmuf(s string) string { if !IsOn("精简弹幕") { return s } + + shortdanmu.l.Lock() + defer shortdanmu.l.Unlock() + if len(shortdanmu.lastdanmu) == 0 { shortdanmu.lastdanmu = []rune(s) return s diff --git a/Reply/stream.go b/Reply/stream.go index 077116d..860e227 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -13,6 +13,7 @@ import ( "path/filepath" "strconv" "strings" + "sync/atomic" "time" c "github.com/qydysky/bili_danmu/CV" @@ -631,29 +632,40 @@ func (t *M4SStream) saveStreamFlv() (e error) { r := t.reqPool.Get() { go func() { + tsc, tscf := t.Status.WaitC() + defer tscf() + sc, scf := s.WaitC() + defer scf() + select { //停止录制 - case <-t.Status.WaitC(): + case <-tsc: r.Cancel() //当前连接终止 - case <-s.WaitC(): + case <-sc: } }() out := file.New(t.Current_save_path+`0.flv`, -1, true).File() rc, rw := io.Pipe() - var leastReadUnix = time.Now().Unix() + var leastReadUnix atomic.Int64 + leastReadUnix.Store(time.Now().Unix()) + // read timeout go func() { timer := time.NewTicker(5 * time.Second) defer timer.Stop() + + sc, scf := s.WaitC() + defer scf() + for { select { - case <-s.WaitC(): + case <-sc: return case curT := <-timer.C: - if curT.Unix()-leastReadUnix > 5 { + if curT.Unix()-leastReadUnix.Load() > 5 { t.log.L(`W: `, "5s未接收到任何数据") // 5s未接收到任何数据 r.Cancel() @@ -689,7 +701,7 @@ func (t *M4SStream) saveStreamFlv() (e error) { t.Stream_msg.Push_tag(`close`, nil) break } - leastReadUnix = time.Now().Unix() + leastReadUnix.Store(time.Now().Unix()) skip := true select { diff --git a/Reply/tts.go b/Reply/tts.go index aeca87c..670fa3e 100644 --- a/Reply/tts.go +++ b/Reply/tts.go @@ -38,7 +38,7 @@ var ( ) var tts_List = make(chan string, 20) -var tts_limit = limit.New(1, 5000, 15000) //频率限制1次/5s,最大等待时间15s +var tts_limit = limit.New(1, "5s", "15s") //频率限制1次/5s,最大等待时间15s var tts_log = c.C.Log.Base_add(`TTS`) diff --git a/Send/Send.go b/Send/Send.go index 5b7ddd2..e0354ce 100644 --- a/Send/Send.go +++ b/Send/Send.go @@ -15,7 +15,7 @@ import ( ) // 每5s一个令牌,最多等20秒 -var danmu_s_limit = limit.New(1, 5000, 20000) +var danmu_s_limit = limit.New(1, "5s", "20s") var damnu_official = make(map[string]string) // 初始化表情代码 diff --git a/Send/Send_gift.go b/Send/Send_gift.go index dcca635..ec9d16a 100644 --- a/Send/Send_gift.go +++ b/Send/Send_gift.go @@ -13,7 +13,7 @@ import ( ) // 每2s一个令牌,最多等10秒 -var gift_limit = limit.New(1, 2000, 10000) +var gift_limit = limit.New(1, "2s", "10s") func Send_gift(gift_id, bag_id, gift_num int) { log := c.C.Log.Base_add(`发送礼物`) diff --git a/Send/Send_pm.go b/Send/Send_pm.go index aaee17e..ba2ad0a 100644 --- a/Send/Send_pm.go +++ b/Send/Send_pm.go @@ -22,7 +22,7 @@ type Pm_item struct { } // 每5s一个令牌,最多等10秒 -var pm_limit = limit.New(1, 5000, 10000) +var pm_limit = limit.New(1, "5s", "10s") func Send_pm(uid int, msg string) error { if msg == `` || uid == 0 { diff --git a/bili_danmu.go b/bili_danmu.go index 6edf113..73866dc 100644 --- a/bili_danmu.go +++ b/bili_danmu.go @@ -358,7 +358,7 @@ func Start() { { //附加功能 直播流停止 reply.StreamOStop(-1) } - close(interrupt) + // close(interrupt) danmulog.L(`I: `, "结束退出") } } diff --git a/go.mod b/go.mod index c35e4f3..76cd245 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/gofrs/uuid v4.3.0+incompatible github.com/gotk3/gotk3 v0.6.1 github.com/mdp/qrterminal/v3 v3.0.0 - github.com/qydysky/part v0.23.17 + github.com/qydysky/part v0.24.3 github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966 golang.org/x/text v0.8.0 @@ -33,3 +33,5 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect rsc.io/qr v0.2.0 // indirect ) + +replace github.com/qydysky/part => ../part diff --git a/go.sum b/go.sum index 363957a..c8a81f6 100644 --- a/go.sum +++ b/go.sum @@ -83,6 +83,10 @@ github.com/qydysky/part v0.23.16 h1:T7fjB1boptmQCVgmgJ6SkRYJBeEqJK9ohlcxrSlcMZg= github.com/qydysky/part v0.23.16/go.mod h1:AQJH+BYeN30eKXjkDqGEtw0vx3wVGplBeOMLSyleEDo= github.com/qydysky/part v0.23.17 h1:VCo1CG2982poLaHxZ1EM9+AfqBphDPmESCpp7JRGk3Q= github.com/qydysky/part v0.23.17/go.mod h1:AQJH+BYeN30eKXjkDqGEtw0vx3wVGplBeOMLSyleEDo= +github.com/qydysky/part v0.24.0 h1:CNikH+D3g01e6RS+jowSdgYt4hyC7ZrRHHFACuQtkrQ= +github.com/qydysky/part v0.24.0/go.mod h1:AQJH+BYeN30eKXjkDqGEtw0vx3wVGplBeOMLSyleEDo= +github.com/qydysky/part v0.24.1 h1:St+338RkG5AGlBpcSq29gK5o4tukFOOX8F5x/TMtQG0= +github.com/qydysky/part v0.24.1/go.mod h1:AQJH+BYeN30eKXjkDqGEtw0vx3wVGplBeOMLSyleEDo= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e h1:MRM5ITcdelLK2j1vwZ3Je0FKVCfqOLp5zO6trqMLYs0= -- 2.39.2