//直播流保存
type Savestream struct {
path string
+ hls_stream []byte//发送给客户的m3u8字节
+
+ max_m4s_hls int//m3u8最多有几个m4s
+ min_m4s_hls int
+
wait *s.Signal
cancel *s.Signal
skipFunc funcCtrl.SkipFunc
}
+type hls_generate struct {
+ hls_file_header []byte//发送给客户的m3u8不变头
+ m4s_list []*m4s_link_item//m4s列表
+ sync.RWMutex
+}
+
+type m4s_link_item struct {//使用指针以设置是否已下载
+ Url string// m4s链接
+ Base string//m4s文件名
+ Offset_line int//m3u8中的行下标
+ downloaded bool//该m4s是否已下载
+}
+
var savestream = Savestream {
+ max_m4s_hls:15,
+ min_m4s_hls:5,
}
func init(){
if savestream.cancel.Islive() {return}
- type m4s_link_item struct {
- Url string
- Base string
- Offset_line int
- }
-
var (
no_found_link = errors.New("no_found_link")
- hls_get_link = func(m3u8_url string,last_download m4s_link_item) (need_download []m4s_link_item,m3u8_file_addition []byte,expires int,err error) {
+ hls_get_link = func(m3u8_url string,last_download *m4s_link_item) (need_download []*m4s_link_item,m3u8_file_addition []byte,expires int,err error) {
url_struct,e := url.Parse(m3u8_url)
if e != nil {
err = e
trid := query.Get("trid")
expires,_ = strconv.Atoi(query.Get("expires"))
- var m4s_links []m4s_link_item
+ var m4s_links []*m4s_link_item
lines := bytes.Split(r.Respon, []byte("\n"))
for i:=0;i<len(lines);i+=1 {
line := lines[i]
err = e
return
}
- m4s_links = append(m4s_links, m4s_link_item{
+ m4s_links = append(m4s_links, &m4s_link_item{
Url:url_struct.ResolveReference(u).String(),
Base:m4s_link,
Offset_line:i,
return
}
- if last_download.Base == "" {
+ if last_download == nil {
m3u8_file_addition = r.Respon
need_download = m4s_links
return
need_download = append(need_download, m4s_links[i:]...)
break
}
- found = last_download.Base == m4s_links[i].Base
+ found = (*last_download).Base == m4s_links[i].Base
}
if !found {
offset := m4s_links[1].Offset_line-1
savestream.path += "/"
l.L(`I: `,"保存到", savestream.path)
Ass_f(savestream.path+"0", time.Now())
-
+
+ var (
+ hls_gen hls_generate
+ exit_chan = make(chan struct{})//退出监听
+ )
+ {//hls stream gen 用户m3u8生成
+ go func(){
+ for {
+ select {
+ case <- time.After(time.Second):;
+ case <- exit_chan:
+ savestream.hls_stream = []byte{}//退出置空
+ return
+ }
+ //在设置下载标志时,需要进行操作,故加锁
+ hls_gen.Lock()
+ var res []byte
+ //add header
+ res = hls_gen.hls_file_header
+
+ //add EXT-X-MEDIA-SEQUENCE
+ m4s_list := hls_gen.m4s_list
+ if (*m4s_list[0]).Base[0] == 104 {
+ m4s_list = m4s_list[1:]
+ }
+ if len(m4s_list) > savestream.max_m4s_hls {//too much
+ cut_offset := 0
+ for i:=0;i<len(m4s_list);i+=1 {
+ if !(*m4s_list[i]).downloaded {break}
+ if i > savestream.min_m4s_hls {
+ cut_offset = i-savestream.min_m4s_hls
+ }
+ }
+ hls_gen.m4s_list = hls_gen.m4s_list[cut_offset:]
+ }
+ res = append(res, []byte((*m4s_list[0]).Base[:len((*m4s_list[0]).Base)-4])...)
+ res = append(res, []byte("\n")...)
+
+ //add m4s block
+ for i:=0;i<len(m4s_list);i+=1 {
+ if !(*m4s_list[i]).downloaded {break}
+ res = append(res, []byte("#EXTINF:1\n")...)
+ res = append(res, (*m4s_list[i]).Base...)
+ res = append(res, []byte("\n")...)
+ }
+ hls_gen.Unlock()
+
+ //去除最后一个换行
+ if len(res) > 0 {res = res[:len(res)-1]}
+
+ //设置到全局变量,方便流服务器获取
+ savestream.hls_stream = res
+ }
+ }()
+ }
+
type miss_download_T struct{
- List []m4s_link_item
+ List []*m4s_link_item
sync.RWMutex
}
var (
- last_download m4s_link_item
+ last_download *m4s_link_item
miss_download miss_download_T
- download_limit = limit.New(5,1000,0)
+ download_limit = limit.New(1,200,0)//download m4s per 200ms
)
expires := time.Now().Add(time.Minute*2).Unix()
break
}
+ //first block 获取并设置不变头
+ if last_download == nil {
+ var res []byte
+ {
+ fin_offset := bytes.LastIndex(file_add, []byte("EXT-X-MEDIA-SEQUENCE:"))+21
+ res = file_add[:fin_offset]
+ }
+ hls_gen.Lock()
+ hls_gen.hls_file_header = res
+ hls_gen.Unlock()
+ }
+
if len(links) == 0 {
time.Sleep(time.Second)
continue
}
//use guess
- if last_download.Base != "" {
- previou,_ := strconv.Atoi(last_download.Base[:len(last_download.Base)-4])
+ if last_download != nil {
+ previou,_ := strconv.Atoi((*last_download).Base[:len((*last_download).Base)-4])
now,_ := strconv.Atoi(links[0].Base[:len(links[0].Base)-4])
- if previou != now-1 {
+ if previou < now-1 {
if diff := now - previou;diff > 100 {
l.L(`W: `,`diff too large `,diff)
break
path_behind = "?"+u.RawQuery
}
+ //优先下载出错的,以尽快恢复客户流播放
//下载出错的
miss_download.RLock()
if len(miss_download.List) != 0 {
miss_download.RUnlock()
miss_download.Lock()
- links = append(miss_download.List,links...)
- miss_download.List = []m4s_link_item{}
+ links = append(miss_download.List, links...)
+ miss_download.List = []*m4s_link_item{}
miss_download.Unlock()
} else {
miss_download.RUnlock()
}
-
//出错期间没能获取到的
for i:=now-1;i>previou;i-=1 {
base := strconv.Itoa(i)+".m4s"
- links = append([]m4s_link_item{
- m4s_link_item{
+ links = append([]*m4s_link_item{
+ &m4s_link_item{
Url:path_front+base+path_behind,
Base:base,
},
- },links...)
+ }, links...)
}
}
}
})
for i:=0;i<len(links);i+=1 {
- go func(link m4s_link_item,path string){
+ go func(link *m4s_link_item,path string){
download_limit.TO()
r := reqf.Req()
if e := r.Reqf(reqf.Rval{
- Url:link.Url,
+ Url:(*link).Url,
Retry:3,
SleepTime:1,
- SaveToPath:path+link.Base,
+ SaveToPath:path+(*link).Base,
Timeout:3,
}); e != nil{
l.L(`I: `,e,`将重试!`)
+ //避免影响后续猜测
+ (*link).Offset_line = 0
miss_download.Lock()
miss_download.List = append(miss_download.List, link)
miss_download.Unlock()
+ } else {
+ (*link).downloaded = true
}
}(links[i],savestream.path)
- last_download = links[i]
+
+ //只记录最新
+ if links[i].Offset_line > 0 {
+ last_download = links[i]
+ }
+
+ {//store m4s to hls_gen
+ hls_gen.Lock()
+ hls_gen.m4s_list = append(hls_gen.m4s_list, links[i])
+ hls_gen.Unlock()
+ }
}
//m3u8_url 将过期
})
p.FileMove(savestream.path+"0.m3u8.dtmp", savestream.path+"0.m3u8")
l.L(`I: `,"结束")
+ close(exit_chan)//hls_stream
Ass_f("", time.Now())//ass
}
//set ro ``
} else {
addr += strconv.Itoa(port)
}
+
+ var cache sync.Map//使用内存cache避免频繁io
+
s := web.New(&http.Server{
Addr: addr,
})
if flushSupport {flusher.Flush()}
}
} else if strings.Contains(path,"m3u8") {
- m3u8_file := base_dir+path
-
- f,err := os.OpenFile(m3u8_file,os.O_RDONLY,0644)
- if err != nil {
- flog.L(`E: `,err);
- return
- }
- defer f.Close()
- w.Header().Set("Cache-Control", "max-age=3")
+ gmt, _ := time.LoadLocation("GMT")
+ w.Header().Set("Cache-Control", "max-age=1")
+ w.Header().Set("Last-Modified", time.Now().Add(time.Second).In(gmt).Format(time.RFC1123))
w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
w.Header().Set("Connection", "Keep-Alive")
-
- var res []byte
- {
- buf := make([]byte, 200)
- if _,err := f.Read(buf);err != nil {
- flog.L(`E: `,err);
- w.WriteHeader(http.StatusServiceUnavailable)
- return
- }
- fin_offset := bytes.LastIndex(buf, []byte("EXT-X-MEDIA-SEQUENCE:"))+21
- res = buf[:fin_offset]
+
+ res := savestream.hls_stream
+ if len(res) == 0 {
+ w.WriteHeader(http.StatusNotFound)
+ return
}
- var seed_m4s = func(f *os.File) (A,B []byte,e error) {
- f.Seek(-1000,2)
-
- buf := make([]byte, 1000)
- if _,err := f.Read(buf);err != nil {
- e = err
- return
- }
-
- //fast seed a suit m4s
- var sign_offset int
- {
- if offset := bytes.Index(buf, []byte("#EXTINF"));offset != -1 {
- sign_offset = offset
- } else {
- e = errors.New(`no found first #EXTINF`)
- return
- }
- for sign_offset < len(buf) {
- if offset := bytes.Index(buf[sign_offset:], []byte(".m4s"));offset != -1 {
- sign_offset = offset + sign_offset
- } else if sign_offset==0 {
- e = errors.New(`sign_offset`)
- return
- } else {
- break
- }
- if sign := buf[sign_offset-1] - 48;sign%3 == 0 {break}
- sign_offset += 4
- }
- }
-
- var (
- start_offset int
- end_offset int
- m4s_start_offset int
- m4s_end_offset int
- )
-
- if offset := bytes.Index(buf[sign_offset:], []byte("#EXTINF"));offset != -1 {
- start_offset = offset + sign_offset
- } else {
- e = errors.New(`start_offset`)
- return
- }
-
- if offset := bytes.Index(buf[start_offset:], []byte{0x0a});offset != -1 {
- m4s_start_offset = offset+start_offset+1
- } else {
- e = errors.New(`m4s_start_offset`)
- return
- }
-
- if offset := bytes.Index(buf[m4s_start_offset:], []byte{0x0a});offset != -1 {
- m4s_end_offset = offset+m4s_start_offset+1
- } else {
- e = errors.New(`m4s_end_offset`)
- return
- }
-
- end_offset += start_offset+7
-
- for i:=0;end_offset < len(buf);i+=1{
- if offset := bytes.Index(buf[end_offset:], []byte("#EXTINF"));offset != -1 {
- end_offset = offset+end_offset
- } else if end_offset==0 {
- e = errors.New(`end_offset`)
- return
- } else {
- e = errors.New("not enough data "+strconv.Itoa(i))
- return
- }
- if i>12 {break}
- end_offset += 7
- }
-
- A = buf[m4s_start_offset:m4s_end_offset]
- B = buf[start_offset:end_offset]
+ if _,err := w.Write(res);err != nil {
+ flog.L(`E: `,err)
return
}
-
- {
- A,B,e := seed_m4s(f)
-
- if e != nil {
- flog.L(`W: `,e);
- w.WriteHeader(http.StatusServiceUnavailable)
- return
- }
-
- res = append(res, A...)
+ }
+ } else if filepath.Ext(path) == `.m4s` {
+ path = base_dir+path
- res = append(res, B...)
- }
+ var buf []byte
- if _,err = w.Write(res);err != nil {
+ if b,ok := cache.Load(path);!ok{
+ f,err := os.OpenFile(path,os.O_RDONLY,0644)
+ if err != nil {
flog.L(`E: `,err);
return
}
+ defer f.Close()
+
+ b := make([]byte,1024*1024)
+ if n,e := f.Read(b);e != nil {
+ flog.L(`E: `,e)
+ w.WriteHeader(http.StatusServiceUnavailable)
+ return
+ } else if n == 1024*1024 {
+ flog.L(`W: `,`buf limit`)
+ w.WriteHeader(http.StatusServiceUnavailable)
+ return
+ } else {
+ buf = b[:n]
+ cache.Store(path,buf)
+ go func(){//移除
+ time.Sleep(time.Second*time.Duration(savestream.max_m4s_hls+1))
+ cache.Delete(path)
+ }()
+ }
+ } else {
+ buf,_ = b.([]byte)
+ }
+
+ if len(buf) == 0 {
+ flog.L(`W: `,`buf size 0`)
+ w.WriteHeader(http.StatusServiceUnavailable)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/octet-stream")
+ w.Header().Set("Connection", "Keep-Alive")
+ if _,err := w.Write(buf);err != nil {
+ flog.L(`E: `,err)
+ return
}
} else {
http.FileServer(http.Dir(base_dir)).ServeHTTP(w,r)