signal "github.com/qydysky/part/signal"
slice "github.com/qydysky/part/slice"
pstring "github.com/qydysky/part/strings"
- psync "github.com/qydysky/part/sync"
)
type M4SStream struct {
config M4SStream_Config //配置
stream_last_modified time.Time //流地址更新时间
// stream_expires int64 //流到期时间
- stream_hosts psync.Map //使用的流服务器
+ // stream_hosts psync.Map //使用的流服务器
stream_type string //流类型
Stream_msg *msgq.Msgq //流数据消息 tag:data
first_buf []byte //m4s起始块 or flv起始块
Url string // m4s链接
Base string // m4s文件名
status int // 下载状态 0:未下载 1:正在下载 2:下载完成 3:下载失败
- tryDownCount int // 下载次数 当=2时,不再下载,忽略此块
+ tryDownCount int // 下载次数 当=3时,不再下载,忽略此块
err error // 下载中出现的错误
data []byte // 下载的数据
createdTime time.Time // 创建时间
time.Sleep(time.Millisecond * 10)
})
+ // 故障转移
+ if v.status == 3 {
+ if linkUrl, e := url.Parse(v.Url); e == nil {
+ oldHost := linkUrl.Host
+ // 将此切片服务器设置1min停用
+ for i := 0; i < len(t.common.Live); i++ {
+ if liveUrl, e := url.Parse(t.common.Live[i].Url); e == nil {
+ if oldHost == liveUrl.Host {
+ t.common.Live[i].ReUpTime = time.Now().Add(time.Minute)
+ break
+ }
+ }
+ }
+ // 从其他服务器获取此切片
+ for i := 0; i < len(t.common.Live); i++ {
+ if time.Now().Before(t.common.Live[i].ReUpTime) {
+ continue
+ }
+ if liveUrl, e := url.Parse(t.common.Live[i].Url); e == nil {
+ linkUrl.Host = liveUrl.Host
+ break
+ }
+ }
+ t.log.L(`W: `, `切片下载失败,故障转移`, oldHost, ` -> `, linkUrl.Host)
+ v.Url = linkUrl.String()
+ }
+ }
+
go func(link *m4s_link_item, path string) {
- // t.log.L(`I: `, `下载`, link.Base)
defer download_limit.UnBlock()
link.status = 1 // 设置切片状态为正在下载
link.tryDownCount += 1
- // 故障转移
- if link_url, e := url.Parse(link.Url); e == nil {
- if t.stream_hosts.Len() != 1 {
- t.stream_hosts.Range(func(key, v interface{}) bool {
- if link.status == 3 {
- if link_url.Host == key.(string) {
- t.stream_hosts.Store(key, false)
- return true
- } else if v != nil {
- return true
- } else {
- link_url.Host = key.(string)
- return false
- }
- }
-
- if v != nil {
- t.stream_hosts.Store(key, nil)
- }
- return false
- })
- }
- link.Url = link_url.String()
- }
-
req := t.reqPool.Get()
defer t.reqPool.Put(req)
if !t.config.save_as_mp4 {
reqConfig.SaveToPath = path + link.Base
}
+
+ // t.log.L(`T: `, `下载`, link.Base)
+ // defer t.log.L(`T: `, `下载完成`, link.Base, link.status, link.err)
+
if e := r.Reqf(reqConfig); e != nil && !errors.Is(e, io.EOF) {
+ // t.log.L(`T: `, `下载错误`, link.Base, e)
if !reqf.IsTimeout(e) {
// 发生非超时错误
link.err = e
- link.tryDownCount = 2 // 设置切片状态为下载失败
- } else {
- link.status = 3 // 设置切片状态为下载失败
+ link.tryDownCount = 3 // 设置切片状态为下载失败
}
+ link.status = 3 // 设置切片状态为下载失败
} else {
// if usedt := r.UsedTime.Seconds(); usedt > 700 {
// t.log.L(`I: `, `hls切片下载慢`, usedt, `ms`)
e = err
return
}
- if download_seq[k].tryDownCount >= 2 {
+ if download_seq[k].tryDownCount >= 3 {
//下载了2次,任未下载成功,忽略此块
t.putM4s(download_seq[k])
download_seq = append(download_seq[:k], download_seq[k+1:]...)
continue
}
- // 设置全部服务
- for _, v := range t.common.Live {
- if url_struct, e := url.Parse(v.Url); e == nil {
- t.stream_hosts.Store(url_struct.Hostname(), nil)
- }
- }
+ // // 设置全部服务
+ // for _, v := range t.common.Live {
+ // if url_struct, e := url.Parse(v.Url); e == nil {
+ // t.stream_hosts.Store(url_struct.Hostname(), v.)
+ // }
+ // }
// 保存流
err := t.saveStream()