stream_last_modified time.Time //流地址更新时间
// stream_expires int64 //流到期时间
// stream_hosts psync.Map //使用的流服务器
- stream_type string //流类型
- Stream_msg *msgq.Msgq //流数据消息 tag:data
- first_buf []byte //m4s起始块 or flv起始块
- boot_buf []byte //快速启动缓冲
+ stream_type string //流类型
+ Stream_msg *msgq.MsgType[[]byte] //流数据消息 tag:data
+ first_buf []byte //m4s起始块 or flv起始块
+ boot_buf []byte //快速启动缓冲
boot_buf_locker funcCtrl.BlockFunc
last_m4s *m4s_link_item //最后一个切片
m4s_pool *pool.Buf[m4s_link_item] //切片pool
}
type m4s_link_item struct {
- Url string // m4s链接
- Base string // m4s文件名
- status int // 下载状态 0:未下载 1:正在下载 2:下载完成 3:下载失败
- tryDownCount int // 下载次数 当=3时,不再下载,忽略此块
- err error // 下载中出现的错误
- data []byte // 下载的数据
- createdTime time.Time // 创建时间
- pooledTime time.Time // 到pool时间
+ Url string // m4s链接
+ Base string // m4s文件名
+ status int // 下载状态 0:未下载 1:正在下载 2:下载完成 3:下载失败
+ tryDownCount int // 下载次数 当=3时,不再下载,忽略此块
+ err error // 下载中出现的错误
+ data *slice.Buf[byte] // 下载的数据
+ createdTime time.Time // 创建时间
+ pooledTime time.Time // 到pool时间
}
func (t *m4s_link_item) copyTo(to *m4s_link_item) {
t.Base = ""
t.status = 0
t.tryDownCount = 0
- t.data = t.data[:0]
+ t.data.Reset()
t.createdTime = time.Now()
return t
}
if t.m4s_pool == nil {
t.m4s_pool = pool.New(
func() *m4s_link_item {
- return &m4s_link_item{}
+ return &m4s_link_item{
+ data: slice.New[byte](),
+ }
},
func(t *m4s_link_item) bool {
return t.createdTime.After(t.pooledTime) || time.Now().Before(t.pooledTime.Add(time.Second*10))
var (
buf = slice.New[byte]()
fmp4Decoder = &Fmp4Decoder{}
+ keyframe = slice.New[byte]()
// flashingSer bool
)
// }
link.status = 3 // 设置切片状态为下载失败
} else {
- // if usedt := r.UsedTime.Seconds(); usedt > 700 {
- // t.log.L(`I: `, `hls切片下载慢`, usedt, `ms`)
- // }
- if len(link.data) > len(r.Respon) {
- link.data = link.data[:copy(link.data, r.Respon)]
- } else {
- // if cap(link.data) < len(r.Respon) {
- // fmt.Println(cap(link.data), len(r.Respon))
- // }
- link.data = append(link.data[:0], r.Respon...)
- }
+ link.data.Reset()
+ link.data.Append(r.Respon)
link.status = 2 // 设置切片状态为下载完成
}
}(v, t.Current_save_path)
}
// no, _ := download_seq[k].getNo()
- // fmt.Println("download_seq ", no, download_seq[k].status, t.first_buf == nil)
+ // fmt.Println("download_seq ", no, download_seq[k].status, download_seq[k].data.Size(), len(t.first_buf))
if strings.Contains(download_seq[k].Base, `h`) {
- if header, e := fmp4Decoder.Init_fmp4(download_seq[k].data); e != nil {
+ if front_buf, e := fmp4Decoder.Init_fmp4(download_seq[k].data.GetPureBuf()); e != nil {
t.log.L(`E: `, e, `重试!`)
download_seq[k].status = 3
break
// fmt.Println(`T: `, "找到trak:", string(trak.handlerType), trak.trackID, trak.timescale)
t.log.L(`T: `, "找到trak:", string(trak.handlerType), trak.trackID, trak.timescale)
}
- t.first_buf = header
+ t.first_buf = make([]byte, len(front_buf))
+ copy(t.first_buf, front_buf)
if out != nil {
out.Write(t.first_buf, true)
out.Sync()
continue
}
- buf.Append(download_seq[k].data)
+ buf.Append(download_seq[k].data.GetPureBuf())
t.putM4s(download_seq[k])
download_seq = append(download_seq[:k], download_seq[k+1:]...)
k -= 1
-
- last_available_offset, keyframes, err := fmp4Decoder.Search_stream_fmp4(buf.GetPureBuf())
+ last_available_offset, err := fmp4Decoder.Search_stream_fmp4(buf.GetPureBuf(), keyframe)
if err != nil {
if !errors.Is(err, io.EOF) {
t.log.L(`E: `, err)
//丢弃所有数据
buf.Reset()
} else {
+ keyframe.Reset()
last_available_offset = 0
}
}
- // no, _ := v.getNo()
- // fmt.Println(no, "fmp4KeyFrames", fmp4KeyFrames.size(), last_available_offset, err)
+ // no, _ := download_seq[k].getNo()
+ // fmt.Println(no, "fmp4KeyFrames", keyframe.Size(), last_available_offset, err)
- for _, keyframe := range keyframes {
- t.bootBufPush(keyframe)
- t.Stream_msg.Push_tag(`data`, keyframe)
+ // 传递关键帧
+ if !keyframe.IsEmpty() {
+ t.bootBufPush(keyframe.GetPureBuf())
+ keyframe.Reset()
+ t.Stream_msg.Push_tag(`data`, t.boot_buf)
if out != nil {
- out.Write(keyframe, true)
+ out.Write(t.boot_buf, true)
out.Sync()
}
}
t.reqPool = t.common.ReqPool
// 初始化切片消息
- t.Stream_msg = msgq.New()
+ t.Stream_msg = msgq.NewType[[]byte]()
// 主循环
for t.Status.Islive() {
cancel := make(chan struct{})
//hls切片
- t.Stream_msg.Pull_tag(map[string]func(interface{}) bool{
- `data`: func(data interface{}) bool {
- if b, ok := data.([]byte); ok {
- if len(b) == 0 {
- close(cancel)
- return true
- }
- if _, err := w.Write(b); err != nil {
- close(cancel)
- return true
- } else if flushSupport {
- flusher.Flush()
- }
+ t.Stream_msg.Pull_tag(map[string]func([]byte) bool{
+ `data`: func(b []byte) bool {
+ if len(b) == 0 {
+ close(cancel)
+ return true
+ }
+ if _, err := w.Write(b); err != nil {
+ close(cancel)
+ return true
+ } else if flushSupport {
+ flusher.Flush()
}
return false
},
- `close`: func(_ interface{}) bool {
+ `close`: func(_ []byte) bool {
close(cancel)
return true
},
cancel := make(chan struct{})
//flv
- t.Stream_msg.Pull_tag(map[string]func(interface{}) bool{
- `data`: func(data interface{}) bool {
- if b, ok := data.([]byte); ok {
- if len(b) == 0 {
- close(cancel)
- return true
- }
- if _, err := w.Write(b); err != nil {
- close(cancel)
- return true
- } else if flushSupport {
- flusher.Flush()
- }
+ t.Stream_msg.Pull_tag(map[string]func([]byte) bool{
+ `data`: func(b []byte) bool {
+ if len(b) == 0 {
+ close(cancel)
+ return true
+ }
+ if _, err := w.Write(b); err != nil {
+ close(cancel)
+ return true
+ } else if flushSupport {
+ flusher.Flush()
}
return false
},
- `close`: func(_ interface{}) bool {
+ `close`: func(_ []byte) bool {
close(cancel)
return true
},
defer t.boot_buf_locker.UnBlock()
if len(t.boot_buf) < len(buf) {
t.boot_buf = append(t.boot_buf, make([]byte, len(buf)-len(t.boot_buf))...)
- } else {
- t.boot_buf = t.boot_buf[:len(buf)]
}
- copy(t.boot_buf, buf)
+ t.boot_buf = t.boot_buf[:copy(t.boot_buf, buf)]
}