From: qydysky Date: Fri, 23 Apr 2021 17:55:44 +0000 (+0800) Subject: hls使用内存缓存,优化逻辑 X-Git-Tag: v0.5.9~1^2~18 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=326426524ef550cbf9d6888176cf23541cd1da4b;p=bili_danmu%2F.git hls使用内存缓存,优化逻辑 --- diff --git a/Reply/F.go b/Reply/F.go index 8c939d3..28e8677 100644 --- a/Reply/F.go +++ b/Reply/F.go @@ -205,12 +205,32 @@ func dtos(t time.Duration) string { //直播流保存 type Savestream struct { path string + hls_stream []byte//发送给客户的m3u8字节 + + max_m4s_hls int//m3u8最多有几个m4s + min_m4s_hls int + wait *s.Signal cancel *s.Signal skipFunc funcCtrl.SkipFunc } +type hls_generate struct { + hls_file_header []byte//发送给客户的m3u8不变头 + m4s_list []*m4s_link_item//m4s列表 + sync.RWMutex +} + +type m4s_link_item struct {//使用指针以设置是否已下载 + Url string// m4s链接 + Base string//m4s文件名 + Offset_line int//m3u8中的行下标 + downloaded bool//该m4s是否已下载 +} + var savestream = Savestream { + max_m4s_hls:15, + min_m4s_hls:5, } func init(){ @@ -249,15 +269,9 @@ func Savestreamf(){ if savestream.cancel.Islive() {return} - type m4s_link_item struct { - Url string - Base string - Offset_line int - } - var ( no_found_link = errors.New("no_found_link") - hls_get_link = func(m3u8_url string,last_download m4s_link_item) (need_download []m4s_link_item,m3u8_file_addition []byte,expires int,err error) { + hls_get_link = func(m3u8_url string,last_download *m4s_link_item) (need_download []*m4s_link_item,m3u8_file_addition []byte,expires int,err error) { url_struct,e := url.Parse(m3u8_url) if e != nil { err = e @@ -292,7 +306,7 @@ func Savestreamf(){ trid := query.Get("trid") expires,_ = strconv.Atoi(query.Get("expires")) - var m4s_links []m4s_link_item + var m4s_links []*m4s_link_item lines := bytes.Split(r.Respon, []byte("\n")) for i:=0;i savestream.max_m4s_hls {//too much + cut_offset := 0 + for i:=0;i savestream.min_m4s_hls { + cut_offset = i-savestream.min_m4s_hls + } + } + hls_gen.m4s_list = hls_gen.m4s_list[cut_offset:] + } + res = append(res, []byte((*m4s_list[0]).Base[:len((*m4s_list[0]).Base)-4])...) + res = append(res, []byte("\n")...) + + //add m4s block + for i:=0;i 0 {res = res[:len(res)-1]} + + //设置到全局变量,方便流服务器获取 + savestream.hls_stream = res + } + }() + } + type miss_download_T struct{ - List []m4s_link_item + List []*m4s_link_item sync.RWMutex } var ( - last_download m4s_link_item + last_download *m4s_link_item miss_download miss_download_T - download_limit = limit.New(5,1000,0) + download_limit = limit.New(1,200,0)//download m4s per 200ms ) expires := time.Now().Add(time.Minute*2).Unix() @@ -485,6 +554,18 @@ func Savestreamf(){ break } + //first block 获取并设置不变头 + if last_download == nil { + var res []byte + { + fin_offset := bytes.LastIndex(file_add, []byte("EXT-X-MEDIA-SEQUENCE:"))+21 + res = file_add[:fin_offset] + } + hls_gen.Lock() + hls_gen.hls_file_header = res + hls_gen.Unlock() + } + if len(links) == 0 { time.Sleep(time.Second) continue @@ -496,10 +577,10 @@ func Savestreamf(){ } //use guess - if last_download.Base != "" { - previou,_ := strconv.Atoi(last_download.Base[:len(last_download.Base)-4]) + if last_download != nil { + previou,_ := strconv.Atoi((*last_download).Base[:len((*last_download).Base)-4]) now,_ := strconv.Atoi(links[0].Base[:len(links[0].Base)-4]) - if previou != now-1 { + if previou < now-1 { if diff := now - previou;diff > 100 { l.L(`W: `,`diff too large `,diff) break @@ -523,28 +604,28 @@ func Savestreamf(){ path_behind = "?"+u.RawQuery } + //优先下载出错的,以尽快恢复客户流播放 //下载出错的 miss_download.RLock() if len(miss_download.List) != 0 { miss_download.RUnlock() miss_download.Lock() - links = append(miss_download.List,links...) - miss_download.List = []m4s_link_item{} + links = append(miss_download.List, links...) + miss_download.List = []*m4s_link_item{} miss_download.Unlock() } else { miss_download.RUnlock() } - //出错期间没能获取到的 for i:=now-1;i>previou;i-=1 { base := strconv.Itoa(i)+".m4s" - links = append([]m4s_link_item{ - m4s_link_item{ + links = append([]*m4s_link_item{ + &m4s_link_item{ Url:path_front+base+path_behind, Base:base, }, - },links...) + }, links...) } } } @@ -559,23 +640,37 @@ func Savestreamf(){ }) for i:=0;i 0 { + last_download = links[i] + } + + {//store m4s to hls_gen + hls_gen.Lock() + hls_gen.m4s_list = append(hls_gen.m4s_list, links[i]) + hls_gen.Unlock() + } } //m3u8_url 将过期 @@ -604,6 +699,7 @@ func Savestreamf(){ }) p.FileMove(savestream.path+"0.m3u8.dtmp", savestream.path+"0.m3u8") l.L(`I: `,"结束") + close(exit_chan)//hls_stream Ass_f("", time.Now())//ass } //set ro `` @@ -1238,6 +1334,9 @@ func init() { } else { addr += strconv.Itoa(port) } + + var cache sync.Map//使用内存cache避免频繁io + s := web.New(&http.Server{ Addr: addr, }) @@ -1288,130 +1387,69 @@ func init() { if flushSupport {flusher.Flush()} } } else if strings.Contains(path,"m3u8") { - m3u8_file := base_dir+path - - f,err := os.OpenFile(m3u8_file,os.O_RDONLY,0644) - if err != nil { - flog.L(`E: `,err); - return - } - defer f.Close() - w.Header().Set("Cache-Control", "max-age=3") + gmt, _ := time.LoadLocation("GMT") + w.Header().Set("Cache-Control", "max-age=1") + w.Header().Set("Last-Modified", time.Now().Add(time.Second).In(gmt).Format(time.RFC1123)) w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") w.Header().Set("Connection", "Keep-Alive") - - var res []byte - { - buf := make([]byte, 200) - if _,err := f.Read(buf);err != nil { - flog.L(`E: `,err); - w.WriteHeader(http.StatusServiceUnavailable) - return - } - fin_offset := bytes.LastIndex(buf, []byte("EXT-X-MEDIA-SEQUENCE:"))+21 - res = buf[:fin_offset] + + res := savestream.hls_stream + if len(res) == 0 { + w.WriteHeader(http.StatusNotFound) + return } - var seed_m4s = func(f *os.File) (A,B []byte,e error) { - f.Seek(-1000,2) - - buf := make([]byte, 1000) - if _,err := f.Read(buf);err != nil { - e = err - return - } - - //fast seed a suit m4s - var sign_offset int - { - if offset := bytes.Index(buf, []byte("#EXTINF"));offset != -1 { - sign_offset = offset - } else { - e = errors.New(`no found first #EXTINF`) - return - } - for sign_offset < len(buf) { - if offset := bytes.Index(buf[sign_offset:], []byte(".m4s"));offset != -1 { - sign_offset = offset + sign_offset - } else if sign_offset==0 { - e = errors.New(`sign_offset`) - return - } else { - break - } - if sign := buf[sign_offset-1] - 48;sign%3 == 0 {break} - sign_offset += 4 - } - } - - var ( - start_offset int - end_offset int - m4s_start_offset int - m4s_end_offset int - ) - - if offset := bytes.Index(buf[sign_offset:], []byte("#EXTINF"));offset != -1 { - start_offset = offset + sign_offset - } else { - e = errors.New(`start_offset`) - return - } - - if offset := bytes.Index(buf[start_offset:], []byte{0x0a});offset != -1 { - m4s_start_offset = offset+start_offset+1 - } else { - e = errors.New(`m4s_start_offset`) - return - } - - if offset := bytes.Index(buf[m4s_start_offset:], []byte{0x0a});offset != -1 { - m4s_end_offset = offset+m4s_start_offset+1 - } else { - e = errors.New(`m4s_end_offset`) - return - } - - end_offset += start_offset+7 - - for i:=0;end_offset < len(buf);i+=1{ - if offset := bytes.Index(buf[end_offset:], []byte("#EXTINF"));offset != -1 { - end_offset = offset+end_offset - } else if end_offset==0 { - e = errors.New(`end_offset`) - return - } else { - e = errors.New("not enough data "+strconv.Itoa(i)) - return - } - if i>12 {break} - end_offset += 7 - } - - A = buf[m4s_start_offset:m4s_end_offset] - B = buf[start_offset:end_offset] + if _,err := w.Write(res);err != nil { + flog.L(`E: `,err) return } - - { - A,B,e := seed_m4s(f) - - if e != nil { - flog.L(`W: `,e); - w.WriteHeader(http.StatusServiceUnavailable) - return - } - - res = append(res, A...) + } + } else if filepath.Ext(path) == `.m4s` { + path = base_dir+path - res = append(res, B...) - } + var buf []byte - if _,err = w.Write(res);err != nil { + if b,ok := cache.Load(path);!ok{ + f,err := os.OpenFile(path,os.O_RDONLY,0644) + if err != nil { flog.L(`E: `,err); return } + defer f.Close() + + b := make([]byte,1024*1024) + if n,e := f.Read(b);e != nil { + flog.L(`E: `,e) + w.WriteHeader(http.StatusServiceUnavailable) + return + } else if n == 1024*1024 { + flog.L(`W: `,`buf limit`) + w.WriteHeader(http.StatusServiceUnavailable) + return + } else { + buf = b[:n] + cache.Store(path,buf) + go func(){//移除 + time.Sleep(time.Second*time.Duration(savestream.max_m4s_hls+1)) + cache.Delete(path) + }() + } + } else { + buf,_ = b.([]byte) + } + + if len(buf) == 0 { + flog.L(`W: `,`buf size 0`) + w.WriteHeader(http.StatusServiceUnavailable) + return + } + + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("Connection", "Keep-Alive") + if _,err := w.Write(buf);err != nil { + flog.L(`E: `,err) + return } } else { http.FileServer(http.Dir(base_dir)).ServeHTTP(w,r)