From 711eee31cd017a4d1a80d033f2cf1945df3ac16f Mon Sep 17 00:00:00 2001 From: qydysky <32743305+qydysky@users.noreply.github.com> Date: Sun, 29 Jan 2023 21:55:47 +0800 Subject: [PATCH] =?utf8?q?Fix=20=E5=BD=93=E6=9C=8D=E5=8A=A1=E5=99=A8?= =?utf8?q?=E8=BF=94=E5=9B=9E=E5=88=87=E7=89=87=E8=B6=85=E6=97=B6=E6=97=B6?= =?utf8?q?=EF=BC=8C=E4=B8=B4=E6=97=B6=E7=A6=81=E7=94=A8=E6=AD=A4=E6=9C=8D?= =?utf8?q?=E5=8A=A1=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- Reply/stream.go | 83 ++++++++++++++++++++++++++----------------------- 1 file changed, 44 insertions(+), 39 deletions(-) diff --git a/Reply/stream.go b/Reply/stream.go index d13d1a5..01285db 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -28,7 +28,6 @@ import ( signal "github.com/qydysky/part/signal" slice "github.com/qydysky/part/slice" pstring "github.com/qydysky/part/strings" - psync "github.com/qydysky/part/sync" ) type M4SStream struct { @@ -38,7 +37,7 @@ type M4SStream struct { config M4SStream_Config //配置 stream_last_modified time.Time //流地址更新时间 // stream_expires int64 //流到期时间 - stream_hosts psync.Map //使用的流服务器 + // stream_hosts psync.Map //使用的流服务器 stream_type string //流类型 Stream_msg *msgq.Msgq //流数据消息 tag:data first_buf []byte //m4s起始块 or flv起始块 @@ -68,7 +67,7 @@ type m4s_link_item struct { Url string // m4s链接 Base string // m4s文件名 status int // 下载状态 0:未下载 1:正在下载 2:下载完成 3:下载失败 - tryDownCount int // 下载次数 当=2时,不再下载,忽略此块 + tryDownCount int // 下载次数 当=3时,不再下载,忽略此块 err error // 下载中出现的错误 data []byte // 下载的数据 createdTime time.Time // 创建时间 @@ -781,38 +780,40 @@ func (t *M4SStream) saveStreamM4s() (e error) { time.Sleep(time.Millisecond * 10) }) + // 故障转移 + if v.status == 3 { + if linkUrl, e := url.Parse(v.Url); e == nil { + oldHost := linkUrl.Host + // 将此切片服务器设置1min停用 + for i := 0; i < len(t.common.Live); i++ { + if liveUrl, e := url.Parse(t.common.Live[i].Url); e == nil { + if oldHost == liveUrl.Host { + t.common.Live[i].ReUpTime = time.Now().Add(time.Minute) + break + } + } + } + // 从其他服务器获取此切片 + for i := 0; i < len(t.common.Live); i++ { + if time.Now().Before(t.common.Live[i].ReUpTime) { + continue + } + if liveUrl, e := url.Parse(t.common.Live[i].Url); e == nil { + linkUrl.Host = liveUrl.Host + break + } + } + t.log.L(`W: `, `切片下载失败,故障转移`, oldHost, ` -> `, linkUrl.Host) + v.Url = linkUrl.String() + } + } + go func(link *m4s_link_item, path string) { - // t.log.L(`I: `, `下载`, link.Base) defer download_limit.UnBlock() link.status = 1 // 设置切片状态为正在下载 link.tryDownCount += 1 - // 故障转移 - if link_url, e := url.Parse(link.Url); e == nil { - if t.stream_hosts.Len() != 1 { - t.stream_hosts.Range(func(key, v interface{}) bool { - if link.status == 3 { - if link_url.Host == key.(string) { - t.stream_hosts.Store(key, false) - return true - } else if v != nil { - return true - } else { - link_url.Host = key.(string) - return false - } - } - - if v != nil { - t.stream_hosts.Store(key, nil) - } - return false - }) - } - link.Url = link_url.String() - } - req := t.reqPool.Get() defer t.reqPool.Put(req) @@ -828,14 +829,18 @@ func (t *M4SStream) saveStreamM4s() (e error) { if !t.config.save_as_mp4 { reqConfig.SaveToPath = path + link.Base } + + // t.log.L(`T: `, `下载`, link.Base) + // defer t.log.L(`T: `, `下载完成`, link.Base, link.status, link.err) + if e := r.Reqf(reqConfig); e != nil && !errors.Is(e, io.EOF) { + // t.log.L(`T: `, `下载错误`, link.Base, e) if !reqf.IsTimeout(e) { // 发生非超时错误 link.err = e - link.tryDownCount = 2 // 设置切片状态为下载失败 - } else { - link.status = 3 // 设置切片状态为下载失败 + link.tryDownCount = 3 // 设置切片状态为下载失败 } + link.status = 3 // 设置切片状态为下载失败 } else { // if usedt := r.UsedTime.Seconds(); usedt > 700 { // t.log.L(`I: `, `hls切片下载慢`, usedt, `ms`) @@ -870,7 +875,7 @@ func (t *M4SStream) saveStreamM4s() (e error) { e = err return } - if download_seq[k].tryDownCount >= 2 { + if download_seq[k].tryDownCount >= 3 { //下载了2次,任未下载成功,忽略此块 t.putM4s(download_seq[k]) download_seq = append(download_seq[:k], download_seq[k+1:]...) @@ -1106,12 +1111,12 @@ func (t *M4SStream) Start() bool { continue } - // 设置全部服务 - for _, v := range t.common.Live { - if url_struct, e := url.Parse(v.Url); e == nil { - t.stream_hosts.Store(url_struct.Hostname(), nil) - } - } + // // 设置全部服务 + // for _, v := range t.common.Live { + // if url_struct, e := url.Parse(v.Url); e == nil { + // t.stream_hosts.Store(url_struct.Hostname(), v.) + // } + // } // 保存流 err := t.saveStream() -- 2.39.2