From 0555ffb164edd296e6d9311bd44ef71b3fd042e5 Mon Sep 17 00:00:00 2001 From: qydysky Date: Thu, 8 May 2025 22:15:44 +0800 Subject: [PATCH] =?utf8?q?=20Fix=20=E5=8D=8F=E7=A8=8B=E6=B3=84=E9=9C=B2?= =?utf8?q?=E5=8F=AF=E8=83=BD=20#198=20=20(#199)?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Fix 协程泄露可能 #198 * Fix 协程泄露可能 #198 * Fix data race * Fix data race --- Reply/tts.go | 77 +++++++++++++++++++++++++++------------------------ bili_danmu.go | 45 ++++++++++++++++++++++++------ go.mod | 4 +-- go.sum | 8 +++--- 4 files changed, 83 insertions(+), 51 deletions(-) diff --git a/Reply/tts.go b/Reply/tts.go index 41baff6..21aff67 100644 --- a/Reply/tts.go +++ b/Reply/tts.go @@ -373,51 +373,56 @@ func init() { var buf []byte wait, cancel := context.WithCancel(context.Background()) + var someErr = errors.New(`someErr`) wsc.Pull_tag_only(`rec`, func(wm *ws.WsMsg) (disable bool) { - if len(wm.Msg) == 0 { - cancel() - return true - } - - var partS struct { - Code int `json:"code"` - Message string `json:"message"` - Sid string `json:"sid"` - Data struct { - Audio string `json:"audio"` - Ced string `json:"ced"` - Status int `json:"status"` - } `json:"data"` - } - if e := json.Unmarshal(wm.Msg, &partS); e != nil { - tts_log.L(`E: `, "错误", e, wm.Msg) - xfwsClient.Close() - return - } else { - if partS.Code != 0 { - tts_log.L(`W: `, fmt.Sprintf("code:%d msg:%s", partS.Code, partS.Message)) + return wm.Msg(func(b []byte) error { + if len(b) == 0 { cancel() - return true + return someErr } - if partS.Data.Audio != "" { - if part, e := base64.StdEncoding.DecodeString(partS.Data.Audio); e != nil { - tts_log.L(`E: `, "错误", e) + + var partS struct { + Code int `json:"code"` + Message string `json:"message"` + Sid string `json:"sid"` + Data struct { + Audio string `json:"audio"` + Ced string `json:"ced"` + Status int `json:"status"` + } `json:"data"` + } + if e := json.Unmarshal(b, &partS); e != nil { + tts_log.L(`E: `, "错误", e, b) + xfwsClient.Close() + return nil + } else { + if partS.Code != 0 { + tts_log.L(`W: `, fmt.Sprintf("code:%d msg:%s", partS.Code, partS.Message)) cancel() - return true - } else { - buf = append(buf, part...) + return someErr + } + if partS.Data.Audio != "" { + if part, e := base64.StdEncoding.DecodeString(partS.Data.Audio); e != nil { + tts_log.L(`E: `, "错误", e) + cancel() + return someErr + } else { + buf = append(buf, part...) + } + } + if partS.Data.Status == 2 { + cancel() + return someErr } } - if partS.Data.Status == 2 { - cancel() - return true - } - } - return false + return nil + }) != nil }) wsc.Push_tag(`send`, &ws.WsMsg{ - Msg: b, + Msg: func(f func([]byte) error) error { + return f(b) + }, }) <-wait.Done() diff --git a/bili_danmu.go b/bili_danmu.go index 883be3e..3e4a1cd 100644 --- a/bili_danmu.go +++ b/bili_danmu.go @@ -22,6 +22,7 @@ import ( send "github.com/qydysky/bili_danmu/Send" Cmd "github.com/qydysky/bili_danmu/cmd" pctx "github.com/qydysky/part/ctx" + fc "github.com/qydysky/part/funcCtrl" part "github.com/qydysky/part/log" sys "github.com/qydysky/part/sys" @@ -285,12 +286,26 @@ func entryRoom(rootCtx, mainCtx context.Context, danmulog *part.Log_interface, c F.Get(common).Get(`WSURL`) aliveT := time.Now().Add(3 * time.Hour) heartbeatmsg, heartinterval := F.Heartbeat() - for i, exitloop := 0, false; !exitloop && i < len(common.WSURL) && time.Now().Before(aliveT); { + + var ( + exitloop = false + i = 0 + rangeSource fc.RangeSource[any] = func(yield func(any) bool) { + for !exitloop && i < len(common.WSURL) && time.Now().Before(aliveT) { + if !yield(nil) { + return + } + } + } + ) + + for ctx := range rangeSource.RangeCtx(mainCtx) { v := common.WSURL[i] //ws启动 danmulog.L(`T: `, "连接 "+v) u, _ := url.Parse(v) ws_c, err := ws.New_client(&ws.Client{ + BufSize: 50, Url: v, TO: (heartinterval + 5) * 1000, Proxy: common.Proxy, @@ -329,13 +344,18 @@ func entryRoom(rootCtx, mainCtx context.Context, danmulog *part.Log_interface, c // auth { wsmsg.PushLock_tag(`send`, &ws.WsMsg{ - Msg: F.HelloGen(common.Roomid, common.Token), + Msg: func(f func([]byte) error) error { + return f(F.HelloGen(common.Roomid, common.Token)) + }, }) - waitCheckAuth, cancel := context.WithTimeout(mainCtx, 5*time.Second) + waitCheckAuth, cancel := context.WithTimeout(ctx, 5*time.Second) doneAuth := wsmsg.Pull_tag_only(`rec`, func(wm *ws.WsMsg) (disable bool) { - if F.HelloChe(wm.Msg) { + _ = wm.Msg(func(b []byte) error { + if F.HelloChe(b) { cancel() } + return nil + }) return true }) <-waitCheckAuth.Done() @@ -364,7 +384,12 @@ func entryRoom(rootCtx, mainCtx context.Context, danmulog *part.Log_interface, c // 处理ws消息 var cancelDeal = wsmsg.Pull_tag(map[string]func(*ws.WsMsg) (disable bool){ `rec`: func(wm *ws.WsMsg) (disable bool) { - go reply.Reply(common, wm.Msg) + go func() { + _ = wm.Msg(func(b []byte) error { + reply.Reply(common, b) + return nil + }) + }() return false }, `close`: func(_ *ws.WsMsg) (disable bool) { @@ -377,7 +402,9 @@ func entryRoom(rootCtx, mainCtx context.Context, danmulog *part.Log_interface, c danmulog.L(`T: `, "获取人气") for !ws_c.Isclose() { wsmsg.Push_tag(`send`, &ws.WsMsg{ - Msg: heartbeatmsg, + Msg: func(f func([]byte) error) error { + return f(heartbeatmsg) + }, }) time.Sleep(time.Millisecond * time.Duration(heartinterval*1000)) } @@ -394,13 +421,13 @@ func entryRoom(rootCtx, mainCtx context.Context, danmulog *part.Log_interface, c `LIVE_BUVID`, }); len(missKey) == 0 && reply.IsOn("自动弹幕机") { //附加功能 弹幕机 无cookie无法发送弹幕 - replyFunc.Danmuji.Danmuji_auto(mainCtx, c.C.K_v.LoadV(`自动弹幕机_内容`).([]any), c.C.K_v.LoadV(`自动弹幕机_发送间隔s`).(float64), reply.Msg_senddanmu) + replyFunc.Danmuji.Danmuji_auto(ctx, c.C.K_v.LoadV(`自动弹幕机_内容`).([]any), c.C.K_v.LoadV(`自动弹幕机_发送间隔s`).(float64), reply.Msg_senddanmu) } { //附加功能 进房间发送弹幕 直播流保存 每日签到 F.RoomEntryAction(common.Roomid) // go F.Dosign() reply.Entry_danmu(common) - if _, e := recStartEnd.RecStartCheck.Run(mainCtx, common); e == nil { + if _, e := recStartEnd.RecStartCheck.Run(ctx, common); e == nil { reply.StreamOStart(common.Roomid) } else { danmulog.Base("功能", "指定房间录制区间").L(`I: `, common.Roomid, e) @@ -462,7 +489,7 @@ func entryRoom(rootCtx, mainCtx context.Context, danmulog *part.Log_interface, c danmulog.L(`T: `, "启动完成", common.Uname, `(`, common.Roomid, `)`) { - cancel, c := wsmsg.Pull_tag_chan(`exit`, 1, mainCtx) + cancel, c := wsmsg.Pull_tag_chan(`exit`, 1, ctx) select { case <-c: case <-rootCtx.Done(): diff --git a/go.mod b/go.mod index 6daf7fb..8b0aa4d 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.24 require ( github.com/gotk3/gotk3 v0.6.4 github.com/mdp/qrterminal/v3 v3.2.0 - github.com/qydysky/part v0.28.20250424194925 + github.com/qydysky/part v0.28.20250508122622 github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966 golang.org/x/text v0.24.0 // indirect @@ -13,7 +13,7 @@ require ( require ( github.com/google/uuid v1.6.0 - github.com/qydysky/biliApi v0.0.0-20250406112014-bf8c070170f6 + github.com/qydysky/biliApi v0.0.0-20250506170139-ecbeaf602c40 github.com/qydysky/brotli v0.0.0-20240828134800-e9913a6e7ed9 golang.org/x/exp v0.0.0-20250215185904-eff6e970281f ) diff --git a/go.sum b/go.sum index 23f9249..919bef2 100644 --- a/go.sum +++ b/go.sum @@ -42,12 +42,12 @@ github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdh github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/qydysky/biliApi v0.0.0-20250406112014-bf8c070170f6 h1:eWklz9YhqcLnJeHxWSlJZmL2V5rRyyEVqLKgLY3ipQs= -github.com/qydysky/biliApi v0.0.0-20250406112014-bf8c070170f6/go.mod h1:1FbgCj+aOwIvuRRuX/l5uTLb3JIwWyJSa0uEfwpYV/8= +github.com/qydysky/biliApi v0.0.0-20250506170139-ecbeaf602c40 h1:QtROBClET9rmSkWgfsC+M9BAkPJQ9y1CcxAYuAkvDdM= +github.com/qydysky/biliApi v0.0.0-20250506170139-ecbeaf602c40/go.mod h1:1FbgCj+aOwIvuRRuX/l5uTLb3JIwWyJSa0uEfwpYV/8= github.com/qydysky/brotli v0.0.0-20240828134800-e9913a6e7ed9 h1:k451T+bpsLr+Dq9Ujo+Qtx0iomRA1XXS5ttlEojvfuQ= github.com/qydysky/brotli v0.0.0-20240828134800-e9913a6e7ed9/go.mod h1:cI8/gy/wjy2Eb+p2IUj2ZuDnC8R5Vrx3O0VMPvMvphA= -github.com/qydysky/part v0.28.20250424194925 h1:DzhiRrcSn3ptsE8GElF8hfVvjCB000fpk1/+1HKnEHA= -github.com/qydysky/part v0.28.20250424194925/go.mod h1:wp71PQdKYcg9jn9yDDvqC4shS/kzejyvFqbfUxuHocY= +github.com/qydysky/part v0.28.20250508122622 h1:D7+SQiUE8MANSTHTYNHH+MEgrhceX3hG9FFrpXvDSd8= +github.com/qydysky/part v0.28.20250508122622/go.mod h1:wp71PQdKYcg9jn9yDDvqC4shS/kzejyvFqbfUxuHocY= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= -- 2.39.2