"github.com/dustin/go-humanize"
F "github.com/qydysky/bili_danmu/F"
+ perrors "github.com/qydysky/part/errors"
slice "github.com/qydysky/part/slice"
)
AVTDiff float64 // 音视频时间戳容差
}
+func NewFmp4Decoder() *Fmp4Decoder {
+ return &Fmp4Decoder{
+ traks: make(map[int]*trak),
+ buf: slice.New[byte](),
+ }
+}
+
func (t *Fmp4Decoder) Init_fmp4(buf []byte) (b []byte, err error) {
var ftypI, ftypE, moovI, moovE int
[]string{"tkhd", "mdia", "mdhd", "hdlr"},
func(m []ie) (bool, error) {
tackId := int(F.Btoi(buf, m[0].i+20, 4))
- if t.traks == nil {
- t.traks = make(map[int]*trak)
- }
t.traks[tackId] = &trak{
trackID: tackId,
// firstTimeStamp: -1,
return b, nil
}
+var (
+ ErrBufTooLarge = errors.New("ErrBufTooLarge")
+ ErrMisTraks = errors.New("ErrMisTraks")
+)
+
func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte]) (cu int, err error) {
if len(buf) > humanize.MByte*100 {
- err = errors.New("buf too large")
- return
+ return 0, ErrBufTooLarge
}
if len(t.traks) == 0 {
- err = errors.New("未初始化traks")
- return
- }
- if t.buf == nil {
- t.buf = slice.New[byte]()
+ return 0, ErrMisTraks
}
t.buf.Reset()
+ keyframe.Reset()
+
+ defer func() {
+ if err != nil {
+ keyframe.Reset()
+ cu = 0
+ }
+ }()
+
var (
haveKeyframe bool
bufModified = t.buf.GetModified()
t.buf.Reset()
haveKeyframe = false
cu = m[0].i
- return false, nil
+ return false, e
}
}
- if nil != check_set_maxT(ts, func(_ timeStamp) error {
+ if e := check_set_maxT(ts, func(_ timeStamp) error {
return errors.New("skip")
}, func(_ timeStamp) error {
t.buf.Reset()
haveKeyframe = false
cu = m[0].i
return errors.New("skip")
- }) {
- return false, nil
+ }); e != nil {
+ return false, e
}
}
//deal frame
if keyframeMoof {
if v, e := t.buf.HadModified(bufModified); e == nil && v && !t.buf.IsEmpty() {
- _ = t.buf.AppendTo(keyframe)
+ if e := t.buf.AppendTo(keyframe); e != nil {
+ return false, e
+ }
cu = m[0].i
t.buf.Reset()
}
cu = m[6].e
}
if haveKeyframe {
- _ = t.buf.Append(buf[m[0].i:m[6].e])
+ if e := t.buf.Append(buf[m[0].i:m[6].e]); e != nil {
+ return false, e
+ }
}
return false, nil
},
t.buf.Reset()
haveKeyframe = false
cu = m[0].i
- return false, nil
+ return false, e
}
}
switch handlerType {
case 's':
audio = ts
}
- if nil != check_set_maxT(ts, func(_ timeStamp) error {
+ if e := check_set_maxT(ts, func(_ timeStamp) error {
return errors.New("skip")
}, func(_ timeStamp) error {
t.buf.Reset()
haveKeyframe = false
cu = m[0].i
return errors.New("skip")
- }) {
- return false, nil
+ }); e != nil {
+ return false, e
}
}
{
t.buf.Reset()
haveKeyframe = false
cu = m[0].i
- return false, nil
+ return false, e
}
}
switch handlerType {
case 's':
audio = ts
}
- if nil != check_set_maxT(ts, func(_ timeStamp) error {
+ if e := check_set_maxT(ts, func(_ timeStamp) error {
return errors.New("skip")
}, func(_ timeStamp) error {
t.buf.Reset()
haveKeyframe = false
cu = m[0].i
return errors.New("skip")
- }) {
- return false, nil
+ }); e != nil {
+ return false, e
}
}
//deal frame
if keyframeMoof {
if v, e := t.buf.HadModified(bufModified); e == nil && v && !t.buf.IsEmpty() {
- _ = t.buf.AppendTo(keyframe)
+ if e := t.buf.AppendTo(keyframe); e != nil {
+ return false, e
+ }
cu = m[0].i
t.buf.Reset()
}
cu = m[10].e
}
if haveKeyframe {
- _ = t.buf.Append(buf[m[0].i:m[10].e])
+ if e := t.buf.Append(buf[m[0].i:m[10].e]); e != nil {
+ return false, e
+ }
}
return false, nil
}})
return
}
+var (
+ ErrMisBox = perrors.New("decode", "ErrMisBox")
+ ErrCantResync = perrors.New("decode")
+)
+
func decode(buf []byte, reSyncboxName string) (m []ie, err error) {
var cu int
if E != nil {
if errors.Is(E, io.EOF) {
if len(m) == 0 {
- err = errors.New("未找到box")
+ err = ErrMisBox
}
return
}
m = m[:0]
continue
}
- err = errors.New(E.Error() + " > 未能reSync")
+ err = ErrCantResync.WithReason(E.Error() + "> 未能reSync")
return
}
return
}
+var (
+ ErrUnkownBox = perrors.New("ErrUnkownBox")
+)
+
func searchBox(buf []byte, cu *int) (boxName string, i int, e int, err error) {
i = *cu
e = i + int(F.Btoi(buf, *cu, 4))
boxName = string(buf[*cu+4 : *cu+8])
isPureBoxOrNeedSkip, ok := boxs[boxName]
if !ok {
- err = errors.New("未知包: " + boxName)
+ err = ErrUnkownBox.WithReason("未知包: " + boxName)
} else if e > len(buf) {
err = io.EOF
} else if isPureBoxOrNeedSkip {
Base string // m4s文件名
isHeader bool // m4sHeader
status int // 下载状态 0:未下载 1:正在下载 2:下载完成 3:下载失败
+ err error // 下载状态 3:下载失败 时的错误
tryDownCount int // 下载次数 当=3时,不再下载,忽略此块
data *slice.Buf[byte] // 下载的数据
createdTime time.Time // 创建时间
return strconv.Atoi(base[:len(base)-4])
}
-func (link *m4s_link_item) download(reqPool *pool.Buf[reqf.Req], reqConfig reqf.Rval) {
+func (link *m4s_link_item) download(reqPool *pool.Buf[reqf.Req], reqConfig reqf.Rval) error {
link.status = 1 // 设置切片状态为正在下载
+ link.err = nil
link.tryDownCount += 1
+ link.data.Reset()
r := reqPool.Get()
defer reqPool.Put(r)
// link.tryDownCount = 3 // 设置切片状态为下载失败
// }
link.status = 3 // 设置切片状态为下载失败
+ link.err = e
+ return e
+ } else if e = link.data.Append(r.Respon); e != nil {
+ link.status = 3 // 设置切片状态为下载失败
+ link.err = e
+ return e
} else {
- link.data.Reset()
- _ = link.data.Append(r.Respon)
link.status = 2 // 设置切片状态为下载完成
+ return nil
}
}
// 同时下载数限制
downloadLimit = funcCtrl.NewBlockFuncN(3)
buf = slice.New[byte]()
- fmp4Decoder = &Fmp4Decoder{}
+ fmp4Decoder = NewFmp4Decoder()
keyframe = slice.New[byte]()
lastM4s *m4s_link_item
to = 3
// 下载循环
for download_seq := []*m4s_link_item{}; ; {
+ // 获取解析m3u8
+ {
+ var m4s_links, err = t.fetchParseM3U8(lastM4s, fmp4ListUpdateTo)
+ if err != nil {
+ t.log.L(`E: `, `获取解析m3u8发生错误`, err)
+ // if len(download_seq) != 0 {
+ // continue
+ // }
+ if !reqf.IsTimeout(err) {
+ e = err
+ break
+ }
+ }
- // 存在待下载切片
- if len(download_seq) != 0 {
- var downingCount = 0 //本轮下载数量
+ // 避免过于频繁的请求
+ fmp4Count += len(m4s_links)
+ if dru := time.Since(startT).Seconds(); dru > fmp4ListUpdateTo && fmp4Count == 0 {
+ e = fmt.Errorf("%.2f 秒未产出切片", dru)
+ t.log.L("E: ", "获取解析m3u8发生错误", e)
+ break
+ } else {
+ time.Sleep(time.Second * time.Duration(dru/float64(fmp4Count+1)+1))
+ }
- // 下载切片
- for _, v := range download_seq {
+ // 设置最后的切片
+ for i := len(m4s_links) - 1; i >= 0; i-- {
+ // fmt.Println("set last m4s", m4s_links[i].Base)
+ if !m4s_links[i].isInit() && len(m4s_links[i].Base) > 0 {
+ if lastM4s == nil {
+ lastM4s = &m4s_link_item{}
+ }
+ m4s_links[i].copyTo(lastM4s)
+ break
+ }
+ }
+ // 添加新切片到下载队列
+ download_seq = append(download_seq, m4s_links...)
+ }
+
+ // 下载切片
+ {
+ var downFail bool
+ for i := 0; i < len(download_seq); i++ {
// 已下载但还未移除的切片
- if v.status == 2 {
+ if download_seq[i].status == 2 {
continue
}
// 每次最多只下载10个切片
- if downingCount >= 10 {
- t.log.L(`T: `, `延迟切片下载 数量(`, len(download_seq)-downingCount, `)`)
+ if i >= 10 {
+ t.log.L(`T: `, `延迟切片下载 数量(`, len(download_seq)-i, `)`)
break
}
- downingCount += 1
done := downloadLimit.Block()
// 故障转移
- if v.status == 3 {
- if linkUrl, e := url.Parse(v.Url); e == nil {
+ if download_seq[i].status == 3 {
+ if linkUrl, e := url.Parse(download_seq[i].Url); e == nil {
oldHost := linkUrl.Host
// 将此切片服务器设置停用
t.common.DisableLiveAuto(oldHost)
linkUrl.Host = vl.Host()
t.log.L(`W: `, `切片下载失败,故障转移`, oldHost, ` -> `, linkUrl.Host)
}
- v.Url = linkUrl.String()
+ download_seq[i].Url = linkUrl.String()
}
}
go func(link *m4s_link_item) {
defer done()
- link.download(t.reqPool, reqf.Rval{
+ if e := link.download(t.reqPool, reqf.Rval{
Timeout: to * 1000,
WriteLoopTO: (to + 2) * 1000,
Proxy: t.common.Proxy,
Header: map[string]string{
`Connection`: `close`,
},
- })
- }(v)
+ }); e != nil {
+ downFail = true
+ t.log.L(`W: `, `切片下载失败`, link.Base, e)
+ }
+ }(download_seq[i])
}
// 等待队列下载完成
downloadLimit.BlockAll()()
+
+ if downFail {
+ continue
+ }
}
// 传递已下载切片
for k := 0; k < len(download_seq); k++ {
// v := download_seq[k]
- if download_seq[k].status != 2 {
- if download_seq[k].tryDownCount >= 3 {
- //下载了2次,任未下载成功,忽略此块
- t.putM4s(download_seq...)
- //丢弃所有数据
- buf.Reset()
- t.log.L(`E: `, `切片下载失败`)
- return errors.New(`切片下载失败`)
- } else {
- break
- }
- }
+ // if download_seq[k].status != 2 {
+ // if download_seq[k].tryDownCount >= 3 {
+ // //下载了2次,任未下载成功,忽略此块
+ // t.putM4s(download_seq...)
+ // //丢弃所有数据
+ // buf.Reset()
+ // t.log.L(`E: `, `切片下载失败`, download_seq[k].err)
+ // return errors.New(`切片下载失败` + download_seq[k].err.Error())
+ // } else {
+ // break
+ // }
+ // }
- // no, _ := download_seq[k].getNo()
- // fmt.Println("download_seq ", no, download_seq[k].status, download_seq[k].data.Size(), len(t.first_buf))
if download_seq[k].isInit() {
{
buf, unlock := download_seq[k].data.GetPureBufRLock()
continue
}
- _ = download_seq[k].data.AppendTo(buf)
+ if e := download_seq[k].data.AppendTo(buf); e != nil {
+ t.log.L(`E: `, e)
+ }
t.putM4s(download_seq[k])
download_seq = append(download_seq[:k], download_seq[k+1:]...)
k -= 1
last_available_offset, err := fmp4Decoder.Search_stream_fmp4(buff, keyframe)
unlock()
- if err != nil {
- if !errors.Is(err, io.EOF) {
- t.log.L(`E: `, err)
- //丢弃所有数据
- buf.Reset()
- e = err
- return
- // }
- } else {
- keyframe.Reset()
- last_available_offset = 0
- }
+ if err != nil && !errors.Is(err, io.EOF) {
+ t.log.L(`E: `, err)
+ //丢弃所有数据
+ buf.Reset()
+ e = err
+ return
}
// no, _ := download_seq[k].getNo()
// 传递关键帧
if !keyframe.IsEmpty() {
- buf, unlock := keyframe.GetPureBufRLock()
- t.bootBufPush(buf)
- t.Stream_msg.PushLock_tag(`data`, buf)
+ keyframeBuf, unlock := keyframe.GetPureBufRLock()
+ t.bootBufPush(keyframeBuf)
+ t.Stream_msg.PushLock_tag(`data`, keyframeBuf)
unlock()
keyframe.Reset()
t.frameCount += 1
return
}
}
-
- // 获取解析m3u8
- var m4s_links, err = t.fetchParseM3U8(lastM4s, fmp4ListUpdateTo)
- if err != nil {
- t.log.L(`E: `, `获取解析m3u8发生错误`, err)
- // if len(download_seq) != 0 {
- // continue
- // }
- if !reqf.IsTimeout(err) {
- e = err
- break
- }
- }
-
- // 避免过于频繁的请求
- fmp4Count += len(m4s_links)
- if dru := time.Since(startT).Seconds(); dru > fmp4ListUpdateTo && fmp4Count == 0 {
- e = fmt.Errorf("%.2f 秒未产出切片", dru)
- t.log.L("E: ", "获取解析m3u8发生错误", e)
- break
- } else {
- time.Sleep(time.Second * time.Duration(dru/float64(fmp4Count+1)+1))
- }
-
- // 设置最后的切片
- for i := len(m4s_links) - 1; i >= 0; i-- {
- // fmt.Println("set last m4s", m4s_links[i].Base)
- if !m4s_links[i].isInit() && len(m4s_links[i].Base) > 0 {
- if lastM4s == nil {
- lastM4s = &m4s_link_item{}
- }
- m4s_links[i].copyTo(lastM4s)
- break
- }
- }
-
- // 添加新切片到下载队列
- download_seq = append(download_seq, m4s_links...)
}
return