]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
refactor
authorqydysky <qydysky@foxmail.com>
Wed, 20 Apr 2022 05:42:15 +0000 (13:42 +0800)
committerqydysky <qydysky@foxmail.com>
Wed, 20 Apr 2022 05:42:15 +0000 (13:42 +0800)
F/cmd.go
Reply/F.go
Reply/Reply.go
Reply/steam/stream.go [deleted file]
Reply/stream.go [new file with mode: 0644]
bili_danmu.go

index 3bcf94d842f8fc0ee14a5ba79f5804c069205680..d5949f8f0fb1fb810c450c0e30b7f307e0643ae3 100644 (file)
--- a/F/cmd.go
+++ b/F/cmd.go
@@ -160,7 +160,7 @@ func Cmd() {
                                fmt.Println(`舰长数:`, c.C.GuardNum)
                                fmt.Println(`分区排行:`, c.C.Note, `人气:`, c.C.Renqi)
                                if c.C.Stream_url != "" {
-                                       fmt.Println(`直播Web服务:`, c.C.Stream_url+`/now`)
+                                       fmt.Println(`直播Web服务:`, c.C.Stream_url)
                                }
                                fmt.Print("\n")
 
index 5d98924e4eb13c6c052eccfb10ba4dde7848bb7b..77ef155a460bc55117e29e387d27a9c0a7bc4a21 100644 (file)
@@ -1,21 +1,14 @@
 package reply
 
 import (
-       "bytes"
        "context"
-       "encoding/base64"
        "encoding/json"
-       "errors"
        "fmt"
        "io"
-       "io/fs"
        "io/ioutil"
        "math"
        "net/http"
-       "net/url"
-       "os"
        "os/exec"
-       "path"
        "path/filepath"
        "strconv"
        "strings"
@@ -31,14 +24,9 @@ import (
        send "github.com/qydysky/bili_danmu/Send"
 
        p "github.com/qydysky/part"
-       funcCtrl "github.com/qydysky/part/funcCtrl"
-       idpool "github.com/qydysky/part/idpool"
        limit "github.com/qydysky/part/limit"
        msgq "github.com/qydysky/part/msgq"
-       reqf "github.com/qydysky/part/reqf"
-       s "github.com/qydysky/part/signal"
        psync "github.com/qydysky/part/sync"
-       util "github.com/qydysky/part/util"
        web "github.com/qydysky/part/web"
 
        obsws "github.com/christopher-dG/go-obs-websocket"
@@ -204,7 +192,7 @@ func Ass_f(file string, st time.Time) {
                return
        }
 
-       if rel, err := filepath.Rel(savestream.base_path, ass.file); err == nil {
+       if rel, err := filepath.Rel(streamO.config.save_path, ass.file); err == nil {
                c.C.Log.Base(`Ass`).L(`I: `, "保存到", rel+".ass")
        } else {
                c.C.Log.Base(`Ass`).L(`I: `, "保存到", ass.file+".ass")
@@ -259,1229 +247,27 @@ func dtos(t time.Duration) string {
 
 //hls
 //https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming
-
-//直播流保存
-type Savestream struct {
-       base_path  string
-       path       string
-       hls_stream struct {
-               b []byte //发送给客户的m3u8字节
-               t time.Time
-       }
-       front  []byte     //flv头及首tag or hls的初始化m4s
-       stream *msgq.Msgq //发送给客户的flv流关键帧间隔片 or hls的fmp4片
-
-       m4s_hls           int  //hls list 中的m4s数量
-       hlsbuffersize     int  //hls list缓冲m4s数量
-       hls_banlance_host bool //使用均衡hls服务器
-
-       wait     *s.Signal
-       cancel   *s.Signal
-       skipFunc funcCtrl.SkipFunc
-}
-
-type hls_generate struct {
-       hls_first_fmp4_name string
-       hls_file_header     []byte           //发送给客户的m3u8不变头
-       m4s_list            []*m4s_link_item //m4s列表 缓冲
-}
-
-type m4s_link_item struct { //使用指针以设置是否已下载
-       Url         string // m4s链接
-       Base        string //m4s文件名
-       Offset_line int    //m3u8中的行下标
-       status      int    //该m4s下载状态 s_noload:未下载 s_loading正在下载 s_fin下载完成 s_fail下载失败
-       isshow      bool
-}
-
-//m4s状态
-const (
-       s_noload = iota
-       s_loading
-       s_fin
-       s_fail
-)
-
-var savestream = Savestream{
-       stream:  msgq.New(10), //队列最多保留10个关键帧间隔片
-       m4s_hls: 8,
-}
+var streamO = new(M4SStream)
 
 func init() {
        //使用带tag的消息队列在功能间传递消息
        c.C.Danmu_Main_mq.Pull_tag(msgq.FuncMap{
                `savestream`: func(data interface{}) bool {
-                       if savestream.cancel.Islive() {
-                               Savestream_wait()
+                       if streamO.Status.Islive() {
+                               streamO.Stop()
                        } else {
-                               go Savestreamf()
+                               streamO.LoadConfig(&c.C.K_v, c.C.Log)
+                               go streamO.Start()
                        }
-
                        return false
                },
        })
-       //base_path
-       if path, ok := c.C.K_v.LoadV("直播流保存位置").(string); ok {
-               if path, err := filepath.Abs(path); err == nil {
-                       savestream.base_path = path + "/"
-               }
-       }
-       if v, ok := c.C.K_v.LoadV(`直播hls流缓冲`).(float64); ok && v > 0 {
-               savestream.hlsbuffersize = int(v)
-       }
-       if v, ok := c.C.K_v.LoadV(`直播hls流均衡`).(bool); ok {
-               savestream.hls_banlance_host = v
-       }
-}
-
-//已go func形式调用,将会获取直播流
-func Savestreamf() {
-       l := c.C.Log.Base(`savestream`)
-
-       //避免多次开播导致的多次触发
-       {
-               if savestream.skipFunc.NeedSkip() {
-                       l.L(`T: `, `已存在实例`)
-                       return
-               }
-               defer savestream.skipFunc.UnSet()
-       }
-
-       want_qn, ok := c.C.K_v.LoadV("直播流清晰度").(float64)
-       if !ok || want_qn < 0 {
-               return
-       }
-       c.C.Live_want_qn = int(want_qn)
-
-       F.Get(&c.C).Get(`Live`)
-
-       if savestream.cancel.Islive() {
-               return
-       }
-
-       //random host load balance
-       Host_list := []string{}
-       if savestream.hls_banlance_host {
-               for _, v := range c.C.Live {
-                       url_struct, e := url.Parse(v)
-                       if e != nil {
-                               continue
-                       }
-                       Host_list = append(Host_list, url_struct.Hostname())
-               }
-       }
-
-       var (
-               no_found_link     = errors.New("no_found_link")
-               no_Modified       = errors.New("no_Modified")
-               last_hls_Modified time.Time
-               hls_get_link      = func(m3u8_urls []string, last_download *m4s_link_item) (need_download []*m4s_link_item, m3u8_file_addition []byte, expires int, err error) {
-                       var (
-                               r        *reqf.Req
-                               m3u8_url *url.URL
-                       )
-                       for index := 0; index < len(m3u8_urls); index += 1 {
-                               r = reqf.New()
-                               if tmp, e := url.Parse(m3u8_urls[index]); e != nil {
-                                       err = e
-                                       return
-                               } else {
-                                       m3u8_url = tmp
-                               }
-
-                               rval := reqf.Rval{
-                                       Url:            m3u8_url.String(),
-                                       ConnectTimeout: 2000,
-                                       ReadTimeout:    1000,
-                                       Timeout:        2000,
-                                       Proxy:          c.C.Proxy,
-                                       Header: map[string]string{
-                                               `Host`:            m3u8_url.Host,
-                                               `User-Agent`:      `Mozilla/5.0 (X11; Linux x86_64; rv:83.0) Gecko/20100101 Firefox/83.0`,
-                                               `Accept`:          `*/*`,
-                                               `Accept-Language`: `zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2`,
-                                               `Accept-Encoding`: `gzip, deflate, br`,
-                                               `Origin`:          `https://live.bilibili.com`,
-                                               `Connection`:      `keep-alive`,
-                                               `Pragma`:          `no-cache`,
-                                               `Cache-Control`:   `no-cache`,
-                                               `Referer`:         "https://live.bilibili.com/",
-                                       },
-                               }
-                               if !last_hls_Modified.IsZero() {
-                                       rval.Header[`If-Modified-Since`] = last_hls_Modified.Add(time.Second).Format("Mon, 02 Jan 2006 15:04:05 CST")
-                               }
-                               if e := r.Reqf(rval); e != nil {
-                                       if index+1 < len(m3u8_urls) {
-                                               continue
-                                       }
-                                       err = e
-                                       return
-                               }
-                               break
-                       }
-
-                       if usedt := r.UsedTime.Seconds(); usedt > 3000 {
-                               l.L(`I: `, `hls列表下载慢`, usedt, `ms`)
-                       }
-                       if r.Response.StatusCode == http.StatusNotModified {
-                               l.L(`T: `, `hls未更改`)
-                               err = no_Modified
-                               return
-                       }
-                       //last_hls_Modified
-                       if t, ok := r.Response.Header[`Last-Modified`]; ok && len(t) > 0 {
-                               if lm, e := time.Parse("Mon, 02 Jan 2006 15:04:05 CST", t[0]); e == nil {
-                                       last_hls_Modified = lm
-                               } else {
-                                       l.L(`T: `, e)
-                               }
-                       }
-
-                       query := m3u8_url.Query()
-                       trid := query.Get("trid")
-                       expires, _ = strconv.Atoi(query.Get("expires"))
-                       buf := r.Respon
-
-                       //base-64
-                       if len(buf) != 0 && !bytes.Contains(buf, []byte("#")) {
-                               buf, err = base64.StdEncoding.DecodeString(string(buf))
-                               if err != nil {
-                                       l.L(`W: `, err, string(buf))
-                                       return
-                               }
-                       }
-
-                       var m4s_links []*m4s_link_item
-                       lines := bytes.Split(buf, []byte("\n"))
-                       for i := 0; i < len(lines); i += 1 {
-                               line := lines[i]
-                               m4s_link := ""
-
-                               if bytes.Contains(line, []byte("EXT-X-MAP")) {
-                                       o := bytes.Index(line, []byte(`EXT-X-MAP:URI="`)) + 15
-                                       e := bytes.Index(line[o:], []byte(`"`)) + o
-                                       m4s_link = string(line[o:e])
-                               } else if bytes.Contains(lines[i], []byte("#EXT-X")) { //忽略扩展标签
-                                       continue
-                               } else if bytes.Contains(line, []byte(".m4s")) {
-                                       m4s_link = string(line)
-                               }
-
-                               if m4s_link == "" {
-                                       continue
-                               }
-
-                               u, e := url.Parse("./" + m4s_link + "?trid=" + trid)
-                               if e != nil {
-                                       err = e
-                                       return
-                               }
-                               m4s_links = append(m4s_links, &m4s_link_item{
-                                       Url:         m3u8_url.ResolveReference(u).String(),
-                                       Base:        m4s_link,
-                                       Offset_line: i,
-                               })
-                       }
-                       if len(m4s_links) == 0 {
-                               err = no_found_link
-                               return
-                       }
-
-                       if last_download == nil {
-                               m3u8_file_addition = buf
-                               need_download = m4s_links
-                               return
-                       }
-
-                       var found bool
-                       for i := 0; i < len(m4s_links); i += 1 {
-                               if found {
-                                       offset := m4s_links[i].Offset_line - 1
-                                       for i := offset; i < len(lines); i += 1 {
-                                               if bytes.Contains(lines[i], []byte("#EXT-X")) { //忽略扩展标签
-                                                       continue
-                                               }
-                                               m3u8_file_addition = append(m3u8_file_addition, lines[i]...)
-                                               m3u8_file_addition = append(m3u8_file_addition, []byte("\n")...)
-                                       }
-                                       m3u8_file_addition = m3u8_file_addition[:len(m3u8_file_addition)-1]
-
-                                       need_download = append(need_download, m4s_links[i:]...)
-                                       break
-                               }
-                               found = (*last_download).Base == m4s_links[i].Base
-                       }
-                       if !found {
-                               offset := m4s_links[1].Offset_line - 1
-                               for i := offset; i < len(lines); i += 1 {
-                                       if bytes.Contains(lines[i], []byte("#EXT-X")) { //忽略扩展标签
-                                               continue
-                                       }
-                                       m3u8_file_addition = append(m3u8_file_addition, lines[i]...)
-                                       m3u8_file_addition = append(m3u8_file_addition, []byte("\n")...)
-                               }
-                               m3u8_file_addition = m3u8_file_addition[:len(m3u8_file_addition)-1]
-
-                               need_download = append(need_download, m4s_links[1:]...)
-                       }
-
-                       return
-               }
-               flv_get_link = func(link string) (need_download string, expires int, err error) {
-                       need_download = link
-
-                       url_struct, e := url.Parse(link)
-                       if e != nil {
-                               err = e
-                               return
-                       }
-                       query := url_struct.Query()
-                       expires, _ = strconv.Atoi(query.Get("expires"))
-
-                       return
-               }
-       )
-
-       for {
-               F.Get(&c.C).Get(`Liveing`)
-               if !c.C.Liveing {
-                       break
-               }
-
-               F.Get(&c.C).Get(`Live`)
-               if len(c.C.Live) == 0 {
-                       break
-               }
-
-               savestream.path = savestream.base_path
-
-               savestream.path += strconv.Itoa(c.C.Roomid) + "_" + time.Now().Format("2006_01_02_15-04-05-000")
-
-               savestream.wait = s.Init()
-               savestream.cancel = s.Init()
-
-               CookieM := make(map[string]string)
-               c.C.Cookie.Range(func(k, v interface{}) bool {
-                       CookieM[k.(string)] = v.(string)
-                       return true
-               })
-
-               { //重试
-                       r := reqf.New()
-                       go func() {
-                               savestream.cancel.Wait()
-                               r.Close()
-                       }()
-                       l.L(`I: `, "尝试连接live")
-                       if e := r.Reqf(reqf.Rval{
-                               Url:       c.C.Live[0],
-                               Retry:     10,
-                               SleepTime: 1000,
-                               Proxy:     c.C.Proxy,
-                               Header: map[string]string{
-                                       `Cookie`: reqf.Map_2_Cookies_String(CookieM),
-                               },
-                               Timeout:          5 * 1000,
-                               JustResponseCode: true,
-                       }); e != nil {
-                               l.L(`W: `, e)
-                       }
-
-                       if r.Response == nil {
-                               l.L(`W: `, `live响应错误`)
-                               savestream.wait.Done()
-                               savestream.cancel.Done()
-                               time.Sleep(time.Second * 5)
-                               continue
-                       } else if r.Response.StatusCode != 200 {
-                               l.L(`W: `, `live响应错误`, r.Response.Status, string(r.Respon))
-                               savestream.wait.Done()
-                               savestream.cancel.Done()
-                               time.Sleep(time.Second * 5)
-                               continue
-                       }
-               }
-
-               if strings.Contains(c.C.Live[0], "flv") {
-                       if rel, err := filepath.Rel(savestream.base_path, savestream.path); err == nil {
-                               l.L(`I: `, "保存到", rel+".flv")
-                       } else {
-                               l.L(`I: `, "保存到", savestream.path+".flv")
-                               l.L(`W: `, err)
-                       }
-                       Ass_f(savestream.path, time.Now())
-
-                       // no expect qn
-                       exit_chan := s.Init()
-                       go func() {
-                               savestream.cancel.Wait()
-                               exit_chan.Done()
-                       }()
-
-                       type link_stream struct {
-                               id       *idpool.Id
-                               front    []byte
-                               keyframe [][]byte
-                               // sync_buf []byte
-                               close func()
-                       }
-
-                       //chans
-                       var (
-                               reqs    = msgq.New(10)
-                               id_pool = idpool.New()
-                       )
-
-                       //文件
-                       out, err := os.Create(savestream.path + ".flv" + ".dtmp")
-                       if err != nil {
-                               l.L(`E: `, err)
-                               return
-                       }
-
-                       //数据整合
-                       {
-                               type id_close struct {
-                                       id    uintptr
-                                       close func()
-                               }
-
-                               var (
-                                       reqs_used_id   []id_close
-                                       reqs_remove_id []id_close
-
-                                       reqs_keyframe [][][]byte
-
-                                       reqs_func_block         funcCtrl.BlockFunc
-                                       last_keyframe_timestamp int
-                               )
-                               reqs.Pull_tag(map[string]func(interface{}) bool{
-                                       `req`: func(data interface{}) bool {
-                                               req, ok := data.(link_stream)
-
-                                               if !ok {
-                                                       return false
-                                               }
-
-                                               if len(req.keyframe) == 0 {
-                                                       // fmt.Println(`没有keyframe,退出`)
-                                                       req.close()
-                                                       return false
-                                               }
-                                               // fmt.Println(`处理req_id`,req.id.Id,`keyframe_len`,len(req.keyframe))
-
-                                               if offset, _ := out.Seek(0, 1); offset == 0 {
-                                                       // fmt.Println(`添加头`,len(req.front))
-                                                       //stream
-                                                       savestream.front = req.front
-                                                       out.Write(req.front)
-                                               }
-
-                                               reqs_func_block.Block()
-                                               defer reqs_func_block.UnBlock()
-
-                                               for i := 0; i < len(reqs_remove_id); i += 1 {
-                                                       if reqs_remove_id[i].id == req.id.Id {
-                                                               req.close()
-                                                               return false
-                                                       }
-                                               }
-
-                                               var reqs_keyframe_index int = len(reqs_used_id)
-                                               {
-                                                       var isnew bool = true
-                                                       for i := 0; i < len(reqs_used_id); i += 1 {
-                                                               if reqs_used_id[i].id == req.id.Id {
-                                                                       reqs_keyframe_index = i
-                                                                       isnew = false
-                                                                       break
-                                                               }
-                                                       }
-                                                       if isnew {
-                                                               // fmt.Println(`新req`,req.id.Id,reqs_keyframe_index)
-                                                               reqs_used_id = append(reqs_used_id, id_close{
-                                                                       id:    req.id.Id,
-                                                                       close: req.close,
-                                                               })
-                                                       }
-                                               }
-
-                                               if len(reqs_used_id) == 1 {
-                                                       // l.L(`T: `,"单req写入",len(req.keyframe))
-                                                       last_keyframe_timestamp, _ = Keyframe_timebase(req.keyframe, last_keyframe_timestamp)
-
-                                                       for i := 0; i < len(req.keyframe); i += 1 {
-                                                               //stream
-                                                               savestream.stream.Push_tag("stream", req.keyframe[i])
-                                                               out.Write(req.keyframe[i])
-                                                       }
-                                                       return false
-                                               }
-
-                                               for reqs_keyframe_index >= len(reqs_keyframe) {
-                                                       reqs_keyframe = append(reqs_keyframe, [][]byte{})
-                                               }
-                                               reqs_keyframe[reqs_keyframe_index] = append(reqs_keyframe[reqs_keyframe_index], req.keyframe...)
-
-                                               // fmt.Println(`merge,添加reqs_keyframe数据`,reqs_keyframe_index,len(reqs_keyframe[reqs_keyframe_index]))
-
-                                               for _, v := range reqs_keyframe {
-                                                       if len(v) == 0 {
-                                                               // fmt.Println(`merge,req无数据`,k)
-                                                               return false
-                                                       }
-                                               }
-
-                                               if success_last_keyframe_timestamp, b, merged := Merge_stream(reqs_keyframe, last_keyframe_timestamp); merged == 0 {
-                                                       // fmt.Println(`merge失败,reqs_keyframe[1]`,reqs_keyframe[1][0][:11],reqs_keyframe[1][len(reqs_keyframe[1])-1][:11])
-                                                       size := 0
-                                                       for i := 1; i < len(reqs_keyframe); i += 1 {
-                                                               size += len(reqs_keyframe[i])
-                                                       }
-
-                                                       if reqs_keyframe_index == 0 {
-                                                               // l.L(`T: `,"flv拼合失败,reqs_keyframe[0]写入")
-                                                               // fmt.Println(`merge失败,reqs_keyframe[0]写入`,len(req.keyframe))
-
-                                                               last_keyframe_timestamp, _ = Keyframe_timebase(req.keyframe, last_keyframe_timestamp)
-
-                                                               for i := 0; i < len(req.keyframe); i += 1 {
-                                                                       //stream
-                                                                       savestream.stream.Push_tag("stream", req.keyframe[i])
-                                                                       out.Write(req.keyframe[i])
-                                                               }
-                                                               // reqs_keyframe[0] = [][]byte{reqs_keyframe[0][len(reqs_keyframe[0])-1]}
-                                                       } else if size > 4 {
-                                                               if reqs_keyframe_index == len(reqs_used_id)-1 {
-                                                                       l.L(`T: `, "flv强行拼合")
-
-                                                                       for i := 0; i < reqs_keyframe_index; i += 1 {
-                                                                               reqs_remove_id = append(reqs_remove_id, reqs_used_id[i])
-                                                                               reqs_used_id[i].close()
-                                                                       }
-                                                                       reqs_used_id = reqs_used_id[reqs_keyframe_index:]
-
-                                                                       last_keyframe_timestamp, _ = Keyframe_timebase(req.keyframe, last_keyframe_timestamp)
-
-                                                                       for i := 0; i < len(req.keyframe); i += 1 {
-                                                                               //stream
-                                                                               savestream.stream.Push_tag("stream", req.keyframe[i])
-                                                                               out.Write(req.keyframe[i])
-                                                                       }
-
-                                                                       reqs_keyframe = [][][]byte{}
-                                                               } else {
-                                                                       req.close()
-                                                                       return false
-                                                               }
-                                                       }
-                                               } else {
-                                                       // fmt.Println(`merge成功`,len(b))
-                                                       l.L(`T: `, "flv拼合成功")
-
-                                                       last_keyframe_timestamp = success_last_keyframe_timestamp
-
-                                                       for i := 0; i < merged; i += 1 {
-                                                               reqs_remove_id = append(reqs_remove_id, reqs_used_id[i])
-                                                               reqs_used_id[i].close()
-                                                       }
-                                                       reqs_keyframe = [][][]byte{}
-
-                                                       reqs_used_id = reqs_used_id[merged:]
-
-                                                       //stream
-                                                       savestream.stream.Push_tag("stream", b)
-                                                       out.Write(b)
-                                               }
-
-                                               return false
-                                       },
-                                       // 11区        1
-                                       `close`: func(data interface{}) bool {
-                                               // defer l.L(`I: `,"处理退出")
-                                               for i := 0; i < len(reqs_used_id); i += 1 {
-                                                       reqs_used_id[i].close()
-                                               }
-                                               reqs_used_id = []id_close{}
-                                               // reqs_remove_id = []id_close{}
-                                               reqs_keyframe = [][][]byte{}
-                                               last_keyframe_timestamp = 0
-                                               return true
-                                       },
-                               })
-                       }
-
-                       //连接保持
-                       for {
-                               //随机选取服务器,获取超时时间
-
-                               live_index := 0
-                               if len(c.C.Live) > 0 {
-                                       live_index = int(p.Rand().MixRandom(0, int64(len(c.C.Live)-1)))
-                               }
-                               link, exp, e := flv_get_link(c.C.Live[live_index])
-                               if e != nil {
-                                       l.L(`W: `, `流链接获取错误`, e)
-                                       break
-                               }
-
-                               // 新建chan
-                               var (
-                                       bc       = make(chan []byte, 1<<17)
-                                       req      = reqf.New()
-                                       req_exit = s.Init()
-                               )
-
-                               l.L(`I: `, `新建请求`, req.Id())
-
-                               //新建请求
-                               go func(r *reqf.Req, rval reqf.Rval) {
-                                       go func() {
-                                               select {
-                                               case <-exit_chan.WaitC():
-                                               case <-req_exit.WaitC():
-                                               }
-                                               r.Close()
-                                       }()
-                                       defer req_exit.Done()
-                                       e := r.Reqf(rval)
-                                       if r.Response == nil {
-                                               l.L(`W: `, `请求退出`, r.Id(), e)
-                                       } else if r.Response.StatusCode != 200 {
-                                               l.L(`W: `, `请求退出`, r.Id(), e, r.Response.Status, string(r.Respon))
-                                       } else {
-                                               l.L(`W: `, `请求退出`, r.Id())
-                                       }
-                               }(req, reqf.Rval{
-                                       Url:   link,
-                                       Proxy: c.C.Proxy,
-                                       Header: map[string]string{
-                                               `Cookie`: reqf.Map_2_Cookies_String(CookieM),
-                                       },
-                                       //SaveToPath:savestream.path + ".flv",
-                                       SaveToChan:     bc,
-                                       Timeout:        int(int64(exp)-p.Sys().GetSTime()) * 1000,
-                                       ReadTimeout:    5 * 1000,
-                                       ConnectTimeout: 10 * 1000,
-                               })
-
-                               //返回通道
-                               var item = link_stream{
-                                       close: req.Close,
-                                       id:    id_pool.Get(),
-                               }
-                               l.L(`I: `, `新建连接`, item.id.Id)
-
-                               //解析
-                               go func(bc chan []byte, item *link_stream, exit_chan *s.Signal) {
-                                       var (
-                                               buf           []byte
-                                               skip_buf_size int
-                                       )
-                                       defer req_exit.Done()
-                                       defer l.L(`W: `, `连接退出`, item.id.Id)
-                                       for exit_chan.Islive() && req_exit.Islive() {
-                                               select {
-                                               case <-exit_chan.WaitC():
-                                                       return
-                                               case <-req_exit.WaitC():
-                                                       return
-                                               case b := <-bc:
-                                                       if len(b) == 0 {
-                                                               // fmt.Println(`req退出`,item.id.Id)
-                                                               id_pool.Put(item.id)
-                                                               // reqs.Push_tag(`closereq`,*item)
-                                                               return
-                                                       }
-
-                                                       buf = append(buf, b...)
-
-                                                       if len(buf) < skip_buf_size {
-                                                               break
-                                                       }
-
-                                                       front, list, _ := Seach_stream_tag(buf)
-
-                                                       if len(front) != 0 && len(item.front) == 0 {
-                                                               // fmt.Println(item.id.Id,`获取到header`,len(front))
-                                                               item.front = make([]byte, len(front))
-                                                               copy(item.front, front)
-                                                       }
-
-                                                       if len(list) == 0 || len(item.front) == 0 {
-                                                               // fmt.Println(`再次查询bufsize`,skip_buf_size)
-                                                               skip_buf_size = 2 * len(buf)
-                                                               break
-                                                       }
-
-                                                       item.keyframe = list
-
-                                                       {
-                                                               last_keyframe := list[len(list)-1]
-                                                               cut_offset := bytes.LastIndex(buf, last_keyframe) + len(last_keyframe)
-                                                               // fmt.Printf("buf截断 当前%d=>%d 下一header %b\n",len(buf),len(buf)-cut_offset,buf[:11])
-                                                               buf = buf[cut_offset:]
-                                                       }
-
-                                                       skip_buf_size = len(buf) + len(list[0])
-                                                       reqs.Push_tag(`req`, *item)
-                                               }
-                                       }
-                               }(bc, &item, exit_chan)
-
-                               expires := int64(exp) - p.Sys().GetSTime() - 120
-                               // no expect qn
-                               if c.C.Live_want_qn < c.C.Live_qn {
-                                       expires = time.Now().Add(time.Minute * 2).Unix()
-                               }
-
-                               //等待过期/退出
-                               {
-                                       var exit_sign bool
-                                       select {
-                                       case <-req_exit.Chan: //本次连接错误,退出重试
-                                       case <-exit_chan.Chan: //要求退出
-                                               exit_sign = true //
-                                       case <-time.After(time.Second * time.Duration(int(expires))):
-                                       }
-                                       if exit_sign {
-                                               //退出
-                                               // l.L(`T: `,"chan退出")
-                                               break
-                                       }
-                               }
-
-                               l.L(`I: `, "flv关闭,开始新连接")
-
-                               //即将过期,刷新c.C.Live
-                               F.Get(&c.C).Get(`Liveing`)
-                               if !c.C.Liveing {
-                                       break
-                               }
-                               F.Get(&c.C).Get(`Live`)
-                               if len(c.C.Live) == 0 {
-                                       break
-                               }
-                       }
-
-                       exit_chan.Done()
-                       reqs.Push_tag(`close`, nil)
-                       out.Close()
-
-                       p.FileMove(savestream.path+".flv.dtmp", savestream.path+".flv")
-               } else {
-                       savestream.path += "/"
-                       if rel, err := filepath.Rel(savestream.base_path, savestream.path); err == nil {
-                               l.L(`I: `, "保存到", rel+`/0.m3u8`)
-                       } else {
-                               l.L(`I: `, "保存到", savestream.path)
-                               l.L(`W: `, err)
-                       }
-                       Ass_f(savestream.path+"0", time.Now())
-
-                       var (
-                               hls_msg       = msgq.New(20)
-                               hls_gen       hls_generate
-                               DISCONTINUITY int
-                               SEQUENCE      int
-                       )
-
-                       //hls stream gen 用户m3u8生成
-                       go func() {
-                               per_second := time.Tick(time.Second)
-                               for {
-                                       select {
-                                       case <-savestream.cancel.WaitC():
-                                               return //exit
-                                       case now := <-per_second:
-                                               hls_msg.Push_tag(`clock`, now)
-                                       }
-                               }
-                       }()
-
-                       //hls stream gen 用户m3u8生成
-                       hls_msg.Pull_tag(map[string]func(interface{}) bool{
-                               `header`: func(d interface{}) bool {
-                                       if b, ok := d.([]byte); ok {
-                                               hls_gen.hls_file_header = b
-                                       }
-                                       return false
-                               },
-                               `body`: func(d interface{}) bool {
-                                       links, ok := d.([]*m4s_link_item)
-                                       if !ok {
-                                               return false
-                                       }
-                                       //remove hls first m4s
-                                       if len(links) > 0 &&
-                                               len((*links[0]).Base) > 0 &&
-                                               (*links[0]).Base[0] == 104 {
-                                               links = links[1:]
-                                       }
-
-                                       hls_gen.m4s_list = append(hls_gen.m4s_list, links...)
-
-                                       return false
-                               },
-                               `clock`: func(now interface{}) bool {
-                                       //buffer
-                                       if len(hls_gen.m4s_list)-savestream.hlsbuffersize < 0 {
-                                               return false
-                                       }
-
-                                       //add block
-                                       var (
-                                               m4s_num          int
-                                               has_DICONTINUITY bool
-                                               res              []byte
-                                               threshold        = savestream.m4s_hls
-                                       )
-                                       if threshold < 3 {
-                                               threshold = 3
-                                       }
-                                       {
-                                               //m4s list
-                                               m4s_list_b := []byte{}
-                                               for k, v := range hls_gen.m4s_list {
-                                                       if v.status != s_fin {
-                                                               //#EXT-X-DISCONTINUITY-SEQUENCE
-                                                               //reset hls lists
-                                                               if !has_DICONTINUITY && m4s_num < threshold {
-                                                                       m4s_list := append(util.SliceCopy(hls_gen.m4s_list[:k]).([]*m4s_link_item), &m4s_link_item{
-                                                                               Base:   "DICONTINUITY",
-                                                                               status: s_fin,
-                                                                               isshow: true,
-                                                                       })
-                                                                       hls_gen.m4s_list = append(m4s_list, hls_gen.m4s_list[k:]...)
-                                                                       m4s_list_b = append(m4s_list_b, []byte("#EXT-X-DICONTINUITY\n")...)
-                                                               }
-                                                               break
-                                                       }
-
-                                                       v.isshow = true
-
-                                                       if v.Base == "DICONTINUITY" {
-                                                               has_DICONTINUITY = true
-                                                               m4s_list_b = append(m4s_list_b, []byte("#EXT-X-DICONTINUITY\n")...)
-                                                               continue
-                                                       }
-
-                                                       if m4s_num >= savestream.m4s_hls {
-                                                               break
-                                                       }
-
-                                                       m4s_num += 1
-                                                       // if m4s_num == 1 {SEQUENCE = strings.ReplaceAll(v.Base, ".m4s", "")}
-                                                       m4s_list_b = append(m4s_list_b, []byte("#EXTINF:1,"+v.Base+"\n")...)
-                                                       m4s_list_b = append(m4s_list_b, v.Base...)
-                                                       m4s_list_b = append(m4s_list_b, []byte("\n")...)
-                                               }
-
-                                               //have useable m4s
-                                               if m4s_num != 0 {
-                                                       //add header
-                                                       res = hls_gen.hls_file_header
-                                                       //add #EXT-X-DISCONTINUITY-SEQUENCE
-                                                       res = append(res, []byte("#EXT-X-DISCONTINUITY-SEQUENCE:"+strconv.Itoa(DISCONTINUITY)+"\n")...)
-                                                       //add #EXT-X-MEDIA-SEQUENCE
-                                                       res = append(res, []byte("#EXT-X-MEDIA-SEQUENCE:"+strconv.Itoa(SEQUENCE)+"\n")...)
-                                                       //add #INFO
-                                                       res = append(res, []byte(fmt.Sprintf("#INFO-BUFFER:%d/%d\n", m4s_num, len(hls_gen.m4s_list)))...)
-                                                       //add m4s
-                                                       res = append(res, m4s_list_b...)
-                                                       //去除最后一个换行
-                                                       res = res[:len(res)-1]
-                                               }
-                                       }
-
-                                       //try to jump the hole
-                                       var skip_del bool
-                                       if m4s_num < threshold {
-                                               var (
-                                                       index            int //the first useable fmp4 index of section
-                                                       DICONTINUITY_num int
-                                                       catch            bool //catch useable fmp4?
-                                               )
-                                               for i := 0; i < len(hls_gen.m4s_list); i += 1 {
-                                                       if hls_gen.m4s_list[i].status != s_fin {
-                                                               catch = false
-                                                               continue
-                                                       }
-                                                       if !catch {
-                                                               index = i
-                                                       } else {
-                                                               if hls_gen.m4s_list[i].Base == "DICONTINUITY" {
-                                                                       DICONTINUITY_num += 1
-                                                               } else if i-index-DICONTINUITY_num > threshold {
-                                                                       //find a nice index, remove all bad fmp4s
-                                                                       skip := 0
-                                                                       skip_del = true
-                                                                       for ; index >= 0; index -= 1 {
-                                                                               if hls_gen.m4s_list[index].status == s_fin {
-                                                                                       continue
-                                                                               }
-                                                                               skip += 1
-                                                                               hls_gen.m4s_list = append(hls_gen.m4s_list[:index], hls_gen.m4s_list[index+1:]...)
-                                                                       }
-                                                                       l.L(`I: `, "卡顿,跳过", skip, "个tag")
-                                                                       break
-                                                               }
-                                                       }
-                                                       catch = true
-                                               }
-                                       }
-
-                                       //设置到全局变量,方便流服务器获取
-                                       if len(res) != 0 {
-                                               savestream.hls_stream.b = res
-                                       }
-                                       savestream.hls_stream.t, _ = now.(time.Time)
-
-                                       //del
-                                       for del_num := 1; !skip_del && del_num > 0; hls_gen.m4s_list = hls_gen.m4s_list[1:] {
-                                               del_num -= 1
-                                               if !hls_gen.m4s_list[0].isshow {
-                                                       continue
-                                               }
-                                               //#EXT-X-DICONTINUITY
-                                               if hls_gen.m4s_list[0].Base == "DICONTINUITY" {
-                                                       DISCONTINUITY += 1
-                                                       continue
-                                               }
-                                               //#EXTINF
-                                               if hls_gen.m4s_list[0].isshow {
-                                                       SEQUENCE += 1
-
-                                                       { //stream hls2mp4
-                                                               var buf []byte
-                                                               //stream front
-                                                               if len(savestream.front) == 0 {
-                                                                       buf, _, e := get_m4s_cache(savestream.path + hls_gen.hls_first_fmp4_name)
-                                                                       if e == nil {
-                                                                               savestream.front = buf
-                                                                       } else {
-                                                                               l.L(`W: `, `推送mp4流错误,无法读取文件`, e)
-                                                                       }
-                                                               }
-                                                               //stream body
-                                                               buf, _, e := get_m4s_cache(savestream.path + hls_gen.m4s_list[0].Base)
-                                                               if e == nil {
-                                                                       savestream.stream.Push_tag(`stream`, buf)
-                                                               } else {
-                                                                       l.L(`W: `, `推送mp4流错误,无法读取文件`, e)
-                                                               }
-                                                       }
-                                               }
-                                       }
-
-                                       return false
-                               },
-                               `close`: func(d interface{}) bool {
-                                       savestream.hls_stream.b = []byte{} //退出置空
-                                       savestream.hls_stream.t = time.Now()
-                                       return true
-                               },
-                       })
-
-                       var (
-                               last_download  *m4s_link_item
-                               miss_download  = make(chan *m4s_link_item, 100)
-                               download_limit = funcCtrl.BlockFuncN{
-                                       Max: 2,
-                               } //limit
-                       )
-                       expires := time.Now().Add(time.Minute * 2).Unix()
-
-                       var (
-                               path_front  string
-                               path_behind string
-                       )
-
-                       for {
-                               //退出,等待下载完成
-                               if !savestream.cancel.Islive() {
-                                       l.L(`I: `, "退出,等待片段下载")
-                                       download_limit.None()
-                                       download_limit.UnNone()
-
-                                       links := []*m4s_link_item{}
-                                       //下载出错的
-                                       for len(miss_download) != 0 {
-                                               links = append(links, <-miss_download)
-                                       }
-
-                                       for k, v := range links {
-                                               l.L(`I: `, "正在下载最后片段:", k+1, "/", len(links))
-                                               v.status = s_loading
-                                               r := reqf.New()
-                                               if e := r.Reqf(reqf.Rval{
-                                                       Url:            v.Url,
-                                                       SaveToPath:     savestream.path + v.Base,
-                                                       ConnectTimeout: 5000,
-                                                       ReadTimeout:    1000,
-                                                       Retry:          1,
-                                                       Proxy:          c.C.Proxy,
-                                               }); e != nil && !errors.Is(e, io.EOF) {
-                                                       l.L(`I: `, e)
-                                                       v.status = s_fail
-                                               } else {
-                                                       if usedt := r.UsedTime.Seconds(); usedt > 700 {
-                                                               l.L(`I: `, `hls切片下载慢`, usedt, `ms`)
-                                                       }
-                                                       v.status = s_fin
-                                               }
-                                       }
-                                       //退出
-                                       break
-                               }
-
-                               links, file_add, exp, e := hls_get_link(c.C.Live, last_download)
-                               if e != nil {
-                                       if e == no_Modified {
-                                               time.Sleep(time.Duration(2) * time.Second)
-                                               continue
-                                       } else if reqf.IsTimeout(e) || strings.Contains(e.Error(), "x509") {
-                                               l.L(`I: `, e)
-                                               continue
-                                       } else {
-                                               l.L(`W: `, e)
-                                               break
-                                       }
-                               }
-
-                               //first block 获取并设置不变头
-                               if last_download == nil {
-                                       var res []byte
-                                       {
-                                               for row, i := bytes.SplitAfter(file_add, []byte("\n")), 0; i < len(row); i += 1 {
-                                                       if bytes.Contains(row[i], []byte("#EXT")) {
-                                                               //set stream front
-                                                               if op := bytes.Index(row[i], []byte(`#EXT-X-MAP:URI="`)); op != -1 {
-                                                                       hls_gen.hls_first_fmp4_name = string(row[i][op+16 : len(row[i])-2])
-                                                               }
-                                                               if bytes.Contains(row[i], []byte("#EXT-X-MEDIA-SEQUENCE")) || bytes.Contains(row[i], []byte("#EXTINF")) {
-                                                                       continue
-                                                               }
-                                                               res = append(res, row[i]...)
-                                                       }
-                                               }
-                                       }
-                                       hls_msg.Push_tag(`header`, res)
-                               }
-
-                               if len(links) == 0 {
-                                       time.Sleep(time.Duration(2) * time.Second)
-                                       continue
-                               }
-
-                               //qn in expect , set expires
-                               if c.C.Live_want_qn >= c.C.Live_qn {
-                                       expires = int64(exp)
-                               }
-
-                               //use guess
-                               if last_download != nil {
-                                       previou, _ := strconv.Atoi((*last_download).Base[:len((*last_download).Base)-4])
-                                       now, _ := strconv.Atoi(links[0].Base[:len(links[0].Base)-4])
-                                       if previou < now-1 {
-                                               if diff := now - previou; diff > 100 {
-                                                       l.L(`W: `, `diff too large `, diff)
-                                                       break
-                                               } else {
-                                                       l.L(`I: `, `猜测hls`, previou, `-`, now, `(`, diff, `)`)
-                                               }
-
-                                               { //file_add
-                                                       for i := now - 1; i > previou; i -= 1 {
-                                                               file_add = append([]byte(strconv.Itoa(i)+".m4s"), file_add...)
-                                                       }
-                                               }
-                                               { //links
-                                                       if path_front == "" || path_behind == "" {
-                                                               u, e := url.Parse(links[0].Url)
-                                                               if e != nil {
-                                                                       l.L(`E: `, `fault to enable guess`, e)
-                                                                       return
-                                                               }
-                                                               path_front = u.Scheme + "://" + path.Dir(u.Host+u.Path) + "/"
-                                                               path_behind = "?" + u.RawQuery
-                                                       }
-
-                                                       //出错期间没能获取到的
-                                                       for i := now - 1; i > previou; i -= 1 {
-                                                               base := strconv.Itoa(i) + ".m4s"
-                                                               links = append([]*m4s_link_item{
-                                                                       {
-                                                                               Url:  path_front + base + path_behind,
-                                                                               Base: base,
-                                                                       },
-                                                               }, links...)
-                                                       }
-                                               }
-                                       }
-                               }
-
-                               if len(links) > 10 {
-                                       l.L(`T: `, `等待下载切片:`, len(links))
-                               } else if len(links) > 100 {
-                                       l.L(`W: `, `重试,等待下载切片:`, len(links))
-
-                                       if F.Get(&c.C).Get(`Liveing`); !c.C.Liveing {
-                                               break
-                                       }
-                                       if F.Get(&c.C).Get(`Live`); len(c.C.Live) == 0 {
-                                               break
-                                       }
-
-                                       // set expect
-                                       expires = time.Now().Add(time.Minute * 2).Unix()
-                                       continue
-                               }
-
-                               //将links传送给hls生成器
-                               hls_msg.Push_tag(`body`, links)
-
-                               f := p.File()
-                               f.FileWR(p.Filel{
-                                       File:    savestream.path + "0.m3u8.dtmp",
-                                       Loc:     -1,
-                                       Context: []interface{}{file_add},
-                               })
-
-                               for i := 0; i < len(links); i += 1 {
-                                       //fmp4切片下载
-                                       go func(link *m4s_link_item, path string) {
-                                               //use url struct
-                                               var link_url *url.URL
-                                               {
-                                                       if tmp, e := url.Parse(link.Url); e != nil {
-                                                               l.L(`E: `, e)
-                                                               return
-                                                       } else {
-                                                               link_url = tmp
-                                                       }
-                                               }
-
-                                               download_limit.Block()
-                                               defer download_limit.UnBlock()
-
-                                               link.status = s_loading
-                                               for index := 0; index < len(Host_list); index += 1 {
-                                                       link_url.Host = Host_list[index]
-
-                                                       r := reqf.New()
-                                                       if e := r.Reqf(reqf.Rval{
-                                                               Url:            link_url.String(),
-                                                               SaveToPath:     path + link.Base,
-                                                               ConnectTimeout: 2000,
-                                                               ReadTimeout:    1000,
-                                                               Timeout:        2000,
-                                                               Proxy:          c.C.Proxy,
-                                                       }); e != nil {
-                                                               //try other host
-                                                               if index+1 < len(Host_list) {
-                                                                       continue
-                                                               }
-
-                                                               if reqf.IsTimeout(e) || strings.Contains(e.Error(), "x509") {
-                                                                       l.L(`T: `, link.Base, `将重试!`)
-                                                                       //避免影响后续猜测
-                                                                       link.Offset_line = 0
-                                                                       go func(link *m4s_link_item) { miss_download <- link }(link)
-                                                               } else {
-                                                                       l.L(`W: `, e)
-                                                                       link.status = s_fail
-                                                               }
-                                                       } else {
-                                                               if usedt := r.UsedTime.Seconds(); usedt > 700 {
-                                                                       l.L(`I: `, `hls切片下载慢`, usedt, `ms`)
-                                                               }
-                                                               link.status = s_fin
-                                                               //存入cache
-                                                               if _, ok := m4s_cache.Load(path + link.Base); !ok {
-                                                                       m4s_cache.Store(path+link.Base, r.Respon)
-                                                                       go func() { //移除
-                                                                               time.Sleep(time.Second * time.Duration(savestream.hlsbuffersize+savestream.m4s_hls+2))
-                                                                               m4s_cache.Delete(path + link.Base)
-                                                                       }()
-                                                               }
-                                                               break
-                                                       }
-                                               }
-                                       }(links[i], savestream.path)
-
-                                       //只记录最新
-                                       if links[i].Offset_line > 0 {
-                                               last_download = links[i]
-                                       }
-                               }
-
-                               //m3u8_url 将过期
-                               if p.Sys().GetSTime()+60 > expires {
-                                       if F.Get(&c.C).Get(`Liveing`); !c.C.Liveing {
-                                               break
-                                       }
-                                       if F.Get(&c.C).Get(`Live`); len(c.C.Live) == 0 {
-                                               break
-                                       }
-                                       // set expect
-                                       expires = time.Now().Add(time.Minute * 2).Unix()
-                               } else {
-                                       time.Sleep(time.Second)
-                               }
-                       }
-
-                       if p.Checkfile().IsExist(savestream.path + "0.m3u8.dtmp") {
-                               f := p.File()
-                               f.FileWR(p.Filel{
-                                       File:    savestream.path + "0.m3u8.dtmp",
-                                       Loc:     -1,
-                                       Context: []interface{}{"#EXT-X-ENDLIST"},
-                               })
-                               p.FileMove(savestream.path+"0.m3u8.dtmp", savestream.path+"0.m3u8")
-                       }
-
-                       hls_msg.Push_tag(`close`, nil)
-               }
-               //set ro ``
-               savestream.path = ``
-               savestream.front = []byte{} //flv头及首tag置空
-               savestream.stream.Push_tag("close", nil)
-               Ass_f("", time.Now()) //ass
-               l.L(`I: `, "结束")
-
-               if !savestream.cancel.Islive() {
-                       // l.L(`I: `,"退出")
-                       break
-               } //cancel
-               /*
-                       Savestream需要外部组件
-                       ffmpeg http://ffmpeg.org/download.html
-               */
-               // if p.Checkfile().IsExist(savestream.path+".flv"){
-               //      l.L(`I: `,"转码中")
-               //      p.Exec().Run(false, "ffmpeg", "-i", savestream.path+".flv", "-c", "copy", savestream.path+".mkv")
-               //      if p.Checkfile().IsExist(savestream.path+".mkv"){os.Remove(savestream.path+".flv")}
-               // }
-
-               // l.L(`I: `,"转码结束")
-               savestream.wait.Done()
-               savestream.cancel.Done()
-       }
-       savestream.wait.Done()
-       savestream.cancel.Done()
 }
 
-//已func形式调用,将会停止保存直播流
-func Savestream_wait() {
-       if !savestream.cancel.Islive() {
-               return
+func StreamOStop() {
+       if streamO.Status.Islive() {
+               streamO.Stop()
        }
-
-       savestream.cancel.Done()
-       c.C.Log.Base(`savestream`).L(`I: `, "等待停止")
-       savestream.wait.Wait()
 }
 
 type Obs struct {
@@ -2155,43 +941,12 @@ func AutoSend_silver_gift() {
        }
 }
 
-var m4s_cache sync.Map //使用内存cache避免频繁io
-
-func get_m4s_cache(path string) (buf []byte, cached bool, err error) {
-       if b, ok := m4s_cache.Load(path); !ok {
-               f, e := os.OpenFile(path, os.O_RDONLY, 0644)
-               if e != nil {
-                       err = e
-                       return
-               }
-               defer f.Close()
-
-               if b, e := io.ReadAll(f); e != nil {
-                       err = e
-                       return
-               } else {
-                       buf = b
-                       m4s_cache.Store(path, buf)
-                       go func() { //移除
-                               time.Sleep(time.Second * time.Duration(savestream.m4s_hls+2))
-                               m4s_cache.Delete(path)
-                       }()
-               }
-       } else {
-               cached = true
-               buf, _ = b.([]byte)
-       }
-       return
-}
-
 //直播Web服务口
 func init() {
        flog := flog.Base_add(`直播Web服务`)
        if port_f, ok := c.C.K_v.LoadV(`直播Web服务口`).(float64); ok && port_f >= 0 {
                port := int(port_f)
 
-               base_dir := savestream.base_path
-
                addr := "0.0.0.0:"
                if port == 0 {
                        addr += strconv.Itoa(p.Sys().GetFreePort())
@@ -2211,250 +966,55 @@ func init() {
                                w.Header().Set("Access-Control-Allow-Origin", "*")
                                w.Header().Set("Connection", "keep-alive")
                                w.Header().Set("Content-Transfer-Encoding", "binary")
-                               start := time.Now()
-
-                               var path string = r.URL.Path[1:]
 
-                               if !p.Checkfile().IsExist(base_dir + path) {
-                                       w.WriteHeader(http.StatusNotFound)
+                               if len(streamO.getFirstM4S()) == 0 {
+                                       w.Header().Set("Retry-After", "1")
+                                       w.WriteHeader(http.StatusServiceUnavailable)
                                        return
                                }
 
-                               if savestream.path != "" && strings.Contains(path, filepath.Base(savestream.path)) {
-                                       w.Header().Set("Server", "live")
-                                       if filepath.Ext(path) == `.dtmp` {
-                                               if strings.Contains(path, ".flv") {
-                                                       if len(savestream.front) == 0 {
-                                                               w.Header().Set("Retry-After", "1")
-                                                               w.WriteHeader(http.StatusServiceUnavailable)
-                                                               return
-                                                       }
+                               // path = base_dir+path
+                               w.Header().Set("Content-Type", "video/mp4")
+                               w.WriteHeader(http.StatusOK)
 
-                                                       // path = base_dir+path
-                                                       w.Header().Set("Content-Type", "video/x-flv")
-                                                       w.WriteHeader(http.StatusOK)
+                               flusher, flushSupport := w.(http.Flusher)
+                               if flushSupport {
+                                       flusher.Flush()
+                               }
 
-                                                       flusher, flushSupport := w.(http.Flusher)
-                                                       if flushSupport {
-                                                               flusher.Flush()
-                                                       }
+                               //写入hls头
+                               if _, err := w.Write(streamO.getFirstM4S()); err != nil {
+                                       return
+                               } else if flushSupport {
+                                       flusher.Flush()
+                               }
+
+                               cancel := make(chan struct{})
 
-                                                       //写入flv头,首tag
-                                                       if _, err := w.Write(savestream.front); err != nil {
-                                                               return
+                               //hls切片
+                               streamO.Newst_m4s.Pull_tag(map[string]func(interface{}) bool{
+                                       `m4s`: func(data interface{}) bool {
+                                               if b, ok := data.([]byte); ok {
+                                                       if _, err := w.Write(b); err != nil {
+                                                               close(cancel)
+                                                               return true
                                                        } else if flushSupport {
                                                                flusher.Flush()
                                                        }
-
-                                                       cancel := make(chan struct{})
-
-                                                       //flv流关键帧间隔切片
-                                                       savestream.stream.Pull_tag(map[string]func(interface{}) bool{
-                                                               `stream`: func(data interface{}) bool {
-                                                                       if b, ok := data.([]byte); ok {
-                                                                               if _, err := w.Write(b); err != nil {
-                                                                                       close(cancel)
-                                                                                       return true
-                                                                               } else if flushSupport {
-                                                                                       flusher.Flush()
-                                                                               }
-                                                                       }
-                                                                       return false
-                                                               },
-                                                               `close`: func(data interface{}) bool {
-                                                                       close(cancel)
-                                                                       return true
-                                                               },
-                                                       })
-
-                                                       <-cancel
-                                               } else if strings.Contains(path, ".m3u8") {
-                                                       if r.URL.Query().Get("type") == "mp4" {
-                                                               if len(savestream.front) == 0 {
-                                                                       w.Header().Set("Retry-After", "1")
-                                                                       w.WriteHeader(http.StatusServiceUnavailable)
-                                                                       return
-                                                               }
-
-                                                               // path = base_dir+path
-                                                               w.Header().Set("Content-Type", "video/mp4")
-                                                               w.WriteHeader(http.StatusOK)
-
-                                                               flusher, flushSupport := w.(http.Flusher)
-                                                               if flushSupport {
-                                                                       flusher.Flush()
-                                                               }
-
-                                                               //写入hls头
-                                                               if _, err := w.Write(savestream.front); err != nil {
-                                                                       return
-                                                               } else if flushSupport {
-                                                                       flusher.Flush()
-                                                               }
-
-                                                               cancel := make(chan struct{})
-
-                                                               //hls切片
-                                                               savestream.stream.Pull_tag(map[string]func(interface{}) bool{
-                                                                       `stream`: func(data interface{}) bool {
-                                                                               if b, ok := data.([]byte); ok {
-                                                                                       if _, err := w.Write(b); err != nil {
-                                                                                               close(cancel)
-                                                                                               return true
-                                                                                       } else if flushSupport {
-                                                                                               flusher.Flush()
-                                                                                       }
-                                                                               }
-                                                                               return false
-                                                                       },
-                                                                       `close`: func(data interface{}) bool {
-                                                                               close(cancel)
-                                                                               return true
-                                                                       },
-                                                               })
-
-                                                               <-cancel
-                                                       } else {
-                                                               w.Header().Set("Cache-Control", "max-age=1")
-                                                               w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
-                                                               w.Header().Set("Last-Modified", savestream.hls_stream.t.Format(http.TimeFormat))
-
-                                                               // //经常m4s下载速度赶不上,使用阻塞避免频繁获取列表带来的卡顿
-                                                               // if time.Now().Sub(savestream.hls_stream.t).Seconds() > 1 {
-                                                               //      time.Sleep(time.Duration(3)*time.Second)
-                                                               // }
-
-                                                               res := savestream.hls_stream.b
-
-                                                               if len(res) == 0 {
-                                                                       w.Header().Set("Retry-After", "1")
-                                                                       w.WriteHeader(http.StatusServiceUnavailable)
-                                                                       return
-                                                               }
-
-                                                               //Server-Timing
-                                                               w.Header().Set("Server-Timing", fmt.Sprintf("dur=%d", time.Since(start).Microseconds()))
-
-                                                               if _, err := w.Write(res); err != nil {
-                                                                       flog.L(`E: `, err)
-                                                                       return
-                                                               }
-                                                       }
-                                               }
-                                       } else if filepath.Ext(path) == `.m4s` {
-                                               w.Header().Set("Server", "live")
-                                               w.Header().Set("Cache-Control", "Cache-Control:public, max-age=3600")
-
-                                               path = base_dir + path
-
-                                               buf, cached, e := get_m4s_cache(path)
-
-                                               if e != nil {
-                                                       w.Header().Set("Retry-After", "1")
-                                                       w.WriteHeader(http.StatusServiceUnavailable)
-                                                       return
-                                               }
-
-                                               if len(buf) == 0 {
-                                                       flog.L(`W: `, `buf size 0`)
-                                                       w.Header().Set("Retry-After", "1")
-                                                       w.WriteHeader(http.StatusServiceUnavailable)
-                                                       return
-                                               }
-
-                                               //Server-Timing
-                                               w.Header().Add("Server-Timing", fmt.Sprintf("cache=%v;dur=%d", cached, time.Since(start).Microseconds()))
-                                               w.WriteHeader(http.StatusOK)
-                                               if _, err := w.Write(buf); err != nil {
-                                                       flog.L(`E: `, err)
-                                                       return
-                                               }
-                                       }
-                               } else {
-                                       w.Header().Set("Server", "file")
-                                       if r.URL.Query().Get("type") == "mp4" { //hls拼合
-                                               dir := base_dir + filepath.Dir(path) + "/"
-                                               if !p.Checkfile().IsExist(dir + `0.m3u8`) {
-                                                       w.WriteHeader(http.StatusNotFound)
-                                                       return
-                                               }
-                                               var m4slist []string
-                                               if e := filepath.WalkDir(dir, func(path string, info fs.DirEntry, err error) error {
-                                                       if err != nil {
-                                                               return err
-                                                       }
-                                                       if filepath.Ext(info.Name()) == ".m4s" {
-                                                               m4slist = append(m4slist, path)
-                                                       }
-                                                       return nil
-                                               }); e != nil {
-                                                       flog.L(`E: `, e)
-                                                       w.WriteHeader(http.StatusServiceUnavailable)
-                                                       return
-                                               }
-                                               m4slist = append(m4slist[len(m4slist)-1:], m4slist[:len(m4slist)-1]...)
-                                               for _, v := range m4slist {
-                                                       f, err := os.OpenFile(v, os.O_RDONLY, 0644)
-                                                       if err != nil {
-                                                               w.WriteHeader(http.StatusServiceUnavailable)
-                                                               flog.L(`E: `, err)
-                                                               return
-                                                       }
-                                                       io.Copy(w, f)
-                                                       f.Close()
                                                }
-                                               return
-                                       }
-                                       http.FileServer(http.Dir(base_dir)).ServeHTTP(w, r)
-                               }
-                       }
-                       now = func(w http.ResponseWriter, r *http.Request) {
-                               //header
-                               w.Header().Set("Access-Control-Allow-Credentials", "true")
-                               w.Header().Set("Access-Control-Allow-Headers", "*")
-                               w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
-                               w.Header().Set("Access-Control-Allow-Origin", "*")
-                               w.Header().Set("Cache-Control", "max-age=3")
-
-                               //最新直播流
-                               if savestream.path == `` {
-                                       flog.L(`T: `, `还没有下载直播流-直播流为空`)
-                                       w.WriteHeader(http.StatusNotFound)
-                                       return
-                               }
-
-                               path := filepath.Base(savestream.path)
-                               if strings.Contains(c.C.Live[0], "flv") {
-                                       path += ".flv.dtmp"
-                               } else {
-                                       path += "/0.m3u8.dtmp"
-                               }
+                                               return false
+                                       },
+                                       `close`: func(data interface{}) bool {
+                                               close(cancel)
+                                               return true
+                                       },
+                               })
 
-                               if !p.Checkfile().IsExist(base_dir + path) {
-                                       flog.L(`T: `, `还没有下载直播流-文件未能找到`)
-                                       w.WriteHeader(http.StatusNotFound)
-                               } else {
-                                       u, e := url.Parse("../" + path)
-                                       if e != nil {
-                                               flog.L(`E: `, e)
-                                               w.Header().Set("Retry-After", "1")
-                                               w.WriteHeader(http.StatusServiceUnavailable)
-                                               return
-                                       }
-                                       if r.URL.RawQuery != "" {
-                                               u.RawQuery = r.URL.RawQuery
-                                       }
-                                       // r.URL =
-                                       // root(w, r)
-                                       w.Header().Set("Location", r.URL.ResolveReference(u).String())
-                                       w.WriteHeader(http.StatusTemporaryRedirect)
-                               }
-                               return
+                               <-cancel
                        }
                )
                s.Handle(map[string]func(http.ResponseWriter, *http.Request){
-                       `/`:    root,
-                       `/now`: now,
+                       `/`: root,
                        `/exit`: func(w http.ResponseWriter, r *http.Request) {
                                s.Server.Shutdown(context.Background())
                        },
index 58fb8c8468cee644d7b2b66bab6173e7db592504..7281c949780d0ec7aab7f4ca61db7ec130e462e3 100644 (file)
@@ -593,7 +593,7 @@ func (replyF) preparing(s string) {
                { //附加功能 obs结束 `savestream`结束
                        Obs_R(false)
                        Obsf(false)
-                       Savestream_wait()
+                       streamO.Stop()
                        go ShowRevf()
                        c.C.Liveing = false
                }
@@ -618,7 +618,7 @@ func (replyF) live(s string) {
                { //附加功能 obs录播
                        Obsf(true)
                        Obs_R(true)
-                       go Savestreamf()
+                       go streamO.Start()
                }
                {
                        c.C.Rev = 0.0                    //营收
diff --git a/Reply/steam/stream.go b/Reply/steam/stream.go
deleted file mode 100644 (file)
index c2a5751..0000000
+++ /dev/null
@@ -1,49 +0,0 @@
-package stream
-
-import (
-       "path/filepath"
-
-       c "github.com/qydysky/bili_danmu/CV"
-
-       log "github.com/qydysky/part/log"
-       signal "github.com/qydysky/part/signal"
-)
-
-type Stream struct {
-       Status *signal.Signal //IsLive()是否运行中
-       log    *log.Log_interface
-       config Stream_Config //配置
-}
-
-type Stream_Config struct {
-       save_path     string //直播流保存目录
-       want_qn       int    //直播流清晰度
-       want_type     string //直播流类型
-       bufsize       int    //直播hls流缓冲
-       banlance_host bool   //直播hls流均衡
-}
-
-func (t *Stream) LoadConfig() {
-       //读取配置
-       if path, ok := c.C.K_v.LoadV("直播流保存位置").(string); ok {
-               if path, err := filepath.Abs(path); err == nil {
-                       t.config.save_path = path + "/"
-               }
-       }
-       if v, ok := c.C.K_v.LoadV(`直播hls流缓冲`).(float64); ok && v > 0 {
-               t.config.bufsize = int(v)
-       }
-       if v, ok := c.C.K_v.LoadV(`直播hls流均衡`).(bool); ok {
-               t.config.banlance_host = v
-       }
-       if v, ok := c.C.K_v.LoadV(`直播流清晰度`).(int); ok {
-               t.config.want_qn = v
-       }
-       if v, ok := c.C.K_v.LoadV(`直播流类型`).(string); ok {
-               t.config.want_type = v
-       }
-}
-
-func (t *Stream) Start() {
-       t.log = c.C.Log.Base(`直播流保存`)
-}
diff --git a/Reply/stream.go b/Reply/stream.go
new file mode 100644 (file)
index 0000000..2950a74
--- /dev/null
@@ -0,0 +1,499 @@
+package reply
+
+import (
+       "bytes"
+       "encoding/base64"
+       "errors"
+       "io"
+       "net/http"
+       "net/url"
+       "path/filepath"
+       "strconv"
+       "strings"
+       "time"
+
+       c "github.com/qydysky/bili_danmu/CV"
+       F "github.com/qydysky/bili_danmu/F"
+
+       p "github.com/qydysky/part"
+       funcCtrl "github.com/qydysky/part/funcCtrl"
+       log "github.com/qydysky/part/log"
+       msgq "github.com/qydysky/part/msgq"
+       reqf "github.com/qydysky/part/reqf"
+       signal "github.com/qydysky/part/signal"
+       sync "github.com/qydysky/part/sync"
+)
+
+type M4SStream struct {
+       Status               *signal.Signal //IsLive()是否运行中
+       exitSign             *signal.Signal //IsLive()是否等待退出中
+       log                  *log.Log_interface
+       config               M4SStream_Config //配置
+       stream_last_modified time.Time        //流地址更新时间
+       stream_expires       int64            //流到期时间
+       last_m4s             *m4s_link_item   //最后一个切片
+       stream_hosts         sync.Map         //使用的流服务器
+       Newst_m4s            *msgq.Msgq       //m4s消息 tag:m4s
+       first_m4s            []byte           //m4s起始块
+}
+
+type M4SStream_Config struct {
+       save_path     string //直播流保存目录
+       want_qn       int    //直播流清晰度
+       want_type     string //直播流类型
+       bufsize       int    //直播hls流缓冲
+       banlance_host bool   //直播hls流均衡
+}
+
+type m4s_link_item struct {
+       Url    string // m4s链接
+       Base   string // m4s文件名
+       status int    // 下载状态 0:未下载 1:正在下载 2:下载完成 3:下载失败
+       data   []byte // 下载的数据
+}
+
+func (t *m4s_link_item) isInit() bool {
+       return strings.Contains(t.Base, "h")
+}
+
+func (t *m4s_link_item) getNo() (int, error) {
+       var base = t.Base
+       if t.isInit() {
+               base = base[1:]
+       }
+       return strconv.Atoi(base[:len(base)-4])
+}
+
+func (t *M4SStream) LoadConfig(kv *sync.Map, l *log.Log_interface) {
+       //读取配置
+       if path, ok := kv.LoadV("直播流保存位置").(string); ok {
+               if path, err := filepath.Abs(path); err == nil {
+                       t.config.save_path = path + "/"
+               }
+       }
+       if v, ok := kv.LoadV(`直播hls流缓冲`).(float64); ok && v > 0 {
+               t.config.bufsize = int(v)
+       }
+       if v, ok := kv.LoadV(`直播hls流均衡`).(bool); ok {
+               t.config.banlance_host = v
+       }
+       if v, ok := kv.LoadV(`直播流清晰度`).(float64); ok {
+               t.config.want_qn = int(v)
+       }
+       if v, ok := kv.LoadV(`直播流类型`).(string); ok {
+               t.config.want_type = v
+       }
+       t.log = l.Base(`直播流保存`)
+}
+
+func (t *M4SStream) getFirstM4S() []byte {
+       return t.first_m4s
+}
+
+func (t *M4SStream) fetchCheckStream(tmpc *c.Common) bool {
+       // 获取流地址
+       tmpc.Live_want_qn = t.config.want_qn
+       if F.Get(tmpc).Get(`Live`); len(tmpc.Live) == 0 {
+               return false
+       }
+
+       // 保存流地址过期时间
+       if m3u8_url, err := url.Parse(tmpc.Live[0]); err != nil {
+               t.log.L(`E: `, err.Error())
+               return false
+       } else {
+               expires, _ := strconv.Atoi(m3u8_url.Query().Get("expires"))
+               t.stream_expires = int64(expires)
+       }
+
+       // 检查是否可以获取
+       CookieM := make(map[string]string)
+       tmpc.Cookie.Range(func(k, v interface{}) bool {
+               CookieM[k.(string)] = v.(string)
+               return true
+       })
+
+       var req = reqf.New()
+       if e := req.Reqf(reqf.Rval{
+               Url:       tmpc.Live[0],
+               Retry:     10,
+               SleepTime: 1000,
+               Proxy:     tmpc.Proxy,
+               Header: map[string]string{
+                       `Cookie`: reqf.Map_2_Cookies_String(CookieM),
+               },
+               Timeout:          5 * 1000,
+               JustResponseCode: true,
+       }); e != nil {
+               t.log.L(`W: `, e)
+       }
+
+       if req.Response == nil {
+               t.log.L(`W: `, `live响应错误`)
+               return false
+       } else if req.Response.StatusCode != 200 {
+               t.log.L(`W: `, `live响应错误`, req.Response.Status, string(req.Respon))
+               return false
+       }
+       return true
+}
+
+func (t *M4SStream) fetchParseM3U8(tmpc *c.Common) (m4s_links []*m4s_link_item, m3u8_addon []byte) {
+       // 请求解析m3u8内容
+       for _, v := range tmpc.Live {
+               m3u8_url, err := url.Parse(v)
+               if err != nil {
+                       t.log.L(`E: `, err.Error())
+                       return
+               }
+
+               // 设置请求参数
+               rval := reqf.Rval{
+                       Url:            m3u8_url.String(),
+                       ConnectTimeout: 2000,
+                       ReadTimeout:    1000,
+                       Timeout:        2000,
+                       Proxy:          c.C.Proxy,
+                       Header: map[string]string{
+                               `Host`:            m3u8_url.Host,
+                               `User-Agent`:      `Mozilla/5.0 (X11; Linux x86_64; rv:83.0) Gecko/20100101 Firefox/83.0`,
+                               `Accept`:          `*/*`,
+                               `Accept-Language`: `zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2`,
+                               `Accept-Encoding`: `gzip, deflate, br`,
+                               `Origin`:          `https://live.bilibili.com`,
+                               `Connection`:      `keep-alive`,
+                               `Pragma`:          `no-cache`,
+                               `Cache-Control`:   `no-cache`,
+                               `Referer`:         "https://live.bilibili.com/",
+                       },
+               }
+               if !t.stream_last_modified.IsZero() {
+                       rval.Header[`If-Modified-Since`] = t.stream_last_modified.Add(time.Second).Format("Mon, 02 Jan 2006 15:04:05 CST")
+               }
+
+               // 开始请求
+               var r = reqf.New()
+               if e := r.Reqf(rval); e != nil {
+                       continue
+               }
+
+               if r.Response.StatusCode == http.StatusNotModified {
+                       t.log.L(`T: `, `hls未更改`)
+                       return
+               }
+
+               // 保存最后m3u8修改时间
+               if last_mod, ok := r.Response.Header[`Last-Modified`]; ok && len(last_mod) > 0 {
+                       if lm, e := time.Parse("Mon, 02 Jan 2006 15:04:05 CST", last_mod[0]); e == nil {
+                               t.stream_last_modified = lm
+                       }
+               }
+
+               // m3u8字节流
+               var m3u8_respon = r.Respon
+
+               // base64解码
+               if len(m3u8_respon) != 0 && !bytes.Contains(m3u8_respon, []byte("#")) {
+                       m3u8_respon, err = base64.StdEncoding.DecodeString(string(m3u8_respon))
+                       if err != nil {
+                               t.log.L(`W: `, err, string(m3u8_respon))
+                               return
+                       }
+               }
+
+               // 解析m3u8
+               for _, line := range bytes.Split(m3u8_respon, []byte("\n")) {
+                       if len(line) == 0 {
+                               continue
+                       }
+
+                       var m4s_link = "" //切片文件名
+
+                       //获取附加的m3u8字节 忽略bili定制拓展
+                       if !bytes.Contains(line, []byte(`#EXT-X-BILI`)) {
+                               if t.last_m4s == nil {
+                                       m3u8_addon = append(m3u8_addon, line...)
+                                       m3u8_addon = append(m3u8_addon, []byte("\n")...)
+                               } else {
+                                       if bytes.Contains(line, []byte(`#EXTINF`)) ||
+                                               !bytes.Contains(line, []byte(`#`)) {
+                                               m3u8_addon = append(m3u8_addon, line...)
+                                               m3u8_addon = append(m3u8_addon, []byte("\n")...)
+                                       }
+                               }
+                       }
+
+                       //获取切片文件名
+                       if bytes.Contains(line, []byte("EXT-X-MAP")) {
+                               o := bytes.Index(line, []byte(`EXT-X-MAP:URI="`)) + 15
+                               e := bytes.Index(line[o:], []byte(`"`)) + o
+                               m4s_link = string(line[o:e])
+                       } else if bytes.Contains(line, []byte("#EXT-X")) { //忽略扩展标签
+                               continue
+                       } else if bytes.Contains(line, []byte(".m4s")) {
+                               m4s_link = string(line)
+                       } else {
+                               continue
+                       }
+
+                       //获取切片地址
+                       u, e := url.Parse("./" + m4s_link + "?trid=" + m3u8_url.Query().Get("trid"))
+                       if e != nil {
+                               t.log.L(`E: `, e)
+                               return
+                       }
+
+                       //将切片添加到返回切片数组
+                       m4s_links = append(m4s_links, &m4s_link_item{
+                               Url:  m3u8_url.ResolveReference(u).String(),
+                               Base: m4s_link,
+                       })
+               }
+
+               // 设置最后的切片
+               defer func(last_m4s *m4s_link_item) {
+                       t.last_m4s = last_m4s
+               }(m4s_links[len(m4s_links)-1])
+
+               if t.last_m4s == nil {
+                       return
+               }
+
+               // 只返回新增加的
+               for k, m4s_link := range m4s_links {
+                       if m4s_link.Base == t.last_m4s.Base {
+                               // 只返回新增加的切片
+                               m4s_links = m4s_links[k+1:]
+
+                               // 只返回新增加的m3u8字节
+                               if index := bytes.Index(m3u8_addon, []byte(m4s_link.Base)); index != -1 {
+                                       index += len([]byte(m4s_link.Base))
+                                       if index == len(m3u8_addon) {
+                                               m3u8_addon = []byte{}
+                                       } else {
+                                               m3u8_addon = m3u8_addon[index+1:]
+                                       }
+                               }
+                               return
+                       }
+               }
+
+               // 来到此处说明出现了丢失 尝试补充
+               var guess_end_no, _ = m4s_links[0].getNo()
+               for no, _ := t.last_m4s.getNo(); no < guess_end_no; no += 1 {
+                       // 补充m3u8
+                       m3u8_addon = append([]byte(`#EXTINF:1.00\n`+strconv.Itoa(no)+`.m4s\n`), m3u8_addon...)
+
+                       //获取切片地址
+                       u, e := url.Parse("./" + strconv.Itoa(no) + `.m4s`)
+                       if e != nil {
+                               t.log.L(`E: `, e)
+                               return
+                       }
+
+                       //将切片添加到返回切片数组前
+                       m4s_links = append([]*m4s_link_item{
+                               {
+                                       Url:  m3u8_url.ResolveReference(u).String(),
+                                       Base: strconv.Itoa(no) + `.m4s`,
+                               },
+                       }, m4s_links...)
+               }
+
+               // 请求解析成功,退出获取循环
+               break
+       }
+
+       return
+}
+
+func (t *M4SStream) saveStream(tmpc *c.Common) {
+       // 设置保存路径
+       var save_path = t.config.save_path + strconv.Itoa(tmpc.Roomid) + "_" + time.Now().Format("2006_01_02_15-04-05-000") + `/`
+
+       // 显示保存位置
+       if rel, err := filepath.Rel(t.config.save_path, save_path); err == nil {
+               t.log.L(`I: `, "保存到", rel+`/0.m3u8`)
+       } else {
+               t.log.L(`W: `, err)
+       }
+
+       // 获取流
+       if strings.Contains(tmpc.Live[0], `m3u8`) {
+               t.stream_expires = time.Now().Add(time.Minute * 2).Unix() // 流链接过期时间
+
+               // 同时下载数限制
+               var download_limit = funcCtrl.BlockFuncN{
+                       Max: 2,
+               }
+
+               // 下载循环
+               for download_seq := []*m4s_link_item{}; ; {
+                       // 下载切片
+                       for _, v := range download_seq {
+                               v.status = 1 // 设置切片状态为正在下载
+
+                               // 均衡负载
+                               if link_url, e := url.Parse(v.Url); e == nil {
+                                       if t.stream_hosts.Len() != 1 {
+                                               t.stream_hosts.Range(func(key, value interface{}) bool {
+                                                       // 故障转移
+                                                       if v.status == 3 && link_url.Host == key.(string) {
+                                                               return true
+                                                       }
+                                                       // 随机
+                                                       link_url.Host = key.(string)
+                                                       return false
+                                               })
+                                       }
+                                       v.Url = link_url.String()
+                               }
+
+                               download_limit.Block()
+                               go func(link *m4s_link_item, path string) {
+                                       defer download_limit.UnBlock()
+
+                                       r := reqf.New()
+                                       if e := r.Reqf(reqf.Rval{
+                                               Url:            link.Url,
+                                               SaveToPath:     path + link.Base,
+                                               ConnectTimeout: 2000,
+                                               ReadTimeout:    1000,
+                                               Timeout:        2000,
+                                               Proxy:          tmpc.Proxy,
+                                       }); e != nil && !errors.Is(e, io.EOF) {
+                                               link.status = 3 // 设置切片状态为下载失败
+                                       } else {
+                                               if usedt := r.UsedTime.Seconds(); usedt > 700 {
+                                                       t.log.L(`I: `, `hls切片下载慢`, usedt, `ms`)
+                                               }
+                                               link.data = r.Respon
+                                               link.status = 2 // 设置切片状态为下载完成
+                                       }
+                               }(v, save_path)
+                       }
+
+                       // 等待队列下载完成
+                       download_limit.None()
+                       download_limit.UnNone()
+
+                       //添加失败切片 传递切片
+                       {
+                               var tmp_seq []*m4s_link_item
+                               for _, v := range download_seq {
+                                       if strings.Contains(v.Base, `h`) {
+                                               t.first_m4s = v.data
+                                       }
+
+                                       if v.status == 3 {
+                                               tmp_seq = append(tmp_seq, v)
+                                       } else {
+                                               t.Newst_m4s.Push_tag(`m4s`, v.data)
+                                       }
+                               }
+                               download_seq = tmp_seq
+                       }
+
+                       // 停止录制
+                       if !t.Status.Islive() {
+                               if len(download_seq) != 0 {
+                                       t.log.L(`I: `, `下载最后切片:`, len(download_seq))
+                                       continue
+                               }
+                               break
+                       }
+
+                       // 刷新流地址
+                       if time.Now().Unix()+60 > t.stream_expires {
+                               t.fetchCheckStream(tmpc)
+                       }
+
+                       // 获取解析m3u8
+                       var m4s_links, m3u8_addon = t.fetchParseM3U8(tmpc)
+                       if len(m4s_links) == 0 {
+                               time.Sleep(time.Second)
+                               continue
+                       }
+
+                       // 添加新切片到下载队列
+                       download_seq = append(download_seq, m4s_links...)
+
+                       // 添加m3u8字节
+                       p.File().FileWR(p.Filel{
+                               File:    save_path + "0.m3u8.dtmp",
+                               Loc:     -1,
+                               Context: []interface{}{m3u8_addon},
+                       })
+               }
+
+               // 结束
+               if p.Checkfile().IsExist(save_path + "0.m3u8.dtmp") {
+                       f := p.File()
+                       f.FileWR(p.Filel{
+                               File:    save_path + "0.m3u8.dtmp",
+                               Loc:     -1,
+                               Context: []interface{}{"#EXT-X-ENDLIST"},
+                       })
+                       p.FileMove(save_path+"0.m3u8.dtmp", save_path+"0.m3u8")
+               }
+
+       }
+}
+
+func (t *M4SStream) Start() {
+       // 清晰度-1 不保存
+       if t.config.want_qn == -1 {
+               return
+       }
+
+       // 状态检测与设置
+       if t.Status.Islive() {
+               t.log.L(`T: `, `已存在实例`)
+               return
+       }
+       t.Status = signal.Init()
+       defer t.Status.Done()
+
+       // 初始化切片消息
+       t.Newst_m4s = msgq.New(10)
+
+       var tmpc = c.C
+
+       // 主循环
+       for t.Status.Islive() {
+               // 是否在直播
+               F.Get(&tmpc).Get(`Liveing`)
+               if !tmpc.Liveing {
+                       t.log.L(`T: `, `未直播`)
+                       break
+               }
+
+               // 获取 and 检查流地址状态
+               if !t.fetchCheckStream(&tmpc) {
+                       time.Sleep(time.Second * 5)
+                       continue
+               }
+
+               // 设置均衡负载
+               for _, v := range tmpc.Live {
+                       if url_struct, e := url.Parse(v); e == nil {
+                               t.stream_hosts.Store(url_struct.Hostname(), nil)
+                       }
+                       if !t.config.banlance_host {
+                               break
+                       }
+               }
+
+               // 保存流
+               t.saveStream(&tmpc)
+       }
+
+       t.log.L(`T: `, `结束`)
+       t.exitSign.Done()
+}
+
+func (t *M4SStream) Stop() {
+       t.exitSign = signal.Init()
+       t.Status.Done()
+       t.exitSign.Wait()
+}
index 69d476ff281f833b92095211733bdd9362963c5a..0e172b2c1c882378a6229e3ee878acf7cc1cbd40 100644 (file)
@@ -270,7 +270,7 @@ func Demo(roomid ...int) {
 
                                                { //附加功能 进房间发送弹幕 直播流保存 营收
                                                        go reply.Entry_danmu()
-                                                       go reply.Savestreamf()
+                                                       c.C.Danmu_Main_mq.Push_tag(`savestream`, nil)
                                                        go reply.ShowRevf()
                                                        //小心心
                                                        go F.F_x25Kn()
@@ -310,7 +310,7 @@ func Demo(roomid ...int) {
                                }
                        }
                        { //附加功能 直播流停止
-                               reply.Savestream_wait()
+                               reply.StreamOStop()
                                reply.Save_to_json(-1, []interface{}{`{}]`})
                        }
                        p.Sys().Timeoutf(1)