From 5782170ae1fea8dc4f75afd6e986a28bb4261617 Mon Sep 17 00:00:00 2001 From: qydysky Date: Mon, 27 Jan 2025 14:17:01 +0800 Subject: [PATCH] =?utf8?q?Improve=20=E9=81=BF=E5=85=8D=E5=AE=A2=E6=88=B7?= =?utf8?q?=E7=AB=AF=E5=AE=9E=E6=97=B6=E5=9B=9E=E6=94=BE=E5=AF=BC=E8=87=B4?= =?utf8?q?=E5=8D=A1=E9=A1=BF=20(#154)?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Improve 避免客户端实时回放导致卡顿 * Improve 避免客户端实时回放导致卡顿 * Improve 避免客户端实时回放导致卡顿 * Add 直播流保存写入超时 --- Reply/F.go | 2 +- Reply/stream.go | 32 +++++++++++++++++++++++++++----- demo/config/config_K_v.json | 2 ++ go.mod | 2 +- go.sum | 4 ++-- 5 files changed, 33 insertions(+), 9 deletions(-) diff --git a/Reply/F.go b/Reply/F.go index 608ca88..3e6341d 100644 --- a/Reply/F.go +++ b/Reply/F.go @@ -1283,7 +1283,7 @@ func init() { // } // } - if e := currentStreamO.PusherToHttp(conn, w, r, startFunc, stopFunc); e != nil { + if e := currentStreamO.PusherToHttp(flog, conn, w, r, startFunc, stopFunc); e != nil { flog.L(`W: `, e) } } diff --git a/Reply/stream.go b/Reply/stream.go index 10b713a..c54b4dc 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -40,6 +40,7 @@ import ( signal "github.com/qydysky/part/signal" slice "github.com/qydysky/part/slice" pstring "github.com/qydysky/part/strings" + pu "github.com/qydysky/part/util" pweb "github.com/qydysky/part/web" ) @@ -1556,28 +1557,42 @@ func (t *M4SStream) PusherToFile(contextC context.Context, filepath string, star return e } - contextC, done := pctx.WaitCtx(contextC) + ctx1, done := pctx.WaitCtx(contextC) defer done() + to := 2.0 + if tmp, ok := t.common.K_v.LoadV("直播流保存写入超时").(float64); ok && tmp > 2 { + to = tmp + } + _, _ = f.Write(t.getFirstBuf(), true) cancelRec := t.Stream_msg.Pull_tag(map[string]func([]byte) bool{ `data`: func(b []byte) bool { + defer pu.Callback(func(startT time.Time, args ...any) { + if dru := time.Since(startT).Seconds(); dru > to { + t.log.L("W: ", "磁盘写入超时", dru) + done() + } + })() + select { - case <-contextC.Done(): + case <-ctx1.Done(): return true default: } if len(b) == 0 { return true } - _, _ = f.Write(b, true) + if n, err := f.Write(b, true); err != nil || n == 0 { + done() + } return false }, `close`: func(_ []byte) bool { return true }, }) - <-contextC.Done() + <-ctx1.Done() cancelRec() if e := stopFunc(t); e != nil { @@ -1590,7 +1605,7 @@ func (t *M4SStream) PusherToFile(contextC context.Context, filepath string, star // 流服务推送方法 // // 在客户端存在某种代理时,将有可能无法监测到客户端关闭,这有可能导致goroutine泄漏 -func (t *M4SStream) PusherToHttp(conn net.Conn, w http.ResponseWriter, r *http.Request, startFunc func(*M4SStream) error, stopFunc func(*M4SStream) error) error { +func (t *M4SStream) PusherToHttp(plog *log.Log_interface, conn net.Conn, w http.ResponseWriter, r *http.Request, startFunc func(*M4SStream) error, stopFunc func(*M4SStream) error) error { switch t.stream_type { case `m3u8`: fallthrough @@ -1647,6 +1662,8 @@ func (t *M4SStream) PusherToHttp(conn net.Conn, w http.ResponseWriter, r *http.R return err } + w = pweb.WithCache(w) + var cancelRec = t.Stream_msg.Pull_tag(map[string]func([]byte) bool{ `data`: func(b []byte) bool { select { @@ -1657,8 +1674,13 @@ func (t *M4SStream) PusherToHttp(conn net.Conn, w http.ResponseWriter, r *http.R if len(b) == 0 { return true } + _ = conn.SetWriteDeadline(time.Now().Add(time.Second * 30)) if n, err := w.Write(b); err != nil || n == 0 { + if pweb.IsCacheBusy(err) { + plog.L(`I: `, r.RemoteAddr, "回放连接慢,缓存跳过") + return false + } return true } return false diff --git a/demo/config/config_K_v.json b/demo/config/config_K_v.json index c9770e0..15f1a61 100644 --- a/demo/config/config_K_v.json +++ b/demo/config/config_K_v.json @@ -95,6 +95,8 @@ "直播流保存位置": "./live", "直播流保存天数-help": "当t日有1录播时,会尝试删除t-n日及之前的1or2个最早的录播,目录下有.keep文件将忽略。小于1的数将禁用此功能", "直播流保存天数": 4, + "直播流保存写入超时-help": "默认2,单位秒,小于默认无效,写入时间过大会影响流下载。当写入超过指定时长时,保存中止", + "直播流保存写入超时": 2, "标题修改检测s-help": "默认900秒,少于默认无效。直播间标题引入审核机制,触发审核时会接收到一个roomchange但标题不变,将持续检测指定时长,如通过审核将修改录播标题", "标题修改检测s": 900, "直播流保存到文件": true, diff --git a/go.mod b/go.mod index 20c6ebb..f3fef5a 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.23 require ( github.com/gotk3/gotk3 v0.6.4 github.com/mdp/qrterminal/v3 v3.2.0 - github.com/qydysky/part v0.28.20250120190624 + github.com/qydysky/part v0.28.20250127053704 github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966 golang.org/x/text v0.21.0 diff --git a/go.sum b/go.sum index 474c27e..46f848a 100644 --- a/go.sum +++ b/go.sum @@ -48,8 +48,8 @@ github.com/qydysky/biliApi v0.0.0-20240725184407-15076dddb6fb h1:dtSpNF9hLQa09TU github.com/qydysky/biliApi v0.0.0-20240725184407-15076dddb6fb/go.mod h1:om024vfxALQ5vxsbaGoMm8IS0esLYBnEOpJI8FsGoDg= github.com/qydysky/brotli v0.0.0-20240828134800-e9913a6e7ed9 h1:k451T+bpsLr+Dq9Ujo+Qtx0iomRA1XXS5ttlEojvfuQ= github.com/qydysky/brotli v0.0.0-20240828134800-e9913a6e7ed9/go.mod h1:cI8/gy/wjy2Eb+p2IUj2ZuDnC8R5Vrx3O0VMPvMvphA= -github.com/qydysky/part v0.28.20250120190624 h1:KyxUMxO0k5/loU1TP9usKJpN8YDGPgrIC6Yc37OeGws= -github.com/qydysky/part v0.28.20250120190624/go.mod h1:RAb3G05OaqCSRWFJz9FnONB6iqF/Dk4R+Z5c/H7mWSg= +github.com/qydysky/part v0.28.20250127053704 h1:UkhwV4IkqkP/u3H3IlnwG9lywSvsIIMgXkmu+MW/hq4= +github.com/qydysky/part v0.28.20250127053704/go.mod h1:RAb3G05OaqCSRWFJz9FnONB6iqF/Dk4R+Z5c/H7mWSg= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= -- 2.39.2