From: qydysky Date: Sun, 6 Jun 2021 09:47:15 +0000 (+0800) Subject: hls流优化 X-Git-Tag: v0.5.10~41^2~125 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=930d27587b8909be774ea8edae75f132dfafce5e;p=bili_danmu%2F.git hls流优化 --- diff --git a/README.md b/README.md index 3c35b42..0725f79 100644 --- a/README.md +++ b/README.md @@ -127,12 +127,12 @@ golang go version go1.15 linux/amd64 I: 2021/04/13 20:07:45 命令行操作 [直播Web服务: http://192.168.31.245:38259] ``` -测试可用项目: +测试可用项目(测试可连续播放10min+): - [xqq/mpegts.js](https://github.com/xqq/mpegts.js) - [bilibili/flv.js](https://github.com/bilibili/flv.js) - [bytedance/xgplayer](https://github.com/bytedance/xgplayer) -- [video-dev/hls.js](https://github.com/video-dev/hls.js) +- [video-dev/hls.js@v1.0.7+](https://hls-js-10780deb-25d8-41d3-b164-bc334c8dd47f.netlify.app/demo/) - [mpv](https://mpv.io/) diff --git a/Reply/F.go b/Reply/F.go index 90cd8ce..8de2c9a 100644 --- a/Reply/F.go +++ b/Reply/F.go @@ -18,11 +18,11 @@ import ( "bytes" "encoding/base64" // "runtime" - + c "github.com/qydysky/bili_danmu/CV" F "github.com/qydysky/bili_danmu/F" send "github.com/qydysky/bili_danmu/Send" - + p "github.com/qydysky/part" funcCtrl "github.com/qydysky/part/funcCtrl" idpool "github.com/qydysky/part/idpool" @@ -125,7 +125,6 @@ func ShowRevf(){ //Ass 弹幕转字幕 type Ass struct { - file string//弹幕ass文件名 startT time.Time//开始记录的基准时间 header string//ass开头 @@ -161,7 +160,7 @@ Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text func Ass_f(file string, st time.Time){ ass.file = file if file == "" {return} - + if rel, err := filepath.Rel(savestream.base_path, ass.file);err == nil { c.Log.Base(`Ass`).L(`I: `,"保存到", rel + ".ass") } else { @@ -218,7 +217,10 @@ func dtos(t time.Duration) string { type Savestream struct { base_path string path string - hls_stream []byte//发送给客户的m3u8字节 + hls_stream struct { + b []byte//发送给客户的m3u8字节 + t time.Time + } flv_front []byte//flv头及首tag flv_stream *msgq.Msgq//发送给客户的flv流关键帧间隔片 @@ -231,7 +233,7 @@ type Savestream struct { type hls_generate struct { hls_file_header []byte//发送给客户的m3u8不变头 - m4s_list []*m4s_link_item//m4s列表 + m4s_list []*m4s_link_item//m4s列表 缓冲 } type m4s_link_item struct {//使用指针以设置是否已下载 @@ -239,6 +241,7 @@ type m4s_link_item struct {//使用指针以设置是否已下载 Base string//m4s文件名 Offset_line int//m3u8中的行下标 status int//该m4s下载状态 s_noload:未下载 s_loading正在下载 s_fin下载完成 s_fail下载失败 + isshow bool } //m4s状态 const ( @@ -511,7 +514,7 @@ func Savestreamf(){ l.L(`W: `, err) } Ass_f(savestream.path, time.Now()) - + // no expect qn exit_chan := s.Init() go func(){ @@ -526,7 +529,7 @@ func Savestreamf(){ // sync_buf []byte close func() } - + //chans var ( reqs = msgq.New(10) @@ -550,7 +553,7 @@ func Savestreamf(){ var ( reqs_used_id []id_close reqs_remove_id []id_close - + reqs_keyframe [][][]byte reqs_func_block funcCtrl.BlockFunc @@ -559,26 +562,26 @@ func Savestreamf(){ reqs.Pull_tag(map[string]func(interface{})(bool){ `req`:func(data interface{})(bool){ req,ok := data.(link_stream) - + if !ok {return false} - + if len(req.keyframe) == 0 { // fmt.Println(`没有keyframe,退出`) req.close() return false } // fmt.Println(`处理req_id`,req.id.Id,`keyframe_len`,len(req.keyframe)) - + if offset,_ := out.Seek(0,1);offset == 0 { // fmt.Println(`添加头`,len(req.front)) //stream savestream.flv_front = req.front out.Write(req.front) } - + reqs_func_block.Block() defer reqs_func_block.UnBlock() - + for i:=0;i 4 { if reqs_keyframe_index == len(reqs_used_id)-1 { l.L(`T: `,"flv强行拼合") - + for i:=0;i%d 下一header %b\n",len(buf),len(buf)-cut_offset,buf[:11]) buf = buf[cut_offset:] } - + skip_buf_size = len(buf)+len(list[0]) reqs.Push_tag(`req`,*item) } @@ -844,20 +846,20 @@ func Savestreamf(){ break } } - + l.L(`I: `,"flv关闭,开始新连接") - + //即将过期,刷新c.Live F.Get(`Liveing`) if !c.Liveing {break} F.Get(`Live`) if len(c.Live)==0 {break} } - + exit_chan.Done() reqs.Push_tag(`close`,nil) out.Close() - + l.L(`I: `,"结束") Ass_f("", time.Now())//ass savestream.flv_front = []byte{}//flv头及首tag置空 @@ -875,7 +877,26 @@ func Savestreamf(){ var ( hls_msg = msgq.New(10) hls_gen hls_generate + DISCONTINUITY int + SEQUENCE int + buffersize int ) + + if v, ok := c.K_v.LoadV(`直播流缓冲`).(float64);ok && v > 0 { + buffersize = int(v) + } + + //hls stream gen 用户m3u8生成 + go func(){ + per_second := time.Tick(time.Second) + for { + select { + case <- savestream.cancel.WaitC():return;//exit + case now :=<- per_second:hls_msg.Push_tag(`clock`, now); + } + } + }() + //hls stream gen 用户m3u8生成 hls_msg.Pull_tag(map[string]func(interface{})(bool){ `header`:func(d interface{})(bool){ @@ -887,75 +908,115 @@ func Savestreamf(){ `body`:func(d interface{})(bool){ links,ok := d.([]*m4s_link_item) if !ok {return false} - //remove hls first m4s if len(links) > 0 && len((*links[0]).Base) > 0 && (*links[0]).Base[0] == 104 {links = links[1:]} - + //diff_t + diff_t := int(time.Now().Unix() - savestream.hls_stream.t.Unix()) + if savestream.hls_stream.t.IsZero() {diff_t = 0} + //diff_t too large + if diff_t > 2 { + hls_gen.m4s_list = append(hls_gen.m4s_list, &m4s_link_item{ + Base:"DICONTINUITY", + status:s_fin, + }) + } hls_gen.m4s_list = append(hls_gen.m4s_list, links...) - // if len(hls_gen.m4s_list) > savestream.max_m4s_hls {//too much - // cut_offset := 0 - // for i:=0;i savestream.min_m4s_hls { - // cut_offset = i-savestream.min_m4s_hls - // } - // } - // hls_gen.m4s_list = hls_gen.m4s_list[cut_offset:] - // } + return false + }, + `clock`:func(now interface{})(bool){ + //buffer + if len(hls_gen.m4s_list) - buffersize < 0 { + return false + } - var res []byte - - //add m4s block + var ( + res []byte + add = int(savestream.hls_stream.t.Unix() % 3) + ) + + //add block { - m4s := 0 + var m4s_num int + + //m4s list m4s_list_b := []byte{} - var SEQUENCE string - for _,v := range hls_gen.m4s_list { + for k,v := range hls_gen.m4s_list { if v.status != s_fin { - // if m4s < savestream.m4s_hls && len(hls_gen.m4s_list) > 3+savestream.m4s_hls { - continue - // } - // break + //#EXT-X-DISCONTINUITY-SEQUENCE + //reset hls lists + if k == m4s_num && m4s_num < 3 { + m4s_list := append(hls_gen.m4s_list[:k], &m4s_link_item{ + Base:"DICONTINUITY", + status:s_fin, + isshow:true, + }) + hls_gen.m4s_list = append(m4s_list, hls_gen.m4s_list[k:]...) + m4s_list_b = append(m4s_list_b, []byte("#EXT-X-DICONTINUITY\n")...) + } + break } - m4s += 1 - if m4s == 1 {SEQUENCE = strings.ReplaceAll(v.Base, ".m4s", "")} - m4s_list_b = append(m4s_list_b, []byte("#EXTINF:1\n")...) + + v.isshow = true + + if v.Base == "DICONTINUITY" { + m4s_list_b = append(m4s_list_b, []byte("#EXT-X-DICONTINUITY\n")...) + continue + } + + if m4s_num >= 8+add {break} + + m4s_num += 1 + // if m4s_num == 1 {SEQUENCE = strings.ReplaceAll(v.Base, ".m4s", "")} + m4s_list_b = append(m4s_list_b, []byte("#EXTINF:1,"+v.Base+"\n")...) m4s_list_b = append(m4s_list_b, v.Base...) m4s_list_b = append(m4s_list_b, []byte("\n")...) } - //no useable m4s - if m4s == 0 {return false} - //clear - if cut := m4s-savestream.m4s_hls;cut > 0 { - hls_gen.m4s_list = hls_gen.m4s_list[cut:] - } - //add header - res = hls_gen.hls_file_header - //add #EXT-X-DISCONTINUITY - res = append(res, []byte("#EXT-X-DISCONTINUITY\n")...) - //add #EXT-X-MEDIA-SEQUENCE - if SEQUENCE != "" { - res = append(res, []byte("#EXT-X-MEDIA-SEQUENCE:"+SEQUENCE+"\n")...) + + //have useable m4s + if m4s_num != 0 { + //add header + res = hls_gen.hls_file_header + //add #EXT-X-DISCONTINUITY-SEQUENCE + res = append(res, []byte("#EXT-X-DISCONTINUITY-SEQUENCE:"+strconv.Itoa(DISCONTINUITY)+"\n")...) + //add #EXT-X-MEDIA-SEQUENCE + res = append(res, []byte("#EXT-X-MEDIA-SEQUENCE:"+strconv.Itoa(SEQUENCE)+"\n")...) + //add #INFO + res = append(res, []byte(fmt.Sprintf("#INFO-BUFFER:%d/%d\n",m4s_num,len(hls_gen.m4s_list)))...) + //add m4s + res = append(res, m4s_list_b...) } - //add m4s list - res = append(res, m4s_list_b...) + + //去除最后一个换行 + if len(res) > 0 {res = res[:len(res)-1]} + + //设置到全局变量,方便流服务器获取 + savestream.hls_stream.b = res } - - - //去除最后一个换行 - if len(res) > 0 {res = res[:len(res)-1]} //设置到全局变量,方便流服务器获取 - savestream.hls_stream = res + savestream.hls_stream.t,_ = now.(time.Time) + + //del + if add != 2 {return false} + for del_num:=3;del_num > 0;hls_gen.m4s_list = hls_gen.m4s_list[1:] { + //#EXT-X-DICONTINUITY + if hls_gen.m4s_list[0].Base == "DICONTINUITY" { + DISCONTINUITY += 1 + continue + } + del_num -= 1 + //#EXTINF + if hls_gen.m4s_list[0].isshow {SEQUENCE += 1} + } + return false }, `close`:func(d interface{})(bool){ - savestream.hls_stream = []byte{}//退出置空 + savestream.hls_stream.b = []byte{}//退出置空 + savestream.hls_stream.t = time.Now() return true }, }) @@ -967,7 +1028,9 @@ func Savestreamf(){ var ( last_download *m4s_link_item miss_download miss_download_T - download_limit funcCtrl.BlockFunc//limit.New(1,500,0)//download m4s per 500ms + download_limit = funcCtrl.BlockFuncN{ + Max:2, + }//limit ) expires := time.Now().Add(time.Minute*2).Unix() @@ -975,13 +1038,13 @@ func Savestreamf(){ path_front string path_behind string ) - + for { //退出,等待下载完成 if !savestream.cancel.Islive() { l.L(`I: `,"退出,等待片段下载") - download_limit.Block() - download_limit.UnBlock() + download_limit.None() + download_limit.UnNone() links := []*m4s_link_item{} //下载出错的 @@ -1002,10 +1065,9 @@ func Savestreamf(){ r := reqf.New() if e := r.Reqf(reqf.Rval{ Url:v.Url, - Retry:0, SaveToPath:savestream.path+v.Base, ConnectTimeout:5000, - ReadTimeout:5000, + ReadTimeout:1000, Proxy:c.Proxy, }); e != nil{ l.L(`I: `,e) @@ -1025,7 +1087,10 @@ func Savestreamf(){ links,file_add,exp,e := hls_get_link(c.Live[0],last_download) if e != nil { - if reqf.IsTimeout(e) || reqf.IsDnsErr(e) { + if e == no_Modified { + time.Sleep(time.Duration(2)*time.Second) + continue + } else if reqf.IsTimeout(e) || reqf.IsDnsErr(e) { l.L(`I: `,e) continue } else { @@ -1049,7 +1114,7 @@ func Savestreamf(){ } if len(links) == 0 { - time.Sleep(time.Second) + time.Sleep(time.Duration(2)*time.Second) continue } @@ -1069,7 +1134,7 @@ func Savestreamf(){ } else { l.L(`I: `,`猜测hls`,previou,`-`,now,`(`,diff,`)`) } - + {//file_add for i:=now-1;i>previou;i-=1 { file_add = append([]byte(strconv.Itoa(i)+".m4s"),file_add...) @@ -1085,7 +1150,7 @@ func Savestreamf(){ path_front = u.Scheme+"://"+path.Dir(u.Host+u.Path)+"/" path_behind = "?"+u.RawQuery } - + //下载出错的 miss_download.RLock() if len(miss_download.List) != 0 { @@ -1127,6 +1192,9 @@ func Savestreamf(){ continue } + //将links传送给hls生成器 + hls_msg.Push_tag(`body`, links) + f := p.File() f.FileWR(p.Filel{ File:savestream.path+"0.m3u8.dtmp", @@ -1134,7 +1202,7 @@ func Savestreamf(){ Loc:-1, Context:[]interface{}{file_add}, }) - + for i:=0;i expires { @@ -1274,7 +1342,7 @@ func Obsf(on bool){ p.Exec().Start(exec.Command(obs.Prog)) p.Sys().Timeoutf(3) } - + // Connect a client. if err := obs.c.Connect(); err != nil { l.L(`E: `,err) @@ -1368,7 +1436,7 @@ func Autobanf(s string) bool { if pt <= 5 {return false}//字数过少去除 res = append(res, pt) } - { + { pt := selfcross(s); // if pt > 0.5 {return false}//自身重复高去除 // res = append(res, pt) @@ -1490,7 +1558,7 @@ func init(){ for k,v := range autoskip.buf {tmp[k] = v} autoskip.buf = tmp } - autoskip.Unlock() + autoskip.Unlock() } }() } @@ -1540,10 +1608,10 @@ func init() { for lessdanmu.limit.PTK() == lessdanmu.max_num { time.Sleep(time.Second*3) } - + for { time.Sleep(time.Second*10) - + lessdanmu.Lock() if ptk := lessdanmu.limit.PTK();ptk == lessdanmu.max_num { if lessdanmu.threshold > 0.03 { @@ -1569,7 +1637,7 @@ func Lessdanmuf(s string) (show bool) { o := cross(s, lessdanmu.buf) if o == 1 {return false}//完全无用 - + Jiezouf(lessdanmu.buf) lessdanmu.buf = append(lessdanmu.buf[1:], s) @@ -1580,7 +1648,7 @@ func Lessdanmuf(s string) (show bool) { if show && lessdanmu.max_num > 0 { lessdanmu.limit.TO() } - return + return } /* @@ -1694,7 +1762,7 @@ func Jiezouf(s []string) bool { now,S := selfcross2(s) jiezou.avg = (8 * jiezou.avg + 2 * now)/10 if jiezou.turn < len(s) {jiezou.turn += 1;return false} - + if _,ok := jiezou.skipS[S]; ok {return false} jiezou.Lock() @@ -1743,7 +1811,7 @@ func Entry_danmu(){ //检查与切换粉丝牌,只在cookie存在时启用 F.Get(`CheckSwitch_FansMedal`) - + if v,_ := c.K_v.LoadV(`进房弹幕_有粉丝牌时才发`).(bool);v && c.Wearing_FansMedal == 0{ flog.L(`T: `,`无粉丝牌`) return @@ -1873,7 +1941,8 @@ func init() { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Connection", "Keep-Alive") w.Header().Set("Content-Transfer-Encoding", "binary") - + start := time.Now() + var path string = r.URL.Path[1:] if !p.Checkfile().IsExist(base_dir+path) { @@ -1881,112 +1950,136 @@ func init() { return } - if filepath.Ext(path) == `.dtmp` { - if strings.Contains(path,".flv") { - // path = base_dir+path - w.Header().Set("Content-Type", "video/x-flv") - w.WriteHeader(http.StatusOK) + if strings.Contains(path, filepath.Base(savestream.path)) { + w.Header().Set("Server", "live") + if filepath.Ext(path) == `.dtmp` { + if strings.Contains(path,".flv") { + // path = base_dir+path + w.Header().Set("Content-Type", "video/x-flv") + w.WriteHeader(http.StatusOK) - flusher, flushSupport := w.(http.Flusher) - if flushSupport {flusher.Flush()} + flusher, flushSupport := w.(http.Flusher) + if flushSupport {flusher.Flush()} - //写入flv头,首tag - if _,err := w.Write(savestream.flv_front);err != nil { - return - } else if flushSupport { - flusher.Flush() - } + //写入flv头,首tag + if _,err := w.Write(savestream.flv_front);err != nil { + return + } else if flushSupport { + flusher.Flush() + } - cancel := make(chan struct{}) - - //flv流关键帧间隔切片 - savestream.flv_stream.Pull_tag(map[string]func(interface{})(bool){ - `stream`:func(data interface{})(bool){ - if b,ok := data.([]byte);ok{ - if _,err := w.Write(b);err != nil { - close(cancel) - return true - } else if flushSupport { - flusher.Flush() + cancel := make(chan struct{}) + + //flv流关键帧间隔切片 + savestream.flv_stream.Pull_tag(map[string]func(interface{})(bool){ + `stream`:func(data interface{})(bool){ + if b,ok := data.([]byte);ok{ + if _,err := w.Write(b);err != nil { + close(cancel) + return true + } else if flushSupport { + flusher.Flush() + } } - } - return false - }, - `close`:func(data interface{})(bool){ - close(cancel) - return true - }, - }) + return false + }, + `close`:func(data interface{})(bool){ + close(cancel) + return true + }, + }) + + <- cancel + } else if strings.Contains(path,".m3u8") { + w.Header().Set("Cache-Control", "max-age=1") + w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") + w.Header().Set("Last-Modified", savestream.hls_stream.t.Format(http.TimeFormat)) + + // //经常m4s下载速度赶不上,使用阻塞避免频繁获取列表带来的卡顿 + // if time.Now().Sub(savestream.hls_stream.t).Seconds() > 1 { + // time.Sleep(time.Duration(3)*time.Second) + // } + + res := savestream.hls_stream.b + + if len(res) == 0 { + w.Header().Set("Retry-After", "1") + w.WriteHeader(http.StatusServiceUnavailable) + return + } - <- cancel - } else if strings.Contains(path,".m3u8") { - // w.Header().Set("Cache-Control", "max-age=1") - w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") + //PROCESS-TIME + w.Header().Set("PROCESS-TIME", time.Since(start).String()) - res := savestream.hls_stream - if len(res) == 0 { - w.WriteHeader(http.StatusNotFound) - return + if _,err := w.Write(res);err != nil { + flog.L(`E: `,err) + return + } } + } else if filepath.Ext(path) == `.m4s` { + w.Header().Set("Server", "live") + w.Header().Set("Cache-Control", "max-age=60") - if _,err := w.Write(res);err != nil { - flog.L(`E: `,err) - return - } - } - } else if filepath.Ext(path) == `.m4s` { - path = base_dir+path + path = base_dir+path - var buf []byte + var buf []byte - if b,ok := m4s_cache.Load(path);!ok{ - f,err := os.OpenFile(path,os.O_RDONLY,0644) - if err != nil { - flog.L(`E: `,err); - return + if b,ok := m4s_cache.Load(path);!ok{ + f,err := os.OpenFile(path,os.O_RDONLY,0644) + if err != nil { + flog.L(`E: `,err); + return + } + defer f.Close() + + b := make([]byte,1<<20) + if n,e := f.Read(b);e != nil { + flog.L(`E: `,e) + w.Header().Set("Retry-After", "1") + w.WriteHeader(http.StatusServiceUnavailable) + return + } else if n == 1<<20 { + flog.L(`W: `,`buf limit`) + w.Header().Set("Retry-After", "1") + w.WriteHeader(http.StatusServiceUnavailable) + return + } else { + buf = b[:n] + m4s_cache.Store(path,buf) + go func(){//移除 + time.Sleep(time.Second*time.Duration(savestream.m4s_hls+1)) + m4s_cache.Delete(path) + }() + } + } else { + buf,_ = b.([]byte) } - defer f.Close() - - b := make([]byte,1<<20) - if n,e := f.Read(b);e != nil { - flog.L(`E: `,e) - w.WriteHeader(http.StatusServiceUnavailable) - return - } else if n == 1<<20 { - flog.L(`W: `,`buf limit`) + + if len(buf) == 0 { + flog.L(`W: `,`buf size 0`) + w.Header().Set("Retry-After", "1") w.WriteHeader(http.StatusServiceUnavailable) return - } else { - buf = b[:n] - m4s_cache.Store(path,buf) - go func(){//移除 - time.Sleep(time.Second*time.Duration(savestream.m4s_hls+1)) - m4s_cache.Delete(path) - }() } - } else { - buf,_ = b.([]byte) - } - if len(buf) == 0 { - flog.L(`W: `,`buf size 0`) - w.WriteHeader(http.StatusServiceUnavailable) - return - } - - // w.Header().Set("Content-Type", "application/octet-stream") - // w.Header().Set("Content-Length", strconv.Itoa(len(buf))) - w.WriteHeader(http.StatusOK) - if _,err := w.Write(buf);err != nil { - flog.L(`E: `,err) - return + //PROCESS-TIME + w.Header().Set("PROCESS-TIME", time.Since(start).String()) + w.WriteHeader(http.StatusOK) + if _,err := w.Write(buf);err != nil { + flog.L(`E: `,err) + return + } } } else { + w.Header().Set("Server", "file") http.FileServer(http.Dir(base_dir)).ServeHTTP(w,r) } } now = func(w http.ResponseWriter,r *http.Request){ //header + w.Header().Set("Access-Control-Allow-Credentials", "true") + w.Header().Set("Access-Control-Allow-Headers", "*") + w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS") w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Cache-Control", "max-age=3") @@ -2011,10 +2104,11 @@ func init() { u, e := url.Parse("../"+path) if e != nil { flog.L(`E: `,e) + w.Header().Set("Retry-After", "1") w.WriteHeader(http.StatusServiceUnavailable) return } - // r.URL = + // r.URL = // root(w, r) w.Header().Set("Location", r.URL.ResolveReference(u).String()) w.WriteHeader(http.StatusTemporaryRedirect) diff --git a/demo/config/config_K_v.json b/demo/config/config_K_v.json index 1858321..042fa1b 100644 --- a/demo/config/config_K_v.json +++ b/demo/config/config_K_v.json @@ -48,6 +48,7 @@ "直播流类型-help": "flv or hls", "直播流类型": "hls", "直播流保存位置": "./live", + "直播流缓冲": 20, "直播保存位置Web服务":0, "ass-help": "只有保存直播流时才考虑生成ass", "生成Ass弹幕": true,