"fmt"
"os"
"io"
+ "io/fs"
"strconv"
"strings"
"sync"
b []byte//发送给客户的m3u8字节
t time.Time
}
- flv_front []byte//flv头及首tag
- flv_stream *msgq.Msgq//发送给客户的flv流关键帧间隔片
+ front []byte//flv头及首tag or hls的初始化m4s
+ stream *msgq.Msgq//发送给客户的flv流关键帧间隔片 or hls的fmp4片
m4s_hls int//hls list 中的m4s数量
hlsbuffersize int//hls list缓冲m4s数量
}
type hls_generate struct {
+ hls_first_fmp4_name string
hls_file_header []byte//发送给客户的m3u8不变头
m4s_list []*m4s_link_item//m4s列表 缓冲
}
)
var savestream = Savestream {
- flv_stream:msgq.New(10),//队列最多保留10个关键帧间隔片
+ stream:msgq.New(10),//队列最多保留10个关键帧间隔片
m4s_hls:8,
}
if offset,_ := out.Seek(0,1);offset == 0 {
// fmt.Println(`添加头`,len(req.front))
//stream
- savestream.flv_front = req.front
+ savestream.front = req.front
out.Write(req.front)
}
for i:=0;i<len(req.keyframe);i+=1 {
//stream
- savestream.flv_stream.Push_tag("stream",req.keyframe[i])
+ savestream.stream.Push_tag("stream",req.keyframe[i])
out.Write(req.keyframe[i])
}
return false
for i:=0;i<len(req.keyframe);i+=1 {
//stream
- savestream.flv_stream.Push_tag("stream",req.keyframe[i])
+ savestream.stream.Push_tag("stream",req.keyframe[i])
out.Write(req.keyframe[i])
}
// reqs_keyframe[0] = [][]byte{reqs_keyframe[0][len(reqs_keyframe[0])-1]}
for i:=0;i<len(req.keyframe);i+=1 {
//stream
- savestream.flv_stream.Push_tag("stream",req.keyframe[i])
+ savestream.stream.Push_tag("stream",req.keyframe[i])
out.Write(req.keyframe[i])
}
reqs_used_id = reqs_used_id[merged:]
//stream
- savestream.flv_stream.Push_tag("stream",b)
+ savestream.stream.Push_tag("stream",b)
out.Write(b)
}
l.L(`I: `,"结束")
Ass_f("", time.Now())//ass
- savestream.flv_front = []byte{}//flv头及首tag置空
+ savestream.front = []byte{}//flv头及首tag置空
p.FileMove(savestream.path+".flv.dtmp", savestream.path+".flv")
} else {
savestream.path += "/"
Ass_f(savestream.path+"0", time.Now())
var (
- hls_msg = msgq.New(10)
+ hls_msg = msgq.New(20)
hls_gen hls_generate
DISCONTINUITY int
SEQUENCE int
continue
}
//#EXTINF
- if hls_gen.m4s_list[0].isshow {SEQUENCE += 1}
+ if hls_gen.m4s_list[0].isshow {
+ SEQUENCE += 1
+
+ {//stream hls2mp4
+ var buf []byte
+ //stream front
+ if len(savestream.front) == 0 {
+ buf,_,e := get_m4s_cache(savestream.path+hls_gen.hls_first_fmp4_name)
+ if e == nil {
+ savestream.front = buf
+ } else {
+ l.L(`W: `,`推送mp4流错误,无法读取文件` ,e)
+ }
+ }
+ //stream body
+ buf,_,e := get_m4s_cache(savestream.path+hls_gen.m4s_list[0].Base)
+ if e == nil {
+ savestream.stream.Push_tag(`stream`, buf)
+ } else {
+ l.L(`W: `,`推送mp4流错误,无法读取文件` ,e)
+ }
+ }
+ }
}
return false
{
for row,i:=bytes.SplitAfter(file_add, []byte("\n")),0;i<len(row);i+=1 {
if bytes.Contains(row[i], []byte("#EXT")) {
+ //set stream front
+ if op := bytes.Index(row[i], []byte(`#EXT-X-MAP:URI="`));op != -1 {
+ hls_gen.hls_first_fmp4_name = string(row[i][op+16:len(row[i])-2])
+ }
if bytes.Contains(row[i], []byte("#EXT-X-MEDIA-SEQUENCE")) || bytes.Contains(row[i], []byte("#EXTINF")){continue}
res = append(res, row[i]...)
}
if _,ok := m4s_cache.Load(path + link.Base);!ok{
m4s_cache.Store(path + link.Base, r.Respon)
go func(){//移除
- time.Sleep(time.Second*time.Duration(savestream.hlsbuffersize+savestream.m4s_hls+1))
+ time.Sleep(time.Second*time.Duration(savestream.hlsbuffersize+savestream.m4s_hls+2))
m4s_cache.Delete(path + link.Base)
}()
}
}
hls_msg.Push_tag(`close`, nil)
+ savestream.front = []byte{}//头置空
l.L(`I: `,"结束")
Ass_f("", time.Now())//ass
}
var m4s_cache sync.Map//使用内存cache避免频繁io
+func get_m4s_cache(path string)(buf []byte, cached bool, err error){
+ if b,ok := m4s_cache.Load(path);!ok{
+ f,e := os.OpenFile(path,os.O_RDONLY,0644)
+ if e != nil {
+ err = e
+ return
+ }
+ defer f.Close()
+
+ if b,e := io.ReadAll(f);e != nil {
+ err = e
+ return
+ } else {
+ buf = b
+ m4s_cache.Store(path,buf)
+ go func(){//移除
+ time.Sleep(time.Second*time.Duration(savestream.m4s_hls+2))
+ m4s_cache.Delete(path)
+ }()
+ }
+ } else {
+ cached = true
+ buf,_ = b.([]byte)
+ }
+ return
+}
+
//直播Web服务口
func init() {
flog := flog.Base_add(`直播Web服务`)
w.Header().Set("Server", "live")
if filepath.Ext(path) == `.dtmp` {
if strings.Contains(path,".flv") {
+ if len(savestream.front) == 0 {
+ w.Header().Set("Retry-After", "1")
+ w.WriteHeader(http.StatusServiceUnavailable)
+ return
+ }
+
// path = base_dir+path
w.Header().Set("Content-Type", "video/x-flv")
w.WriteHeader(http.StatusOK)
if flushSupport {flusher.Flush()}
//写入flv头,首tag
- if _,err := w.Write(savestream.flv_front);err != nil {
+ if _,err := w.Write(savestream.front);err != nil {
return
} else if flushSupport {
flusher.Flush()
cancel := make(chan struct{})
//flv流关键帧间隔切片
- savestream.flv_stream.Pull_tag(map[string]func(interface{})(bool){
+ savestream.stream.Pull_tag(map[string]func(interface{})(bool){
`stream`:func(data interface{})(bool){
if b,ok := data.([]byte);ok{
if _,err := w.Write(b);err != nil {
<- cancel
} else if strings.Contains(path,".m3u8") {
- w.Header().Set("Cache-Control", "max-age=1")
- w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
- w.Header().Set("Last-Modified", savestream.hls_stream.t.Format(http.TimeFormat))
+ if r.URL.Query().Get("type") == "mp4" {
+ if len(savestream.front) == 0 {
+ w.Header().Set("Retry-After", "1")
+ w.WriteHeader(http.StatusServiceUnavailable)
+ return
+ }
- // //经常m4s下载速度赶不上,使用阻塞避免频繁获取列表带来的卡顿
- // if time.Now().Sub(savestream.hls_stream.t).Seconds() > 1 {
- // time.Sleep(time.Duration(3)*time.Second)
- // }
+ // path = base_dir+path
+ w.Header().Set("Content-Type", "video/mp4")
+ w.WriteHeader(http.StatusOK)
- res := savestream.hls_stream.b
+ flusher, flushSupport := w.(http.Flusher)
+ if flushSupport {flusher.Flush()}
- if len(res) == 0 {
- w.Header().Set("Retry-After", "1")
- w.WriteHeader(http.StatusServiceUnavailable)
- return
- }
+ //写入hls头
+ if _,err := w.Write(savestream.front);err != nil {
+ return
+ } else if flushSupport {
+ flusher.Flush()
+ }
- //Server-Timing
- w.Header().Set("Server-Timing", fmt.Sprintf("dur=%d", time.Since(start).Microseconds()))
+ cancel := make(chan struct{})
+
+ //hls切片
+ savestream.stream.Pull_tag(map[string]func(interface{})(bool){
+ `stream`:func(data interface{})(bool){
+ if b,ok := data.([]byte);ok{
+ if _,err := w.Write(b);err != nil {
+ close(cancel)
+ return true
+ } else if flushSupport {
+ flusher.Flush()
+ }
+ }
+ return false
+ },
+ `close`:func(data interface{})(bool){
+ close(cancel)
+ return true
+ },
+ })
- if _,err := w.Write(res);err != nil {
- flog.L(`E: `,err)
- return
+ <- cancel
+ } else {
+ w.Header().Set("Cache-Control", "max-age=1")
+ w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
+ w.Header().Set("Last-Modified", savestream.hls_stream.t.Format(http.TimeFormat))
+
+ // //经常m4s下载速度赶不上,使用阻塞避免频繁获取列表带来的卡顿
+ // if time.Now().Sub(savestream.hls_stream.t).Seconds() > 1 {
+ // time.Sleep(time.Duration(3)*time.Second)
+ // }
+
+ res := savestream.hls_stream.b
+
+ if len(res) == 0 {
+ w.Header().Set("Retry-After", "1")
+ w.WriteHeader(http.StatusServiceUnavailable)
+ return
+ }
+
+ //Server-Timing
+ w.Header().Set("Server-Timing", fmt.Sprintf("dur=%d", time.Since(start).Microseconds()))
+
+ if _,err := w.Write(res);err != nil {
+ flog.L(`E: `,err)
+ return
+ }
}
}
} else if filepath.Ext(path) == `.m4s` {
path = base_dir+path
- var (
- buf []byte
- cached bool
- )
-
- if b,ok := m4s_cache.Load(path);!ok{
- f,err := os.OpenFile(path,os.O_RDONLY,0644)
- if err != nil {
- flog.L(`E: `,err);
- return
- }
- defer f.Close()
+ buf,cached,e := get_m4s_cache(path)
- if b,e := io.ReadAll(f);e != nil {
- flog.L(`E: `,e)
- w.Header().Set("Retry-After", "1")
- w.WriteHeader(http.StatusServiceUnavailable)
- return
- } else {
- buf = b
- m4s_cache.Store(path,buf)
- go func(){//移除
- time.Sleep(time.Second*time.Duration(savestream.m4s_hls+1))
- m4s_cache.Delete(path)
- }()
- }
- } else {
- cached = true
- buf,_ = b.([]byte)
+ if e != nil {
+ w.Header().Set("Retry-After", "1")
+ w.WriteHeader(http.StatusServiceUnavailable)
+ return
}
if len(buf) == 0 {
}
} else {
w.Header().Set("Server", "file")
+ if r.URL.Query().Get("type") == "mp4" {//hls拼合
+ dir := base_dir+filepath.Dir(path)+"/"
+ if !p.Checkfile().IsExist(dir+`0.m3u8`) {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+ var m4slist []string
+ if e := filepath.WalkDir(dir, func(path string, info fs.DirEntry, err error) error {
+ if err != nil {return err}
+ if filepath.Ext(info.Name()) == ".m4s" {
+ m4slist = append(m4slist, path)
+ }
+ return nil
+ });e != nil {
+ flog.L(`E: `,e);
+ w.WriteHeader(http.StatusServiceUnavailable)
+ return
+ }
+ m4slist = append(m4slist[len(m4slist)-1:], m4slist[:len(m4slist)-1]...)
+ for _,v := range m4slist {
+ f,err := os.OpenFile(v,os.O_RDONLY,0644)
+ if err != nil {
+ w.WriteHeader(http.StatusServiceUnavailable)
+ flog.L(`E: `,err)
+ return
+ }
+ io.Copy(w, f)
+ f.Close()
+ }
+ return
+ }
http.FileServer(http.Dir(base_dir)).ServeHTTP(w,r)
}
}
w.WriteHeader(http.StatusServiceUnavailable)
return
}
+ if r.URL.RawQuery != "" {
+ u.RawQuery = r.URL.RawQuery
+ }
// r.URL =
// root(w, r)
w.Header().Set("Location", r.URL.ResolveReference(u).String())