From 093c2ebb1f9a73fd53af9ac835f74f1bc0eca9fd Mon Sep 17 00:00:00 2001 From: qydysky Date: Sun, 4 Feb 2024 21:37:47 +0800 Subject: [PATCH] =?utf8?q?=20Fix=20m3u8=E8=BF=94=E5=9B=9E=E7=9A=84?= =?utf8?q?=E4=B8=8D=E6=98=AF=E6=AD=A3=E5=B8=B8=E5=88=97=E8=A1=A8=20#102?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- CV/Var.go | 4 +++ Reply/stream.go | 73 ++++++++++++++++++++++++------------------------- 2 files changed, 40 insertions(+), 37 deletions(-) diff --git a/CV/Var.go b/CV/Var.go index 08afa42..0c5e648 100644 --- a/CV/Var.go +++ b/CV/Var.go @@ -91,6 +91,10 @@ type LiveQn struct { Expires int //流到期时间 } +func (t *LiveQn) SetUrl(url string) { + t.Url = url +} + func (t *LiveQn) Host() string { if liveUrl, e := url.Parse(t.Url); e == nil { return liveUrl.Host diff --git a/Reply/stream.go b/Reply/stream.go index f398206..6181de4 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -52,7 +52,6 @@ type M4SStream struct { frameCount uint //关键帧数量 boot_buf []byte //快速启动缓冲 boot_buf_locker sync.RWMutex - last_m4s *m4s_link_item //最后一个切片 m4s_pool *pool.Buf[m4s_link_item] //切片pool common *c.Common //通用配置副本 Current_save_path string //明确的直播流保存目录 @@ -277,7 +276,7 @@ func (t *M4SStream) fetchCheckStream() bool { return t.common.ValidLive() != nil } -func (t *M4SStream) fetchParseM3U8(fmp4ListUpdateTo float64) (m4s_links []*m4s_link_item, e error) { +func (t *M4SStream) fetchParseM3U8(lastM4s *m4s_link_item, fmp4ListUpdateTo float64) (m4s_links []*m4s_link_item, e error) { if t.common.ValidLive() == nil { e = errors.New("全部流服务器发生故障") return @@ -288,7 +287,9 @@ func (t *M4SStream) fetchParseM3U8(fmp4ListUpdateTo float64) (m4s_links []*m4s_l defer t.reqPool.Put(r) // 请求解析m3u8内容 - for k, v := range t.common.Live { + for k := 0; k < len(t.common.Live); k++ { + v := &(t.common.Live[k]) + // 跳过尚未启用的live地址 if !v.Valid() { continue @@ -355,8 +356,8 @@ func (t *M4SStream) fetchParseM3U8(fmp4ListUpdateTo float64) (m4s_links []*m4s_l // 解析m3u8 // var tmp []*m4s_link_item var lastNo int - if t.last_m4s != nil { - lastNo, _ = t.last_m4s.getNo() + if lastM4s != nil { + lastNo, _ = lastM4s.getNo() } for _, line := range bytes.Split(m3u8_respon, []byte("\n")) { if len(line) == 0 { @@ -374,6 +375,12 @@ func (t *M4SStream) fetchParseM3U8(fmp4ListUpdateTo float64) (m4s_links []*m4s_l continue } else if bytes.Contains(line, []byte(".m4s")) { m4s_link = string(line) + } else if bytes.HasPrefix(line, []byte("http")) { + // m3u8 指向新连接 + newUrl := strings.TrimSpace(string(line)) + t.log.L(`I: `, `指向新连接`, v.Host(), "=>", F.ParseHost(newUrl)) + v.SetUrl(newUrl) + continue } else { continue } @@ -381,9 +388,9 @@ func (t *M4SStream) fetchParseM3U8(fmp4ListUpdateTo float64) (m4s_links []*m4s_l { // 只增加新的切片 tmpBase := m4s_link - // fmt.Println(tmpBase, t.last_m4s != nil) + // fmt.Println(tmpBase, lastM4s != nil) if tmpBase[0] == 'h' { - if t.last_m4s != nil { + if lastM4s != nil { continue } else { tmpBase = tmpBase[1:] @@ -406,12 +413,12 @@ func (t *M4SStream) fetchParseM3U8(fmp4ListUpdateTo float64) (m4s_links []*m4s_l } if len(m4s_links) == 0 { - if t.last_m4s != nil && - !t.last_m4s.createdTime.IsZero() && - time.Since(t.last_m4s.createdTime).Seconds() > fmp4ListUpdateTo { + if lastM4s != nil && + !lastM4s.createdTime.IsZero() && + time.Since(lastM4s.createdTime).Seconds() > fmp4ListUpdateTo { // 1min后重新启用 t.common.Live[k].DisableAuto() - t.log.L("W: ", fmt.Sprintf("服务器 %s 发生故障 %.2f 秒未产出切片", F.ParseHost(v.Url), time.Since(t.last_m4s.createdTime).Seconds())) + t.log.L("W: ", fmt.Sprintf("服务器 %s 发生故障 %.2f 秒未产出切片", F.ParseHost(v.Url), time.Since(lastM4s.createdTime).Seconds())) if t.common.ValidLive() == nil { e = errors.New("全部切片服务器发生故障") break @@ -423,10 +430,10 @@ func (t *M4SStream) fetchParseM3U8(fmp4ListUpdateTo float64) (m4s_links []*m4s_l } // 检查是否服务器发生故障,产出切片错误 - if t.last_m4s != nil { - timed := m4s_links[len(m4s_links)-1].createdTime.Sub(t.last_m4s.createdTime).Seconds() + if lastM4s != nil { + timed := m4s_links[len(m4s_links)-1].createdTime.Sub(lastM4s.createdTime).Seconds() nos, _ := m4s_links[len(m4s_links)-1].getNo() - noe, _ := t.last_m4s.getNo() + noe := lastNo if (timed > 5 && nos-noe == 0) || (nos-noe > 50) { // 1min后重新启用 t.common.Live[k].DisableAuto() @@ -439,10 +446,8 @@ func (t *M4SStream) fetchParseM3U8(fmp4ListUpdateTo float64) (m4s_links []*m4s_l } } - // m4s_links = append(m4s_links, tmp...) - // 首次下载 - if t.last_m4s == nil { + if lastM4s == nil { return } @@ -549,7 +554,6 @@ func (t *M4SStream) getSavepath() { func (t *M4SStream) saveStream() (e error) { // 清除初始值 - t.last_m4s = nil t.first_buf = nil t.frameCount = 0 @@ -854,7 +858,9 @@ func (t *M4SStream) saveStreamM4s() (e error) { buf = slice.New[byte]() fmp4Decoder = &Fmp4Decoder{} keyframe = slice.New[byte]() + lastM4s *m4s_link_item to = 3 + waitT = 1.0 fmp4ListUpdateTo = 5.0 ) @@ -1056,13 +1062,8 @@ func (t *M4SStream) saveStreamM4s() (e error) { } } - // 避免过于频繁的请求 - if t.last_m4s != nil && time.Since(t.last_m4s.createdTime) < time.Second { - time.Sleep(time.Second) - } - // 获取解析m3u8 - var m4s_links, err = t.fetchParseM3U8(fmp4ListUpdateTo) + var m4s_links, err = t.fetchParseM3U8(lastM4s, fmp4ListUpdateTo) if err != nil { t.log.L(`E: `, `获取解析m3u8发生错误`, err) // if len(download_seq) != 0 { @@ -1074,24 +1075,22 @@ func (t *M4SStream) saveStreamM4s() (e error) { } } - // { - // if t.last_m4s != nil { - // l, _ := t.last_m4s.getNo() - // fmt.Println("last", l) - // } - // for i := 0; i < len(m4s_links); i++ { - // fmt.Println(m4s_links[i].getNo()) - // } - // } + // 避免过于频繁的请求 + if len(m4s_links) < 2 { + waitT += 0.5 + } else if len(m4s_links) > 2 { + waitT -= 0.5 + } + time.Sleep(time.Second * time.Duration(waitT)) // 设置最后的切片 - if t.last_m4s == nil { - t.last_m4s = &m4s_link_item{} - } for i := len(m4s_links) - 1; i >= 0; i-- { // fmt.Println("set last m4s", m4s_links[i].Base) if !m4s_links[i].isInit() && len(m4s_links[i].Base) > 0 { - m4s_links[i].copyTo(t.last_m4s) + if lastM4s == nil { + lastM4s = &m4s_link_item{} + } + m4s_links[i].copyTo(lastM4s) break } } -- 2.39.2