"net/http"
"net/url"
"os"
+ "os/exec"
"path/filepath"
"strconv"
"strings"
first_buf []byte //m4s起始块 or flv起始块
boot_buf []byte //快速启动缓冲
boot_buf_locker funcCtrl.BlockFunc
- last_m4s *m4s_link_item //最后一个切片
- m4s_pool *pool.Buf[m4s_link_item] //切片pool
- common c.Common //通用配置副本
- Current_save_path string //明确的直播流保存目录
- Callback_start func(*M4SStream) error //实例开始的回调
- Callback_startRec func(*M4SStream) error //录制开始的回调
- Callback_stopRec func(*M4SStream) //录制结束的回调
- Callback_stop func(*M4SStream) //实例结束的回调
- reqPool *pool.Buf[reqf.Req] //请求池
+ last_m4s *m4s_link_item //最后一个切片
+ m4s_pool *pool.Buf[m4s_link_item] //切片pool
+ common c.Common //通用配置副本
+ Current_save_path string //明确的直播流保存目录
+ Callback_start func(*M4SStream) error //实例开始的回调
+ Callback_startRec func(*M4SStream) error //录制开始的回调
+ Callback_stopRec func(*M4SStream) //录制结束的回调
+ msg *msgq.MsgType[*M4SStream] //
+ Callback_stop func(*M4SStream) //实例结束的回调
+ reqPool *pool.Buf[reqf.Req] //请求池
+ duration time.Duration //录制时长
}
type M4SStream_Config struct {
return err
}
}
- if t.Callback_stopRec != nil {
- defer t.Callback_stopRec(t)
- }
+ defer t.msg.Push_tag(`stoprec`, t)
// 移除历史流
if err := t.removeStream(); err != nil {
}
// 获取流
+ startT := time.Now()
switch t.stream_type {
case `m3u8`:
fallthrough
e = errors.New("undefind stream type")
t.log.L(`E: `, e)
}
+ t.duration = time.Since(startT)
return
}
go func() {
defer t.Status.Done()
- if t.Callback_stop != nil {
- defer t.Callback_stop(t)
- }
-
t.log.L(`I: `, `初始化录制(`+strconv.Itoa(t.common.Roomid)+`)`)
// 初始化请求池
// 初始化切片消息
t.Stream_msg = msgq.NewType[[]byte]()
+ t.msg = msgq.NewType[*M4SStream]()
+ t.msg.Pull_tag_only("fin", func(ms *M4SStream) (disable bool) {
+ return true
+ })
+
+ // 设置事件
+ if t.Callback_stopRec != nil {
+ t.msg.Pull_tag_only("stoprec", func(ms *M4SStream) (disable bool) {
+ t.Callback_stopRec(ms)
+ return false
+ })
+ }
+ if t.Callback_stop != nil {
+ t.msg.Pull_tag_only("stop", func(ms *M4SStream) (disable bool) {
+ t.Callback_stop(ms)
+ return false
+ })
+ }
+
+ defer t.msg.Push_tag(`stop`, t)
+ defer t.msg.Push_tag(`fin`, nil)
+
+ //指定房间录制回调
+ if v, ok := t.common.K_v.LoadV("指定房间录制回调").([]any); ok && len(v) > 0 {
+ for i := 0; i < len(v); i++ {
+ if vm, ok := v[i].(map[string]any); ok {
+ if roomid, ok := vm["roomid"].(float64); ok && int(roomid) == t.common.Roomid {
+ var (
+ durationS, _ = vm["durationS"].(float64)
+ after, _ = vm["after"].([]any)
+ )
+
+ if len(after) > 2 {
+ t.msg.Pull_tag_async_only("stoprec", func(ms *M4SStream) (disable bool) {
+ if durationS >= 0 && ms.duration.Seconds() > durationS {
+ var cmds []string
+ for i := 0; i < len(after); i++ {
+ if cmd, ok := after[i].(string); ok && cmd != "" {
+ cmds = append(cmds, strings.ReplaceAll(cmd, "{type}", ms.stream_type))
+ }
+ }
+
+ l := t.log.Base_add(`指定房间录制回调`)
+ cmd := exec.Command(cmds[0], cmds[1:]...)
+ cmd.Dir = ms.Current_save_path
+ l.L(`I: `, "启动", cmd.Args)
+ if e := cmd.Run(); e != nil {
+ l.L(`E: `, e)
+ }
+ l.L(`I: `, "结束")
+ }
+ return false
+ })
+ }
+ }
+ }
+ }
+ }
+
// 主循环
for t.Status.Islive() {
// 是否在直播
continue
}
- // // 设置全部服务
- // for _, v := range t.common.Live {
- // if url_struct, e := url.Parse(v.Url); e == nil {
- // t.stream_hosts.Store(url_struct.Hostname(), v.)
- // }
- // }
-
// 保存流
err := t.saveStream()
if err != nil {
t.log.L(`E: `, "saveStream:", err)
}
-
- // Deprecated: 默认总是获取到可用流
- // 直播流类型故障切换
- // if v, ok := t.common.K_v.LoadV(`直播流类型故障切换`).(bool); v && ok {
- // if err != nil && err.Error() == "未能找到可用流服务器" {
- // if v, ok := t.common.K_v.LoadV(`直播流类型`).(string); ok {
- // switch v {
- // case "fmp4":
- // t.common.K_v.Store(`直播流类型`, `flv`)
- // case "flv":
- // t.common.K_v.Store(`直播流类型`, `hls`)
- // default:
- // t.log.L(`E: `, `未知的流类型:`+v)
- // }
- // }
- // }
- // }
-
}
- t.log.L(`I: `, `结束录制(`+strconv.Itoa(t.common.Roomid)+`)`)
+ t.log.L(`I: `, `结束录制(`+strconv.Itoa(t.common.Roomid)+`) 时长(`+t.duration.String()+`)`)
t.exitSign.Done()
}()
return true