From aa44ca4010aca7f61666d68cd58ffc6edd8fb8d9 Mon Sep 17 00:00:00 2001 From: qydysky <32743305+qydysky@users.noreply.github.com> Date: Tue, 17 Jan 2023 00:29:37 +0800 Subject: [PATCH] fmp4 decode fix --- Reply/fmp4Decode.go | 5 +- Reply/stream.go | 242 +++++++++++++++++++++++++++++++++----------- 2 files changed, 185 insertions(+), 62 deletions(-) diff --git a/Reply/fmp4Decode.go b/Reply/fmp4Decode.go index 752b7df..b409b07 100644 --- a/Reply/fmp4Decode.go +++ b/Reply/fmp4Decode.go @@ -108,7 +108,10 @@ func (t *Fmp4Decoder) Init_fmp4(buf []byte) (b []byte, err error) { return nil, errors.New("未找到任何trak包") } - return append(buf[ftypI:ftypE], buf[moovI:moovE]...), nil + b = make([]byte, ftypE-ftypI+moovE-moovI) + copy(b[:ftypE-ftypI], buf[ftypI:ftypE]) + copy(b[ftypE-ftypI:], buf[moovI:moovE]) + return b, nil } func (t *Fmp4Decoder) Seach_stream_fmp4(buf []byte, keyframes *bufB) (cu int, err error) { diff --git a/Reply/stream.go b/Reply/stream.go index 50a820c..824e71f 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -13,6 +13,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "time" c "github.com/qydysky/bili_danmu/CV" @@ -26,7 +27,7 @@ import ( msgq "github.com/qydysky/part/msgq" reqf "github.com/qydysky/part/reqf" signal "github.com/qydysky/part/signal" - sync "github.com/qydysky/part/sync" + psync "github.com/qydysky/part/sync" ) type M4SStream struct { @@ -36,7 +37,7 @@ type M4SStream struct { config M4SStream_Config //配置 stream_last_modified time.Time //流地址更新时间 // stream_expires int64 //流到期时间 - stream_hosts sync.Map //使用的流服务器 + stream_hosts psync.Map //使用的流服务器 stream_type string //流类型 Stream_msg *msgq.Msgq //流数据消息 tag:data first_buf []byte //m4s起始块 or flv起始块 @@ -44,6 +45,8 @@ type M4SStream struct { boot_buf_size int //快速启动缓冲长度 boot_buf_locker funcCtrl.BlockFunc last_m4s *m4s_link_item //最后一个切片 + m4s_pool []*m4s_link_item //切片pool + m4s_pool_l sync.Mutex common c.Common //通用配置副本 Current_save_path string //明确的直播流保存目录 Callback_start func(*M4SStream) //实例开始的回调 @@ -67,7 +70,30 @@ type m4s_link_item struct { status int // 下载状态 0:未下载 1:正在下载 2:下载完成 3:下载失败 tryDownCount int // 下载次数 当=4时,不再下载,忽略此块 data []byte // 下载的数据 - createdTime time.Time //创建时间 + createdTime time.Time // 创建时间 + pooledTime time.Time // 到pool时间 +} + +func (t *m4s_link_item) copyTo(to *m4s_link_item) { + // fmt.Println("copy to ", t.Base) + to.Url = t.Url + to.Base = t.Base + to.status = t.status + to.tryDownCount = t.tryDownCount + to.createdTime = t.createdTime +} + +func (t *m4s_link_item) reset() *m4s_link_item { + if t == nil { + return t + } + t.Url = "" + t.Base = "" + t.status = 0 + t.tryDownCount = 0 + t.data = t.data[:0] + t.createdTime = time.Now() + return t } func (t *m4s_link_item) isInit() bool { @@ -82,6 +108,58 @@ func (t *m4s_link_item) getNo() (int, error) { return strconv.Atoi(base[:len(base)-4]) } +func (t *M4SStream) getM4s() (p *m4s_link_item) { + t.m4s_pool_l.Lock() + defer t.m4s_pool_l.Unlock() + + for i := 0; i < len(t.m4s_pool); i++ { + if t.m4s_pool[i].pooledTime.After(t.m4s_pool[i].createdTime) && time.Now().After(t.m4s_pool[i].pooledTime.Add(time.Second*10)) { + // fmt.Println("=>", i) + return t.m4s_pool[i].reset() + } + } + // fmt.Println("=>") + return &m4s_link_item{} +} + +func (t *M4SStream) putM4s(ms ...*m4s_link_item) { + t.m4s_pool_l.Lock() + defer t.m4s_pool_l.Unlock() + + for i := 0; i < len(ms); i++ { + if cap(ms[i].data) == 0 { + ms[i] = nil + ms = append(ms[:i], ms[i+1:]...) + i-- + } else if len(ms[i].data) != 0 { + ms[i].pooledTime = time.Now() + if len(t.m4s_pool) < 50 { + t.m4s_pool = append(t.m4s_pool, ms[i]) + } + } + // else { + // fmt.Println("z", cap(ms[i].data)) + // } + } + + for i := 0; i < len(t.m4s_pool); i++ { + if t.m4s_pool[i].pooledTime.After(t.m4s_pool[i].createdTime) && time.Now().After(t.m4s_pool[i].pooledTime.Add(time.Second*30)) { + t.m4s_pool[i].data = nil + } + } + + // for i := 0; i < len(ms); i++ { + // if ms[i].pooledTime.IsZero() { + // fmt.Println("z", cap(ms[i].data)) + // } + // ms[i].pooledTime = time.Now() + // if len(t.m4s_pool) < 50 { + // t.m4s_pool = append(t.m4s_pool, ms[i]) + // } + // } + // fmt.Println("size", len(t.m4s_pool)) +} + func (t *M4SStream) Common() c.Common { return t.common } @@ -194,7 +272,7 @@ func (t *M4SStream) fetchCheckStream() bool { if r.Response == nil { t.log.L(`W: `, `live响应错误`) t.common.Live = t.common.Live[1:] - } else if r.Response.StatusCode != 200 { + } else if r.Response.StatusCode&200 != 200 { t.log.L(`W: `, `live响应错误`, r.Response.Status, string(r.Respon)) t.common.Live = t.common.Live[1:] } @@ -215,12 +293,10 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b // 设置请求参数 rval := reqf.Rval{ - Url: m3u8_url.String(), - Retry: 2, - ConnectTimeout: 2000, - ReadTimeout: 1000, - Timeout: 2000, - Proxy: c.C.Proxy, + Url: m3u8_url.String(), + Retry: 2, + Timeout: 2000, + Proxy: c.C.Proxy, Header: map[string]string{ `Host`: m3u8_url.Host, `User-Agent`: `Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:103.0) Gecko/20100101 Firefox/103.0`, @@ -273,6 +349,10 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b // 解析m3u8 var tmp []*m4s_link_item + var lastNo int + if t.last_m4s != nil { + lastNo, _ = t.last_m4s.getNo() + } for _, line := range bytes.Split(m3u8_respon, []byte("\n")) { if len(line) == 0 { continue @@ -314,12 +394,34 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b return } + { + tmpBase := m4s_link + // fmt.Println(tmpBase, t.last_m4s != nil) + if tmpBase[0] == 'h' { + if t.last_m4s != nil { + continue + } else { + tmpBase = tmpBase[1:] + } + } + if no, _ := strconv.Atoi(tmpBase[:len(tmpBase)-4]); lastNo >= no { + // fmt.Println("skip", no) + continue + } + } + + // fmt.Println("->", m4s_link) //将切片添加到返回切片数组 - tmp = append(tmp, &m4s_link_item{ - Url: m3u8_url.ResolveReference(u).String(), - Base: m4s_link, - createdTime: time.Now(), - }) + p := t.getM4s() + p.Url = m3u8_url.ResolveReference(u).String() + p.Base = m4s_link + p.createdTime = time.Now() + tmp = append(tmp, p) + } + + if len(tmp) == 0 { + // fmt.Println("->", "empty", lastNo) + return } // 检查是否服务器发生故障,产出多个切片 @@ -335,11 +437,6 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b m4s_links = append(m4s_links, tmp...) - // 设置最后的切片 - defer func(last_m4s *m4s_link_item) { - t.last_m4s = last_m4s - }(m4s_links[len(m4s_links)-1]) - // 首次下载 if t.last_m4s == nil { return @@ -348,14 +445,17 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b // 只返回新增加的 { last_no, _ := t.last_m4s.getNo() - for k, m4s_link := range m4s_links { + for k := 0; k < len(m4s_links); k++ { + m4s_link := m4s_links[k] // 剔除初始段 if m4s_link.isInit() { m4s_links = append(m4s_links[:k], m4s_links[k+1:]...) + k-- } no, _ := m4s_link.getNo() if no == last_no { // 只返回新增加的切片 + t.putM4s(m4s_links[:k+1]...) m4s_links = m4s_links[k+1:] // 只返回新增加的m3u8字节 if index := bytes.Index(m3u8_addon, []byte(m4s_link.Base)); index != -1 { @@ -406,12 +506,11 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b } //将切片添加到返回切片数组前 - m4s_links = append([]*m4s_link_item{ - { - Url: m3u8_url.ResolveReference(u).String(), - Base: strconv.Itoa(guess_no) + `.m4s`, - }, - }, m4s_links...) + p := t.getM4s() + p.Url = m3u8_url.ResolveReference(u).String() + p.Base = strconv.Itoa(guess_no) + `.m4s` + p.createdTime = time.Now() + m4s_links = append([]*m4s_link_item{p}, m4s_links...) } // 请求解析成功,退出获取循环 @@ -429,7 +528,7 @@ func (t *M4SStream) saveStream() (e error) { t.Current_save_path = t.config.save_path + "/" + time.Now().Format("2006_01_02_15_04_05_000") + "_" + strconv.Itoa(t.common.Roomid) + `/` // 清除初始值 - t.last_m4s = nil + t.last_m4s.reset() // 显示保存位置 if rel, err := filepath.Rel(t.config.save_path, t.Current_save_path); err == nil { @@ -664,6 +763,7 @@ func (t *M4SStream) saveStreamM4s() (e error) { // 存在待下载切片 if len(download_seq) != 0 { + // 设置限制计划 download_limit.Plan(int64(len(download_seq))) @@ -714,11 +814,9 @@ func (t *M4SStream) saveStreamM4s() (e error) { r := req.Item.(*reqf.Req) reqConfig := reqf.Rval{ - Url: link.Url, - ConnectTimeout: 2000, - ReadTimeout: 1000, - Timeout: 2000, - Proxy: t.common.Proxy, + Url: link.Url, + Timeout: 2000, + Proxy: t.common.Proxy, Header: map[string]string{ `Connection`: `close`, }, @@ -737,8 +835,14 @@ func (t *M4SStream) saveStreamM4s() (e error) { // if usedt := r.UsedTime.Seconds(); usedt > 700 { // t.log.L(`I: `, `hls切片下载慢`, usedt, `ms`) // } - link.data = make([]byte, len(r.Respon)) - copy(link.data, r.Respon) + if len(link.data) > len(r.Respon) { + link.data = link.data[:copy(link.data, r.Respon)] + } else { + // if cap(link.data) < len(r.Respon) { + // fmt.Println(cap(link.data), len(r.Respon)) + // } + link.data = append(link.data[:0], r.Respon...) + } link.status = 2 // 设置切片状态为下载完成 } }(v, t.Current_save_path) @@ -746,17 +850,18 @@ func (t *M4SStream) saveStreamM4s() (e error) { // 等待队列下载完成 download_limit.PlanDone(func() { - time.Sleep(time.Millisecond * 10) + time.Sleep(time.Millisecond * 20) }) } // 传递已下载切片 for k := 0; k < len(download_seq); k++ { - v := download_seq[k] + // v := download_seq[k] - if v.status != 2 { - if v.tryDownCount >= 4 { + if download_seq[k].status != 2 { + if download_seq[k].tryDownCount >= 4 { //下载了4次,任未下载成功,忽略此块 + t.putM4s(download_seq[k]) download_seq = append(download_seq[:k], download_seq[k+1:]...) k -= 1 continue @@ -765,10 +870,13 @@ func (t *M4SStream) saveStreamM4s() (e error) { } } - if strings.Contains(v.Base, `h`) { - if header, e := fmp4Decoder.Init_fmp4(v.data); e != nil { + // no, _ := download_seq[k].getNo() + // fmt.Println("download_seq ", no, download_seq[k].status, t.first_buf == nil) + + if strings.Contains(download_seq[k].Base, `h`) { + if header, e := fmp4Decoder.Init_fmp4(download_seq[k].data); e != nil { t.log.L(`E: `, e, `重试!`) - v.status = 3 + download_seq[k].status = 3 break } else { for _, trak := range fmp4Decoder.traks { @@ -781,20 +889,23 @@ func (t *M4SStream) saveStreamM4s() (e error) { out.Sync() } } + t.putM4s(download_seq[k]) download_seq = append(download_seq[:k], download_seq[k+1:]...) k -= 1 continue } else if t.first_buf == nil { + t.putM4s(download_seq[k]) download_seq = append(download_seq[:k], download_seq[k+1:]...) k -= 1 continue } - buf.append(v.data) + buf.append(download_seq[k].data) + t.putM4s(download_seq[k]) download_seq = append(download_seq[:k], download_seq[k+1:]...) k -= 1 - last_avilable_offset, err := fmp4Decoder.Seach_stream_fmp4(buf.getCopyBuf(), &fmp4KeyFrames) + last_avilable_offset, err := fmp4Decoder.Seach_stream_fmp4(buf.getPureBuf(), &fmp4KeyFrames) if err != nil { if !errors.Is(err, io.EOF) { t.log.L(`E: `, err) @@ -880,6 +991,17 @@ func (t *M4SStream) saveStreamM4s() (e error) { if len(m4s_links) == 0 { time.Sleep(time.Second) continue + } else { + // 设置最后的切片 + if t.last_m4s == nil { + t.last_m4s = &m4s_link_item{} + } + for i := len(m4s_links) - 1; i > 0; i-- { + if !m4s_links[i].isInit() { + m4s_links[i].copyTo(t.last_m4s) + break + } + } } // 添加新切片到下载队列 @@ -942,7 +1064,7 @@ func (t *M4SStream) Start() bool { t.reqPool = t.common.ReqPool // 初始化切片消息 - t.Stream_msg = msgq.New(15) + t.Stream_msg = msgq.New(5) // 初始化快速启动缓冲 if v, ok := t.common.K_v.LoadV(`直播Web缓冲长度`).(float64); ok && v != 0 { @@ -1045,13 +1167,14 @@ func (t *M4SStream) pusherM4s(w http.ResponseWriter, r *http.Request) { } //写入快速启动缓冲 - if t.boot_buf != nil && len(t.boot_buf) > 0 { - if _, err := w.Write(t.getBootBuf()); err != nil { + for i := 0; i < len(t.boot_buf); i++ { + if _, err := w.Write(t.boot_buf[i]); err != nil { return - } else if flushSupport { - flusher.Flush() } } + if len(t.boot_buf) != 0 && flushSupport { + flusher.Flush() + } cancel := make(chan struct{}) @@ -1097,13 +1220,14 @@ func (t *M4SStream) pusherFlv(w http.ResponseWriter, r *http.Request) { } //写入快速启动缓冲 - if t.boot_buf != nil && len(t.boot_buf) > 0 { - if _, err := w.Write(t.getBootBuf()); err != nil { + for i := 0; i < len(t.boot_buf); i++ { + if _, err := w.Write(t.boot_buf[i]); err != nil { return - } else if flushSupport { - flusher.Flush() } } + if len(t.boot_buf) != 0 && flushSupport { + flusher.Flush() + } cancel := make(chan struct{}) @@ -1139,15 +1263,11 @@ func (t *M4SStream) bootBufPush(buf []byte) { defer t.boot_buf_locker.UnBlock() if len(t.boot_buf) == t.boot_buf_size { - t.boot_buf = t.boot_buf[1:] + for i := 0; i < len(t.boot_buf)-1; i++ { + t.boot_buf[i] = t.boot_buf[i+1] + } + t.boot_buf = t.boot_buf[:len(t.boot_buf)-1] } t.boot_buf = append(t.boot_buf, buf) } } - -func (t *M4SStream) getBootBuf() (buf []byte) { - for i := 0; i < len(t.boot_buf); i++ { - buf = append(buf, t.boot_buf[i]...) - } - return buf -} -- 2.39.2