From e8bd39c0c5cdf815f43f0c11e44ae14875ca9440 Mon Sep 17 00:00:00 2001 From: qydysky Date: Sat, 24 Apr 2021 10:22:42 +0800 Subject: [PATCH] =?utf8?q?flv=E6=B5=81=E6=9C=8D=E5=8A=A1=E5=87=8F=E5=B0=91?= =?utf8?q?io?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- Reply/F.go | 82 +++++++++++++++++++++++++++++++++------------- Reply/flvDecode.go | 26 +++++++-------- 2 files changed, 72 insertions(+), 36 deletions(-) diff --git a/Reply/F.go b/Reply/F.go index 28e8677..c84201c 100644 --- a/Reply/F.go +++ b/Reply/F.go @@ -206,6 +206,8 @@ func dtos(t time.Duration) string { type Savestream struct { path string hls_stream []byte//发送给客户的m3u8字节 + flv_front []byte//flv头及首tag + flv_stream *msgq.Msgq//发送给客户的flv流关键帧间隔片 max_m4s_hls int//m3u8最多有几个m4s min_m4s_hls int @@ -229,6 +231,7 @@ type m4s_link_item struct {//使用指针以设置是否已下载 } var savestream = Savestream { + flv_stream:msgq.New(10),//队列最多保留10个关键帧间隔片 max_m4s_hls:15, min_m4s_hls:5, } @@ -436,6 +439,29 @@ func Savestreamf(){ // no expect qn exit_chan := make(chan struct{}) + go func(){//flv stream + byteC := make(chan []byte,1024*1024*30)//传来的关键帧间隔buf为3s,避免超出buf,设为30M + + go func(){ + for !p.Checkfile().IsExist(savestream.path + ".flv.dtmp") { + time.Sleep(time.Second) + } + if err := Stream(savestream.path + ".flv.dtmp",&savestream.flv_front,byteC,exit_chan);err != nil { + flog.L(`T: `,err); + return + } + }() + + for { + select{ + case res :=<- byteC: + savestream.flv_stream.Push_tag("stream",res) + case <- exit_chan: + savestream.flv_stream.Push_tag("close",nil) + return; + } + } + }() if c.Live_want_qn < c.Live_qn { go func(){ for c.Live_want_qn < c.Live_qn { @@ -469,6 +495,7 @@ func Savestreamf(){ l.L(`I: `,"结束") Ass_f("", time.Now())//ass + savestream.flv_front = []byte{}//flv头及首tag置空 p.FileMove(savestream.path+".flv.dtmp", savestream.path+".flv") } else { savestream.path += "/" @@ -1354,38 +1381,47 @@ func init() { if filepath.Ext(path) == `.dtmp` { if strings.Contains(path,"flv") { - path = base_dir+path + // path = base_dir+path w.Header().Set("Connection", "Keep-Alive") w.Header().Set("Content-Type", "video/x-flv") w.Header().Set("X-Content-Type-Options", "nosniff") w.WriteHeader(http.StatusOK) - byteC := make(chan []byte,1024*1024*10) - cancel := make(chan struct{}) - defer close(cancel) - - go func(){ - if err := Stream(path,byteC,cancel);err != nil { - flog.L(`T: `,err); - return - } - }() - - flusher, flushSupport := w.(http.Flusher); + flusher, flushSupport := w.(http.Flusher) if flushSupport {flusher.Flush()} - var ( - err error - ) - for err == nil { - if b := <- byteC;len(b) != 0 { - _,err = w.Write(b) - } else { - break - } - if flushSupport {flusher.Flush()} + //写入flv头,首tag + if n,err := w.Write(savestream.flv_front);err != nil { + return + } else if flushSupport { + flusher.Flush() + } else { + fmt.Println("pass",n) } + + cancel := make(chan struct{}) + + //flv流关键帧间隔切片 + savestream.flv_stream.Pull_tag(map[string]func(interface{})(bool){ + `stream`:func(data interface{})(bool){ + if b,ok := data.([]byte);ok{ + if _,err := w.Write(b);err != nil { + close(cancel) + return true + } else if flushSupport { + flusher.Flush() + } + } + return false + }, + `close`:func(data interface{})(bool){ + close(cancel) + return true + }, + }) + + <- cancel } else if strings.Contains(path,"m3u8") { gmt, _ := time.LoadLocation("GMT") diff --git a/Reply/flvDecode.go b/Reply/flvDecode.go index 396e958..832b306 100644 --- a/Reply/flvDecode.go +++ b/Reply/flvDecode.go @@ -31,7 +31,7 @@ var ( send_sign = []byte{0x00} ) -func Stream(path string,streamChan chan []byte,cancel chan struct{}) (error) { +func Stream(path string,front_buf *[]byte,streamChan chan []byte,cancel chan struct{}) (error) { flvlog.L(`T: `,path) defer flvlog.L(`T: `,`退出`) //file @@ -47,7 +47,7 @@ func Stream(path string,streamChan chan []byte,cancel chan struct{}) (error) { buf := make([]byte, flv_header_size+previou_tag_size) if _,err := f.Read(buf);err != nil {return err} if bytes.Index(buf,flv_header_sign) != 0 {return errors.New(`no flv`)} - streamChan <- buf + *front_buf = append(*front_buf, buf...) } type flv_tag struct { @@ -133,11 +133,11 @@ func Stream(path string,streamChan chan []byte,cancel chan struct{}) (error) { for { t := getTag(f) if t.Tag == script_tag { - streamChan <- *t.Buf + *front_buf = append(*front_buf, *t.Buf...) } else if t.Tag == video_tag { if !first_video_tag { first_video_tag = true - streamChan <- *t.Buf + *front_buf = append(*front_buf, *t.Buf...) } if t.FirstByte & 0xf0 == 0x10 { @@ -152,7 +152,7 @@ func Stream(path string,streamChan chan []byte,cancel chan struct{}) (error) { } else if t.Tag == audio_tag { if !first_audio_tag { first_audio_tag = true - streamChan <- *t.Buf + *front_buf = append(*front_buf, *t.Buf...) } } else {//eof_tag break; @@ -168,9 +168,10 @@ func Stream(path string,streamChan chan []byte,cancel chan struct{}) (error) { // last_video_keyframe_timestramp int32 // video_keyframe_speed int32 // ) - //copy + //copy when key frame { last_available_offset := last_keyframe_video_offsets[0] + var buf []byte // last_Timestamp := last_timestamps[0] for { //退出 @@ -188,14 +189,13 @@ func Stream(path string,streamChan chan []byte,cancel chan struct{}) (error) { f.Seek(seachtag(f),1) continue } else if t.Tag == video_tag { - // if t.FirstByte & 0xf0 == 0x10 { - // video_keyframe_speed = t.Timestamp - last_video_keyframe_timestramp - // fmt.Println(`video_keyframe_speed`,video_keyframe_speed) - // last_video_keyframe_timestramp = t.Timestamp - // } - streamChan <- *t.Buf + if t.FirstByte & 0xf0 == 0x10 { + streamChan <- buf + buf = []byte{} + } + buf = append(buf, *t.Buf...) } else if t.Tag == audio_tag { - streamChan <- *t.Buf + buf = append(buf, *t.Buf...) } else if t.Tag != script_tag { ; } -- 2.39.2