frameCount uint //关键帧数量
boot_buf []byte //快速启动缓冲
boot_buf_locker sync.RWMutex
- last_m4s *m4s_link_item //最后一个切片
m4s_pool *pool.Buf[m4s_link_item] //切片pool
common *c.Common //通用配置副本
Current_save_path string //明确的直播流保存目录
return t.common.ValidLive() != nil
}
-func (t *M4SStream) fetchParseM3U8(fmp4ListUpdateTo float64) (m4s_links []*m4s_link_item, e error) {
+func (t *M4SStream) fetchParseM3U8(lastM4s *m4s_link_item, fmp4ListUpdateTo float64) (m4s_links []*m4s_link_item, e error) {
if t.common.ValidLive() == nil {
e = errors.New("全部流服务器发生故障")
return
defer t.reqPool.Put(r)
// 请求解析m3u8内容
- for k, v := range t.common.Live {
+ for k := 0; k < len(t.common.Live); k++ {
+ v := &(t.common.Live[k])
+
// 跳过尚未启用的live地址
if !v.Valid() {
continue
// 解析m3u8
// var tmp []*m4s_link_item
var lastNo int
- if t.last_m4s != nil {
- lastNo, _ = t.last_m4s.getNo()
+ if lastM4s != nil {
+ lastNo, _ = lastM4s.getNo()
}
for _, line := range bytes.Split(m3u8_respon, []byte("\n")) {
if len(line) == 0 {
continue
} else if bytes.Contains(line, []byte(".m4s")) {
m4s_link = string(line)
+ } else if bytes.HasPrefix(line, []byte("http")) {
+ // m3u8 指向新连接
+ newUrl := strings.TrimSpace(string(line))
+ t.log.L(`I: `, `指向新连接`, v.Host(), "=>", F.ParseHost(newUrl))
+ v.SetUrl(newUrl)
+ continue
} else {
continue
}
{
// 只增加新的切片
tmpBase := m4s_link
- // fmt.Println(tmpBase, t.last_m4s != nil)
+ // fmt.Println(tmpBase, lastM4s != nil)
if tmpBase[0] == 'h' {
- if t.last_m4s != nil {
+ if lastM4s != nil {
continue
} else {
tmpBase = tmpBase[1:]
}
if len(m4s_links) == 0 {
- if t.last_m4s != nil &&
- !t.last_m4s.createdTime.IsZero() &&
- time.Since(t.last_m4s.createdTime).Seconds() > fmp4ListUpdateTo {
+ if lastM4s != nil &&
+ !lastM4s.createdTime.IsZero() &&
+ time.Since(lastM4s.createdTime).Seconds() > fmp4ListUpdateTo {
// 1min后重新启用
t.common.Live[k].DisableAuto()
- t.log.L("W: ", fmt.Sprintf("服务器 %s 发生故障 %.2f 秒未产出切片", F.ParseHost(v.Url), time.Since(t.last_m4s.createdTime).Seconds()))
+ t.log.L("W: ", fmt.Sprintf("服务器 %s 发生故障 %.2f 秒未产出切片", F.ParseHost(v.Url), time.Since(lastM4s.createdTime).Seconds()))
if t.common.ValidLive() == nil {
e = errors.New("全部切片服务器发生故障")
break
}
// 检查是否服务器发生故障,产出切片错误
- if t.last_m4s != nil {
- timed := m4s_links[len(m4s_links)-1].createdTime.Sub(t.last_m4s.createdTime).Seconds()
+ if lastM4s != nil {
+ timed := m4s_links[len(m4s_links)-1].createdTime.Sub(lastM4s.createdTime).Seconds()
nos, _ := m4s_links[len(m4s_links)-1].getNo()
- noe, _ := t.last_m4s.getNo()
+ noe := lastNo
if (timed > 5 && nos-noe == 0) || (nos-noe > 50) {
// 1min后重新启用
t.common.Live[k].DisableAuto()
}
}
- // m4s_links = append(m4s_links, tmp...)
-
// 首次下载
- if t.last_m4s == nil {
+ if lastM4s == nil {
return
}
func (t *M4SStream) saveStream() (e error) {
// 清除初始值
- t.last_m4s = nil
t.first_buf = nil
t.frameCount = 0
buf = slice.New[byte]()
fmp4Decoder = &Fmp4Decoder{}
keyframe = slice.New[byte]()
+ lastM4s *m4s_link_item
to = 3
+ waitT = 1.0
fmp4ListUpdateTo = 5.0
)
}
}
- // 避免过于频繁的请求
- if t.last_m4s != nil && time.Since(t.last_m4s.createdTime) < time.Second {
- time.Sleep(time.Second)
- }
-
// 获取解析m3u8
- var m4s_links, err = t.fetchParseM3U8(fmp4ListUpdateTo)
+ var m4s_links, err = t.fetchParseM3U8(lastM4s, fmp4ListUpdateTo)
if err != nil {
t.log.L(`E: `, `获取解析m3u8发生错误`, err)
// if len(download_seq) != 0 {
}
}
- // {
- // if t.last_m4s != nil {
- // l, _ := t.last_m4s.getNo()
- // fmt.Println("last", l)
- // }
- // for i := 0; i < len(m4s_links); i++ {
- // fmt.Println(m4s_links[i].getNo())
- // }
- // }
+ // 避免过于频繁的请求
+ if len(m4s_links) < 2 {
+ waitT += 0.5
+ } else if len(m4s_links) > 2 {
+ waitT -= 0.5
+ }
+ time.Sleep(time.Second * time.Duration(waitT))
// 设置最后的切片
- if t.last_m4s == nil {
- t.last_m4s = &m4s_link_item{}
- }
for i := len(m4s_links) - 1; i >= 0; i-- {
// fmt.Println("set last m4s", m4s_links[i].Base)
if !m4s_links[i].isInit() && len(m4s_links[i].Base) > 0 {
- m4s_links[i].copyTo(t.last_m4s)
+ if lastM4s == nil {
+ lastM4s = &m4s_link_item{}
+ }
+ m4s_links[i].copyTo(lastM4s)
break
}
}