From b6b83a8b3a15bab2db19ec90fa7f10b02cec6151 Mon Sep 17 00:00:00 2001 From: qydysky Date: Wed, 1 Nov 2023 21:51:55 +0800 Subject: [PATCH] =?utf8?q?Add=20=E6=B7=BB=E5=8A=A0=E9=85=8D=E7=BD=AE=20?= =?utf8?q?=E7=9B=B4=E6=92=AD=E6=B5=81=E6=8E=A5=E6=94=B6n=E5=B8=A7=E6=89=8D?= =?utf8?q?=E4=BF=9D=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- Reply/stream.go | 45 ++++++++++++++++++++----------------- demo/config/config_K_v.json | 1 + 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/Reply/stream.go b/Reply/stream.go index 2e58aef..5ed47c3 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -48,13 +48,15 @@ type M4SStream struct { stream_type string //流类型 Stream_msg *msgq.MsgType[[]byte] //流数据消息 tag:data first_buf []byte //m4s起始块 or flv起始块 + frameCount uint //关键帧数量 boot_buf []byte //快速启动缓冲 boot_buf_locker funcCtrl.BlockFunc last_m4s *m4s_link_item //最后一个切片 m4s_pool *pool.Buf[m4s_link_item] //切片pool common *c.Common //通用配置副本 Current_save_path string //明确的直播流保存目录 - // 事件周期 start: 开始实例 startRec:开始录制 load:接收到视频头 firstFrame: 接收到第一个关键帧 cut:切 stopRec:结束录制 stop:结束实例 + // 事件周期 start: 开始实例 startRec:开始录制 load:接收到视频头 + // keyFrame: 接收到关键帧 cut:切 stopRec:结束录制 stop:结束实例 msg *msgq.MsgType[*M4SStream] //实例的各种事件回调 Callback_start func(*M4SStream) error //实例开始的回调 Callback_startRec func(*M4SStream) error //录制开始的回调 @@ -580,6 +582,7 @@ func (t *M4SStream) saveStream() (e error) { // 清除初始值 t.last_m4s = nil t.first_buf = nil + t.frameCount = 0 if s, ok := t.common.K_v.LoadV("直播Web服务路径").(string); ok && s != "" { t.log.L(`I: `, "Web服务地址:", t.common.Stream_url.String()+s) @@ -596,12 +599,20 @@ func (t *M4SStream) saveStream() (e error) { // 保存到文件 if t.config.save_to_file { - // 确保能接收到第一个帧才开始录制 - var cancelfirstFrame = t.msg.Pull_tag_only(`firstFrame`, func(ms *M4SStream) (disable bool) { - ms.msg.Push_tag(`cut`, ms) - return true + var startCount uint = 3 + if s, ok := t.common.K_v.LoadV("直播流接收n帧才保存").(float64); ok && s > 0 && uint(s) > startCount { + startCount = uint(s) + } + // 确保能接收到第n个帧才开始录制 + var cancelkeyFrame = t.msg.Pull_tag_only(`keyFrame`, func(ms *M4SStream) (disable bool) { + if startCount <= t.frameCount { + ms.msg.Push_tag(`cut`, ms) + return true + } + t.log.L(`T: `, fmt.Sprintf("%d帧后开始录制", startCount-t.frameCount)) + return false }) - defer cancelfirstFrame() + defer cancelkeyFrame() } // 获取流 @@ -740,11 +751,10 @@ func (t *M4SStream) saveStreamFlv() (e error) { // read go func() { var ( - ticker = time.NewTicker(time.Second) - buff = slice.New[byte]() - keyframe = slice.New[byte]() - buf = make([]byte, 1<<16) - frameCount = 0 + ticker = time.NewTicker(time.Second) + buff = slice.New[byte]() + keyframe = slice.New[byte]() + buf = make([]byte, 1<<16) ) for { @@ -795,10 +805,8 @@ func (t *M4SStream) saveStreamFlv() (e error) { t.bootBufPush(keyframe.GetPureBuf()) keyframe.Reset() t.Stream_msg.PushLock_tag(`data`, t.boot_buf) - frameCount += 1 - if frameCount == 1 { - t.msg.Push_tag(`firstFrame`, t) - } + t.frameCount += 1 + t.msg.Push_tag(`keyFrame`, t) } if last_available_offset > 1 { // fmt.Println("write Sync") @@ -881,7 +889,6 @@ func (t *M4SStream) saveStreamM4s() (e error) { buf = slice.New[byte]() fmp4Decoder = &Fmp4Decoder{} keyframe = slice.New[byte]() - frameCount = 0 to = 3 fmp4UpdateTo = 7.0 fmp4Updated time.Time @@ -1072,10 +1079,8 @@ func (t *M4SStream) saveStreamM4s() (e error) { t.bootBufPush(keyframe.GetPureBuf()) keyframe.Reset() t.Stream_msg.PushLock_tag(`data`, t.boot_buf) - frameCount += 1 - if frameCount == 1 { - t.msg.Push_tag(`firstFrame`, t) - } + t.frameCount += 1 + t.msg.Push_tag(`keyFrame`, t) } _ = buf.RemoveFront(last_available_offset) diff --git a/demo/config/config_K_v.json b/demo/config/config_K_v.json index 6cee3cd..c217672 100644 --- a/demo/config/config_K_v.json +++ b/demo/config/config_K_v.json @@ -70,6 +70,7 @@ "直播流类型-help": "flv,fmp4,flvH,fmp4H,带H后缀的为Hevc格式编码", "直播流类型": "fmp4", "直播流不使用mcdn": false, + "直播流接收n帧才保存": 3, "flv断流超时s": 5, "flv断流续接": true, "fmp4切片下载超时s": 3, -- 2.39.2