]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
hls流转播优化
authorqydysky <qydysky@foxmail.com>
Sun, 30 May 2021 09:25:46 +0000 (17:25 +0800)
committerqydysky <qydysky@foxmail.com>
Sun, 30 May 2021 09:25:46 +0000 (17:25 +0800)
hls退出下载剩余切片

Reply/F.go
demo/config/config_K_v.json

index ccdb9f07a8c244df04d357f868ffbd7f89499e69..1a20663f7f03566eefaead286995d543df752bf3 100644 (file)
@@ -212,8 +212,7 @@ type Savestream struct {
        flv_front []byte//flv头及首tag
        flv_stream *msgq.Msgq//发送给客户的flv流关键帧间隔片
 
-       max_m4s_hls int//m3u8最多有几个m4s
-       min_m4s_hls int
+       m4s_hls int
 
        wait *s.Signal
        cancel *s.Signal
@@ -229,13 +228,19 @@ type m4s_link_item struct {//使用指针以设置是否已下载
        Url string// m4s链接
        Base string//m4s文件名
        Offset_line int//m3u8中的行下标
-       downloaded bool//该m4s是否已下载
+       status int//该m4s下载状态 s_noload:未下载 s_loading正在下载 s_fin下载完成 s_fail下载失败
 }
+//m4s状态
+const (
+       s_noload = iota
+       s_loading
+       s_fin
+       s_fail
+)
 
 var savestream = Savestream {
        flv_stream:msgq.New(10),//队列最多保留10个关键帧间隔片
-       max_m4s_hls:15,
-       min_m4s_hls:5,
+       m4s_hls:5,
 }
 
 func init(){
@@ -287,11 +292,11 @@ func Savestreamf(){
 
                        r := reqf.New()
                        if e := r.Reqf(reqf.Rval{
-                               Url:m3u8_url,
-                               Retry:4,
-                               SleepTime:1000,
+                               Url:m3u8_url+"&"+time.Now().Format("20060102150405"),
+                               Retry:0,
+                               ConnectTimeout:3000,
+                               ReadTimeout:1000,
                                Proxy:c.Proxy,
-                               Timeout:3*1000,
                                Header:map[string]string{
                                        `Host`: url_struct.Host,
                                        `User-Agent`: `Mozilla/5.0 (X11; Linux x86_64; rv:83.0) Gecko/20100101 Firefox/83.0`,
@@ -842,30 +847,55 @@ func Savestreamf(){
                                                (*links[0]).Base[0] == 104 {links = links[1:]}
 
                                        hls_gen.m4s_list = append(hls_gen.m4s_list, links...)
-                                       if len(hls_gen.m4s_list) > savestream.max_m4s_hls {//too much
-                                               cut_offset := 0
-                                               for i:=0;i<len(hls_gen.m4s_list);i+=1 {
-                                                       if !(*hls_gen.m4s_list[i]).downloaded {break}
-                                                       if i > savestream.min_m4s_hls {
-                                                               cut_offset = i-savestream.min_m4s_hls
-                                                       }
-                                               }
-                                               hls_gen.m4s_list = hls_gen.m4s_list[cut_offset:]
-                                       }
+                                       
+                                       // if len(hls_gen.m4s_list) > savestream.max_m4s_hls {//too much
+                                       //      cut_offset := 0
+                                       //      for i:=0;i<len(hls_gen.m4s_list);i+=1 {
+                                       //              if (*hls_gen.m4s_list[i]).status <= s_loading && len(hls_gen.m4s_list) - i < 5 {
+                                       //                      break
+                                       //              }
+                                       //              if i > savestream.min_m4s_hls {
+                                       //                      cut_offset = i-savestream.min_m4s_hls
+                                       //              }
+                                       //      }
+                                       //      hls_gen.m4s_list = hls_gen.m4s_list[cut_offset:]
+                                       // }
 
                                        var res []byte
                                        //add header
                                        res = hls_gen.hls_file_header
-                                       res = append(res, []byte((*hls_gen.m4s_list[0]).Base[:len((*hls_gen.m4s_list[0]).Base)-4])...)
-                                       res = append(res, []byte("\n")...)
-       
+       
                                        //add m4s block
-                                       for i:=0;i<len(hls_gen.m4s_list);i+=1 {
-                                               if !(*hls_gen.m4s_list[i]).downloaded {break}
-                                               res = append(res, []byte("#EXTINF:1\n")...)
-                                               res = append(res, (*hls_gen.m4s_list[i]).Base...)
-                                               res = append(res, []byte("\n")...)
+                                       {
+                                               m4s := 0
+                                               m4s_list_b := []byte{}
+                                               var SEQUENCE string
+                                               for _,v := range hls_gen.m4s_list {
+                                                       if v.status != s_fin {
+                                                               // if m4s < savestream.m4s_hls && len(hls_gen.m4s_list) > 3+savestream.m4s_hls {
+                                                                       continue
+                                                               // }
+                                                               // break
+                                                       }
+                                                       m4s += 1
+                                                       if m4s == 1 {SEQUENCE = strings.ReplaceAll(v.Base, ".m4s", "")}
+                                                       m4s_list_b = append(m4s_list_b, []byte("#EXTINF:1\n")...)
+                                                       m4s_list_b = append(m4s_list_b, v.Base...)
+                                                       m4s_list_b = append(m4s_list_b, []byte("\n")...)
+                                               }
+                                               if cut := m4s-savestream.m4s_hls;cut > 0 {
+                                                       hls_gen.m4s_list = hls_gen.m4s_list[cut:]
+                                               }
+                                               //add #EXT-X-DISCONTINUITY
+                                               res = append(res, []byte("#EXT-X-DISCONTINUITY\n")...)
+                                               //add #EXT-X-MEDIA-SEQUENCE
+                                               if SEQUENCE != "" {
+                                                       res = append(res, []byte("#EXT-X-MEDIA-SEQUENCE:"+SEQUENCE+"\n")...)
+                                               }
+                                               //add m4s list
+                                               res = append(res, m4s_list_b...)
                                        }
+                                       
        
                                        //去除最后一个换行
                                        if len(res) > 0 {res = res[:len(res)-1]}
@@ -887,7 +917,7 @@ func Savestreamf(){
                        var (
                                last_download *m4s_link_item
                                miss_download miss_download_T
-                               download_limit = limit.New(1,200,0)//download m4s per 200ms
+                               download_limit funcCtrl.BlockFunc//limit.New(1,500,0)//download m4s per 500ms
                        )
                        expires := time.Now().Add(time.Minute*2).Unix()
 
@@ -896,20 +926,70 @@ func Savestreamf(){
                                path_behind string
                        )
                        
-                       for savestream.cancel.Islive() {
-                               links,file_add,exp,e := hls_get_link(c.Live[0],last_download)
+                       for {
+                               //退出,等待下载完成
+                               if !savestream.cancel.Islive() {
+                                       l.L(`I: `,"退出,等待片段下载")
+                                       download_limit.Block()
+                                       download_limit.UnBlock()
+
+                                       links := []*m4s_link_item{}
+                                       //下载出错的
+                                       miss_download.RLock()
+                                       if len(miss_download.List) != 0 {
+                                               miss_download.RUnlock()
+                                               miss_download.Lock()
+                                               links = append(miss_download.List, links...)
+                                               miss_download.List = []*m4s_link_item{}
+                                               miss_download.Unlock()
+                                       } else {
+                                               miss_download.RUnlock()
+                                       }
 
-                               if e != nil && !reqf.IsTimeout(e) {
-                                       l.L(`W: `,e)
+                                       for k,v :=range links {
+                                               l.L(`I: `,"正在下载最后片段:",k,"/",len(links))
+                                               v.status = s_loading
+                                               r := reqf.New()
+                                               if e := r.Reqf(reqf.Rval{
+                                                       Url:v.Url,
+                                                       Retry:0,
+                                                       SaveToPath:savestream.path+v.Base,
+                                                       ConnectTimeout:5000,
+                                                       ReadTimeout:5000,
+                                                       Proxy:c.Proxy,
+                                               }); e != nil{
+                                                       l.L(`I: `,e)
+                                                       if !reqf.IsTimeout(e) {
+                                                               v.status = s_fail
+                                                       }
+                                               } else {
+                                                       v.status = s_fin
+                                               }
+                                       }
+                                       //退出
                                        break
                                }
 
+                               links,file_add,exp,e := hls_get_link(c.Live[0],last_download)
+                               if e != nil {
+                                       l.L(`W: `,e)
+                                       if reqf.IsTimeout(e) || reqf.IsDnsErr(e) {
+                                               continue
+                                       } else {
+                                               break
+                                       }
+                               }
+
                                //first block 获取并设置不变头
                                if last_download == nil {
                                        var res []byte
                                        {
-                                               fin_offset := bytes.LastIndex(file_add, []byte("EXT-X-MEDIA-SEQUENCE:"))+21
-                                               res = file_add[:fin_offset]
+                                               for row,i:=bytes.SplitAfter(file_add, []byte("\n")),0;i<len(row);i+=1 {
+                                                       if bytes.Contains(row[i], []byte("#EXT")) {
+                                                               if bytes.Contains(row[i], []byte("#EXT-X-MEDIA-SEQUENCE")) || bytes.Contains(row[i], []byte("#EXTINF")){continue}
+                                                               res = append(res, row[i]...)
+                                                       }
+                                               }
                                        }
                                        hls_msg.Push_tag(`header`, res)
                                }
@@ -952,7 +1032,6 @@ func Savestreamf(){
                                                                path_behind = "?"+u.RawQuery
                                                        }
                                                        
-                                                       //优先下载出错的,以尽快恢复客户流播放
                                                        //下载出错的
                                                        miss_download.RLock()
                                                        if len(miss_download.List) != 0 {
@@ -979,6 +1058,24 @@ func Savestreamf(){
                                        }
                                }
 
+                               if len(links) > 10 {
+                                       l.L(`T: `,`等待下载切片:`,len(links))
+                               } else if len(links) > 100 {
+                                       l.L(`W: `,`重试,等待下载切片:`,len(links))
+
+                                       F.Get(`Liveing`)
+                                       if !c.Liveing {break}
+       
+                                       F.Get(`Live`)
+                                       if len(c.Live)==0 {break}
+
+                                       // no expect qn
+                                       if c.Live_want_qn < c.Live_qn {
+                                               expires = time.Now().Add(time.Minute*2).Unix()
+                                       }
+                                       continue
+                               }
+
                                f := p.File()
                                f.FileWR(p.Filel{
                                        File:savestream.path+"0.m3u8.dtmp",
@@ -986,27 +1083,39 @@ func Savestreamf(){
                                        Loc:-1,
                                        Context:[]interface{}{file_add},
                                })
-
                                for i:=0;i<len(links);i+=1 {
                                        go func(link *m4s_link_item,path string){
-                                               download_limit.TO()
+                                               download_limit.Block()
+                                               defer download_limit.UnBlock()
+
+                                               //wait num
+                                               // if wnum := download_limit.WNum();wnum != 0 {
+                                                       // l.L(`T: `, `处理:`, link.Base)
+                                               // }
+
+                                               (*link).status = s_loading
                                                r := reqf.New()
                                                if e := r.Reqf(reqf.Rval{
                                                        Url:(*link).Url,
-                                                       Retry:3,
-                                                       SleepTime:1000,
+                                                       Retry:0,
                                                        SaveToPath:path+(*link).Base,
-                                                       Timeout:3*1000,
+                                                       ConnectTimeout:2000,
+                                                       ReadTimeout:2000,
                                                        Proxy:c.Proxy,
                                                }); e != nil{
-                                                       l.L(`I: `,e,`将重试!`)
-                                                       //避免影响后续猜测
-                                                       (*link).Offset_line = 0
-                                                       miss_download.Lock()
-                                                       miss_download.List = append(miss_download.List, link)
-                                                       miss_download.Unlock()
+                                                       if reqf.IsTimeout(e) {
+                                                               l.L(`I: `,e,`将重试!`)
+                                                               //避免影响后续猜测
+                                                               (*link).Offset_line = 0
+                                                               miss_download.Lock()
+                                                               miss_download.List = append(miss_download.List, link)
+                                                               miss_download.Unlock()
+                                                       } else {
+                                                               (*link).status = s_fail
+                                                       }
                                                } else {
-                                                       (*link).downloaded = true
+                                                       (*link).status = s_fin
                                                }
                                        }(links[i],savestream.path)
 
@@ -1699,10 +1808,15 @@ func init() {
                s := web.New(&http.Server{
                        Addr: addr,
                })
-               s.Handle(map[string]func(http.ResponseWriter,*http.Request){
-                       `/`:func(w http.ResponseWriter,r *http.Request){
+               var (
+                       root = func(w http.ResponseWriter,r *http.Request){
                                //header
+                               w.Header().Set("Access-Control-Allow-Credentials", "true")
+                               w.Header().Set("Access-Control-Allow-Headers", "*")
+                               w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
                                w.Header().Set("Access-Control-Allow-Origin", "*")
+                               w.Header().Set("Connection", "Keep-Alive")
+                               w.Header().Set("Content-Transfer-Encoding", "binary")
 
                                var path string = r.URL.Path[1:]
 
@@ -1712,12 +1826,9 @@ func init() {
                                }
 
                                if filepath.Ext(path) == `.dtmp` {
-                                       if strings.Contains(path,"flv") {
+                                       if strings.Contains(path,".flv") {
                                                // path = base_dir+path
-
-                                               w.Header().Set("Connection", "Keep-Alive")
                                                w.Header().Set("Content-Type", "video/x-flv")
-                                               w.Header().Set("X-Content-Type-Options", "nosniff")
                                                w.WriteHeader(http.StatusOK)
 
                                                flusher, flushSupport := w.(http.Flusher)
@@ -1752,13 +1863,10 @@ func init() {
                                                })
 
                                                <- cancel
-                                       } else if strings.Contains(path,"m3u8")  {
-                                       
-                                               w.Header().Set("Cache-Control", "max-age=1")
-                                               w.Header().Set("Last-Modified", time.Now().Add(time.Second).Format(time.RFC1123))
+                                       } else if strings.Contains(path,".m3u8")  {
+                                               // w.Header().Set("Cache-Control", "max-age=1")
                                                w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
-                                               w.Header().Set("Connection", "Keep-Alive")
-                                               
+
                                                res := savestream.hls_stream
                                                if len(res) == 0 {
                                                        w.WriteHeader(http.StatusNotFound)
@@ -1783,12 +1891,12 @@ func init() {
                                                }
                                                defer f.Close()
                                        
-                                               b := make([]byte,1024*1024)
+                                               b := make([]byte,1<<20)
                                                if n,e := f.Read(b);e != nil {
                                                        flog.L(`E: `,e)
                                                        w.WriteHeader(http.StatusServiceUnavailable)
                                                        return
-                                               } else if n == 1024*1024 {
+                                               } else if n == 1<<20 {
                                                        flog.L(`W: `,`buf limit`)
                                                        w.WriteHeader(http.StatusServiceUnavailable)
                                                        return
@@ -1796,7 +1904,7 @@ func init() {
                                                        buf = b[:n]
                                                        cache.Store(path,buf)
                                                        go func(){//移除
-                                                               time.Sleep(time.Second*time.Duration(savestream.max_m4s_hls+1))
+                                                               time.Sleep(time.Second*time.Duration(savestream.m4s_hls+1))
                                                                cache.Delete(path)
                                                        }()
                                                }
@@ -1809,9 +1917,10 @@ func init() {
                                                w.WriteHeader(http.StatusServiceUnavailable)
                                                return
                                        }
-
-                                       w.Header().Set("Content-Type", "application/octet-stream")
-                                       w.Header().Set("Connection", "Keep-Alive")
+                                       
+                                       // w.Header().Set("Content-Type", "application/octet-stream")
+                                       // w.Header().Set("Content-Length", strconv.Itoa(len(buf)))
+                                       w.WriteHeader(http.StatusOK)
                                        if _,err := w.Write(buf);err != nil {
                                                flog.L(`E: `,err)
                                                return
@@ -1819,8 +1928,8 @@ func init() {
                                } else {
                                        http.FileServer(http.Dir(base_dir)).ServeHTTP(w,r)
                                }
-                       },
-                       `/now`:func(w http.ResponseWriter,r *http.Request){
+                       }
+                       now = func(w http.ResponseWriter,r *http.Request){
                                //header
                                w.Header().Set("Access-Control-Allow-Origin", "*")
                                w.Header().Set("Cache-Control", "max-age=3")
@@ -1849,11 +1958,17 @@ func init() {
                                                w.WriteHeader(http.StatusServiceUnavailable)
                                                return
                                        }
+                                       // r.URL = 
+                                       // root(w, r)
                                        w.Header().Set("Location", r.URL.ResolveReference(u).String())
                                        w.WriteHeader(http.StatusTemporaryRedirect)
                                }
                                return
-                       },
+                       }
+               )
+               s.Handle(map[string]func(http.ResponseWriter,*http.Request){
+                       `/`:root,
+                       `/now`:now,
                        `/exit`:func(w http.ResponseWriter,r *http.Request){
                                s.Server.Shutdown(context.Background())
                        },
index 9f324c7e76c5a664adec2fb84cc29da33fc0abb7..a9419e625a1b46dcb326b74712e6f1d4211d200d 100644 (file)
@@ -6,7 +6,7 @@
     "弹幕私信": "",
     "弹幕私信(额外)": "[弹幕机测试 额外]:弹幕",
     "TTS_配置-help": "将会运行[TTS_使用程序路径 获取的音频路径 TTS_使用程序参数]",
-    "TTS_总开关": false,
+    "TTS_总开关": true,
     "TTS_使用程序路径": "ffplay",
     "TTS_使用程序参数": "-autoexit -nodisp -volume 60",
     "弹幕-help": "弹幕相关",
@@ -36,6 +36,7 @@
     "发送还有几天过期的礼物": 3,
     "保持牌子亮着": true,
     "日志显示": [
+        "T: ",
         "I: ",
         "W: ",
         "E: "
@@ -48,7 +49,7 @@
     "直播流类型-help": "flv or hls",
     "直播流类型": "hls",
     "直播流保存位置": "./live",
-    "直播保存位置Web服务":0,
+    "直播保存位置Web服务":23333,
     "ass-help": "只有保存直播流时才考虑生成ass",
     "生成Ass弹幕": true,
     "弹幕处理": "",