From: qydysky Date: Wed, 27 Mar 2024 14:46:42 +0000 (+0800) Subject: Improve 在切片下载失败时马上重试 X-Git-Tag: v0.13.6~4 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=cc89b37eb59117420f1e3abdc0ff6f0a585857cc;p=bili_danmu%2F.git Improve 在切片下载失败时马上重试 --- diff --git a/Reply/fmp4Decode.go b/Reply/fmp4Decode.go index cbc5999..1f25d28 100644 --- a/Reply/fmp4Decode.go +++ b/Reply/fmp4Decode.go @@ -9,6 +9,7 @@ import ( "github.com/dustin/go-humanize" F "github.com/qydysky/bili_danmu/F" + perrors "github.com/qydysky/part/errors" slice "github.com/qydysky/part/slice" ) @@ -79,6 +80,13 @@ type Fmp4Decoder struct { AVTDiff float64 // 音视频时间戳容差 } +func NewFmp4Decoder() *Fmp4Decoder { + return &Fmp4Decoder{ + traks: make(map[int]*trak), + buf: slice.New[byte](), + } +} + func (t *Fmp4Decoder) Init_fmp4(buf []byte) (b []byte, err error) { var ftypI, ftypE, moovI, moovE int @@ -105,9 +113,6 @@ func (t *Fmp4Decoder) Init_fmp4(buf []byte) (b []byte, err error) { []string{"tkhd", "mdia", "mdhd", "hdlr"}, func(m []ie) (bool, error) { tackId := int(F.Btoi(buf, m[0].i+20, 4)) - if t.traks == nil { - t.traks = make(map[int]*trak) - } t.traks[tackId] = &trak{ trackID: tackId, // firstTimeStamp: -1, @@ -132,19 +137,28 @@ func (t *Fmp4Decoder) Init_fmp4(buf []byte) (b []byte, err error) { return b, nil } +var ( + ErrBufTooLarge = errors.New("ErrBufTooLarge") + ErrMisTraks = errors.New("ErrMisTraks") +) + func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte]) (cu int, err error) { if len(buf) > humanize.MByte*100 { - err = errors.New("buf too large") - return + return 0, ErrBufTooLarge } if len(t.traks) == 0 { - err = errors.New("未初始化traks") - return - } - if t.buf == nil { - t.buf = slice.New[byte]() + return 0, ErrMisTraks } t.buf.Reset() + keyframe.Reset() + + defer func() { + if err != nil { + keyframe.Reset() + cu = 0 + } + }() + var ( haveKeyframe bool bufModified = t.buf.GetModified() @@ -253,18 +267,18 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte]) t.buf.Reset() haveKeyframe = false cu = m[0].i - return false, nil + return false, e } } - if nil != check_set_maxT(ts, func(_ timeStamp) error { + if e := check_set_maxT(ts, func(_ timeStamp) error { return errors.New("skip") }, func(_ timeStamp) error { t.buf.Reset() haveKeyframe = false cu = m[0].i return errors.New("skip") - }) { - return false, nil + }); e != nil { + return false, e } } @@ -273,7 +287,9 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte]) //deal frame if keyframeMoof { if v, e := t.buf.HadModified(bufModified); e == nil && v && !t.buf.IsEmpty() { - _ = t.buf.AppendTo(keyframe) + if e := t.buf.AppendTo(keyframe); e != nil { + return false, e + } cu = m[0].i t.buf.Reset() } @@ -282,7 +298,9 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte]) cu = m[6].e } if haveKeyframe { - _ = t.buf.Append(buf[m[0].i:m[6].e]) + if e := t.buf.Append(buf[m[0].i:m[6].e]); e != nil { + return false, e + } } return false, nil }, @@ -304,7 +322,7 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte]) t.buf.Reset() haveKeyframe = false cu = m[0].i - return false, nil + return false, e } } switch handlerType { @@ -313,15 +331,15 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte]) case 's': audio = ts } - if nil != check_set_maxT(ts, func(_ timeStamp) error { + if e := check_set_maxT(ts, func(_ timeStamp) error { return errors.New("skip") }, func(_ timeStamp) error { t.buf.Reset() haveKeyframe = false cu = m[0].i return errors.New("skip") - }) { - return false, nil + }); e != nil { + return false, e } } { @@ -332,7 +350,7 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte]) t.buf.Reset() haveKeyframe = false cu = m[0].i - return false, nil + return false, e } } switch handlerType { @@ -341,15 +359,15 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte]) case 's': audio = ts } - if nil != check_set_maxT(ts, func(_ timeStamp) error { + if e := check_set_maxT(ts, func(_ timeStamp) error { return errors.New("skip") }, func(_ timeStamp) error { t.buf.Reset() haveKeyframe = false cu = m[0].i return errors.New("skip") - }) { - return false, nil + }); e != nil { + return false, e } } @@ -365,7 +383,9 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte]) //deal frame if keyframeMoof { if v, e := t.buf.HadModified(bufModified); e == nil && v && !t.buf.IsEmpty() { - _ = t.buf.AppendTo(keyframe) + if e := t.buf.AppendTo(keyframe); e != nil { + return false, e + } cu = m[0].i t.buf.Reset() } @@ -374,7 +394,9 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte]) cu = m[10].e } if haveKeyframe { - _ = t.buf.Append(buf[m[0].i:m[10].e]) + if e := t.buf.Append(buf[m[0].i:m[10].e]); e != nil { + return false, e + } } return false, nil }}) @@ -413,6 +435,11 @@ func deals(ies []ie, boxNames [][]string, fs []func([]ie) (breakloop bool, e err return } +var ( + ErrMisBox = perrors.New("decode", "ErrMisBox") + ErrCantResync = perrors.New("decode") +) + func decode(buf []byte, reSyncboxName string) (m []ie, err error) { var cu int @@ -421,7 +448,7 @@ func decode(buf []byte, reSyncboxName string) (m []ie, err error) { if E != nil { if errors.Is(E, io.EOF) { if len(m) == 0 { - err = errors.New("未找到box") + err = ErrMisBox } return } @@ -431,7 +458,7 @@ func decode(buf []byte, reSyncboxName string) (m []ie, err error) { m = m[:0] continue } - err = errors.New(E.Error() + " > 未能reSync") + err = ErrCantResync.WithReason(E.Error() + "> 未能reSync") return } @@ -445,13 +472,17 @@ func decode(buf []byte, reSyncboxName string) (m []ie, err error) { return } +var ( + ErrUnkownBox = perrors.New("ErrUnkownBox") +) + func searchBox(buf []byte, cu *int) (boxName string, i int, e int, err error) { i = *cu e = i + int(F.Btoi(buf, *cu, 4)) boxName = string(buf[*cu+4 : *cu+8]) isPureBoxOrNeedSkip, ok := boxs[boxName] if !ok { - err = errors.New("未知包: " + boxName) + err = ErrUnkownBox.WithReason("未知包: " + boxName) } else if e > len(buf) { err = io.EOF } else if isPureBoxOrNeedSkip { diff --git a/Reply/stream.go b/Reply/stream.go index 94963d6..c6b8c8f 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -77,6 +77,7 @@ type m4s_link_item struct { Base string // m4s文件名 isHeader bool // m4sHeader status int // 下载状态 0:未下载 1:正在下载 2:下载完成 3:下载失败 + err error // 下载状态 3:下载失败 时的错误 tryDownCount int // 下载次数 当=3时,不再下载,忽略此块 data *slice.Buf[byte] // 下载的数据 createdTime time.Time // 创建时间 @@ -122,9 +123,11 @@ func (t *m4s_link_item) getNo() (int, error) { return strconv.Atoi(base[:len(base)-4]) } -func (link *m4s_link_item) download(reqPool *pool.Buf[reqf.Req], reqConfig reqf.Rval) { +func (link *m4s_link_item) download(reqPool *pool.Buf[reqf.Req], reqConfig reqf.Rval) error { link.status = 1 // 设置切片状态为正在下载 + link.err = nil link.tryDownCount += 1 + link.data.Reset() r := reqPool.Get() defer reqPool.Put(r) @@ -141,10 +144,15 @@ func (link *m4s_link_item) download(reqPool *pool.Buf[reqf.Req], reqConfig reqf. // link.tryDownCount = 3 // 设置切片状态为下载失败 // } link.status = 3 // 设置切片状态为下载失败 + link.err = e + return e + } else if e = link.data.Append(r.Respon); e != nil { + link.status = 3 // 设置切片状态为下载失败 + link.err = e + return e } else { - link.data.Reset() - _ = link.data.Append(r.Respon) link.status = 2 // 设置切片状态为下载完成 + return nil } } @@ -857,7 +865,7 @@ func (t *M4SStream) saveStreamM4s() (e error) { // 同时下载数限制 downloadLimit = funcCtrl.NewBlockFuncN(3) buf = slice.New[byte]() - fmp4Decoder = &Fmp4Decoder{} + fmp4Decoder = NewFmp4Decoder() keyframe = slice.New[byte]() lastM4s *m4s_link_item to = 3 @@ -878,31 +886,66 @@ func (t *M4SStream) saveStreamM4s() (e error) { // 下载循环 for download_seq := []*m4s_link_item{}; ; { + // 获取解析m3u8 + { + var m4s_links, err = t.fetchParseM3U8(lastM4s, fmp4ListUpdateTo) + if err != nil { + t.log.L(`E: `, `获取解析m3u8发生错误`, err) + // if len(download_seq) != 0 { + // continue + // } + if !reqf.IsTimeout(err) { + e = err + break + } + } - // 存在待下载切片 - if len(download_seq) != 0 { - var downingCount = 0 //本轮下载数量 + // 避免过于频繁的请求 + fmp4Count += len(m4s_links) + if dru := time.Since(startT).Seconds(); dru > fmp4ListUpdateTo && fmp4Count == 0 { + e = fmt.Errorf("%.2f 秒未产出切片", dru) + t.log.L("E: ", "获取解析m3u8发生错误", e) + break + } else { + time.Sleep(time.Second * time.Duration(dru/float64(fmp4Count+1)+1)) + } - // 下载切片 - for _, v := range download_seq { + // 设置最后的切片 + for i := len(m4s_links) - 1; i >= 0; i-- { + // fmt.Println("set last m4s", m4s_links[i].Base) + if !m4s_links[i].isInit() && len(m4s_links[i].Base) > 0 { + if lastM4s == nil { + lastM4s = &m4s_link_item{} + } + m4s_links[i].copyTo(lastM4s) + break + } + } + // 添加新切片到下载队列 + download_seq = append(download_seq, m4s_links...) + } + + // 下载切片 + { + var downFail bool + for i := 0; i < len(download_seq); i++ { // 已下载但还未移除的切片 - if v.status == 2 { + if download_seq[i].status == 2 { continue } // 每次最多只下载10个切片 - if downingCount >= 10 { - t.log.L(`T: `, `延迟切片下载 数量(`, len(download_seq)-downingCount, `)`) + if i >= 10 { + t.log.L(`T: `, `延迟切片下载 数量(`, len(download_seq)-i, `)`) break } - downingCount += 1 done := downloadLimit.Block() // 故障转移 - if v.status == 3 { - if linkUrl, e := url.Parse(v.Url); e == nil { + if download_seq[i].status == 3 { + if linkUrl, e := url.Parse(download_seq[i].Url); e == nil { oldHost := linkUrl.Host // 将此切片服务器设置停用 t.common.DisableLiveAuto(oldHost) @@ -913,47 +956,52 @@ func (t *M4SStream) saveStreamM4s() (e error) { linkUrl.Host = vl.Host() t.log.L(`W: `, `切片下载失败,故障转移`, oldHost, ` -> `, linkUrl.Host) } - v.Url = linkUrl.String() + download_seq[i].Url = linkUrl.String() } } go func(link *m4s_link_item) { defer done() - link.download(t.reqPool, reqf.Rval{ + if e := link.download(t.reqPool, reqf.Rval{ Timeout: to * 1000, WriteLoopTO: (to + 2) * 1000, Proxy: t.common.Proxy, Header: map[string]string{ `Connection`: `close`, }, - }) - }(v) + }); e != nil { + downFail = true + t.log.L(`W: `, `切片下载失败`, link.Base, e) + } + }(download_seq[i]) } // 等待队列下载完成 downloadLimit.BlockAll()() + + if downFail { + continue + } } // 传递已下载切片 for k := 0; k < len(download_seq); k++ { // v := download_seq[k] - if download_seq[k].status != 2 { - if download_seq[k].tryDownCount >= 3 { - //下载了2次,任未下载成功,忽略此块 - t.putM4s(download_seq...) - //丢弃所有数据 - buf.Reset() - t.log.L(`E: `, `切片下载失败`) - return errors.New(`切片下载失败`) - } else { - break - } - } + // if download_seq[k].status != 2 { + // if download_seq[k].tryDownCount >= 3 { + // //下载了2次,任未下载成功,忽略此块 + // t.putM4s(download_seq...) + // //丢弃所有数据 + // buf.Reset() + // t.log.L(`E: `, `切片下载失败`, download_seq[k].err) + // return errors.New(`切片下载失败` + download_seq[k].err.Error()) + // } else { + // break + // } + // } - // no, _ := download_seq[k].getNo() - // fmt.Println("download_seq ", no, download_seq[k].status, download_seq[k].data.Size(), len(t.first_buf)) if download_seq[k].isInit() { { buf, unlock := download_seq[k].data.GetPureBufRLock() @@ -984,7 +1032,9 @@ func (t *M4SStream) saveStreamM4s() (e error) { continue } - _ = download_seq[k].data.AppendTo(buf) + if e := download_seq[k].data.AppendTo(buf); e != nil { + t.log.L(`E: `, e) + } t.putM4s(download_seq[k]) download_seq = append(download_seq[:k], download_seq[k+1:]...) k -= 1 @@ -993,18 +1043,12 @@ func (t *M4SStream) saveStreamM4s() (e error) { last_available_offset, err := fmp4Decoder.Search_stream_fmp4(buff, keyframe) unlock() - if err != nil { - if !errors.Is(err, io.EOF) { - t.log.L(`E: `, err) - //丢弃所有数据 - buf.Reset() - e = err - return - // } - } else { - keyframe.Reset() - last_available_offset = 0 - } + if err != nil && !errors.Is(err, io.EOF) { + t.log.L(`E: `, err) + //丢弃所有数据 + buf.Reset() + e = err + return } // no, _ := download_seq[k].getNo() @@ -1012,9 +1056,9 @@ func (t *M4SStream) saveStreamM4s() (e error) { // 传递关键帧 if !keyframe.IsEmpty() { - buf, unlock := keyframe.GetPureBufRLock() - t.bootBufPush(buf) - t.Stream_msg.PushLock_tag(`data`, buf) + keyframeBuf, unlock := keyframe.GetPureBufRLock() + t.bootBufPush(keyframeBuf) + t.Stream_msg.PushLock_tag(`data`, keyframeBuf) unlock() keyframe.Reset() t.frameCount += 1 @@ -1045,44 +1089,6 @@ func (t *M4SStream) saveStreamM4s() (e error) { return } } - - // 获取解析m3u8 - var m4s_links, err = t.fetchParseM3U8(lastM4s, fmp4ListUpdateTo) - if err != nil { - t.log.L(`E: `, `获取解析m3u8发生错误`, err) - // if len(download_seq) != 0 { - // continue - // } - if !reqf.IsTimeout(err) { - e = err - break - } - } - - // 避免过于频繁的请求 - fmp4Count += len(m4s_links) - if dru := time.Since(startT).Seconds(); dru > fmp4ListUpdateTo && fmp4Count == 0 { - e = fmt.Errorf("%.2f 秒未产出切片", dru) - t.log.L("E: ", "获取解析m3u8发生错误", e) - break - } else { - time.Sleep(time.Second * time.Duration(dru/float64(fmp4Count+1)+1)) - } - - // 设置最后的切片 - for i := len(m4s_links) - 1; i >= 0; i-- { - // fmt.Println("set last m4s", m4s_links[i].Base) - if !m4s_links[i].isInit() && len(m4s_links[i].Base) > 0 { - if lastM4s == nil { - lastM4s = &m4s_link_item{} - } - m4s_links[i].copyTo(lastM4s) - break - } - } - - // 添加新切片到下载队列 - download_seq = append(download_seq, m4s_links...) } return diff --git a/go.mod b/go.mod index d4efcdc..474fa23 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.22 require ( github.com/gotk3/gotk3 v0.6.3 github.com/mdp/qrterminal/v3 v3.2.0 - github.com/qydysky/part v0.28.20240323212235 + github.com/qydysky/part v0.28.20240325172911 github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966 golang.org/x/text v0.14.0 diff --git a/go.sum b/go.sum index bd25312..032bbbe 100644 --- a/go.sum +++ b/go.sum @@ -37,8 +37,8 @@ github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdh github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/qydysky/part v0.28.20240323212235 h1:SODy4RbqsMCXb/yaFNBrXlZfARWMKBLz1421FPKBxVQ= -github.com/qydysky/part v0.28.20240323212235/go.mod h1:XytV5dI1Y7+qvjhsa2TMvi55RBZQQf0LCDYQ1kUCYqM= +github.com/qydysky/part v0.28.20240325172911 h1:2wu7BIpfsxvoXYZjzd79NQCU4B4ZY61Iu97NwxvupRQ= +github.com/qydysky/part v0.28.20240325172911/go.mod h1:XytV5dI1Y7+qvjhsa2TMvi55RBZQQf0LCDYQ1kUCYqM= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=