From 1391ca997b4e946e078aac090bdffd5420de66be Mon Sep 17 00:00:00 2001 From: qydysky <32743305+qydysky@users.noreply.github.com> Date: Thu, 15 Sep 2022 22:04:52 +0800 Subject: [PATCH] =?utf8?q?=E5=BF=AB=E9=80=9F=E5=90=AF=E5=8A=A8=E7=BC=93?= =?utf8?q?=E5=86=B2=20=E7=9B=B4=E6=92=ADWeb=E7=BC=93=E5=86=B2=E9=95=BF?= =?utf8?q?=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- Reply/stream.go | 79 ++++++++++++++++++++++--------------- bili_danmu.go | 2 +- demo/config/config_K_v.json | 4 +- 3 files changed, 51 insertions(+), 34 deletions(-) diff --git a/Reply/stream.go b/Reply/stream.go index 7ee5b85..e9fd176 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -37,6 +37,8 @@ type M4SStream struct { stream_type string //流类型 Stream_msg *msgq.Msgq //流数据消息 tag:data first_buf []byte //m4s起始块 or flv起始块 + boot_buf [][]byte //快速启动缓冲 + boot_buf_size int //快速启动缓冲长度 last_m4s *m4s_link_item //最后一个切片 common c.Common //通用配置副本 Current_save_path string //明确的直播流保存目录 @@ -481,6 +483,7 @@ func (t *M4SStream) saveStreamFlv() { for _, frame := range keyframe { // fmt.Println("write frame") out.Write(frame) + t.bootBufPush(frame) t.Stream_msg.Push_tag(`data`, frame) } if last_avilable_offset != 0 { @@ -544,29 +547,6 @@ func (t *M4SStream) saveStreamM4s() { Max: 3, } - // 直播流切片缓冲 - var ( - streamWebCache chan []byte - streamWebCacheLen int - ) - if v, ok := t.common.K_v.LoadV(`直播Web缓冲长度`).(float64); ok && v != 0 { - streamWebCacheLen = int(v) - streamWebCache = make(chan []byte, streamWebCacheLen) - defer close(streamWebCache) - go func() { - for { - if len(streamWebCache) <= streamWebCacheLen/2 { - time.Sleep(time.Second) - } - if data := <-streamWebCache; len(data) != 0 { - t.Stream_msg.Push_tag(`data`, data) - } else { - return - } - } - }() - } - // 下载循环 for download_seq := []*m4s_link_item{}; ; { @@ -645,14 +625,8 @@ func (t *M4SStream) saveStreamM4s() { if v.status == 2 { download_seq = download_seq[1:] - if streamWebCache != nil { - if streamWebCacheLen == len(streamWebCache) { - <-streamWebCache - } - streamWebCache <- v.data - } else { - t.Stream_msg.Push_tag(`data`, v.data) - } + t.bootBufPush(v.data) + t.Stream_msg.Push_tag(`data`, v.data) } else { break } @@ -756,6 +730,15 @@ func (t *M4SStream) Start() bool { // 初始化切片消息 t.Stream_msg = msgq.New(15) + // 初始化快速启动缓冲 + if v, ok := t.common.K_v.LoadV(`直播Web缓冲长度`).(float64); ok && v != 0 { + t.boot_buf_size = int(v) + t.boot_buf = make([][]byte, t.boot_buf_size) + defer func() { + t.boot_buf = nil + }() + } + // 主循环 for t.Status.Islive() { // 是否在直播 @@ -828,6 +811,15 @@ func (t *M4SStream) pusherM4s(w http.ResponseWriter, r *http.Request) { flusher.Flush() } + //写入快速启动缓冲 + if t.boot_buf != nil && len(t.boot_buf) > 0 { + if _, err := w.Write(t.getBootBuf()); err != nil { + return + } else if flushSupport { + flusher.Flush() + } + } + cancel := make(chan struct{}) //hls切片 @@ -871,6 +863,15 @@ func (t *M4SStream) pusherFlv(w http.ResponseWriter, r *http.Request) { flusher.Flush() } + //写入快速启动缓冲 + if t.boot_buf != nil && len(t.boot_buf) > 0 { + if _, err := w.Write(t.getBootBuf()); err != nil { + return + } else if flushSupport { + flusher.Flush() + } + } + cancel := make(chan struct{}) //hls切片 @@ -898,3 +899,19 @@ func (t *M4SStream) pusherFlv(w http.ResponseWriter, r *http.Request) { <-cancel } + +func (t *M4SStream) bootBufPush(buf []byte) { + if t.boot_buf != nil { + if len(t.boot_buf) == t.boot_buf_size { + t.boot_buf = t.boot_buf[1:] + } + t.boot_buf = append(t.boot_buf, buf) + } +} + +func (t *M4SStream) getBootBuf() (buf []byte) { + for i := 0; i < len(t.boot_buf); i++ { + buf = append(buf, t.boot_buf[i]...) + } + return buf +} diff --git a/bili_danmu.go b/bili_danmu.go index 4004b38..7454f28 100644 --- a/bili_danmu.go +++ b/bili_danmu.go @@ -184,6 +184,7 @@ func Start(roomid ...int) { for i := 0; i < len(c.C.WSURL); i += 1 { v := c.C.WSURL[i] //ws启动 + danmulog.L(`T: `, "连接 "+v) u, _ := url.Parse(v) ws_c := ws.New_client(ws.Client{ Url: v, @@ -209,7 +210,6 @@ func Start(roomid ...int) { //SendChan 传入发送[]byte //RecvChan 接收[]byte - danmulog.L(`T: `, "连接", v) ws_c.SendChan <- F.HelloGen(c.C.Roomid, c.C.Token) if F.HelloChe(<-ws_c.RecvChan) { danmulog.L(`I: `, "已连接到房间", c.C.Uname, `(`, c.C.Roomid, `)`) diff --git a/demo/config/config_K_v.json b/demo/config/config_K_v.json index f36e4d9..a3788eb 100644 --- a/demo/config/config_K_v.json +++ b/demo/config/config_K_v.json @@ -63,8 +63,8 @@ "仅保存当前直播间流-help": "启用此项,才会保存Ass", "仅保存当前直播间流": true, "直播Web服务口":0, - "直播Web缓冲长度-help":"非负整数,越长直播流延迟越高越流畅内存占用越高", - "直播Web缓冲长度":10, + "直播Web缓冲长度-help":"非负整数,越长直播流延迟越高 内存占用越高", + "直播Web缓冲长度":3, "ass-help": "只有保存直播流时才考虑生成ass,ass编码默认GB18030(可选utf-8)", "生成Ass弹幕": true, "Ass编码": "GB18030", -- 2.39.2