]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
Improve 优化录制的事件周期
authorqydysky <32743305+qydysky@users.noreply.github.com>
Wed, 29 Mar 2023 19:44:50 +0000 (03:44 +0800)
committerqydysky <32743305+qydysky@users.noreply.github.com>
Wed, 29 Mar 2023 19:44:50 +0000 (03:44 +0800)
Reply/F.go
Reply/stream.go

index 4d8a6e5dbb2e83a4310244bf2f3d1ad037d95cf0..66445892d6080b1437d73620e2b13e63fb1a01db 100644 (file)
@@ -75,15 +75,15 @@ func cross(a string, buf []string) float32 {
 }
 
 // 在a中仅出现一次出现的字符占a的百分数
-func selfcross(a string) float32 {
-       buf := make(map[rune]bool)
-       for _, v := range a {
-               if _, ok := buf[v]; !ok {
-                       buf[v] = true
-               }
-       }
-       return 1 - float32(len(buf))/float32(len([]rune(a)))
-}
+// func selfcross(a string) float32 {
+//     buf := make(map[rune]bool)
+//     for _, v := range a {
+//             if _, ok := buf[v]; !ok {
+//                     buf[v] = true
+//             }
+//     }
+//     return 1 - float32(len(buf))/float32(len([]rune(a)))
+// }
 
 // 在a的每个字符串中
 // 出现的字符次数最多的
@@ -1348,7 +1348,19 @@ func init() {
                        w.WriteHeader(http.StatusOK)
 
                        // 推送数据
-                       currentStreamO.Pusher(w, r)
+                       {
+                               startFunc := func(_ *M4SStream) error {
+                                       flog.L(`T: `, r.RemoteAddr, `接入直播流`)
+                                       return nil
+                               }
+                               stopFunc := func(_ *M4SStream) error {
+                                       flog.L(`T: `, r.RemoteAddr, `断开直播流`)
+                                       return nil
+                               }
+                               if e := currentStreamO.PusherToHttp(w, r, startFunc, stopFunc); e != nil {
+                                       flog.L(`W: `, e)
+                               }
+                       }
                })
 
                // 弹幕回放
index 91e6e0096941bc01a8925d43f1c7c05802a056e3..3a7ad7774f3e43dd81d3ea58c6e009bb43be977d 100644 (file)
@@ -46,14 +46,16 @@ type M4SStream struct {
        first_buf         []byte                //m4s起始块 or flv起始块
        boot_buf          []byte                //快速启动缓冲
        boot_buf_locker   funcCtrl.BlockFunc
-       last_m4s          *m4s_link_item            //最后一个切片
-       m4s_pool          *pool.Buf[m4s_link_item]  //切片pool
-       common            c.Common                  //通用配置副本
-       Current_save_path string                    //明确的直播流保存目录
+       last_m4s          *m4s_link_item           //最后一个切片
+       m4s_pool          *pool.Buf[m4s_link_item] //切片pool
+       common            c.Common                 //通用配置副本
+       Current_save_path string                   //明确的直播流保存目录
+       // 事件周期
+       // start: 开始实例 startRec:开始录制 load:接收到视频头 stopRec:结束录制 stop:结束实例
+       msg               *msgq.MsgType[*M4SStream] //实例的各种事件回调
        Callback_start    func(*M4SStream) error    //实例开始的回调
        Callback_startRec func(*M4SStream) error    //录制开始的回调
        Callback_stopRec  func(*M4SStream)          //录制结束的回调
-       msg               *msgq.MsgType[*M4SStream] //
        Callback_stop     func(*M4SStream)          //实例结束的回调
        reqPool           *pool.Buf[reqf.Req]       //请求池
        duration          time.Duration             //录制时长
@@ -583,13 +585,14 @@ func (t *M4SStream) saveStream() (e error) {
        }
 
        // 录制回调
+       t.msg.Push_tag(`startRec`, t)
        if t.Callback_startRec != nil {
                if err := t.Callback_startRec(t); err != nil {
                        t.log.L(`W: `, `开始录制回调错误`, err.Error())
                        return err
                }
        }
-       defer t.msg.Push_tag(`stoprec`, t)
+       defer t.msg.Push_tag(`stopRec`, t)
 
        // 移除历史流
        if err := t.removeStream(); err != nil {
@@ -1133,6 +1136,8 @@ func (t *M4SStream) Start() bool {
        }
 
        // 实例回调
+       t.msg = msgq.NewType[*M4SStream]()
+       t.msg.Push_tag(`start`, t)
        if t.Callback_start != nil {
                if e := t.Callback_start(t); e != nil {
                        t.log.L(`W: `, `开始回调错误`, e.Error())
@@ -1152,14 +1157,9 @@ func (t *M4SStream) Start() bool {
                // 初始化切片消息
                t.Stream_msg = msgq.NewType[[]byte]()
 
-               t.msg = msgq.NewType[*M4SStream]()
-               t.msg.Pull_tag_only("fin", func(ms *M4SStream) (disable bool) {
-                       return true
-               })
-
                // 设置事件
                if t.Callback_stopRec != nil {
-                       t.msg.Pull_tag_only("stoprec", func(ms *M4SStream) (disable bool) {
+                       t.msg.Pull_tag_only("stopRec", func(ms *M4SStream) (disable bool) {
                                t.Callback_stopRec(ms)
                                return false
                        })
@@ -1170,9 +1170,11 @@ func (t *M4SStream) Start() bool {
                                return false
                        })
                }
+               t.msg.Pull_tag_only("stop", func(_ *M4SStream) (disable bool) {
+                       return true
+               })
 
                defer t.msg.Push_tag(`stop`, t)
-               defer t.msg.Push_tag(`fin`, nil)
 
                //指定房间录制回调
                if v, ok := t.common.K_v.LoadV("指定房间录制回调").([]any); ok && len(v) > 0 {
@@ -1185,7 +1187,7 @@ func (t *M4SStream) Start() bool {
                                                )
 
                                                if len(after) > 2 {
-                                                       t.msg.Pull_tag_async_only("stoprec", func(ms *M4SStream) (disable bool) {
+                                                       t.msg.Pull_tag_async_only("stopRec", func(ms *M4SStream) (disable bool) {
                                                                if durationS >= 0 && ms.duration.Seconds() > durationS {
                                                                        var cmds []string
                                                                        for i := 0; i < len(after); i++ {
@@ -1249,82 +1251,72 @@ func (t *M4SStream) Stop() {
        t.exitSign.Wait()
 }
 
-// 流服务推送方法
-func (t *M4SStream) Pusher(w http.ResponseWriter, r *http.Request) {
-       switch t.stream_type {
-       case `m3u8`:
-               t.pusherM4s(w, r)
-       case `mp4`:
-               t.pusherM4s(w, r)
-       case `flv`:
-               t.pusherFlv(w, r)
-       default:
-               t.log.L(`W: `, `Pusher no support stream_type`)
-       }
-}
-
-func (t *M4SStream) pusherM4s(w http.ResponseWriter, r *http.Request) {
-       w.Header().Set("Content-Type", "video/mp4")
-
-       flusher, flushSupport := w.(http.Flusher)
-       if flushSupport {
-               flusher.Flush()
+// 保存到文件
+// filepath: 不包含后缀,会自动添加后缀
+func (t *M4SStream) PusherToFile(filepath string, startFunc func(*M4SStream) error, stopFunc func(*M4SStream) error) error {
+       f := file.New(filepath+"."+t.stream_type, 0, true)
+       if e := f.Delete(); e != nil {
+               return e
        }
 
-       //写入hls头
-       if _, err := w.Write(t.getFirstBuf()); err != nil {
-               return
-       } else if flushSupport {
-               flusher.Flush()
+       if e := startFunc(t); e != nil {
+               return e
        }
 
-       //写入快速启动缓冲
+       f.Write(t.getFirstBuf(), true)
        if len(t.boot_buf) != 0 {
-               if _, err := w.Write(t.boot_buf); err != nil {
-                       return
-               }
-               if flushSupport {
-                       flusher.Flush()
-               }
+               f.Write(t.boot_buf, true)
        }
-
-       cancel := make(chan struct{})
-
-       //hls切片
+       contextC, cancel := context.WithCancel(context.Background())
        t.Stream_msg.Pull_tag(map[string]func([]byte) bool{
                `data`: func(b []byte) bool {
                        if len(b) == 0 {
-                               close(cancel)
-                               return true
-                       }
-                       if _, err := w.Write(b); err != nil {
-                               close(cancel)
+                               cancel()
                                return true
-                       } else if flushSupport {
-                               flusher.Flush()
                        }
+                       f.Write(b, true)
                        return false
                },
                `close`: func(_ []byte) bool {
-                       close(cancel)
+                       cancel()
                        return true
                },
        })
+       <-contextC.Done()
+
+       if e := stopFunc(t); e != nil {
+               return e
+       }
 
-       <-cancel
+       return nil
 }
 
-func (t *M4SStream) pusherFlv(w http.ResponseWriter, r *http.Request) {
-       w.Header().Set("Content-Type", "video/x-flv")
+// 流服务推送方法
+func (t *M4SStream) PusherToHttp(w http.ResponseWriter, r *http.Request, startFunc func(*M4SStream) error, stopFunc func(*M4SStream) error) error {
+       switch t.stream_type {
+       case `m3u8`:
+               fallthrough
+       case `mp4`:
+               w.Header().Set("Content-Type", "video/mp4")
+       case `flv`:
+               w.Header().Set("Content-Type", "video/x-flv")
+       default:
+               w.WriteHeader(http.StatusNotFound)
+               return errors.New("pusher no support stream_type")
+       }
+
+       if e := startFunc(t); e != nil {
+               return e
+       }
 
        flusher, flushSupport := w.(http.Flusher)
        if flushSupport {
                flusher.Flush()
        }
 
-       //写入flv
+       //写入头
        if _, err := w.Write(t.getFirstBuf()); err != nil {
-               return
+               return err
        } else if flushSupport {
                flusher.Flush()
        }
@@ -1332,24 +1324,29 @@ func (t *M4SStream) pusherFlv(w http.ResponseWriter, r *http.Request) {
        //写入快速启动缓冲
        if len(t.boot_buf) != 0 {
                if _, err := w.Write(t.boot_buf); err != nil {
-                       return
+                       return err
                }
                if flushSupport {
                        flusher.Flush()
                }
        }
 
-       cancel := make(chan struct{})
+       contextC, cancel := context.WithCancel(r.Context())
 
-       //flv
+       //
        t.Stream_msg.Pull_tag(map[string]func([]byte) bool{
                `data`: func(b []byte) bool {
+                       select {
+                       case <-contextC.Done():
+                               return true
+                       default:
+                       }
                        if len(b) == 0 {
-                               close(cancel)
+                               cancel()
                                return true
                        }
                        if _, err := w.Write(b); err != nil {
-                               close(cancel)
+                               cancel()
                                return true
                        } else if flushSupport {
                                flusher.Flush()
@@ -1357,12 +1354,18 @@ func (t *M4SStream) pusherFlv(w http.ResponseWriter, r *http.Request) {
                        return false
                },
                `close`: func(_ []byte) bool {
-                       close(cancel)
+                       cancel()
                        return true
                },
        })
 
-       <-cancel
+       <-contextC.Done()
+
+       if e := stopFunc(t); e != nil {
+               return e
+       }
+
+       return nil
 }
 
 func (t *M4SStream) bootBufPush(buf []byte) {