type Savestream struct {
path string
hls_stream []byte//发送给客户的m3u8字节
+ flv_front []byte//flv头及首tag
+ flv_stream *msgq.Msgq//发送给客户的flv流关键帧间隔片
max_m4s_hls int//m3u8最多有几个m4s
min_m4s_hls int
}
var savestream = Savestream {
+ flv_stream:msgq.New(10),//队列最多保留10个关键帧间隔片
max_m4s_hls:15,
min_m4s_hls:5,
}
// no expect qn
exit_chan := make(chan struct{})
+ go func(){//flv stream
+ byteC := make(chan []byte,1024*1024*30)//传来的关键帧间隔buf为3s,避免超出buf,设为30M
+
+ go func(){
+ for !p.Checkfile().IsExist(savestream.path + ".flv.dtmp") {
+ time.Sleep(time.Second)
+ }
+ if err := Stream(savestream.path + ".flv.dtmp",&savestream.flv_front,byteC,exit_chan);err != nil {
+ flog.L(`T: `,err);
+ return
+ }
+ }()
+
+ for {
+ select{
+ case res :=<- byteC:
+ savestream.flv_stream.Push_tag("stream",res)
+ case <- exit_chan:
+ savestream.flv_stream.Push_tag("close",nil)
+ return;
+ }
+ }
+ }()
if c.Live_want_qn < c.Live_qn {
go func(){
for c.Live_want_qn < c.Live_qn {
l.L(`I: `,"结束")
Ass_f("", time.Now())//ass
+ savestream.flv_front = []byte{}//flv头及首tag置空
p.FileMove(savestream.path+".flv.dtmp", savestream.path+".flv")
} else {
savestream.path += "/"
if filepath.Ext(path) == `.dtmp` {
if strings.Contains(path,"flv") {
- path = base_dir+path
+ // path = base_dir+path
w.Header().Set("Connection", "Keep-Alive")
w.Header().Set("Content-Type", "video/x-flv")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.WriteHeader(http.StatusOK)
- byteC := make(chan []byte,1024*1024*10)
- cancel := make(chan struct{})
- defer close(cancel)
-
- go func(){
- if err := Stream(path,byteC,cancel);err != nil {
- flog.L(`T: `,err);
- return
- }
- }()
-
- flusher, flushSupport := w.(http.Flusher);
+ flusher, flushSupport := w.(http.Flusher)
if flushSupport {flusher.Flush()}
- var (
- err error
- )
- for err == nil {
- if b := <- byteC;len(b) != 0 {
- _,err = w.Write(b)
- } else {
- break
- }
- if flushSupport {flusher.Flush()}
+ //写入flv头,首tag
+ if n,err := w.Write(savestream.flv_front);err != nil {
+ return
+ } else if flushSupport {
+ flusher.Flush()
+ } else {
+ fmt.Println("pass",n)
}
+
+ cancel := make(chan struct{})
+
+ //flv流关键帧间隔切片
+ savestream.flv_stream.Pull_tag(map[string]func(interface{})(bool){
+ `stream`:func(data interface{})(bool){
+ if b,ok := data.([]byte);ok{
+ if _,err := w.Write(b);err != nil {
+ close(cancel)
+ return true
+ } else if flushSupport {
+ flusher.Flush()
+ }
+ }
+ return false
+ },
+ `close`:func(data interface{})(bool){
+ close(cancel)
+ return true
+ },
+ })
+
+ <- cancel
} else if strings.Contains(path,"m3u8") {
gmt, _ := time.LoadLocation("GMT")
send_sign = []byte{0x00}
)
-func Stream(path string,streamChan chan []byte,cancel chan struct{}) (error) {
+func Stream(path string,front_buf *[]byte,streamChan chan []byte,cancel chan struct{}) (error) {
flvlog.L(`T: `,path)
defer flvlog.L(`T: `,`退出`)
//file
buf := make([]byte, flv_header_size+previou_tag_size)
if _,err := f.Read(buf);err != nil {return err}
if bytes.Index(buf,flv_header_sign) != 0 {return errors.New(`no flv`)}
- streamChan <- buf
+ *front_buf = append(*front_buf, buf...)
}
type flv_tag struct {
for {
t := getTag(f)
if t.Tag == script_tag {
- streamChan <- *t.Buf
+ *front_buf = append(*front_buf, *t.Buf...)
} else if t.Tag == video_tag {
if !first_video_tag {
first_video_tag = true
- streamChan <- *t.Buf
+ *front_buf = append(*front_buf, *t.Buf...)
}
if t.FirstByte & 0xf0 == 0x10 {
} else if t.Tag == audio_tag {
if !first_audio_tag {
first_audio_tag = true
- streamChan <- *t.Buf
+ *front_buf = append(*front_buf, *t.Buf...)
}
} else {//eof_tag
break;
// last_video_keyframe_timestramp int32
// video_keyframe_speed int32
// )
- //copy
+ //copy when key frame
{
last_available_offset := last_keyframe_video_offsets[0]
+ var buf []byte
// last_Timestamp := last_timestamps[0]
for {
//退出
f.Seek(seachtag(f),1)
continue
} else if t.Tag == video_tag {
- // if t.FirstByte & 0xf0 == 0x10 {
- // video_keyframe_speed = t.Timestamp - last_video_keyframe_timestramp
- // fmt.Println(`video_keyframe_speed`,video_keyframe_speed)
- // last_video_keyframe_timestramp = t.Timestamp
- // }
- streamChan <- *t.Buf
+ if t.FirstByte & 0xf0 == 0x10 {
+ streamChan <- buf
+ buf = []byte{}
+ }
+ buf = append(buf, *t.Buf...)
} else if t.Tag == audio_tag {
- streamChan <- *t.Buf
+ buf = append(buf, *t.Buf...)
} else if t.Tag != script_tag {
;
}