]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
Improve flv模式 减少内存分配及GC
authorqydysky <32743305+qydysky@users.noreply.github.com>
Thu, 2 Mar 2023 17:13:51 +0000 (01:13 +0800)
committerqydysky <32743305+qydysky@users.noreply.github.com>
Thu, 2 Mar 2023 17:13:51 +0000 (01:13 +0800)
Reply/flvDecode.go
Reply/stream.go
go.mod
go.sum

index b445288ca871837efdcbc1430202730baf32dd5b..5fd20ceec795d1e42267f0051df367a3bba4d78a 100644 (file)
@@ -8,6 +8,7 @@ import (
 
        "github.com/dustin/go-humanize"
        F "github.com/qydysky/bili_danmu/F"
+       slice "github.com/qydysky/part/slice"
 )
 
 const (
@@ -26,7 +27,7 @@ var (
 
 // this fuction read []byte and return flv header and all complete keyframe if possible.
 // complete keyframe means the video and audio tags between two video key frames tag
-func Search_stream_tag(buf []byte) (front_buf []byte, keyframe [][]byte, last_available_offset int, err error) {
+func Search_stream_tag(buf []byte, keyframe *slice.Buf[byte]) (front_buf []byte, last_available_offset int, err error) {
        if len(buf) > humanize.MByte*100 {
                err = errors.New("buf too large")
                return
@@ -44,6 +45,7 @@ func Search_stream_tag(buf []byte) (front_buf []byte, keyframe [][]byte, last_av
        var (
                sign         = 0x00
                keyframe_num = -1
+               confirm_num  = -1
                tag_num      = 0
        )
 
@@ -54,8 +56,8 @@ func Search_stream_tag(buf []byte) (front_buf []byte, keyframe [][]byte, last_av
                        // }
                        front_buf = []byte{}
                }
-               if len(keyframe) > 0 {
-                       keyframe = keyframe[:len(keyframe)-1]
+               if bufl := keyframe.Size(); confirm_num != bufl {
+                       keyframe.RemoveBack(bufl - confirm_num)
                }
        }()
 
@@ -134,16 +136,16 @@ func Search_stream_tag(buf []byte) (front_buf []byte, keyframe [][]byte, last_av
                if buf[tag_offset] == video_tag {
                        if buf[tag_offset+11]&0xf0 == 0x10 { //key frame
                                keyframe_num += 1
-                               keyframe = append(keyframe, []byte{})
+                               confirm_num = keyframe.Size()
                                last_available_offset = tag_offset
                        }
 
                        if keyframe_num >= 0 {
-                               keyframe[keyframe_num] = append(keyframe[keyframe_num], buf[tag_offset:tag_offset+tag_size_check+previou_tag_size]...)
+                               keyframe.Append(buf[tag_offset : tag_offset+tag_size_check+previou_tag_size])
                        }
                } else if buf[tag_offset] == audio_tag {
                        if keyframe_num >= 0 {
-                               keyframe[keyframe_num] = append(keyframe[keyframe_num], buf[tag_offset:tag_offset+tag_size_check+previou_tag_size]...)
+                               keyframe.Append(buf[tag_offset : tag_offset+tag_size_check+previou_tag_size])
                        }
                }
 
index 8d745508cecfe2bfb86de04747ba870e5278e748..70d0f8579b0debc091fdfdcd57e36f036c13b032 100644 (file)
@@ -41,8 +41,7 @@ type M4SStream struct {
        stream_type       string     //流类型
        Stream_msg        *msgq.Msgq //流数据消息 tag:data
        first_buf         []byte     //m4s起始块 or flv起始块
-       boot_buf          [][]byte   //快速启动缓冲
-       boot_buf_size     int        //快速启动缓冲长度
+       boot_buf          []byte     //快速启动缓冲
        boot_buf_locker   funcCtrl.BlockFunc
        last_m4s          *m4s_link_item           //最后一个切片
        m4s_pool          *pool.Buf[m4s_link_item] //切片pool
@@ -673,9 +672,10 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                        // read
                        go func() {
                                var (
-                                       ticker = time.NewTicker(time.Second)
-                                       buff   = slice.New[byte]()
-                                       buf    = make([]byte, 1<<16)
+                                       ticker   = time.NewTicker(time.Second)
+                                       buff     = slice.New[byte]()
+                                       keyframe = slice.New[byte]()
+                                       buf      = make([]byte, 1<<16)
                                )
                                defer ticker.Stop()
                                for {
@@ -699,7 +699,8 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                                        }
 
                                        if !buff.IsEmpty() {
-                                               front_buf, keyframe, last_available_offset, e := Search_stream_tag(buff.GetCopyBuf())
+                                               keyframe.Reset()
+                                               front_buf, last_available_offset, e := Search_stream_tag(buff.GetPureBuf(), keyframe)
                                                if e != nil {
                                                        if strings.Contains(e.Error(), `no found available tag`) {
                                                                continue
@@ -707,25 +708,24 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                                                        //丢弃所有数据
                                                        buff.Reset()
                                                }
-                                               if len(front_buf)+len(keyframe) != 0 {
-                                                       if len(front_buf) != 0 {
-                                                               if len(t.first_buf) != 0 {
-                                                                       t.log.L(`E: `, `flv重复接收到起始段,退出`)
-                                                                       r.Cancel()
-                                                                       s.Done()
-                                                                       break
-                                                               }
-                                                               t.first_buf = front_buf
-                                                               // fmt.Println("write front_buf")
-                                                               out.Write(front_buf)
-                                                               t.Stream_msg.Push_tag(`data`, front_buf)
-                                                       }
-                                                       for _, frame := range keyframe {
-                                                               // fmt.Println("write frame")
-                                                               out.Write(frame)
-                                                               t.bootBufPush(frame)
-                                                               t.Stream_msg.Push_tag(`data`, frame)
+                                               if len(front_buf) != 0 {
+                                                       if len(t.first_buf) != 0 {
+                                                               t.log.L(`E: `, `flv重复接收到起始段,退出`)
+                                                               r.Cancel()
+                                                               s.Done()
+                                                               break
                                                        }
+                                                       t.first_buf = make([]byte, len(front_buf))
+                                                       copy(t.first_buf, front_buf)
+                                                       // fmt.Println("write front_buf")
+                                                       out.Write(t.first_buf)
+                                                       t.Stream_msg.Push_tag(`data`, t.first_buf)
+                                               }
+                                               if keyframe.Size() != 0 {
+                                                       t.bootBufPush(keyframe.GetPureBuf())
+                                                       keyframe.Reset()
+                                                       out.Write(t.boot_buf)
+                                                       t.Stream_msg.Push_tag(`data`, t.boot_buf)
                                                }
                                                if last_available_offset > 1 {
                                                        // fmt.Println("write Sync")
@@ -1133,15 +1133,6 @@ func (t *M4SStream) Start() bool {
                // 初始化切片消息
                t.Stream_msg = msgq.New()
 
-               // 初始化快速启动缓冲
-               if v, ok := t.common.K_v.LoadV(`直播Web缓冲长度`).(float64); ok && v != 0 {
-                       t.boot_buf_size = int(v)
-                       t.boot_buf = make([][]byte, t.boot_buf_size)
-                       defer func() {
-                               t.boot_buf = nil
-                       }()
-               }
-
                // 主循环
                for t.Status.Islive() {
                        // 是否在直播
@@ -1235,13 +1226,13 @@ func (t *M4SStream) pusherM4s(w http.ResponseWriter, r *http.Request) {
        }
 
        //写入快速启动缓冲
-       for i := 0; i < len(t.boot_buf); i++ {
-               if _, err := w.Write(t.boot_buf[i]); err != nil {
+       if len(t.boot_buf) != 0 {
+               if _, err := w.Write(t.boot_buf); err != nil {
                        return
                }
-       }
-       if len(t.boot_buf) != 0 && flushSupport {
-               flusher.Flush()
+               if flushSupport {
+                       flusher.Flush()
+               }
        }
 
        cancel := make(chan struct{})
@@ -1288,13 +1279,13 @@ func (t *M4SStream) pusherFlv(w http.ResponseWriter, r *http.Request) {
        }
 
        //写入快速启动缓冲
-       for i := 0; i < len(t.boot_buf); i++ {
-               if _, err := w.Write(t.boot_buf[i]); err != nil {
+       if len(t.boot_buf) != 0 {
+               if _, err := w.Write(t.boot_buf); err != nil {
                        return
                }
-       }
-       if len(t.boot_buf) != 0 && flushSupport {
-               flusher.Flush()
+               if flushSupport {
+                       flusher.Flush()
+               }
        }
 
        cancel := make(chan struct{})
@@ -1326,16 +1317,12 @@ func (t *M4SStream) pusherFlv(w http.ResponseWriter, r *http.Request) {
 }
 
 func (t *M4SStream) bootBufPush(buf []byte) {
-       if t.boot_buf != nil {
-               t.boot_buf_locker.Block()
-               defer t.boot_buf_locker.UnBlock()
-
-               if len(t.boot_buf) == t.boot_buf_size {
-                       for i := 0; i < len(t.boot_buf)-1; i++ {
-                               t.boot_buf[i] = t.boot_buf[i+1]
-                       }
-                       t.boot_buf = t.boot_buf[:len(t.boot_buf)-1]
-               }
-               t.boot_buf = append(t.boot_buf, buf)
+       t.boot_buf_locker.Block()
+       defer t.boot_buf_locker.UnBlock()
+       if len(t.boot_buf) < len(buf) {
+               t.boot_buf = append(t.boot_buf, make([]byte, len(buf)-len(t.boot_buf))...)
+       } else {
+               t.boot_buf = t.boot_buf[:len(buf)]
        }
+       copy(t.boot_buf, buf)
 }
diff --git a/go.mod b/go.mod
index 60c06d4a14195929c43db0ed93349901726d4a98..f97c1e190de18dfafa6ec0214e7505b68581a80c 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -6,7 +6,7 @@ require (
        github.com/gofrs/uuid v4.3.0+incompatible
        github.com/gotk3/gotk3 v0.6.1
        github.com/mdp/qrterminal/v3 v3.0.0
-       github.com/qydysky/part v0.23.3
+       github.com/qydysky/part v0.23.6
        github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
        github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
        golang.org/x/text v0.3.8
diff --git a/go.sum b/go.sum
index 60bb57d639bae19ae668bb127f9c9bbb4a9379b0..d6cf259bfec4454345ff17e7f272d7fd7c40f564 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -53,6 +53,12 @@ github.com/qydysky/part v0.23.2 h1:0lGZBEVanwQSQk925p89Cn+zRR+5Sr5c2lihnS25LeA=
 github.com/qydysky/part v0.23.2/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c=
 github.com/qydysky/part v0.23.3 h1:ydGN65ZwbBhWGe0uFVLdX1TNqDGEJerTH1fmBQlPgDc=
 github.com/qydysky/part v0.23.3/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c=
+github.com/qydysky/part v0.23.4 h1:bb1ruDPBFAJZa3gBMi+4Zlvp/Y9FB4NeCbN+uJMTUbE=
+github.com/qydysky/part v0.23.4/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c=
+github.com/qydysky/part v0.23.5 h1:or85qcoTNyvUifxazqpXB5i3tcLkOtZefpyNOs9MYRI=
+github.com/qydysky/part v0.23.5/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c=
+github.com/qydysky/part v0.23.6 h1:hSq3LKb+4TYGHagJB0/Nu5mXSs8yUit75suFR5IJvZE=
+github.com/qydysky/part v0.23.6/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c=
 github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
 github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
 github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e h1:MRM5ITcdelLK2j1vwZ3Je0FKVCfqOLp5zO6trqMLYs0=