]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
Improve 重写流服务器管理
authorqydysky <32743305+qydysky@users.noreply.github.com>
Sun, 29 Jan 2023 17:07:46 +0000 (01:07 +0800)
committerqydysky <32743305+qydysky@users.noreply.github.com>
Sun, 29 Jan 2023 17:07:46 +0000 (01:07 +0800)
CV/Var.go
Reply/stream.go

index 0032dd7d36e804ac35664de7259ec867ce1acca4..3e215e22b0b2e63345cbf21453ccf56c59e983f1 100644 (file)
--- a/CV/Var.go
+++ b/CV/Var.go
@@ -6,6 +6,7 @@ import (
        "flag"
        "fmt"
        "io"
+       "net/url"
        "strings"
        "testing"
        "time"
@@ -61,6 +62,43 @@ type LiveQn struct {
        Expires  int //流到期时间
 }
 
+func (t *LiveQn) Host() string {
+       if liveUrl, e := url.Parse(t.Url); e == nil {
+               return liveUrl.Host
+       } else {
+               panic(e)
+       }
+}
+
+func (t *LiveQn) Valid() bool {
+       return time.Now().After(t.ReUpTime)
+}
+
+func (t *LiveQn) Disable(reUpTime time.Time) {
+       t.ReUpTime = reUpTime
+}
+
+func (t *Common) DisableLive(host string, reUpTime time.Time) {
+       for i := 0; i < len(t.Live); i++ {
+               if liveUrl, e := url.Parse(t.Live[i].Url); e == nil {
+                       if host == liveUrl.Host {
+                               t.Live[i].ReUpTime = reUpTime
+                               break
+                       }
+               }
+       }
+}
+
+func (t *Common) ValidLive() *LiveQn {
+       for i := 0; i < len(t.Live); i++ {
+               if time.Now().Before(t.Live[i].ReUpTime) {
+                       continue
+               }
+               return &t.Live[i]
+       }
+       return nil
+}
+
 type StreamType struct {
        Protocol_name string
        Format_name   string
index bf5f6fd676c99665c1575ae077c89d914502fc67..494a6f5794c0e21ee0c8fb4425c920a657f207d0 100644 (file)
@@ -251,6 +251,11 @@ func (t *M4SStream) fetchCheckStream() bool {
 }
 
 func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []byte, e error) {
+       if t.common.ValidLive() == nil {
+               e = errors.New("全部流服务器发生故障")
+               return
+       }
+
        // 开始请求
        req := t.reqPool.Get()
        defer t.reqPool.Put(req)
@@ -259,7 +264,7 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b
        // 请求解析m3u8内容
        for k, v := range t.common.Live {
                // 跳过尚未启用的live地址
-               if time.Now().Before(v.ReUpTime) {
+               if !v.Valid() {
                        continue
                }
 
@@ -294,10 +299,11 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b
 
                if err := r.Reqf(rval); err != nil {
                        // 1min后重新启用
-                       t.common.Live[k].ReUpTime = time.Now().Add(time.Minute)
+                       t.common.Live[k].Disable(time.Now().Add(time.Minute))
                        t.log.L("W: ", fmt.Sprintf("服务器 %s 发生故障 %s", m3u8_url.Host, err.Error()))
-                       if k == len(t.common.Live)-1 {
-                               e = errors.New("全部切片服务器发生故障")
+                       if t.common.ValidLive() == nil {
+                               e = errors.New("全部流服务器发生故障")
+                               break
                        }
                        continue
                }
@@ -410,10 +416,11 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b
                        noe, _ := t.last_m4s.getNo()
                        if timed > 5 && nos-noe == 0 {
                                // 1min后重新启用
-                               t.common.Live[k].ReUpTime = time.Now().Add(time.Minute)
+                               t.common.Live[k].Disable(time.Now().Add(time.Minute))
                                t.log.L("W: ", fmt.Sprintf("服务器 %s 发生故障 %d 秒产出了 %d 切片", m3u8_url.Host, int(timed), nos-noe))
-                               if k == len(t.common.Live)-1 {
+                               if t.common.ValidLive() == nil {
                                        e = errors.New("全部切片服务器发生故障")
+                                       break
                                }
                                continue
                        }
@@ -785,25 +792,14 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                                        if linkUrl, e := url.Parse(v.Url); e == nil {
                                                oldHost := linkUrl.Host
                                                // 将此切片服务器设置1min停用
-                                               for i := 0; i < len(t.common.Live); i++ {
-                                                       if liveUrl, e := url.Parse(t.common.Live[i].Url); e == nil {
-                                                               if oldHost == liveUrl.Host {
-                                                                       t.common.Live[i].ReUpTime = time.Now().Add(time.Minute)
-                                                                       break
-                                                               }
-                                                       }
-                                               }
+                                               t.common.DisableLive(oldHost, time.Now().Add(time.Minute))
                                                // 从其他服务器获取此切片
-                                               for i := 0; i < len(t.common.Live); i++ {
-                                                       if time.Now().Before(t.common.Live[i].ReUpTime) {
-                                                               continue
-                                                       }
-                                                       if liveUrl, e := url.Parse(t.common.Live[i].Url); e == nil {
-                                                               linkUrl.Host = liveUrl.Host
-                                                               break
-                                                       }
+                                               if vl := t.common.ValidLive(); vl == nil {
+                                                       return errors.New(`全部流服务器故障`)
+                                               } else {
+                                                       t.log.L(`W: `, `切片下载失败,故障转移`, oldHost, ` -> `, linkUrl.Host)
+                                                       linkUrl.Host = vl.Host()
                                                }
-                                               t.log.L(`W: `, `切片下载失败,故障转移`, oldHost, ` -> `, linkUrl.Host)
                                                v.Url = linkUrl.String()
                                        }
                                }