]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
hls使用内存缓存,优化逻辑
authorqydysky <qydysky@foxmail.com>
Fri, 23 Apr 2021 17:55:44 +0000 (01:55 +0800)
committerqydysky <qydysky@foxmail.com>
Fri, 23 Apr 2021 17:55:44 +0000 (01:55 +0800)
Reply/F.go

index 8c939d346aebdacde9d5d073bee4d2fb004318e3..28e867785d656e8ad4c823ce14a41157d3c6941f 100644 (file)
@@ -205,12 +205,32 @@ func dtos(t time.Duration) string {
 //直播流保存
 type Savestream struct {
        path string
+       hls_stream []byte//发送给客户的m3u8字节
+
+       max_m4s_hls int//m3u8最多有几个m4s
+       min_m4s_hls int
+
        wait *s.Signal
        cancel *s.Signal
        skipFunc funcCtrl.SkipFunc
 }
 
+type hls_generate struct {
+       hls_file_header []byte//发送给客户的m3u8不变头
+       m4s_list []*m4s_link_item//m4s列表
+       sync.RWMutex
+}
+
+type m4s_link_item struct {//使用指针以设置是否已下载
+       Url string// m4s链接
+       Base string//m4s文件名
+       Offset_line int//m3u8中的行下标
+       downloaded bool//该m4s是否已下载
+}
+
 var savestream = Savestream {
+       max_m4s_hls:15,
+       min_m4s_hls:5,
 }
 
 func init(){
@@ -249,15 +269,9 @@ func Savestreamf(){
 
        if savestream.cancel.Islive() {return}
 
-       type m4s_link_item struct {
-               Url string
-               Base string
-               Offset_line int
-       }
-
        var (
                no_found_link = errors.New("no_found_link")
-               hls_get_link = func(m3u8_url string,last_download m4s_link_item) (need_download []m4s_link_item,m3u8_file_addition []byte,expires int,err error) {
+               hls_get_link = func(m3u8_url string,last_download *m4s_link_item) (need_download []*m4s_link_item,m3u8_file_addition []byte,expires int,err error) {
                        url_struct,e := url.Parse(m3u8_url)
                        if e != nil {
                                err = e
@@ -292,7 +306,7 @@ func Savestreamf(){
                        trid := query.Get("trid")
                        expires,_ = strconv.Atoi(query.Get("expires"))
 
-                       var m4s_links []m4s_link_item
+                       var m4s_links []*m4s_link_item
                        lines := bytes.Split(r.Respon, []byte("\n"))
                        for i:=0;i<len(lines);i+=1 {
                                line := lines[i]
@@ -313,7 +327,7 @@ func Savestreamf(){
                                        err = e
                                        return
                                }
-                               m4s_links = append(m4s_links, m4s_link_item{
+                               m4s_links = append(m4s_links, &m4s_link_item{
                                        Url:url_struct.ResolveReference(u).String(),
                                        Base:m4s_link,
                                        Offset_line:i,
@@ -324,7 +338,7 @@ func Savestreamf(){
                                return
                        }
 
-                       if last_download.Base == "" {
+                       if last_download == nil {
                                m3u8_file_addition = r.Respon
                                need_download = m4s_links
                                return
@@ -343,7 +357,7 @@ func Savestreamf(){
                                        need_download = append(need_download, m4s_links[i:]...)
                                        break
                                }
-                               found = last_download.Base == m4s_links[i].Base
+                               found = (*last_download).Base == m4s_links[i].Base
                        }
                        if !found {
                                offset := m4s_links[1].Offset_line-1
@@ -460,15 +474,70 @@ func Savestreamf(){
                        savestream.path += "/"
                        l.L(`I: `,"保存到", savestream.path)
                        Ass_f(savestream.path+"0", time.Now())
-                       
+
+                       var (
+                               hls_gen hls_generate
+                               exit_chan = make(chan struct{})//退出监听
+                       )
+                       {//hls stream gen 用户m3u8生成
+                               go func(){
+                                       for {
+                                               select {
+                                               case <- time.After(time.Second):;
+                                               case <- exit_chan:
+                                                       savestream.hls_stream = []byte{}//退出置空
+                                                       return
+                                               }
+                                               //在设置下载标志时,需要进行操作,故加锁
+                                               hls_gen.Lock()
+                                               var res []byte
+                                               //add header
+                                               res = hls_gen.hls_file_header
+               
+                                               //add EXT-X-MEDIA-SEQUENCE
+                                               m4s_list := hls_gen.m4s_list
+                                               if (*m4s_list[0]).Base[0] == 104 {
+                                                       m4s_list = m4s_list[1:]
+                                               }
+                                               if len(m4s_list) > savestream.max_m4s_hls {//too much
+                                                       cut_offset := 0
+                                                       for i:=0;i<len(m4s_list);i+=1 {
+                                                               if !(*m4s_list[i]).downloaded {break}
+                                                               if i > savestream.min_m4s_hls {
+                                                                       cut_offset = i-savestream.min_m4s_hls
+                                                               }
+                                                       }
+                                                       hls_gen.m4s_list = hls_gen.m4s_list[cut_offset:]
+                                               }
+                                               res = append(res, []byte((*m4s_list[0]).Base[:len((*m4s_list[0]).Base)-4])...)
+                                               res = append(res, []byte("\n")...)
+               
+                                               //add m4s block
+                                               for i:=0;i<len(m4s_list);i+=1 {
+                                                       if !(*m4s_list[i]).downloaded {break}
+                                                       res = append(res, []byte("#EXTINF:1\n")...)
+                                                       res = append(res, (*m4s_list[i]).Base...)
+                                                       res = append(res, []byte("\n")...)
+                                               }
+                                               hls_gen.Unlock()
+               
+                                               //去除最后一个换行
+                                               if len(res) > 0 {res = res[:len(res)-1]}
+
+                                               //设置到全局变量,方便流服务器获取
+                                               savestream.hls_stream = res
+                                       }
+                               }()
+                       }
+
                        type miss_download_T struct{
-                               List []m4s_link_item
+                               List []*m4s_link_item
                                sync.RWMutex
                        }
                        var (
-                               last_download m4s_link_item
+                               last_download *m4s_link_item
                                miss_download miss_download_T
-                               download_limit = limit.New(5,1000,0)
+                               download_limit = limit.New(1,200,0)//download m4s per 200ms
                        )
                        expires := time.Now().Add(time.Minute*2).Unix()
 
@@ -485,6 +554,18 @@ func Savestreamf(){
                                        break
                                }
 
+                               //first block 获取并设置不变头
+                               if last_download == nil {
+                                       var res []byte
+                                       {
+                                               fin_offset := bytes.LastIndex(file_add, []byte("EXT-X-MEDIA-SEQUENCE:"))+21
+                                               res = file_add[:fin_offset]
+                                       }
+                                       hls_gen.Lock()
+                                       hls_gen.hls_file_header = res
+                                       hls_gen.Unlock()
+                               }
+
                                if len(links) == 0 {
                                        time.Sleep(time.Second)
                                        continue
@@ -496,10 +577,10 @@ func Savestreamf(){
                                }
 
                                //use guess
-                               if last_download.Base != "" {
-                                       previou,_ := strconv.Atoi(last_download.Base[:len(last_download.Base)-4])
+                               if last_download != nil {
+                                       previou,_ := strconv.Atoi((*last_download).Base[:len((*last_download).Base)-4])
                                        now,_ := strconv.Atoi(links[0].Base[:len(links[0].Base)-4])
-                                       if previou != now-1 {
+                                       if previou < now-1 {
                                                if diff := now - previou;diff > 100 {
                                                        l.L(`W: `,`diff too large `,diff)
                                                        break
@@ -523,28 +604,28 @@ func Savestreamf(){
                                                                path_behind = "?"+u.RawQuery
                                                        }
                                                        
+                                                       //优先下载出错的,以尽快恢复客户流播放
                                                        //下载出错的
                                                        miss_download.RLock()
                                                        if len(miss_download.List) != 0 {
                                                                miss_download.RUnlock()
                                                                miss_download.Lock()
-                                                               links = append(miss_download.List,links...)
-                                                               miss_download.List = []m4s_link_item{}
+                                                               links = append(miss_download.List, links...)
+                                                               miss_download.List = []*m4s_link_item{}
                                                                miss_download.Unlock()
                                                        } else {
                                                                miss_download.RUnlock()
                                                        }
-                                                       
 
                                                        //出错期间没能获取到的
                                                        for i:=now-1;i>previou;i-=1 {
                                                                base := strconv.Itoa(i)+".m4s"
-                                                               links = append([]m4s_link_item{
-                                                                       m4s_link_item{
+                                                               links = append([]*m4s_link_item{
+                                                                       &m4s_link_item{
                                                                                Url:path_front+base+path_behind,
                                                                                Base:base,
                                                                        },
-                                                               },links...)
+                                                               }, links...)
                                                        }
                                                }
                                        }
@@ -559,23 +640,37 @@ func Savestreamf(){
                                })
 
                                for i:=0;i<len(links);i+=1 {
-                                       go func(link m4s_link_item,path string){
+                                       go func(link *m4s_link_item,path string){
                                                download_limit.TO()
                                                r := reqf.Req()
                                                if e := r.Reqf(reqf.Rval{
-                                                       Url:link.Url,
+                                                       Url:(*link).Url,
                                                        Retry:3,
                                                        SleepTime:1,
-                                                       SaveToPath:path+link.Base,
+                                                       SaveToPath:path+(*link).Base,
                                                        Timeout:3,
                                                }); e != nil{
                                                        l.L(`I: `,e,`将重试!`)
+                                                       //避免影响后续猜测
+                                                       (*link).Offset_line = 0
                                                        miss_download.Lock()
                                                        miss_download.List = append(miss_download.List, link)
                                                        miss_download.Unlock()
+                                               } else {
+                                                       (*link).downloaded = true
                                                }
                                        }(links[i],savestream.path)
-                                       last_download = links[i]
+
+                                       //只记录最新
+                                       if links[i].Offset_line > 0 {
+                                               last_download = links[i]
+                                       }
+
+                                       {//store m4s to hls_gen
+                                               hls_gen.Lock()
+                                               hls_gen.m4s_list = append(hls_gen.m4s_list, links[i])
+                                               hls_gen.Unlock()
+                                       }
                                }
 
                                //m3u8_url 将过期
@@ -604,6 +699,7 @@ func Savestreamf(){
                        })
                        p.FileMove(savestream.path+"0.m3u8.dtmp", savestream.path+"0.m3u8")
                        l.L(`I: `,"结束")
+                       close(exit_chan)//hls_stream
                        Ass_f("", time.Now())//ass
                }
                //set ro ``
@@ -1238,6 +1334,9 @@ func init() {
                } else {
                        addr += strconv.Itoa(port)
                }
+
+               var cache sync.Map//使用内存cache避免频繁io
+
                s := web.New(&http.Server{
                        Addr: addr,
                })
@@ -1288,130 +1387,69 @@ func init() {
                                                        if flushSupport {flusher.Flush()}
                                                }
                                        } else if strings.Contains(path,"m3u8")  {
-                                               m3u8_file := base_dir+path
-
-                                               f,err := os.OpenFile(m3u8_file,os.O_RDONLY,0644)
-                                               if err != nil {
-                                                       flog.L(`E: `,err);
-                                                       return
-                                               }
-                                               defer f.Close()
                                        
-                                               w.Header().Set("Cache-Control", "max-age=3")
+                                               gmt, _ := time.LoadLocation("GMT")
+                                               w.Header().Set("Cache-Control", "max-age=1")
+                                               w.Header().Set("Last-Modified", time.Now().Add(time.Second).In(gmt).Format(time.RFC1123))
                                                w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
                                                w.Header().Set("Connection", "Keep-Alive")
-
-                                               var res []byte
-                                               {
-                                                       buf := make([]byte, 200)
-                                                       if _,err := f.Read(buf);err != nil {
-                                                               flog.L(`E: `,err);
-                                                               w.WriteHeader(http.StatusServiceUnavailable)
-                                                               return
-                                                       }
-                                                       fin_offset := bytes.LastIndex(buf, []byte("EXT-X-MEDIA-SEQUENCE:"))+21
-                                                       res = buf[:fin_offset]
+                                               
+                                               res := savestream.hls_stream
+                                               if len(res) == 0 {
+                                                       w.WriteHeader(http.StatusNotFound)
+                                                       return
                                                }
 
-                                               var seed_m4s = func(f *os.File) (A,B []byte,e error) {
-                                                       f.Seek(-1000,2)
-
-                                                       buf := make([]byte, 1000)
-                                                       if _,err := f.Read(buf);err != nil {
-                                                               e = err
-                                                               return
-                                                       }
-
-                                                       //fast seed a suit m4s
-                                                       var sign_offset int
-                                                       {
-                                                               if offset := bytes.Index(buf, []byte("#EXTINF"));offset != -1 {
-                                                                       sign_offset = offset
-                                                               } else {
-                                                                       e = errors.New(`no found first #EXTINF`)
-                                                                       return
-                                                               }
-                                                               for sign_offset < len(buf) {
-                                                                       if offset := bytes.Index(buf[sign_offset:], []byte(".m4s"));offset != -1 {
-                                                                               sign_offset = offset + sign_offset
-                                                                       } else if sign_offset==0 {
-                                                                               e = errors.New(`sign_offset`)
-                                                                               return
-                                                                       } else {
-                                                                               break
-                                                                       }
-                                                                       if sign := buf[sign_offset-1] - 48;sign%3 == 0 {break}
-                                                                       sign_offset += 4
-                                                               }
-                                                       }
-
-                                                       var (
-                                                               start_offset int
-                                                               end_offset int
-                                                               m4s_start_offset int
-                                                               m4s_end_offset int
-                                                       )
-
-                                                       if offset := bytes.Index(buf[sign_offset:], []byte("#EXTINF"));offset != -1 {
-                                                               start_offset = offset + sign_offset
-                                                       } else {
-                                                               e = errors.New(`start_offset`)
-                                                               return
-                                                       }
-
-                                                       if offset := bytes.Index(buf[start_offset:], []byte{0x0a});offset != -1 {
-                                                               m4s_start_offset = offset+start_offset+1
-                                                       } else {
-                                                               e = errors.New(`m4s_start_offset`)
-                                                               return
-                                                       }
-
-                                                       if offset := bytes.Index(buf[m4s_start_offset:], []byte{0x0a});offset != -1 {
-                                                               m4s_end_offset = offset+m4s_start_offset+1
-                                                       } else {
-                                                               e = errors.New(`m4s_end_offset`)
-                                                               return
-                                                       }
-
-                                                       end_offset += start_offset+7
-
-                                                       for i:=0;end_offset < len(buf);i+=1{
-                                                               if offset := bytes.Index(buf[end_offset:], []byte("#EXTINF"));offset != -1 {
-                                                                       end_offset = offset+end_offset
-                                                               } else if end_offset==0 {
-                                                                       e = errors.New(`end_offset`)
-                                                                       return
-                                                               } else {
-                                                                       e = errors.New("not enough data "+strconv.Itoa(i))
-                                                                       return
-                                                               }
-                                                               if i>12 {break}
-                                                               end_offset += 7
-                                                       }
-
-                                                       A = buf[m4s_start_offset:m4s_end_offset]
-                                                       B = buf[start_offset:end_offset]
+                                               if _,err := w.Write(res);err != nil {
+                                                       flog.L(`E: `,err)
                                                        return
                                                }
-                                               
-                                               {
-                                                       A,B,e := seed_m4s(f)
-
-                                                       if e != nil {
-                                                               flog.L(`W: `,e);
-                                                               w.WriteHeader(http.StatusServiceUnavailable)
-                                                               return
-                                                       }
-
-                                                       res = append(res, A...)
+                                       }
+                               } else if filepath.Ext(path) == `.m4s` {
+                                       path = base_dir+path
 
-                                                       res = append(res, B...)
-                                               }
+                                       var buf []byte
 
-                                               if _,err = w.Write(res);err != nil {
+                                       if b,ok := cache.Load(path);!ok{
+                                               f,err := os.OpenFile(path,os.O_RDONLY,0644)
+                                               if err != nil {
                                                        flog.L(`E: `,err);
                                                        return
                                                }
+                                               defer f.Close()
+                                       
+                                               b := make([]byte,1024*1024)
+                                               if n,e := f.Read(b);e != nil {
+                                                       flog.L(`E: `,e)
+                                                       w.WriteHeader(http.StatusServiceUnavailable)
+                                                       return
+                                               } else if n == 1024*1024 {
+                                                       flog.L(`W: `,`buf limit`)
+                                                       w.WriteHeader(http.StatusServiceUnavailable)
+                                                       return
+                                               } else {
+                                                       buf = b[:n]
+                                                       cache.Store(path,buf)
+                                                       go func(){//移除
+                                                               time.Sleep(time.Second*time.Duration(savestream.max_m4s_hls+1))
+                                                               cache.Delete(path)
+                                                       }()
+                                               }
+                                       } else {
+                                               buf,_ = b.([]byte)
+                                       }
+
+                                       if len(buf) == 0 {
+                                               flog.L(`W: `,`buf size 0`)
+                                               w.WriteHeader(http.StatusServiceUnavailable)
+                                               return
+                                       }
+
+                                       w.Header().Set("Content-Type", "application/octet-stream")
+                                       w.Header().Set("Connection", "Keep-Alive")
+                                       if _,err := w.Write(buf);err != nil {
+                                               flog.L(`E: `,err)
+                                               return
                                        }
                                } else {
                                        http.FileServer(http.Dir(base_dir)).ServeHTTP(w,r)