From: qydysky <32743305+qydysky@users.noreply.github.com> Date: Thu, 2 Mar 2023 17:13:51 +0000 (+0800) Subject: Improve flv模式 减少内存分配及GC X-Git-Tag: v0.7.0~4 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=8535890dc412e74d425593dec16133b1dd8097e7;p=bili_danmu%2F.git Improve flv模式 减少内存分配及GC --- diff --git a/Reply/flvDecode.go b/Reply/flvDecode.go index b445288..5fd20ce 100644 --- a/Reply/flvDecode.go +++ b/Reply/flvDecode.go @@ -8,6 +8,7 @@ import ( "github.com/dustin/go-humanize" F "github.com/qydysky/bili_danmu/F" + slice "github.com/qydysky/part/slice" ) const ( @@ -26,7 +27,7 @@ var ( // this fuction read []byte and return flv header and all complete keyframe if possible. // complete keyframe means the video and audio tags between two video key frames tag -func Search_stream_tag(buf []byte) (front_buf []byte, keyframe [][]byte, last_available_offset int, err error) { +func Search_stream_tag(buf []byte, keyframe *slice.Buf[byte]) (front_buf []byte, last_available_offset int, err error) { if len(buf) > humanize.MByte*100 { err = errors.New("buf too large") return @@ -44,6 +45,7 @@ func Search_stream_tag(buf []byte) (front_buf []byte, keyframe [][]byte, last_av var ( sign = 0x00 keyframe_num = -1 + confirm_num = -1 tag_num = 0 ) @@ -54,8 +56,8 @@ func Search_stream_tag(buf []byte) (front_buf []byte, keyframe [][]byte, last_av // } front_buf = []byte{} } - if len(keyframe) > 0 { - keyframe = keyframe[:len(keyframe)-1] + if bufl := keyframe.Size(); confirm_num != bufl { + keyframe.RemoveBack(bufl - confirm_num) } }() @@ -134,16 +136,16 @@ func Search_stream_tag(buf []byte) (front_buf []byte, keyframe [][]byte, last_av if buf[tag_offset] == video_tag { if buf[tag_offset+11]&0xf0 == 0x10 { //key frame keyframe_num += 1 - keyframe = append(keyframe, []byte{}) + confirm_num = keyframe.Size() last_available_offset = tag_offset } if keyframe_num >= 0 { - keyframe[keyframe_num] = append(keyframe[keyframe_num], buf[tag_offset:tag_offset+tag_size_check+previou_tag_size]...) + keyframe.Append(buf[tag_offset : tag_offset+tag_size_check+previou_tag_size]) } } else if buf[tag_offset] == audio_tag { if keyframe_num >= 0 { - keyframe[keyframe_num] = append(keyframe[keyframe_num], buf[tag_offset:tag_offset+tag_size_check+previou_tag_size]...) + keyframe.Append(buf[tag_offset : tag_offset+tag_size_check+previou_tag_size]) } } diff --git a/Reply/stream.go b/Reply/stream.go index 8d74550..70d0f85 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -41,8 +41,7 @@ type M4SStream struct { stream_type string //流类型 Stream_msg *msgq.Msgq //流数据消息 tag:data first_buf []byte //m4s起始块 or flv起始块 - boot_buf [][]byte //快速启动缓冲 - boot_buf_size int //快速启动缓冲长度 + boot_buf []byte //快速启动缓冲 boot_buf_locker funcCtrl.BlockFunc last_m4s *m4s_link_item //最后一个切片 m4s_pool *pool.Buf[m4s_link_item] //切片pool @@ -673,9 +672,10 @@ func (t *M4SStream) saveStreamFlv() (e error) { // read go func() { var ( - ticker = time.NewTicker(time.Second) - buff = slice.New[byte]() - buf = make([]byte, 1<<16) + ticker = time.NewTicker(time.Second) + buff = slice.New[byte]() + keyframe = slice.New[byte]() + buf = make([]byte, 1<<16) ) defer ticker.Stop() for { @@ -699,7 +699,8 @@ func (t *M4SStream) saveStreamFlv() (e error) { } if !buff.IsEmpty() { - front_buf, keyframe, last_available_offset, e := Search_stream_tag(buff.GetCopyBuf()) + keyframe.Reset() + front_buf, last_available_offset, e := Search_stream_tag(buff.GetPureBuf(), keyframe) if e != nil { if strings.Contains(e.Error(), `no found available tag`) { continue @@ -707,25 +708,24 @@ func (t *M4SStream) saveStreamFlv() (e error) { //丢弃所有数据 buff.Reset() } - if len(front_buf)+len(keyframe) != 0 { - if len(front_buf) != 0 { - if len(t.first_buf) != 0 { - t.log.L(`E: `, `flv重复接收到起始段,退出`) - r.Cancel() - s.Done() - break - } - t.first_buf = front_buf - // fmt.Println("write front_buf") - out.Write(front_buf) - t.Stream_msg.Push_tag(`data`, front_buf) - } - for _, frame := range keyframe { - // fmt.Println("write frame") - out.Write(frame) - t.bootBufPush(frame) - t.Stream_msg.Push_tag(`data`, frame) + if len(front_buf) != 0 { + if len(t.first_buf) != 0 { + t.log.L(`E: `, `flv重复接收到起始段,退出`) + r.Cancel() + s.Done() + break } + t.first_buf = make([]byte, len(front_buf)) + copy(t.first_buf, front_buf) + // fmt.Println("write front_buf") + out.Write(t.first_buf) + t.Stream_msg.Push_tag(`data`, t.first_buf) + } + if keyframe.Size() != 0 { + t.bootBufPush(keyframe.GetPureBuf()) + keyframe.Reset() + out.Write(t.boot_buf) + t.Stream_msg.Push_tag(`data`, t.boot_buf) } if last_available_offset > 1 { // fmt.Println("write Sync") @@ -1133,15 +1133,6 @@ func (t *M4SStream) Start() bool { // 初始化切片消息 t.Stream_msg = msgq.New() - // 初始化快速启动缓冲 - if v, ok := t.common.K_v.LoadV(`直播Web缓冲长度`).(float64); ok && v != 0 { - t.boot_buf_size = int(v) - t.boot_buf = make([][]byte, t.boot_buf_size) - defer func() { - t.boot_buf = nil - }() - } - // 主循环 for t.Status.Islive() { // 是否在直播 @@ -1235,13 +1226,13 @@ func (t *M4SStream) pusherM4s(w http.ResponseWriter, r *http.Request) { } //写入快速启动缓冲 - for i := 0; i < len(t.boot_buf); i++ { - if _, err := w.Write(t.boot_buf[i]); err != nil { + if len(t.boot_buf) != 0 { + if _, err := w.Write(t.boot_buf); err != nil { return } - } - if len(t.boot_buf) != 0 && flushSupport { - flusher.Flush() + if flushSupport { + flusher.Flush() + } } cancel := make(chan struct{}) @@ -1288,13 +1279,13 @@ func (t *M4SStream) pusherFlv(w http.ResponseWriter, r *http.Request) { } //写入快速启动缓冲 - for i := 0; i < len(t.boot_buf); i++ { - if _, err := w.Write(t.boot_buf[i]); err != nil { + if len(t.boot_buf) != 0 { + if _, err := w.Write(t.boot_buf); err != nil { return } - } - if len(t.boot_buf) != 0 && flushSupport { - flusher.Flush() + if flushSupport { + flusher.Flush() + } } cancel := make(chan struct{}) @@ -1326,16 +1317,12 @@ func (t *M4SStream) pusherFlv(w http.ResponseWriter, r *http.Request) { } func (t *M4SStream) bootBufPush(buf []byte) { - if t.boot_buf != nil { - t.boot_buf_locker.Block() - defer t.boot_buf_locker.UnBlock() - - if len(t.boot_buf) == t.boot_buf_size { - for i := 0; i < len(t.boot_buf)-1; i++ { - t.boot_buf[i] = t.boot_buf[i+1] - } - t.boot_buf = t.boot_buf[:len(t.boot_buf)-1] - } - t.boot_buf = append(t.boot_buf, buf) + t.boot_buf_locker.Block() + defer t.boot_buf_locker.UnBlock() + if len(t.boot_buf) < len(buf) { + t.boot_buf = append(t.boot_buf, make([]byte, len(buf)-len(t.boot_buf))...) + } else { + t.boot_buf = t.boot_buf[:len(buf)] } + copy(t.boot_buf, buf) } diff --git a/go.mod b/go.mod index 60c06d4..f97c1e1 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/gofrs/uuid v4.3.0+incompatible github.com/gotk3/gotk3 v0.6.1 github.com/mdp/qrterminal/v3 v3.0.0 - github.com/qydysky/part v0.23.3 + github.com/qydysky/part v0.23.6 github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966 golang.org/x/text v0.3.8 diff --git a/go.sum b/go.sum index 60bb57d..d6cf259 100644 --- a/go.sum +++ b/go.sum @@ -53,6 +53,12 @@ github.com/qydysky/part v0.23.2 h1:0lGZBEVanwQSQk925p89Cn+zRR+5Sr5c2lihnS25LeA= github.com/qydysky/part v0.23.2/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c= github.com/qydysky/part v0.23.3 h1:ydGN65ZwbBhWGe0uFVLdX1TNqDGEJerTH1fmBQlPgDc= github.com/qydysky/part v0.23.3/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c= +github.com/qydysky/part v0.23.4 h1:bb1ruDPBFAJZa3gBMi+4Zlvp/Y9FB4NeCbN+uJMTUbE= +github.com/qydysky/part v0.23.4/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c= +github.com/qydysky/part v0.23.5 h1:or85qcoTNyvUifxazqpXB5i3tcLkOtZefpyNOs9MYRI= +github.com/qydysky/part v0.23.5/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c= +github.com/qydysky/part v0.23.6 h1:hSq3LKb+4TYGHagJB0/Nu5mXSs8yUit75suFR5IJvZE= +github.com/qydysky/part v0.23.6/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e h1:MRM5ITcdelLK2j1vwZ3Je0FKVCfqOLp5zO6trqMLYs0=