// 获取实例的录制状态
func StreamOStatus(roomid int) (Islive bool) {
v, ok := streamO.Load(roomid)
- return ok && (v.(*M4SStream).Status.Islive() || v.(*M4SStream).exitSign.Islive())
+ return ok && (!pctx.Done(v.(*M4SStream).Status) || v.(*M4SStream).exitSign.Islive())
}
// 开始实例
if c.C.Roomid == _roomid {
return true
}
- if v.(*M4SStream).Status.Islive() {
+ if !pctx.Done(v.(*M4SStream).Status) {
v.(*M4SStream).Stop()
}
streamO.Delete(_roomid)
})
case -1: // 所有房间
streamO.Range(func(k, v interface{}) bool {
- if v.(*M4SStream).Status.Islive() {
+ if !pctx.Done(v.(*M4SStream).Status) {
v.(*M4SStream).Stop()
}
streamO.Delete(k)
})
default: // 针对某房间
if v, ok := streamO.Load(roomid); ok {
- if v.(*M4SStream).Status.Islive() {
+ if !pctx.Done(v.(*M4SStream).Status) {
v.(*M4SStream).Stop()
}
streamO.Delete(roomid)
// 实例切断
func StreamOCut(roomid int, title ...string) {
if v, ok := streamO.Load(roomid); ok {
- if v.(*M4SStream).Status.Islive() {
+ if !pctx.Done(v.(*M4SStream).Status) {
if len(title) != 0 {
v.(*M4SStream).common.Title = title[0]
}
})
// 未准备好
- if currentStreamO == nil || !currentStreamO.Status.Islive() {
+ if currentStreamO == nil || pctx.Done(currentStreamO.Status) {
w.Header().Set("Retry-After", "1")
w.WriteHeader(http.StatusNotFound)
return
buf_offset = tag_offset + 1
last_available_offset = buf_offset
// fmt.Printf("tag_size error %x\n",buf[tag_offset:tag_offset+tag_header_size])
- continue //tag_size error
+ // continue //tag_size error
+ err = errors.New(`[decoder]tag_size error`)
+ return
}
tag_size_check := int(F.Btoi32(buf[tag_offset+tag_header_size+tag_size:tag_offset+tag_header_size+tag_size+previou_tag_size], 0))
buf_offset = tag_offset + 1
last_available_offset = buf_offset
// fmt.Printf("tag_size_check error %x\n",buf[tag_offset:tag_offset+tag_header_size])
- continue //tag_size_check error
+ // continue //tag_size_check error
+ err = errors.New(`[decoder]tag_size_check error`)
+ return
}
time_stamp := int(F.Btoi32([]byte{buf[tag_offset+7], buf[tag_offset+4], buf[tag_offset+5], buf[tag_offset+6]}, 0))
)
type M4SStream struct {
- Status *signal.Signal //IsLive()是否运行中
+ Status context.Context //IsLive()是否运行中
exitSign *signal.Signal //IsLive()是否等待退出中
log *log.Log_interface //日志
config M4SStream_Config //配置
}
//结束退出
- if !t.Status.Islive() {
+ if pctx.Done(t.Status) {
return
}
}
// flv获取
- cancelC, cancel := context.WithCancel(context.Background())
+ cancelC, cancel := context.WithCancel(t.Status)
+ errCtx := pctx.Value[error]{}
+ cancelC = errCtx.LinkCtx(cancelC)
{
- go func() {
- tsc, tscf := t.Status.WaitC()
- defer tscf()
-
- select {
- //停止录制
- case <-tsc:
- cancel()
- //当前连接终止
- case <-cancelC.Done():
- }
- }()
-
pipe := pio.NewPipe()
var (
leastReadUnix atomic.Int64
// read timeout
go func() {
+ defer cancel()
+
timer := time.NewTicker(time.Duration(readTO * int64(time.Second)))
defer timer.Stop()
case curT := <-timer.C:
if curT.Unix()-leastReadUnix.Load() > readTO {
t.log.L(`W: `, fmt.Sprintf("%vs未接收到有效数据", readTO))
+ pctx.PutVal(cancelC, &errCtx, fmt.Errorf("%vs未接收到有效数据", readTO))
// 时间段内未接收到任何数据
- cancel()
return
}
if v, ok := c.C.K_v.LoadV(`直播流清晰度`).(float64); ok {
if t.config.want_qn != int(v) {
t.log.L(`I: `, "直播流清晰度改变:", t.common.Qn[t.config.want_qn], "=>", t.common.Qn[int(v)])
t.config.want_qn = int(v)
- cancel()
return
}
}
// read
go func() {
+ defer cancel()
+
var (
ticker = time.NewTicker(time.Second)
buff = slice.New[byte]()
keyframe = slice.New[byte]()
buf = make([]byte, 1<<16)
)
+ defer ticker.Stop()
for {
n, e := pipe.Read(buf)
_ = buff.Append(buf[:n])
if e != nil {
- cancel()
+ pctx.PutVal(cancelC, &errCtx, e)
break
}
- skip := true
select {
case <-ticker.C:
- skip = false
default:
- }
- if skip {
continue
}
if strings.Contains(e.Error(), `no found available tag`) {
continue
}
+ pctx.PutVal(cancelC, &errCtx, errors.New("[decoder]"+e.Error()))
//丢弃所有数据
buff.Reset()
}
if keyframe.Size() != 0 {
if len(t.first_buf) == 0 {
t.log.L(`W: `, `flv未接收到起始段`)
- cancel()
+ pctx.PutVal(cancelC, &errCtx, errors.New(`flv未接收到起始段`))
break
}
t.bootBufPush(keyframe.GetPureBuf())
}
}
}
-
buf = nil
buff.Reset()
-
- ticker.Stop()
}()
t.log.L(`I: `, `flv下载开始`)
if err := r.Wait(); err != nil && !errors.Is(err, io.EOF) {
if reqf.IsCancel(err) {
t.log.L(`I: `, `flv下载停止`)
+ return
} else if err != nil && !reqf.IsTimeout(err) {
e = err
t.log.L(`E: `, `flv下载失败:`, err)
}
+ } else if err := errCtx.Get(); err != nil && strings.HasPrefix(err.Error(), "[decoder]") {
+ e = err
}
}
+
cancel()
if v1, ok := t.common.K_v.LoadV(`flv断流续接`).(bool); ok && !v1 {
}
// 停止录制
- if !t.Status.Islive() {
+ if pctx.Done(t.Status) {
if len(download_seq) != 0 {
if time.Now().Unix() > t.stream_last_modified.Unix()+300 {
e = errors.New("切片下载超时")
}
// 状态检测与设置
- if t.Status.Islive() {
+ if !pctx.Done(t.Status) {
t.log.L(`T: `, `已存在实例`)
return false
}
}
}
- t.Status = signal.Init()
go func() {
t.log.L(`I: `, `初始化录制(`+strconv.Itoa(t.common.Roomid)+`)`)
defer t.log.L(`I: `, `结束录制(`+strconv.Itoa(t.common.Roomid)+`)`)
defer func() {
// use anonymous func avoid data race and unexpect sign wait
- t.Status.Done()
t.exitSign.Done()
}()
// 设置事件
// 当录制停止时,取消全部录制
- mainCtx, mainCancel := context.WithCancel(context.Background())
- mainCtx, done := pctx.WithWait(mainCtx, 0, time.Minute)
+ t.Status = pctx.CarryCancel(context.WithCancel(context.Background()))
+ mainCtx, done := pctx.WithWait(t.Status, 0, time.Minute)
defer func() {
switch done() {
case pctx.ErrWaitTo:
fallthrough
default:
}
+ _ = pctx.CallCancel(t.Status)
}()
if t.Callback_stopRec != nil {
if ms.Callback_stop != nil {
ms.Callback_stop(ms)
}
- mainCancel()
+ _ = pctx.CallCancel(t.Status)
t.msg.ClearAll()
return true
})
}
// 主循环
- for t.Status.Islive() {
+ for !pctx.Done(t.Status) {
// 是否在直播
F.Get(t.common).Get(`Liveing`)
if !t.common.Liveing {
}
func (t *M4SStream) Stop() {
- if !t.Status.Islive() {
+ if pctx.Done(t.Status) {
t.log.L(`I: `, `正在等待下载完成...`)
t.exitSign.Wait()
return
}
t.exitSign = signal.Init()
- t.Status.Done()
+ _ = pctx.CallCancel(t.Status)
t.log.L(`I: `, `正在等待下载完成...`)
t.exitSign.Wait()
t.log.L(`I: `, `结束`)