first_buf []byte //m4s起始块 or flv起始块
boot_buf []byte //快速启动缓冲
boot_buf_locker funcCtrl.BlockFunc
- last_m4s *m4s_link_item //最后一个切片
- m4s_pool *pool.Buf[m4s_link_item] //切片pool
- common c.Common //通用配置副本
- Current_save_path string //明确的直播流保存目录
+ last_m4s *m4s_link_item //最后一个切片
+ m4s_pool *pool.Buf[m4s_link_item] //切片pool
+ common c.Common //通用配置副本
+ Current_save_path string //明确的直播流保存目录
+ // 事件周期
+ // start: 开始实例 startRec:开始录制 load:接收到视频头 stopRec:结束录制 stop:结束实例
+ msg *msgq.MsgType[*M4SStream] //实例的各种事件回调
Callback_start func(*M4SStream) error //实例开始的回调
Callback_startRec func(*M4SStream) error //录制开始的回调
Callback_stopRec func(*M4SStream) //录制结束的回调
- msg *msgq.MsgType[*M4SStream] //
Callback_stop func(*M4SStream) //实例结束的回调
reqPool *pool.Buf[reqf.Req] //请求池
duration time.Duration //录制时长
}
// 录制回调
+ t.msg.Push_tag(`startRec`, t)
if t.Callback_startRec != nil {
if err := t.Callback_startRec(t); err != nil {
t.log.L(`W: `, `开始录制回调错误`, err.Error())
return err
}
}
- defer t.msg.Push_tag(`stoprec`, t)
+ defer t.msg.Push_tag(`stopRec`, t)
// 移除历史流
if err := t.removeStream(); err != nil {
}
// 实例回调
+ t.msg = msgq.NewType[*M4SStream]()
+ t.msg.Push_tag(`start`, t)
if t.Callback_start != nil {
if e := t.Callback_start(t); e != nil {
t.log.L(`W: `, `开始回调错误`, e.Error())
// 初始化切片消息
t.Stream_msg = msgq.NewType[[]byte]()
- t.msg = msgq.NewType[*M4SStream]()
- t.msg.Pull_tag_only("fin", func(ms *M4SStream) (disable bool) {
- return true
- })
-
// 设置事件
if t.Callback_stopRec != nil {
- t.msg.Pull_tag_only("stoprec", func(ms *M4SStream) (disable bool) {
+ t.msg.Pull_tag_only("stopRec", func(ms *M4SStream) (disable bool) {
t.Callback_stopRec(ms)
return false
})
return false
})
}
+ t.msg.Pull_tag_only("stop", func(_ *M4SStream) (disable bool) {
+ return true
+ })
defer t.msg.Push_tag(`stop`, t)
- defer t.msg.Push_tag(`fin`, nil)
//指定房间录制回调
if v, ok := t.common.K_v.LoadV("指定房间录制回调").([]any); ok && len(v) > 0 {
)
if len(after) > 2 {
- t.msg.Pull_tag_async_only("stoprec", func(ms *M4SStream) (disable bool) {
+ t.msg.Pull_tag_async_only("stopRec", func(ms *M4SStream) (disable bool) {
if durationS >= 0 && ms.duration.Seconds() > durationS {
var cmds []string
for i := 0; i < len(after); i++ {
t.exitSign.Wait()
}
-// 流服务推送方法
-func (t *M4SStream) Pusher(w http.ResponseWriter, r *http.Request) {
- switch t.stream_type {
- case `m3u8`:
- t.pusherM4s(w, r)
- case `mp4`:
- t.pusherM4s(w, r)
- case `flv`:
- t.pusherFlv(w, r)
- default:
- t.log.L(`W: `, `Pusher no support stream_type`)
- }
-}
-
-func (t *M4SStream) pusherM4s(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Content-Type", "video/mp4")
-
- flusher, flushSupport := w.(http.Flusher)
- if flushSupport {
- flusher.Flush()
+// 保存到文件
+// filepath: 不包含后缀,会自动添加后缀
+func (t *M4SStream) PusherToFile(filepath string, startFunc func(*M4SStream) error, stopFunc func(*M4SStream) error) error {
+ f := file.New(filepath+"."+t.stream_type, 0, true)
+ if e := f.Delete(); e != nil {
+ return e
}
- //写入hls头
- if _, err := w.Write(t.getFirstBuf()); err != nil {
- return
- } else if flushSupport {
- flusher.Flush()
+ if e := startFunc(t); e != nil {
+ return e
}
- //写入快速启动缓冲
+ f.Write(t.getFirstBuf(), true)
if len(t.boot_buf) != 0 {
- if _, err := w.Write(t.boot_buf); err != nil {
- return
- }
- if flushSupport {
- flusher.Flush()
- }
+ f.Write(t.boot_buf, true)
}
-
- cancel := make(chan struct{})
-
- //hls切片
+ contextC, cancel := context.WithCancel(context.Background())
t.Stream_msg.Pull_tag(map[string]func([]byte) bool{
`data`: func(b []byte) bool {
if len(b) == 0 {
- close(cancel)
- return true
- }
- if _, err := w.Write(b); err != nil {
- close(cancel)
+ cancel()
return true
- } else if flushSupport {
- flusher.Flush()
}
+ f.Write(b, true)
return false
},
`close`: func(_ []byte) bool {
- close(cancel)
+ cancel()
return true
},
})
+ <-contextC.Done()
+
+ if e := stopFunc(t); e != nil {
+ return e
+ }
- <-cancel
+ return nil
}
-func (t *M4SStream) pusherFlv(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Content-Type", "video/x-flv")
+// 流服务推送方法
+func (t *M4SStream) PusherToHttp(w http.ResponseWriter, r *http.Request, startFunc func(*M4SStream) error, stopFunc func(*M4SStream) error) error {
+ switch t.stream_type {
+ case `m3u8`:
+ fallthrough
+ case `mp4`:
+ w.Header().Set("Content-Type", "video/mp4")
+ case `flv`:
+ w.Header().Set("Content-Type", "video/x-flv")
+ default:
+ w.WriteHeader(http.StatusNotFound)
+ return errors.New("pusher no support stream_type")
+ }
+
+ if e := startFunc(t); e != nil {
+ return e
+ }
flusher, flushSupport := w.(http.Flusher)
if flushSupport {
flusher.Flush()
}
- //写入flv头
+ //写入头
if _, err := w.Write(t.getFirstBuf()); err != nil {
- return
+ return err
} else if flushSupport {
flusher.Flush()
}
//写入快速启动缓冲
if len(t.boot_buf) != 0 {
if _, err := w.Write(t.boot_buf); err != nil {
- return
+ return err
}
if flushSupport {
flusher.Flush()
}
}
- cancel := make(chan struct{})
+ contextC, cancel := context.WithCancel(r.Context())
- //flv
+ //
t.Stream_msg.Pull_tag(map[string]func([]byte) bool{
`data`: func(b []byte) bool {
+ select {
+ case <-contextC.Done():
+ return true
+ default:
+ }
if len(b) == 0 {
- close(cancel)
+ cancel()
return true
}
if _, err := w.Write(b); err != nil {
- close(cancel)
+ cancel()
return true
} else if flushSupport {
flusher.Flush()
return false
},
`close`: func(_ []byte) bool {
- close(cancel)
+ cancel()
return true
},
})
- <-cancel
+ <-contextC.Done()
+
+ if e := stopFunc(t); e != nil {
+ return e
+ }
+
+ return nil
}
func (t *M4SStream) bootBufPush(buf []byte) {