From 8caeec754091e2c84f5589ee19ec3f340ab69976 Mon Sep 17 00:00:00 2001 From: qydysky Date: Fri, 7 Apr 2023 07:19:12 +0800 Subject: [PATCH] 1 --- .gitignore | 1 + Reply/stream.go | 77 +++++++++++++++++++++++-------------- demo/config/config_K_v.json | 1 + 3 files changed, 51 insertions(+), 28 deletions(-) diff --git a/.gitignore b/.gitignore index 6f0bbbc..12f6173 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,4 @@ demo/main.exe *.m3u8 *.flv demo/build.bat +demo/build.sh diff --git a/Reply/stream.go b/Reply/stream.go index 9f1e6a1..8001292 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -50,8 +50,7 @@ type M4SStream struct { m4s_pool *pool.Buf[m4s_link_item] //切片pool common c.Common //通用配置副本 Current_save_path string //明确的直播流保存目录 - // 事件周期 - // start: 开始实例 startRec:开始录制 load:接收到视频头 stopRec:结束录制 stop:结束实例 + // 事件周期 start: 开始实例 startRec:开始录制 load:接收到视频头 cut:切 stopRec:结束录制 stop:结束实例 msg *msgq.MsgType[*M4SStream] //实例的各种事件回调 Callback_start func(*M4SStream) error //实例开始的回调 Callback_startRec func(*M4SStream) error //录制开始的回调 @@ -66,6 +65,7 @@ type M4SStream_Config struct { want_qn int //直播流清晰度 want_type string //直播流类型 banlance_host bool //直播hls流故障转移 + save_to_file bool //保存到文件 } type m4s_link_item struct { @@ -173,6 +173,9 @@ func (t *M4SStream) LoadConfig(common c.Common) (e error) { if v, ok := common.K_v.LoadV(`直播hls流故障转移`).(bool); ok { t.config.banlance_host = v } + if v, ok := common.K_v.LoadV(`直播流保存到文件`).(bool); ok { + t.config.save_to_file = v + } if v, ok := common.K_v.LoadV(`直播流清晰度`).(float64); ok { t.config.want_qn = int(v) } @@ -591,11 +594,48 @@ func (t *M4SStream) saveStream() (e error) { t.log.L(`W: `, err) } + // 保存到文件 + if t.config.save_to_file { + var ( + contextC context.Context + cancle context.CancelFunc + ) + t.msg.Pull_tag_async(map[string]func(*M4SStream) (disable bool){ + `cut`: func(ms *M4SStream) (disable bool) { + select { + case <-contextC.Done(): + + } + if contextC != nil { + cancle() + } + contextC, cancle = context.WithCancel(context.Background()) + defer cancle() + + l := ms.log.Base_add(`文件`) + startf := func(_ *M4SStream) error { + l.L(`T: `, `start`) + return nil + } + stopf := func(_ *M4SStream) error { + l.L(`T: `, `stop`) + return nil + } + if e := ms.PusherToFile(contextC, ms.Current_save_path+`0.`+ms.stream_type, startf, stopf); e != nil { + l.L(`E: `, e) + } + return false + }, + }) + t.msg.Pull_tag_only(`load`, func(ms *M4SStream) (disable bool) { + ms.msg.Push_tag(`cut`, ms) + return true + }) + } + // 获取流 startT := time.Now() switch t.stream_type { - case `m3u8`: - fallthrough case `mp4`: e = t.saveStreamM4s() case `flv`: @@ -677,8 +717,6 @@ func (t *M4SStream) saveStreamFlv() (e error) { } }() - out := file.New(t.Current_save_path+`0.flv`, -1, true).File() - rc, rw := io.Pipe() var leastReadUnix atomic.Int64 leastReadUnix.Store(time.Now().Unix()) @@ -724,7 +762,6 @@ func (t *M4SStream) saveStreamFlv() (e error) { n, e := rc.Read(buf) buff.Append(buf[:n]) if e != nil { - out.Close() t.Stream_msg.PushLock_tag(`close`, nil) break } @@ -754,20 +791,17 @@ func (t *M4SStream) saveStreamFlv() (e error) { t.first_buf = make([]byte, len(front_buf)) copy(t.first_buf, front_buf) // fmt.Println("write front_buf") - out.Write(t.first_buf) t.Stream_msg.PushLock_tag(`data`, t.first_buf) t.msg.Push_tag(`load`, t) } if len(t.first_buf) != 0 && keyframe.Size() != 0 { t.bootBufPush(keyframe.GetPureBuf()) keyframe.Reset() - out.Write(t.boot_buf) t.Stream_msg.PushLock_tag(`data`, t.boot_buf) } if last_available_offset > 1 { // fmt.Println("write Sync") buff.RemoveFront(last_available_offset - 1) - out.Sync() } } } @@ -818,9 +852,6 @@ func (t *M4SStream) saveStreamM4s() (e error) { Max: 3, } - var out = file.New(t.Current_save_path+`0.mp4`, 0, false) - defer out.Close() - // var ( buf = slice.New[byte]() @@ -960,10 +991,6 @@ func (t *M4SStream) saveStreamM4s() (e error) { } t.first_buf = make([]byte, len(front_buf)) copy(t.first_buf, front_buf) - if out != nil { - out.Write(t.first_buf, true) - out.Sync() - } t.msg.Push_tag(`load`, t) } t.putM4s(download_seq[k]) @@ -1010,10 +1037,6 @@ func (t *M4SStream) saveStreamM4s() (e error) { t.bootBufPush(keyframe.GetPureBuf()) keyframe.Reset() t.Stream_msg.PushLock_tag(`data`, t.boot_buf) - if out != nil { - out.Write(t.boot_buf, true) - out.Sync() - } } buf.RemoveFront(last_available_offset) @@ -1135,13 +1158,13 @@ func (t *M4SStream) Start() bool { // 设置事件 if t.Callback_stopRec != nil { t.msg.Pull_tag_only("stopRec", func(ms *M4SStream) (disable bool) { - t.Callback_stopRec(ms) + ms.Callback_stopRec(ms) return false }) } if t.Callback_stop != nil { t.msg.Pull_tag_only("stop", func(ms *M4SStream) (disable bool) { - t.Callback_stop(ms) + ms.Callback_stop(ms) return false }) } @@ -1227,12 +1250,10 @@ func (t *M4SStream) Stop() { } // 保存到文件 -// filepath: 不包含后缀,会自动添加后缀 func (t *M4SStream) PusherToFile(cont context.Context, filepath string, startFunc func(*M4SStream) error, stopFunc func(*M4SStream) error) error { - f := file.New(filepath+"."+t.stream_type, 0, true) - if e := f.Delete(); e != nil { - return e - } + f := file.New(filepath, 0, false) + defer f.Close() + f.Delete() if e := startFunc(t); e != nil { return e diff --git a/demo/config/config_K_v.json b/demo/config/config_K_v.json index 20396d9..8800883 100644 --- a/demo/config/config_K_v.json +++ b/demo/config/config_K_v.json @@ -58,6 +58,7 @@ "直播流保存位置": "./live", "直播流保存天数-help": "当t日有1录播时,会尝试删除t-n日的1个最早的录播。小于1的数将禁用此功能", "直播流保存天数": 4, + "直播流保存到文件": true, "直播hls流故障转移-help":"true:hls服务器故障时,使用其他", "直播hls流故障转移": true, "仅保存当前直播间流-help": "启用此项,才会保存Ass", -- 2.39.2