import (
"bytes"
+ "context"
"encoding/base64"
"errors"
"fmt"
}
func (t *M4SStream) saveStreamFlv() (e error) {
- //对每个直播流进行尝试
- for _, v := range t.common.Live {
+ for {
+ v := t.common.ValidLive()
+ if v == nil {
+ return errors.New("未能找到可用流服务器")
+ }
+
surl, err := url.Parse(v.Url)
if err != nil {
t.log.L(`E: `, err)
e = err
+ v.DisableAuto()
continue
}
},
}); e != nil && reqf.IsTimeout(e) {
t.reqPool.Put(r)
+ v.DisableAuto()
continue
}
- // 如果被主动关闭,则退出saveStreamFlv,否则继续尝试其他live
- needStop := signal.Init()
+ cancelC, cancel := context.WithCancel(context.Background())
{
go func() {
tsc, tscf := t.Status.WaitC()
defer tscf()
- sc, scf := needStop.WaitC()
- defer scf()
select {
//停止录制
case <-tsc:
r.Cancel()
//当前连接终止
- case <-sc:
+ case <-cancelC.Done():
}
}()
timer := time.NewTicker(5 * time.Second)
defer timer.Stop()
- sc, scf := needStop.WaitC()
- defer scf()
-
for {
select {
- case <-sc:
+ case <-cancelC.Done():
return
case curT := <-timer.C:
if curT.Unix()-leastReadUnix.Load() > 5 {
if t.config.want_qn != int(v) {
t.log.L(`I: `, "直播流清晰度改变:", t.common.Qn[t.config.want_qn], "=>", t.common.Qn[int(v)])
t.config.want_qn = int(v)
- needStop.Done()
r.Cancel()
return
}
t.log.L(`E: `, `flv下载失败:`, err)
}
}
+ v.DisableAuto()
}
+ cancel()
t.reqPool.Put(r)
-
- if needStop.Islive() {
- needStop.Done()
- } else {
- return
- }
}
-
- e = errors.New("未能找到可用流服务器")
- return
}
func (t *M4SStream) saveStreamM4s() (e error) {