From: qydysky <32743305+qydysky@users.noreply.github.com> Date: Sat, 4 Mar 2023 16:46:28 +0000 (+0800) Subject: Improve fmp4模式减少内存分配及其他优化 X-Git-Tag: v0.7.0~1 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=ca768d469e0fbd799c5e31f650dd6e8b39d95ed3;p=bili_danmu%2F.git Improve fmp4模式减少内存分配及其他优化 --- diff --git a/CV/Var.go b/CV/Var.go index 0745fcb..0a41dde 100644 --- a/CV/Var.go +++ b/CV/Var.go @@ -241,6 +241,8 @@ func (t *Common) Init() Common { type s struct { MenInUse string `json:"menInUse"` + ReqPoolInUse int `json:"reqPoolInUse"` + ReqPoolSum int `json:"reqPoolSum"` NumGoroutine int `json:"numGoroutine"` GoVersion string `json:"goVersion"` } @@ -252,6 +254,8 @@ func (t *Common) Init() Common { j{ s{ humanize.Bytes(memStats.HeapInuse + memStats.StackInuse), + t.ReqPool.PoolInUse(), + t.ReqPool.PoolSum(), runtime.NumGoroutine(), runtime.Version(), }, diff --git a/Reply/flvDecode_test.go b/Reply/flvDecode_test.go index 5cf89fa..3ffae5a 100644 --- a/Reply/flvDecode_test.go +++ b/Reply/flvDecode_test.go @@ -36,12 +36,13 @@ func Test_FLVdeal(t *testing.T) { if s := buff.Size(); max < s { max = s } - front_buf, keyframe, last_available_offset, e := Search_stream_tag(buff.GetPureBuf()) + keyframe := slice.New[byte]() + front_buf, last_available_offset, e := Search_stream_tag(buff.GetPureBuf(), keyframe) if e != nil { t.Fatal(e) } - flog.Write([]byte(fmt.Sprintf("%d %d %d %d\n", c, len(front_buf), len(keyframe), last_available_offset)), true) - t.Log(c, len(front_buf), len(keyframe)) + flog.Write([]byte(fmt.Sprintf("%d %d %d %d\n", c, len(front_buf), keyframe.Size(), last_available_offset)), true) + t.Log(c, len(front_buf), keyframe.Size()) buff.RemoveFront(last_available_offset) } t.Log("max", humanize.Bytes(uint64(max))) diff --git a/Reply/fmp4Decode.go b/Reply/fmp4Decode.go index e4e82cb..938fd7d 100644 --- a/Reply/fmp4Decode.go +++ b/Reply/fmp4Decode.go @@ -113,7 +113,7 @@ func (t *Fmp4Decoder) Init_fmp4(buf []byte) (b []byte, err error) { return b, nil } -func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte) (cu int, keyframes [][]byte, err error) { +func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte, keyframe *slice.Buf[byte]) (cu int, err error) { if len(buf) > humanize.MByte*100 { err = errors.New("buf too large") return @@ -243,7 +243,7 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte) (cu int, keyframes [][]byte //deal frame if keyframeMoof { if v, e := t.buf.HadModified(bufModified); e == nil && v && !t.buf.IsEmpty() { - keyframes = append(keyframes, t.buf.GetCopyBuf()) + keyframe.Append(t.buf.GetPureBuf()) cu = m[0].i t.buf.Reset() } @@ -332,7 +332,7 @@ func (t *Fmp4Decoder) Search_stream_fmp4(buf []byte) (cu int, keyframes [][]byte //deal frame if keyframeMoof { if v, e := t.buf.HadModified(bufModified); e == nil && v && !t.buf.IsEmpty() { - keyframes = append(keyframes, t.buf.GetCopyBuf()) + keyframe.Append(t.buf.GetPureBuf()) cu = m[0].i t.buf.Reset() } diff --git a/Reply/fmp4Decode_test.go b/Reply/fmp4Decode_test.go index bc634ba..7ea6ffd 100644 --- a/Reply/fmp4Decode_test.go +++ b/Reply/fmp4Decode_test.go @@ -46,7 +46,7 @@ func Test_deal(t *testing.T) { if e != nil { t.Fatal(e) } - last_available_offset, _, e := fmp4Decoder.Search_stream_fmp4(buff.GetPureBuf()) + last_available_offset, e := fmp4Decoder.Search_stream_fmp4(buff.GetPureBuf(), slice.New[byte]()) if e != nil && e.Error() != "未初始化traks" { t.Fatal(e) } diff --git a/Reply/stream.go b/Reply/stream.go index 5406b8a..077116d 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -38,10 +38,10 @@ type M4SStream struct { stream_last_modified time.Time //流地址更新时间 // stream_expires int64 //流到期时间 // stream_hosts psync.Map //使用的流服务器 - stream_type string //流类型 - Stream_msg *msgq.Msgq //流数据消息 tag:data - first_buf []byte //m4s起始块 or flv起始块 - boot_buf []byte //快速启动缓冲 + stream_type string //流类型 + Stream_msg *msgq.MsgType[[]byte] //流数据消息 tag:data + first_buf []byte //m4s起始块 or flv起始块 + boot_buf []byte //快速启动缓冲 boot_buf_locker funcCtrl.BlockFunc last_m4s *m4s_link_item //最后一个切片 m4s_pool *pool.Buf[m4s_link_item] //切片pool @@ -63,14 +63,14 @@ type M4SStream_Config struct { } type m4s_link_item struct { - Url string // m4s链接 - Base string // m4s文件名 - status int // 下载状态 0:未下载 1:正在下载 2:下载完成 3:下载失败 - tryDownCount int // 下载次数 当=3时,不再下载,忽略此块 - err error // 下载中出现的错误 - data []byte // 下载的数据 - createdTime time.Time // 创建时间 - pooledTime time.Time // 到pool时间 + Url string // m4s链接 + Base string // m4s文件名 + status int // 下载状态 0:未下载 1:正在下载 2:下载完成 3:下载失败 + tryDownCount int // 下载次数 当=3时,不再下载,忽略此块 + err error // 下载中出现的错误 + data *slice.Buf[byte] // 下载的数据 + createdTime time.Time // 创建时间 + pooledTime time.Time // 到pool时间 } func (t *m4s_link_item) copyTo(to *m4s_link_item) { @@ -90,7 +90,7 @@ func (t *m4s_link_item) reset() *m4s_link_item { t.Base = "" t.status = 0 t.tryDownCount = 0 - t.data = t.data[:0] + t.data.Reset() t.createdTime = time.Now() return t } @@ -111,7 +111,9 @@ func (t *M4SStream) getM4s() (p *m4s_link_item) { if t.m4s_pool == nil { t.m4s_pool = pool.New( func() *m4s_link_item { - return &m4s_link_item{} + return &m4s_link_item{ + data: slice.New[byte](), + } }, func(t *m4s_link_item) bool { return t.createdTime.After(t.pooledTime) || time.Now().Before(t.pooledTime.Add(time.Second*10)) @@ -805,6 +807,7 @@ func (t *M4SStream) saveStreamM4s() (e error) { var ( buf = slice.New[byte]() fmp4Decoder = &Fmp4Decoder{} + keyframe = slice.New[byte]() // flashingSer bool ) @@ -892,17 +895,8 @@ func (t *M4SStream) saveStreamM4s() (e error) { // } link.status = 3 // 设置切片状态为下载失败 } else { - // if usedt := r.UsedTime.Seconds(); usedt > 700 { - // t.log.L(`I: `, `hls切片下载慢`, usedt, `ms`) - // } - if len(link.data) > len(r.Respon) { - link.data = link.data[:copy(link.data, r.Respon)] - } else { - // if cap(link.data) < len(r.Respon) { - // fmt.Println(cap(link.data), len(r.Respon)) - // } - link.data = append(link.data[:0], r.Respon...) - } + link.data.Reset() + link.data.Append(r.Respon) link.status = 2 // 设置切片状态为下载完成 } }(v, t.Current_save_path) @@ -937,10 +931,10 @@ func (t *M4SStream) saveStreamM4s() (e error) { } // no, _ := download_seq[k].getNo() - // fmt.Println("download_seq ", no, download_seq[k].status, t.first_buf == nil) + // fmt.Println("download_seq ", no, download_seq[k].status, download_seq[k].data.Size(), len(t.first_buf)) if strings.Contains(download_seq[k].Base, `h`) { - if header, e := fmp4Decoder.Init_fmp4(download_seq[k].data); e != nil { + if front_buf, e := fmp4Decoder.Init_fmp4(download_seq[k].data.GetPureBuf()); e != nil { t.log.L(`E: `, e, `重试!`) download_seq[k].status = 3 break @@ -949,7 +943,8 @@ func (t *M4SStream) saveStreamM4s() (e error) { // fmt.Println(`T: `, "找到trak:", string(trak.handlerType), trak.trackID, trak.timescale) t.log.L(`T: `, "找到trak:", string(trak.handlerType), trak.trackID, trak.timescale) } - t.first_buf = header + t.first_buf = make([]byte, len(front_buf)) + copy(t.first_buf, front_buf) if out != nil { out.Write(t.first_buf, true) out.Sync() @@ -966,12 +961,11 @@ func (t *M4SStream) saveStreamM4s() (e error) { continue } - buf.Append(download_seq[k].data) + buf.Append(download_seq[k].data.GetPureBuf()) t.putM4s(download_seq[k]) download_seq = append(download_seq[:k], download_seq[k+1:]...) k -= 1 - - last_available_offset, keyframes, err := fmp4Decoder.Search_stream_fmp4(buf.GetPureBuf()) + last_available_offset, err := fmp4Decoder.Search_stream_fmp4(buf.GetPureBuf(), keyframe) if err != nil { if !errors.Is(err, io.EOF) { t.log.L(`E: `, err) @@ -987,18 +981,21 @@ func (t *M4SStream) saveStreamM4s() (e error) { //丢弃所有数据 buf.Reset() } else { + keyframe.Reset() last_available_offset = 0 } } - // no, _ := v.getNo() - // fmt.Println(no, "fmp4KeyFrames", fmp4KeyFrames.size(), last_available_offset, err) + // no, _ := download_seq[k].getNo() + // fmt.Println(no, "fmp4KeyFrames", keyframe.Size(), last_available_offset, err) - for _, keyframe := range keyframes { - t.bootBufPush(keyframe) - t.Stream_msg.Push_tag(`data`, keyframe) + // 传递关键帧 + if !keyframe.IsEmpty() { + t.bootBufPush(keyframe.GetPureBuf()) + keyframe.Reset() + t.Stream_msg.Push_tag(`data`, t.boot_buf) if out != nil { - out.Write(keyframe, true) + out.Write(t.boot_buf, true) out.Sync() } } @@ -1132,7 +1129,7 @@ func (t *M4SStream) Start() bool { t.reqPool = t.common.ReqPool // 初始化切片消息 - t.Stream_msg = msgq.New() + t.Stream_msg = msgq.NewType[[]byte]() // 主循环 for t.Status.Islive() { @@ -1239,23 +1236,21 @@ func (t *M4SStream) pusherM4s(w http.ResponseWriter, r *http.Request) { cancel := make(chan struct{}) //hls切片 - t.Stream_msg.Pull_tag(map[string]func(interface{}) bool{ - `data`: func(data interface{}) bool { - if b, ok := data.([]byte); ok { - if len(b) == 0 { - close(cancel) - return true - } - if _, err := w.Write(b); err != nil { - close(cancel) - return true - } else if flushSupport { - flusher.Flush() - } + t.Stream_msg.Pull_tag(map[string]func([]byte) bool{ + `data`: func(b []byte) bool { + if len(b) == 0 { + close(cancel) + return true + } + if _, err := w.Write(b); err != nil { + close(cancel) + return true + } else if flushSupport { + flusher.Flush() } return false }, - `close`: func(_ interface{}) bool { + `close`: func(_ []byte) bool { close(cancel) return true }, @@ -1292,23 +1287,21 @@ func (t *M4SStream) pusherFlv(w http.ResponseWriter, r *http.Request) { cancel := make(chan struct{}) //flv - t.Stream_msg.Pull_tag(map[string]func(interface{}) bool{ - `data`: func(data interface{}) bool { - if b, ok := data.([]byte); ok { - if len(b) == 0 { - close(cancel) - return true - } - if _, err := w.Write(b); err != nil { - close(cancel) - return true - } else if flushSupport { - flusher.Flush() - } + t.Stream_msg.Pull_tag(map[string]func([]byte) bool{ + `data`: func(b []byte) bool { + if len(b) == 0 { + close(cancel) + return true + } + if _, err := w.Write(b); err != nil { + close(cancel) + return true + } else if flushSupport { + flusher.Flush() } return false }, - `close`: func(_ interface{}) bool { + `close`: func(_ []byte) bool { close(cancel) return true }, @@ -1322,8 +1315,6 @@ func (t *M4SStream) bootBufPush(buf []byte) { defer t.boot_buf_locker.UnBlock() if len(t.boot_buf) < len(buf) { t.boot_buf = append(t.boot_buf, make([]byte, len(buf)-len(t.boot_buf))...) - } else { - t.boot_buf = t.boot_buf[:len(buf)] } - copy(t.boot_buf, buf) + t.boot_buf = t.boot_buf[:copy(t.boot_buf, buf)] } diff --git a/go.mod b/go.mod index cc4a817..32e2ce8 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/gofrs/uuid v4.3.0+incompatible github.com/gotk3/gotk3 v0.6.1 github.com/mdp/qrterminal/v3 v3.0.0 - github.com/qydysky/part v0.23.8 + github.com/qydysky/part v0.23.14 github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966 golang.org/x/text v0.3.8 diff --git a/go.sum b/go.sum index 7a1916e..a2e1985 100644 --- a/go.sum +++ b/go.sum @@ -63,6 +63,18 @@ github.com/qydysky/part v0.23.7 h1:RlQskc+t9EYlIhKGP3QCZZ9Jts6t2x1FqC2RwJF2Ncg= github.com/qydysky/part v0.23.7/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c= github.com/qydysky/part v0.23.8 h1:gyOfutApa+M1DEYV1/pVppxDmSigIArrT+qE6k8sPlo= github.com/qydysky/part v0.23.8/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c= +github.com/qydysky/part v0.23.9 h1:DALmi1yUIKIntU0iXTuk6bJV2SRGqBjWNe+c0Snbpng= +github.com/qydysky/part v0.23.9/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c= +github.com/qydysky/part v0.23.10 h1:i5wGkCUkNqXV0eDuKaHOXx2ow6hFNN0z8i5MPNlnkGo= +github.com/qydysky/part v0.23.10/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c= +github.com/qydysky/part v0.23.11 h1:mU4hNlKAPP4Rq7tW0pGEMaqyR9jtxPceKpkyFUB+gt4= +github.com/qydysky/part v0.23.11/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c= +github.com/qydysky/part v0.23.12 h1:hmLAHIyxPkYwD3IxW1tc9Ro4dLR/4TtxjeBP2hPkJHc= +github.com/qydysky/part v0.23.12/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c= +github.com/qydysky/part v0.23.13 h1:KPeT8To+UJw8waUGXIUVNyB9mQh5EtpHkdwFRgZMLEI= +github.com/qydysky/part v0.23.13/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c= +github.com/qydysky/part v0.23.14 h1:1clhXe+OmPai6E9F6ywYzUDShX8KJDv/Cd4QGOcxCkY= +github.com/qydysky/part v0.23.14/go.mod h1:T6tQU8VYOVT+rX5v40Y7OeDWByz4fwDnyehpa+QIP2c= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e h1:MRM5ITcdelLK2j1vwZ3Je0FKVCfqOLp5zO6trqMLYs0=