]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
Improve 在切片下载失败时马上重试
authorqydysky <qydysky@foxmail.com>
Wed, 27 Mar 2024 14:46:42 +0000 (22:46 +0800)
committerqydysky <qydysky@foxmail.com>
Wed, 27 Mar 2024 14:46:42 +0000 (22:46 +0800)
Reply/fmp4Decode.go
Reply/stream.go
go.mod
go.sum

index cbc599975ba0ff6e02c3f645be759062555a42f4..1f25d285158e97ea69adffd4eafd421f0becc4db 100644 (file)
@@ -9,6 +9,7 @@ import (
 
        "github.com/dustin/go-humanize"
        F "github.com/qydysky/bili_danmu/F"
+       perrors "github.com/qydysky/part/errors"
        slice "github.com/qydysky/part/slice"
 )
 
@@ -79,6 +80,13 @@ type Fmp4Decoder struct {
        AVTDiff float64 // 音视频时间戳容差
 }
 
+func NewFmp4Decoder() *Fmp4Decoder {
+       return &Fmp4Decoder{
+               traks: make(map[int]*trak),
+               buf:   slice.New[byte](),
+       }
+}
+
 func (t *Fmp4Decoder) Init_fmp4(buf []byte) (b []byte, err error) {
        var ftypI, ftypE, moovI, moovE int
 
@@ -105,9 +113,6 @@ func (t *Fmp4Decoder) Init_fmp4(buf []byte) (b []byte, err error) {
                []string{"tkhd", "mdia", "mdhd", "hdlr"},
                func(m []ie) (bool, error) {
                        tackId := int(F.Btoi(buf, m[0].i+20, 4))
-                       if t.traks == nil {
-                               t.traks = make(map[int]*trak)
-                       }
                        t.traks[tackId] = &trak{
                                trackID: tackId,
                                // firstTimeStamp: -1,
@@ -132,19 +137,28 @@ func (t *Fmp4Decoder) Init_fmp4(buf []byte) (b []byte, err error) {
        return b, nil
 }
 
+var (
+       ErrBufTooLarge = errors.New("ErrBufTooLarge")
+       ErrMisTraks    = errors.New("ErrMisTraks")
+)
+
 func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte]) (cu int, err error) {
        if len(buf) > humanize.MByte*100 {
-               err = errors.New("buf too large")
-               return
+               return 0, ErrBufTooLarge
        }
        if len(t.traks) == 0 {
-               err = errors.New("未初始化traks")
-               return
-       }
-       if t.buf == nil {
-               t.buf = slice.New[byte]()
+               return 0, ErrMisTraks
        }
        t.buf.Reset()
+       keyframe.Reset()
+
+       defer func() {
+               if err != nil {
+                       keyframe.Reset()
+                       cu = 0
+               }
+       }()
+
        var (
                haveKeyframe bool
                bufModified  = t.buf.GetModified()
@@ -253,18 +267,18 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte])
                                                        t.buf.Reset()
                                                        haveKeyframe = false
                                                        cu = m[0].i
-                                                       return false, nil
+                                                       return false, e
                                                }
                                        }
-                                       if nil != check_set_maxT(ts, func(_ timeStamp) error {
+                                       if e := check_set_maxT(ts, func(_ timeStamp) error {
                                                return errors.New("skip")
                                        }, func(_ timeStamp) error {
                                                t.buf.Reset()
                                                haveKeyframe = false
                                                cu = m[0].i
                                                return errors.New("skip")
-                                       }) {
-                                               return false, nil
+                                       }); e != nil {
+                                               return false, e
                                        }
                                }
 
@@ -273,7 +287,9 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte])
                                //deal frame
                                if keyframeMoof {
                                        if v, e := t.buf.HadModified(bufModified); e == nil && v && !t.buf.IsEmpty() {
-                                               _ = t.buf.AppendTo(keyframe)
+                                               if e := t.buf.AppendTo(keyframe); e != nil {
+                                                       return false, e
+                                               }
                                                cu = m[0].i
                                                t.buf.Reset()
                                        }
@@ -282,7 +298,9 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte])
                                        cu = m[6].e
                                }
                                if haveKeyframe {
-                                       _ = t.buf.Append(buf[m[0].i:m[6].e])
+                                       if e := t.buf.Append(buf[m[0].i:m[6].e]); e != nil {
+                                               return false, e
+                                       }
                                }
                                return false, nil
                        },
@@ -304,7 +322,7 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte])
                                                        t.buf.Reset()
                                                        haveKeyframe = false
                                                        cu = m[0].i
-                                                       return false, nil
+                                                       return false, e
                                                }
                                        }
                                        switch handlerType {
@@ -313,15 +331,15 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte])
                                        case 's':
                                                audio = ts
                                        }
-                                       if nil != check_set_maxT(ts, func(_ timeStamp) error {
+                                       if e := check_set_maxT(ts, func(_ timeStamp) error {
                                                return errors.New("skip")
                                        }, func(_ timeStamp) error {
                                                t.buf.Reset()
                                                haveKeyframe = false
                                                cu = m[0].i
                                                return errors.New("skip")
-                                       }) {
-                                               return false, nil
+                                       }); e != nil {
+                                               return false, e
                                        }
                                }
                                {
@@ -332,7 +350,7 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte])
                                                        t.buf.Reset()
                                                        haveKeyframe = false
                                                        cu = m[0].i
-                                                       return false, nil
+                                                       return false, e
                                                }
                                        }
                                        switch handlerType {
@@ -341,15 +359,15 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte])
                                        case 's':
                                                audio = ts
                                        }
-                                       if nil != check_set_maxT(ts, func(_ timeStamp) error {
+                                       if e := check_set_maxT(ts, func(_ timeStamp) error {
                                                return errors.New("skip")
                                        }, func(_ timeStamp) error {
                                                t.buf.Reset()
                                                haveKeyframe = false
                                                cu = m[0].i
                                                return errors.New("skip")
-                                       }) {
-                                               return false, nil
+                                       }); e != nil {
+                                               return false, e
                                        }
                                }
 
@@ -365,7 +383,9 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte])
                                //deal frame
                                if keyframeMoof {
                                        if v, e := t.buf.HadModified(bufModified); e == nil && v && !t.buf.IsEmpty() {
-                                               _ = t.buf.AppendTo(keyframe)
+                                               if e := t.buf.AppendTo(keyframe); e != nil {
+                                                       return false, e
+                                               }
                                                cu = m[0].i
                                                t.buf.Reset()
                                        }
@@ -374,7 +394,9 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte])
                                        cu = m[10].e
                                }
                                if haveKeyframe {
-                                       _ = t.buf.Append(buf[m[0].i:m[10].e])
+                                       if e := t.buf.Append(buf[m[0].i:m[10].e]); e != nil {
+                                               return false, e
+                                       }
                                }
                                return false, nil
                        }})
@@ -413,6 +435,11 @@ func deals(ies []ie, boxNames [][]string, fs []func([]ie) (breakloop bool, e err
        return
 }
 
+var (
+       ErrMisBox     = perrors.New("decode", "ErrMisBox")
+       ErrCantResync = perrors.New("decode")
+)
+
 func decode(buf []byte, reSyncboxName string) (m []ie, err error) {
        var cu int
 
@@ -421,7 +448,7 @@ func decode(buf []byte, reSyncboxName string) (m []ie, err error) {
                if E != nil {
                        if errors.Is(E, io.EOF) {
                                if len(m) == 0 {
-                                       err = errors.New("未找到box")
+                                       err = ErrMisBox
                                }
                                return
                        }
@@ -431,7 +458,7 @@ func decode(buf []byte, reSyncboxName string) (m []ie, err error) {
                                m = m[:0]
                                continue
                        }
-                       err = errors.New(E.Error() + " > 未能reSync")
+                       err = ErrCantResync.WithReason(E.Error() + "> 未能reSync")
                        return
                }
 
@@ -445,13 +472,17 @@ func decode(buf []byte, reSyncboxName string) (m []ie, err error) {
        return
 }
 
+var (
+       ErrUnkownBox = perrors.New("ErrUnkownBox")
+)
+
 func searchBox(buf []byte, cu *int) (boxName string, i int, e int, err error) {
        i = *cu
        e = i + int(F.Btoi(buf, *cu, 4))
        boxName = string(buf[*cu+4 : *cu+8])
        isPureBoxOrNeedSkip, ok := boxs[boxName]
        if !ok {
-               err = errors.New("未知包: " + boxName)
+               err = ErrUnkownBox.WithReason("未知包: " + boxName)
        } else if e > len(buf) {
                err = io.EOF
        } else if isPureBoxOrNeedSkip {
index 94963d69777216a5209deafdd7432949dec135af..c6b8c8f9a33cb8790c0743c8cc362b98d4d8520b 100644 (file)
@@ -77,6 +77,7 @@ type m4s_link_item struct {
        Base         string           // m4s文件名
        isHeader     bool             // m4sHeader
        status       int              // 下载状态 0:未下载 1:正在下载 2:下载完成 3:下载失败
+       err          error            // 下载状态 3:下载失败 时的错误
        tryDownCount int              // 下载次数 当=3时,不再下载,忽略此块
        data         *slice.Buf[byte] // 下载的数据
        createdTime  time.Time        // 创建时间
@@ -122,9 +123,11 @@ func (t *m4s_link_item) getNo() (int, error) {
        return strconv.Atoi(base[:len(base)-4])
 }
 
-func (link *m4s_link_item) download(reqPool *pool.Buf[reqf.Req], reqConfig reqf.Rval) {
+func (link *m4s_link_item) download(reqPool *pool.Buf[reqf.Req], reqConfig reqf.Rval) error {
        link.status = 1 // 设置切片状态为正在下载
+       link.err = nil
        link.tryDownCount += 1
+       link.data.Reset()
 
        r := reqPool.Get()
        defer reqPool.Put(r)
@@ -141,10 +144,15 @@ func (link *m4s_link_item) download(reqPool *pool.Buf[reqf.Req], reqConfig reqf.
                //      link.tryDownCount = 3 // 设置切片状态为下载失败
                // }
                link.status = 3 // 设置切片状态为下载失败
+               link.err = e
+               return e
+       } else if e = link.data.Append(r.Respon); e != nil {
+               link.status = 3 // 设置切片状态为下载失败
+               link.err = e
+               return e
        } else {
-               link.data.Reset()
-               _ = link.data.Append(r.Respon)
                link.status = 2 // 设置切片状态为下载完成
+               return nil
        }
 }
 
@@ -857,7 +865,7 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                // 同时下载数限制
                downloadLimit    = funcCtrl.NewBlockFuncN(3)
                buf              = slice.New[byte]()
-               fmp4Decoder      = &Fmp4Decoder{}
+               fmp4Decoder      = NewFmp4Decoder()
                keyframe         = slice.New[byte]()
                lastM4s          *m4s_link_item
                to               = 3
@@ -878,31 +886,66 @@ func (t *M4SStream) saveStreamM4s() (e error) {
 
        // 下载循环
        for download_seq := []*m4s_link_item{}; ; {
+               // 获取解析m3u8
+               {
+                       var m4s_links, err = t.fetchParseM3U8(lastM4s, fmp4ListUpdateTo)
+                       if err != nil {
+                               t.log.L(`E: `, `获取解析m3u8发生错误`, err)
+                               // if len(download_seq) != 0 {
+                               //      continue
+                               // }
+                               if !reqf.IsTimeout(err) {
+                                       e = err
+                                       break
+                               }
+                       }
 
-               // 存在待下载切片
-               if len(download_seq) != 0 {
-                       var downingCount = 0 //本轮下载数量
+                       // 避免过于频繁的请求
+                       fmp4Count += len(m4s_links)
+                       if dru := time.Since(startT).Seconds(); dru > fmp4ListUpdateTo && fmp4Count == 0 {
+                               e = fmt.Errorf("%.2f 秒未产出切片", dru)
+                               t.log.L("E: ", "获取解析m3u8发生错误", e)
+                               break
+                       } else {
+                               time.Sleep(time.Second * time.Duration(dru/float64(fmp4Count+1)+1))
+                       }
 
-                       // 下载切片
-                       for _, v := range download_seq {
+                       // 设置最后的切片
+                       for i := len(m4s_links) - 1; i >= 0; i-- {
+                               // fmt.Println("set last m4s", m4s_links[i].Base)
+                               if !m4s_links[i].isInit() && len(m4s_links[i].Base) > 0 {
+                                       if lastM4s == nil {
+                                               lastM4s = &m4s_link_item{}
+                                       }
+                                       m4s_links[i].copyTo(lastM4s)
+                                       break
+                               }
+                       }
 
+                       // 添加新切片到下载队列
+                       download_seq = append(download_seq, m4s_links...)
+               }
+
+               // 下载切片
+               {
+                       var downFail bool
+                       for i := 0; i < len(download_seq); i++ {
                                // 已下载但还未移除的切片
-                               if v.status == 2 {
+                               if download_seq[i].status == 2 {
                                        continue
                                }
 
                                // 每次最多只下载10个切片
-                               if downingCount >= 10 {
-                                       t.log.L(`T: `, `延迟切片下载 数量(`, len(download_seq)-downingCount, `)`)
+                               if i >= 10 {
+                                       t.log.L(`T: `, `延迟切片下载 数量(`, len(download_seq)-i, `)`)
                                        break
                                }
-                               downingCount += 1
 
                                done := downloadLimit.Block()
 
                                // 故障转移
-                               if v.status == 3 {
-                                       if linkUrl, e := url.Parse(v.Url); e == nil {
+                               if download_seq[i].status == 3 {
+                                       if linkUrl, e := url.Parse(download_seq[i].Url); e == nil {
                                                oldHost := linkUrl.Host
                                                // 将此切片服务器设置停用
                                                t.common.DisableLiveAuto(oldHost)
@@ -913,47 +956,52 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                                                        linkUrl.Host = vl.Host()
                                                        t.log.L(`W: `, `切片下载失败,故障转移`, oldHost, ` -> `, linkUrl.Host)
                                                }
-                                               v.Url = linkUrl.String()
+                                               download_seq[i].Url = linkUrl.String()
                                        }
                                }
 
                                go func(link *m4s_link_item) {
                                        defer done()
 
-                                       link.download(t.reqPool, reqf.Rval{
+                                       if e := link.download(t.reqPool, reqf.Rval{
                                                Timeout:     to * 1000,
                                                WriteLoopTO: (to + 2) * 1000,
                                                Proxy:       t.common.Proxy,
                                                Header: map[string]string{
                                                        `Connection`: `close`,
                                                },
-                                       })
-                               }(v)
+                                       }); e != nil {
+                                               downFail = true
+                                               t.log.L(`W: `, `切片下载失败`, link.Base, e)
+                                       }
+                               }(download_seq[i])
                        }
 
                        // 等待队列下载完成
                        downloadLimit.BlockAll()()
+
+                       if downFail {
+                               continue
+                       }
                }
 
                // 传递已下载切片
                for k := 0; k < len(download_seq); k++ {
                        // v := download_seq[k]
 
-                       if download_seq[k].status != 2 {
-                               if download_seq[k].tryDownCount >= 3 {
-                                       //下载了2次,任未下载成功,忽略此块
-                                       t.putM4s(download_seq...)
-                                       //丢弃所有数据
-                                       buf.Reset()
-                                       t.log.L(`E: `, `切片下载失败`)
-                                       return errors.New(`切片下载失败`)
-                               } else {
-                                       break
-                               }
-                       }
+                       // if download_seq[k].status != 2 {
+                       //      if download_seq[k].tryDownCount >= 3 {
+                       //              //下载了2次,任未下载成功,忽略此块
+                       //              t.putM4s(download_seq...)
+                       //              //丢弃所有数据
+                       //              buf.Reset()
+                       //              t.log.L(`E: `, `切片下载失败`, download_seq[k].err)
+                       //              return errors.New(`切片下载失败` + download_seq[k].err.Error())
+                       //      } else {
+                       //              break
+                       //      }
+                       // }
 
-                       // no, _ := download_seq[k].getNo()
-                       // fmt.Println("download_seq ", no, download_seq[k].status, download_seq[k].data.Size(), len(t.first_buf))
                        if download_seq[k].isInit() {
                                {
                                        buf, unlock := download_seq[k].data.GetPureBufRLock()
@@ -984,7 +1032,9 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                                continue
                        }
 
-                       _ = download_seq[k].data.AppendTo(buf)
+                       if e := download_seq[k].data.AppendTo(buf); e != nil {
+                               t.log.L(`E: `, e)
+                       }
                        t.putM4s(download_seq[k])
                        download_seq = append(download_seq[:k], download_seq[k+1:]...)
                        k -= 1
@@ -993,18 +1043,12 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                        last_available_offset, err := fmp4Decoder.Search_stream_fmp4(buff, keyframe)
                        unlock()
 
-                       if err != nil {
-                               if !errors.Is(err, io.EOF) {
-                                       t.log.L(`E: `, err)
-                                       //丢弃所有数据
-                                       buf.Reset()
-                                       e = err
-                                       return
-                                       // }
-                               } else {
-                                       keyframe.Reset()
-                                       last_available_offset = 0
-                               }
+                       if err != nil && !errors.Is(err, io.EOF) {
+                               t.log.L(`E: `, err)
+                               //丢弃所有数据
+                               buf.Reset()
+                               e = err
+                               return
                        }
 
                        // no, _ := download_seq[k].getNo()
@@ -1012,9 +1056,9 @@ func (t *M4SStream) saveStreamM4s() (e error) {
 
                        // 传递关键帧
                        if !keyframe.IsEmpty() {
-                               buf, unlock := keyframe.GetPureBufRLock()
-                               t.bootBufPush(buf)
-                               t.Stream_msg.PushLock_tag(`data`, buf)
+                               keyframeBuf, unlock := keyframe.GetPureBufRLock()
+                               t.bootBufPush(keyframeBuf)
+                               t.Stream_msg.PushLock_tag(`data`, keyframeBuf)
                                unlock()
                                keyframe.Reset()
                                t.frameCount += 1
@@ -1045,44 +1089,6 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                                return
                        }
                }
-
-               // 获取解析m3u8
-               var m4s_links, err = t.fetchParseM3U8(lastM4s, fmp4ListUpdateTo)
-               if err != nil {
-                       t.log.L(`E: `, `获取解析m3u8发生错误`, err)
-                       // if len(download_seq) != 0 {
-                       //      continue
-                       // }
-                       if !reqf.IsTimeout(err) {
-                               e = err
-                               break
-                       }
-               }
-
-               // 避免过于频繁的请求
-               fmp4Count += len(m4s_links)
-               if dru := time.Since(startT).Seconds(); dru > fmp4ListUpdateTo && fmp4Count == 0 {
-                       e = fmt.Errorf("%.2f 秒未产出切片", dru)
-                       t.log.L("E: ", "获取解析m3u8发生错误", e)
-                       break
-               } else {
-                       time.Sleep(time.Second * time.Duration(dru/float64(fmp4Count+1)+1))
-               }
-
-               // 设置最后的切片
-               for i := len(m4s_links) - 1; i >= 0; i-- {
-                       // fmt.Println("set last m4s", m4s_links[i].Base)
-                       if !m4s_links[i].isInit() && len(m4s_links[i].Base) > 0 {
-                               if lastM4s == nil {
-                                       lastM4s = &m4s_link_item{}
-                               }
-                               m4s_links[i].copyTo(lastM4s)
-                               break
-                       }
-               }
-
-               // 添加新切片到下载队列
-               download_seq = append(download_seq, m4s_links...)
        }
 
        return
diff --git a/go.mod b/go.mod
index d4efcdcf663e63d2405dac67f0e0140057ad7d66..474fa23495e978d50b877483b4f7fde880868e05 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@ go 1.22
 require (
        github.com/gotk3/gotk3 v0.6.3
        github.com/mdp/qrterminal/v3 v3.2.0
-       github.com/qydysky/part v0.28.20240323212235
+       github.com/qydysky/part v0.28.20240325172911
        github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
        github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
        golang.org/x/text v0.14.0
diff --git a/go.sum b/go.sum
index bd25312d8e1e2c8da7648d773892badde205d5ab..032bbbe3614f013d0210ba68ee5f4b957a6ff25d 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -37,8 +37,8 @@ github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdh
 github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/qydysky/part v0.28.20240323212235 h1:SODy4RbqsMCXb/yaFNBrXlZfARWMKBLz1421FPKBxVQ=
-github.com/qydysky/part v0.28.20240323212235/go.mod h1:XytV5dI1Y7+qvjhsa2TMvi55RBZQQf0LCDYQ1kUCYqM=
+github.com/qydysky/part v0.28.20240325172911 h1:2wu7BIpfsxvoXYZjzd79NQCU4B4ZY61Iu97NwxvupRQ=
+github.com/qydysky/part v0.28.20240325172911/go.mod h1:XytV5dI1Y7+qvjhsa2TMvi55RBZQQf0LCDYQ1kUCYqM=
 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
 github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=