]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
Improve 减少data race
authorqydysky <32743305+qydysky@users.noreply.github.com>
Sun, 12 Mar 2023 16:11:49 +0000 (00:11 +0800)
committerqydysky <32743305+qydysky@users.noreply.github.com>
Sun, 12 Mar 2023 16:11:49 +0000 (00:11 +0800)
CV/Var.go
F/api.go
Reply/F.go
Reply/stream.go
Reply/tts.go
Send/Send.go
Send/Send_gift.go
Send/Send_pm.go
bili_danmu.go
go.mod
go.sum

index 405becc003473cbea112343c6d6b300b97d07bbd..01286a35a0fd63a9f106225401745bbec025d1a7 100644 (file)
--- a/CV/Var.go
+++ b/CV/Var.go
@@ -24,6 +24,12 @@ import (
        web "github.com/qydysky/part/web"
 )
 
+type StreamType struct {
+       Protocol_name string
+       Format_name   string
+       Codec_name    string
+}
+
 type Common struct {
        PID               int                   //进程id
        Uid               int                   //client uid
@@ -96,6 +102,55 @@ func (t *LiveQn) Disable(reUpTime time.Time) {
        t.ReUpTime = reUpTime
 }
 
+func (t *Common) IsOn(key string) bool {
+       v, ok := t.K_v.LoadV(key).(bool)
+       return ok && v
+}
+
+func (t *Common) Copy() *Common {
+       var c = Common{
+               PID:               t.PID,
+               Uid:               t.Uid,
+               Live:              t.Live,
+               Live_qn:           t.Live_qn,
+               Live_want_qn:      t.Live_want_qn,
+               Roomid:            t.Roomid,
+               Cookie:            t.Cookie.Copy(),
+               Title:             t.Title,
+               Uname:             t.Uname,
+               UpUid:             t.UpUid,
+               Rev:               t.Rev,
+               Renqi:             t.Renqi,
+               Watched:           t.Watched,
+               OnlineNum:         t.OnlineNum,
+               GuardNum:          t.GuardNum,
+               ParentAreaID:      t.ParentAreaID,
+               AreaID:            t.AreaID,
+               Locked:            t.Locked,
+               Note:              t.Note,
+               Live_Start_Time:   t.Live_Start_Time,
+               Liveing:           t.Liveing,
+               Wearing_FansMedal: t.Wearing_FansMedal,
+               Token:             t.Token,
+               WSURL:             t.WSURL,
+               LIVE_BUVID:        t.LIVE_BUVID,
+               Stream_url:        t.Stream_url,
+               Proxy:             t.Proxy,
+               AcceptQn:          syncmap.Copy(t.AcceptQn),
+               Qn:                syncmap.Copy(t.Qn),
+               StreamType:        t.StreamType,
+               AllStreamType:     syncmap.Copy(t.AllStreamType),
+               K_v:               t.K_v.Copy(),
+               Log:               t.Log,
+               Danmu_Main_mq:     t.Danmu_Main_mq,
+               ReqPool:           t.ReqPool,
+               SerF:              t.SerF,
+               StartT:            t.StartT,
+       }
+
+       return &c
+}
+
 // 自动停用机制
 func (t *Common) DisableLiveAuto(host string) {
        for i := 0; i < len(t.Live); i++ {
@@ -129,12 +184,6 @@ func (t *Common) ValidLive() *LiveQn {
        return nil
 }
 
-type StreamType struct {
-       Protocol_name string
-       Format_name   string
-       Codec_name    string
-}
-
 func (t *Common) Init() Common {
        t.PID = os.Getpid()
        t.StartT = time.Now()
index cf6fcb44d5e08af3a7098a36af943472912835a2..db79ce573967f51f3d33723bc52753d97a741c2a 100644 (file)
--- a/F/api.go
+++ b/F/api.go
@@ -8,6 +8,7 @@ import (
        "os"
        "strconv"
        "strings"
+       "sync"
        "time"
 
        "github.com/dustin/go-humanize"
@@ -28,14 +29,15 @@ import (
 )
 
 var apilog = c.C.Log.Base(`api`)
-var api_limit = limit.New(2, 1000, 30000) //频率限制2次/s,最大等待时间30s
+var api_limit = limit.New(2, "1s", "30s") //频率限制2次/s,最大等待时间30s
 
 type GetFunc struct {
        *c.Common
+       l sync.RWMutex
 }
 
 func Get(c *c.Common) *GetFunc {
-       return &GetFunc{c}
+       return &GetFunc{Common: c}
 }
 
 func (c *GetFunc) Get(key string) {
@@ -213,12 +215,21 @@ func (c *GetFunc) Get(key string) {
        if fList, ok := api_can_get[key]; ok {
                for _, fItem := range fList {
                        apilog.Log_show_control(false).L(`T: `, `Get`, key)
+
+                       c.l.Lock()
                        missKey := fItem()
+                       c.l.Unlock()
+
                        if len(missKey) > 0 {
                                apilog.L(`T: `, `missKey when get`, key, missKey)
                                for _, misskeyitem := range missKey {
-                                       if checkf, ok := check[misskeyitem]; ok && checkf() {
-                                               continue
+                                       if checkf, ok := check[misskeyitem]; ok {
+                                               c.l.RLock()
+                                               if checkf() {
+                                                       c.l.RUnlock()
+                                                       continue
+                                               }
+                                               c.l.RUnlock()
                                        }
                                        if misskeyitem == key {
                                                apilog.L(`W: `, `missKey equrt key`, key, missKey)
@@ -226,14 +237,23 @@ func (c *GetFunc) Get(key string) {
                                        }
                                        c.Get(misskeyitem)
                                }
+
+                               c.l.Lock()
                                missKey := fItem()
+                               c.l.Unlock()
+
                                if len(missKey) > 0 {
                                        apilog.L(`W: `, `missKey when get`, key, missKey)
                                        continue
                                }
                        }
-                       if checkf, ok := check[key]; ok && checkf() {
-                               break
+                       if checkf, ok := check[key]; ok {
+                               c.l.RLock()
+                               if checkf() {
+                                       c.l.RUnlock()
+                                       break
+                               }
+                               c.l.RUnlock()
                        }
                }
        }
index 6019618dc8c58e66b0af13e961b39858b8e1ffff..32a2bcc889a87ed422c5662421fbd888720a38cd 100644 (file)
@@ -45,8 +45,7 @@ var flog = c.C.Log.Base(`功能`)
 
 // 功能开关选取函数
 func IsOn(s string) bool {
-       v, ok := c.C.K_v.LoadV(s).(bool)
-       return ok && v
+       return c.C.IsOn(s)
 }
 
 // 字符重复度检查
@@ -135,8 +134,11 @@ func ShowRevf() {
        ShowRev_start = true
        for {
                c.C.Log.Base(`功能`).L(`I: `, fmt.Sprintf("营收 ¥%.2f", c.C.Rev))
-               for c.C.Rev == ShowRev_old {
-                       sys.Sys().Timeoutf(60)
+               for {
+                       if c.C.Rev != ShowRev_old {
+                               break
+                       }
+                       time.Sleep(time.Minute)
                }
                ShowRev_old = c.C.Rev
        }
@@ -293,15 +295,15 @@ func StreamOStart(roomid int) {
                flog.L(`W: `, `已录制 `+strconv.Itoa(roomid)+` 不能重复录制`)
                return
        }
-       var (
-               tmp    = new(M4SStream)
-               common = c.C
-       )
-       common.Roomid = roomid
-       if e := tmp.LoadConfig(common); e != nil {
+
+       var tmp = new(M4SStream)
+
+       if e := tmp.LoadConfig(*c.C.Copy()); e != nil {
                flog.L(`E: `, e)
                return
        }
+       tmp.common.Roomid = roomid
+
        //录制回调,关于ass
        tmp.Callback_startRec = func(ms *M4SStream) error {
                StartRecDanmu(ms.Current_save_path + "0.csv")
@@ -345,13 +347,13 @@ func StreamOStop(roomid int) {
                        return true
                })
        case -1: // 所有房间
-               streamO.Range(func(_, v interface{}) bool {
+               streamO.Range(func(k, v interface{}) bool {
                        if v.(*M4SStream).Status.Islive() {
                                v.(*M4SStream).Stop()
                        }
+                       streamO.Delete(k)
                        return true
                })
-               streamO = new(sync.Map)
        default: // 针对某房间
                if v, ok := streamO.Load(roomid); ok {
                        if v.(*M4SStream).Status.Islive() {
@@ -535,7 +537,7 @@ var danmuji = Danmuji{
        Buf: map[string]string{
                "弹幕机在么": "在",
        },
-       reflect_limit: limit.New(1, 4000, 8000),
+       reflect_limit: limit.New(1, "4s", "8s"),
 }
 
 func init() { //初始化反射型弹幕机
@@ -624,11 +626,12 @@ func init() {
        go func() {
                for {
                        <-autoskip.ticker.C
+                       autoskip.Lock()
                        if len(autoskip.buf) == 0 {
+                               autoskip.Unlock()
                                continue
                        }
                        autoskip.now += 1
-                       autoskip.Lock()
                        if autoskip.roomid != c.C.Roomid {
                                autoskip.buf = make(map[string]Autoskip_item)
                                autoskip.roomid = c.C.Roomid
@@ -721,7 +724,7 @@ func init() {
        if max_num, ok := c.C.K_v.LoadV(`每秒显示弹幕数`).(float64); ok && int(max_num) >= 1 {
                flog.Base_add(`更少弹幕`).L(`T: `, `每秒弹幕数:`, int(max_num))
                lessdanmu.max_num = int(max_num)
-               lessdanmu.limit = limit.New(int(max_num), 1000, 0) //timeout right now
+               lessdanmu.limit = limit.New(int(max_num), "1s", "0s") //timeout right now
        }
 }
 
@@ -813,6 +816,7 @@ func Lessdanmuf(s string) (show bool) {
 
 type Shortdanmu struct {
        lastdanmu []rune
+       l         sync.Mutex
 }
 
 var shortdanmu = Shortdanmu{}
@@ -821,6 +825,10 @@ func Shortdanmuf(s string) string {
        if !IsOn("精简弹幕") {
                return s
        }
+
+       shortdanmu.l.Lock()
+       defer shortdanmu.l.Unlock()
+
        if len(shortdanmu.lastdanmu) == 0 {
                shortdanmu.lastdanmu = []rune(s)
                return s
index 077116de3f4d73b0be251056037ba2a7278a4486..860e227d34b99391b35df4695ed39a60c4b289f4 100644 (file)
@@ -13,6 +13,7 @@ import (
        "path/filepath"
        "strconv"
        "strings"
+       "sync/atomic"
        "time"
 
        c "github.com/qydysky/bili_danmu/CV"
@@ -631,29 +632,40 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                r := t.reqPool.Get()
                {
                        go func() {
+                               tsc, tscf := t.Status.WaitC()
+                               defer tscf()
+                               sc, scf := s.WaitC()
+                               defer scf()
+
                                select {
                                //停止录制
-                               case <-t.Status.WaitC():
+                               case <-tsc:
                                        r.Cancel()
                                //当前连接终止
-                               case <-s.WaitC():
+                               case <-sc:
                                }
                        }()
 
                        out := file.New(t.Current_save_path+`0.flv`, -1, true).File()
 
                        rc, rw := io.Pipe()
-                       var leastReadUnix = time.Now().Unix()
+                       var leastReadUnix atomic.Int64
+                       leastReadUnix.Store(time.Now().Unix())
+
                        // read timeout
                        go func() {
                                timer := time.NewTicker(5 * time.Second)
                                defer timer.Stop()
+
+                               sc, scf := s.WaitC()
+                               defer scf()
+
                                for {
                                        select {
-                                       case <-s.WaitC():
+                                       case <-sc:
                                                return
                                        case curT := <-timer.C:
-                                               if curT.Unix()-leastReadUnix > 5 {
+                                               if curT.Unix()-leastReadUnix.Load() > 5 {
                                                        t.log.L(`W: `, "5s未接收到任何数据")
                                                        // 5s未接收到任何数据
                                                        r.Cancel()
@@ -689,7 +701,7 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                                                t.Stream_msg.Push_tag(`close`, nil)
                                                break
                                        }
-                                       leastReadUnix = time.Now().Unix()
+                                       leastReadUnix.Store(time.Now().Unix())
 
                                        skip := true
                                        select {
index aeca87cd082422b53602b5dfc9bb79fab84ab92a..670fa3eea1a8f182e3d1b688cae1f7aefffe9d69 100644 (file)
@@ -38,7 +38,7 @@ var (
 )
 var tts_List = make(chan string, 20)
 
-var tts_limit = limit.New(1, 5000, 15000) //频率限制1次/5s,最大等待时间15s
+var tts_limit = limit.New(1, "5s", "15s") //频率限制1次/5s,最大等待时间15s
 
 var tts_log = c.C.Log.Base_add(`TTS`)
 
index 5b7ddd2c0373b52be129c09a1873208c63a118ff..e0354ceeed590ae69466ae2e7fcd2dc593fd51e7 100644 (file)
@@ -15,7 +15,7 @@ import (
 )
 
 // 每5s一个令牌,最多等20秒
-var danmu_s_limit = limit.New(1, 5000, 20000)
+var danmu_s_limit = limit.New(1, "5s", "20s")
 var damnu_official = make(map[string]string)
 
 // 初始化表情代码
index dcca635c04a0a0cbc6f02c058418a373c7990adb..ec9d16a83426c9c1812008fe1334ab14c60c0a42 100644 (file)
@@ -13,7 +13,7 @@ import (
 )
 
 // 每2s一个令牌,最多等10秒
-var gift_limit = limit.New(1, 2000, 10000)
+var gift_limit = limit.New(1, "2s", "10s")
 
 func Send_gift(gift_id, bag_id, gift_num int) {
        log := c.C.Log.Base_add(`发送礼物`)
index aaee17ed6f729cd86f15192cc7a20e70c1c8fd40..ba2ad0a0fe55d05ce98195df0301abeb147c6320 100644 (file)
@@ -22,7 +22,7 @@ type Pm_item struct {
 }
 
 // 每5s一个令牌,最多等10秒
-var pm_limit = limit.New(1, 5000, 10000)
+var pm_limit = limit.New(1, "5s", "10s")
 
 func Send_pm(uid int, msg string) error {
        if msg == `` || uid == 0 {
index 6edf11316e4d0195e1979f0997a0d724edf8ecbc..73866dc84482d596c46a0025a30e7a539fe3b6a5 100644 (file)
@@ -358,7 +358,7 @@ func Start() {
                { //附加功能 直播流停止
                        reply.StreamOStop(-1)
                }
-               close(interrupt)
+               // close(interrupt)
                danmulog.L(`I: `, "结束退出")
        }
 }
diff --git a/go.mod b/go.mod
index c35e4f3d6379136cbccf90772df86136b7ec8e15..76cd245e31c008e56e9c9dea0c3ff4a5842947ae 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -6,7 +6,7 @@ require (
        github.com/gofrs/uuid v4.3.0+incompatible
        github.com/gotk3/gotk3 v0.6.1
        github.com/mdp/qrterminal/v3 v3.0.0
-       github.com/qydysky/part v0.23.17
+       github.com/qydysky/part v0.24.3
        github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
        github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
        golang.org/x/text v0.8.0
@@ -33,3 +33,5 @@ require (
        gopkg.in/yaml.v3 v3.0.1 // indirect
        rsc.io/qr v0.2.0 // indirect
 )
+
+replace github.com/qydysky/part => ../part
diff --git a/go.sum b/go.sum
index 363957a091f0210e36399777efbe8ca3cabfc8d7..c8a81f62895ddea0769c017691f36c20b681a23e 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -83,6 +83,10 @@ github.com/qydysky/part v0.23.16 h1:T7fjB1boptmQCVgmgJ6SkRYJBeEqJK9ohlcxrSlcMZg=
 github.com/qydysky/part v0.23.16/go.mod h1:AQJH+BYeN30eKXjkDqGEtw0vx3wVGplBeOMLSyleEDo=
 github.com/qydysky/part v0.23.17 h1:VCo1CG2982poLaHxZ1EM9+AfqBphDPmESCpp7JRGk3Q=
 github.com/qydysky/part v0.23.17/go.mod h1:AQJH+BYeN30eKXjkDqGEtw0vx3wVGplBeOMLSyleEDo=
+github.com/qydysky/part v0.24.0 h1:CNikH+D3g01e6RS+jowSdgYt4hyC7ZrRHHFACuQtkrQ=
+github.com/qydysky/part v0.24.0/go.mod h1:AQJH+BYeN30eKXjkDqGEtw0vx3wVGplBeOMLSyleEDo=
+github.com/qydysky/part v0.24.1 h1:St+338RkG5AGlBpcSq29gK5o4tukFOOX8F5x/TMtQG0=
+github.com/qydysky/part v0.24.1/go.mod h1:AQJH+BYeN30eKXjkDqGEtw0vx3wVGplBeOMLSyleEDo=
 github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
 github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
 github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e h1:MRM5ITcdelLK2j1vwZ3Je0FKVCfqOLp5zO6trqMLYs0=