]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
Fix 当服务器返回切片超时时,临时禁用此服务器
authorqydysky <32743305+qydysky@users.noreply.github.com>
Sun, 29 Jan 2023 13:55:47 +0000 (21:55 +0800)
committerqydysky <32743305+qydysky@users.noreply.github.com>
Sun, 29 Jan 2023 13:55:47 +0000 (21:55 +0800)
Reply/stream.go

index d13d1a51d9cd35b556d7f080a0a5255e67bf61e9..01285dbaabc4a4b43fbb9f33d114f121c0ca9eb4 100644 (file)
@@ -28,7 +28,6 @@ import (
        signal "github.com/qydysky/part/signal"
        slice "github.com/qydysky/part/slice"
        pstring "github.com/qydysky/part/strings"
-       psync "github.com/qydysky/part/sync"
 )
 
 type M4SStream struct {
@@ -38,7 +37,7 @@ type M4SStream struct {
        config               M4SStream_Config   //配置
        stream_last_modified time.Time          //流地址更新时间
        // stream_expires       int64              //流到期时间
-       stream_hosts      psync.Map  //使用的流服务器
+       // stream_hosts      psync.Map  //使用的流服务器
        stream_type       string     //流类型
        Stream_msg        *msgq.Msgq //流数据消息 tag:data
        first_buf         []byte     //m4s起始块 or flv起始块
@@ -68,7 +67,7 @@ type m4s_link_item struct {
        Url          string    // m4s链接
        Base         string    // m4s文件名
        status       int       // 下载状态 0:未下载 1:正在下载 2:下载完成 3:下载失败
-       tryDownCount int       // 下载次数 当=2时,不再下载,忽略此块
+       tryDownCount int       // 下载次数 当=3时,不再下载,忽略此块
        err          error     // 下载中出现的错误
        data         []byte    // 下载的数据
        createdTime  time.Time // 创建时间
@@ -781,38 +780,40 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                                        time.Sleep(time.Millisecond * 10)
                                })
 
+                               // 故障转移
+                               if v.status == 3 {
+                                       if linkUrl, e := url.Parse(v.Url); e == nil {
+                                               oldHost := linkUrl.Host
+                                               // 将此切片服务器设置1min停用
+                                               for i := 0; i < len(t.common.Live); i++ {
+                                                       if liveUrl, e := url.Parse(t.common.Live[i].Url); e == nil {
+                                                               if oldHost == liveUrl.Host {
+                                                                       t.common.Live[i].ReUpTime = time.Now().Add(time.Minute)
+                                                                       break
+                                                               }
+                                                       }
+                                               }
+                                               // 从其他服务器获取此切片
+                                               for i := 0; i < len(t.common.Live); i++ {
+                                                       if time.Now().Before(t.common.Live[i].ReUpTime) {
+                                                               continue
+                                                       }
+                                                       if liveUrl, e := url.Parse(t.common.Live[i].Url); e == nil {
+                                                               linkUrl.Host = liveUrl.Host
+                                                               break
+                                                       }
+                                               }
+                                               t.log.L(`W: `, `切片下载失败,故障转移`, oldHost, ` -> `, linkUrl.Host)
+                                               v.Url = linkUrl.String()
+                                       }
+                               }
+
                                go func(link *m4s_link_item, path string) {
-                                       // t.log.L(`I: `, `下载`, link.Base)
                                        defer download_limit.UnBlock()
 
                                        link.status = 1 // 设置切片状态为正在下载
                                        link.tryDownCount += 1
 
-                                       // 故障转移
-                                       if link_url, e := url.Parse(link.Url); e == nil {
-                                               if t.stream_hosts.Len() != 1 {
-                                                       t.stream_hosts.Range(func(key, v interface{}) bool {
-                                                               if link.status == 3 {
-                                                                       if link_url.Host == key.(string) {
-                                                                               t.stream_hosts.Store(key, false)
-                                                                               return true
-                                                                       } else if v != nil {
-                                                                               return true
-                                                                       } else {
-                                                                               link_url.Host = key.(string)
-                                                                               return false
-                                                                       }
-                                                               }
-
-                                                               if v != nil {
-                                                                       t.stream_hosts.Store(key, nil)
-                                                               }
-                                                               return false
-                                                       })
-                                               }
-                                               link.Url = link_url.String()
-                                       }
-
                                        req := t.reqPool.Get()
                                        defer t.reqPool.Put(req)
 
@@ -828,14 +829,18 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                                        if !t.config.save_as_mp4 {
                                                reqConfig.SaveToPath = path + link.Base
                                        }
+
+                                       // t.log.L(`T: `, `下载`, link.Base)
+                                       // defer t.log.L(`T: `, `下载完成`, link.Base, link.status, link.err)
+
                                        if e := r.Reqf(reqConfig); e != nil && !errors.Is(e, io.EOF) {
+                                               // t.log.L(`T: `, `下载错误`, link.Base, e)
                                                if !reqf.IsTimeout(e) {
                                                        // 发生非超时错误
                                                        link.err = e
-                                                       link.tryDownCount = 2 // 设置切片状态为下载失败
-                                               } else {
-                                                       link.status = 3 // 设置切片状态为下载失败
+                                                       link.tryDownCount = 3 // 设置切片状态为下载失败
                                                }
+                                               link.status = 3 // 设置切片状态为下载失败
                                        } else {
                                                // if usedt := r.UsedTime.Seconds(); usedt > 700 {
                                                //      t.log.L(`I: `, `hls切片下载慢`, usedt, `ms`)
@@ -870,7 +875,7 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                                        e = err
                                        return
                                }
-                               if download_seq[k].tryDownCount >= 2 {
+                               if download_seq[k].tryDownCount >= 3 {
                                        //下载了2次,任未下载成功,忽略此块
                                        t.putM4s(download_seq[k])
                                        download_seq = append(download_seq[:k], download_seq[k+1:]...)
@@ -1106,12 +1111,12 @@ func (t *M4SStream) Start() bool {
                                continue
                        }
 
-                       // 设置全部服务
-                       for _, v := range t.common.Live {
-                               if url_struct, e := url.Parse(v.Url); e == nil {
-                                       t.stream_hosts.Store(url_struct.Hostname(), nil)
-                               }
-                       }
+                       // // 设置全部服务
+                       // for _, v := range t.common.Live {
+                       //      if url_struct, e := url.Parse(v.Url); e == nil {
+                       //              t.stream_hosts.Store(url_struct.Hostname(), v.)
+                       //      }
+                       // }
 
                        // 保存流
                        err := t.saveStream()