]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
Fix fmp4故障转移
authorqydysky <qydysky@foxmail.com>
Wed, 19 Jun 2024 23:22:59 +0000 (23:22 +0000)
committerqydysky <qydysky@foxmail.com>
Wed, 19 Jun 2024 23:22:59 +0000 (23:22 +0000)
CV/Var.go
F/api.go
Reply/stream.go

index 722687b8d0d2ac11244edff89d57a1623c89622e..438abd41bbd31d6d218990c49f8b41f45327466e 100644 (file)
--- a/CV/Var.go
+++ b/CV/Var.go
@@ -87,6 +87,7 @@ type Common struct {
 
 type LiveQn struct {
        Url          string `json:"-"`
+       Uuid         string `json:"-"`
        Codec        string
        ReUpTime     time.Time
        CreateTime   time.Time
@@ -200,6 +201,16 @@ func (t *Common) Copy() *Common {
 }
 
 // 自动停用机制
+func (t *Common) DisableLiveAutoByUuid(uuid string) (hadDisable bool) {
+       for i := 0; i < len(t.Live); i++ {
+               if t.Live[i].Uuid == uuid {
+                       return t.Live[i].DisableAuto()
+               }
+       }
+       return
+}
+
+// Deprecated: 存在缺陷
 func (t *Common) DisableLiveAuto(host string) (hadDisable bool) {
        for i := 0; i < len(t.Live); i++ {
                if liveUrl, e := url.Parse(t.Live[i].Url); e == nil {
@@ -211,6 +222,7 @@ func (t *Common) DisableLiveAuto(host string) (hadDisable bool) {
        return
 }
 
+// Deprecated: 存在缺陷
 func (t *Common) DisableLive(host string, reUpTime time.Time) {
        for i := 0; i < len(t.Live); i++ {
                if liveUrl, e := url.Parse(t.Live[i].Url); e == nil {
index 25343cf0c84a8bf0e82248ebe0c659773acac326..08c06c2568d4debc41e58e54a0caaef6a65c9d60 100644 (file)
--- a/F/api.go
+++ b/F/api.go
@@ -13,6 +13,7 @@ import (
        "sync/atomic"
        "time"
 
+       "github.com/google/uuid"
        c "github.com/qydysky/bili_danmu/CV"
        J "github.com/qydysky/bili_danmu/Json"
        "github.com/skratchdot/open-golang/open"
@@ -462,6 +463,7 @@ func (t *GetFunc) configStreamType(sts []struct {
                                        //直播流链接
                                        for _, v1 := range v.URLInfo {
                                                item := c.LiveQn{
+                                                       Uuid:       uuid.NewString(),
                                                        Codec:      v.CodecName,
                                                        Url:        v1.Host + v.BaseURL + v1.Extra,
                                                        CreateTime: time.Now(),
index f4b91f21fa3ab502cdc8b07debc1148bd1ea7de6..f4121604ef1f00dc4866ab20ab8b78685a569bf9 100644 (file)
@@ -91,6 +91,14 @@ type m4s_link_item struct {
        data         *slice.Buf[byte] // 下载的数据
        createdTime  time.Time        // 创建时间
        pooledTime   time.Time        // 到pool时间
+       SerUuid      string           // 使用的流服务器uuid
+}
+
+// 更换服务器
+func (t *m4s_link_item) replaceSer(v *c.LiveQn) {
+       t.SerUuid = v.Uuid
+       t.Url = F.ResolveReferenceLast(v.Url, t.Base+"?trid="+F.ParseQuery(v.Url, "trid="))
+       t.status = 0
 }
 
 func (t *m4s_link_item) copyTo(to *m4s_link_item) {
@@ -462,6 +470,7 @@ func (t *M4SStream) fetchParseM3U8(lastM4s *m4s_link_item, fmp4ListUpdateTo floa
 
                        //将切片添加到返回切片数组
                        p := t.getM4s()
+                       p.SerUuid = v.Uuid
                        p.Url = F.ResolveReferenceLast(v.Url, m4s_link+"?trid="+F.ParseQuery(v.Url, "trid="))
                        p.Base = m4s_link
                        p.isHeader = isHeader
@@ -517,6 +526,7 @@ func (t *M4SStream) fetchParseM3U8(lastM4s *m4s_link_item, fmp4ListUpdateTo floa
                        for guess_no := linksFirstNo - 1; guess_no > lastNo; guess_no-- {
                                //将切片添加到返回切片数组前
                                p := t.getM4s()
+                               p.SerUuid = v.Uuid
                                p.Base = strconv.Itoa(guess_no) + `.m4s`
                                p.isHeader = false
                                p.Url = F.ResolveReferenceLast(v.Url, p.Base)
@@ -1029,7 +1039,7 @@ func (t *M4SStream) saveStreamM4s() (e error) {
 
                // 下载切片
                for {
-                       downOk := true
+                       var downErr atomic.Bool
                        dCount := 0
                        for i := 0; i < len(download_seq); i++ {
                                // 已下载但还未移除的切片
@@ -1047,19 +1057,21 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                                // 故障转移
                                if download_seq[i].status == 3 {
                                        if linkUrl, e := url.Parse(download_seq[i].Url); e == nil {
-                                               oldHost := linkUrl.Host
                                                // 将此切片服务器设置停用
-                                               hadDisable := t.common.DisableLiveAuto(oldHost)
+                                               // hadDisable := t.common.DisableLiveAuto(oldHost)
+                                               hadDisable := t.common.DisableLiveAutoByUuid(download_seq[i].SerUuid)
                                                // 从其他服务器获取此切片
                                                if vl := t.common.ValidLive(); vl == nil {
                                                        return errors.New(`全部流服务器故障`)
                                                } else {
-                                                       linkUrl.Host = vl.Host()
+                                                       download_seq[i].replaceSer(vl)
                                                        if !hadDisable {
-                                                               t.log.L(`W: `, `切片下载失败,故障转移`, oldHost, ` -> `, linkUrl.Host)
+                                                               t.log.L(`W: `, `切片下载失败,故障转移`, linkUrl.Host, ` -> `, vl.Host())
                                                        }
                                                }
-                                               download_seq[i].Url = linkUrl.String()
+                                               // download_seq[i].Url = linkUrl.String()
+                                       } else {
+                                               return errors.New(`切片url错误`)
                                        }
                                }
 
@@ -1075,7 +1087,7 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                                                        `Connection`: `close`,
                                                },
                                        }); e != nil {
-                                               downOk = false
+                                               downErr.Store(true)
                                                t.log.L(`W: `, `切片下载失败`, link.Base, e)
                                        }
                                }(download_seq[i])
@@ -1084,7 +1096,7 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                        // 等待队列下载完成
                        downloadLimit.BlockAll()()
 
-                       if downOk {
+                       if !downErr.Load() {
                                break
                        }
                }
@@ -1142,9 +1154,10 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                                e = err
                                if skipErrFrame {
                                        // 将此切片服务器设置停用
-                                       if u, e := url.Parse(cu.Url); e == nil {
-                                               t.common.DisableLiveAuto(u.Host)
-                                       }
+                                       // if u, e := url.Parse(cu.Url); e == nil {
+                                       t.common.DisableLiveAutoByUuid(cu.SerUuid)
+                                       // t.common.DisableLiveAuto(u.Host)
+                                       // }
                                } else {
                                        return
                                }