]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
fmp4 decode fix
authorqydysky <32743305+qydysky@users.noreply.github.com>
Mon, 16 Jan 2023 16:29:37 +0000 (00:29 +0800)
committerqydysky <32743305+qydysky@users.noreply.github.com>
Mon, 16 Jan 2023 16:29:37 +0000 (00:29 +0800)
Reply/fmp4Decode.go
Reply/stream.go

index 752b7df68290f08a30dd524f183154f54777f5ce..b409b070d81b050988e48c4bb3647b3835e32ea5 100644 (file)
@@ -108,7 +108,10 @@ func (t *Fmp4Decoder) Init_fmp4(buf []byte) (b []byte, err error) {
                return nil, errors.New("未找到任何trak包")
        }
 
-       return append(buf[ftypI:ftypE], buf[moovI:moovE]...), nil
+       b = make([]byte, ftypE-ftypI+moovE-moovI)
+       copy(b[:ftypE-ftypI], buf[ftypI:ftypE])
+       copy(b[ftypE-ftypI:], buf[moovI:moovE])
+       return b, nil
 }
 
 func (t *Fmp4Decoder) Seach_stream_fmp4(buf []byte, keyframes *bufB) (cu int, err error) {
index 50a820c6707f379cbe9f6a91036118014d552bda..824e71f0436b986ff0cffb6768f52fec15dce242 100644 (file)
@@ -13,6 +13,7 @@ import (
        "path/filepath"
        "strconv"
        "strings"
+       "sync"
        "time"
 
        c "github.com/qydysky/bili_danmu/CV"
@@ -26,7 +27,7 @@ import (
        msgq "github.com/qydysky/part/msgq"
        reqf "github.com/qydysky/part/reqf"
        signal "github.com/qydysky/part/signal"
-       sync "github.com/qydysky/part/sync"
+       psync "github.com/qydysky/part/sync"
 )
 
 type M4SStream struct {
@@ -36,7 +37,7 @@ type M4SStream struct {
        config               M4SStream_Config   //配置
        stream_last_modified time.Time          //流地址更新时间
        // stream_expires       int64              //流到期时间
-       stream_hosts      sync.Map   //使用的流服务器
+       stream_hosts      psync.Map  //使用的流服务器
        stream_type       string     //流类型
        Stream_msg        *msgq.Msgq //流数据消息 tag:data
        first_buf         []byte     //m4s起始块 or flv起始块
@@ -44,6 +45,8 @@ type M4SStream struct {
        boot_buf_size     int        //快速启动缓冲长度
        boot_buf_locker   funcCtrl.BlockFunc
        last_m4s          *m4s_link_item   //最后一个切片
+       m4s_pool          []*m4s_link_item //切片pool
+       m4s_pool_l        sync.Mutex
        common            c.Common         //通用配置副本
        Current_save_path string           //明确的直播流保存目录
        Callback_start    func(*M4SStream) //实例开始的回调
@@ -67,7 +70,30 @@ type m4s_link_item struct {
        status       int       // 下载状态 0:未下载 1:正在下载 2:下载完成 3:下载失败
        tryDownCount int       // 下载次数 当=4时,不再下载,忽略此块
        data         []byte    // 下载的数据
-       createdTime  time.Time //创建时间
+       createdTime  time.Time // 创建时间
+       pooledTime   time.Time // 到pool时间
+}
+
+func (t *m4s_link_item) copyTo(to *m4s_link_item) {
+       // fmt.Println("copy to ", t.Base)
+       to.Url = t.Url
+       to.Base = t.Base
+       to.status = t.status
+       to.tryDownCount = t.tryDownCount
+       to.createdTime = t.createdTime
+}
+
+func (t *m4s_link_item) reset() *m4s_link_item {
+       if t == nil {
+               return t
+       }
+       t.Url = ""
+       t.Base = ""
+       t.status = 0
+       t.tryDownCount = 0
+       t.data = t.data[:0]
+       t.createdTime = time.Now()
+       return t
 }
 
 func (t *m4s_link_item) isInit() bool {
@@ -82,6 +108,58 @@ func (t *m4s_link_item) getNo() (int, error) {
        return strconv.Atoi(base[:len(base)-4])
 }
 
+func (t *M4SStream) getM4s() (p *m4s_link_item) {
+       t.m4s_pool_l.Lock()
+       defer t.m4s_pool_l.Unlock()
+
+       for i := 0; i < len(t.m4s_pool); i++ {
+               if t.m4s_pool[i].pooledTime.After(t.m4s_pool[i].createdTime) && time.Now().After(t.m4s_pool[i].pooledTime.Add(time.Second*10)) {
+                       // fmt.Println("=>", i)
+                       return t.m4s_pool[i].reset()
+               }
+       }
+       // fmt.Println("=>")
+       return &m4s_link_item{}
+}
+
+func (t *M4SStream) putM4s(ms ...*m4s_link_item) {
+       t.m4s_pool_l.Lock()
+       defer t.m4s_pool_l.Unlock()
+
+       for i := 0; i < len(ms); i++ {
+               if cap(ms[i].data) == 0 {
+                       ms[i] = nil
+                       ms = append(ms[:i], ms[i+1:]...)
+                       i--
+               } else if len(ms[i].data) != 0 {
+                       ms[i].pooledTime = time.Now()
+                       if len(t.m4s_pool) < 50 {
+                               t.m4s_pool = append(t.m4s_pool, ms[i])
+                       }
+               }
+               // else {
+               // fmt.Println("z", cap(ms[i].data))
+               // }
+       }
+
+       for i := 0; i < len(t.m4s_pool); i++ {
+               if t.m4s_pool[i].pooledTime.After(t.m4s_pool[i].createdTime) && time.Now().After(t.m4s_pool[i].pooledTime.Add(time.Second*30)) {
+                       t.m4s_pool[i].data = nil
+               }
+       }
+
+       // for i := 0; i < len(ms); i++ {
+       //      if ms[i].pooledTime.IsZero() {
+       //              fmt.Println("z", cap(ms[i].data))
+       //      }
+       //      ms[i].pooledTime = time.Now()
+       //      if len(t.m4s_pool) < 50 {
+       //              t.m4s_pool = append(t.m4s_pool, ms[i])
+       //      }
+       // }
+       // fmt.Println("size", len(t.m4s_pool))
+}
+
 func (t *M4SStream) Common() c.Common {
        return t.common
 }
@@ -194,7 +272,7 @@ func (t *M4SStream) fetchCheckStream() bool {
                if r.Response == nil {
                        t.log.L(`W: `, `live响应错误`)
                        t.common.Live = t.common.Live[1:]
-               } else if r.Response.StatusCode != 200 {
+               } else if r.Response.StatusCode&200 != 200 {
                        t.log.L(`W: `, `live响应错误`, r.Response.Status, string(r.Respon))
                        t.common.Live = t.common.Live[1:]
                }
@@ -215,12 +293,10 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b
 
                // 设置请求参数
                rval := reqf.Rval{
-                       Url:            m3u8_url.String(),
-                       Retry:          2,
-                       ConnectTimeout: 2000,
-                       ReadTimeout:    1000,
-                       Timeout:        2000,
-                       Proxy:          c.C.Proxy,
+                       Url:     m3u8_url.String(),
+                       Retry:   2,
+                       Timeout: 2000,
+                       Proxy:   c.C.Proxy,
                        Header: map[string]string{
                                `Host`:            m3u8_url.Host,
                                `User-Agent`:      `Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:103.0) Gecko/20100101 Firefox/103.0`,
@@ -273,6 +349,10 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b
 
                // 解析m3u8
                var tmp []*m4s_link_item
+               var lastNo int
+               if t.last_m4s != nil {
+                       lastNo, _ = t.last_m4s.getNo()
+               }
                for _, line := range bytes.Split(m3u8_respon, []byte("\n")) {
                        if len(line) == 0 {
                                continue
@@ -314,12 +394,34 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b
                                return
                        }
 
+                       {
+                               tmpBase := m4s_link
+                               // fmt.Println(tmpBase, t.last_m4s != nil)
+                               if tmpBase[0] == 'h' {
+                                       if t.last_m4s != nil {
+                                               continue
+                                       } else {
+                                               tmpBase = tmpBase[1:]
+                                       }
+                               }
+                               if no, _ := strconv.Atoi(tmpBase[:len(tmpBase)-4]); lastNo >= no {
+                                       // fmt.Println("skip", no)
+                                       continue
+                               }
+                       }
+
+                       // fmt.Println("->", m4s_link)
                        //将切片添加到返回切片数组
-                       tmp = append(tmp, &m4s_link_item{
-                               Url:         m3u8_url.ResolveReference(u).String(),
-                               Base:        m4s_link,
-                               createdTime: time.Now(),
-                       })
+                       p := t.getM4s()
+                       p.Url = m3u8_url.ResolveReference(u).String()
+                       p.Base = m4s_link
+                       p.createdTime = time.Now()
+                       tmp = append(tmp, p)
+               }
+
+               if len(tmp) == 0 {
+                       // fmt.Println("->", "empty", lastNo)
+                       return
                }
 
                // 检查是否服务器发生故障,产出多个切片
@@ -335,11 +437,6 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b
 
                m4s_links = append(m4s_links, tmp...)
 
-               // 设置最后的切片
-               defer func(last_m4s *m4s_link_item) {
-                       t.last_m4s = last_m4s
-               }(m4s_links[len(m4s_links)-1])
-
                // 首次下载
                if t.last_m4s == nil {
                        return
@@ -348,14 +445,17 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b
                // 只返回新增加的
                {
                        last_no, _ := t.last_m4s.getNo()
-                       for k, m4s_link := range m4s_links {
+                       for k := 0; k < len(m4s_links); k++ {
+                               m4s_link := m4s_links[k]
                                // 剔除初始段
                                if m4s_link.isInit() {
                                        m4s_links = append(m4s_links[:k], m4s_links[k+1:]...)
+                                       k--
                                }
                                no, _ := m4s_link.getNo()
                                if no == last_no {
                                        // 只返回新增加的切片
+                                       t.putM4s(m4s_links[:k+1]...)
                                        m4s_links = m4s_links[k+1:]
                                        // 只返回新增加的m3u8字节
                                        if index := bytes.Index(m3u8_addon, []byte(m4s_link.Base)); index != -1 {
@@ -406,12 +506,11 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b
                        }
 
                        //将切片添加到返回切片数组前
-                       m4s_links = append([]*m4s_link_item{
-                               {
-                                       Url:  m3u8_url.ResolveReference(u).String(),
-                                       Base: strconv.Itoa(guess_no) + `.m4s`,
-                               },
-                       }, m4s_links...)
+                       p := t.getM4s()
+                       p.Url = m3u8_url.ResolveReference(u).String()
+                       p.Base = strconv.Itoa(guess_no) + `.m4s`
+                       p.createdTime = time.Now()
+                       m4s_links = append([]*m4s_link_item{p}, m4s_links...)
                }
 
                // 请求解析成功,退出获取循环
@@ -429,7 +528,7 @@ func (t *M4SStream) saveStream() (e error) {
        t.Current_save_path = t.config.save_path + "/" + time.Now().Format("2006_01_02_15_04_05_000") + "_" + strconv.Itoa(t.common.Roomid) + `/`
 
        // 清除初始值
-       t.last_m4s = nil
+       t.last_m4s.reset()
 
        // 显示保存位置
        if rel, err := filepath.Rel(t.config.save_path, t.Current_save_path); err == nil {
@@ -664,6 +763,7 @@ func (t *M4SStream) saveStreamM4s() (e error) {
 
                // 存在待下载切片
                if len(download_seq) != 0 {
+
                        // 设置限制计划
                        download_limit.Plan(int64(len(download_seq)))
 
@@ -714,11 +814,9 @@ func (t *M4SStream) saveStreamM4s() (e error) {
 
                                        r := req.Item.(*reqf.Req)
                                        reqConfig := reqf.Rval{
-                                               Url:            link.Url,
-                                               ConnectTimeout: 2000,
-                                               ReadTimeout:    1000,
-                                               Timeout:        2000,
-                                               Proxy:          t.common.Proxy,
+                                               Url:     link.Url,
+                                               Timeout: 2000,
+                                               Proxy:   t.common.Proxy,
                                                Header: map[string]string{
                                                        `Connection`: `close`,
                                                },
@@ -737,8 +835,14 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                                                // if usedt := r.UsedTime.Seconds(); usedt > 700 {
                                                //      t.log.L(`I: `, `hls切片下载慢`, usedt, `ms`)
                                                // }
-                                               link.data = make([]byte, len(r.Respon))
-                                               copy(link.data, r.Respon)
+                                               if len(link.data) > len(r.Respon) {
+                                                       link.data = link.data[:copy(link.data, r.Respon)]
+                                               } else {
+                                                       // if cap(link.data) < len(r.Respon) {
+                                                       //      fmt.Println(cap(link.data), len(r.Respon))
+                                                       // }
+                                                       link.data = append(link.data[:0], r.Respon...)
+                                               }
                                                link.status = 2 // 设置切片状态为下载完成
                                        }
                                }(v, t.Current_save_path)
@@ -746,17 +850,18 @@ func (t *M4SStream) saveStreamM4s() (e error) {
 
                        // 等待队列下载完成
                        download_limit.PlanDone(func() {
-                               time.Sleep(time.Millisecond * 10)
+                               time.Sleep(time.Millisecond * 20)
                        })
                }
 
                // 传递已下载切片
                for k := 0; k < len(download_seq); k++ {
-                       v := download_seq[k]
+                       // v := download_seq[k]
 
-                       if v.status != 2 {
-                               if v.tryDownCount >= 4 {
+                       if download_seq[k].status != 2 {
+                               if download_seq[k].tryDownCount >= 4 {
                                        //下载了4次,任未下载成功,忽略此块
+                                       t.putM4s(download_seq[k])
                                        download_seq = append(download_seq[:k], download_seq[k+1:]...)
                                        k -= 1
                                        continue
@@ -765,10 +870,13 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                                }
                        }
 
-                       if strings.Contains(v.Base, `h`) {
-                               if header, e := fmp4Decoder.Init_fmp4(v.data); e != nil {
+                       // no, _ := download_seq[k].getNo()
+                       // fmt.Println("download_seq ", no, download_seq[k].status, t.first_buf == nil)
+
+                       if strings.Contains(download_seq[k].Base, `h`) {
+                               if header, e := fmp4Decoder.Init_fmp4(download_seq[k].data); e != nil {
                                        t.log.L(`E: `, e, `重试!`)
-                                       v.status = 3
+                                       download_seq[k].status = 3
                                        break
                                } else {
                                        for _, trak := range fmp4Decoder.traks {
@@ -781,20 +889,23 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                                                out.Sync()
                                        }
                                }
+                               t.putM4s(download_seq[k])
                                download_seq = append(download_seq[:k], download_seq[k+1:]...)
                                k -= 1
                                continue
                        } else if t.first_buf == nil {
+                               t.putM4s(download_seq[k])
                                download_seq = append(download_seq[:k], download_seq[k+1:]...)
                                k -= 1
                                continue
                        }
 
-                       buf.append(v.data)
+                       buf.append(download_seq[k].data)
+                       t.putM4s(download_seq[k])
                        download_seq = append(download_seq[:k], download_seq[k+1:]...)
                        k -= 1
 
-                       last_avilable_offset, err := fmp4Decoder.Seach_stream_fmp4(buf.getCopyBuf(), &fmp4KeyFrames)
+                       last_avilable_offset, err := fmp4Decoder.Seach_stream_fmp4(buf.getPureBuf(), &fmp4KeyFrames)
                        if err != nil {
                                if !errors.Is(err, io.EOF) {
                                        t.log.L(`E: `, err)
@@ -880,6 +991,17 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                if len(m4s_links) == 0 {
                        time.Sleep(time.Second)
                        continue
+               } else {
+                       // 设置最后的切片
+                       if t.last_m4s == nil {
+                               t.last_m4s = &m4s_link_item{}
+                       }
+                       for i := len(m4s_links) - 1; i > 0; i-- {
+                               if !m4s_links[i].isInit() {
+                                       m4s_links[i].copyTo(t.last_m4s)
+                                       break
+                               }
+                       }
                }
 
                // 添加新切片到下载队列
@@ -942,7 +1064,7 @@ func (t *M4SStream) Start() bool {
                t.reqPool = t.common.ReqPool
 
                // 初始化切片消息
-               t.Stream_msg = msgq.New(15)
+               t.Stream_msg = msgq.New(5)
 
                // 初始化快速启动缓冲
                if v, ok := t.common.K_v.LoadV(`直播Web缓冲长度`).(float64); ok && v != 0 {
@@ -1045,13 +1167,14 @@ func (t *M4SStream) pusherM4s(w http.ResponseWriter, r *http.Request) {
        }
 
        //写入快速启动缓冲
-       if t.boot_buf != nil && len(t.boot_buf) > 0 {
-               if _, err := w.Write(t.getBootBuf()); err != nil {
+       for i := 0; i < len(t.boot_buf); i++ {
+               if _, err := w.Write(t.boot_buf[i]); err != nil {
                        return
-               } else if flushSupport {
-                       flusher.Flush()
                }
        }
+       if len(t.boot_buf) != 0 && flushSupport {
+               flusher.Flush()
+       }
 
        cancel := make(chan struct{})
 
@@ -1097,13 +1220,14 @@ func (t *M4SStream) pusherFlv(w http.ResponseWriter, r *http.Request) {
        }
 
        //写入快速启动缓冲
-       if t.boot_buf != nil && len(t.boot_buf) > 0 {
-               if _, err := w.Write(t.getBootBuf()); err != nil {
+       for i := 0; i < len(t.boot_buf); i++ {
+               if _, err := w.Write(t.boot_buf[i]); err != nil {
                        return
-               } else if flushSupport {
-                       flusher.Flush()
                }
        }
+       if len(t.boot_buf) != 0 && flushSupport {
+               flusher.Flush()
+       }
 
        cancel := make(chan struct{})
 
@@ -1139,15 +1263,11 @@ func (t *M4SStream) bootBufPush(buf []byte) {
                defer t.boot_buf_locker.UnBlock()
 
                if len(t.boot_buf) == t.boot_buf_size {
-                       t.boot_buf = t.boot_buf[1:]
+                       for i := 0; i < len(t.boot_buf)-1; i++ {
+                               t.boot_buf[i] = t.boot_buf[i+1]
+                       }
+                       t.boot_buf = t.boot_buf[:len(t.boot_buf)-1]
                }
                t.boot_buf = append(t.boot_buf, buf)
        }
 }
-
-func (t *M4SStream) getBootBuf() (buf []byte) {
-       for i := 0; i < len(t.boot_buf); i++ {
-               buf = append(buf, t.boot_buf[i]...)
-       }
-       return buf
-}