]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
支持多房间保存直播流
authorqydysky <qydysky@foxmail.com>
Wed, 20 Apr 2022 10:55:19 +0000 (18:55 +0800)
committerqydysky <qydysky@foxmail.com>
Wed, 20 Apr 2022 10:55:19 +0000 (18:55 +0800)
.gitignore
F/cmd/cmd.go [moved from F/cmd.go with 80% similarity]
Reply/F.go
Reply/Reply.go
Reply/stream.go
bili_danmu.go
demo/config/config_K_v.json

index 774147dff2809d9b91866c1d2213613cf08a690d..98826839432382ac33c91598db7f7eb9c62c78b6 100644 (file)
@@ -19,3 +19,4 @@ demo/tts.mp3
 demo/cpu.pprof
 demo/qr.png
 demo/live
+demo/main.exe
similarity index 80%
rename from F/cmd.go
rename to F/cmd/cmd.go
index d5949f8f0fb1fb810c450c0e30b7f307e0643ae3..04f31946d32a67250868b5071798961f05072bea 100644 (file)
--- a/F/cmd.go
@@ -1,4 +1,4 @@
-package F
+package Cmd
 
 import (
        "bufio"
@@ -9,6 +9,8 @@ import (
        "time"
 
        c "github.com/qydysky/bili_danmu/CV"
+       F "github.com/qydysky/bili_danmu/F"
+       reply "github.com/qydysky/bili_danmu/Reply"
        send "github.com/qydysky/bili_danmu/Send"
 )
 
@@ -52,12 +54,16 @@ func Cmd() {
                        cmdlog.L(`W: `, "不支持功能键")
                } else if inputs[0] == 32 { // 开头
                        //录制切换
-                       if strings.Contains(inputs, ` rec`) && c.C.Roomid != 0 {
-                               if !c.C.Liveing {
-                                       cmdlog.L(`W: `, "不能切换录制状态,未在直播")
-                                       continue
+                       if strings.Contains(inputs, ` rec`) {
+                               if len(inputs) > 4 {
+                                       if room, err := strconv.Atoi(inputs[4:]); err == nil {
+                                               c.C.Danmu_Main_mq.Push_tag(`savestream`, room)
+                                               continue
+                                       }
+                                       cmdlog.L(`W: `, "输入错误", inputs)
+                               } else {
+                                       c.C.Danmu_Main_mq.Push_tag(`savestream`, c.C.Roomid)
                                }
-                               c.C.Danmu_Main_mq.Push_tag(`savestream`, nil)
                                continue
                        }
                        //直播间切换
@@ -76,9 +82,9 @@ func Cmd() {
                                        cmdlog.L(`W: `, "输入错误", inputs)
                                        continue
                                }
-                               for k, v := range Feed_list() {
+                               for k, v := range F.Feed_list() {
                                        liveList[` live`+strconv.Itoa(k)] = v.Roomid
-                                       fmt.Printf("%d\t%s\n\t\t\t%s\n", k, v.Uname, v.Title)
+                                       fmt.Printf("%d\t%s(%d)\n\t\t\t%s\n", k, v.Uname, v.Roomid, v.Title)
                                }
                                fmt.Println("回复' live(序号)'进入直播间")
                                fmt.Print("\n")
@@ -91,7 +97,7 @@ func Cmd() {
                                        continue
                                }
                                //获取cookie
-                               Get(&c.C).Get(`Cookie`)
+                               F.Get(&c.C).Get(`Cookie`)
 
                                continue
                        }
@@ -102,7 +108,7 @@ func Cmd() {
                                        continue
                                }
                                //获取小心心
-                               go F_x25Kn()
+                               go F.F_x25Kn()
 
                                continue
                        }
@@ -114,7 +120,7 @@ func Cmd() {
                                }
 
                                fmt.Print("\n")
-                               for k, v := range SearchUP(inputs[7:]) {
+                               for k, v := range F.SearchUP(inputs[7:]) {
                                        liveList[` live`+strconv.Itoa(k)] = v.Roomid
                                        if v.Is_live {
                                                fmt.Printf("%d\t%s\t%s\n", k, `☁`, v.Uname)
@@ -135,7 +141,7 @@ func Cmd() {
                        //当前直播间信息
                        if strings.Contains(inputs, ` room`) && c.C.Roomid != 0 {
                                fmt.Print("\n")
-                               fmt.Println("当前直播间信息")
+                               fmt.Println("当前直播间(" + strconv.Itoa(c.C.Roomid) + ")信息")
                                {
                                        living := `未在直播`
                                        if c.C.Liveing {
@@ -162,6 +168,21 @@ func Cmd() {
                                if c.C.Stream_url != "" {
                                        fmt.Println(`直播Web服务:`, c.C.Stream_url)
                                }
+                               if reply.StreamOStatus(c.C.Roomid) {
+                                       fmt.Println(`正在录制当前房间`)
+                               } else {
+                                       fmt.Println(`未在录制当前房间`)
+                               }
+
+                               var array = reply.StreamOCommon(-1)
+                               if len(array) > 1 {
+                                       fmt.Println(`正在录制的其他房间:`)
+                                       for _, v := range array {
+                                               fmt.Println("\t" + v.Uname + "(" + strconv.Itoa(v.Roomid) + ") " + v.Title)
+                                       }
+                               }
+                               fmt.Println("输入` rec` 来启停当前房间录制, 输入` rec房间号` 来启停其他录制")
+
                                fmt.Print("\n")
 
                                continue
index 77ef155a460bc55117e29e387d27a9c0a7bc4a21..7416720b43a96a0f74f9cfc34d8a43682cf09020 100644 (file)
@@ -186,13 +186,13 @@ func init() {
 }
 
 //设定字幕文件名,为""时停止输出
-func Ass_f(file string, st time.Time) {
+func Ass_f(save_path string, file string, st time.Time) {
        ass.file = file
        if file == "" {
                return
        }
 
-       if rel, err := filepath.Rel(streamO.config.save_path, ass.file); err == nil {
+       if rel, err := filepath.Rel(save_path, ass.file); err == nil {
                c.C.Log.Base(`Ass`).L(`I: `, "保存到", rel+".ass")
        } else {
                c.C.Log.Base(`Ass`).L(`I: `, "保存到", ass.file+".ass")
@@ -247,26 +247,69 @@ func dtos(t time.Duration) string {
 
 //hls
 //https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming
-var streamO = new(M4SStream)
+var streamO psync.Map
+
+func StreamOCommon(roomid int) (array []c.Common) {
+       if roomid != -1 { //返回特定房间
+               if v, ok := streamO.Load(roomid); ok {
+                       return []c.Common{v.(*M4SStream).Common()}
+               }
+       } else { //返回所有
+               streamO.Range(func(k, v interface{}) bool {
+                       array = append(array, v.(*M4SStream).Common())
+                       return true
+               })
+       }
+       return
+}
 
 func init() {
        //使用带tag的消息队列在功能间传递消息
        c.C.Danmu_Main_mq.Pull_tag(msgq.FuncMap{
                `savestream`: func(data interface{}) bool {
-                       if streamO.Status.Islive() {
-                               streamO.Stop()
+                       if roomid, ok := data.(int); ok {
+                               if v, ok := streamO.Load(roomid); ok {
+                                       if v.(*M4SStream).Status.Islive() {
+                                               v.(*M4SStream).Stop()
+                                               streamO.Delete(roomid)
+                                       }
+                               } else {
+                                       var (
+                                               tmp    = new(M4SStream)
+                                               common = c.C
+                                       )
+                                       common.Roomid = roomid
+                                       tmp.LoadConfig(common, c.C.Log)
+                                       streamO.Store(roomid, tmp)
+                                       go tmp.Start()
+                               }
                        } else {
-                               streamO.LoadConfig(&c.C.K_v, c.C.Log)
-                               go streamO.Start()
+                               flog.L(`E: `, `savestream必须为数字房间号`)
                        }
                        return false
                },
        })
 }
 
-func StreamOStop() {
-       if streamO.Status.Islive() {
-               streamO.Stop()
+func StreamOStatus(roomid int) bool {
+       v, ok := streamO.Load(roomid)
+       return ok && (v.(*M4SStream).Status.Islive() || v.(*M4SStream).exitSign.Islive())
+}
+
+func StreamOStop(roomid int) {
+       if roomid != -1 { // 针对某房间
+               if v, ok := streamO.Load(roomid); ok {
+                       if v.(*M4SStream).Status.Islive() {
+                               v.(*M4SStream).Stop()
+                       }
+               }
+       } else { //所有房间
+               streamO.Range(func(_, v interface{}) bool {
+                       if v.(*M4SStream).Status.Islive() {
+                               v.(*M4SStream).Stop()
+                       }
+                       return true
+               })
        }
 }
 
@@ -967,7 +1010,13 @@ func init() {
                                w.Header().Set("Connection", "keep-alive")
                                w.Header().Set("Content-Transfer-Encoding", "binary")
 
-                               if len(streamO.getFirstM4S()) == 0 {
+                               // 获取当前房间的
+                               var currentStreamO *M4SStream
+                               if v, ok := streamO.Load(c.C.Roomid); ok {
+                                       currentStreamO = v.(*M4SStream)
+                               }
+
+                               if len(currentStreamO.getFirstM4S()) == 0 {
                                        w.Header().Set("Retry-After", "1")
                                        w.WriteHeader(http.StatusServiceUnavailable)
                                        return
@@ -983,7 +1032,7 @@ func init() {
                                }
 
                                //写入hls头
-                               if _, err := w.Write(streamO.getFirstM4S()); err != nil {
+                               if _, err := w.Write(currentStreamO.getFirstM4S()); err != nil {
                                        return
                                } else if flushSupport {
                                        flusher.Flush()
@@ -992,7 +1041,7 @@ func init() {
                                cancel := make(chan struct{})
 
                                //hls切片
-                               streamO.Newst_m4s.Pull_tag(map[string]func(interface{}) bool{
+                               currentStreamO.Newst_m4s.Pull_tag(map[string]func(interface{}) bool{
                                        `m4s`: func(data interface{}) bool {
                                                if b, ok := data.([]byte); ok {
                                                        if _, err := w.Write(b); err != nil {
index 7281c949780d0ec7aab7f4ca61db7ec130e462e3..e04c95b074531a521e29f693b78d0f67757c0e5d 100644 (file)
@@ -593,11 +593,13 @@ func (replyF) preparing(s string) {
                { //附加功能 obs结束 `savestream`结束
                        Obs_R(false)
                        Obsf(false)
-                       streamO.Stop()
                        go ShowRevf()
                        c.C.Liveing = false
                }
                if p.Sys().Type(roomid) == "float64" {
+                       // 停止此房间录制
+                       StreamOStop(int(roomid.(float64)))
+
                        Gui_show(Itos([]interface{}{"房间", roomid, "下播了"}), "0room")
                        msglog.L(`I: `, "房间", int(roomid.(float64)), "下播了")
                        return
@@ -618,7 +620,6 @@ func (replyF) live(s string) {
                { //附加功能 obs录播
                        Obsf(true)
                        Obs_R(true)
-                       go streamO.Start()
                }
                {
                        c.C.Rev = 0.0                    //营收
@@ -626,6 +627,14 @@ func (replyF) live(s string) {
                        c.C.Live_Start_Time = time.Now() //开播h时间
                }
                if p.Sys().Type(roomid) == "float64" {
+                       //开始录制
+                       go func() {
+                               if v, ok := c.C.K_v.LoadV(`直播流当前房间开播时停止其他流`).(bool); ok && v {
+                                       StreamOStop(-1) //停止其他房间录制
+                               }
+                               c.C.Danmu_Main_mq.Push_tag(`savestream`, roomid)
+                       }()
+
                        Gui_show(Itos([]interface{}{"房间", roomid, "开播了"}), "0room")
                        msglog.L(`I: `, "房间", int(roomid.(float64)), "开播了")
                        return
index 2950a7402333bb87eb203f25fa09753b67ea1f91..147cfd4050b8bb3c1caf48dac674346b4b078ed6 100644 (file)
@@ -25,16 +25,17 @@ import (
 )
 
 type M4SStream struct {
-       Status               *signal.Signal //IsLive()是否运行中
-       exitSign             *signal.Signal //IsLive()是否等待退出中
-       log                  *log.Log_interface
-       config               M4SStream_Config //配置
-       stream_last_modified time.Time        //流地址更新时间
-       stream_expires       int64            //流到期时间
-       last_m4s             *m4s_link_item   //最后一个切片
-       stream_hosts         sync.Map         //使用的流服务器
-       Newst_m4s            *msgq.Msgq       //m4s消息 tag:m4s
-       first_m4s            []byte           //m4s起始块
+       Status               *signal.Signal     //IsLive()是否运行中
+       exitSign             *signal.Signal     //IsLive()是否等待退出中
+       log                  *log.Log_interface //日志
+       config               M4SStream_Config   //配置
+       stream_last_modified time.Time          //流地址更新时间
+       stream_expires       int64              //流到期时间
+       last_m4s             *m4s_link_item     //最后一个切片
+       stream_hosts         sync.Map           //使用的流服务器
+       Newst_m4s            *msgq.Msgq         //m4s消息 tag:m4s
+       first_m4s            []byte             //m4s起始块
+       common               c.Common           //通用配置副本
 }
 
 type M4SStream_Config struct {
@@ -64,25 +65,31 @@ func (t *m4s_link_item) getNo() (int, error) {
        return strconv.Atoi(base[:len(base)-4])
 }
 
-func (t *M4SStream) LoadConfig(kv *sync.Map, l *log.Log_interface) {
+func (t *M4SStream) Common() c.Common {
+       return t.common
+}
+
+func (t *M4SStream) LoadConfig(common c.Common, l *log.Log_interface) {
        //读取配置
-       if path, ok := kv.LoadV("直播流保存位置").(string); ok {
+       if path, ok := common.K_v.LoadV("直播流保存位置").(string); ok {
                if path, err := filepath.Abs(path); err == nil {
                        t.config.save_path = path + "/"
                }
        }
-       if v, ok := kv.LoadV(`直播hls流缓冲`).(float64); ok && v > 0 {
+       if v, ok := common.K_v.LoadV(`直播hls流缓冲`).(float64); ok && v > 0 {
                t.config.bufsize = int(v)
        }
-       if v, ok := kv.LoadV(`直播hls流均衡`).(bool); ok {
+       if v, ok := common.K_v.LoadV(`直播hls流均衡`).(bool); ok {
                t.config.banlance_host = v
        }
-       if v, ok := kv.LoadV(`直播流清晰度`).(float64); ok {
+       if v, ok := common.K_v.LoadV(`直播流清晰度`).(float64); ok {
                t.config.want_qn = int(v)
        }
-       if v, ok := kv.LoadV(`直播流类型`).(string); ok {
+       if v, ok := common.K_v.LoadV(`直播流类型`).(string); ok {
                t.config.want_type = v
        }
+
+       t.common = common
        t.log = l.Base(`直播流保存`)
 }
 
@@ -90,15 +97,15 @@ func (t *M4SStream) getFirstM4S() []byte {
        return t.first_m4s
 }
 
-func (t *M4SStream) fetchCheckStream(tmpc *c.Common) bool {
+func (t *M4SStream) fetchCheckStream() bool {
        // 获取流地址
-       tmpc.Live_want_qn = t.config.want_qn
-       if F.Get(tmpc).Get(`Live`); len(tmpc.Live) == 0 {
+       t.common.Live_want_qn = t.config.want_qn
+       if F.Get(&t.common).Get(`Live`); len(t.common.Live) == 0 {
                return false
        }
 
        // 保存流地址过期时间
-       if m3u8_url, err := url.Parse(tmpc.Live[0]); err != nil {
+       if m3u8_url, err := url.Parse(t.common.Live[0]); err != nil {
                t.log.L(`E: `, err.Error())
                return false
        } else {
@@ -108,17 +115,17 @@ func (t *M4SStream) fetchCheckStream(tmpc *c.Common) bool {
 
        // 检查是否可以获取
        CookieM := make(map[string]string)
-       tmpc.Cookie.Range(func(k, v interface{}) bool {
+       t.common.Cookie.Range(func(k, v interface{}) bool {
                CookieM[k.(string)] = v.(string)
                return true
        })
 
        var req = reqf.New()
        if e := req.Reqf(reqf.Rval{
-               Url:       tmpc.Live[0],
+               Url:       t.common.Live[0],
                Retry:     10,
                SleepTime: 1000,
-               Proxy:     tmpc.Proxy,
+               Proxy:     t.common.Proxy,
                Header: map[string]string{
                        `Cookie`: reqf.Map_2_Cookies_String(CookieM),
                },
@@ -138,9 +145,9 @@ func (t *M4SStream) fetchCheckStream(tmpc *c.Common) bool {
        return true
 }
 
-func (t *M4SStream) fetchParseM3U8(tmpc *c.Common) (m4s_links []*m4s_link_item, m3u8_addon []byte) {
+func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []byte) {
        // 请求解析m3u8内容
-       for _, v := range tmpc.Live {
+       for _, v := range t.common.Live {
                m3u8_url, err := url.Parse(v)
                if err != nil {
                        t.log.L(`E: `, err.Error())
@@ -307,9 +314,9 @@ func (t *M4SStream) fetchParseM3U8(tmpc *c.Common) (m4s_links []*m4s_link_item,
        return
 }
 
-func (t *M4SStream) saveStream(tmpc *c.Common) {
+func (t *M4SStream) saveStream() {
        // 设置保存路径
-       var save_path = t.config.save_path + strconv.Itoa(tmpc.Roomid) + "_" + time.Now().Format("2006_01_02_15-04-05-000") + `/`
+       var save_path = t.config.save_path + strconv.Itoa(t.common.Roomid) + "_" + time.Now().Format("2006_01_02_15-04-05-000") + `/`
 
        // 显示保存位置
        if rel, err := filepath.Rel(t.config.save_path, save_path); err == nil {
@@ -319,7 +326,7 @@ func (t *M4SStream) saveStream(tmpc *c.Common) {
        }
 
        // 获取流
-       if strings.Contains(tmpc.Live[0], `m3u8`) {
+       if strings.Contains(t.common.Live[0], `m3u8`) {
                t.stream_expires = time.Now().Add(time.Minute * 2).Unix() // 流链接过期时间
 
                // 同时下载数限制
@@ -360,7 +367,7 @@ func (t *M4SStream) saveStream(tmpc *c.Common) {
                                                ConnectTimeout: 2000,
                                                ReadTimeout:    1000,
                                                Timeout:        2000,
-                                               Proxy:          tmpc.Proxy,
+                                               Proxy:          t.common.Proxy,
                                        }); e != nil && !errors.Is(e, io.EOF) {
                                                link.status = 3 // 设置切片状态为下载失败
                                        } else {
@@ -405,11 +412,11 @@ func (t *M4SStream) saveStream(tmpc *c.Common) {
 
                        // 刷新流地址
                        if time.Now().Unix()+60 > t.stream_expires {
-                               t.fetchCheckStream(tmpc)
+                               t.fetchCheckStream()
                        }
 
                        // 获取解析m3u8
-                       var m4s_links, m3u8_addon = t.fetchParseM3U8(tmpc)
+                       var m4s_links, m3u8_addon = t.fetchParseM3U8()
                        if len(m4s_links) == 0 {
                                time.Sleep(time.Second)
                                continue
@@ -457,25 +464,24 @@ func (t *M4SStream) Start() {
        // 初始化切片消息
        t.Newst_m4s = msgq.New(10)
 
-       var tmpc = c.C
-
        // 主循环
        for t.Status.Islive() {
                // 是否在直播
-               F.Get(&tmpc).Get(`Liveing`)
-               if !tmpc.Liveing {
+               t.log.L(`I: `, t.common.Roomid)
+               F.Get(&t.common).Get(`Liveing`)
+               if !t.common.Liveing {
                        t.log.L(`T: `, `未直播`)
                        break
                }
 
                // 获取 and 检查流地址状态
-               if !t.fetchCheckStream(&tmpc) {
+               if !t.fetchCheckStream() {
                        time.Sleep(time.Second * 5)
                        continue
                }
 
                // 设置均衡负载
-               for _, v := range tmpc.Live {
+               for _, v := range t.common.Live {
                        if url_struct, e := url.Parse(v); e == nil {
                                t.stream_hosts.Store(url_struct.Hostname(), nil)
                        }
@@ -485,15 +491,16 @@ func (t *M4SStream) Start() {
                }
 
                // 保存流
-               t.saveStream(&tmpc)
+               t.saveStream()
        }
 
-       t.log.L(`T: `, `结束`)
+       t.log.L(`T: `, `结束`+strconv.Itoa(t.common.Roomid))
        t.exitSign.Done()
 }
 
 func (t *M4SStream) Stop() {
        t.exitSign = signal.Init()
        t.Status.Done()
+       t.log.L(`I: `, `正在等待切片下载...`)
        t.exitSign.Wait()
 }
index 0e172b2c1c882378a6229e3ee878acf7cc1cbd40..5d4f1035ac878da73babb1501f4db5b6fb92ca78 100644 (file)
@@ -11,6 +11,7 @@ import (
 
        c "github.com/qydysky/bili_danmu/CV"
        F "github.com/qydysky/bili_danmu/F"
+       Cmd "github.com/qydysky/bili_danmu/F/Cmd"
        reply "github.com/qydysky/bili_danmu/Reply"
        send "github.com/qydysky/bili_danmu/Send"
 
@@ -80,7 +81,7 @@ func Demo(roomid ...int) {
                        }
                }
                //命令行操作 切换房间 发送弹幕
-               go F.Cmd()
+               go Cmd.Cmd()
 
                //使用带tag的消息队列在功能间传递消息
                c.C.Danmu_Main_mq.Pull_tag(msgq.FuncMap{
@@ -270,7 +271,7 @@ func Demo(roomid ...int) {
 
                                                { //附加功能 进房间发送弹幕 直播流保存 营收
                                                        go reply.Entry_danmu()
-                                                       c.C.Danmu_Main_mq.Push_tag(`savestream`, nil)
+                                                       c.C.Danmu_Main_mq.Push_tag(`savestream`, c.C.Roomid)
                                                        go reply.ShowRevf()
                                                        //小心心
                                                        go F.F_x25Kn()
@@ -310,7 +311,7 @@ func Demo(roomid ...int) {
                                }
                        }
                        { //附加功能 直播流停止
-                               reply.StreamOStop()
+                               reply.StreamOStop(-1)
                                reply.Save_to_json(-1, []interface{}{`{}]`})
                        }
                        p.Sys().Timeoutf(1)
index 32c03aca56f949988320e9cce4921e659f306190..c2bcbb3c682c3e85d2d63e6eb49a72415de462ae 100644 (file)
@@ -61,6 +61,7 @@
     "直播hls流缓冲": 20,
     "直播hls流均衡-help":"true:使用所有hls服务器",
     "直播hls流均衡": true,
+    "直播流当前房间开播时停止其他流": true,
     "直播Web服务口":0,
     "ass-help": "只有保存直播流时才考虑生成ass,ass编码默认GB18030(可选utf-8)",
     "生成Ass弹幕": true,