"path/filepath"
"strconv"
"strings"
+ "sync"
"time"
c "github.com/qydysky/bili_danmu/CV"
msgq "github.com/qydysky/part/msgq"
reqf "github.com/qydysky/part/reqf"
signal "github.com/qydysky/part/signal"
- sync "github.com/qydysky/part/sync"
+ psync "github.com/qydysky/part/sync"
)
type M4SStream struct {
config M4SStream_Config //配置
stream_last_modified time.Time //流地址更新时间
// stream_expires int64 //流到期时间
- stream_hosts sync.Map //使用的流服务器
+ stream_hosts psync.Map //使用的流服务器
stream_type string //流类型
Stream_msg *msgq.Msgq //流数据消息 tag:data
first_buf []byte //m4s起始块 or flv起始块
boot_buf_size int //快速启动缓冲长度
boot_buf_locker funcCtrl.BlockFunc
last_m4s *m4s_link_item //最后一个切片
+ m4s_pool []*m4s_link_item //切片pool
+ m4s_pool_l sync.Mutex
common c.Common //通用配置副本
Current_save_path string //明确的直播流保存目录
Callback_start func(*M4SStream) //实例开始的回调
status int // 下载状态 0:未下载 1:正在下载 2:下载完成 3:下载失败
tryDownCount int // 下载次数 当=4时,不再下载,忽略此块
data []byte // 下载的数据
- createdTime time.Time //创建时间
+ createdTime time.Time // 创建时间
+ pooledTime time.Time // 到pool时间
+}
+
+func (t *m4s_link_item) copyTo(to *m4s_link_item) {
+ // fmt.Println("copy to ", t.Base)
+ to.Url = t.Url
+ to.Base = t.Base
+ to.status = t.status
+ to.tryDownCount = t.tryDownCount
+ to.createdTime = t.createdTime
+}
+
+func (t *m4s_link_item) reset() *m4s_link_item {
+ if t == nil {
+ return t
+ }
+ t.Url = ""
+ t.Base = ""
+ t.status = 0
+ t.tryDownCount = 0
+ t.data = t.data[:0]
+ t.createdTime = time.Now()
+ return t
}
func (t *m4s_link_item) isInit() bool {
return strconv.Atoi(base[:len(base)-4])
}
+func (t *M4SStream) getM4s() (p *m4s_link_item) {
+ t.m4s_pool_l.Lock()
+ defer t.m4s_pool_l.Unlock()
+
+ for i := 0; i < len(t.m4s_pool); i++ {
+ if t.m4s_pool[i].pooledTime.After(t.m4s_pool[i].createdTime) && time.Now().After(t.m4s_pool[i].pooledTime.Add(time.Second*10)) {
+ // fmt.Println("=>", i)
+ return t.m4s_pool[i].reset()
+ }
+ }
+ // fmt.Println("=>")
+ return &m4s_link_item{}
+}
+
+func (t *M4SStream) putM4s(ms ...*m4s_link_item) {
+ t.m4s_pool_l.Lock()
+ defer t.m4s_pool_l.Unlock()
+
+ for i := 0; i < len(ms); i++ {
+ if cap(ms[i].data) == 0 {
+ ms[i] = nil
+ ms = append(ms[:i], ms[i+1:]...)
+ i--
+ } else if len(ms[i].data) != 0 {
+ ms[i].pooledTime = time.Now()
+ if len(t.m4s_pool) < 50 {
+ t.m4s_pool = append(t.m4s_pool, ms[i])
+ }
+ }
+ // else {
+ // fmt.Println("z", cap(ms[i].data))
+ // }
+ }
+
+ for i := 0; i < len(t.m4s_pool); i++ {
+ if t.m4s_pool[i].pooledTime.After(t.m4s_pool[i].createdTime) && time.Now().After(t.m4s_pool[i].pooledTime.Add(time.Second*30)) {
+ t.m4s_pool[i].data = nil
+ }
+ }
+
+ // for i := 0; i < len(ms); i++ {
+ // if ms[i].pooledTime.IsZero() {
+ // fmt.Println("z", cap(ms[i].data))
+ // }
+ // ms[i].pooledTime = time.Now()
+ // if len(t.m4s_pool) < 50 {
+ // t.m4s_pool = append(t.m4s_pool, ms[i])
+ // }
+ // }
+ // fmt.Println("size", len(t.m4s_pool))
+}
+
func (t *M4SStream) Common() c.Common {
return t.common
}
if r.Response == nil {
t.log.L(`W: `, `live响应错误`)
t.common.Live = t.common.Live[1:]
- } else if r.Response.StatusCode != 200 {
+ } else if r.Response.StatusCode&200 != 200 {
t.log.L(`W: `, `live响应错误`, r.Response.Status, string(r.Respon))
t.common.Live = t.common.Live[1:]
}
// 设置请求参数
rval := reqf.Rval{
- Url: m3u8_url.String(),
- Retry: 2,
- ConnectTimeout: 2000,
- ReadTimeout: 1000,
- Timeout: 2000,
- Proxy: c.C.Proxy,
+ Url: m3u8_url.String(),
+ Retry: 2,
+ Timeout: 2000,
+ Proxy: c.C.Proxy,
Header: map[string]string{
`Host`: m3u8_url.Host,
`User-Agent`: `Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:103.0) Gecko/20100101 Firefox/103.0`,
// 解析m3u8
var tmp []*m4s_link_item
+ var lastNo int
+ if t.last_m4s != nil {
+ lastNo, _ = t.last_m4s.getNo()
+ }
for _, line := range bytes.Split(m3u8_respon, []byte("\n")) {
if len(line) == 0 {
continue
return
}
+ {
+ tmpBase := m4s_link
+ // fmt.Println(tmpBase, t.last_m4s != nil)
+ if tmpBase[0] == 'h' {
+ if t.last_m4s != nil {
+ continue
+ } else {
+ tmpBase = tmpBase[1:]
+ }
+ }
+ if no, _ := strconv.Atoi(tmpBase[:len(tmpBase)-4]); lastNo >= no {
+ // fmt.Println("skip", no)
+ continue
+ }
+ }
+
+ // fmt.Println("->", m4s_link)
//将切片添加到返回切片数组
- tmp = append(tmp, &m4s_link_item{
- Url: m3u8_url.ResolveReference(u).String(),
- Base: m4s_link,
- createdTime: time.Now(),
- })
+ p := t.getM4s()
+ p.Url = m3u8_url.ResolveReference(u).String()
+ p.Base = m4s_link
+ p.createdTime = time.Now()
+ tmp = append(tmp, p)
+ }
+
+ if len(tmp) == 0 {
+ // fmt.Println("->", "empty", lastNo)
+ return
}
// 检查是否服务器发生故障,产出多个切片
m4s_links = append(m4s_links, tmp...)
- // 设置最后的切片
- defer func(last_m4s *m4s_link_item) {
- t.last_m4s = last_m4s
- }(m4s_links[len(m4s_links)-1])
-
// 首次下载
if t.last_m4s == nil {
return
// 只返回新增加的
{
last_no, _ := t.last_m4s.getNo()
- for k, m4s_link := range m4s_links {
+ for k := 0; k < len(m4s_links); k++ {
+ m4s_link := m4s_links[k]
// 剔除初始段
if m4s_link.isInit() {
m4s_links = append(m4s_links[:k], m4s_links[k+1:]...)
+ k--
}
no, _ := m4s_link.getNo()
if no == last_no {
// 只返回新增加的切片
+ t.putM4s(m4s_links[:k+1]...)
m4s_links = m4s_links[k+1:]
// 只返回新增加的m3u8字节
if index := bytes.Index(m3u8_addon, []byte(m4s_link.Base)); index != -1 {
}
//将切片添加到返回切片数组前
- m4s_links = append([]*m4s_link_item{
- {
- Url: m3u8_url.ResolveReference(u).String(),
- Base: strconv.Itoa(guess_no) + `.m4s`,
- },
- }, m4s_links...)
+ p := t.getM4s()
+ p.Url = m3u8_url.ResolveReference(u).String()
+ p.Base = strconv.Itoa(guess_no) + `.m4s`
+ p.createdTime = time.Now()
+ m4s_links = append([]*m4s_link_item{p}, m4s_links...)
}
// 请求解析成功,退出获取循环
t.Current_save_path = t.config.save_path + "/" + time.Now().Format("2006_01_02_15_04_05_000") + "_" + strconv.Itoa(t.common.Roomid) + `/`
// 清除初始值
- t.last_m4s = nil
+ t.last_m4s.reset()
// 显示保存位置
if rel, err := filepath.Rel(t.config.save_path, t.Current_save_path); err == nil {
// 存在待下载切片
if len(download_seq) != 0 {
+
// 设置限制计划
download_limit.Plan(int64(len(download_seq)))
r := req.Item.(*reqf.Req)
reqConfig := reqf.Rval{
- Url: link.Url,
- ConnectTimeout: 2000,
- ReadTimeout: 1000,
- Timeout: 2000,
- Proxy: t.common.Proxy,
+ Url: link.Url,
+ Timeout: 2000,
+ Proxy: t.common.Proxy,
Header: map[string]string{
`Connection`: `close`,
},
// if usedt := r.UsedTime.Seconds(); usedt > 700 {
// t.log.L(`I: `, `hls切片下载慢`, usedt, `ms`)
// }
- link.data = make([]byte, len(r.Respon))
- copy(link.data, r.Respon)
+ if len(link.data) > len(r.Respon) {
+ link.data = link.data[:copy(link.data, r.Respon)]
+ } else {
+ // if cap(link.data) < len(r.Respon) {
+ // fmt.Println(cap(link.data), len(r.Respon))
+ // }
+ link.data = append(link.data[:0], r.Respon...)
+ }
link.status = 2 // 设置切片状态为下载完成
}
}(v, t.Current_save_path)
// 等待队列下载完成
download_limit.PlanDone(func() {
- time.Sleep(time.Millisecond * 10)
+ time.Sleep(time.Millisecond * 20)
})
}
// 传递已下载切片
for k := 0; k < len(download_seq); k++ {
- v := download_seq[k]
+ // v := download_seq[k]
- if v.status != 2 {
- if v.tryDownCount >= 4 {
+ if download_seq[k].status != 2 {
+ if download_seq[k].tryDownCount >= 4 {
//下载了4次,任未下载成功,忽略此块
+ t.putM4s(download_seq[k])
download_seq = append(download_seq[:k], download_seq[k+1:]...)
k -= 1
continue
}
}
- if strings.Contains(v.Base, `h`) {
- if header, e := fmp4Decoder.Init_fmp4(v.data); e != nil {
+ // no, _ := download_seq[k].getNo()
+ // fmt.Println("download_seq ", no, download_seq[k].status, t.first_buf == nil)
+
+ if strings.Contains(download_seq[k].Base, `h`) {
+ if header, e := fmp4Decoder.Init_fmp4(download_seq[k].data); e != nil {
t.log.L(`E: `, e, `重试!`)
- v.status = 3
+ download_seq[k].status = 3
break
} else {
for _, trak := range fmp4Decoder.traks {
out.Sync()
}
}
+ t.putM4s(download_seq[k])
download_seq = append(download_seq[:k], download_seq[k+1:]...)
k -= 1
continue
} else if t.first_buf == nil {
+ t.putM4s(download_seq[k])
download_seq = append(download_seq[:k], download_seq[k+1:]...)
k -= 1
continue
}
- buf.append(v.data)
+ buf.append(download_seq[k].data)
+ t.putM4s(download_seq[k])
download_seq = append(download_seq[:k], download_seq[k+1:]...)
k -= 1
- last_avilable_offset, err := fmp4Decoder.Seach_stream_fmp4(buf.getCopyBuf(), &fmp4KeyFrames)
+ last_avilable_offset, err := fmp4Decoder.Seach_stream_fmp4(buf.getPureBuf(), &fmp4KeyFrames)
if err != nil {
if !errors.Is(err, io.EOF) {
t.log.L(`E: `, err)
if len(m4s_links) == 0 {
time.Sleep(time.Second)
continue
+ } else {
+ // 设置最后的切片
+ if t.last_m4s == nil {
+ t.last_m4s = &m4s_link_item{}
+ }
+ for i := len(m4s_links) - 1; i > 0; i-- {
+ if !m4s_links[i].isInit() {
+ m4s_links[i].copyTo(t.last_m4s)
+ break
+ }
+ }
}
// 添加新切片到下载队列
t.reqPool = t.common.ReqPool
// 初始化切片消息
- t.Stream_msg = msgq.New(15)
+ t.Stream_msg = msgq.New(5)
// 初始化快速启动缓冲
if v, ok := t.common.K_v.LoadV(`直播Web缓冲长度`).(float64); ok && v != 0 {
}
//写入快速启动缓冲
- if t.boot_buf != nil && len(t.boot_buf) > 0 {
- if _, err := w.Write(t.getBootBuf()); err != nil {
+ for i := 0; i < len(t.boot_buf); i++ {
+ if _, err := w.Write(t.boot_buf[i]); err != nil {
return
- } else if flushSupport {
- flusher.Flush()
}
}
+ if len(t.boot_buf) != 0 && flushSupport {
+ flusher.Flush()
+ }
cancel := make(chan struct{})
}
//写入快速启动缓冲
- if t.boot_buf != nil && len(t.boot_buf) > 0 {
- if _, err := w.Write(t.getBootBuf()); err != nil {
+ for i := 0; i < len(t.boot_buf); i++ {
+ if _, err := w.Write(t.boot_buf[i]); err != nil {
return
- } else if flushSupport {
- flusher.Flush()
}
}
+ if len(t.boot_buf) != 0 && flushSupport {
+ flusher.Flush()
+ }
cancel := make(chan struct{})
defer t.boot_buf_locker.UnBlock()
if len(t.boot_buf) == t.boot_buf_size {
- t.boot_buf = t.boot_buf[1:]
+ for i := 0; i < len(t.boot_buf)-1; i++ {
+ t.boot_buf[i] = t.boot_buf[i+1]
+ }
+ t.boot_buf = t.boot_buf[:len(t.boot_buf)-1]
}
t.boot_buf = append(t.boot_buf, buf)
}
}
-
-func (t *M4SStream) getBootBuf() (buf []byte) {
- for i := 0; i < len(t.boot_buf); i++ {
- buf = append(buf, t.boot_buf[i]...)
- }
- return buf
-}