From 00592fb23250c5a9c75237ea46b37ec095ff4eee Mon Sep 17 00:00:00 2001 From: qydysky <32743305+qydysky@users.noreply.github.com> Date: Thu, 30 Mar 2023 03:44:50 +0800 Subject: [PATCH] =?utf8?q?Improve=20=E4=BC=98=E5=8C=96=E5=BD=95=E5=88=B6?= =?utf8?q?=E7=9A=84=E4=BA=8B=E4=BB=B6=E5=91=A8=E6=9C=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- Reply/F.go | 32 +++++++---- Reply/stream.go | 143 ++++++++++++++++++++++++------------------------ 2 files changed, 95 insertions(+), 80 deletions(-) diff --git a/Reply/F.go b/Reply/F.go index 4d8a6e5..6644589 100644 --- a/Reply/F.go +++ b/Reply/F.go @@ -75,15 +75,15 @@ func cross(a string, buf []string) float32 { } // 在a中仅出现一次出现的字符占a的百分数 -func selfcross(a string) float32 { - buf := make(map[rune]bool) - for _, v := range a { - if _, ok := buf[v]; !ok { - buf[v] = true - } - } - return 1 - float32(len(buf))/float32(len([]rune(a))) -} +// func selfcross(a string) float32 { +// buf := make(map[rune]bool) +// for _, v := range a { +// if _, ok := buf[v]; !ok { +// buf[v] = true +// } +// } +// return 1 - float32(len(buf))/float32(len([]rune(a))) +// } // 在a的每个字符串中 // 出现的字符次数最多的 @@ -1348,7 +1348,19 @@ func init() { w.WriteHeader(http.StatusOK) // 推送数据 - currentStreamO.Pusher(w, r) + { + startFunc := func(_ *M4SStream) error { + flog.L(`T: `, r.RemoteAddr, `接入直播流`) + return nil + } + stopFunc := func(_ *M4SStream) error { + flog.L(`T: `, r.RemoteAddr, `断开直播流`) + return nil + } + if e := currentStreamO.PusherToHttp(w, r, startFunc, stopFunc); e != nil { + flog.L(`W: `, e) + } + } }) // 弹幕回放 diff --git a/Reply/stream.go b/Reply/stream.go index 91e6e00..3a7ad77 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -46,14 +46,16 @@ type M4SStream struct { first_buf []byte //m4s起始块 or flv起始块 boot_buf []byte //快速启动缓冲 boot_buf_locker funcCtrl.BlockFunc - last_m4s *m4s_link_item //最后一个切片 - m4s_pool *pool.Buf[m4s_link_item] //切片pool - common c.Common //通用配置副本 - Current_save_path string //明确的直播流保存目录 + last_m4s *m4s_link_item //最后一个切片 + m4s_pool *pool.Buf[m4s_link_item] //切片pool + common c.Common //通用配置副本 + Current_save_path string //明确的直播流保存目录 + // 事件周期 + // start: 开始实例 startRec:开始录制 load:接收到视频头 stopRec:结束录制 stop:结束实例 + msg *msgq.MsgType[*M4SStream] //实例的各种事件回调 Callback_start func(*M4SStream) error //实例开始的回调 Callback_startRec func(*M4SStream) error //录制开始的回调 Callback_stopRec func(*M4SStream) //录制结束的回调 - msg *msgq.MsgType[*M4SStream] // Callback_stop func(*M4SStream) //实例结束的回调 reqPool *pool.Buf[reqf.Req] //请求池 duration time.Duration //录制时长 @@ -583,13 +585,14 @@ func (t *M4SStream) saveStream() (e error) { } // 录制回调 + t.msg.Push_tag(`startRec`, t) if t.Callback_startRec != nil { if err := t.Callback_startRec(t); err != nil { t.log.L(`W: `, `开始录制回调错误`, err.Error()) return err } } - defer t.msg.Push_tag(`stoprec`, t) + defer t.msg.Push_tag(`stopRec`, t) // 移除历史流 if err := t.removeStream(); err != nil { @@ -1133,6 +1136,8 @@ func (t *M4SStream) Start() bool { } // 实例回调 + t.msg = msgq.NewType[*M4SStream]() + t.msg.Push_tag(`start`, t) if t.Callback_start != nil { if e := t.Callback_start(t); e != nil { t.log.L(`W: `, `开始回调错误`, e.Error()) @@ -1152,14 +1157,9 @@ func (t *M4SStream) Start() bool { // 初始化切片消息 t.Stream_msg = msgq.NewType[[]byte]() - t.msg = msgq.NewType[*M4SStream]() - t.msg.Pull_tag_only("fin", func(ms *M4SStream) (disable bool) { - return true - }) - // 设置事件 if t.Callback_stopRec != nil { - t.msg.Pull_tag_only("stoprec", func(ms *M4SStream) (disable bool) { + t.msg.Pull_tag_only("stopRec", func(ms *M4SStream) (disable bool) { t.Callback_stopRec(ms) return false }) @@ -1170,9 +1170,11 @@ func (t *M4SStream) Start() bool { return false }) } + t.msg.Pull_tag_only("stop", func(_ *M4SStream) (disable bool) { + return true + }) defer t.msg.Push_tag(`stop`, t) - defer t.msg.Push_tag(`fin`, nil) //指定房间录制回调 if v, ok := t.common.K_v.LoadV("指定房间录制回调").([]any); ok && len(v) > 0 { @@ -1185,7 +1187,7 @@ func (t *M4SStream) Start() bool { ) if len(after) > 2 { - t.msg.Pull_tag_async_only("stoprec", func(ms *M4SStream) (disable bool) { + t.msg.Pull_tag_async_only("stopRec", func(ms *M4SStream) (disable bool) { if durationS >= 0 && ms.duration.Seconds() > durationS { var cmds []string for i := 0; i < len(after); i++ { @@ -1249,82 +1251,72 @@ func (t *M4SStream) Stop() { t.exitSign.Wait() } -// 流服务推送方法 -func (t *M4SStream) Pusher(w http.ResponseWriter, r *http.Request) { - switch t.stream_type { - case `m3u8`: - t.pusherM4s(w, r) - case `mp4`: - t.pusherM4s(w, r) - case `flv`: - t.pusherFlv(w, r) - default: - t.log.L(`W: `, `Pusher no support stream_type`) - } -} - -func (t *M4SStream) pusherM4s(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "video/mp4") - - flusher, flushSupport := w.(http.Flusher) - if flushSupport { - flusher.Flush() +// 保存到文件 +// filepath: 不包含后缀,会自动添加后缀 +func (t *M4SStream) PusherToFile(filepath string, startFunc func(*M4SStream) error, stopFunc func(*M4SStream) error) error { + f := file.New(filepath+"."+t.stream_type, 0, true) + if e := f.Delete(); e != nil { + return e } - //写入hls头 - if _, err := w.Write(t.getFirstBuf()); err != nil { - return - } else if flushSupport { - flusher.Flush() + if e := startFunc(t); e != nil { + return e } - //写入快速启动缓冲 + f.Write(t.getFirstBuf(), true) if len(t.boot_buf) != 0 { - if _, err := w.Write(t.boot_buf); err != nil { - return - } - if flushSupport { - flusher.Flush() - } + f.Write(t.boot_buf, true) } - - cancel := make(chan struct{}) - - //hls切片 + contextC, cancel := context.WithCancel(context.Background()) t.Stream_msg.Pull_tag(map[string]func([]byte) bool{ `data`: func(b []byte) bool { if len(b) == 0 { - close(cancel) - return true - } - if _, err := w.Write(b); err != nil { - close(cancel) + cancel() return true - } else if flushSupport { - flusher.Flush() } + f.Write(b, true) return false }, `close`: func(_ []byte) bool { - close(cancel) + cancel() return true }, }) + <-contextC.Done() + + if e := stopFunc(t); e != nil { + return e + } - <-cancel + return nil } -func (t *M4SStream) pusherFlv(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "video/x-flv") +// 流服务推送方法 +func (t *M4SStream) PusherToHttp(w http.ResponseWriter, r *http.Request, startFunc func(*M4SStream) error, stopFunc func(*M4SStream) error) error { + switch t.stream_type { + case `m3u8`: + fallthrough + case `mp4`: + w.Header().Set("Content-Type", "video/mp4") + case `flv`: + w.Header().Set("Content-Type", "video/x-flv") + default: + w.WriteHeader(http.StatusNotFound) + return errors.New("pusher no support stream_type") + } + + if e := startFunc(t); e != nil { + return e + } flusher, flushSupport := w.(http.Flusher) if flushSupport { flusher.Flush() } - //写入flv头 + //写入头 if _, err := w.Write(t.getFirstBuf()); err != nil { - return + return err } else if flushSupport { flusher.Flush() } @@ -1332,24 +1324,29 @@ func (t *M4SStream) pusherFlv(w http.ResponseWriter, r *http.Request) { //写入快速启动缓冲 if len(t.boot_buf) != 0 { if _, err := w.Write(t.boot_buf); err != nil { - return + return err } if flushSupport { flusher.Flush() } } - cancel := make(chan struct{}) + contextC, cancel := context.WithCancel(r.Context()) - //flv + // t.Stream_msg.Pull_tag(map[string]func([]byte) bool{ `data`: func(b []byte) bool { + select { + case <-contextC.Done(): + return true + default: + } if len(b) == 0 { - close(cancel) + cancel() return true } if _, err := w.Write(b); err != nil { - close(cancel) + cancel() return true } else if flushSupport { flusher.Flush() @@ -1357,12 +1354,18 @@ func (t *M4SStream) pusherFlv(w http.ResponseWriter, r *http.Request) { return false }, `close`: func(_ []byte) bool { - close(cancel) + cancel() return true }, }) - <-cancel + <-contextC.Done() + + if e := stopFunc(t); e != nil { + return e + } + + return nil } func (t *M4SStream) bootBufPush(buf []byte) { -- 2.39.2