From: qydysky Date: Tue, 1 Aug 2023 16:19:11 +0000 (+0800) Subject: Improve 添加配置flv断流超时s、flv断流续接 X-Git-Tag: v0.10.8~11 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=14de692349834722009d3a85a493a08ef57ee789;p=bili_danmu%2F.git Improve 添加配置flv断流超时s、flv断流续接 --- diff --git a/Reply/stream.go b/Reply/stream.go index 1b5b842..90b9dd9 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -646,13 +646,18 @@ func (t *M4SStream) saveStreamFlv() (e error) { return errors.New("未能找到可用流服务器") } - var err error - surl, err = url.Parse(v.Url) - if err != nil { - t.log.L(`E: `, err) - e = err - v.DisableAuto() - continue + //reset e + e = nil + + { + var err error + surl, err = url.Parse(v.Url) + if err != nil { + t.log.L(`E: `, err) + e = err + v.DisableAuto() + continue + } } //结束退出 @@ -685,169 +690,175 @@ func (t *M4SStream) saveStreamFlv() (e error) { continue } - break - } + // flv获取 + cancelC, cancel := context.WithCancel(context.Background()) + { + go func() { + tsc, tscf := t.Status.WaitC() + defer tscf() - cancelC, cancel := context.WithCancel(context.Background()) - defer cancel() - { - go func() { - tsc, tscf := t.Status.WaitC() - defer tscf() + select { + //停止录制 + case <-tsc: + cancel() + //当前连接终止 + case <-cancelC.Done(): + } + }() - select { - //停止录制 - case <-tsc: - cancel() - //当前连接终止 - case <-cancelC.Done(): + pipe := pio.NewPipe() + var ( + leastReadUnix atomic.Int64 + readTO int64 = 5 + ) + leastReadUnix.Store(time.Now().Unix()) + if v, ok := c.C.K_v.LoadV(`flv断流超时s`).(float64); ok && int64(v) > readTO { + readTO = int64(v) } - }() - - pipe := pio.NewPipe() - var ( - leastReadUnix atomic.Int64 - readTO int64 = 5 - ) - leastReadUnix.Store(time.Now().Unix()) - // read timeout - go func() { - timer := time.NewTicker(time.Duration(readTO * int64(time.Second))) - defer timer.Stop() + // read timeout + go func() { + timer := time.NewTicker(time.Duration(readTO * int64(time.Second))) + defer timer.Stop() - for { - select { - case <-cancelC.Done(): - return - case curT := <-timer.C: - if curT.Unix()-leastReadUnix.Load() > readTO { - t.log.L(`W: `, fmt.Sprintf("%vs未接收到有效数据", readTO)) - // 5s未接收到任何数据 - cancel() + for { + select { + case <-cancelC.Done(): return - } - if v, ok := c.C.K_v.LoadV(`直播流清晰度`).(float64); ok { - if t.config.want_qn != int(v) { - t.log.L(`I: `, "直播流清晰度改变:", t.common.Qn[t.config.want_qn], "=>", t.common.Qn[int(v)]) - t.config.want_qn = int(v) + case curT := <-timer.C: + if curT.Unix()-leastReadUnix.Load() > readTO { + t.log.L(`W: `, fmt.Sprintf("%vs未接收到有效数据", readTO)) + // 5s未接收到任何数据 cancel() return } + if v, ok := c.C.K_v.LoadV(`直播流清晰度`).(float64); ok { + if t.config.want_qn != int(v) { + t.log.L(`I: `, "直播流清晰度改变:", t.common.Qn[t.config.want_qn], "=>", t.common.Qn[int(v)]) + t.config.want_qn = int(v) + cancel() + return + } + } } } - } - }() - - // read - go func() { - var ( - ticker = time.NewTicker(time.Second) - buff = slice.New[byte]() - keyframe = slice.New[byte]() - buf = make([]byte, 1<<16) - frameCount = 0 - ) - - for { - n, e := pipe.Read(buf) - _ = buff.Append(buf[:n]) - if e != nil { - cancel() - break - } - - skip := true - select { - case <-ticker.C: - skip = false - default: - } - if skip { - continue - } - - if !buff.IsEmpty() { - keyframe.Reset() - front_buf, last_available_offset, e := Search_stream_tag(buff.GetPureBuf(), keyframe) + }() + + // read + go func() { + var ( + ticker = time.NewTicker(time.Second) + buff = slice.New[byte]() + keyframe = slice.New[byte]() + buf = make([]byte, 1<<16) + frameCount = 0 + ) + + for { + n, e := pipe.Read(buf) + _ = buff.Append(buf[:n]) if e != nil { - if strings.Contains(e.Error(), `no found available tag`) { - continue - } - //丢弃所有数据 - buff.Reset() + cancel() + break } - // 存在有效数据 - if len(front_buf) != 0 || keyframe.Size() != 0 { - leastReadUnix.Store(time.Now().Unix()) + + skip := true + select { + case <-ticker.C: + skip = false + default: } - if len(front_buf) != 0 && len(t.first_buf) == 0 { - t.first_buf = make([]byte, len(front_buf)) - copy(t.first_buf, front_buf) - // fmt.Println("write front_buf") - t.Stream_msg.PushLock_tag(`data`, t.first_buf) - t.msg.Push_tag(`load`, t) + if skip { + continue } - if keyframe.Size() != 0 { - if len(t.first_buf) == 0 { - t.log.L(`W: `, `flv未接收到起始段`) - cancel() - break - } - t.bootBufPush(keyframe.GetPureBuf()) + + if !buff.IsEmpty() { keyframe.Reset() - t.Stream_msg.PushLock_tag(`data`, t.boot_buf) - frameCount += 1 - if frameCount == 1 { - t.msg.Push_tag(`firstFrame`, t) + front_buf, last_available_offset, e := Search_stream_tag(buff.GetPureBuf(), keyframe) + if e != nil { + if strings.Contains(e.Error(), `no found available tag`) { + continue + } + //丢弃所有数据 + buff.Reset() + } + // 存在有效数据 + if len(front_buf) != 0 || keyframe.Size() != 0 { + leastReadUnix.Store(time.Now().Unix()) + } + if len(front_buf) != 0 && len(t.first_buf) == 0 { + t.first_buf = make([]byte, len(front_buf)) + copy(t.first_buf, front_buf) + // fmt.Println("write front_buf") + t.Stream_msg.PushLock_tag(`data`, t.first_buf) + t.msg.Push_tag(`load`, t) + } + if keyframe.Size() != 0 { + if len(t.first_buf) == 0 { + t.log.L(`W: `, `flv未接收到起始段`) + cancel() + break + } + t.bootBufPush(keyframe.GetPureBuf()) + keyframe.Reset() + t.Stream_msg.PushLock_tag(`data`, t.boot_buf) + frameCount += 1 + if frameCount == 1 { + t.msg.Push_tag(`firstFrame`, t) + } + } + if last_available_offset > 1 { + // fmt.Println("write Sync") + _ = buff.RemoveFront(last_available_offset - 1) } - } - if last_available_offset > 1 { - // fmt.Println("write Sync") - _ = buff.RemoveFront(last_available_offset - 1) } } - } - - buf = nil - buff.Reset() - - ticker.Stop() - }() - t.log.L(`I: `, `flv下载开始`) - - _ = r.Reqf(reqf.Rval{ - Ctx: cancelC, - Url: surl.String(), - SaveToPipe: pipe, - NoResponse: true, - Async: true, - Proxy: t.common.Proxy, - WriteLoopTO: int(readTO)*1000*2 + 1, - Header: map[string]string{ - `Host`: surl.Host, - `User-Agent`: c.UA, - `Accept`: `*/*`, - `Accept-Language`: `zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2`, - `Origin`: `https://live.bilibili.com`, - `Connection`: `keep-alive`, - `Pragma`: `no-cache`, - `Cache-Control`: `no-cache`, - `Referer`: "https://live.bilibili.com/", - `Cookie`: reqf.Map_2_Cookies_String(CookieM), - }, - }) - if err := r.Wait(); err != nil && !errors.Is(err, io.EOF) { - if reqf.IsCancel(err) { - t.log.L(`I: `, `flv下载停止`) - } else if err != nil && !reqf.IsTimeout(err) { - e = err - t.log.L(`E: `, `flv下载失败:`, err) + buf = nil + buff.Reset() + + ticker.Stop() + }() + + t.log.L(`I: `, `flv下载开始`) + + _ = r.Reqf(reqf.Rval{ + Ctx: cancelC, + Url: surl.String(), + SaveToPipe: pipe, + NoResponse: true, + Async: true, + Proxy: t.common.Proxy, + WriteLoopTO: int(readTO)*1000*2 + 1, + Header: map[string]string{ + `Host`: surl.Host, + `User-Agent`: c.UA, + `Accept`: `*/*`, + `Accept-Language`: `zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2`, + `Origin`: `https://live.bilibili.com`, + `Connection`: `keep-alive`, + `Pragma`: `no-cache`, + `Cache-Control`: `no-cache`, + `Referer`: "https://live.bilibili.com/", + `Cookie`: reqf.Map_2_Cookies_String(CookieM), + }, + }) + if err := r.Wait(); err != nil && !errors.Is(err, io.EOF) { + if reqf.IsCancel(err) { + t.log.L(`I: `, `flv下载停止`) + } else if err != nil && !reqf.IsTimeout(err) { + e = err + t.log.L(`E: `, `flv下载失败:`, err) + } } } - } + cancel() + if v1, ok := c.C.K_v.LoadV(`flv断流续接`).(bool); ok && !v1 { + break + } + v.DisableAuto() + } return } diff --git a/demo/config/config_K_v.json b/demo/config/config_K_v.json index 1dd9980..f67b989 100644 --- a/demo/config/config_K_v.json +++ b/demo/config/config_K_v.json @@ -69,6 +69,8 @@ "直播流清晰度": 10000, "直播流类型-help": "flv,fmp4,flvH,fmp4H,带H后缀的为Hevc格式编码", "直播流类型": "flv", + "flv断流超时s": 7, + "flv断流续接": true, "直播流保存位置": "./live", "直播流保存天数-help": "当t日有1录播时,会尝试删除t-n日的1个最早的录播。小于1的数将禁用此功能", "直播流保存天数": 4,