p "github.com/qydysky/part"
comp "github.com/qydysky/part/component"
+ pctx "github.com/qydysky/part/ctx"
file "github.com/qydysky/part/file"
pio "github.com/qydysky/part/io"
limit "github.com/qydysky/part/limit"
}
// 设定字幕文件名,为""时停止输出
-func Ass_f(contextC context.Context, save_path string, filePath string, st time.Time) {
+func Ass_f(ctx context.Context, save_path string, filePath string, st time.Time) {
if !IsOn(`仅保存当前直播间流`) {
return
}
_, _ = f.Write([]byte(ass.header), true)
ass.startT = st
- <-contextC.Done()
+ done := pctx.Wait(ctx)
+ defer done()
+
ass.file = ""
fl.L(`I: `, "结束")
}
}
// 弹幕回放
-func StartRecDanmu(c context.Context, filePath string) {
+func StartRecDanmu(ctx context.Context, filePath string) {
if !IsOn(`仅保存当前直播间流`) || !IsOn("弹幕回放") {
return
}
} else {
f.L(`E: `, e)
}
- <-c.Done()
+
+ done := pctx.Wait(ctx)
+ defer done()
+
f.L(`I: `, `结束`)
// 弹幕录制结束
"errors"
"fmt"
"io"
+ "time"
comp "github.com/qydysky/part/component"
file "github.com/qydysky/part/file"
}
func resetTS(ctx context.Context, ptr *string) error {
+ be := time.Now()
fmt.Println("resetTS")
- defer fmt.Println("resetTS fin")
+ defer fmt.Printf("resetTS fin (%v)\n", time.Since(be))
f := file.New(*ptr+"0.mp4", 0, false)
if !f.IsExist() {
}
trackID := btoi32(trackBuf, 0)
_ = f.SeekIndex(4, file.AtCurrent)
- fmt.Printf("tkhd %v \n", int32((cuTs[trackID]-opTs[trackID])/timescale[trackID]))
+ fmt.Printf("tkhd %v %v \n", trackID, int32((cuTs[trackID]-opTs[trackID])/timescale[trackID]))
if _, e := f.Write(itob32(int32((cuTs[trackID]-opTs[trackID])/timescale[trackID])), false); e != nil {
return e
}
c "github.com/qydysky/bili_danmu/CV"
F "github.com/qydysky/bili_danmu/F"
+ pctx "github.com/qydysky/part/ctx"
file "github.com/qydysky/part/file"
funcCtrl "github.com/qydysky/part/funcCtrl"
pio "github.com/qydysky/part/io"
)
type M4SStream struct {
- Status *signal.Signal //IsLive()是否运行中
- exitSign *signal.Signal //IsLive()是否等待退出中
- log *log.Log_interface //日志
- config M4SStream_Config //配置
- stream_last_modified time.Time //流地址更新时间
- // stream_expires int64 //流到期时间
- // stream_hosts psync.Map //使用的流服务器
- stream_type string //流类型
- Stream_msg *msgq.MsgType[[]byte] //流数据消息 tag:data
- first_buf []byte //m4s起始块 or flv起始块
- boot_buf []byte //快速启动缓冲
- boot_buf_locker funcCtrl.BlockFunc
- last_m4s *m4s_link_item //最后一个切片
- m4s_pool *pool.Buf[m4s_link_item] //切片pool
- common *c.Common //通用配置副本
- Current_save_path string //明确的直播流保存目录
+ Status *signal.Signal //IsLive()是否运行中
+ exitSign *signal.Signal //IsLive()是否等待退出中
+ log *log.Log_interface //日志
+ config M4SStream_Config //配置
+ stream_last_modified time.Time //流地址更新时间
+ stream_type string //流类型
+ Stream_msg *msgq.MsgType[[]byte] //流数据消息 tag:data
+ first_buf []byte //m4s起始块 or flv起始块
+ boot_buf []byte //快速启动缓冲
+ boot_buf_locker funcCtrl.BlockFunc
+ last_m4s *m4s_link_item //最后一个切片
+ m4s_pool *pool.Buf[m4s_link_item] //切片pool
+ common *c.Common //通用配置副本
+ Current_save_path string //明确的直播流保存目录
// 事件周期 start: 开始实例 startRec:开始录制 load:接收到视频头 firstFrame: 接收到第一个关键帧 cut:切 stopRec:结束录制 stop:结束实例
msg *msgq.MsgType[*M4SStream] //实例的各种事件回调
Callback_start func(*M4SStream) error //实例开始的回调
t.Status = signal.Init()
go func() {
- defer t.Status.Done()
-
t.log.L(`I: `, `初始化录制(`+strconv.Itoa(t.common.Roomid)+`)`)
+ defer t.log.L(`I: `, `结束录制(`+strconv.Itoa(t.common.Roomid)+`)`)
+ defer t.exitSign.Done()
+ defer t.Status.Done()
+
// 初始化请求池
t.reqPool = t.common.ReqPool
// 设置事件
// 当录制停止时,取消全部录制
- mainContextC, maincancel := context.WithCancel(context.Background())
+ mainCtx, mainCancel := context.WithCancel(context.Background())
+ mainCtx, done := pctx.WithWait(mainCtx, time.Minute)
+ defer func() {
+ if done() != nil {
+ t.log.L(`E: `, `结束超时`)
+ }
+ }()
+
if t.Callback_stopRec != nil {
cancel := t.msg.Pull_tag_only(`stopRec`, func(ms *M4SStream) (disable bool) {
ms.Callback_stopRec(ms)
if ms.Callback_stop != nil {
ms.Callback_stop(ms)
}
- maincancel()
+ mainCancel()
t.msg.ClearAll()
return true
})
cancel := t.msg.Pull_tag_async(map[string]func(*M4SStream) (disable bool){
`cut`: func(ms *M4SStream) (disable bool) {
// 当cut时,取消上次录制
- contextC, cancel := context.WithCancel(mainContextC)
+ contextC, cancel := context.WithCancel(mainCtx)
fc.FlashWithCallback(cancel)
// 分段时长min
}
}
}
-
return false
},
})
}
}
- t.log.L(`I: `, `结束录制(`+strconv.Itoa(t.common.Roomid)+`)`)
-
// 退出
t.msg.Push_tag(`stop`, t)
- t.exitSign.Done()
}()
return true
}
return e
}
+ contextC, done := pctx.WaitCtx(contextC)
+ defer done()
+
_, _ = f.Write(t.getFirstBuf(), true)
cancelRec := t.Stream_msg.Pull_tag_async(map[string]func([]byte) bool{
`data`: func(b []byte) bool {