]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
flv流服务减少io
authorqydysky <qydysky@foxmail.com>
Sat, 24 Apr 2021 02:22:42 +0000 (10:22 +0800)
committerqydysky <qydysky@foxmail.com>
Sat, 24 Apr 2021 02:22:42 +0000 (10:22 +0800)
Reply/F.go
Reply/flvDecode.go

index 28e867785d656e8ad4c823ce14a41157d3c6941f..c84201c99d7cb66bfa656f20c8016c379df3ad41 100644 (file)
@@ -206,6 +206,8 @@ func dtos(t time.Duration) string {
 type Savestream struct {
        path string
        hls_stream []byte//发送给客户的m3u8字节
+       flv_front []byte//flv头及首tag
+       flv_stream *msgq.Msgq//发送给客户的flv流关键帧间隔片
 
        max_m4s_hls int//m3u8最多有几个m4s
        min_m4s_hls int
@@ -229,6 +231,7 @@ type m4s_link_item struct {//使用指针以设置是否已下载
 }
 
 var savestream = Savestream {
+       flv_stream:msgq.New(10),//队列最多保留10个关键帧间隔片
        max_m4s_hls:15,
        min_m4s_hls:5,
 }
@@ -436,6 +439,29 @@ func Savestreamf(){
 
                        // no expect qn
                        exit_chan := make(chan struct{})
+                       go func(){//flv stream
+                               byteC := make(chan []byte,1024*1024*30)//传来的关键帧间隔buf为3s,避免超出buf,设为30M
+
+                               go func(){
+                                       for !p.Checkfile().IsExist(savestream.path + ".flv.dtmp") {
+                                               time.Sleep(time.Second)
+                                       }
+                                       if err := Stream(savestream.path + ".flv.dtmp",&savestream.flv_front,byteC,exit_chan);err != nil {
+                                               flog.L(`T: `,err);
+                                               return
+                                       }
+                               }()
+
+                               for {
+                                       select{
+                                       case res :=<- byteC:
+                                               savestream.flv_stream.Push_tag("stream",res)
+                                       case <- exit_chan:
+                                               savestream.flv_stream.Push_tag("close",nil)
+                                               return;
+                                       }
+                               }
+                       }()
                        if c.Live_want_qn < c.Live_qn {
                                go func(){
                                        for c.Live_want_qn < c.Live_qn {
@@ -469,6 +495,7 @@ func Savestreamf(){
 
                        l.L(`I: `,"结束")
                        Ass_f("", time.Now())//ass
+                       savestream.flv_front = []byte{}//flv头及首tag置空
                        p.FileMove(savestream.path+".flv.dtmp", savestream.path+".flv")
                } else {
                        savestream.path += "/"
@@ -1354,38 +1381,47 @@ func init() {
 
                                if filepath.Ext(path) == `.dtmp` {
                                        if strings.Contains(path,"flv") {
-                                               path = base_dir+path
+                                               // path = base_dir+path
 
                                                w.Header().Set("Connection", "Keep-Alive")
                                                w.Header().Set("Content-Type", "video/x-flv")
                                                w.Header().Set("X-Content-Type-Options", "nosniff")
                                                w.WriteHeader(http.StatusOK)
 
-                                               byteC := make(chan []byte,1024*1024*10)
-                                               cancel := make(chan struct{})
-                                               defer close(cancel)
-
-                                               go func(){
-                                                       if err := Stream(path,byteC,cancel);err != nil {
-                                                               flog.L(`T: `,err);
-                                                               return
-                                                       }
-                                               }()
-
-                                               flusher, flushSupport := w.(http.Flusher);
+                                               flusher, flushSupport := w.(http.Flusher)
                                                if flushSupport {flusher.Flush()}
 
-                                               var (
-                                                       err error
-                                               )
-                                               for err == nil {
-                                                       if b := <- byteC;len(b) != 0 {
-                                                               _,err = w.Write(b)
-                                                       } else {
-                                                               break
-                                                       }
-                                                       if flushSupport {flusher.Flush()}
+                                               //写入flv头,首tag
+                                               if n,err := w.Write(savestream.flv_front);err != nil {
+                                                       return
+                                               } else if flushSupport {
+                                                       flusher.Flush()
+                                               } else {
+                                                       fmt.Println("pass",n)
                                                }
+
+                                               cancel := make(chan struct{})
+
+                                               //flv流关键帧间隔切片
+                                               savestream.flv_stream.Pull_tag(map[string]func(interface{})(bool){
+                                                       `stream`:func(data interface{})(bool){
+                                                               if b,ok := data.([]byte);ok{
+                                                                       if _,err := w.Write(b);err != nil {
+                                                                               close(cancel)
+                                                                               return true
+                                                                       } else if flushSupport {
+                                                                               flusher.Flush()
+                                                                       }
+                                                               }
+                                                               return false
+                                                       },
+                                                       `close`:func(data interface{})(bool){
+                                                               close(cancel)
+                                                               return true
+                                                       },
+                                               })
+
+                                               <- cancel
                                        } else if strings.Contains(path,"m3u8")  {
                                        
                                                gmt, _ := time.LoadLocation("GMT")
index 396e9588748b02ecbfc43f3552e07d79b658105a..832b306c88cc2a6e5a33a6a0e06bd7e1bf8619a5 100644 (file)
@@ -31,7 +31,7 @@ var (
        send_sign = []byte{0x00}
 )
 
-func Stream(path string,streamChan chan []byte,cancel chan struct{}) (error) {
+func Stream(path string,front_buf *[]byte,streamChan chan []byte,cancel chan struct{}) (error) {
        flvlog.L(`T: `,path)
        defer flvlog.L(`T: `,`退出`)
        //file
@@ -47,7 +47,7 @@ func Stream(path string,streamChan chan []byte,cancel chan struct{}) (error) {
                buf := make([]byte, flv_header_size+previou_tag_size)
                if _,err := f.Read(buf);err != nil {return err}
                if bytes.Index(buf,flv_header_sign) != 0 {return errors.New(`no flv`)}
-               streamChan <- buf
+               *front_buf = append(*front_buf, buf...)
        }
 
        type flv_tag struct {
@@ -133,11 +133,11 @@ func Stream(path string,streamChan chan []byte,cancel chan struct{}) (error) {
        for {
                t := getTag(f)
                if t.Tag == script_tag {
-                       streamChan <- *t.Buf
+                       *front_buf = append(*front_buf, *t.Buf...)
                } else if t.Tag == video_tag {
                        if !first_video_tag {
                                first_video_tag = true
-                               streamChan <- *t.Buf
+                               *front_buf = append(*front_buf, *t.Buf...)
                        }
 
                        if t.FirstByte & 0xf0 == 0x10 {
@@ -152,7 +152,7 @@ func Stream(path string,streamChan chan []byte,cancel chan struct{}) (error) {
                } else if t.Tag == audio_tag {
                        if !first_audio_tag {
                                first_audio_tag = true
-                               streamChan <- *t.Buf
+                               *front_buf = append(*front_buf, *t.Buf...)
                        }
                } else {//eof_tag 
                        break;
@@ -168,9 +168,10 @@ func Stream(path string,streamChan chan []byte,cancel chan struct{}) (error) {
        //      last_video_keyframe_timestramp int32
        //      video_keyframe_speed int32
        // )
-       //copy
+       //copy when key frame
        {
                last_available_offset := last_keyframe_video_offsets[0]
+               var buf []byte
                // last_Timestamp := last_timestamps[0]
                for {
                        //退出
@@ -188,14 +189,13 @@ func Stream(path string,streamChan chan []byte,cancel chan struct{}) (error) {
                                f.Seek(seachtag(f),1)
                                continue
                        } else if t.Tag == video_tag {
-                               // if t.FirstByte & 0xf0 == 0x10 {
-                               //      video_keyframe_speed = t.Timestamp - last_video_keyframe_timestramp
-                               //      fmt.Println(`video_keyframe_speed`,video_keyframe_speed)
-                               //      last_video_keyframe_timestramp = t.Timestamp
-                               // }
-                               streamChan <- *t.Buf
+                               if t.FirstByte & 0xf0 == 0x10 {
+                                       streamChan <- buf
+                                       buf = []byte{}
+                               }
+                               buf = append(buf, *t.Buf...)
                        } else if t.Tag == audio_tag {
-                               streamChan <- *t.Buf
+                               buf = append(buf, *t.Buf...)
                        } else if t.Tag != script_tag {
                                ;
                        }