"flag"
"fmt"
"io"
+ "net/url"
"strings"
"testing"
"time"
Expires int //流到期时间
}
+func (t *LiveQn) Host() string {
+ if liveUrl, e := url.Parse(t.Url); e == nil {
+ return liveUrl.Host
+ } else {
+ panic(e)
+ }
+}
+
+func (t *LiveQn) Valid() bool {
+ return time.Now().After(t.ReUpTime)
+}
+
+func (t *LiveQn) Disable(reUpTime time.Time) {
+ t.ReUpTime = reUpTime
+}
+
+func (t *Common) DisableLive(host string, reUpTime time.Time) {
+ for i := 0; i < len(t.Live); i++ {
+ if liveUrl, e := url.Parse(t.Live[i].Url); e == nil {
+ if host == liveUrl.Host {
+ t.Live[i].ReUpTime = reUpTime
+ break
+ }
+ }
+ }
+}
+
+func (t *Common) ValidLive() *LiveQn {
+ for i := 0; i < len(t.Live); i++ {
+ if time.Now().Before(t.Live[i].ReUpTime) {
+ continue
+ }
+ return &t.Live[i]
+ }
+ return nil
+}
+
type StreamType struct {
Protocol_name string
Format_name string
}
func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []byte, e error) {
+ if t.common.ValidLive() == nil {
+ e = errors.New("全部流服务器发生故障")
+ return
+ }
+
// 开始请求
req := t.reqPool.Get()
defer t.reqPool.Put(req)
// 请求解析m3u8内容
for k, v := range t.common.Live {
// 跳过尚未启用的live地址
- if time.Now().Before(v.ReUpTime) {
+ if !v.Valid() {
continue
}
if err := r.Reqf(rval); err != nil {
// 1min后重新启用
- t.common.Live[k].ReUpTime = time.Now().Add(time.Minute)
+ t.common.Live[k].Disable(time.Now().Add(time.Minute))
t.log.L("W: ", fmt.Sprintf("服务器 %s 发生故障 %s", m3u8_url.Host, err.Error()))
- if k == len(t.common.Live)-1 {
- e = errors.New("全部切片服务器发生故障")
+ if t.common.ValidLive() == nil {
+ e = errors.New("全部流服务器发生故障")
+ break
}
continue
}
noe, _ := t.last_m4s.getNo()
if timed > 5 && nos-noe == 0 {
// 1min后重新启用
- t.common.Live[k].ReUpTime = time.Now().Add(time.Minute)
+ t.common.Live[k].Disable(time.Now().Add(time.Minute))
t.log.L("W: ", fmt.Sprintf("服务器 %s 发生故障 %d 秒产出了 %d 切片", m3u8_url.Host, int(timed), nos-noe))
- if k == len(t.common.Live)-1 {
+ if t.common.ValidLive() == nil {
e = errors.New("全部切片服务器发生故障")
+ break
}
continue
}
if linkUrl, e := url.Parse(v.Url); e == nil {
oldHost := linkUrl.Host
// 将此切片服务器设置1min停用
- for i := 0; i < len(t.common.Live); i++ {
- if liveUrl, e := url.Parse(t.common.Live[i].Url); e == nil {
- if oldHost == liveUrl.Host {
- t.common.Live[i].ReUpTime = time.Now().Add(time.Minute)
- break
- }
- }
- }
+ t.common.DisableLive(oldHost, time.Now().Add(time.Minute))
// 从其他服务器获取此切片
- for i := 0; i < len(t.common.Live); i++ {
- if time.Now().Before(t.common.Live[i].ReUpTime) {
- continue
- }
- if liveUrl, e := url.Parse(t.common.Live[i].Url); e == nil {
- linkUrl.Host = liveUrl.Host
- break
- }
+ if vl := t.common.ValidLive(); vl == nil {
+ return errors.New(`全部流服务器故障`)
+ } else {
+ t.log.L(`W: `, `切片下载失败,故障转移`, oldHost, ` -> `, linkUrl.Host)
+ linkUrl.Host = vl.Host()
}
- t.log.L(`W: `, `切片下载失败,故障转移`, oldHost, ` -> `, linkUrl.Host)
v.Url = linkUrl.String()
}
}