]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
Improve 添加配置flv断流超时s、flv断流续接
authorqydysky <qydysky@foxmail.com>
Tue, 1 Aug 2023 16:19:11 +0000 (00:19 +0800)
committerqydysky <qydysky@foxmail.com>
Tue, 1 Aug 2023 16:19:11 +0000 (00:19 +0800)
Reply/stream.go
demo/config/config_K_v.json

index 1b5b8420e38640667cbd07ccd30e8c7dcf645a4a..90b9dd9659e9921488e5fa4abb5432c7966fa8ef 100644 (file)
@@ -646,13 +646,18 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                        return errors.New("未能找到可用流服务器")
                }
 
-               var err error
-               surl, err = url.Parse(v.Url)
-               if err != nil {
-                       t.log.L(`E: `, err)
-                       e = err
-                       v.DisableAuto()
-                       continue
+               //reset e
+               e = nil
+
+               {
+                       var err error
+                       surl, err = url.Parse(v.Url)
+                       if err != nil {
+                               t.log.L(`E: `, err)
+                               e = err
+                               v.DisableAuto()
+                               continue
+                       }
                }
 
                //结束退出
@@ -685,169 +690,175 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                        continue
                }
 
-               break
-       }
+               // flv获取
+               cancelC, cancel := context.WithCancel(context.Background())
+               {
+                       go func() {
+                               tsc, tscf := t.Status.WaitC()
+                               defer tscf()
 
-       cancelC, cancel := context.WithCancel(context.Background())
-       defer cancel()
-       {
-               go func() {
-                       tsc, tscf := t.Status.WaitC()
-                       defer tscf()
+                               select {
+                               //停止录制
+                               case <-tsc:
+                                       cancel()
+                               //当前连接终止
+                               case <-cancelC.Done():
+                               }
+                       }()
 
-                       select {
-                       //停止录制
-                       case <-tsc:
-                               cancel()
-                       //当前连接终止
-                       case <-cancelC.Done():
+                       pipe := pio.NewPipe()
+                       var (
+                               leastReadUnix atomic.Int64
+                               readTO        int64 = 5
+                       )
+                       leastReadUnix.Store(time.Now().Unix())
+                       if v, ok := c.C.K_v.LoadV(`flv断流超时s`).(float64); ok && int64(v) > readTO {
+                               readTO = int64(v)
                        }
-               }()
-
-               pipe := pio.NewPipe()
-               var (
-                       leastReadUnix atomic.Int64
-                       readTO        int64 = 5
-               )
-               leastReadUnix.Store(time.Now().Unix())
 
-               // read timeout
-               go func() {
-                       timer := time.NewTicker(time.Duration(readTO * int64(time.Second)))
-                       defer timer.Stop()
+                       // read timeout
+                       go func() {
+                               timer := time.NewTicker(time.Duration(readTO * int64(time.Second)))
+                               defer timer.Stop()
 
-                       for {
-                               select {
-                               case <-cancelC.Done():
-                                       return
-                               case curT := <-timer.C:
-                                       if curT.Unix()-leastReadUnix.Load() > readTO {
-                                               t.log.L(`W: `, fmt.Sprintf("%vs未接收到有效数据", readTO))
-                                               // 5s未接收到任何数据
-                                               cancel()
+                               for {
+                                       select {
+                                       case <-cancelC.Done():
                                                return
-                                       }
-                                       if v, ok := c.C.K_v.LoadV(`直播流清晰度`).(float64); ok {
-                                               if t.config.want_qn != int(v) {
-                                                       t.log.L(`I: `, "直播流清晰度改变:", t.common.Qn[t.config.want_qn], "=>", t.common.Qn[int(v)])
-                                                       t.config.want_qn = int(v)
+                                       case curT := <-timer.C:
+                                               if curT.Unix()-leastReadUnix.Load() > readTO {
+                                                       t.log.L(`W: `, fmt.Sprintf("%vs未接收到有效数据", readTO))
+                                                       // 5s未接收到任何数据
                                                        cancel()
                                                        return
                                                }
+                                               if v, ok := c.C.K_v.LoadV(`直播流清晰度`).(float64); ok {
+                                                       if t.config.want_qn != int(v) {
+                                                               t.log.L(`I: `, "直播流清晰度改变:", t.common.Qn[t.config.want_qn], "=>", t.common.Qn[int(v)])
+                                                               t.config.want_qn = int(v)
+                                                               cancel()
+                                                               return
+                                                       }
+                                               }
                                        }
                                }
-                       }
-               }()
-
-               // read
-               go func() {
-                       var (
-                               ticker     = time.NewTicker(time.Second)
-                               buff       = slice.New[byte]()
-                               keyframe   = slice.New[byte]()
-                               buf        = make([]byte, 1<<16)
-                               frameCount = 0
-                       )
-
-                       for {
-                               n, e := pipe.Read(buf)
-                               _ = buff.Append(buf[:n])
-                               if e != nil {
-                                       cancel()
-                                       break
-                               }
-
-                               skip := true
-                               select {
-                               case <-ticker.C:
-                                       skip = false
-                               default:
-                               }
-                               if skip {
-                                       continue
-                               }
-
-                               if !buff.IsEmpty() {
-                                       keyframe.Reset()
-                                       front_buf, last_available_offset, e := Search_stream_tag(buff.GetPureBuf(), keyframe)
+                       }()
+
+                       // read
+                       go func() {
+                               var (
+                                       ticker     = time.NewTicker(time.Second)
+                                       buff       = slice.New[byte]()
+                                       keyframe   = slice.New[byte]()
+                                       buf        = make([]byte, 1<<16)
+                                       frameCount = 0
+                               )
+
+                               for {
+                                       n, e := pipe.Read(buf)
+                                       _ = buff.Append(buf[:n])
                                        if e != nil {
-                                               if strings.Contains(e.Error(), `no found available tag`) {
-                                                       continue
-                                               }
-                                               //丢弃所有数据
-                                               buff.Reset()
+                                               cancel()
+                                               break
                                        }
-                                       // 存在有效数据
-                                       if len(front_buf) != 0 || keyframe.Size() != 0 {
-                                               leastReadUnix.Store(time.Now().Unix())
+
+                                       skip := true
+                                       select {
+                                       case <-ticker.C:
+                                               skip = false
+                                       default:
                                        }
-                                       if len(front_buf) != 0 && len(t.first_buf) == 0 {
-                                               t.first_buf = make([]byte, len(front_buf))
-                                               copy(t.first_buf, front_buf)
-                                               // fmt.Println("write front_buf")
-                                               t.Stream_msg.PushLock_tag(`data`, t.first_buf)
-                                               t.msg.Push_tag(`load`, t)
+                                       if skip {
+                                               continue
                                        }
-                                       if keyframe.Size() != 0 {
-                                               if len(t.first_buf) == 0 {
-                                                       t.log.L(`W: `, `flv未接收到起始段`)
-                                                       cancel()
-                                                       break
-                                               }
-                                               t.bootBufPush(keyframe.GetPureBuf())
+
+                                       if !buff.IsEmpty() {
                                                keyframe.Reset()
-                                               t.Stream_msg.PushLock_tag(`data`, t.boot_buf)
-                                               frameCount += 1
-                                               if frameCount == 1 {
-                                                       t.msg.Push_tag(`firstFrame`, t)
+                                               front_buf, last_available_offset, e := Search_stream_tag(buff.GetPureBuf(), keyframe)
+                                               if e != nil {
+                                                       if strings.Contains(e.Error(), `no found available tag`) {
+                                                               continue
+                                                       }
+                                                       //丢弃所有数据
+                                                       buff.Reset()
+                                               }
+                                               // 存在有效数据
+                                               if len(front_buf) != 0 || keyframe.Size() != 0 {
+                                                       leastReadUnix.Store(time.Now().Unix())
+                                               }
+                                               if len(front_buf) != 0 && len(t.first_buf) == 0 {
+                                                       t.first_buf = make([]byte, len(front_buf))
+                                                       copy(t.first_buf, front_buf)
+                                                       // fmt.Println("write front_buf")
+                                                       t.Stream_msg.PushLock_tag(`data`, t.first_buf)
+                                                       t.msg.Push_tag(`load`, t)
+                                               }
+                                               if keyframe.Size() != 0 {
+                                                       if len(t.first_buf) == 0 {
+                                                               t.log.L(`W: `, `flv未接收到起始段`)
+                                                               cancel()
+                                                               break
+                                                       }
+                                                       t.bootBufPush(keyframe.GetPureBuf())
+                                                       keyframe.Reset()
+                                                       t.Stream_msg.PushLock_tag(`data`, t.boot_buf)
+                                                       frameCount += 1
+                                                       if frameCount == 1 {
+                                                               t.msg.Push_tag(`firstFrame`, t)
+                                                       }
+                                               }
+                                               if last_available_offset > 1 {
+                                                       // fmt.Println("write Sync")
+                                                       _ = buff.RemoveFront(last_available_offset - 1)
                                                }
-                                       }
-                                       if last_available_offset > 1 {
-                                               // fmt.Println("write Sync")
-                                               _ = buff.RemoveFront(last_available_offset - 1)
                                        }
                                }
-                       }
-
-                       buf = nil
-                       buff.Reset()
-
-                       ticker.Stop()
-               }()
 
-               t.log.L(`I: `, `flv下载开始`)
-
-               _ = r.Reqf(reqf.Rval{
-                       Ctx:         cancelC,
-                       Url:         surl.String(),
-                       SaveToPipe:  pipe,
-                       NoResponse:  true,
-                       Async:       true,
-                       Proxy:       t.common.Proxy,
-                       WriteLoopTO: int(readTO)*1000*2 + 1,
-                       Header: map[string]string{
-                               `Host`:            surl.Host,
-                               `User-Agent`:      c.UA,
-                               `Accept`:          `*/*`,
-                               `Accept-Language`: `zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2`,
-                               `Origin`:          `https://live.bilibili.com`,
-                               `Connection`:      `keep-alive`,
-                               `Pragma`:          `no-cache`,
-                               `Cache-Control`:   `no-cache`,
-                               `Referer`:         "https://live.bilibili.com/",
-                               `Cookie`:          reqf.Map_2_Cookies_String(CookieM),
-                       },
-               })
-               if err := r.Wait(); err != nil && !errors.Is(err, io.EOF) {
-                       if reqf.IsCancel(err) {
-                               t.log.L(`I: `, `flv下载停止`)
-                       } else if err != nil && !reqf.IsTimeout(err) {
-                               e = err
-                               t.log.L(`E: `, `flv下载失败:`, err)
+                               buf = nil
+                               buff.Reset()
+
+                               ticker.Stop()
+                       }()
+
+                       t.log.L(`I: `, `flv下载开始`)
+
+                       _ = r.Reqf(reqf.Rval{
+                               Ctx:         cancelC,
+                               Url:         surl.String(),
+                               SaveToPipe:  pipe,
+                               NoResponse:  true,
+                               Async:       true,
+                               Proxy:       t.common.Proxy,
+                               WriteLoopTO: int(readTO)*1000*2 + 1,
+                               Header: map[string]string{
+                                       `Host`:            surl.Host,
+                                       `User-Agent`:      c.UA,
+                                       `Accept`:          `*/*`,
+                                       `Accept-Language`: `zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2`,
+                                       `Origin`:          `https://live.bilibili.com`,
+                                       `Connection`:      `keep-alive`,
+                                       `Pragma`:          `no-cache`,
+                                       `Cache-Control`:   `no-cache`,
+                                       `Referer`:         "https://live.bilibili.com/",
+                                       `Cookie`:          reqf.Map_2_Cookies_String(CookieM),
+                               },
+                       })
+                       if err := r.Wait(); err != nil && !errors.Is(err, io.EOF) {
+                               if reqf.IsCancel(err) {
+                                       t.log.L(`I: `, `flv下载停止`)
+                               } else if err != nil && !reqf.IsTimeout(err) {
+                                       e = err
+                                       t.log.L(`E: `, `flv下载失败:`, err)
+                               }
                        }
                }
-       }
+               cancel()
 
+               if v1, ok := c.C.K_v.LoadV(`flv断流续接`).(bool); ok && !v1 {
+                       break
+               }
+               v.DisableAuto()
+       }
        return
 }
 
index 1dd9980436a68e5d4be1bcd247699ef28329ce47..f67b989b99e131708697960ba01bc2e890646f07 100644 (file)
@@ -69,6 +69,8 @@
     "直播流清晰度": 10000,
     "直播流类型-help": "flv,fmp4,flvH,fmp4H,带H后缀的为Hevc格式编码",
     "直播流类型": "flv",
+    "flv断流超时s": 7,
+    "flv断流续接": true,
     "直播流保存位置": "./live",
     "直播流保存天数-help": "当t日有1录播时,会尝试删除t-n日的1个最早的录播。小于1的数将禁用此功能",
     "直播流保存天数": 4,