flv_front []byte//flv头及首tag
flv_stream *msgq.Msgq//发送给客户的flv流关键帧间隔片
- max_m4s_hls int//m3u8最多有几个m4s
- min_m4s_hls int
+ m4s_hls int
wait *s.Signal
cancel *s.Signal
Url string// m4s链接
Base string//m4s文件名
Offset_line int//m3u8中的行下标
- downloaded bool//该m4s是否已下载
+ status int//该m4s下载状态 s_noload:未下载 s_loading正在下载 s_fin下载完成 s_fail下载失败
}
+//m4s状态
+const (
+ s_noload = iota
+ s_loading
+ s_fin
+ s_fail
+)
var savestream = Savestream {
flv_stream:msgq.New(10),//队列最多保留10个关键帧间隔片
- max_m4s_hls:15,
- min_m4s_hls:5,
+ m4s_hls:5,
}
func init(){
r := reqf.New()
if e := r.Reqf(reqf.Rval{
- Url:m3u8_url,
- Retry:4,
- SleepTime:1000,
+ Url:m3u8_url+"&"+time.Now().Format("20060102150405"),
+ Retry:0,
+ ConnectTimeout:3000,
+ ReadTimeout:1000,
Proxy:c.Proxy,
- Timeout:3*1000,
Header:map[string]string{
`Host`: url_struct.Host,
`User-Agent`: `Mozilla/5.0 (X11; Linux x86_64; rv:83.0) Gecko/20100101 Firefox/83.0`,
(*links[0]).Base[0] == 104 {links = links[1:]}
hls_gen.m4s_list = append(hls_gen.m4s_list, links...)
- if len(hls_gen.m4s_list) > savestream.max_m4s_hls {//too much
- cut_offset := 0
- for i:=0;i<len(hls_gen.m4s_list);i+=1 {
- if !(*hls_gen.m4s_list[i]).downloaded {break}
- if i > savestream.min_m4s_hls {
- cut_offset = i-savestream.min_m4s_hls
- }
- }
- hls_gen.m4s_list = hls_gen.m4s_list[cut_offset:]
- }
+
+ // if len(hls_gen.m4s_list) > savestream.max_m4s_hls {//too much
+ // cut_offset := 0
+ // for i:=0;i<len(hls_gen.m4s_list);i+=1 {
+ // if (*hls_gen.m4s_list[i]).status <= s_loading && len(hls_gen.m4s_list) - i < 5 {
+ // break
+ // }
+ // if i > savestream.min_m4s_hls {
+ // cut_offset = i-savestream.min_m4s_hls
+ // }
+ // }
+ // hls_gen.m4s_list = hls_gen.m4s_list[cut_offset:]
+ // }
var res []byte
//add header
res = hls_gen.hls_file_header
- res = append(res, []byte((*hls_gen.m4s_list[0]).Base[:len((*hls_gen.m4s_list[0]).Base)-4])...)
- res = append(res, []byte("\n")...)
-
+
//add m4s block
- for i:=0;i<len(hls_gen.m4s_list);i+=1 {
- if !(*hls_gen.m4s_list[i]).downloaded {break}
- res = append(res, []byte("#EXTINF:1\n")...)
- res = append(res, (*hls_gen.m4s_list[i]).Base...)
- res = append(res, []byte("\n")...)
+ {
+ m4s := 0
+ m4s_list_b := []byte{}
+ var SEQUENCE string
+ for _,v := range hls_gen.m4s_list {
+ if v.status != s_fin {
+ // if m4s < savestream.m4s_hls && len(hls_gen.m4s_list) > 3+savestream.m4s_hls {
+ continue
+ // }
+ // break
+ }
+ m4s += 1
+ if m4s == 1 {SEQUENCE = strings.ReplaceAll(v.Base, ".m4s", "")}
+ m4s_list_b = append(m4s_list_b, []byte("#EXTINF:1\n")...)
+ m4s_list_b = append(m4s_list_b, v.Base...)
+ m4s_list_b = append(m4s_list_b, []byte("\n")...)
+ }
+ if cut := m4s-savestream.m4s_hls;cut > 0 {
+ hls_gen.m4s_list = hls_gen.m4s_list[cut:]
+ }
+ //add #EXT-X-DISCONTINUITY
+ res = append(res, []byte("#EXT-X-DISCONTINUITY\n")...)
+ //add #EXT-X-MEDIA-SEQUENCE
+ if SEQUENCE != "" {
+ res = append(res, []byte("#EXT-X-MEDIA-SEQUENCE:"+SEQUENCE+"\n")...)
+ }
+ //add m4s list
+ res = append(res, m4s_list_b...)
}
+
//去除最后一个换行
if len(res) > 0 {res = res[:len(res)-1]}
var (
last_download *m4s_link_item
miss_download miss_download_T
- download_limit = limit.New(1,200,0)//download m4s per 200ms
+ download_limit funcCtrl.BlockFunc//limit.New(1,500,0)//download m4s per 500ms
)
expires := time.Now().Add(time.Minute*2).Unix()
path_behind string
)
- for savestream.cancel.Islive() {
- links,file_add,exp,e := hls_get_link(c.Live[0],last_download)
+ for {
+ //退出,等待下载完成
+ if !savestream.cancel.Islive() {
+ l.L(`I: `,"退出,等待片段下载")
+ download_limit.Block()
+ download_limit.UnBlock()
+
+ links := []*m4s_link_item{}
+ //下载出错的
+ miss_download.RLock()
+ if len(miss_download.List) != 0 {
+ miss_download.RUnlock()
+ miss_download.Lock()
+ links = append(miss_download.List, links...)
+ miss_download.List = []*m4s_link_item{}
+ miss_download.Unlock()
+ } else {
+ miss_download.RUnlock()
+ }
- if e != nil && !reqf.IsTimeout(e) {
- l.L(`W: `,e)
+ for k,v :=range links {
+ l.L(`I: `,"正在下载最后片段:",k,"/",len(links))
+ v.status = s_loading
+ r := reqf.New()
+ if e := r.Reqf(reqf.Rval{
+ Url:v.Url,
+ Retry:0,
+ SaveToPath:savestream.path+v.Base,
+ ConnectTimeout:5000,
+ ReadTimeout:5000,
+ Proxy:c.Proxy,
+ }); e != nil{
+ l.L(`I: `,e)
+ if !reqf.IsTimeout(e) {
+ v.status = s_fail
+ }
+ } else {
+ v.status = s_fin
+ }
+ }
+ //退出
break
}
+ links,file_add,exp,e := hls_get_link(c.Live[0],last_download)
+ if e != nil {
+ l.L(`W: `,e)
+ if reqf.IsTimeout(e) || reqf.IsDnsErr(e) {
+ continue
+ } else {
+ break
+ }
+ }
+
//first block 获取并设置不变头
if last_download == nil {
var res []byte
{
- fin_offset := bytes.LastIndex(file_add, []byte("EXT-X-MEDIA-SEQUENCE:"))+21
- res = file_add[:fin_offset]
+ for row,i:=bytes.SplitAfter(file_add, []byte("\n")),0;i<len(row);i+=1 {
+ if bytes.Contains(row[i], []byte("#EXT")) {
+ if bytes.Contains(row[i], []byte("#EXT-X-MEDIA-SEQUENCE")) || bytes.Contains(row[i], []byte("#EXTINF")){continue}
+ res = append(res, row[i]...)
+ }
+ }
}
hls_msg.Push_tag(`header`, res)
}
path_behind = "?"+u.RawQuery
}
- //优先下载出错的,以尽快恢复客户流播放
//下载出错的
miss_download.RLock()
if len(miss_download.List) != 0 {
}
}
+ if len(links) > 10 {
+ l.L(`T: `,`等待下载切片:`,len(links))
+ } else if len(links) > 100 {
+ l.L(`W: `,`重试,等待下载切片:`,len(links))
+
+ F.Get(`Liveing`)
+ if !c.Liveing {break}
+
+ F.Get(`Live`)
+ if len(c.Live)==0 {break}
+
+ // no expect qn
+ if c.Live_want_qn < c.Live_qn {
+ expires = time.Now().Add(time.Minute*2).Unix()
+ }
+ continue
+ }
+
f := p.File()
f.FileWR(p.Filel{
File:savestream.path+"0.m3u8.dtmp",
Loc:-1,
Context:[]interface{}{file_add},
})
-
+
for i:=0;i<len(links);i+=1 {
go func(link *m4s_link_item,path string){
- download_limit.TO()
+ download_limit.Block()
+ defer download_limit.UnBlock()
+
+ //wait num
+ // if wnum := download_limit.WNum();wnum != 0 {
+ // l.L(`T: `, `处理:`, link.Base)
+ // }
+
+ (*link).status = s_loading
r := reqf.New()
if e := r.Reqf(reqf.Rval{
Url:(*link).Url,
- Retry:3,
- SleepTime:1000,
+ Retry:0,
SaveToPath:path+(*link).Base,
- Timeout:3*1000,
+ ConnectTimeout:2000,
+ ReadTimeout:2000,
Proxy:c.Proxy,
}); e != nil{
- l.L(`I: `,e,`将重试!`)
- //避免影响后续猜测
- (*link).Offset_line = 0
- miss_download.Lock()
- miss_download.List = append(miss_download.List, link)
- miss_download.Unlock()
+ if reqf.IsTimeout(e) {
+ l.L(`I: `,e,`将重试!`)
+ //避免影响后续猜测
+ (*link).Offset_line = 0
+ miss_download.Lock()
+ miss_download.List = append(miss_download.List, link)
+ miss_download.Unlock()
+ } else {
+ (*link).status = s_fail
+ }
} else {
- (*link).downloaded = true
+ (*link).status = s_fin
}
}(links[i],savestream.path)
s := web.New(&http.Server{
Addr: addr,
})
- s.Handle(map[string]func(http.ResponseWriter,*http.Request){
- `/`:func(w http.ResponseWriter,r *http.Request){
+ var (
+ root = func(w http.ResponseWriter,r *http.Request){
//header
+ w.Header().Set("Access-Control-Allow-Credentials", "true")
+ w.Header().Set("Access-Control-Allow-Headers", "*")
+ w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
w.Header().Set("Access-Control-Allow-Origin", "*")
+ w.Header().Set("Connection", "Keep-Alive")
+ w.Header().Set("Content-Transfer-Encoding", "binary")
var path string = r.URL.Path[1:]
}
if filepath.Ext(path) == `.dtmp` {
- if strings.Contains(path,"flv") {
+ if strings.Contains(path,".flv") {
// path = base_dir+path
-
- w.Header().Set("Connection", "Keep-Alive")
w.Header().Set("Content-Type", "video/x-flv")
- w.Header().Set("X-Content-Type-Options", "nosniff")
w.WriteHeader(http.StatusOK)
flusher, flushSupport := w.(http.Flusher)
})
<- cancel
- } else if strings.Contains(path,"m3u8") {
-
- w.Header().Set("Cache-Control", "max-age=1")
- w.Header().Set("Last-Modified", time.Now().Add(time.Second).Format(time.RFC1123))
+ } else if strings.Contains(path,".m3u8") {
+ // w.Header().Set("Cache-Control", "max-age=1")
w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
- w.Header().Set("Connection", "Keep-Alive")
-
+
res := savestream.hls_stream
if len(res) == 0 {
w.WriteHeader(http.StatusNotFound)
}
defer f.Close()
- b := make([]byte,1024*1024)
+ b := make([]byte,1<<20)
if n,e := f.Read(b);e != nil {
flog.L(`E: `,e)
w.WriteHeader(http.StatusServiceUnavailable)
return
- } else if n == 1024*1024 {
+ } else if n == 1<<20 {
flog.L(`W: `,`buf limit`)
w.WriteHeader(http.StatusServiceUnavailable)
return
buf = b[:n]
cache.Store(path,buf)
go func(){//移除
- time.Sleep(time.Second*time.Duration(savestream.max_m4s_hls+1))
+ time.Sleep(time.Second*time.Duration(savestream.m4s_hls+1))
cache.Delete(path)
}()
}
w.WriteHeader(http.StatusServiceUnavailable)
return
}
-
- w.Header().Set("Content-Type", "application/octet-stream")
- w.Header().Set("Connection", "Keep-Alive")
+
+ // w.Header().Set("Content-Type", "application/octet-stream")
+ // w.Header().Set("Content-Length", strconv.Itoa(len(buf)))
+ w.WriteHeader(http.StatusOK)
if _,err := w.Write(buf);err != nil {
flog.L(`E: `,err)
return
} else {
http.FileServer(http.Dir(base_dir)).ServeHTTP(w,r)
}
- },
- `/now`:func(w http.ResponseWriter,r *http.Request){
+ }
+ now = func(w http.ResponseWriter,r *http.Request){
//header
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Cache-Control", "max-age=3")
w.WriteHeader(http.StatusServiceUnavailable)
return
}
+ // r.URL =
+ // root(w, r)
w.Header().Set("Location", r.URL.ResolveReference(u).String())
w.WriteHeader(http.StatusTemporaryRedirect)
}
return
- },
+ }
+ )
+ s.Handle(map[string]func(http.ResponseWriter,*http.Request){
+ `/`:root,
+ `/now`:now,
`/exit`:func(w http.ResponseWriter,r *http.Request){
s.Server.Shutdown(context.Background())
},