]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
1
authorqydysky <qydysky@foxmail.com>
Thu, 6 Apr 2023 23:19:12 +0000 (07:19 +0800)
committerqydysky <qydysky@foxmail.com>
Thu, 6 Apr 2023 23:19:12 +0000 (07:19 +0800)
.gitignore
Reply/stream.go
demo/config/config_K_v.json

index 6f0bbbc3360d373c60392fa35ebb5294d64a2ac0..12f61739950bcfc375a04cee5ab7854881cc114f 100644 (file)
@@ -24,3 +24,4 @@ demo/main.exe
 *.m3u8
 *.flv
 demo/build.bat
+demo/build.sh
index 9f1e6a17da6b79786ab7f2d4a8f5ea6bd049e58a..80012926e8cdc447ef64e61424caab1b4dc24ffc 100644 (file)
@@ -50,8 +50,7 @@ type M4SStream struct {
        m4s_pool          *pool.Buf[m4s_link_item] //切片pool
        common            c.Common                 //通用配置副本
        Current_save_path string                   //明确的直播流保存目录
-       // 事件周期
-       // start: 开始实例 startRec:开始录制 load:接收到视频头 stopRec:结束录制 stop:结束实例
+       // 事件周期 start: 开始实例 startRec:开始录制 load:接收到视频头 cut:切 stopRec:结束录制 stop:结束实例
        msg               *msgq.MsgType[*M4SStream] //实例的各种事件回调
        Callback_start    func(*M4SStream) error    //实例开始的回调
        Callback_startRec func(*M4SStream) error    //录制开始的回调
@@ -66,6 +65,7 @@ type M4SStream_Config struct {
        want_qn       int    //直播流清晰度
        want_type     string //直播流类型
        banlance_host bool   //直播hls流故障转移
+       save_to_file  bool   //保存到文件
 }
 
 type m4s_link_item struct {
@@ -173,6 +173,9 @@ func (t *M4SStream) LoadConfig(common c.Common) (e error) {
        if v, ok := common.K_v.LoadV(`直播hls流故障转移`).(bool); ok {
                t.config.banlance_host = v
        }
+       if v, ok := common.K_v.LoadV(`直播流保存到文件`).(bool); ok {
+               t.config.save_to_file = v
+       }
        if v, ok := common.K_v.LoadV(`直播流清晰度`).(float64); ok {
                t.config.want_qn = int(v)
        }
@@ -591,11 +594,48 @@ func (t *M4SStream) saveStream() (e error) {
                t.log.L(`W: `, err)
        }
 
+       // 保存到文件
+       if t.config.save_to_file {
+               var (
+                       contextC context.Context
+                       cancle   context.CancelFunc
+               )
+               t.msg.Pull_tag_async(map[string]func(*M4SStream) (disable bool){
+                       `cut`: func(ms *M4SStream) (disable bool) {
+                               select {
+                               case <-contextC.Done():
+                                       
+                               }
+                               if contextC != nil {
+                                       cancle()
+                               }
+                               contextC, cancle = context.WithCancel(context.Background())
+                               defer cancle()
+
+                               l := ms.log.Base_add(`文件`)
+                               startf := func(_ *M4SStream) error {
+                                       l.L(`T: `, `start`)
+                                       return nil
+                               }
+                               stopf := func(_ *M4SStream) error {
+                                       l.L(`T: `, `stop`)
+                                       return nil
+                               }
+                               if e := ms.PusherToFile(contextC, ms.Current_save_path+`0.`+ms.stream_type, startf, stopf); e != nil {
+                                       l.L(`E: `, e)
+                               }
+                               return false
+                       },
+               })
+               t.msg.Pull_tag_only(`load`, func(ms *M4SStream) (disable bool) {
+                       ms.msg.Push_tag(`cut`, ms)
+                       return true
+               })
+       }
+
        // 获取流
        startT := time.Now()
        switch t.stream_type {
-       case `m3u8`:
-               fallthrough
        case `mp4`:
                e = t.saveStreamM4s()
        case `flv`:
@@ -677,8 +717,6 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                                }
                        }()
 
-                       out := file.New(t.Current_save_path+`0.flv`, -1, true).File()
-
                        rc, rw := io.Pipe()
                        var leastReadUnix atomic.Int64
                        leastReadUnix.Store(time.Now().Unix())
@@ -724,7 +762,6 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                                        n, e := rc.Read(buf)
                                        buff.Append(buf[:n])
                                        if e != nil {
-                                               out.Close()
                                                t.Stream_msg.PushLock_tag(`close`, nil)
                                                break
                                        }
@@ -754,20 +791,17 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                                                        t.first_buf = make([]byte, len(front_buf))
                                                        copy(t.first_buf, front_buf)
                                                        // fmt.Println("write front_buf")
-                                                       out.Write(t.first_buf)
                                                        t.Stream_msg.PushLock_tag(`data`, t.first_buf)
                                                        t.msg.Push_tag(`load`, t)
                                                }
                                                if len(t.first_buf) != 0 && keyframe.Size() != 0 {
                                                        t.bootBufPush(keyframe.GetPureBuf())
                                                        keyframe.Reset()
-                                                       out.Write(t.boot_buf)
                                                        t.Stream_msg.PushLock_tag(`data`, t.boot_buf)
                                                }
                                                if last_available_offset > 1 {
                                                        // fmt.Println("write Sync")
                                                        buff.RemoveFront(last_available_offset - 1)
-                                                       out.Sync()
                                                }
                                        }
                                }
@@ -818,9 +852,6 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                Max: 3,
        }
 
-       var out = file.New(t.Current_save_path+`0.mp4`, 0, false)
-       defer out.Close()
-
        //
        var (
                buf         = slice.New[byte]()
@@ -960,10 +991,6 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                                        }
                                        t.first_buf = make([]byte, len(front_buf))
                                        copy(t.first_buf, front_buf)
-                                       if out != nil {
-                                               out.Write(t.first_buf, true)
-                                               out.Sync()
-                                       }
                                        t.msg.Push_tag(`load`, t)
                                }
                                t.putM4s(download_seq[k])
@@ -1010,10 +1037,6 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                                t.bootBufPush(keyframe.GetPureBuf())
                                keyframe.Reset()
                                t.Stream_msg.PushLock_tag(`data`, t.boot_buf)
-                               if out != nil {
-                                       out.Write(t.boot_buf, true)
-                                       out.Sync()
-                               }
                        }
 
                        buf.RemoveFront(last_available_offset)
@@ -1135,13 +1158,13 @@ func (t *M4SStream) Start() bool {
                // 设置事件
                if t.Callback_stopRec != nil {
                        t.msg.Pull_tag_only("stopRec", func(ms *M4SStream) (disable bool) {
-                               t.Callback_stopRec(ms)
+                               ms.Callback_stopRec(ms)
                                return false
                        })
                }
                if t.Callback_stop != nil {
                        t.msg.Pull_tag_only("stop", func(ms *M4SStream) (disable bool) {
-                               t.Callback_stop(ms)
+                               ms.Callback_stop(ms)
                                return false
                        })
                }
@@ -1227,12 +1250,10 @@ func (t *M4SStream) Stop() {
 }
 
 // 保存到文件
-// filepath: 不包含后缀,会自动添加后缀
 func (t *M4SStream) PusherToFile(cont context.Context, filepath string, startFunc func(*M4SStream) error, stopFunc func(*M4SStream) error) error {
-       f := file.New(filepath+"."+t.stream_type, 0, true)
-       if e := f.Delete(); e != nil {
-               return e
-       }
+       f := file.New(filepath, 0, false)
+       defer f.Close()
+       f.Delete()
 
        if e := startFunc(t); e != nil {
                return e
index 20396d9c17ec3284264745774e9663e8c89c0ec0..880088370c8b67f79779a31cefbc83cf72cb84b7 100644 (file)
@@ -58,6 +58,7 @@
     "直播流保存位置": "./live",
     "直播流保存天数-help": "当t日有1录播时,会尝试删除t-n日的1个最早的录播。小于1的数将禁用此功能",
     "直播流保存天数": 4,
+    "直播流保存到文件": true,
     "直播hls流故障转移-help":"true:hls服务器故障时,使用其他",
     "直播hls流故障转移": true,
     "仅保存当前直播间流-help": "启用此项,才会保存Ass",