]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
Improve fmp4模式减少内存分配及其他优化
authorqydysky <32743305+qydysky@users.noreply.github.com>
Sat, 4 Mar 2023 16:46:28 +0000 (00:46 +0800)
committerqydysky <32743305+qydysky@users.noreply.github.com>
Sat, 4 Mar 2023 16:46:28 +0000 (00:46 +0800)
CV/Var.go
Reply/flvDecode_test.go
Reply/fmp4Decode.go
Reply/fmp4Decode_test.go
Reply/stream.go
go.mod
go.sum

index 0745fcbeb3206766e6cd31cc5c341f6af0c8dff2..0a41dde1d2d18d5b1b0a43c60c6db810d913ea9a 100644 (file)
--- a/CV/Var.go
+++ b/CV/Var.go
@@ -241,6 +241,8 @@ func (t *Common) Init() Common {
 
                        type s struct {
                                MenInUse     string `json:"menInUse"`
+                               ReqPoolInUse int    `json:"reqPoolInUse"`
+                               ReqPoolSum   int    `json:"reqPoolSum"`
                                NumGoroutine int    `json:"numGoroutine"`
                                GoVersion    string `json:"goVersion"`
                        }
@@ -252,6 +254,8 @@ func (t *Common) Init() Common {
                                j{
                                        s{
                                                humanize.Bytes(memStats.HeapInuse + memStats.StackInuse),
+                                               t.ReqPool.PoolInUse(),
+                                               t.ReqPool.PoolSum(),
                                                runtime.NumGoroutine(),
                                                runtime.Version(),
                                        },
index 5cf89fa47f8b26c7728c97dbdc35a48b6ee61c7a..3ffae5a26b2d0b4abeec29f0cf09fb69851dac3c 100644 (file)
@@ -36,12 +36,13 @@ func Test_FLVdeal(t *testing.T) {
                if s := buff.Size(); max < s {
                        max = s
                }
-               front_buf, keyframe, last_available_offset, e := Search_stream_tag(buff.GetPureBuf())
+               keyframe := slice.New[byte]()
+               front_buf, last_available_offset, e := Search_stream_tag(buff.GetPureBuf(), keyframe)
                if e != nil {
                        t.Fatal(e)
                }
-               flog.Write([]byte(fmt.Sprintf("%d %d %d %d\n", c, len(front_buf), len(keyframe), last_available_offset)), true)
-               t.Log(c, len(front_buf), len(keyframe))
+               flog.Write([]byte(fmt.Sprintf("%d %d %d %d\n", c, len(front_buf), keyframe.Size(), last_available_offset)), true)
+               t.Log(c, len(front_buf), keyframe.Size())
                buff.RemoveFront(last_available_offset)
        }
        t.Log("max", humanize.Bytes(uint64(max)))
index e4e82cb6056afe922db866abd912aed250c8f4e9..938fd7dce96064ceee55c124e930f17323b50871 100644 (file)
@@ -113,7 +113,7 @@ func (t *Fmp4Decoder) Init_fmp4(buf []byte) (b []byte, err error) {
        return b, nil
 }
 
-func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte) (cu int, keyframes [][]byte, err error) {
+func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte]) (cu int, err error) {
        if len(buf) > humanize.MByte*100 {
                err = errors.New("buf too large")
                return
@@ -243,7 +243,7 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte) (cu int, keyframes [][]byte
                                //deal frame
                                if keyframeMoof {
                                        if v, e := t.buf.HadModified(bufModified); e == nil && v && !t.buf.IsEmpty() {
-                                               keyframes = append(keyframes, t.buf.GetCopyBuf())
+                                               keyframe.Append(t.buf.GetPureBuf())
                                                cu = m[0].i
                                                t.buf.Reset()
                                        }
@@ -332,7 +332,7 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte) (cu int, keyframes [][]byte
                                //deal frame
                                if keyframeMoof {
                                        if v, e := t.buf.HadModified(bufModified); e == nil && v && !t.buf.IsEmpty() {
-                                               keyframes = append(keyframes, t.buf.GetCopyBuf())
+                                               keyframe.Append(t.buf.GetPureBuf())
                                                cu = m[0].i
                                                t.buf.Reset()
                                        }
index bc634ba3a1c881dfa5d994670bd1fc67a1a7988f..7ea6ffdff15f6ac283c33ba24cdb808768210b00 100644 (file)
@@ -46,7 +46,7 @@ func Test_deal(t *testing.T) {
                if e != nil {
                        t.Fatal(e)
                }
-               last_available_offset, _, e := fmp4Decoder.Search_stream_fmp4(buff.GetPureBuf())
+               last_available_offset, e := fmp4Decoder.Search_stream_fmp4(buff.GetPureBuf(), slice.New[byte]())
                if e != nil && e.Error() != "未初始化traks" {
                        t.Fatal(e)
                }
index 5406b8a57cbedbdde724df712f33574265c9698f..077116de3f4d73b0be251056037ba2a7278a4486 100644 (file)
@@ -38,10 +38,10 @@ type M4SStream struct {
        stream_last_modified time.Time          //流地址更新时间
        // stream_expires       int64              //流到期时间
        // stream_hosts      psync.Map  //使用的流服务器
-       stream_type       string     //流类型
-       Stream_msg        *msgq.Msgq //流数据消息 tag:data
-       first_buf         []byte     //m4s起始块 or flv起始块
-       boot_buf          []byte     //快速启动缓冲
+       stream_type       string                //流类型
+       Stream_msg        *msgq.MsgType[[]byte] //流数据消息 tag:data
+       first_buf         []byte                //m4s起始块 or flv起始块
+       boot_buf          []byte                //快速启动缓冲
        boot_buf_locker   funcCtrl.BlockFunc
        last_m4s          *m4s_link_item           //最后一个切片
        m4s_pool          *pool.Buf[m4s_link_item] //切片pool
@@ -63,14 +63,14 @@ type M4SStream_Config struct {
 }
 
 type m4s_link_item struct {
-       Url          string    // m4s链接
-       Base         string    // m4s文件名
-       status       int       // 下载状态 0:未下载 1:正在下载 2:下载完成 3:下载失败
-       tryDownCount int       // 下载次数 当=3时,不再下载,忽略此块
-       err          error     // 下载中出现的错误
-       data         []byte    // 下载的数据
-       createdTime  time.Time // 创建时间
-       pooledTime   time.Time // 到pool时间
+       Url          string           // m4s链接
+       Base         string           // m4s文件名
+       status       int              // 下载状态 0:未下载 1:正在下载 2:下载完成 3:下载失败
+       tryDownCount int              // 下载次数 当=3时,不再下载,忽略此块
+       err          error            // 下载中出现的错误
+       data         *slice.Buf[byte] // 下载的数据
+       createdTime  time.Time        // 创建时间
+       pooledTime   time.Time        // 到pool时间
 }
 
 func (t *m4s_link_item) copyTo(to *m4s_link_item) {
@@ -90,7 +90,7 @@ func (t *m4s_link_item) reset() *m4s_link_item {
        t.Base = ""
        t.status = 0
        t.tryDownCount = 0
-       t.data = t.data[:0]
+       t.data.Reset()
        t.createdTime = time.Now()
        return t
 }
@@ -111,7 +111,9 @@ func (t *M4SStream) getM4s() (p *m4s_link_item) {
        if t.m4s_pool == nil {
                t.m4s_pool = pool.New(
                        func() *m4s_link_item {
-                               return &m4s_link_item{}
+                               return &m4s_link_item{
+                                       data: slice.New[byte](),
+                               }
                        },
                        func(t *m4s_link_item) bool {
                                return t.createdTime.After(t.pooledTime) || time.Now().Before(t.pooledTime.Add(time.Second*10))
@@ -805,6 +807,7 @@ func (t *M4SStream) saveStreamM4s() (e error) {
        var (
                buf         = slice.New[byte]()
                fmp4Decoder = &Fmp4Decoder{}
+               keyframe    = slice.New[byte]()
                // flashingSer      bool
        )
 
@@ -892,17 +895,8 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                                                // }
                                                link.status = 3 // 设置切片状态为下载失败
                                        } else {
-                                               // if usedt := r.UsedTime.Seconds(); usedt > 700 {
-                                               //      t.log.L(`I: `, `hls切片下载慢`, usedt, `ms`)
-                                               // }
-                                               if len(link.data) > len(r.Respon) {
-                                                       link.data = link.data[:copy(link.data, r.Respon)]
-                                               } else {
-                                                       // if cap(link.data) < len(r.Respon) {
-                                                       //      fmt.Println(cap(link.data), len(r.Respon))
-                                                       // }
-                                                       link.data = append(link.data[:0], r.Respon...)
-                                               }
+                                               link.data.Reset()
+                                               link.data.Append(r.Respon)
                                                link.status = 2 // 设置切片状态为下载完成
                                        }
                                }(v, t.Current_save_path)
@@ -937,10 +931,10 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                        }
 
                        // no, _ := download_seq[k].getNo()
-                       // fmt.Println("download_seq ", no, download_seq[k].status, t.first_buf == nil)
+                       // fmt.Println("download_seq ", no, download_seq[k].status, download_seq[k].data.Size(), len(t.first_buf))
 
                        if strings.Contains(download_seq[k].Base, `h`) {
-                               if header, e := fmp4Decoder.Init_fmp4(download_seq[k].data); e != nil {
+                               if front_buf, e := fmp4Decoder.Init_fmp4(download_seq[k].data.GetPureBuf()); e != nil {
                                        t.log.L(`E: `, e, `重试!`)
                                        download_seq[k].status = 3
                                        break
@@ -949,7 +943,8 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                                                // fmt.Println(`T: `, "找到trak:", string(trak.handlerType), trak.trackID, trak.timescale)
                                                t.log.L(`T: `, "找到trak:", string(trak.handlerType), trak.trackID, trak.timescale)
                                        }
-                                       t.first_buf = header
+                                       t.first_buf = make([]byte, len(front_buf))
+                                       copy(t.first_buf, front_buf)
                                        if out != nil {
                                                out.Write(t.first_buf, true)
                                                out.Sync()
@@ -966,12 +961,11 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                                continue
                        }
 
-                       buf.Append(download_seq[k].data)
+                       buf.Append(download_seq[k].data.GetPureBuf())
                        t.putM4s(download_seq[k])
                        download_seq = append(download_seq[:k], download_seq[k+1:]...)
                        k -= 1
-
-                       last_available_offset, keyframes, err := fmp4Decoder.Search_stream_fmp4(buf.GetPureBuf())
+                       last_available_offset, err := fmp4Decoder.Search_stream_fmp4(buf.GetPureBuf(), keyframe)
                        if err != nil {
                                if !errors.Is(err, io.EOF) {
                                        t.log.L(`E: `, err)
@@ -987,18 +981,21 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                                        //丢弃所有数据
                                        buf.Reset()
                                } else {
+                                       keyframe.Reset()
                                        last_available_offset = 0
                                }
                        }
 
-                       // no, _ := v.getNo()
-                       // fmt.Println(no, "fmp4KeyFrames", fmp4KeyFrames.size(), last_available_offset, err)
+                       // no, _ := download_seq[k].getNo()
+                       // fmt.Println(no, "fmp4KeyFrames", keyframe.Size(), last_available_offset, err)
 
-                       for _, keyframe := range keyframes {
-                               t.bootBufPush(keyframe)
-                               t.Stream_msg.Push_tag(`data`, keyframe)
+                       // 传递关键帧
+                       if !keyframe.IsEmpty() {
+                               t.bootBufPush(keyframe.GetPureBuf())
+                               keyframe.Reset()
+                               t.Stream_msg.Push_tag(`data`, t.boot_buf)
                                if out != nil {
-                                       out.Write(keyframe, true)
+                                       out.Write(t.boot_buf, true)
                                        out.Sync()
                                }
                        }
@@ -1132,7 +1129,7 @@ func (t *M4SStream) Start() bool {
                t.reqPool = t.common.ReqPool
 
                // 初始化切片消息
-               t.Stream_msg = msgq.New()
+               t.Stream_msg = msgq.NewType[[]byte]()
 
                // 主循环
                for t.Status.Islive() {
@@ -1239,23 +1236,21 @@ func (t *M4SStream) pusherM4s(w http.ResponseWriter, r *http.Request) {
        cancel := make(chan struct{})
 
        //hls切片
-       t.Stream_msg.Pull_tag(map[string]func(interface{}) bool{
-               `data`: func(data interface{}) bool {
-                       if b, ok := data.([]byte); ok {
-                               if len(b) == 0 {
-                                       close(cancel)
-                                       return true
-                               }
-                               if _, err := w.Write(b); err != nil {
-                                       close(cancel)
-                                       return true
-                               } else if flushSupport {
-                                       flusher.Flush()
-                               }
+       t.Stream_msg.Pull_tag(map[string]func([]byte) bool{
+               `data`: func(b []byte) bool {
+                       if len(b) == 0 {
+                               close(cancel)
+                               return true
+                       }
+                       if _, err := w.Write(b); err != nil {
+                               close(cancel)
+                               return true
+                       } else if flushSupport {
+                               flusher.Flush()
                        }
                        return false
                },
-               `close`: func(_ interface{}) bool {
+               `close`: func(_ []byte) bool {
                        close(cancel)
                        return true
                },
@@ -1292,23 +1287,21 @@ func (t *M4SStream) pusherFlv(w http.ResponseWriter, r *http.Request) {
        cancel := make(chan struct{})
 
        //flv
-       t.Stream_msg.Pull_tag(map[string]func(interface{}) bool{
-               `data`: func(data interface{}) bool {
-                       if b, ok := data.([]byte); ok {
-                               if len(b) == 0 {
-                                       close(cancel)
-                                       return true
-                               }
-                               if _, err := w.Write(b); err != nil {
-                                       close(cancel)
-                                       return true
-                               } else if flushSupport {
-                                       flusher.Flush()
-                               }
+       t.Stream_msg.Pull_tag(map[string]func([]byte) bool{
+               `data`: func(b []byte) bool {
+                       if len(b) == 0 {
+                               close(cancel)
+                               return true
+                       }
+                       if _, err := w.Write(b); err != nil {
+                               close(cancel)
+                               return true
+                       } else if flushSupport {
+                               flusher.Flush()
                        }
                        return false
                },
-               `close`: func(_ interface{}) bool {
+               `close`: func(_ []byte) bool {
                        close(cancel)
                        return true
                },
@@ -1322,8 +1315,6 @@ func (t *M4SStream) bootBufPush(buf []byte) {
        defer t.boot_buf_locker.UnBlock()
        if len(t.boot_buf) < len(buf) {
                t.boot_buf = append(t.boot_buf, make([]byte, len(buf)-len(t.boot_buf))...)
-       } else {
-               t.boot_buf = t.boot_buf[:len(buf)]
        }
-       copy(t.boot_buf, buf)
+       t.boot_buf = t.boot_buf[:copy(t.boot_buf, buf)]
 }
diff --git a/go.mod b/go.mod
index cc4a817c1de7ba74a82033f43e6bbcbe8a3698f1..32e2ce840c9121e4e986911b1f0682e1ab7a4dec 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -6,7 +6,7 @@ require (
        github.com/gofrs/uuid v4.3.0+incompatible
        github.com/gotk3/gotk3 v0.6.1
        github.com/mdp/qrterminal/v3 v3.0.0
-       github.com/qydysky/part v0.23.8
+       github.com/qydysky/part v0.23.14
        github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
        github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
        golang.org/x/text v0.3.8
diff --git a/go.sum b/go.sum
index 7a1916e494ecad7a3871f3cabf651886a1db017b..a2e1985f8f8484bd2db5b656c2c2900e7846998e 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -63,6 +63,18 @@ github.com/qydysky/part v0.23.7 h1:RlQskc+t9EYlIhKGP3QCZZ9Jts6t2x1FqC2RwJF2Ncg=
 github.com/qydysky/part v0.23.7/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c=
 github.com/qydysky/part v0.23.8 h1:gyOfutApa+M1DEYV1/pVppxDmSigIArrT+qE6k8sPlo=
 github.com/qydysky/part v0.23.8/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c=
+github.com/qydysky/part v0.23.9 h1:DALmi1yUIKIntU0iXTuk6bJV2SRGqBjWNe+c0Snbpng=
+github.com/qydysky/part v0.23.9/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c=
+github.com/qydysky/part v0.23.10 h1:i5wGkCUkNqXV0eDuKaHOXx2ow6hFNN0z8i5MPNlnkGo=
+github.com/qydysky/part v0.23.10/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c=
+github.com/qydysky/part v0.23.11 h1:mU4hNlKAPP4Rq7tW0pGEMaqyR9jtxPceKpkyFUB+gt4=
+github.com/qydysky/part v0.23.11/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c=
+github.com/qydysky/part v0.23.12 h1:hmLAHIyxPkYwD3IxW1tc9Ro4dLR/4TtxjeBP2hPkJHc=
+github.com/qydysky/part v0.23.12/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c=
+github.com/qydysky/part v0.23.13 h1:KPeT8To+UJw8waUGXIUVNyB9mQh5EtpHkdwFRgZMLEI=
+github.com/qydysky/part v0.23.13/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c=
+github.com/qydysky/part v0.23.14 h1:1clhXe+OmPai6E9F6ywYzUDShX8KJDv/Cd4QGOcxCkY=
+github.com/qydysky/part v0.23.14/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c=
 github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
 github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
 github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e h1:MRM5ITcdelLK2j1vwZ3Je0FKVCfqOLp5zO6trqMLYs0=