]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
Fix 协程泄露可能 #198 (#199)
authorqydysky <qydysky@foxmail.com>
Thu, 8 May 2025 14:15:44 +0000 (22:15 +0800)
committerGitHub <noreply@github.com>
Thu, 8 May 2025 14:15:44 +0000 (22:15 +0800)
* Fix 协程泄露可能 #198

* Fix 协程泄露可能 #198

* Fix data race

* Fix data race

Reply/tts.go
bili_danmu.go
go.mod
go.sum

index 41baff69d673dd6f6d5ce4b6f1c2209ddd3d5c03..21aff670f759b99af363029e323c199e879be74e 100644 (file)
@@ -373,51 +373,56 @@ func init() {
                        var buf []byte
                        wait, cancel := context.WithCancel(context.Background())
 
+                       var someErr = errors.New(`someErr`)
                        wsc.Pull_tag_only(`rec`, func(wm *ws.WsMsg) (disable bool) {
-                               if len(wm.Msg) == 0 {
-                                       cancel()
-                                       return true
-                               }
-
-                               var partS struct {
-                                       Code    int    `json:"code"`
-                                       Message string `json:"message"`
-                                       Sid     string `json:"sid"`
-                                       Data    struct {
-                                               Audio  string `json:"audio"`
-                                               Ced    string `json:"ced"`
-                                               Status int    `json:"status"`
-                                       } `json:"data"`
-                               }
-                               if e := json.Unmarshal(wm.Msg, &partS); e != nil {
-                                       tts_log.L(`E: `, "错误", e, wm.Msg)
-                                       xfwsClient.Close()
-                                       return
-                               } else {
-                                       if partS.Code != 0 {
-                                               tts_log.L(`W: `, fmt.Sprintf("code:%d msg:%s", partS.Code, partS.Message))
+                               return wm.Msg(func(b []byte) error {
+                                       if len(b) == 0 {
                                                cancel()
-                                               return true
+                                               return someErr
                                        }
-                                       if partS.Data.Audio != "" {
-                                               if part, e := base64.StdEncoding.DecodeString(partS.Data.Audio); e != nil {
-                                                       tts_log.L(`E: `, "错误", e)
+
+                                       var partS struct {
+                                               Code    int    `json:"code"`
+                                               Message string `json:"message"`
+                                               Sid     string `json:"sid"`
+                                               Data    struct {
+                                                       Audio  string `json:"audio"`
+                                                       Ced    string `json:"ced"`
+                                                       Status int    `json:"status"`
+                                               } `json:"data"`
+                                       }
+                                       if e := json.Unmarshal(b, &partS); e != nil {
+                                               tts_log.L(`E: `, "错误", e, b)
+                                               xfwsClient.Close()
+                                               return nil
+                                       } else {
+                                               if partS.Code != 0 {
+                                                       tts_log.L(`W: `, fmt.Sprintf("code:%d msg:%s", partS.Code, partS.Message))
                                                        cancel()
-                                                       return true
-                                               } else {
-                                                       buf = append(buf, part...)
+                                                       return someErr
+                                               }
+                                               if partS.Data.Audio != "" {
+                                                       if part, e := base64.StdEncoding.DecodeString(partS.Data.Audio); e != nil {
+                                                               tts_log.L(`E: `, "错误", e)
+                                                               cancel()
+                                                               return someErr
+                                                       } else {
+                                                               buf = append(buf, part...)
+                                                       }
+                                               }
+                                               if partS.Data.Status == 2 {
+                                                       cancel()
+                                                       return someErr
                                                }
                                        }
-                                       if partS.Data.Status == 2 {
-                                               cancel()
-                                               return true
-                                       }
-                               }
-                               return false
+                                       return nil
+                               }) != nil
                        })
 
                        wsc.Push_tag(`send`, &ws.WsMsg{
-                               Msg: b,
+                               Msg: func(f func([]byte) error) error {
+                                       return f(b)
+                               },
                        })
 
                        <-wait.Done()
index 883be3e4c950c354b580d69d5c6a4e8ffccc48a3..3e4a1cdbb63b147ad45fcb701ef3d7bd84f89758 100644 (file)
@@ -22,6 +22,7 @@ import (
        send "github.com/qydysky/bili_danmu/Send"
        Cmd "github.com/qydysky/bili_danmu/cmd"
        pctx "github.com/qydysky/part/ctx"
+       fc "github.com/qydysky/part/funcCtrl"
        part "github.com/qydysky/part/log"
        sys "github.com/qydysky/part/sys"
 
@@ -285,12 +286,26 @@ func entryRoom(rootCtx, mainCtx context.Context, danmulog *part.Log_interface, c
        F.Get(common).Get(`WSURL`)
        aliveT := time.Now().Add(3 * time.Hour)
        heartbeatmsg, heartinterval := F.Heartbeat()
-       for i, exitloop := 0, false; !exitloop && i < len(common.WSURL) && time.Now().Before(aliveT); {
+
+       var (
+               exitloop                        = false
+               i                               = 0
+               rangeSource fc.RangeSource[any] = func(yield func(any) bool) {
+                       for !exitloop && i < len(common.WSURL) && time.Now().Before(aliveT) {
+                               if !yield(nil) {
+                                       return
+                               }
+                       }
+               }
+       )
+
+       for ctx := range rangeSource.RangeCtx(mainCtx) {
                v := common.WSURL[i]
                //ws启动
                danmulog.L(`T: `, "连接 "+v)
                u, _ := url.Parse(v)
                ws_c, err := ws.New_client(&ws.Client{
+                       BufSize:           50,
                        Url:               v,
                        TO:                (heartinterval + 5) * 1000,
                        Proxy:             common.Proxy,
@@ -329,13 +344,18 @@ func entryRoom(rootCtx, mainCtx context.Context, danmulog *part.Log_interface, c
                // auth
                {
                        wsmsg.PushLock_tag(`send`, &ws.WsMsg{
-                               Msg: F.HelloGen(common.Roomid, common.Token),
+                               Msg: func(f func([]byte) error) error {
+                                       return f(F.HelloGen(common.Roomid, common.Token))
+                               },
                        })
-                       waitCheckAuth, cancel := context.WithTimeout(mainCtx, 5*time.Second)
+                       waitCheckAuth, cancel := context.WithTimeout(ctx, 5*time.Second)
                        doneAuth := wsmsg.Pull_tag_only(`rec`, func(wm *ws.WsMsg) (disable bool) {
-                               if F.HelloChe(wm.Msg) {
+                               _ = wm.Msg(func(b []byte) error {
+                                       if F.HelloChe(b) {
                                        cancel()
                                }
+                                       return nil
+                               })
                                return true
                        })
                        <-waitCheckAuth.Done()
@@ -364,7 +384,12 @@ func entryRoom(rootCtx, mainCtx context.Context, danmulog *part.Log_interface, c
                // 处理ws消息
                var cancelDeal = wsmsg.Pull_tag(map[string]func(*ws.WsMsg) (disable bool){
                        `rec`: func(wm *ws.WsMsg) (disable bool) {
-                               go reply.Reply(common, wm.Msg)
+                               go func() {
+                                       _ = wm.Msg(func(b []byte) error {
+                                               reply.Reply(common, b)
+                                               return nil
+                                       })
+                               }()
                                return false
                        },
                        `close`: func(_ *ws.WsMsg) (disable bool) {
@@ -377,7 +402,9 @@ func entryRoom(rootCtx, mainCtx context.Context, danmulog *part.Log_interface, c
                        danmulog.L(`T: `, "获取人气")
                        for !ws_c.Isclose() {
                                wsmsg.Push_tag(`send`, &ws.WsMsg{
-                                       Msg: heartbeatmsg,
+                                       Msg: func(f func([]byte) error) error {
+                                               return f(heartbeatmsg)
+                                       },
                                })
                                time.Sleep(time.Millisecond * time.Duration(heartinterval*1000))
                        }
@@ -394,13 +421,13 @@ func entryRoom(rootCtx, mainCtx context.Context, danmulog *part.Log_interface, c
                        `LIVE_BUVID`,
                }); len(missKey) == 0 && reply.IsOn("自动弹幕机") {
                        //附加功能 弹幕机 无cookie无法发送弹幕
-                       replyFunc.Danmuji.Danmuji_auto(mainCtx, c.C.K_v.LoadV(`自动弹幕机_内容`).([]any), c.C.K_v.LoadV(`自动弹幕机_发送间隔s`).(float64), reply.Msg_senddanmu)
+                       replyFunc.Danmuji.Danmuji_auto(ctx, c.C.K_v.LoadV(`自动弹幕机_内容`).([]any), c.C.K_v.LoadV(`自动弹幕机_发送间隔s`).(float64), reply.Msg_senddanmu)
                }
                { //附加功能 进房间发送弹幕 直播流保存 每日签到
                        F.RoomEntryAction(common.Roomid)
                        // go F.Dosign()
                        reply.Entry_danmu(common)
-                       if _, e := recStartEnd.RecStartCheck.Run(mainCtx, common); e == nil {
+                       if _, e := recStartEnd.RecStartCheck.Run(ctx, common); e == nil {
                                reply.StreamOStart(common.Roomid)
                        } else {
                                danmulog.Base("功能", "指定房间录制区间").L(`I: `, common.Roomid, e)
@@ -462,7 +489,7 @@ func entryRoom(rootCtx, mainCtx context.Context, danmulog *part.Log_interface, c
                        danmulog.L(`T: `, "启动完成", common.Uname, `(`, common.Roomid, `)`)
 
                        {
-                               cancel, c := wsmsg.Pull_tag_chan(`exit`, 1, mainCtx)
+                               cancel, c := wsmsg.Pull_tag_chan(`exit`, 1, ctx)
                                select {
                                case <-c:
                                case <-rootCtx.Done():
diff --git a/go.mod b/go.mod
index 6daf7fb1c091f09bf22976dd75ef1a499a8ff67a..8b0aa4d7780c3a0a8994d82f70e5dc516ea17d4e 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@ go 1.24
 require (
        github.com/gotk3/gotk3 v0.6.4
        github.com/mdp/qrterminal/v3 v3.2.0
-       github.com/qydysky/part v0.28.20250424194925
+       github.com/qydysky/part v0.28.20250508122622
        github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
        github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
        golang.org/x/text v0.24.0 // indirect
@@ -13,7 +13,7 @@ require (
 
 require (
        github.com/google/uuid v1.6.0
-       github.com/qydysky/biliApi v0.0.0-20250406112014-bf8c070170f6
+       github.com/qydysky/biliApi v0.0.0-20250506170139-ecbeaf602c40
        github.com/qydysky/brotli v0.0.0-20240828134800-e9913a6e7ed9
        golang.org/x/exp v0.0.0-20250215185904-eff6e970281f
 )
diff --git a/go.sum b/go.sum
index 23f9249492f478bf62ee2bdc9dea825ad55eb6d2..919bef2afc0725a26fce238e352460af2e8b6201 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -42,12 +42,12 @@ github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdh
 github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/qydysky/biliApi v0.0.0-20250406112014-bf8c070170f6 h1:eWklz9YhqcLnJeHxWSlJZmL2V5rRyyEVqLKgLY3ipQs=
-github.com/qydysky/biliApi v0.0.0-20250406112014-bf8c070170f6/go.mod h1:1FbgCj+aOwIvuRRuX/l5uTLb3JIwWyJSa0uEfwpYV/8=
+github.com/qydysky/biliApi v0.0.0-20250506170139-ecbeaf602c40 h1:QtROBClET9rmSkWgfsC+M9BAkPJQ9y1CcxAYuAkvDdM=
+github.com/qydysky/biliApi v0.0.0-20250506170139-ecbeaf602c40/go.mod h1:1FbgCj+aOwIvuRRuX/l5uTLb3JIwWyJSa0uEfwpYV/8=
 github.com/qydysky/brotli v0.0.0-20240828134800-e9913a6e7ed9 h1:k451T+bpsLr+Dq9Ujo+Qtx0iomRA1XXS5ttlEojvfuQ=
 github.com/qydysky/brotli v0.0.0-20240828134800-e9913a6e7ed9/go.mod h1:cI8/gy/wjy2Eb+p2IUj2ZuDnC8R5Vrx3O0VMPvMvphA=
-github.com/qydysky/part v0.28.20250424194925 h1:DzhiRrcSn3ptsE8GElF8hfVvjCB000fpk1/+1HKnEHA=
-github.com/qydysky/part v0.28.20250424194925/go.mod h1:wp71PQdKYcg9jn9yDDvqC4shS/kzejyvFqbfUxuHocY=
+github.com/qydysky/part v0.28.20250508122622 h1:D7+SQiUE8MANSTHTYNHH+MEgrhceX3hG9FFrpXvDSd8=
+github.com/qydysky/part v0.28.20250508122622/go.mod h1:wp71PQdKYcg9jn9yDDvqC4shS/kzejyvFqbfUxuHocY=
 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
 github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=