package reply
import (
- "bytes"
"context"
- "encoding/base64"
"encoding/json"
- "errors"
"fmt"
"io"
- "io/fs"
"io/ioutil"
"math"
"net/http"
- "net/url"
- "os"
"os/exec"
- "path"
"path/filepath"
"strconv"
"strings"
send "github.com/qydysky/bili_danmu/Send"
p "github.com/qydysky/part"
- funcCtrl "github.com/qydysky/part/funcCtrl"
- idpool "github.com/qydysky/part/idpool"
limit "github.com/qydysky/part/limit"
msgq "github.com/qydysky/part/msgq"
- reqf "github.com/qydysky/part/reqf"
- s "github.com/qydysky/part/signal"
psync "github.com/qydysky/part/sync"
- util "github.com/qydysky/part/util"
web "github.com/qydysky/part/web"
obsws "github.com/christopher-dG/go-obs-websocket"
return
}
- if rel, err := filepath.Rel(savestream.base_path, ass.file); err == nil {
+ if rel, err := filepath.Rel(streamO.config.save_path, ass.file); err == nil {
c.C.Log.Base(`Ass`).L(`I: `, "保存到", rel+".ass")
} else {
c.C.Log.Base(`Ass`).L(`I: `, "保存到", ass.file+".ass")
//hls
//https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming
-
-//直播流保存
-type Savestream struct {
- base_path string
- path string
- hls_stream struct {
- b []byte //发送给客户的m3u8字节
- t time.Time
- }
- front []byte //flv头及首tag or hls的初始化m4s
- stream *msgq.Msgq //发送给客户的flv流关键帧间隔片 or hls的fmp4片
-
- m4s_hls int //hls list 中的m4s数量
- hlsbuffersize int //hls list缓冲m4s数量
- hls_banlance_host bool //使用均衡hls服务器
-
- wait *s.Signal
- cancel *s.Signal
- skipFunc funcCtrl.SkipFunc
-}
-
-type hls_generate struct {
- hls_first_fmp4_name string
- hls_file_header []byte //发送给客户的m3u8不变头
- m4s_list []*m4s_link_item //m4s列表 缓冲
-}
-
-type m4s_link_item struct { //使用指针以设置是否已下载
- Url string // m4s链接
- Base string //m4s文件名
- Offset_line int //m3u8中的行下标
- status int //该m4s下载状态 s_noload:未下载 s_loading正在下载 s_fin下载完成 s_fail下载失败
- isshow bool
-}
-
-//m4s状态
-const (
- s_noload = iota
- s_loading
- s_fin
- s_fail
-)
-
-var savestream = Savestream{
- stream: msgq.New(10), //队列最多保留10个关键帧间隔片
- m4s_hls: 8,
-}
+var streamO = new(M4SStream)
func init() {
//使用带tag的消息队列在功能间传递消息
c.C.Danmu_Main_mq.Pull_tag(msgq.FuncMap{
`savestream`: func(data interface{}) bool {
- if savestream.cancel.Islive() {
- Savestream_wait()
+ if streamO.Status.Islive() {
+ streamO.Stop()
} else {
- go Savestreamf()
+ streamO.LoadConfig(&c.C.K_v, c.C.Log)
+ go streamO.Start()
}
-
return false
},
})
- //base_path
- if path, ok := c.C.K_v.LoadV("直播流保存位置").(string); ok {
- if path, err := filepath.Abs(path); err == nil {
- savestream.base_path = path + "/"
- }
- }
- if v, ok := c.C.K_v.LoadV(`直播hls流缓冲`).(float64); ok && v > 0 {
- savestream.hlsbuffersize = int(v)
- }
- if v, ok := c.C.K_v.LoadV(`直播hls流均衡`).(bool); ok {
- savestream.hls_banlance_host = v
- }
-}
-
-//已go func形式调用,将会获取直播流
-func Savestreamf() {
- l := c.C.Log.Base(`savestream`)
-
- //避免多次开播导致的多次触发
- {
- if savestream.skipFunc.NeedSkip() {
- l.L(`T: `, `已存在实例`)
- return
- }
- defer savestream.skipFunc.UnSet()
- }
-
- want_qn, ok := c.C.K_v.LoadV("直播流清晰度").(float64)
- if !ok || want_qn < 0 {
- return
- }
- c.C.Live_want_qn = int(want_qn)
-
- F.Get(&c.C).Get(`Live`)
-
- if savestream.cancel.Islive() {
- return
- }
-
- //random host load balance
- Host_list := []string{}
- if savestream.hls_banlance_host {
- for _, v := range c.C.Live {
- url_struct, e := url.Parse(v)
- if e != nil {
- continue
- }
- Host_list = append(Host_list, url_struct.Hostname())
- }
- }
-
- var (
- no_found_link = errors.New("no_found_link")
- no_Modified = errors.New("no_Modified")
- last_hls_Modified time.Time
- hls_get_link = func(m3u8_urls []string, last_download *m4s_link_item) (need_download []*m4s_link_item, m3u8_file_addition []byte, expires int, err error) {
- var (
- r *reqf.Req
- m3u8_url *url.URL
- )
- for index := 0; index < len(m3u8_urls); index += 1 {
- r = reqf.New()
- if tmp, e := url.Parse(m3u8_urls[index]); e != nil {
- err = e
- return
- } else {
- m3u8_url = tmp
- }
-
- rval := reqf.Rval{
- Url: m3u8_url.String(),
- ConnectTimeout: 2000,
- ReadTimeout: 1000,
- Timeout: 2000,
- Proxy: c.C.Proxy,
- Header: map[string]string{
- `Host`: m3u8_url.Host,
- `User-Agent`: `Mozilla/5.0 (X11; Linux x86_64; rv:83.0) Gecko/20100101 Firefox/83.0`,
- `Accept`: `*/*`,
- `Accept-Language`: `zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2`,
- `Accept-Encoding`: `gzip, deflate, br`,
- `Origin`: `https://live.bilibili.com`,
- `Connection`: `keep-alive`,
- `Pragma`: `no-cache`,
- `Cache-Control`: `no-cache`,
- `Referer`: "https://live.bilibili.com/",
- },
- }
- if !last_hls_Modified.IsZero() {
- rval.Header[`If-Modified-Since`] = last_hls_Modified.Add(time.Second).Format("Mon, 02 Jan 2006 15:04:05 CST")
- }
- if e := r.Reqf(rval); e != nil {
- if index+1 < len(m3u8_urls) {
- continue
- }
- err = e
- return
- }
- break
- }
-
- if usedt := r.UsedTime.Seconds(); usedt > 3000 {
- l.L(`I: `, `hls列表下载慢`, usedt, `ms`)
- }
- if r.Response.StatusCode == http.StatusNotModified {
- l.L(`T: `, `hls未更改`)
- err = no_Modified
- return
- }
- //last_hls_Modified
- if t, ok := r.Response.Header[`Last-Modified`]; ok && len(t) > 0 {
- if lm, e := time.Parse("Mon, 02 Jan 2006 15:04:05 CST", t[0]); e == nil {
- last_hls_Modified = lm
- } else {
- l.L(`T: `, e)
- }
- }
-
- query := m3u8_url.Query()
- trid := query.Get("trid")
- expires, _ = strconv.Atoi(query.Get("expires"))
- buf := r.Respon
-
- //base-64
- if len(buf) != 0 && !bytes.Contains(buf, []byte("#")) {
- buf, err = base64.StdEncoding.DecodeString(string(buf))
- if err != nil {
- l.L(`W: `, err, string(buf))
- return
- }
- }
-
- var m4s_links []*m4s_link_item
- lines := bytes.Split(buf, []byte("\n"))
- for i := 0; i < len(lines); i += 1 {
- line := lines[i]
- m4s_link := ""
-
- if bytes.Contains(line, []byte("EXT-X-MAP")) {
- o := bytes.Index(line, []byte(`EXT-X-MAP:URI="`)) + 15
- e := bytes.Index(line[o:], []byte(`"`)) + o
- m4s_link = string(line[o:e])
- } else if bytes.Contains(lines[i], []byte("#EXT-X")) { //忽略扩展标签
- continue
- } else if bytes.Contains(line, []byte(".m4s")) {
- m4s_link = string(line)
- }
-
- if m4s_link == "" {
- continue
- }
-
- u, e := url.Parse("./" + m4s_link + "?trid=" + trid)
- if e != nil {
- err = e
- return
- }
- m4s_links = append(m4s_links, &m4s_link_item{
- Url: m3u8_url.ResolveReference(u).String(),
- Base: m4s_link,
- Offset_line: i,
- })
- }
- if len(m4s_links) == 0 {
- err = no_found_link
- return
- }
-
- if last_download == nil {
- m3u8_file_addition = buf
- need_download = m4s_links
- return
- }
-
- var found bool
- for i := 0; i < len(m4s_links); i += 1 {
- if found {
- offset := m4s_links[i].Offset_line - 1
- for i := offset; i < len(lines); i += 1 {
- if bytes.Contains(lines[i], []byte("#EXT-X")) { //忽略扩展标签
- continue
- }
- m3u8_file_addition = append(m3u8_file_addition, lines[i]...)
- m3u8_file_addition = append(m3u8_file_addition, []byte("\n")...)
- }
- m3u8_file_addition = m3u8_file_addition[:len(m3u8_file_addition)-1]
-
- need_download = append(need_download, m4s_links[i:]...)
- break
- }
- found = (*last_download).Base == m4s_links[i].Base
- }
- if !found {
- offset := m4s_links[1].Offset_line - 1
- for i := offset; i < len(lines); i += 1 {
- if bytes.Contains(lines[i], []byte("#EXT-X")) { //忽略扩展标签
- continue
- }
- m3u8_file_addition = append(m3u8_file_addition, lines[i]...)
- m3u8_file_addition = append(m3u8_file_addition, []byte("\n")...)
- }
- m3u8_file_addition = m3u8_file_addition[:len(m3u8_file_addition)-1]
-
- need_download = append(need_download, m4s_links[1:]...)
- }
-
- return
- }
- flv_get_link = func(link string) (need_download string, expires int, err error) {
- need_download = link
-
- url_struct, e := url.Parse(link)
- if e != nil {
- err = e
- return
- }
- query := url_struct.Query()
- expires, _ = strconv.Atoi(query.Get("expires"))
-
- return
- }
- )
-
- for {
- F.Get(&c.C).Get(`Liveing`)
- if !c.C.Liveing {
- break
- }
-
- F.Get(&c.C).Get(`Live`)
- if len(c.C.Live) == 0 {
- break
- }
-
- savestream.path = savestream.base_path
-
- savestream.path += strconv.Itoa(c.C.Roomid) + "_" + time.Now().Format("2006_01_02_15-04-05-000")
-
- savestream.wait = s.Init()
- savestream.cancel = s.Init()
-
- CookieM := make(map[string]string)
- c.C.Cookie.Range(func(k, v interface{}) bool {
- CookieM[k.(string)] = v.(string)
- return true
- })
-
- { //重试
- r := reqf.New()
- go func() {
- savestream.cancel.Wait()
- r.Close()
- }()
- l.L(`I: `, "尝试连接live")
- if e := r.Reqf(reqf.Rval{
- Url: c.C.Live[0],
- Retry: 10,
- SleepTime: 1000,
- Proxy: c.C.Proxy,
- Header: map[string]string{
- `Cookie`: reqf.Map_2_Cookies_String(CookieM),
- },
- Timeout: 5 * 1000,
- JustResponseCode: true,
- }); e != nil {
- l.L(`W: `, e)
- }
-
- if r.Response == nil {
- l.L(`W: `, `live响应错误`)
- savestream.wait.Done()
- savestream.cancel.Done()
- time.Sleep(time.Second * 5)
- continue
- } else if r.Response.StatusCode != 200 {
- l.L(`W: `, `live响应错误`, r.Response.Status, string(r.Respon))
- savestream.wait.Done()
- savestream.cancel.Done()
- time.Sleep(time.Second * 5)
- continue
- }
- }
-
- if strings.Contains(c.C.Live[0], "flv") {
- if rel, err := filepath.Rel(savestream.base_path, savestream.path); err == nil {
- l.L(`I: `, "保存到", rel+".flv")
- } else {
- l.L(`I: `, "保存到", savestream.path+".flv")
- l.L(`W: `, err)
- }
- Ass_f(savestream.path, time.Now())
-
- // no expect qn
- exit_chan := s.Init()
- go func() {
- savestream.cancel.Wait()
- exit_chan.Done()
- }()
-
- type link_stream struct {
- id *idpool.Id
- front []byte
- keyframe [][]byte
- // sync_buf []byte
- close func()
- }
-
- //chans
- var (
- reqs = msgq.New(10)
- id_pool = idpool.New()
- )
-
- //文件
- out, err := os.Create(savestream.path + ".flv" + ".dtmp")
- if err != nil {
- l.L(`E: `, err)
- return
- }
-
- //数据整合
- {
- type id_close struct {
- id uintptr
- close func()
- }
-
- var (
- reqs_used_id []id_close
- reqs_remove_id []id_close
-
- reqs_keyframe [][][]byte
-
- reqs_func_block funcCtrl.BlockFunc
- last_keyframe_timestamp int
- )
- reqs.Pull_tag(map[string]func(interface{}) bool{
- `req`: func(data interface{}) bool {
- req, ok := data.(link_stream)
-
- if !ok {
- return false
- }
-
- if len(req.keyframe) == 0 {
- // fmt.Println(`没有keyframe,退出`)
- req.close()
- return false
- }
- // fmt.Println(`处理req_id`,req.id.Id,`keyframe_len`,len(req.keyframe))
-
- if offset, _ := out.Seek(0, 1); offset == 0 {
- // fmt.Println(`添加头`,len(req.front))
- //stream
- savestream.front = req.front
- out.Write(req.front)
- }
-
- reqs_func_block.Block()
- defer reqs_func_block.UnBlock()
-
- for i := 0; i < len(reqs_remove_id); i += 1 {
- if reqs_remove_id[i].id == req.id.Id {
- req.close()
- return false
- }
- }
-
- var reqs_keyframe_index int = len(reqs_used_id)
- {
- var isnew bool = true
- for i := 0; i < len(reqs_used_id); i += 1 {
- if reqs_used_id[i].id == req.id.Id {
- reqs_keyframe_index = i
- isnew = false
- break
- }
- }
- if isnew {
- // fmt.Println(`新req`,req.id.Id,reqs_keyframe_index)
- reqs_used_id = append(reqs_used_id, id_close{
- id: req.id.Id,
- close: req.close,
- })
- }
- }
-
- if len(reqs_used_id) == 1 {
- // l.L(`T: `,"单req写入",len(req.keyframe))
- last_keyframe_timestamp, _ = Keyframe_timebase(req.keyframe, last_keyframe_timestamp)
-
- for i := 0; i < len(req.keyframe); i += 1 {
- //stream
- savestream.stream.Push_tag("stream", req.keyframe[i])
- out.Write(req.keyframe[i])
- }
- return false
- }
-
- for reqs_keyframe_index >= len(reqs_keyframe) {
- reqs_keyframe = append(reqs_keyframe, [][]byte{})
- }
- reqs_keyframe[reqs_keyframe_index] = append(reqs_keyframe[reqs_keyframe_index], req.keyframe...)
-
- // fmt.Println(`merge,添加reqs_keyframe数据`,reqs_keyframe_index,len(reqs_keyframe[reqs_keyframe_index]))
-
- for _, v := range reqs_keyframe {
- if len(v) == 0 {
- // fmt.Println(`merge,req无数据`,k)
- return false
- }
- }
-
- if success_last_keyframe_timestamp, b, merged := Merge_stream(reqs_keyframe, last_keyframe_timestamp); merged == 0 {
- // fmt.Println(`merge失败,reqs_keyframe[1]`,reqs_keyframe[1][0][:11],reqs_keyframe[1][len(reqs_keyframe[1])-1][:11])
- size := 0
- for i := 1; i < len(reqs_keyframe); i += 1 {
- size += len(reqs_keyframe[i])
- }
-
- if reqs_keyframe_index == 0 {
- // l.L(`T: `,"flv拼合失败,reqs_keyframe[0]写入")
- // fmt.Println(`merge失败,reqs_keyframe[0]写入`,len(req.keyframe))
-
- last_keyframe_timestamp, _ = Keyframe_timebase(req.keyframe, last_keyframe_timestamp)
-
- for i := 0; i < len(req.keyframe); i += 1 {
- //stream
- savestream.stream.Push_tag("stream", req.keyframe[i])
- out.Write(req.keyframe[i])
- }
- // reqs_keyframe[0] = [][]byte{reqs_keyframe[0][len(reqs_keyframe[0])-1]}
- } else if size > 4 {
- if reqs_keyframe_index == len(reqs_used_id)-1 {
- l.L(`T: `, "flv强行拼合")
-
- for i := 0; i < reqs_keyframe_index; i += 1 {
- reqs_remove_id = append(reqs_remove_id, reqs_used_id[i])
- reqs_used_id[i].close()
- }
- reqs_used_id = reqs_used_id[reqs_keyframe_index:]
-
- last_keyframe_timestamp, _ = Keyframe_timebase(req.keyframe, last_keyframe_timestamp)
-
- for i := 0; i < len(req.keyframe); i += 1 {
- //stream
- savestream.stream.Push_tag("stream", req.keyframe[i])
- out.Write(req.keyframe[i])
- }
-
- reqs_keyframe = [][][]byte{}
- } else {
- req.close()
- return false
- }
- }
- } else {
- // fmt.Println(`merge成功`,len(b))
- l.L(`T: `, "flv拼合成功")
-
- last_keyframe_timestamp = success_last_keyframe_timestamp
-
- for i := 0; i < merged; i += 1 {
- reqs_remove_id = append(reqs_remove_id, reqs_used_id[i])
- reqs_used_id[i].close()
- }
- reqs_keyframe = [][][]byte{}
-
- reqs_used_id = reqs_used_id[merged:]
-
- //stream
- savestream.stream.Push_tag("stream", b)
- out.Write(b)
- }
-
- return false
- },
- // 11区 1
- `close`: func(data interface{}) bool {
- // defer l.L(`I: `,"处理退出")
- for i := 0; i < len(reqs_used_id); i += 1 {
- reqs_used_id[i].close()
- }
- reqs_used_id = []id_close{}
- // reqs_remove_id = []id_close{}
- reqs_keyframe = [][][]byte{}
- last_keyframe_timestamp = 0
- return true
- },
- })
- }
-
- //连接保持
- for {
- //随机选取服务器,获取超时时间
-
- live_index := 0
- if len(c.C.Live) > 0 {
- live_index = int(p.Rand().MixRandom(0, int64(len(c.C.Live)-1)))
- }
- link, exp, e := flv_get_link(c.C.Live[live_index])
- if e != nil {
- l.L(`W: `, `流链接获取错误`, e)
- break
- }
-
- // 新建chan
- var (
- bc = make(chan []byte, 1<<17)
- req = reqf.New()
- req_exit = s.Init()
- )
-
- l.L(`I: `, `新建请求`, req.Id())
-
- //新建请求
- go func(r *reqf.Req, rval reqf.Rval) {
- go func() {
- select {
- case <-exit_chan.WaitC():
- case <-req_exit.WaitC():
- }
- r.Close()
- }()
- defer req_exit.Done()
- e := r.Reqf(rval)
- if r.Response == nil {
- l.L(`W: `, `请求退出`, r.Id(), e)
- } else if r.Response.StatusCode != 200 {
- l.L(`W: `, `请求退出`, r.Id(), e, r.Response.Status, string(r.Respon))
- } else {
- l.L(`W: `, `请求退出`, r.Id())
- }
- }(req, reqf.Rval{
- Url: link,
- Proxy: c.C.Proxy,
- Header: map[string]string{
- `Cookie`: reqf.Map_2_Cookies_String(CookieM),
- },
- //SaveToPath:savestream.path + ".flv",
- SaveToChan: bc,
- Timeout: int(int64(exp)-p.Sys().GetSTime()) * 1000,
- ReadTimeout: 5 * 1000,
- ConnectTimeout: 10 * 1000,
- })
-
- //返回通道
- var item = link_stream{
- close: req.Close,
- id: id_pool.Get(),
- }
- l.L(`I: `, `新建连接`, item.id.Id)
-
- //解析
- go func(bc chan []byte, item *link_stream, exit_chan *s.Signal) {
- var (
- buf []byte
- skip_buf_size int
- )
- defer req_exit.Done()
- defer l.L(`W: `, `连接退出`, item.id.Id)
- for exit_chan.Islive() && req_exit.Islive() {
- select {
- case <-exit_chan.WaitC():
- return
- case <-req_exit.WaitC():
- return
- case b := <-bc:
- if len(b) == 0 {
- // fmt.Println(`req退出`,item.id.Id)
- id_pool.Put(item.id)
- // reqs.Push_tag(`closereq`,*item)
- return
- }
-
- buf = append(buf, b...)
-
- if len(buf) < skip_buf_size {
- break
- }
-
- front, list, _ := Seach_stream_tag(buf)
-
- if len(front) != 0 && len(item.front) == 0 {
- // fmt.Println(item.id.Id,`获取到header`,len(front))
- item.front = make([]byte, len(front))
- copy(item.front, front)
- }
-
- if len(list) == 0 || len(item.front) == 0 {
- // fmt.Println(`再次查询bufsize`,skip_buf_size)
- skip_buf_size = 2 * len(buf)
- break
- }
-
- item.keyframe = list
-
- {
- last_keyframe := list[len(list)-1]
- cut_offset := bytes.LastIndex(buf, last_keyframe) + len(last_keyframe)
- // fmt.Printf("buf截断 当前%d=>%d 下一header %b\n",len(buf),len(buf)-cut_offset,buf[:11])
- buf = buf[cut_offset:]
- }
-
- skip_buf_size = len(buf) + len(list[0])
- reqs.Push_tag(`req`, *item)
- }
- }
- }(bc, &item, exit_chan)
-
- expires := int64(exp) - p.Sys().GetSTime() - 120
- // no expect qn
- if c.C.Live_want_qn < c.C.Live_qn {
- expires = time.Now().Add(time.Minute * 2).Unix()
- }
-
- //等待过期/退出
- {
- var exit_sign bool
- select {
- case <-req_exit.Chan: //本次连接错误,退出重试
- case <-exit_chan.Chan: //要求退出
- exit_sign = true //
- case <-time.After(time.Second * time.Duration(int(expires))):
- }
- if exit_sign {
- //退出
- // l.L(`T: `,"chan退出")
- break
- }
- }
-
- l.L(`I: `, "flv关闭,开始新连接")
-
- //即将过期,刷新c.C.Live
- F.Get(&c.C).Get(`Liveing`)
- if !c.C.Liveing {
- break
- }
- F.Get(&c.C).Get(`Live`)
- if len(c.C.Live) == 0 {
- break
- }
- }
-
- exit_chan.Done()
- reqs.Push_tag(`close`, nil)
- out.Close()
-
- p.FileMove(savestream.path+".flv.dtmp", savestream.path+".flv")
- } else {
- savestream.path += "/"
- if rel, err := filepath.Rel(savestream.base_path, savestream.path); err == nil {
- l.L(`I: `, "保存到", rel+`/0.m3u8`)
- } else {
- l.L(`I: `, "保存到", savestream.path)
- l.L(`W: `, err)
- }
- Ass_f(savestream.path+"0", time.Now())
-
- var (
- hls_msg = msgq.New(20)
- hls_gen hls_generate
- DISCONTINUITY int
- SEQUENCE int
- )
-
- //hls stream gen 用户m3u8生成
- go func() {
- per_second := time.Tick(time.Second)
- for {
- select {
- case <-savestream.cancel.WaitC():
- return //exit
- case now := <-per_second:
- hls_msg.Push_tag(`clock`, now)
- }
- }
- }()
-
- //hls stream gen 用户m3u8生成
- hls_msg.Pull_tag(map[string]func(interface{}) bool{
- `header`: func(d interface{}) bool {
- if b, ok := d.([]byte); ok {
- hls_gen.hls_file_header = b
- }
- return false
- },
- `body`: func(d interface{}) bool {
- links, ok := d.([]*m4s_link_item)
- if !ok {
- return false
- }
- //remove hls first m4s
- if len(links) > 0 &&
- len((*links[0]).Base) > 0 &&
- (*links[0]).Base[0] == 104 {
- links = links[1:]
- }
-
- hls_gen.m4s_list = append(hls_gen.m4s_list, links...)
-
- return false
- },
- `clock`: func(now interface{}) bool {
- //buffer
- if len(hls_gen.m4s_list)-savestream.hlsbuffersize < 0 {
- return false
- }
-
- //add block
- var (
- m4s_num int
- has_DICONTINUITY bool
- res []byte
- threshold = savestream.m4s_hls
- )
- if threshold < 3 {
- threshold = 3
- }
- {
- //m4s list
- m4s_list_b := []byte{}
- for k, v := range hls_gen.m4s_list {
- if v.status != s_fin {
- //#EXT-X-DISCONTINUITY-SEQUENCE
- //reset hls lists
- if !has_DICONTINUITY && m4s_num < threshold {
- m4s_list := append(util.SliceCopy(hls_gen.m4s_list[:k]).([]*m4s_link_item), &m4s_link_item{
- Base: "DICONTINUITY",
- status: s_fin,
- isshow: true,
- })
- hls_gen.m4s_list = append(m4s_list, hls_gen.m4s_list[k:]...)
- m4s_list_b = append(m4s_list_b, []byte("#EXT-X-DICONTINUITY\n")...)
- }
- break
- }
-
- v.isshow = true
-
- if v.Base == "DICONTINUITY" {
- has_DICONTINUITY = true
- m4s_list_b = append(m4s_list_b, []byte("#EXT-X-DICONTINUITY\n")...)
- continue
- }
-
- if m4s_num >= savestream.m4s_hls {
- break
- }
-
- m4s_num += 1
- // if m4s_num == 1 {SEQUENCE = strings.ReplaceAll(v.Base, ".m4s", "")}
- m4s_list_b = append(m4s_list_b, []byte("#EXTINF:1,"+v.Base+"\n")...)
- m4s_list_b = append(m4s_list_b, v.Base...)
- m4s_list_b = append(m4s_list_b, []byte("\n")...)
- }
-
- //have useable m4s
- if m4s_num != 0 {
- //add header
- res = hls_gen.hls_file_header
- //add #EXT-X-DISCONTINUITY-SEQUENCE
- res = append(res, []byte("#EXT-X-DISCONTINUITY-SEQUENCE:"+strconv.Itoa(DISCONTINUITY)+"\n")...)
- //add #EXT-X-MEDIA-SEQUENCE
- res = append(res, []byte("#EXT-X-MEDIA-SEQUENCE:"+strconv.Itoa(SEQUENCE)+"\n")...)
- //add #INFO
- res = append(res, []byte(fmt.Sprintf("#INFO-BUFFER:%d/%d\n", m4s_num, len(hls_gen.m4s_list)))...)
- //add m4s
- res = append(res, m4s_list_b...)
- //去除最后一个换行
- res = res[:len(res)-1]
- }
- }
-
- //try to jump the hole
- var skip_del bool
- if m4s_num < threshold {
- var (
- index int //the first useable fmp4 index of section
- DICONTINUITY_num int
- catch bool //catch useable fmp4?
- )
- for i := 0; i < len(hls_gen.m4s_list); i += 1 {
- if hls_gen.m4s_list[i].status != s_fin {
- catch = false
- continue
- }
- if !catch {
- index = i
- } else {
- if hls_gen.m4s_list[i].Base == "DICONTINUITY" {
- DICONTINUITY_num += 1
- } else if i-index-DICONTINUITY_num > threshold {
- //find a nice index, remove all bad fmp4s
- skip := 0
- skip_del = true
- for ; index >= 0; index -= 1 {
- if hls_gen.m4s_list[index].status == s_fin {
- continue
- }
- skip += 1
- hls_gen.m4s_list = append(hls_gen.m4s_list[:index], hls_gen.m4s_list[index+1:]...)
- }
- l.L(`I: `, "卡顿,跳过", skip, "个tag")
- break
- }
- }
- catch = true
- }
- }
-
- //设置到全局变量,方便流服务器获取
- if len(res) != 0 {
- savestream.hls_stream.b = res
- }
- savestream.hls_stream.t, _ = now.(time.Time)
-
- //del
- for del_num := 1; !skip_del && del_num > 0; hls_gen.m4s_list = hls_gen.m4s_list[1:] {
- del_num -= 1
- if !hls_gen.m4s_list[0].isshow {
- continue
- }
- //#EXT-X-DICONTINUITY
- if hls_gen.m4s_list[0].Base == "DICONTINUITY" {
- DISCONTINUITY += 1
- continue
- }
- //#EXTINF
- if hls_gen.m4s_list[0].isshow {
- SEQUENCE += 1
-
- { //stream hls2mp4
- var buf []byte
- //stream front
- if len(savestream.front) == 0 {
- buf, _, e := get_m4s_cache(savestream.path + hls_gen.hls_first_fmp4_name)
- if e == nil {
- savestream.front = buf
- } else {
- l.L(`W: `, `推送mp4流错误,无法读取文件`, e)
- }
- }
- //stream body
- buf, _, e := get_m4s_cache(savestream.path + hls_gen.m4s_list[0].Base)
- if e == nil {
- savestream.stream.Push_tag(`stream`, buf)
- } else {
- l.L(`W: `, `推送mp4流错误,无法读取文件`, e)
- }
- }
- }
- }
-
- return false
- },
- `close`: func(d interface{}) bool {
- savestream.hls_stream.b = []byte{} //退出置空
- savestream.hls_stream.t = time.Now()
- return true
- },
- })
-
- var (
- last_download *m4s_link_item
- miss_download = make(chan *m4s_link_item, 100)
- download_limit = funcCtrl.BlockFuncN{
- Max: 2,
- } //limit
- )
- expires := time.Now().Add(time.Minute * 2).Unix()
-
- var (
- path_front string
- path_behind string
- )
-
- for {
- //退出,等待下载完成
- if !savestream.cancel.Islive() {
- l.L(`I: `, "退出,等待片段下载")
- download_limit.None()
- download_limit.UnNone()
-
- links := []*m4s_link_item{}
- //下载出错的
- for len(miss_download) != 0 {
- links = append(links, <-miss_download)
- }
-
- for k, v := range links {
- l.L(`I: `, "正在下载最后片段:", k+1, "/", len(links))
- v.status = s_loading
- r := reqf.New()
- if e := r.Reqf(reqf.Rval{
- Url: v.Url,
- SaveToPath: savestream.path + v.Base,
- ConnectTimeout: 5000,
- ReadTimeout: 1000,
- Retry: 1,
- Proxy: c.C.Proxy,
- }); e != nil && !errors.Is(e, io.EOF) {
- l.L(`I: `, e)
- v.status = s_fail
- } else {
- if usedt := r.UsedTime.Seconds(); usedt > 700 {
- l.L(`I: `, `hls切片下载慢`, usedt, `ms`)
- }
- v.status = s_fin
- }
- }
- //退出
- break
- }
-
- links, file_add, exp, e := hls_get_link(c.C.Live, last_download)
- if e != nil {
- if e == no_Modified {
- time.Sleep(time.Duration(2) * time.Second)
- continue
- } else if reqf.IsTimeout(e) || strings.Contains(e.Error(), "x509") {
- l.L(`I: `, e)
- continue
- } else {
- l.L(`W: `, e)
- break
- }
- }
-
- //first block 获取并设置不变头
- if last_download == nil {
- var res []byte
- {
- for row, i := bytes.SplitAfter(file_add, []byte("\n")), 0; i < len(row); i += 1 {
- if bytes.Contains(row[i], []byte("#EXT")) {
- //set stream front
- if op := bytes.Index(row[i], []byte(`#EXT-X-MAP:URI="`)); op != -1 {
- hls_gen.hls_first_fmp4_name = string(row[i][op+16 : len(row[i])-2])
- }
- if bytes.Contains(row[i], []byte("#EXT-X-MEDIA-SEQUENCE")) || bytes.Contains(row[i], []byte("#EXTINF")) {
- continue
- }
- res = append(res, row[i]...)
- }
- }
- }
- hls_msg.Push_tag(`header`, res)
- }
-
- if len(links) == 0 {
- time.Sleep(time.Duration(2) * time.Second)
- continue
- }
-
- //qn in expect , set expires
- if c.C.Live_want_qn >= c.C.Live_qn {
- expires = int64(exp)
- }
-
- //use guess
- if last_download != nil {
- previou, _ := strconv.Atoi((*last_download).Base[:len((*last_download).Base)-4])
- now, _ := strconv.Atoi(links[0].Base[:len(links[0].Base)-4])
- if previou < now-1 {
- if diff := now - previou; diff > 100 {
- l.L(`W: `, `diff too large `, diff)
- break
- } else {
- l.L(`I: `, `猜测hls`, previou, `-`, now, `(`, diff, `)`)
- }
-
- { //file_add
- for i := now - 1; i > previou; i -= 1 {
- file_add = append([]byte(strconv.Itoa(i)+".m4s"), file_add...)
- }
- }
- { //links
- if path_front == "" || path_behind == "" {
- u, e := url.Parse(links[0].Url)
- if e != nil {
- l.L(`E: `, `fault to enable guess`, e)
- return
- }
- path_front = u.Scheme + "://" + path.Dir(u.Host+u.Path) + "/"
- path_behind = "?" + u.RawQuery
- }
-
- //出错期间没能获取到的
- for i := now - 1; i > previou; i -= 1 {
- base := strconv.Itoa(i) + ".m4s"
- links = append([]*m4s_link_item{
- {
- Url: path_front + base + path_behind,
- Base: base,
- },
- }, links...)
- }
- }
- }
- }
-
- if len(links) > 10 {
- l.L(`T: `, `等待下载切片:`, len(links))
- } else if len(links) > 100 {
- l.L(`W: `, `重试,等待下载切片:`, len(links))
-
- if F.Get(&c.C).Get(`Liveing`); !c.C.Liveing {
- break
- }
- if F.Get(&c.C).Get(`Live`); len(c.C.Live) == 0 {
- break
- }
-
- // set expect
- expires = time.Now().Add(time.Minute * 2).Unix()
- continue
- }
-
- //将links传送给hls生成器
- hls_msg.Push_tag(`body`, links)
-
- f := p.File()
- f.FileWR(p.Filel{
- File: savestream.path + "0.m3u8.dtmp",
- Loc: -1,
- Context: []interface{}{file_add},
- })
-
- for i := 0; i < len(links); i += 1 {
- //fmp4切片下载
- go func(link *m4s_link_item, path string) {
- //use url struct
- var link_url *url.URL
- {
- if tmp, e := url.Parse(link.Url); e != nil {
- l.L(`E: `, e)
- return
- } else {
- link_url = tmp
- }
- }
-
- download_limit.Block()
- defer download_limit.UnBlock()
-
- link.status = s_loading
- for index := 0; index < len(Host_list); index += 1 {
- link_url.Host = Host_list[index]
-
- r := reqf.New()
- if e := r.Reqf(reqf.Rval{
- Url: link_url.String(),
- SaveToPath: path + link.Base,
- ConnectTimeout: 2000,
- ReadTimeout: 1000,
- Timeout: 2000,
- Proxy: c.C.Proxy,
- }); e != nil {
- //try other host
- if index+1 < len(Host_list) {
- continue
- }
-
- if reqf.IsTimeout(e) || strings.Contains(e.Error(), "x509") {
- l.L(`T: `, link.Base, `将重试!`)
- //避免影响后续猜测
- link.Offset_line = 0
- go func(link *m4s_link_item) { miss_download <- link }(link)
- } else {
- l.L(`W: `, e)
- link.status = s_fail
- }
- } else {
- if usedt := r.UsedTime.Seconds(); usedt > 700 {
- l.L(`I: `, `hls切片下载慢`, usedt, `ms`)
- }
- link.status = s_fin
- //存入cache
- if _, ok := m4s_cache.Load(path + link.Base); !ok {
- m4s_cache.Store(path+link.Base, r.Respon)
- go func() { //移除
- time.Sleep(time.Second * time.Duration(savestream.hlsbuffersize+savestream.m4s_hls+2))
- m4s_cache.Delete(path + link.Base)
- }()
- }
- break
- }
- }
- }(links[i], savestream.path)
-
- //只记录最新
- if links[i].Offset_line > 0 {
- last_download = links[i]
- }
- }
-
- //m3u8_url 将过期
- if p.Sys().GetSTime()+60 > expires {
- if F.Get(&c.C).Get(`Liveing`); !c.C.Liveing {
- break
- }
- if F.Get(&c.C).Get(`Live`); len(c.C.Live) == 0 {
- break
- }
- // set expect
- expires = time.Now().Add(time.Minute * 2).Unix()
- } else {
- time.Sleep(time.Second)
- }
- }
-
- if p.Checkfile().IsExist(savestream.path + "0.m3u8.dtmp") {
- f := p.File()
- f.FileWR(p.Filel{
- File: savestream.path + "0.m3u8.dtmp",
- Loc: -1,
- Context: []interface{}{"#EXT-X-ENDLIST"},
- })
- p.FileMove(savestream.path+"0.m3u8.dtmp", savestream.path+"0.m3u8")
- }
-
- hls_msg.Push_tag(`close`, nil)
- }
- //set ro ``
- savestream.path = ``
- savestream.front = []byte{} //flv头及首tag置空
- savestream.stream.Push_tag("close", nil)
- Ass_f("", time.Now()) //ass
- l.L(`I: `, "结束")
-
- if !savestream.cancel.Islive() {
- // l.L(`I: `,"退出")
- break
- } //cancel
- /*
- Savestream需要外部组件
- ffmpeg http://ffmpeg.org/download.html
- */
- // if p.Checkfile().IsExist(savestream.path+".flv"){
- // l.L(`I: `,"转码中")
- // p.Exec().Run(false, "ffmpeg", "-i", savestream.path+".flv", "-c", "copy", savestream.path+".mkv")
- // if p.Checkfile().IsExist(savestream.path+".mkv"){os.Remove(savestream.path+".flv")}
- // }
-
- // l.L(`I: `,"转码结束")
- savestream.wait.Done()
- savestream.cancel.Done()
- }
- savestream.wait.Done()
- savestream.cancel.Done()
}
-//已func形式调用,将会停止保存直播流
-func Savestream_wait() {
- if !savestream.cancel.Islive() {
- return
+func StreamOStop() {
+ if streamO.Status.Islive() {
+ streamO.Stop()
}
-
- savestream.cancel.Done()
- c.C.Log.Base(`savestream`).L(`I: `, "等待停止")
- savestream.wait.Wait()
}
type Obs struct {
}
}
-var m4s_cache sync.Map //使用内存cache避免频繁io
-
-func get_m4s_cache(path string) (buf []byte, cached bool, err error) {
- if b, ok := m4s_cache.Load(path); !ok {
- f, e := os.OpenFile(path, os.O_RDONLY, 0644)
- if e != nil {
- err = e
- return
- }
- defer f.Close()
-
- if b, e := io.ReadAll(f); e != nil {
- err = e
- return
- } else {
- buf = b
- m4s_cache.Store(path, buf)
- go func() { //移除
- time.Sleep(time.Second * time.Duration(savestream.m4s_hls+2))
- m4s_cache.Delete(path)
- }()
- }
- } else {
- cached = true
- buf, _ = b.([]byte)
- }
- return
-}
-
//直播Web服务口
func init() {
flog := flog.Base_add(`直播Web服务`)
if port_f, ok := c.C.K_v.LoadV(`直播Web服务口`).(float64); ok && port_f >= 0 {
port := int(port_f)
- base_dir := savestream.base_path
-
addr := "0.0.0.0:"
if port == 0 {
addr += strconv.Itoa(p.Sys().GetFreePort())
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Content-Transfer-Encoding", "binary")
- start := time.Now()
-
- var path string = r.URL.Path[1:]
- if !p.Checkfile().IsExist(base_dir + path) {
- w.WriteHeader(http.StatusNotFound)
+ if len(streamO.getFirstM4S()) == 0 {
+ w.Header().Set("Retry-After", "1")
+ w.WriteHeader(http.StatusServiceUnavailable)
return
}
- if savestream.path != "" && strings.Contains(path, filepath.Base(savestream.path)) {
- w.Header().Set("Server", "live")
- if filepath.Ext(path) == `.dtmp` {
- if strings.Contains(path, ".flv") {
- if len(savestream.front) == 0 {
- w.Header().Set("Retry-After", "1")
- w.WriteHeader(http.StatusServiceUnavailable)
- return
- }
+ // path = base_dir+path
+ w.Header().Set("Content-Type", "video/mp4")
+ w.WriteHeader(http.StatusOK)
- // path = base_dir+path
- w.Header().Set("Content-Type", "video/x-flv")
- w.WriteHeader(http.StatusOK)
+ flusher, flushSupport := w.(http.Flusher)
+ if flushSupport {
+ flusher.Flush()
+ }
- flusher, flushSupport := w.(http.Flusher)
- if flushSupport {
- flusher.Flush()
- }
+ //写入hls头
+ if _, err := w.Write(streamO.getFirstM4S()); err != nil {
+ return
+ } else if flushSupport {
+ flusher.Flush()
+ }
+
+ cancel := make(chan struct{})
- //写入flv头,首tag
- if _, err := w.Write(savestream.front); err != nil {
- return
+ //hls切片
+ streamO.Newst_m4s.Pull_tag(map[string]func(interface{}) bool{
+ `m4s`: func(data interface{}) bool {
+ if b, ok := data.([]byte); ok {
+ if _, err := w.Write(b); err != nil {
+ close(cancel)
+ return true
} else if flushSupport {
flusher.Flush()
}
-
- cancel := make(chan struct{})
-
- //flv流关键帧间隔切片
- savestream.stream.Pull_tag(map[string]func(interface{}) bool{
- `stream`: func(data interface{}) bool {
- if b, ok := data.([]byte); ok {
- if _, err := w.Write(b); err != nil {
- close(cancel)
- return true
- } else if flushSupport {
- flusher.Flush()
- }
- }
- return false
- },
- `close`: func(data interface{}) bool {
- close(cancel)
- return true
- },
- })
-
- <-cancel
- } else if strings.Contains(path, ".m3u8") {
- if r.URL.Query().Get("type") == "mp4" {
- if len(savestream.front) == 0 {
- w.Header().Set("Retry-After", "1")
- w.WriteHeader(http.StatusServiceUnavailable)
- return
- }
-
- // path = base_dir+path
- w.Header().Set("Content-Type", "video/mp4")
- w.WriteHeader(http.StatusOK)
-
- flusher, flushSupport := w.(http.Flusher)
- if flushSupport {
- flusher.Flush()
- }
-
- //写入hls头
- if _, err := w.Write(savestream.front); err != nil {
- return
- } else if flushSupport {
- flusher.Flush()
- }
-
- cancel := make(chan struct{})
-
- //hls切片
- savestream.stream.Pull_tag(map[string]func(interface{}) bool{
- `stream`: func(data interface{}) bool {
- if b, ok := data.([]byte); ok {
- if _, err := w.Write(b); err != nil {
- close(cancel)
- return true
- } else if flushSupport {
- flusher.Flush()
- }
- }
- return false
- },
- `close`: func(data interface{}) bool {
- close(cancel)
- return true
- },
- })
-
- <-cancel
- } else {
- w.Header().Set("Cache-Control", "max-age=1")
- w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
- w.Header().Set("Last-Modified", savestream.hls_stream.t.Format(http.TimeFormat))
-
- // //经常m4s下载速度赶不上,使用阻塞避免频繁获取列表带来的卡顿
- // if time.Now().Sub(savestream.hls_stream.t).Seconds() > 1 {
- // time.Sleep(time.Duration(3)*time.Second)
- // }
-
- res := savestream.hls_stream.b
-
- if len(res) == 0 {
- w.Header().Set("Retry-After", "1")
- w.WriteHeader(http.StatusServiceUnavailable)
- return
- }
-
- //Server-Timing
- w.Header().Set("Server-Timing", fmt.Sprintf("dur=%d", time.Since(start).Microseconds()))
-
- if _, err := w.Write(res); err != nil {
- flog.L(`E: `, err)
- return
- }
- }
- }
- } else if filepath.Ext(path) == `.m4s` {
- w.Header().Set("Server", "live")
- w.Header().Set("Cache-Control", "Cache-Control:public, max-age=3600")
-
- path = base_dir + path
-
- buf, cached, e := get_m4s_cache(path)
-
- if e != nil {
- w.Header().Set("Retry-After", "1")
- w.WriteHeader(http.StatusServiceUnavailable)
- return
- }
-
- if len(buf) == 0 {
- flog.L(`W: `, `buf size 0`)
- w.Header().Set("Retry-After", "1")
- w.WriteHeader(http.StatusServiceUnavailable)
- return
- }
-
- //Server-Timing
- w.Header().Add("Server-Timing", fmt.Sprintf("cache=%v;dur=%d", cached, time.Since(start).Microseconds()))
- w.WriteHeader(http.StatusOK)
- if _, err := w.Write(buf); err != nil {
- flog.L(`E: `, err)
- return
- }
- }
- } else {
- w.Header().Set("Server", "file")
- if r.URL.Query().Get("type") == "mp4" { //hls拼合
- dir := base_dir + filepath.Dir(path) + "/"
- if !p.Checkfile().IsExist(dir + `0.m3u8`) {
- w.WriteHeader(http.StatusNotFound)
- return
- }
- var m4slist []string
- if e := filepath.WalkDir(dir, func(path string, info fs.DirEntry, err error) error {
- if err != nil {
- return err
- }
- if filepath.Ext(info.Name()) == ".m4s" {
- m4slist = append(m4slist, path)
- }
- return nil
- }); e != nil {
- flog.L(`E: `, e)
- w.WriteHeader(http.StatusServiceUnavailable)
- return
- }
- m4slist = append(m4slist[len(m4slist)-1:], m4slist[:len(m4slist)-1]...)
- for _, v := range m4slist {
- f, err := os.OpenFile(v, os.O_RDONLY, 0644)
- if err != nil {
- w.WriteHeader(http.StatusServiceUnavailable)
- flog.L(`E: `, err)
- return
- }
- io.Copy(w, f)
- f.Close()
}
- return
- }
- http.FileServer(http.Dir(base_dir)).ServeHTTP(w, r)
- }
- }
- now = func(w http.ResponseWriter, r *http.Request) {
- //header
- w.Header().Set("Access-Control-Allow-Credentials", "true")
- w.Header().Set("Access-Control-Allow-Headers", "*")
- w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- w.Header().Set("Cache-Control", "max-age=3")
-
- //最新直播流
- if savestream.path == `` {
- flog.L(`T: `, `还没有下载直播流-直播流为空`)
- w.WriteHeader(http.StatusNotFound)
- return
- }
-
- path := filepath.Base(savestream.path)
- if strings.Contains(c.C.Live[0], "flv") {
- path += ".flv.dtmp"
- } else {
- path += "/0.m3u8.dtmp"
- }
+ return false
+ },
+ `close`: func(data interface{}) bool {
+ close(cancel)
+ return true
+ },
+ })
- if !p.Checkfile().IsExist(base_dir + path) {
- flog.L(`T: `, `还没有下载直播流-文件未能找到`)
- w.WriteHeader(http.StatusNotFound)
- } else {
- u, e := url.Parse("../" + path)
- if e != nil {
- flog.L(`E: `, e)
- w.Header().Set("Retry-After", "1")
- w.WriteHeader(http.StatusServiceUnavailable)
- return
- }
- if r.URL.RawQuery != "" {
- u.RawQuery = r.URL.RawQuery
- }
- // r.URL =
- // root(w, r)
- w.Header().Set("Location", r.URL.ResolveReference(u).String())
- w.WriteHeader(http.StatusTemporaryRedirect)
- }
- return
+ <-cancel
}
)
s.Handle(map[string]func(http.ResponseWriter, *http.Request){
- `/`: root,
- `/now`: now,
+ `/`: root,
`/exit`: func(w http.ResponseWriter, r *http.Request) {
s.Server.Shutdown(context.Background())
},
--- /dev/null
+package reply
+
+import (
+ "bytes"
+ "encoding/base64"
+ "errors"
+ "io"
+ "net/http"
+ "net/url"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "time"
+
+ c "github.com/qydysky/bili_danmu/CV"
+ F "github.com/qydysky/bili_danmu/F"
+
+ p "github.com/qydysky/part"
+ funcCtrl "github.com/qydysky/part/funcCtrl"
+ log "github.com/qydysky/part/log"
+ msgq "github.com/qydysky/part/msgq"
+ reqf "github.com/qydysky/part/reqf"
+ signal "github.com/qydysky/part/signal"
+ sync "github.com/qydysky/part/sync"
+)
+
+type M4SStream struct {
+ Status *signal.Signal //IsLive()是否运行中
+ exitSign *signal.Signal //IsLive()是否等待退出中
+ log *log.Log_interface
+ config M4SStream_Config //配置
+ stream_last_modified time.Time //流地址更新时间
+ stream_expires int64 //流到期时间
+ last_m4s *m4s_link_item //最后一个切片
+ stream_hosts sync.Map //使用的流服务器
+ Newst_m4s *msgq.Msgq //m4s消息 tag:m4s
+ first_m4s []byte //m4s起始块
+}
+
+type M4SStream_Config struct {
+ save_path string //直播流保存目录
+ want_qn int //直播流清晰度
+ want_type string //直播流类型
+ bufsize int //直播hls流缓冲
+ banlance_host bool //直播hls流均衡
+}
+
+type m4s_link_item struct {
+ Url string // m4s链接
+ Base string // m4s文件名
+ status int // 下载状态 0:未下载 1:正在下载 2:下载完成 3:下载失败
+ data []byte // 下载的数据
+}
+
+func (t *m4s_link_item) isInit() bool {
+ return strings.Contains(t.Base, "h")
+}
+
+func (t *m4s_link_item) getNo() (int, error) {
+ var base = t.Base
+ if t.isInit() {
+ base = base[1:]
+ }
+ return strconv.Atoi(base[:len(base)-4])
+}
+
+func (t *M4SStream) LoadConfig(kv *sync.Map, l *log.Log_interface) {
+ //读取配置
+ if path, ok := kv.LoadV("直播流保存位置").(string); ok {
+ if path, err := filepath.Abs(path); err == nil {
+ t.config.save_path = path + "/"
+ }
+ }
+ if v, ok := kv.LoadV(`直播hls流缓冲`).(float64); ok && v > 0 {
+ t.config.bufsize = int(v)
+ }
+ if v, ok := kv.LoadV(`直播hls流均衡`).(bool); ok {
+ t.config.banlance_host = v
+ }
+ if v, ok := kv.LoadV(`直播流清晰度`).(float64); ok {
+ t.config.want_qn = int(v)
+ }
+ if v, ok := kv.LoadV(`直播流类型`).(string); ok {
+ t.config.want_type = v
+ }
+ t.log = l.Base(`直播流保存`)
+}
+
+func (t *M4SStream) getFirstM4S() []byte {
+ return t.first_m4s
+}
+
+func (t *M4SStream) fetchCheckStream(tmpc *c.Common) bool {
+ // 获取流地址
+ tmpc.Live_want_qn = t.config.want_qn
+ if F.Get(tmpc).Get(`Live`); len(tmpc.Live) == 0 {
+ return false
+ }
+
+ // 保存流地址过期时间
+ if m3u8_url, err := url.Parse(tmpc.Live[0]); err != nil {
+ t.log.L(`E: `, err.Error())
+ return false
+ } else {
+ expires, _ := strconv.Atoi(m3u8_url.Query().Get("expires"))
+ t.stream_expires = int64(expires)
+ }
+
+ // 检查是否可以获取
+ CookieM := make(map[string]string)
+ tmpc.Cookie.Range(func(k, v interface{}) bool {
+ CookieM[k.(string)] = v.(string)
+ return true
+ })
+
+ var req = reqf.New()
+ if e := req.Reqf(reqf.Rval{
+ Url: tmpc.Live[0],
+ Retry: 10,
+ SleepTime: 1000,
+ Proxy: tmpc.Proxy,
+ Header: map[string]string{
+ `Cookie`: reqf.Map_2_Cookies_String(CookieM),
+ },
+ Timeout: 5 * 1000,
+ JustResponseCode: true,
+ }); e != nil {
+ t.log.L(`W: `, e)
+ }
+
+ if req.Response == nil {
+ t.log.L(`W: `, `live响应错误`)
+ return false
+ } else if req.Response.StatusCode != 200 {
+ t.log.L(`W: `, `live响应错误`, req.Response.Status, string(req.Respon))
+ return false
+ }
+ return true
+}
+
+func (t *M4SStream) fetchParseM3U8(tmpc *c.Common) (m4s_links []*m4s_link_item, m3u8_addon []byte) {
+ // 请求解析m3u8内容
+ for _, v := range tmpc.Live {
+ m3u8_url, err := url.Parse(v)
+ if err != nil {
+ t.log.L(`E: `, err.Error())
+ return
+ }
+
+ // 设置请求参数
+ rval := reqf.Rval{
+ Url: m3u8_url.String(),
+ ConnectTimeout: 2000,
+ ReadTimeout: 1000,
+ Timeout: 2000,
+ Proxy: c.C.Proxy,
+ Header: map[string]string{
+ `Host`: m3u8_url.Host,
+ `User-Agent`: `Mozilla/5.0 (X11; Linux x86_64; rv:83.0) Gecko/20100101 Firefox/83.0`,
+ `Accept`: `*/*`,
+ `Accept-Language`: `zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2`,
+ `Accept-Encoding`: `gzip, deflate, br`,
+ `Origin`: `https://live.bilibili.com`,
+ `Connection`: `keep-alive`,
+ `Pragma`: `no-cache`,
+ `Cache-Control`: `no-cache`,
+ `Referer`: "https://live.bilibili.com/",
+ },
+ }
+ if !t.stream_last_modified.IsZero() {
+ rval.Header[`If-Modified-Since`] = t.stream_last_modified.Add(time.Second).Format("Mon, 02 Jan 2006 15:04:05 CST")
+ }
+
+ // 开始请求
+ var r = reqf.New()
+ if e := r.Reqf(rval); e != nil {
+ continue
+ }
+
+ if r.Response.StatusCode == http.StatusNotModified {
+ t.log.L(`T: `, `hls未更改`)
+ return
+ }
+
+ // 保存最后m3u8修改时间
+ if last_mod, ok := r.Response.Header[`Last-Modified`]; ok && len(last_mod) > 0 {
+ if lm, e := time.Parse("Mon, 02 Jan 2006 15:04:05 CST", last_mod[0]); e == nil {
+ t.stream_last_modified = lm
+ }
+ }
+
+ // m3u8字节流
+ var m3u8_respon = r.Respon
+
+ // base64解码
+ if len(m3u8_respon) != 0 && !bytes.Contains(m3u8_respon, []byte("#")) {
+ m3u8_respon, err = base64.StdEncoding.DecodeString(string(m3u8_respon))
+ if err != nil {
+ t.log.L(`W: `, err, string(m3u8_respon))
+ return
+ }
+ }
+
+ // 解析m3u8
+ for _, line := range bytes.Split(m3u8_respon, []byte("\n")) {
+ if len(line) == 0 {
+ continue
+ }
+
+ var m4s_link = "" //切片文件名
+
+ //获取附加的m3u8字节 忽略bili定制拓展
+ if !bytes.Contains(line, []byte(`#EXT-X-BILI`)) {
+ if t.last_m4s == nil {
+ m3u8_addon = append(m3u8_addon, line...)
+ m3u8_addon = append(m3u8_addon, []byte("\n")...)
+ } else {
+ if bytes.Contains(line, []byte(`#EXTINF`)) ||
+ !bytes.Contains(line, []byte(`#`)) {
+ m3u8_addon = append(m3u8_addon, line...)
+ m3u8_addon = append(m3u8_addon, []byte("\n")...)
+ }
+ }
+ }
+
+ //获取切片文件名
+ if bytes.Contains(line, []byte("EXT-X-MAP")) {
+ o := bytes.Index(line, []byte(`EXT-X-MAP:URI="`)) + 15
+ e := bytes.Index(line[o:], []byte(`"`)) + o
+ m4s_link = string(line[o:e])
+ } else if bytes.Contains(line, []byte("#EXT-X")) { //忽略扩展标签
+ continue
+ } else if bytes.Contains(line, []byte(".m4s")) {
+ m4s_link = string(line)
+ } else {
+ continue
+ }
+
+ //获取切片地址
+ u, e := url.Parse("./" + m4s_link + "?trid=" + m3u8_url.Query().Get("trid"))
+ if e != nil {
+ t.log.L(`E: `, e)
+ return
+ }
+
+ //将切片添加到返回切片数组
+ m4s_links = append(m4s_links, &m4s_link_item{
+ Url: m3u8_url.ResolveReference(u).String(),
+ Base: m4s_link,
+ })
+ }
+
+ // 设置最后的切片
+ defer func(last_m4s *m4s_link_item) {
+ t.last_m4s = last_m4s
+ }(m4s_links[len(m4s_links)-1])
+
+ if t.last_m4s == nil {
+ return
+ }
+
+ // 只返回新增加的
+ for k, m4s_link := range m4s_links {
+ if m4s_link.Base == t.last_m4s.Base {
+ // 只返回新增加的切片
+ m4s_links = m4s_links[k+1:]
+
+ // 只返回新增加的m3u8字节
+ if index := bytes.Index(m3u8_addon, []byte(m4s_link.Base)); index != -1 {
+ index += len([]byte(m4s_link.Base))
+ if index == len(m3u8_addon) {
+ m3u8_addon = []byte{}
+ } else {
+ m3u8_addon = m3u8_addon[index+1:]
+ }
+ }
+ return
+ }
+ }
+
+ // 来到此处说明出现了丢失 尝试补充
+ var guess_end_no, _ = m4s_links[0].getNo()
+ for no, _ := t.last_m4s.getNo(); no < guess_end_no; no += 1 {
+ // 补充m3u8
+ m3u8_addon = append([]byte(`#EXTINF:1.00\n`+strconv.Itoa(no)+`.m4s\n`), m3u8_addon...)
+
+ //获取切片地址
+ u, e := url.Parse("./" + strconv.Itoa(no) + `.m4s`)
+ if e != nil {
+ t.log.L(`E: `, e)
+ return
+ }
+
+ //将切片添加到返回切片数组前
+ m4s_links = append([]*m4s_link_item{
+ {
+ Url: m3u8_url.ResolveReference(u).String(),
+ Base: strconv.Itoa(no) + `.m4s`,
+ },
+ }, m4s_links...)
+ }
+
+ // 请求解析成功,退出获取循环
+ break
+ }
+
+ return
+}
+
+func (t *M4SStream) saveStream(tmpc *c.Common) {
+ // 设置保存路径
+ var save_path = t.config.save_path + strconv.Itoa(tmpc.Roomid) + "_" + time.Now().Format("2006_01_02_15-04-05-000") + `/`
+
+ // 显示保存位置
+ if rel, err := filepath.Rel(t.config.save_path, save_path); err == nil {
+ t.log.L(`I: `, "保存到", rel+`/0.m3u8`)
+ } else {
+ t.log.L(`W: `, err)
+ }
+
+ // 获取流
+ if strings.Contains(tmpc.Live[0], `m3u8`) {
+ t.stream_expires = time.Now().Add(time.Minute * 2).Unix() // 流链接过期时间
+
+ // 同时下载数限制
+ var download_limit = funcCtrl.BlockFuncN{
+ Max: 2,
+ }
+
+ // 下载循环
+ for download_seq := []*m4s_link_item{}; ; {
+ // 下载切片
+ for _, v := range download_seq {
+ v.status = 1 // 设置切片状态为正在下载
+
+ // 均衡负载
+ if link_url, e := url.Parse(v.Url); e == nil {
+ if t.stream_hosts.Len() != 1 {
+ t.stream_hosts.Range(func(key, value interface{}) bool {
+ // 故障转移
+ if v.status == 3 && link_url.Host == key.(string) {
+ return true
+ }
+ // 随机
+ link_url.Host = key.(string)
+ return false
+ })
+ }
+ v.Url = link_url.String()
+ }
+
+ download_limit.Block()
+ go func(link *m4s_link_item, path string) {
+ defer download_limit.UnBlock()
+
+ r := reqf.New()
+ if e := r.Reqf(reqf.Rval{
+ Url: link.Url,
+ SaveToPath: path + link.Base,
+ ConnectTimeout: 2000,
+ ReadTimeout: 1000,
+ Timeout: 2000,
+ Proxy: tmpc.Proxy,
+ }); e != nil && !errors.Is(e, io.EOF) {
+ link.status = 3 // 设置切片状态为下载失败
+ } else {
+ if usedt := r.UsedTime.Seconds(); usedt > 700 {
+ t.log.L(`I: `, `hls切片下载慢`, usedt, `ms`)
+ }
+ link.data = r.Respon
+ link.status = 2 // 设置切片状态为下载完成
+ }
+ }(v, save_path)
+ }
+
+ // 等待队列下载完成
+ download_limit.None()
+ download_limit.UnNone()
+
+ //添加失败切片 传递切片
+ {
+ var tmp_seq []*m4s_link_item
+ for _, v := range download_seq {
+ if strings.Contains(v.Base, `h`) {
+ t.first_m4s = v.data
+ }
+
+ if v.status == 3 {
+ tmp_seq = append(tmp_seq, v)
+ } else {
+ t.Newst_m4s.Push_tag(`m4s`, v.data)
+ }
+ }
+ download_seq = tmp_seq
+ }
+
+ // 停止录制
+ if !t.Status.Islive() {
+ if len(download_seq) != 0 {
+ t.log.L(`I: `, `下载最后切片:`, len(download_seq))
+ continue
+ }
+ break
+ }
+
+ // 刷新流地址
+ if time.Now().Unix()+60 > t.stream_expires {
+ t.fetchCheckStream(tmpc)
+ }
+
+ // 获取解析m3u8
+ var m4s_links, m3u8_addon = t.fetchParseM3U8(tmpc)
+ if len(m4s_links) == 0 {
+ time.Sleep(time.Second)
+ continue
+ }
+
+ // 添加新切片到下载队列
+ download_seq = append(download_seq, m4s_links...)
+
+ // 添加m3u8字节
+ p.File().FileWR(p.Filel{
+ File: save_path + "0.m3u8.dtmp",
+ Loc: -1,
+ Context: []interface{}{m3u8_addon},
+ })
+ }
+
+ // 结束
+ if p.Checkfile().IsExist(save_path + "0.m3u8.dtmp") {
+ f := p.File()
+ f.FileWR(p.Filel{
+ File: save_path + "0.m3u8.dtmp",
+ Loc: -1,
+ Context: []interface{}{"#EXT-X-ENDLIST"},
+ })
+ p.FileMove(save_path+"0.m3u8.dtmp", save_path+"0.m3u8")
+ }
+
+ }
+}
+
+func (t *M4SStream) Start() {
+ // 清晰度-1 不保存
+ if t.config.want_qn == -1 {
+ return
+ }
+
+ // 状态检测与设置
+ if t.Status.Islive() {
+ t.log.L(`T: `, `已存在实例`)
+ return
+ }
+ t.Status = signal.Init()
+ defer t.Status.Done()
+
+ // 初始化切片消息
+ t.Newst_m4s = msgq.New(10)
+
+ var tmpc = c.C
+
+ // 主循环
+ for t.Status.Islive() {
+ // 是否在直播
+ F.Get(&tmpc).Get(`Liveing`)
+ if !tmpc.Liveing {
+ t.log.L(`T: `, `未直播`)
+ break
+ }
+
+ // 获取 and 检查流地址状态
+ if !t.fetchCheckStream(&tmpc) {
+ time.Sleep(time.Second * 5)
+ continue
+ }
+
+ // 设置均衡负载
+ for _, v := range tmpc.Live {
+ if url_struct, e := url.Parse(v); e == nil {
+ t.stream_hosts.Store(url_struct.Hostname(), nil)
+ }
+ if !t.config.banlance_host {
+ break
+ }
+ }
+
+ // 保存流
+ t.saveStream(&tmpc)
+ }
+
+ t.log.L(`T: `, `结束`)
+ t.exitSign.Done()
+}
+
+func (t *M4SStream) Stop() {
+ t.exitSign = signal.Init()
+ t.Status.Done()
+ t.exitSign.Wait()
+}