From 77864e02a8c3982d27c85160fa192c4db391151b Mon Sep 17 00:00:00 2001 From: qydysky <32743305+qydysky@users.noreply.github.com> Date: Sun, 19 Mar 2023 00:53:49 +0800 Subject: [PATCH] =?utf8?q?Add=20=E6=8C=87=E5=AE=9A=E6=88=BF=E9=97=B4?= =?utf8?q?=E5=BD=95=E5=88=B6=E5=9B=9E=E8=B0=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- README.md | 23 +++++++ Reply/stream.go | 117 +++++++++++++++++++++++------------- demo/config/config_K_v.json | 13 ++++ 3 files changed, 111 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index ed7c631..85bea1e 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,29 @@ ### 说明 本项目使用github action自动构建,构建过程详见[yml](https://github.com/qydysky/bili_danmu/blob/master/.github/workflows/go.yml) +#### 指定房间录制回调 +配置文件添加了如下配置 +```json +{ + "指定房间录制回调-help":"当指定roomid的房间结束录制后触发对应的命令,命令执行目录为录播目录,占位符({type}:视频类型),durationS:录制时长超过指定秒数才触发", + "指定房间录制回调":[ + { + "roomid":0, + "durationS":60, + "after":["cmd","/c","ffmpeg","-i","0.{type}","-y","-c","copy","1.{type}","1>1.log","2>&1"] + }, + { + "roomid":0, + "durationS":60, + "after":["ffmpeg","-i","0.{type}","-y","-c","copy","1.{type}"] + } + ] +} +``` +上述例子中演示了windows下使用[ffmpeg](https://ffmpeg.org/),这将使得保存的流文件`0.mp4 or 0.flv`转为正常的视频`1.mp4 or 1.flv`。 + +注意:命令运行是异步的,如同步执行多个命令,应使用脚本。 + #### 性能检查 当配置了`Web服务地址`及`性能路径`时,运行中的性能信息将可以通过http获取。 例如有如下配置: diff --git a/Reply/stream.go b/Reply/stream.go index 860e227..b462982 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -10,6 +10,7 @@ import ( "net/http" "net/url" "os" + "os/exec" "path/filepath" "strconv" "strings" @@ -44,15 +45,17 @@ type M4SStream struct { first_buf []byte //m4s起始块 or flv起始块 boot_buf []byte //快速启动缓冲 boot_buf_locker funcCtrl.BlockFunc - last_m4s *m4s_link_item //最后一个切片 - m4s_pool *pool.Buf[m4s_link_item] //切片pool - common c.Common //通用配置副本 - Current_save_path string //明确的直播流保存目录 - Callback_start func(*M4SStream) error //实例开始的回调 - Callback_startRec func(*M4SStream) error //录制开始的回调 - Callback_stopRec func(*M4SStream) //录制结束的回调 - Callback_stop func(*M4SStream) //实例结束的回调 - reqPool *pool.Buf[reqf.Req] //请求池 + last_m4s *m4s_link_item //最后一个切片 + m4s_pool *pool.Buf[m4s_link_item] //切片pool + common c.Common //通用配置副本 + Current_save_path string //明确的直播流保存目录 + Callback_start func(*M4SStream) error //实例开始的回调 + Callback_startRec func(*M4SStream) error //录制开始的回调 + Callback_stopRec func(*M4SStream) //录制结束的回调 + msg *msgq.MsgType[*M4SStream] // + Callback_stop func(*M4SStream) //实例结束的回调 + reqPool *pool.Buf[reqf.Req] //请求池 + duration time.Duration //录制时长 } type M4SStream_Config struct { @@ -585,9 +588,7 @@ func (t *M4SStream) saveStream() (e error) { return err } } - if t.Callback_stopRec != nil { - defer t.Callback_stopRec(t) - } + defer t.msg.Push_tag(`stoprec`, t) // 移除历史流 if err := t.removeStream(); err != nil { @@ -595,6 +596,7 @@ func (t *M4SStream) saveStream() (e error) { } // 获取流 + startT := time.Now() switch t.stream_type { case `m3u8`: fallthrough @@ -606,6 +608,7 @@ func (t *M4SStream) saveStream() (e error) { e = errors.New("undefind stream type") t.log.L(`E: `, e) } + t.duration = time.Since(startT) return } @@ -1131,10 +1134,6 @@ func (t *M4SStream) Start() bool { go func() { defer t.Status.Done() - if t.Callback_stop != nil { - defer t.Callback_stop(t) - } - t.log.L(`I: `, `初始化录制(`+strconv.Itoa(t.common.Roomid)+`)`) // 初始化请求池 @@ -1143,6 +1142,65 @@ func (t *M4SStream) Start() bool { // 初始化切片消息 t.Stream_msg = msgq.NewType[[]byte]() + t.msg = msgq.NewType[*M4SStream]() + t.msg.Pull_tag_only("fin", func(ms *M4SStream) (disable bool) { + return true + }) + + // 设置事件 + if t.Callback_stopRec != nil { + t.msg.Pull_tag_only("stoprec", func(ms *M4SStream) (disable bool) { + t.Callback_stopRec(ms) + return false + }) + } + if t.Callback_stop != nil { + t.msg.Pull_tag_only("stop", func(ms *M4SStream) (disable bool) { + t.Callback_stop(ms) + return false + }) + } + + defer t.msg.Push_tag(`stop`, t) + defer t.msg.Push_tag(`fin`, nil) + + //指定房间录制回调 + if v, ok := t.common.K_v.LoadV("指定房间录制回调").([]any); ok && len(v) > 0 { + for i := 0; i < len(v); i++ { + if vm, ok := v[i].(map[string]any); ok { + if roomid, ok := vm["roomid"].(float64); ok && int(roomid) == t.common.Roomid { + var ( + durationS, _ = vm["durationS"].(float64) + after, _ = vm["after"].([]any) + ) + + if len(after) > 2 { + t.msg.Pull_tag_async_only("stoprec", func(ms *M4SStream) (disable bool) { + if durationS >= 0 && ms.duration.Seconds() > durationS { + var cmds []string + for i := 0; i < len(after); i++ { + if cmd, ok := after[i].(string); ok && cmd != "" { + cmds = append(cmds, strings.ReplaceAll(cmd, "{type}", ms.stream_type)) + } + } + + l := t.log.Base_add(`指定房间录制回调`) + cmd := exec.Command(cmds[0], cmds[1:]...) + cmd.Dir = ms.Current_save_path + l.L(`I: `, "启动", cmd.Args) + if e := cmd.Run(); e != nil { + l.L(`E: `, e) + } + l.L(`I: `, "结束") + } + return false + }) + } + } + } + } + } + // 主循环 for t.Status.Islive() { // 是否在直播 @@ -1158,39 +1216,14 @@ func (t *M4SStream) Start() bool { continue } - // // 设置全部服务 - // for _, v := range t.common.Live { - // if url_struct, e := url.Parse(v.Url); e == nil { - // t.stream_hosts.Store(url_struct.Hostname(), v.) - // } - // } - // 保存流 err := t.saveStream() if err != nil { t.log.L(`E: `, "saveStream:", err) } - - // Deprecated: 默认总是获取到可用流 - // 直播流类型故障切换 - // if v, ok := t.common.K_v.LoadV(`直播流类型故障切换`).(bool); v && ok { - // if err != nil && err.Error() == "未能找到可用流服务器" { - // if v, ok := t.common.K_v.LoadV(`直播流类型`).(string); ok { - // switch v { - // case "fmp4": - // t.common.K_v.Store(`直播流类型`, `flv`) - // case "flv": - // t.common.K_v.Store(`直播流类型`, `hls`) - // default: - // t.log.L(`E: `, `未知的流类型:`+v) - // } - // } - // } - // } - } - t.log.L(`I: `, `结束录制(`+strconv.Itoa(t.common.Roomid)+`)`) + t.log.L(`I: `, `结束录制(`+strconv.Itoa(t.common.Roomid)+`) 时长(`+t.duration.String()+`)`) t.exitSign.Done() }() return true diff --git a/demo/config/config_K_v.json b/demo/config/config_K_v.json index 417ab27..4f6805e 100644 --- a/demo/config/config_K_v.json +++ b/demo/config/config_K_v.json @@ -71,6 +71,19 @@ "danmu":"" } ], + "指定房间录制回调-help":"当指定roomid的房间结束录制后触发对应的命令,命令执行目录为录播目录,占位符({type}:视频类型),durationS:录制时长超过指定秒数才触发", + "指定房间录制回调":[ + { + "roomid":0, + "durationS":10, + "after":["cmd","/c","ffmpeg","-i","0.{type}","-y","-c","copy","1.{type}","1>1.log","2>&1"] + }, + { + "roomid":0, + "durationS":10, + "after":["ffmpeg","-i","0.{type}","-y","-c","copy","1.{type}"] + } + ], "Web服务地址-help":"填写本程序各组件所用的服务地址 例0.0.0.0:10000 为空时不启动Web服务", "Web服务地址":"0.0.0.0:10000", "直播Web服务路径":"/web/", -- 2.39.2