return errors.New("未能找到可用流服务器")
}
- var err error
- surl, err = url.Parse(v.Url)
- if err != nil {
- t.log.L(`E: `, err)
- e = err
- v.DisableAuto()
- continue
+ //reset e
+ e = nil
+
+ {
+ var err error
+ surl, err = url.Parse(v.Url)
+ if err != nil {
+ t.log.L(`E: `, err)
+ e = err
+ v.DisableAuto()
+ continue
+ }
}
//结束退出
continue
}
- break
- }
+ // flv获取
+ cancelC, cancel := context.WithCancel(context.Background())
+ {
+ go func() {
+ tsc, tscf := t.Status.WaitC()
+ defer tscf()
- cancelC, cancel := context.WithCancel(context.Background())
- defer cancel()
- {
- go func() {
- tsc, tscf := t.Status.WaitC()
- defer tscf()
+ select {
+ //停止录制
+ case <-tsc:
+ cancel()
+ //当前连接终止
+ case <-cancelC.Done():
+ }
+ }()
- select {
- //停止录制
- case <-tsc:
- cancel()
- //当前连接终止
- case <-cancelC.Done():
+ pipe := pio.NewPipe()
+ var (
+ leastReadUnix atomic.Int64
+ readTO int64 = 5
+ )
+ leastReadUnix.Store(time.Now().Unix())
+ if v, ok := c.C.K_v.LoadV(`flv断流超时s`).(float64); ok && int64(v) > readTO {
+ readTO = int64(v)
}
- }()
-
- pipe := pio.NewPipe()
- var (
- leastReadUnix atomic.Int64
- readTO int64 = 5
- )
- leastReadUnix.Store(time.Now().Unix())
- // read timeout
- go func() {
- timer := time.NewTicker(time.Duration(readTO * int64(time.Second)))
- defer timer.Stop()
+ // read timeout
+ go func() {
+ timer := time.NewTicker(time.Duration(readTO * int64(time.Second)))
+ defer timer.Stop()
- for {
- select {
- case <-cancelC.Done():
- return
- case curT := <-timer.C:
- if curT.Unix()-leastReadUnix.Load() > readTO {
- t.log.L(`W: `, fmt.Sprintf("%vs未接收到有效数据", readTO))
- // 5s未接收到任何数据
- cancel()
+ for {
+ select {
+ case <-cancelC.Done():
return
- }
- if v, ok := c.C.K_v.LoadV(`直播流清晰度`).(float64); ok {
- if t.config.want_qn != int(v) {
- t.log.L(`I: `, "直播流清晰度改变:", t.common.Qn[t.config.want_qn], "=>", t.common.Qn[int(v)])
- t.config.want_qn = int(v)
+ case curT := <-timer.C:
+ if curT.Unix()-leastReadUnix.Load() > readTO {
+ t.log.L(`W: `, fmt.Sprintf("%vs未接收到有效数据", readTO))
+ // 5s未接收到任何数据
cancel()
return
}
+ if v, ok := c.C.K_v.LoadV(`直播流清晰度`).(float64); ok {
+ if t.config.want_qn != int(v) {
+ t.log.L(`I: `, "直播流清晰度改变:", t.common.Qn[t.config.want_qn], "=>", t.common.Qn[int(v)])
+ t.config.want_qn = int(v)
+ cancel()
+ return
+ }
+ }
}
}
- }
- }()
-
- // read
- go func() {
- var (
- ticker = time.NewTicker(time.Second)
- buff = slice.New[byte]()
- keyframe = slice.New[byte]()
- buf = make([]byte, 1<<16)
- frameCount = 0
- )
-
- for {
- n, e := pipe.Read(buf)
- _ = buff.Append(buf[:n])
- if e != nil {
- cancel()
- break
- }
-
- skip := true
- select {
- case <-ticker.C:
- skip = false
- default:
- }
- if skip {
- continue
- }
-
- if !buff.IsEmpty() {
- keyframe.Reset()
- front_buf, last_available_offset, e := Search_stream_tag(buff.GetPureBuf(), keyframe)
+ }()
+
+ // read
+ go func() {
+ var (
+ ticker = time.NewTicker(time.Second)
+ buff = slice.New[byte]()
+ keyframe = slice.New[byte]()
+ buf = make([]byte, 1<<16)
+ frameCount = 0
+ )
+
+ for {
+ n, e := pipe.Read(buf)
+ _ = buff.Append(buf[:n])
if e != nil {
- if strings.Contains(e.Error(), `no found available tag`) {
- continue
- }
- //丢弃所有数据
- buff.Reset()
+ cancel()
+ break
}
- // 存在有效数据
- if len(front_buf) != 0 || keyframe.Size() != 0 {
- leastReadUnix.Store(time.Now().Unix())
+
+ skip := true
+ select {
+ case <-ticker.C:
+ skip = false
+ default:
}
- if len(front_buf) != 0 && len(t.first_buf) == 0 {
- t.first_buf = make([]byte, len(front_buf))
- copy(t.first_buf, front_buf)
- // fmt.Println("write front_buf")
- t.Stream_msg.PushLock_tag(`data`, t.first_buf)
- t.msg.Push_tag(`load`, t)
+ if skip {
+ continue
}
- if keyframe.Size() != 0 {
- if len(t.first_buf) == 0 {
- t.log.L(`W: `, `flv未接收到起始段`)
- cancel()
- break
- }
- t.bootBufPush(keyframe.GetPureBuf())
+
+ if !buff.IsEmpty() {
keyframe.Reset()
- t.Stream_msg.PushLock_tag(`data`, t.boot_buf)
- frameCount += 1
- if frameCount == 1 {
- t.msg.Push_tag(`firstFrame`, t)
+ front_buf, last_available_offset, e := Search_stream_tag(buff.GetPureBuf(), keyframe)
+ if e != nil {
+ if strings.Contains(e.Error(), `no found available tag`) {
+ continue
+ }
+ //丢弃所有数据
+ buff.Reset()
+ }
+ // 存在有效数据
+ if len(front_buf) != 0 || keyframe.Size() != 0 {
+ leastReadUnix.Store(time.Now().Unix())
+ }
+ if len(front_buf) != 0 && len(t.first_buf) == 0 {
+ t.first_buf = make([]byte, len(front_buf))
+ copy(t.first_buf, front_buf)
+ // fmt.Println("write front_buf")
+ t.Stream_msg.PushLock_tag(`data`, t.first_buf)
+ t.msg.Push_tag(`load`, t)
+ }
+ if keyframe.Size() != 0 {
+ if len(t.first_buf) == 0 {
+ t.log.L(`W: `, `flv未接收到起始段`)
+ cancel()
+ break
+ }
+ t.bootBufPush(keyframe.GetPureBuf())
+ keyframe.Reset()
+ t.Stream_msg.PushLock_tag(`data`, t.boot_buf)
+ frameCount += 1
+ if frameCount == 1 {
+ t.msg.Push_tag(`firstFrame`, t)
+ }
+ }
+ if last_available_offset > 1 {
+ // fmt.Println("write Sync")
+ _ = buff.RemoveFront(last_available_offset - 1)
}
- }
- if last_available_offset > 1 {
- // fmt.Println("write Sync")
- _ = buff.RemoveFront(last_available_offset - 1)
}
}
- }
-
- buf = nil
- buff.Reset()
-
- ticker.Stop()
- }()
- t.log.L(`I: `, `flv下载开始`)
-
- _ = r.Reqf(reqf.Rval{
- Ctx: cancelC,
- Url: surl.String(),
- SaveToPipe: pipe,
- NoResponse: true,
- Async: true,
- Proxy: t.common.Proxy,
- WriteLoopTO: int(readTO)*1000*2 + 1,
- Header: map[string]string{
- `Host`: surl.Host,
- `User-Agent`: c.UA,
- `Accept`: `*/*`,
- `Accept-Language`: `zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2`,
- `Origin`: `https://live.bilibili.com`,
- `Connection`: `keep-alive`,
- `Pragma`: `no-cache`,
- `Cache-Control`: `no-cache`,
- `Referer`: "https://live.bilibili.com/",
- `Cookie`: reqf.Map_2_Cookies_String(CookieM),
- },
- })
- if err := r.Wait(); err != nil && !errors.Is(err, io.EOF) {
- if reqf.IsCancel(err) {
- t.log.L(`I: `, `flv下载停止`)
- } else if err != nil && !reqf.IsTimeout(err) {
- e = err
- t.log.L(`E: `, `flv下载失败:`, err)
+ buf = nil
+ buff.Reset()
+
+ ticker.Stop()
+ }()
+
+ t.log.L(`I: `, `flv下载开始`)
+
+ _ = r.Reqf(reqf.Rval{
+ Ctx: cancelC,
+ Url: surl.String(),
+ SaveToPipe: pipe,
+ NoResponse: true,
+ Async: true,
+ Proxy: t.common.Proxy,
+ WriteLoopTO: int(readTO)*1000*2 + 1,
+ Header: map[string]string{
+ `Host`: surl.Host,
+ `User-Agent`: c.UA,
+ `Accept`: `*/*`,
+ `Accept-Language`: `zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2`,
+ `Origin`: `https://live.bilibili.com`,
+ `Connection`: `keep-alive`,
+ `Pragma`: `no-cache`,
+ `Cache-Control`: `no-cache`,
+ `Referer`: "https://live.bilibili.com/",
+ `Cookie`: reqf.Map_2_Cookies_String(CookieM),
+ },
+ })
+ if err := r.Wait(); err != nil && !errors.Is(err, io.EOF) {
+ if reqf.IsCancel(err) {
+ t.log.L(`I: `, `flv下载停止`)
+ } else if err != nil && !reqf.IsTimeout(err) {
+ e = err
+ t.log.L(`E: `, `flv下载失败:`, err)
+ }
}
}
- }
+ cancel()
+ if v1, ok := c.C.K_v.LoadV(`flv断流续接`).(bool); ok && !v1 {
+ break
+ }
+ v.DisableAuto()
+ }
return
}