From a49442c0fd8d78dbe55424641e90913849052984 Mon Sep 17 00:00:00 2001 From: qydysky Date: Wed, 20 Apr 2022 13:42:15 +0800 Subject: [PATCH] refactor --- F/cmd.go | 2 +- Reply/F.go | 1526 ++--------------------------------------- Reply/Reply.go | 4 +- Reply/steam/stream.go | 49 -- Reply/stream.go | 499 ++++++++++++++ bili_danmu.go | 4 +- 6 files changed, 547 insertions(+), 1537 deletions(-) delete mode 100644 Reply/steam/stream.go create mode 100644 Reply/stream.go diff --git a/F/cmd.go b/F/cmd.go index 3bcf94d..d5949f8 100644 --- a/F/cmd.go +++ b/F/cmd.go @@ -160,7 +160,7 @@ func Cmd() { fmt.Println(`舰长数:`, c.C.GuardNum) fmt.Println(`分区排行:`, c.C.Note, `人气:`, c.C.Renqi) if c.C.Stream_url != "" { - fmt.Println(`直播Web服务:`, c.C.Stream_url+`/now`) + fmt.Println(`直播Web服务:`, c.C.Stream_url) } fmt.Print("\n") diff --git a/Reply/F.go b/Reply/F.go index 5d98924..77ef155 100644 --- a/Reply/F.go +++ b/Reply/F.go @@ -1,21 +1,14 @@ package reply import ( - "bytes" "context" - "encoding/base64" "encoding/json" - "errors" "fmt" "io" - "io/fs" "io/ioutil" "math" "net/http" - "net/url" - "os" "os/exec" - "path" "path/filepath" "strconv" "strings" @@ -31,14 +24,9 @@ import ( send "github.com/qydysky/bili_danmu/Send" p "github.com/qydysky/part" - funcCtrl "github.com/qydysky/part/funcCtrl" - idpool "github.com/qydysky/part/idpool" limit "github.com/qydysky/part/limit" msgq "github.com/qydysky/part/msgq" - reqf "github.com/qydysky/part/reqf" - s "github.com/qydysky/part/signal" psync "github.com/qydysky/part/sync" - util "github.com/qydysky/part/util" web "github.com/qydysky/part/web" obsws "github.com/christopher-dG/go-obs-websocket" @@ -204,7 +192,7 @@ func Ass_f(file string, st time.Time) { return } - if rel, err := filepath.Rel(savestream.base_path, ass.file); err == nil { + if rel, err := filepath.Rel(streamO.config.save_path, ass.file); err == nil { c.C.Log.Base(`Ass`).L(`I: `, "保存到", rel+".ass") } else { c.C.Log.Base(`Ass`).L(`I: `, "保存到", ass.file+".ass") @@ -259,1229 +247,27 @@ func dtos(t time.Duration) string { //hls //https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming - -//直播流保存 -type Savestream struct { - base_path string - path string - hls_stream struct { - b []byte //发送给客户的m3u8字节 - t time.Time - } - front []byte //flv头及首tag or hls的初始化m4s - stream *msgq.Msgq //发送给客户的flv流关键帧间隔片 or hls的fmp4片 - - m4s_hls int //hls list 中的m4s数量 - hlsbuffersize int //hls list缓冲m4s数量 - hls_banlance_host bool //使用均衡hls服务器 - - wait *s.Signal - cancel *s.Signal - skipFunc funcCtrl.SkipFunc -} - -type hls_generate struct { - hls_first_fmp4_name string - hls_file_header []byte //发送给客户的m3u8不变头 - m4s_list []*m4s_link_item //m4s列表 缓冲 -} - -type m4s_link_item struct { //使用指针以设置是否已下载 - Url string // m4s链接 - Base string //m4s文件名 - Offset_line int //m3u8中的行下标 - status int //该m4s下载状态 s_noload:未下载 s_loading正在下载 s_fin下载完成 s_fail下载失败 - isshow bool -} - -//m4s状态 -const ( - s_noload = iota - s_loading - s_fin - s_fail -) - -var savestream = Savestream{ - stream: msgq.New(10), //队列最多保留10个关键帧间隔片 - m4s_hls: 8, -} +var streamO = new(M4SStream) func init() { //使用带tag的消息队列在功能间传递消息 c.C.Danmu_Main_mq.Pull_tag(msgq.FuncMap{ `savestream`: func(data interface{}) bool { - if savestream.cancel.Islive() { - Savestream_wait() + if streamO.Status.Islive() { + streamO.Stop() } else { - go Savestreamf() + streamO.LoadConfig(&c.C.K_v, c.C.Log) + go streamO.Start() } - return false }, }) - //base_path - if path, ok := c.C.K_v.LoadV("直播流保存位置").(string); ok { - if path, err := filepath.Abs(path); err == nil { - savestream.base_path = path + "/" - } - } - if v, ok := c.C.K_v.LoadV(`直播hls流缓冲`).(float64); ok && v > 0 { - savestream.hlsbuffersize = int(v) - } - if v, ok := c.C.K_v.LoadV(`直播hls流均衡`).(bool); ok { - savestream.hls_banlance_host = v - } -} - -//已go func形式调用,将会获取直播流 -func Savestreamf() { - l := c.C.Log.Base(`savestream`) - - //避免多次开播导致的多次触发 - { - if savestream.skipFunc.NeedSkip() { - l.L(`T: `, `已存在实例`) - return - } - defer savestream.skipFunc.UnSet() - } - - want_qn, ok := c.C.K_v.LoadV("直播流清晰度").(float64) - if !ok || want_qn < 0 { - return - } - c.C.Live_want_qn = int(want_qn) - - F.Get(&c.C).Get(`Live`) - - if savestream.cancel.Islive() { - return - } - - //random host load balance - Host_list := []string{} - if savestream.hls_banlance_host { - for _, v := range c.C.Live { - url_struct, e := url.Parse(v) - if e != nil { - continue - } - Host_list = append(Host_list, url_struct.Hostname()) - } - } - - var ( - no_found_link = errors.New("no_found_link") - no_Modified = errors.New("no_Modified") - last_hls_Modified time.Time - hls_get_link = func(m3u8_urls []string, last_download *m4s_link_item) (need_download []*m4s_link_item, m3u8_file_addition []byte, expires int, err error) { - var ( - r *reqf.Req - m3u8_url *url.URL - ) - for index := 0; index < len(m3u8_urls); index += 1 { - r = reqf.New() - if tmp, e := url.Parse(m3u8_urls[index]); e != nil { - err = e - return - } else { - m3u8_url = tmp - } - - rval := reqf.Rval{ - Url: m3u8_url.String(), - ConnectTimeout: 2000, - ReadTimeout: 1000, - Timeout: 2000, - Proxy: c.C.Proxy, - Header: map[string]string{ - `Host`: m3u8_url.Host, - `User-Agent`: `Mozilla/5.0 (X11; Linux x86_64; rv:83.0) Gecko/20100101 Firefox/83.0`, - `Accept`: `*/*`, - `Accept-Language`: `zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2`, - `Accept-Encoding`: `gzip, deflate, br`, - `Origin`: `https://live.bilibili.com`, - `Connection`: `keep-alive`, - `Pragma`: `no-cache`, - `Cache-Control`: `no-cache`, - `Referer`: "https://live.bilibili.com/", - }, - } - if !last_hls_Modified.IsZero() { - rval.Header[`If-Modified-Since`] = last_hls_Modified.Add(time.Second).Format("Mon, 02 Jan 2006 15:04:05 CST") - } - if e := r.Reqf(rval); e != nil { - if index+1 < len(m3u8_urls) { - continue - } - err = e - return - } - break - } - - if usedt := r.UsedTime.Seconds(); usedt > 3000 { - l.L(`I: `, `hls列表下载慢`, usedt, `ms`) - } - if r.Response.StatusCode == http.StatusNotModified { - l.L(`T: `, `hls未更改`) - err = no_Modified - return - } - //last_hls_Modified - if t, ok := r.Response.Header[`Last-Modified`]; ok && len(t) > 0 { - if lm, e := time.Parse("Mon, 02 Jan 2006 15:04:05 CST", t[0]); e == nil { - last_hls_Modified = lm - } else { - l.L(`T: `, e) - } - } - - query := m3u8_url.Query() - trid := query.Get("trid") - expires, _ = strconv.Atoi(query.Get("expires")) - buf := r.Respon - - //base-64 - if len(buf) != 0 && !bytes.Contains(buf, []byte("#")) { - buf, err = base64.StdEncoding.DecodeString(string(buf)) - if err != nil { - l.L(`W: `, err, string(buf)) - return - } - } - - var m4s_links []*m4s_link_item - lines := bytes.Split(buf, []byte("\n")) - for i := 0; i < len(lines); i += 1 { - line := lines[i] - m4s_link := "" - - if bytes.Contains(line, []byte("EXT-X-MAP")) { - o := bytes.Index(line, []byte(`EXT-X-MAP:URI="`)) + 15 - e := bytes.Index(line[o:], []byte(`"`)) + o - m4s_link = string(line[o:e]) - } else if bytes.Contains(lines[i], []byte("#EXT-X")) { //忽略扩展标签 - continue - } else if bytes.Contains(line, []byte(".m4s")) { - m4s_link = string(line) - } - - if m4s_link == "" { - continue - } - - u, e := url.Parse("./" + m4s_link + "?trid=" + trid) - if e != nil { - err = e - return - } - m4s_links = append(m4s_links, &m4s_link_item{ - Url: m3u8_url.ResolveReference(u).String(), - Base: m4s_link, - Offset_line: i, - }) - } - if len(m4s_links) == 0 { - err = no_found_link - return - } - - if last_download == nil { - m3u8_file_addition = buf - need_download = m4s_links - return - } - - var found bool - for i := 0; i < len(m4s_links); i += 1 { - if found { - offset := m4s_links[i].Offset_line - 1 - for i := offset; i < len(lines); i += 1 { - if bytes.Contains(lines[i], []byte("#EXT-X")) { //忽略扩展标签 - continue - } - m3u8_file_addition = append(m3u8_file_addition, lines[i]...) - m3u8_file_addition = append(m3u8_file_addition, []byte("\n")...) - } - m3u8_file_addition = m3u8_file_addition[:len(m3u8_file_addition)-1] - - need_download = append(need_download, m4s_links[i:]...) - break - } - found = (*last_download).Base == m4s_links[i].Base - } - if !found { - offset := m4s_links[1].Offset_line - 1 - for i := offset; i < len(lines); i += 1 { - if bytes.Contains(lines[i], []byte("#EXT-X")) { //忽略扩展标签 - continue - } - m3u8_file_addition = append(m3u8_file_addition, lines[i]...) - m3u8_file_addition = append(m3u8_file_addition, []byte("\n")...) - } - m3u8_file_addition = m3u8_file_addition[:len(m3u8_file_addition)-1] - - need_download = append(need_download, m4s_links[1:]...) - } - - return - } - flv_get_link = func(link string) (need_download string, expires int, err error) { - need_download = link - - url_struct, e := url.Parse(link) - if e != nil { - err = e - return - } - query := url_struct.Query() - expires, _ = strconv.Atoi(query.Get("expires")) - - return - } - ) - - for { - F.Get(&c.C).Get(`Liveing`) - if !c.C.Liveing { - break - } - - F.Get(&c.C).Get(`Live`) - if len(c.C.Live) == 0 { - break - } - - savestream.path = savestream.base_path - - savestream.path += strconv.Itoa(c.C.Roomid) + "_" + time.Now().Format("2006_01_02_15-04-05-000") - - savestream.wait = s.Init() - savestream.cancel = s.Init() - - CookieM := make(map[string]string) - c.C.Cookie.Range(func(k, v interface{}) bool { - CookieM[k.(string)] = v.(string) - return true - }) - - { //重试 - r := reqf.New() - go func() { - savestream.cancel.Wait() - r.Close() - }() - l.L(`I: `, "尝试连接live") - if e := r.Reqf(reqf.Rval{ - Url: c.C.Live[0], - Retry: 10, - SleepTime: 1000, - Proxy: c.C.Proxy, - Header: map[string]string{ - `Cookie`: reqf.Map_2_Cookies_String(CookieM), - }, - Timeout: 5 * 1000, - JustResponseCode: true, - }); e != nil { - l.L(`W: `, e) - } - - if r.Response == nil { - l.L(`W: `, `live响应错误`) - savestream.wait.Done() - savestream.cancel.Done() - time.Sleep(time.Second * 5) - continue - } else if r.Response.StatusCode != 200 { - l.L(`W: `, `live响应错误`, r.Response.Status, string(r.Respon)) - savestream.wait.Done() - savestream.cancel.Done() - time.Sleep(time.Second * 5) - continue - } - } - - if strings.Contains(c.C.Live[0], "flv") { - if rel, err := filepath.Rel(savestream.base_path, savestream.path); err == nil { - l.L(`I: `, "保存到", rel+".flv") - } else { - l.L(`I: `, "保存到", savestream.path+".flv") - l.L(`W: `, err) - } - Ass_f(savestream.path, time.Now()) - - // no expect qn - exit_chan := s.Init() - go func() { - savestream.cancel.Wait() - exit_chan.Done() - }() - - type link_stream struct { - id *idpool.Id - front []byte - keyframe [][]byte - // sync_buf []byte - close func() - } - - //chans - var ( - reqs = msgq.New(10) - id_pool = idpool.New() - ) - - //文件 - out, err := os.Create(savestream.path + ".flv" + ".dtmp") - if err != nil { - l.L(`E: `, err) - return - } - - //数据整合 - { - type id_close struct { - id uintptr - close func() - } - - var ( - reqs_used_id []id_close - reqs_remove_id []id_close - - reqs_keyframe [][][]byte - - reqs_func_block funcCtrl.BlockFunc - last_keyframe_timestamp int - ) - reqs.Pull_tag(map[string]func(interface{}) bool{ - `req`: func(data interface{}) bool { - req, ok := data.(link_stream) - - if !ok { - return false - } - - if len(req.keyframe) == 0 { - // fmt.Println(`没有keyframe,退出`) - req.close() - return false - } - // fmt.Println(`处理req_id`,req.id.Id,`keyframe_len`,len(req.keyframe)) - - if offset, _ := out.Seek(0, 1); offset == 0 { - // fmt.Println(`添加头`,len(req.front)) - //stream - savestream.front = req.front - out.Write(req.front) - } - - reqs_func_block.Block() - defer reqs_func_block.UnBlock() - - for i := 0; i < len(reqs_remove_id); i += 1 { - if reqs_remove_id[i].id == req.id.Id { - req.close() - return false - } - } - - var reqs_keyframe_index int = len(reqs_used_id) - { - var isnew bool = true - for i := 0; i < len(reqs_used_id); i += 1 { - if reqs_used_id[i].id == req.id.Id { - reqs_keyframe_index = i - isnew = false - break - } - } - if isnew { - // fmt.Println(`新req`,req.id.Id,reqs_keyframe_index) - reqs_used_id = append(reqs_used_id, id_close{ - id: req.id.Id, - close: req.close, - }) - } - } - - if len(reqs_used_id) == 1 { - // l.L(`T: `,"单req写入",len(req.keyframe)) - last_keyframe_timestamp, _ = Keyframe_timebase(req.keyframe, last_keyframe_timestamp) - - for i := 0; i < len(req.keyframe); i += 1 { - //stream - savestream.stream.Push_tag("stream", req.keyframe[i]) - out.Write(req.keyframe[i]) - } - return false - } - - for reqs_keyframe_index >= len(reqs_keyframe) { - reqs_keyframe = append(reqs_keyframe, [][]byte{}) - } - reqs_keyframe[reqs_keyframe_index] = append(reqs_keyframe[reqs_keyframe_index], req.keyframe...) - - // fmt.Println(`merge,添加reqs_keyframe数据`,reqs_keyframe_index,len(reqs_keyframe[reqs_keyframe_index])) - - for _, v := range reqs_keyframe { - if len(v) == 0 { - // fmt.Println(`merge,req无数据`,k) - return false - } - } - - if success_last_keyframe_timestamp, b, merged := Merge_stream(reqs_keyframe, last_keyframe_timestamp); merged == 0 { - // fmt.Println(`merge失败,reqs_keyframe[1]`,reqs_keyframe[1][0][:11],reqs_keyframe[1][len(reqs_keyframe[1])-1][:11]) - size := 0 - for i := 1; i < len(reqs_keyframe); i += 1 { - size += len(reqs_keyframe[i]) - } - - if reqs_keyframe_index == 0 { - // l.L(`T: `,"flv拼合失败,reqs_keyframe[0]写入") - // fmt.Println(`merge失败,reqs_keyframe[0]写入`,len(req.keyframe)) - - last_keyframe_timestamp, _ = Keyframe_timebase(req.keyframe, last_keyframe_timestamp) - - for i := 0; i < len(req.keyframe); i += 1 { - //stream - savestream.stream.Push_tag("stream", req.keyframe[i]) - out.Write(req.keyframe[i]) - } - // reqs_keyframe[0] = [][]byte{reqs_keyframe[0][len(reqs_keyframe[0])-1]} - } else if size > 4 { - if reqs_keyframe_index == len(reqs_used_id)-1 { - l.L(`T: `, "flv强行拼合") - - for i := 0; i < reqs_keyframe_index; i += 1 { - reqs_remove_id = append(reqs_remove_id, reqs_used_id[i]) - reqs_used_id[i].close() - } - reqs_used_id = reqs_used_id[reqs_keyframe_index:] - - last_keyframe_timestamp, _ = Keyframe_timebase(req.keyframe, last_keyframe_timestamp) - - for i := 0; i < len(req.keyframe); i += 1 { - //stream - savestream.stream.Push_tag("stream", req.keyframe[i]) - out.Write(req.keyframe[i]) - } - - reqs_keyframe = [][][]byte{} - } else { - req.close() - return false - } - } - } else { - // fmt.Println(`merge成功`,len(b)) - l.L(`T: `, "flv拼合成功") - - last_keyframe_timestamp = success_last_keyframe_timestamp - - for i := 0; i < merged; i += 1 { - reqs_remove_id = append(reqs_remove_id, reqs_used_id[i]) - reqs_used_id[i].close() - } - reqs_keyframe = [][][]byte{} - - reqs_used_id = reqs_used_id[merged:] - - //stream - savestream.stream.Push_tag("stream", b) - out.Write(b) - } - - return false - }, - // 11区 1 - `close`: func(data interface{}) bool { - // defer l.L(`I: `,"处理退出") - for i := 0; i < len(reqs_used_id); i += 1 { - reqs_used_id[i].close() - } - reqs_used_id = []id_close{} - // reqs_remove_id = []id_close{} - reqs_keyframe = [][][]byte{} - last_keyframe_timestamp = 0 - return true - }, - }) - } - - //连接保持 - for { - //随机选取服务器,获取超时时间 - - live_index := 0 - if len(c.C.Live) > 0 { - live_index = int(p.Rand().MixRandom(0, int64(len(c.C.Live)-1))) - } - link, exp, e := flv_get_link(c.C.Live[live_index]) - if e != nil { - l.L(`W: `, `流链接获取错误`, e) - break - } - - // 新建chan - var ( - bc = make(chan []byte, 1<<17) - req = reqf.New() - req_exit = s.Init() - ) - - l.L(`I: `, `新建请求`, req.Id()) - - //新建请求 - go func(r *reqf.Req, rval reqf.Rval) { - go func() { - select { - case <-exit_chan.WaitC(): - case <-req_exit.WaitC(): - } - r.Close() - }() - defer req_exit.Done() - e := r.Reqf(rval) - if r.Response == nil { - l.L(`W: `, `请求退出`, r.Id(), e) - } else if r.Response.StatusCode != 200 { - l.L(`W: `, `请求退出`, r.Id(), e, r.Response.Status, string(r.Respon)) - } else { - l.L(`W: `, `请求退出`, r.Id()) - } - }(req, reqf.Rval{ - Url: link, - Proxy: c.C.Proxy, - Header: map[string]string{ - `Cookie`: reqf.Map_2_Cookies_String(CookieM), - }, - //SaveToPath:savestream.path + ".flv", - SaveToChan: bc, - Timeout: int(int64(exp)-p.Sys().GetSTime()) * 1000, - ReadTimeout: 5 * 1000, - ConnectTimeout: 10 * 1000, - }) - - //返回通道 - var item = link_stream{ - close: req.Close, - id: id_pool.Get(), - } - l.L(`I: `, `新建连接`, item.id.Id) - - //解析 - go func(bc chan []byte, item *link_stream, exit_chan *s.Signal) { - var ( - buf []byte - skip_buf_size int - ) - defer req_exit.Done() - defer l.L(`W: `, `连接退出`, item.id.Id) - for exit_chan.Islive() && req_exit.Islive() { - select { - case <-exit_chan.WaitC(): - return - case <-req_exit.WaitC(): - return - case b := <-bc: - if len(b) == 0 { - // fmt.Println(`req退出`,item.id.Id) - id_pool.Put(item.id) - // reqs.Push_tag(`closereq`,*item) - return - } - - buf = append(buf, b...) - - if len(buf) < skip_buf_size { - break - } - - front, list, _ := Seach_stream_tag(buf) - - if len(front) != 0 && len(item.front) == 0 { - // fmt.Println(item.id.Id,`获取到header`,len(front)) - item.front = make([]byte, len(front)) - copy(item.front, front) - } - - if len(list) == 0 || len(item.front) == 0 { - // fmt.Println(`再次查询bufsize`,skip_buf_size) - skip_buf_size = 2 * len(buf) - break - } - - item.keyframe = list - - { - last_keyframe := list[len(list)-1] - cut_offset := bytes.LastIndex(buf, last_keyframe) + len(last_keyframe) - // fmt.Printf("buf截断 当前%d=>%d 下一header %b\n",len(buf),len(buf)-cut_offset,buf[:11]) - buf = buf[cut_offset:] - } - - skip_buf_size = len(buf) + len(list[0]) - reqs.Push_tag(`req`, *item) - } - } - }(bc, &item, exit_chan) - - expires := int64(exp) - p.Sys().GetSTime() - 120 - // no expect qn - if c.C.Live_want_qn < c.C.Live_qn { - expires = time.Now().Add(time.Minute * 2).Unix() - } - - //等待过期/退出 - { - var exit_sign bool - select { - case <-req_exit.Chan: //本次连接错误,退出重试 - case <-exit_chan.Chan: //要求退出 - exit_sign = true // - case <-time.After(time.Second * time.Duration(int(expires))): - } - if exit_sign { - //退出 - // l.L(`T: `,"chan退出") - break - } - } - - l.L(`I: `, "flv关闭,开始新连接") - - //即将过期,刷新c.C.Live - F.Get(&c.C).Get(`Liveing`) - if !c.C.Liveing { - break - } - F.Get(&c.C).Get(`Live`) - if len(c.C.Live) == 0 { - break - } - } - - exit_chan.Done() - reqs.Push_tag(`close`, nil) - out.Close() - - p.FileMove(savestream.path+".flv.dtmp", savestream.path+".flv") - } else { - savestream.path += "/" - if rel, err := filepath.Rel(savestream.base_path, savestream.path); err == nil { - l.L(`I: `, "保存到", rel+`/0.m3u8`) - } else { - l.L(`I: `, "保存到", savestream.path) - l.L(`W: `, err) - } - Ass_f(savestream.path+"0", time.Now()) - - var ( - hls_msg = msgq.New(20) - hls_gen hls_generate - DISCONTINUITY int - SEQUENCE int - ) - - //hls stream gen 用户m3u8生成 - go func() { - per_second := time.Tick(time.Second) - for { - select { - case <-savestream.cancel.WaitC(): - return //exit - case now := <-per_second: - hls_msg.Push_tag(`clock`, now) - } - } - }() - - //hls stream gen 用户m3u8生成 - hls_msg.Pull_tag(map[string]func(interface{}) bool{ - `header`: func(d interface{}) bool { - if b, ok := d.([]byte); ok { - hls_gen.hls_file_header = b - } - return false - }, - `body`: func(d interface{}) bool { - links, ok := d.([]*m4s_link_item) - if !ok { - return false - } - //remove hls first m4s - if len(links) > 0 && - len((*links[0]).Base) > 0 && - (*links[0]).Base[0] == 104 { - links = links[1:] - } - - hls_gen.m4s_list = append(hls_gen.m4s_list, links...) - - return false - }, - `clock`: func(now interface{}) bool { - //buffer - if len(hls_gen.m4s_list)-savestream.hlsbuffersize < 0 { - return false - } - - //add block - var ( - m4s_num int - has_DICONTINUITY bool - res []byte - threshold = savestream.m4s_hls - ) - if threshold < 3 { - threshold = 3 - } - { - //m4s list - m4s_list_b := []byte{} - for k, v := range hls_gen.m4s_list { - if v.status != s_fin { - //#EXT-X-DISCONTINUITY-SEQUENCE - //reset hls lists - if !has_DICONTINUITY && m4s_num < threshold { - m4s_list := append(util.SliceCopy(hls_gen.m4s_list[:k]).([]*m4s_link_item), &m4s_link_item{ - Base: "DICONTINUITY", - status: s_fin, - isshow: true, - }) - hls_gen.m4s_list = append(m4s_list, hls_gen.m4s_list[k:]...) - m4s_list_b = append(m4s_list_b, []byte("#EXT-X-DICONTINUITY\n")...) - } - break - } - - v.isshow = true - - if v.Base == "DICONTINUITY" { - has_DICONTINUITY = true - m4s_list_b = append(m4s_list_b, []byte("#EXT-X-DICONTINUITY\n")...) - continue - } - - if m4s_num >= savestream.m4s_hls { - break - } - - m4s_num += 1 - // if m4s_num == 1 {SEQUENCE = strings.ReplaceAll(v.Base, ".m4s", "")} - m4s_list_b = append(m4s_list_b, []byte("#EXTINF:1,"+v.Base+"\n")...) - m4s_list_b = append(m4s_list_b, v.Base...) - m4s_list_b = append(m4s_list_b, []byte("\n")...) - } - - //have useable m4s - if m4s_num != 0 { - //add header - res = hls_gen.hls_file_header - //add #EXT-X-DISCONTINUITY-SEQUENCE - res = append(res, []byte("#EXT-X-DISCONTINUITY-SEQUENCE:"+strconv.Itoa(DISCONTINUITY)+"\n")...) - //add #EXT-X-MEDIA-SEQUENCE - res = append(res, []byte("#EXT-X-MEDIA-SEQUENCE:"+strconv.Itoa(SEQUENCE)+"\n")...) - //add #INFO - res = append(res, []byte(fmt.Sprintf("#INFO-BUFFER:%d/%d\n", m4s_num, len(hls_gen.m4s_list)))...) - //add m4s - res = append(res, m4s_list_b...) - //去除最后一个换行 - res = res[:len(res)-1] - } - } - - //try to jump the hole - var skip_del bool - if m4s_num < threshold { - var ( - index int //the first useable fmp4 index of section - DICONTINUITY_num int - catch bool //catch useable fmp4? - ) - for i := 0; i < len(hls_gen.m4s_list); i += 1 { - if hls_gen.m4s_list[i].status != s_fin { - catch = false - continue - } - if !catch { - index = i - } else { - if hls_gen.m4s_list[i].Base == "DICONTINUITY" { - DICONTINUITY_num += 1 - } else if i-index-DICONTINUITY_num > threshold { - //find a nice index, remove all bad fmp4s - skip := 0 - skip_del = true - for ; index >= 0; index -= 1 { - if hls_gen.m4s_list[index].status == s_fin { - continue - } - skip += 1 - hls_gen.m4s_list = append(hls_gen.m4s_list[:index], hls_gen.m4s_list[index+1:]...) - } - l.L(`I: `, "卡顿,跳过", skip, "个tag") - break - } - } - catch = true - } - } - - //设置到全局变量,方便流服务器获取 - if len(res) != 0 { - savestream.hls_stream.b = res - } - savestream.hls_stream.t, _ = now.(time.Time) - - //del - for del_num := 1; !skip_del && del_num > 0; hls_gen.m4s_list = hls_gen.m4s_list[1:] { - del_num -= 1 - if !hls_gen.m4s_list[0].isshow { - continue - } - //#EXT-X-DICONTINUITY - if hls_gen.m4s_list[0].Base == "DICONTINUITY" { - DISCONTINUITY += 1 - continue - } - //#EXTINF - if hls_gen.m4s_list[0].isshow { - SEQUENCE += 1 - - { //stream hls2mp4 - var buf []byte - //stream front - if len(savestream.front) == 0 { - buf, _, e := get_m4s_cache(savestream.path + hls_gen.hls_first_fmp4_name) - if e == nil { - savestream.front = buf - } else { - l.L(`W: `, `推送mp4流错误,无法读取文件`, e) - } - } - //stream body - buf, _, e := get_m4s_cache(savestream.path + hls_gen.m4s_list[0].Base) - if e == nil { - savestream.stream.Push_tag(`stream`, buf) - } else { - l.L(`W: `, `推送mp4流错误,无法读取文件`, e) - } - } - } - } - - return false - }, - `close`: func(d interface{}) bool { - savestream.hls_stream.b = []byte{} //退出置空 - savestream.hls_stream.t = time.Now() - return true - }, - }) - - var ( - last_download *m4s_link_item - miss_download = make(chan *m4s_link_item, 100) - download_limit = funcCtrl.BlockFuncN{ - Max: 2, - } //limit - ) - expires := time.Now().Add(time.Minute * 2).Unix() - - var ( - path_front string - path_behind string - ) - - for { - //退出,等待下载完成 - if !savestream.cancel.Islive() { - l.L(`I: `, "退出,等待片段下载") - download_limit.None() - download_limit.UnNone() - - links := []*m4s_link_item{} - //下载出错的 - for len(miss_download) != 0 { - links = append(links, <-miss_download) - } - - for k, v := range links { - l.L(`I: `, "正在下载最后片段:", k+1, "/", len(links)) - v.status = s_loading - r := reqf.New() - if e := r.Reqf(reqf.Rval{ - Url: v.Url, - SaveToPath: savestream.path + v.Base, - ConnectTimeout: 5000, - ReadTimeout: 1000, - Retry: 1, - Proxy: c.C.Proxy, - }); e != nil && !errors.Is(e, io.EOF) { - l.L(`I: `, e) - v.status = s_fail - } else { - if usedt := r.UsedTime.Seconds(); usedt > 700 { - l.L(`I: `, `hls切片下载慢`, usedt, `ms`) - } - v.status = s_fin - } - } - //退出 - break - } - - links, file_add, exp, e := hls_get_link(c.C.Live, last_download) - if e != nil { - if e == no_Modified { - time.Sleep(time.Duration(2) * time.Second) - continue - } else if reqf.IsTimeout(e) || strings.Contains(e.Error(), "x509") { - l.L(`I: `, e) - continue - } else { - l.L(`W: `, e) - break - } - } - - //first block 获取并设置不变头 - if last_download == nil { - var res []byte - { - for row, i := bytes.SplitAfter(file_add, []byte("\n")), 0; i < len(row); i += 1 { - if bytes.Contains(row[i], []byte("#EXT")) { - //set stream front - if op := bytes.Index(row[i], []byte(`#EXT-X-MAP:URI="`)); op != -1 { - hls_gen.hls_first_fmp4_name = string(row[i][op+16 : len(row[i])-2]) - } - if bytes.Contains(row[i], []byte("#EXT-X-MEDIA-SEQUENCE")) || bytes.Contains(row[i], []byte("#EXTINF")) { - continue - } - res = append(res, row[i]...) - } - } - } - hls_msg.Push_tag(`header`, res) - } - - if len(links) == 0 { - time.Sleep(time.Duration(2) * time.Second) - continue - } - - //qn in expect , set expires - if c.C.Live_want_qn >= c.C.Live_qn { - expires = int64(exp) - } - - //use guess - if last_download != nil { - previou, _ := strconv.Atoi((*last_download).Base[:len((*last_download).Base)-4]) - now, _ := strconv.Atoi(links[0].Base[:len(links[0].Base)-4]) - if previou < now-1 { - if diff := now - previou; diff > 100 { - l.L(`W: `, `diff too large `, diff) - break - } else { - l.L(`I: `, `猜测hls`, previou, `-`, now, `(`, diff, `)`) - } - - { //file_add - for i := now - 1; i > previou; i -= 1 { - file_add = append([]byte(strconv.Itoa(i)+".m4s"), file_add...) - } - } - { //links - if path_front == "" || path_behind == "" { - u, e := url.Parse(links[0].Url) - if e != nil { - l.L(`E: `, `fault to enable guess`, e) - return - } - path_front = u.Scheme + "://" + path.Dir(u.Host+u.Path) + "/" - path_behind = "?" + u.RawQuery - } - - //出错期间没能获取到的 - for i := now - 1; i > previou; i -= 1 { - base := strconv.Itoa(i) + ".m4s" - links = append([]*m4s_link_item{ - { - Url: path_front + base + path_behind, - Base: base, - }, - }, links...) - } - } - } - } - - if len(links) > 10 { - l.L(`T: `, `等待下载切片:`, len(links)) - } else if len(links) > 100 { - l.L(`W: `, `重试,等待下载切片:`, len(links)) - - if F.Get(&c.C).Get(`Liveing`); !c.C.Liveing { - break - } - if F.Get(&c.C).Get(`Live`); len(c.C.Live) == 0 { - break - } - - // set expect - expires = time.Now().Add(time.Minute * 2).Unix() - continue - } - - //将links传送给hls生成器 - hls_msg.Push_tag(`body`, links) - - f := p.File() - f.FileWR(p.Filel{ - File: savestream.path + "0.m3u8.dtmp", - Loc: -1, - Context: []interface{}{file_add}, - }) - - for i := 0; i < len(links); i += 1 { - //fmp4切片下载 - go func(link *m4s_link_item, path string) { - //use url struct - var link_url *url.URL - { - if tmp, e := url.Parse(link.Url); e != nil { - l.L(`E: `, e) - return - } else { - link_url = tmp - } - } - - download_limit.Block() - defer download_limit.UnBlock() - - link.status = s_loading - for index := 0; index < len(Host_list); index += 1 { - link_url.Host = Host_list[index] - - r := reqf.New() - if e := r.Reqf(reqf.Rval{ - Url: link_url.String(), - SaveToPath: path + link.Base, - ConnectTimeout: 2000, - ReadTimeout: 1000, - Timeout: 2000, - Proxy: c.C.Proxy, - }); e != nil { - //try other host - if index+1 < len(Host_list) { - continue - } - - if reqf.IsTimeout(e) || strings.Contains(e.Error(), "x509") { - l.L(`T: `, link.Base, `将重试!`) - //避免影响后续猜测 - link.Offset_line = 0 - go func(link *m4s_link_item) { miss_download <- link }(link) - } else { - l.L(`W: `, e) - link.status = s_fail - } - } else { - if usedt := r.UsedTime.Seconds(); usedt > 700 { - l.L(`I: `, `hls切片下载慢`, usedt, `ms`) - } - link.status = s_fin - //存入cache - if _, ok := m4s_cache.Load(path + link.Base); !ok { - m4s_cache.Store(path+link.Base, r.Respon) - go func() { //移除 - time.Sleep(time.Second * time.Duration(savestream.hlsbuffersize+savestream.m4s_hls+2)) - m4s_cache.Delete(path + link.Base) - }() - } - break - } - } - }(links[i], savestream.path) - - //只记录最新 - if links[i].Offset_line > 0 { - last_download = links[i] - } - } - - //m3u8_url 将过期 - if p.Sys().GetSTime()+60 > expires { - if F.Get(&c.C).Get(`Liveing`); !c.C.Liveing { - break - } - if F.Get(&c.C).Get(`Live`); len(c.C.Live) == 0 { - break - } - // set expect - expires = time.Now().Add(time.Minute * 2).Unix() - } else { - time.Sleep(time.Second) - } - } - - if p.Checkfile().IsExist(savestream.path + "0.m3u8.dtmp") { - f := p.File() - f.FileWR(p.Filel{ - File: savestream.path + "0.m3u8.dtmp", - Loc: -1, - Context: []interface{}{"#EXT-X-ENDLIST"}, - }) - p.FileMove(savestream.path+"0.m3u8.dtmp", savestream.path+"0.m3u8") - } - - hls_msg.Push_tag(`close`, nil) - } - //set ro `` - savestream.path = `` - savestream.front = []byte{} //flv头及首tag置空 - savestream.stream.Push_tag("close", nil) - Ass_f("", time.Now()) //ass - l.L(`I: `, "结束") - - if !savestream.cancel.Islive() { - // l.L(`I: `,"退出") - break - } //cancel - /* - Savestream需要外部组件 - ffmpeg http://ffmpeg.org/download.html - */ - // if p.Checkfile().IsExist(savestream.path+".flv"){ - // l.L(`I: `,"转码中") - // p.Exec().Run(false, "ffmpeg", "-i", savestream.path+".flv", "-c", "copy", savestream.path+".mkv") - // if p.Checkfile().IsExist(savestream.path+".mkv"){os.Remove(savestream.path+".flv")} - // } - - // l.L(`I: `,"转码结束") - savestream.wait.Done() - savestream.cancel.Done() - } - savestream.wait.Done() - savestream.cancel.Done() } -//已func形式调用,将会停止保存直播流 -func Savestream_wait() { - if !savestream.cancel.Islive() { - return +func StreamOStop() { + if streamO.Status.Islive() { + streamO.Stop() } - - savestream.cancel.Done() - c.C.Log.Base(`savestream`).L(`I: `, "等待停止") - savestream.wait.Wait() } type Obs struct { @@ -2155,43 +941,12 @@ func AutoSend_silver_gift() { } } -var m4s_cache sync.Map //使用内存cache避免频繁io - -func get_m4s_cache(path string) (buf []byte, cached bool, err error) { - if b, ok := m4s_cache.Load(path); !ok { - f, e := os.OpenFile(path, os.O_RDONLY, 0644) - if e != nil { - err = e - return - } - defer f.Close() - - if b, e := io.ReadAll(f); e != nil { - err = e - return - } else { - buf = b - m4s_cache.Store(path, buf) - go func() { //移除 - time.Sleep(time.Second * time.Duration(savestream.m4s_hls+2)) - m4s_cache.Delete(path) - }() - } - } else { - cached = true - buf, _ = b.([]byte) - } - return -} - //直播Web服务口 func init() { flog := flog.Base_add(`直播Web服务`) if port_f, ok := c.C.K_v.LoadV(`直播Web服务口`).(float64); ok && port_f >= 0 { port := int(port_f) - base_dir := savestream.base_path - addr := "0.0.0.0:" if port == 0 { addr += strconv.Itoa(p.Sys().GetFreePort()) @@ -2211,250 +966,55 @@ func init() { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Connection", "keep-alive") w.Header().Set("Content-Transfer-Encoding", "binary") - start := time.Now() - - var path string = r.URL.Path[1:] - if !p.Checkfile().IsExist(base_dir + path) { - w.WriteHeader(http.StatusNotFound) + if len(streamO.getFirstM4S()) == 0 { + w.Header().Set("Retry-After", "1") + w.WriteHeader(http.StatusServiceUnavailable) return } - if savestream.path != "" && strings.Contains(path, filepath.Base(savestream.path)) { - w.Header().Set("Server", "live") - if filepath.Ext(path) == `.dtmp` { - if strings.Contains(path, ".flv") { - if len(savestream.front) == 0 { - w.Header().Set("Retry-After", "1") - w.WriteHeader(http.StatusServiceUnavailable) - return - } + // path = base_dir+path + w.Header().Set("Content-Type", "video/mp4") + w.WriteHeader(http.StatusOK) - // path = base_dir+path - w.Header().Set("Content-Type", "video/x-flv") - w.WriteHeader(http.StatusOK) + flusher, flushSupport := w.(http.Flusher) + if flushSupport { + flusher.Flush() + } - flusher, flushSupport := w.(http.Flusher) - if flushSupport { - flusher.Flush() - } + //写入hls头 + if _, err := w.Write(streamO.getFirstM4S()); err != nil { + return + } else if flushSupport { + flusher.Flush() + } + + cancel := make(chan struct{}) - //写入flv头,首tag - if _, err := w.Write(savestream.front); err != nil { - return + //hls切片 + streamO.Newst_m4s.Pull_tag(map[string]func(interface{}) bool{ + `m4s`: func(data interface{}) bool { + if b, ok := data.([]byte); ok { + if _, err := w.Write(b); err != nil { + close(cancel) + return true } else if flushSupport { flusher.Flush() } - - cancel := make(chan struct{}) - - //flv流关键帧间隔切片 - savestream.stream.Pull_tag(map[string]func(interface{}) bool{ - `stream`: func(data interface{}) bool { - if b, ok := data.([]byte); ok { - if _, err := w.Write(b); err != nil { - close(cancel) - return true - } else if flushSupport { - flusher.Flush() - } - } - return false - }, - `close`: func(data interface{}) bool { - close(cancel) - return true - }, - }) - - <-cancel - } else if strings.Contains(path, ".m3u8") { - if r.URL.Query().Get("type") == "mp4" { - if len(savestream.front) == 0 { - w.Header().Set("Retry-After", "1") - w.WriteHeader(http.StatusServiceUnavailable) - return - } - - // path = base_dir+path - w.Header().Set("Content-Type", "video/mp4") - w.WriteHeader(http.StatusOK) - - flusher, flushSupport := w.(http.Flusher) - if flushSupport { - flusher.Flush() - } - - //写入hls头 - if _, err := w.Write(savestream.front); err != nil { - return - } else if flushSupport { - flusher.Flush() - } - - cancel := make(chan struct{}) - - //hls切片 - savestream.stream.Pull_tag(map[string]func(interface{}) bool{ - `stream`: func(data interface{}) bool { - if b, ok := data.([]byte); ok { - if _, err := w.Write(b); err != nil { - close(cancel) - return true - } else if flushSupport { - flusher.Flush() - } - } - return false - }, - `close`: func(data interface{}) bool { - close(cancel) - return true - }, - }) - - <-cancel - } else { - w.Header().Set("Cache-Control", "max-age=1") - w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") - w.Header().Set("Last-Modified", savestream.hls_stream.t.Format(http.TimeFormat)) - - // //经常m4s下载速度赶不上,使用阻塞避免频繁获取列表带来的卡顿 - // if time.Now().Sub(savestream.hls_stream.t).Seconds() > 1 { - // time.Sleep(time.Duration(3)*time.Second) - // } - - res := savestream.hls_stream.b - - if len(res) == 0 { - w.Header().Set("Retry-After", "1") - w.WriteHeader(http.StatusServiceUnavailable) - return - } - - //Server-Timing - w.Header().Set("Server-Timing", fmt.Sprintf("dur=%d", time.Since(start).Microseconds())) - - if _, err := w.Write(res); err != nil { - flog.L(`E: `, err) - return - } - } - } - } else if filepath.Ext(path) == `.m4s` { - w.Header().Set("Server", "live") - w.Header().Set("Cache-Control", "Cache-Control:public, max-age=3600") - - path = base_dir + path - - buf, cached, e := get_m4s_cache(path) - - if e != nil { - w.Header().Set("Retry-After", "1") - w.WriteHeader(http.StatusServiceUnavailable) - return - } - - if len(buf) == 0 { - flog.L(`W: `, `buf size 0`) - w.Header().Set("Retry-After", "1") - w.WriteHeader(http.StatusServiceUnavailable) - return - } - - //Server-Timing - w.Header().Add("Server-Timing", fmt.Sprintf("cache=%v;dur=%d", cached, time.Since(start).Microseconds())) - w.WriteHeader(http.StatusOK) - if _, err := w.Write(buf); err != nil { - flog.L(`E: `, err) - return - } - } - } else { - w.Header().Set("Server", "file") - if r.URL.Query().Get("type") == "mp4" { //hls拼合 - dir := base_dir + filepath.Dir(path) + "/" - if !p.Checkfile().IsExist(dir + `0.m3u8`) { - w.WriteHeader(http.StatusNotFound) - return - } - var m4slist []string - if e := filepath.WalkDir(dir, func(path string, info fs.DirEntry, err error) error { - if err != nil { - return err - } - if filepath.Ext(info.Name()) == ".m4s" { - m4slist = append(m4slist, path) - } - return nil - }); e != nil { - flog.L(`E: `, e) - w.WriteHeader(http.StatusServiceUnavailable) - return - } - m4slist = append(m4slist[len(m4slist)-1:], m4slist[:len(m4slist)-1]...) - for _, v := range m4slist { - f, err := os.OpenFile(v, os.O_RDONLY, 0644) - if err != nil { - w.WriteHeader(http.StatusServiceUnavailable) - flog.L(`E: `, err) - return - } - io.Copy(w, f) - f.Close() } - return - } - http.FileServer(http.Dir(base_dir)).ServeHTTP(w, r) - } - } - now = func(w http.ResponseWriter, r *http.Request) { - //header - w.Header().Set("Access-Control-Allow-Credentials", "true") - w.Header().Set("Access-Control-Allow-Headers", "*") - w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS") - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Cache-Control", "max-age=3") - - //最新直播流 - if savestream.path == `` { - flog.L(`T: `, `还没有下载直播流-直播流为空`) - w.WriteHeader(http.StatusNotFound) - return - } - - path := filepath.Base(savestream.path) - if strings.Contains(c.C.Live[0], "flv") { - path += ".flv.dtmp" - } else { - path += "/0.m3u8.dtmp" - } + return false + }, + `close`: func(data interface{}) bool { + close(cancel) + return true + }, + }) - if !p.Checkfile().IsExist(base_dir + path) { - flog.L(`T: `, `还没有下载直播流-文件未能找到`) - w.WriteHeader(http.StatusNotFound) - } else { - u, e := url.Parse("../" + path) - if e != nil { - flog.L(`E: `, e) - w.Header().Set("Retry-After", "1") - w.WriteHeader(http.StatusServiceUnavailable) - return - } - if r.URL.RawQuery != "" { - u.RawQuery = r.URL.RawQuery - } - // r.URL = - // root(w, r) - w.Header().Set("Location", r.URL.ResolveReference(u).String()) - w.WriteHeader(http.StatusTemporaryRedirect) - } - return + <-cancel } ) s.Handle(map[string]func(http.ResponseWriter, *http.Request){ - `/`: root, - `/now`: now, + `/`: root, `/exit`: func(w http.ResponseWriter, r *http.Request) { s.Server.Shutdown(context.Background()) }, diff --git a/Reply/Reply.go b/Reply/Reply.go index 58fb8c8..7281c94 100644 --- a/Reply/Reply.go +++ b/Reply/Reply.go @@ -593,7 +593,7 @@ func (replyF) preparing(s string) { { //附加功能 obs结束 `savestream`结束 Obs_R(false) Obsf(false) - Savestream_wait() + streamO.Stop() go ShowRevf() c.C.Liveing = false } @@ -618,7 +618,7 @@ func (replyF) live(s string) { { //附加功能 obs录播 Obsf(true) Obs_R(true) - go Savestreamf() + go streamO.Start() } { c.C.Rev = 0.0 //营收 diff --git a/Reply/steam/stream.go b/Reply/steam/stream.go deleted file mode 100644 index c2a5751..0000000 --- a/Reply/steam/stream.go +++ /dev/null @@ -1,49 +0,0 @@ -package stream - -import ( - "path/filepath" - - c "github.com/qydysky/bili_danmu/CV" - - log "github.com/qydysky/part/log" - signal "github.com/qydysky/part/signal" -) - -type Stream struct { - Status *signal.Signal //IsLive()是否运行中 - log *log.Log_interface - config Stream_Config //配置 -} - -type Stream_Config struct { - save_path string //直播流保存目录 - want_qn int //直播流清晰度 - want_type string //直播流类型 - bufsize int //直播hls流缓冲 - banlance_host bool //直播hls流均衡 -} - -func (t *Stream) LoadConfig() { - //读取配置 - if path, ok := c.C.K_v.LoadV("直播流保存位置").(string); ok { - if path, err := filepath.Abs(path); err == nil { - t.config.save_path = path + "/" - } - } - if v, ok := c.C.K_v.LoadV(`直播hls流缓冲`).(float64); ok && v > 0 { - t.config.bufsize = int(v) - } - if v, ok := c.C.K_v.LoadV(`直播hls流均衡`).(bool); ok { - t.config.banlance_host = v - } - if v, ok := c.C.K_v.LoadV(`直播流清晰度`).(int); ok { - t.config.want_qn = v - } - if v, ok := c.C.K_v.LoadV(`直播流类型`).(string); ok { - t.config.want_type = v - } -} - -func (t *Stream) Start() { - t.log = c.C.Log.Base(`直播流保存`) -} diff --git a/Reply/stream.go b/Reply/stream.go new file mode 100644 index 0000000..2950a74 --- /dev/null +++ b/Reply/stream.go @@ -0,0 +1,499 @@ +package reply + +import ( + "bytes" + "encoding/base64" + "errors" + "io" + "net/http" + "net/url" + "path/filepath" + "strconv" + "strings" + "time" + + c "github.com/qydysky/bili_danmu/CV" + F "github.com/qydysky/bili_danmu/F" + + p "github.com/qydysky/part" + funcCtrl "github.com/qydysky/part/funcCtrl" + log "github.com/qydysky/part/log" + msgq "github.com/qydysky/part/msgq" + reqf "github.com/qydysky/part/reqf" + signal "github.com/qydysky/part/signal" + sync "github.com/qydysky/part/sync" +) + +type M4SStream struct { + Status *signal.Signal //IsLive()是否运行中 + exitSign *signal.Signal //IsLive()是否等待退出中 + log *log.Log_interface + config M4SStream_Config //配置 + stream_last_modified time.Time //流地址更新时间 + stream_expires int64 //流到期时间 + last_m4s *m4s_link_item //最后一个切片 + stream_hosts sync.Map //使用的流服务器 + Newst_m4s *msgq.Msgq //m4s消息 tag:m4s + first_m4s []byte //m4s起始块 +} + +type M4SStream_Config struct { + save_path string //直播流保存目录 + want_qn int //直播流清晰度 + want_type string //直播流类型 + bufsize int //直播hls流缓冲 + banlance_host bool //直播hls流均衡 +} + +type m4s_link_item struct { + Url string // m4s链接 + Base string // m4s文件名 + status int // 下载状态 0:未下载 1:正在下载 2:下载完成 3:下载失败 + data []byte // 下载的数据 +} + +func (t *m4s_link_item) isInit() bool { + return strings.Contains(t.Base, "h") +} + +func (t *m4s_link_item) getNo() (int, error) { + var base = t.Base + if t.isInit() { + base = base[1:] + } + return strconv.Atoi(base[:len(base)-4]) +} + +func (t *M4SStream) LoadConfig(kv *sync.Map, l *log.Log_interface) { + //读取配置 + if path, ok := kv.LoadV("直播流保存位置").(string); ok { + if path, err := filepath.Abs(path); err == nil { + t.config.save_path = path + "/" + } + } + if v, ok := kv.LoadV(`直播hls流缓冲`).(float64); ok && v > 0 { + t.config.bufsize = int(v) + } + if v, ok := kv.LoadV(`直播hls流均衡`).(bool); ok { + t.config.banlance_host = v + } + if v, ok := kv.LoadV(`直播流清晰度`).(float64); ok { + t.config.want_qn = int(v) + } + if v, ok := kv.LoadV(`直播流类型`).(string); ok { + t.config.want_type = v + } + t.log = l.Base(`直播流保存`) +} + +func (t *M4SStream) getFirstM4S() []byte { + return t.first_m4s +} + +func (t *M4SStream) fetchCheckStream(tmpc *c.Common) bool { + // 获取流地址 + tmpc.Live_want_qn = t.config.want_qn + if F.Get(tmpc).Get(`Live`); len(tmpc.Live) == 0 { + return false + } + + // 保存流地址过期时间 + if m3u8_url, err := url.Parse(tmpc.Live[0]); err != nil { + t.log.L(`E: `, err.Error()) + return false + } else { + expires, _ := strconv.Atoi(m3u8_url.Query().Get("expires")) + t.stream_expires = int64(expires) + } + + // 检查是否可以获取 + CookieM := make(map[string]string) + tmpc.Cookie.Range(func(k, v interface{}) bool { + CookieM[k.(string)] = v.(string) + return true + }) + + var req = reqf.New() + if e := req.Reqf(reqf.Rval{ + Url: tmpc.Live[0], + Retry: 10, + SleepTime: 1000, + Proxy: tmpc.Proxy, + Header: map[string]string{ + `Cookie`: reqf.Map_2_Cookies_String(CookieM), + }, + Timeout: 5 * 1000, + JustResponseCode: true, + }); e != nil { + t.log.L(`W: `, e) + } + + if req.Response == nil { + t.log.L(`W: `, `live响应错误`) + return false + } else if req.Response.StatusCode != 200 { + t.log.L(`W: `, `live响应错误`, req.Response.Status, string(req.Respon)) + return false + } + return true +} + +func (t *M4SStream) fetchParseM3U8(tmpc *c.Common) (m4s_links []*m4s_link_item, m3u8_addon []byte) { + // 请求解析m3u8内容 + for _, v := range tmpc.Live { + m3u8_url, err := url.Parse(v) + if err != nil { + t.log.L(`E: `, err.Error()) + return + } + + // 设置请求参数 + rval := reqf.Rval{ + Url: m3u8_url.String(), + ConnectTimeout: 2000, + ReadTimeout: 1000, + Timeout: 2000, + Proxy: c.C.Proxy, + Header: map[string]string{ + `Host`: m3u8_url.Host, + `User-Agent`: `Mozilla/5.0 (X11; Linux x86_64; rv:83.0) Gecko/20100101 Firefox/83.0`, + `Accept`: `*/*`, + `Accept-Language`: `zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2`, + `Accept-Encoding`: `gzip, deflate, br`, + `Origin`: `https://live.bilibili.com`, + `Connection`: `keep-alive`, + `Pragma`: `no-cache`, + `Cache-Control`: `no-cache`, + `Referer`: "https://live.bilibili.com/", + }, + } + if !t.stream_last_modified.IsZero() { + rval.Header[`If-Modified-Since`] = t.stream_last_modified.Add(time.Second).Format("Mon, 02 Jan 2006 15:04:05 CST") + } + + // 开始请求 + var r = reqf.New() + if e := r.Reqf(rval); e != nil { + continue + } + + if r.Response.StatusCode == http.StatusNotModified { + t.log.L(`T: `, `hls未更改`) + return + } + + // 保存最后m3u8修改时间 + if last_mod, ok := r.Response.Header[`Last-Modified`]; ok && len(last_mod) > 0 { + if lm, e := time.Parse("Mon, 02 Jan 2006 15:04:05 CST", last_mod[0]); e == nil { + t.stream_last_modified = lm + } + } + + // m3u8字节流 + var m3u8_respon = r.Respon + + // base64解码 + if len(m3u8_respon) != 0 && !bytes.Contains(m3u8_respon, []byte("#")) { + m3u8_respon, err = base64.StdEncoding.DecodeString(string(m3u8_respon)) + if err != nil { + t.log.L(`W: `, err, string(m3u8_respon)) + return + } + } + + // 解析m3u8 + for _, line := range bytes.Split(m3u8_respon, []byte("\n")) { + if len(line) == 0 { + continue + } + + var m4s_link = "" //切片文件名 + + //获取附加的m3u8字节 忽略bili定制拓展 + if !bytes.Contains(line, []byte(`#EXT-X-BILI`)) { + if t.last_m4s == nil { + m3u8_addon = append(m3u8_addon, line...) + m3u8_addon = append(m3u8_addon, []byte("\n")...) + } else { + if bytes.Contains(line, []byte(`#EXTINF`)) || + !bytes.Contains(line, []byte(`#`)) { + m3u8_addon = append(m3u8_addon, line...) + m3u8_addon = append(m3u8_addon, []byte("\n")...) + } + } + } + + //获取切片文件名 + if bytes.Contains(line, []byte("EXT-X-MAP")) { + o := bytes.Index(line, []byte(`EXT-X-MAP:URI="`)) + 15 + e := bytes.Index(line[o:], []byte(`"`)) + o + m4s_link = string(line[o:e]) + } else if bytes.Contains(line, []byte("#EXT-X")) { //忽略扩展标签 + continue + } else if bytes.Contains(line, []byte(".m4s")) { + m4s_link = string(line) + } else { + continue + } + + //获取切片地址 + u, e := url.Parse("./" + m4s_link + "?trid=" + m3u8_url.Query().Get("trid")) + if e != nil { + t.log.L(`E: `, e) + return + } + + //将切片添加到返回切片数组 + m4s_links = append(m4s_links, &m4s_link_item{ + Url: m3u8_url.ResolveReference(u).String(), + Base: m4s_link, + }) + } + + // 设置最后的切片 + defer func(last_m4s *m4s_link_item) { + t.last_m4s = last_m4s + }(m4s_links[len(m4s_links)-1]) + + if t.last_m4s == nil { + return + } + + // 只返回新增加的 + for k, m4s_link := range m4s_links { + if m4s_link.Base == t.last_m4s.Base { + // 只返回新增加的切片 + m4s_links = m4s_links[k+1:] + + // 只返回新增加的m3u8字节 + if index := bytes.Index(m3u8_addon, []byte(m4s_link.Base)); index != -1 { + index += len([]byte(m4s_link.Base)) + if index == len(m3u8_addon) { + m3u8_addon = []byte{} + } else { + m3u8_addon = m3u8_addon[index+1:] + } + } + return + } + } + + // 来到此处说明出现了丢失 尝试补充 + var guess_end_no, _ = m4s_links[0].getNo() + for no, _ := t.last_m4s.getNo(); no < guess_end_no; no += 1 { + // 补充m3u8 + m3u8_addon = append([]byte(`#EXTINF:1.00\n`+strconv.Itoa(no)+`.m4s\n`), m3u8_addon...) + + //获取切片地址 + u, e := url.Parse("./" + strconv.Itoa(no) + `.m4s`) + if e != nil { + t.log.L(`E: `, e) + return + } + + //将切片添加到返回切片数组前 + m4s_links = append([]*m4s_link_item{ + { + Url: m3u8_url.ResolveReference(u).String(), + Base: strconv.Itoa(no) + `.m4s`, + }, + }, m4s_links...) + } + + // 请求解析成功,退出获取循环 + break + } + + return +} + +func (t *M4SStream) saveStream(tmpc *c.Common) { + // 设置保存路径 + var save_path = t.config.save_path + strconv.Itoa(tmpc.Roomid) + "_" + time.Now().Format("2006_01_02_15-04-05-000") + `/` + + // 显示保存位置 + if rel, err := filepath.Rel(t.config.save_path, save_path); err == nil { + t.log.L(`I: `, "保存到", rel+`/0.m3u8`) + } else { + t.log.L(`W: `, err) + } + + // 获取流 + if strings.Contains(tmpc.Live[0], `m3u8`) { + t.stream_expires = time.Now().Add(time.Minute * 2).Unix() // 流链接过期时间 + + // 同时下载数限制 + var download_limit = funcCtrl.BlockFuncN{ + Max: 2, + } + + // 下载循环 + for download_seq := []*m4s_link_item{}; ; { + // 下载切片 + for _, v := range download_seq { + v.status = 1 // 设置切片状态为正在下载 + + // 均衡负载 + if link_url, e := url.Parse(v.Url); e == nil { + if t.stream_hosts.Len() != 1 { + t.stream_hosts.Range(func(key, value interface{}) bool { + // 故障转移 + if v.status == 3 && link_url.Host == key.(string) { + return true + } + // 随机 + link_url.Host = key.(string) + return false + }) + } + v.Url = link_url.String() + } + + download_limit.Block() + go func(link *m4s_link_item, path string) { + defer download_limit.UnBlock() + + r := reqf.New() + if e := r.Reqf(reqf.Rval{ + Url: link.Url, + SaveToPath: path + link.Base, + ConnectTimeout: 2000, + ReadTimeout: 1000, + Timeout: 2000, + Proxy: tmpc.Proxy, + }); e != nil && !errors.Is(e, io.EOF) { + link.status = 3 // 设置切片状态为下载失败 + } else { + if usedt := r.UsedTime.Seconds(); usedt > 700 { + t.log.L(`I: `, `hls切片下载慢`, usedt, `ms`) + } + link.data = r.Respon + link.status = 2 // 设置切片状态为下载完成 + } + }(v, save_path) + } + + // 等待队列下载完成 + download_limit.None() + download_limit.UnNone() + + //添加失败切片 传递切片 + { + var tmp_seq []*m4s_link_item + for _, v := range download_seq { + if strings.Contains(v.Base, `h`) { + t.first_m4s = v.data + } + + if v.status == 3 { + tmp_seq = append(tmp_seq, v) + } else { + t.Newst_m4s.Push_tag(`m4s`, v.data) + } + } + download_seq = tmp_seq + } + + // 停止录制 + if !t.Status.Islive() { + if len(download_seq) != 0 { + t.log.L(`I: `, `下载最后切片:`, len(download_seq)) + continue + } + break + } + + // 刷新流地址 + if time.Now().Unix()+60 > t.stream_expires { + t.fetchCheckStream(tmpc) + } + + // 获取解析m3u8 + var m4s_links, m3u8_addon = t.fetchParseM3U8(tmpc) + if len(m4s_links) == 0 { + time.Sleep(time.Second) + continue + } + + // 添加新切片到下载队列 + download_seq = append(download_seq, m4s_links...) + + // 添加m3u8字节 + p.File().FileWR(p.Filel{ + File: save_path + "0.m3u8.dtmp", + Loc: -1, + Context: []interface{}{m3u8_addon}, + }) + } + + // 结束 + if p.Checkfile().IsExist(save_path + "0.m3u8.dtmp") { + f := p.File() + f.FileWR(p.Filel{ + File: save_path + "0.m3u8.dtmp", + Loc: -1, + Context: []interface{}{"#EXT-X-ENDLIST"}, + }) + p.FileMove(save_path+"0.m3u8.dtmp", save_path+"0.m3u8") + } + + } +} + +func (t *M4SStream) Start() { + // 清晰度-1 不保存 + if t.config.want_qn == -1 { + return + } + + // 状态检测与设置 + if t.Status.Islive() { + t.log.L(`T: `, `已存在实例`) + return + } + t.Status = signal.Init() + defer t.Status.Done() + + // 初始化切片消息 + t.Newst_m4s = msgq.New(10) + + var tmpc = c.C + + // 主循环 + for t.Status.Islive() { + // 是否在直播 + F.Get(&tmpc).Get(`Liveing`) + if !tmpc.Liveing { + t.log.L(`T: `, `未直播`) + break + } + + // 获取 and 检查流地址状态 + if !t.fetchCheckStream(&tmpc) { + time.Sleep(time.Second * 5) + continue + } + + // 设置均衡负载 + for _, v := range tmpc.Live { + if url_struct, e := url.Parse(v); e == nil { + t.stream_hosts.Store(url_struct.Hostname(), nil) + } + if !t.config.banlance_host { + break + } + } + + // 保存流 + t.saveStream(&tmpc) + } + + t.log.L(`T: `, `结束`) + t.exitSign.Done() +} + +func (t *M4SStream) Stop() { + t.exitSign = signal.Init() + t.Status.Done() + t.exitSign.Wait() +} diff --git a/bili_danmu.go b/bili_danmu.go index 69d476f..0e172b2 100644 --- a/bili_danmu.go +++ b/bili_danmu.go @@ -270,7 +270,7 @@ func Demo(roomid ...int) { { //附加功能 进房间发送弹幕 直播流保存 营收 go reply.Entry_danmu() - go reply.Savestreamf() + c.C.Danmu_Main_mq.Push_tag(`savestream`, nil) go reply.ShowRevf() //小心心 go F.F_x25Kn() @@ -310,7 +310,7 @@ func Demo(roomid ...int) { } } { //附加功能 直播流停止 - reply.Savestream_wait() + reply.StreamOStop() reply.Save_to_json(-1, []interface{}{`{}]`}) } p.Sys().Timeoutf(1) -- 2.39.2