m4s_pool *pool.Buf[m4s_link_item] //切片pool
common c.Common //通用配置副本
Current_save_path string //明确的直播流保存目录
- // 事件周期
- // start: 开始实例 startRec:开始录制 load:接收到视频头 stopRec:结束录制 stop:结束实例
+ // 事件周期 start: 开始实例 startRec:开始录制 load:接收到视频头 cut:切 stopRec:结束录制 stop:结束实例
msg *msgq.MsgType[*M4SStream] //实例的各种事件回调
Callback_start func(*M4SStream) error //实例开始的回调
Callback_startRec func(*M4SStream) error //录制开始的回调
want_qn int //直播流清晰度
want_type string //直播流类型
banlance_host bool //直播hls流故障转移
+ save_to_file bool //保存到文件
}
type m4s_link_item struct {
if v, ok := common.K_v.LoadV(`直播hls流故障转移`).(bool); ok {
t.config.banlance_host = v
}
+ if v, ok := common.K_v.LoadV(`直播流保存到文件`).(bool); ok {
+ t.config.save_to_file = v
+ }
if v, ok := common.K_v.LoadV(`直播流清晰度`).(float64); ok {
t.config.want_qn = int(v)
}
t.log.L(`W: `, err)
}
+ // 保存到文件
+ if t.config.save_to_file {
+ var (
+ contextC context.Context
+ cancle context.CancelFunc
+ )
+ t.msg.Pull_tag_async(map[string]func(*M4SStream) (disable bool){
+ `cut`: func(ms *M4SStream) (disable bool) {
+ select {
+ case <-contextC.Done():
+
+ }
+ if contextC != nil {
+ cancle()
+ }
+ contextC, cancle = context.WithCancel(context.Background())
+ defer cancle()
+
+ l := ms.log.Base_add(`文件`)
+ startf := func(_ *M4SStream) error {
+ l.L(`T: `, `start`)
+ return nil
+ }
+ stopf := func(_ *M4SStream) error {
+ l.L(`T: `, `stop`)
+ return nil
+ }
+ if e := ms.PusherToFile(contextC, ms.Current_save_path+`0.`+ms.stream_type, startf, stopf); e != nil {
+ l.L(`E: `, e)
+ }
+ return false
+ },
+ })
+ t.msg.Pull_tag_only(`load`, func(ms *M4SStream) (disable bool) {
+ ms.msg.Push_tag(`cut`, ms)
+ return true
+ })
+ }
+
// 获取流
startT := time.Now()
switch t.stream_type {
- case `m3u8`:
- fallthrough
case `mp4`:
e = t.saveStreamM4s()
case `flv`:
}
}()
- out := file.New(t.Current_save_path+`0.flv`, -1, true).File()
-
rc, rw := io.Pipe()
var leastReadUnix atomic.Int64
leastReadUnix.Store(time.Now().Unix())
n, e := rc.Read(buf)
buff.Append(buf[:n])
if e != nil {
- out.Close()
t.Stream_msg.PushLock_tag(`close`, nil)
break
}
t.first_buf = make([]byte, len(front_buf))
copy(t.first_buf, front_buf)
// fmt.Println("write front_buf")
- out.Write(t.first_buf)
t.Stream_msg.PushLock_tag(`data`, t.first_buf)
t.msg.Push_tag(`load`, t)
}
if len(t.first_buf) != 0 && keyframe.Size() != 0 {
t.bootBufPush(keyframe.GetPureBuf())
keyframe.Reset()
- out.Write(t.boot_buf)
t.Stream_msg.PushLock_tag(`data`, t.boot_buf)
}
if last_available_offset > 1 {
// fmt.Println("write Sync")
buff.RemoveFront(last_available_offset - 1)
- out.Sync()
}
}
}
Max: 3,
}
- var out = file.New(t.Current_save_path+`0.mp4`, 0, false)
- defer out.Close()
-
//
var (
buf = slice.New[byte]()
}
t.first_buf = make([]byte, len(front_buf))
copy(t.first_buf, front_buf)
- if out != nil {
- out.Write(t.first_buf, true)
- out.Sync()
- }
t.msg.Push_tag(`load`, t)
}
t.putM4s(download_seq[k])
t.bootBufPush(keyframe.GetPureBuf())
keyframe.Reset()
t.Stream_msg.PushLock_tag(`data`, t.boot_buf)
- if out != nil {
- out.Write(t.boot_buf, true)
- out.Sync()
- }
}
buf.RemoveFront(last_available_offset)
// 设置事件
if t.Callback_stopRec != nil {
t.msg.Pull_tag_only("stopRec", func(ms *M4SStream) (disable bool) {
- t.Callback_stopRec(ms)
+ ms.Callback_stopRec(ms)
return false
})
}
if t.Callback_stop != nil {
t.msg.Pull_tag_only("stop", func(ms *M4SStream) (disable bool) {
- t.Callback_stop(ms)
+ ms.Callback_stop(ms)
return false
})
}
}
// 保存到文件
-// filepath: 不包含后缀,会自动添加后缀
func (t *M4SStream) PusherToFile(cont context.Context, filepath string, startFunc func(*M4SStream) error, stopFunc func(*M4SStream) error) error {
- f := file.New(filepath+"."+t.stream_type, 0, true)
- if e := f.Delete(); e != nil {
- return e
- }
+ f := file.New(filepath, 0, false)
+ defer f.Close()
+ f.Delete()
if e := startFunc(t); e != nil {
return e