"github.com/dustin/go-humanize"
F "github.com/qydysky/bili_danmu/F"
+ slice "github.com/qydysky/part/slice"
)
const (
// this fuction read []byte and return flv header and all complete keyframe if possible.
// complete keyframe means the video and audio tags between two video key frames tag
-func Search_stream_tag(buf []byte) (front_buf []byte, keyframe [][]byte, last_available_offset int, err error) {
+func Search_stream_tag(buf []byte, keyframe *slice.Buf[byte]) (front_buf []byte, last_available_offset int, err error) {
if len(buf) > humanize.MByte*100 {
err = errors.New("buf too large")
return
var (
sign = 0x00
keyframe_num = -1
+ confirm_num = -1
tag_num = 0
)
// }
front_buf = []byte{}
}
- if len(keyframe) > 0 {
- keyframe = keyframe[:len(keyframe)-1]
+ if bufl := keyframe.Size(); confirm_num != bufl {
+ keyframe.RemoveBack(bufl - confirm_num)
}
}()
if buf[tag_offset] == video_tag {
if buf[tag_offset+11]&0xf0 == 0x10 { //key frame
keyframe_num += 1
- keyframe = append(keyframe, []byte{})
+ confirm_num = keyframe.Size()
last_available_offset = tag_offset
}
if keyframe_num >= 0 {
- keyframe[keyframe_num] = append(keyframe[keyframe_num], buf[tag_offset:tag_offset+tag_size_check+previou_tag_size]...)
+ keyframe.Append(buf[tag_offset : tag_offset+tag_size_check+previou_tag_size])
}
} else if buf[tag_offset] == audio_tag {
if keyframe_num >= 0 {
- keyframe[keyframe_num] = append(keyframe[keyframe_num], buf[tag_offset:tag_offset+tag_size_check+previou_tag_size]...)
+ keyframe.Append(buf[tag_offset : tag_offset+tag_size_check+previou_tag_size])
}
}
stream_type string //流类型
Stream_msg *msgq.Msgq //流数据消息 tag:data
first_buf []byte //m4s起始块 or flv起始块
- boot_buf [][]byte //快速启动缓冲
- boot_buf_size int //快速启动缓冲长度
+ boot_buf []byte //快速启动缓冲
boot_buf_locker funcCtrl.BlockFunc
last_m4s *m4s_link_item //最后一个切片
m4s_pool *pool.Buf[m4s_link_item] //切片pool
// read
go func() {
var (
- ticker = time.NewTicker(time.Second)
- buff = slice.New[byte]()
- buf = make([]byte, 1<<16)
+ ticker = time.NewTicker(time.Second)
+ buff = slice.New[byte]()
+ keyframe = slice.New[byte]()
+ buf = make([]byte, 1<<16)
)
defer ticker.Stop()
for {
}
if !buff.IsEmpty() {
- front_buf, keyframe, last_available_offset, e := Search_stream_tag(buff.GetCopyBuf())
+ keyframe.Reset()
+ front_buf, last_available_offset, e := Search_stream_tag(buff.GetPureBuf(), keyframe)
if e != nil {
if strings.Contains(e.Error(), `no found available tag`) {
continue
//丢弃所有数据
buff.Reset()
}
- if len(front_buf)+len(keyframe) != 0 {
- if len(front_buf) != 0 {
- if len(t.first_buf) != 0 {
- t.log.L(`E: `, `flv重复接收到起始段,退出`)
- r.Cancel()
- s.Done()
- break
- }
- t.first_buf = front_buf
- // fmt.Println("write front_buf")
- out.Write(front_buf)
- t.Stream_msg.Push_tag(`data`, front_buf)
- }
- for _, frame := range keyframe {
- // fmt.Println("write frame")
- out.Write(frame)
- t.bootBufPush(frame)
- t.Stream_msg.Push_tag(`data`, frame)
+ if len(front_buf) != 0 {
+ if len(t.first_buf) != 0 {
+ t.log.L(`E: `, `flv重复接收到起始段,退出`)
+ r.Cancel()
+ s.Done()
+ break
}
+ t.first_buf = make([]byte, len(front_buf))
+ copy(t.first_buf, front_buf)
+ // fmt.Println("write front_buf")
+ out.Write(t.first_buf)
+ t.Stream_msg.Push_tag(`data`, t.first_buf)
+ }
+ if keyframe.Size() != 0 {
+ t.bootBufPush(keyframe.GetPureBuf())
+ keyframe.Reset()
+ out.Write(t.boot_buf)
+ t.Stream_msg.Push_tag(`data`, t.boot_buf)
}
if last_available_offset > 1 {
// fmt.Println("write Sync")
// 初始化切片消息
t.Stream_msg = msgq.New()
- // 初始化快速启动缓冲
- if v, ok := t.common.K_v.LoadV(`直播Web缓冲长度`).(float64); ok && v != 0 {
- t.boot_buf_size = int(v)
- t.boot_buf = make([][]byte, t.boot_buf_size)
- defer func() {
- t.boot_buf = nil
- }()
- }
-
// 主循环
for t.Status.Islive() {
// 是否在直播
}
//写入快速启动缓冲
- for i := 0; i < len(t.boot_buf); i++ {
- if _, err := w.Write(t.boot_buf[i]); err != nil {
+ if len(t.boot_buf) != 0 {
+ if _, err := w.Write(t.boot_buf); err != nil {
return
}
- }
- if len(t.boot_buf) != 0 && flushSupport {
- flusher.Flush()
+ if flushSupport {
+ flusher.Flush()
+ }
}
cancel := make(chan struct{})
}
//写入快速启动缓冲
- for i := 0; i < len(t.boot_buf); i++ {
- if _, err := w.Write(t.boot_buf[i]); err != nil {
+ if len(t.boot_buf) != 0 {
+ if _, err := w.Write(t.boot_buf); err != nil {
return
}
- }
- if len(t.boot_buf) != 0 && flushSupport {
- flusher.Flush()
+ if flushSupport {
+ flusher.Flush()
+ }
}
cancel := make(chan struct{})
}
func (t *M4SStream) bootBufPush(buf []byte) {
- if t.boot_buf != nil {
- t.boot_buf_locker.Block()
- defer t.boot_buf_locker.UnBlock()
-
- if len(t.boot_buf) == t.boot_buf_size {
- for i := 0; i < len(t.boot_buf)-1; i++ {
- t.boot_buf[i] = t.boot_buf[i+1]
- }
- t.boot_buf = t.boot_buf[:len(t.boot_buf)-1]
- }
- t.boot_buf = append(t.boot_buf, buf)
+ t.boot_buf_locker.Block()
+ defer t.boot_buf_locker.UnBlock()
+ if len(t.boot_buf) < len(buf) {
+ t.boot_buf = append(t.boot_buf, make([]byte, len(buf)-len(t.boot_buf))...)
+ } else {
+ t.boot_buf = t.boot_buf[:len(buf)]
}
+ copy(t.boot_buf, buf)
}
github.com/qydysky/part v0.23.2/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c=
github.com/qydysky/part v0.23.3 h1:ydGN65ZwbBhWGe0uFVLdX1TNqDGEJerTH1fmBQlPgDc=
github.com/qydysky/part v0.23.3/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c=
+github.com/qydysky/part v0.23.4 h1:bb1ruDPBFAJZa3gBMi+4Zlvp/Y9FB4NeCbN+uJMTUbE=
+github.com/qydysky/part v0.23.4/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c=
+github.com/qydysky/part v0.23.5 h1:or85qcoTNyvUifxazqpXB5i3tcLkOtZefpyNOs9MYRI=
+github.com/qydysky/part v0.23.5/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c=
+github.com/qydysky/part v0.23.6 h1:hSq3LKb+4TYGHagJB0/Nu5mXSs8yUit75suFR5IJvZE=
+github.com/qydysky/part v0.23.6/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e h1:MRM5ITcdelLK2j1vwZ3Je0FKVCfqOLp5zO6trqMLYs0=