From: qydysky Date: Wed, 20 Apr 2022 10:55:19 +0000 (+0800) Subject: 支持多房间保存直播流 X-Git-Tag: v0.5.10~41^2~78 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=172c7cfa5b7a380d794d0960db1d8a8f1acc0081;p=bili_danmu%2F.git 支持多房间保存直播流 --- diff --git a/.gitignore b/.gitignore index 774147d..9882683 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ demo/tts.mp3 demo/cpu.pprof demo/qr.png demo/live +demo/main.exe diff --git a/F/cmd.go b/F/cmd/cmd.go similarity index 80% rename from F/cmd.go rename to F/cmd/cmd.go index d5949f8..04f3194 100644 --- a/F/cmd.go +++ b/F/cmd/cmd.go @@ -1,4 +1,4 @@ -package F +package Cmd import ( "bufio" @@ -9,6 +9,8 @@ import ( "time" c "github.com/qydysky/bili_danmu/CV" + F "github.com/qydysky/bili_danmu/F" + reply "github.com/qydysky/bili_danmu/Reply" send "github.com/qydysky/bili_danmu/Send" ) @@ -52,12 +54,16 @@ func Cmd() { cmdlog.L(`W: `, "不支持功能键") } else if inputs[0] == 32 { // 开头 //录制切换 - if strings.Contains(inputs, ` rec`) && c.C.Roomid != 0 { - if !c.C.Liveing { - cmdlog.L(`W: `, "不能切换录制状态,未在直播") - continue + if strings.Contains(inputs, ` rec`) { + if len(inputs) > 4 { + if room, err := strconv.Atoi(inputs[4:]); err == nil { + c.C.Danmu_Main_mq.Push_tag(`savestream`, room) + continue + } + cmdlog.L(`W: `, "输入错误", inputs) + } else { + c.C.Danmu_Main_mq.Push_tag(`savestream`, c.C.Roomid) } - c.C.Danmu_Main_mq.Push_tag(`savestream`, nil) continue } //直播间切换 @@ -76,9 +82,9 @@ func Cmd() { cmdlog.L(`W: `, "输入错误", inputs) continue } - for k, v := range Feed_list() { + for k, v := range F.Feed_list() { liveList[` live`+strconv.Itoa(k)] = v.Roomid - fmt.Printf("%d\t%s\n\t\t\t%s\n", k, v.Uname, v.Title) + fmt.Printf("%d\t%s(%d)\n\t\t\t%s\n", k, v.Uname, v.Roomid, v.Title) } fmt.Println("回复' live(序号)'进入直播间") fmt.Print("\n") @@ -91,7 +97,7 @@ func Cmd() { continue } //获取cookie - Get(&c.C).Get(`Cookie`) + F.Get(&c.C).Get(`Cookie`) continue } @@ -102,7 +108,7 @@ func Cmd() { continue } //获取小心心 - go F_x25Kn() + go F.F_x25Kn() continue } @@ -114,7 +120,7 @@ func Cmd() { } fmt.Print("\n") - for k, v := range SearchUP(inputs[7:]) { + for k, v := range F.SearchUP(inputs[7:]) { liveList[` live`+strconv.Itoa(k)] = v.Roomid if v.Is_live { fmt.Printf("%d\t%s\t%s\n", k, `☁`, v.Uname) @@ -135,7 +141,7 @@ func Cmd() { //当前直播间信息 if strings.Contains(inputs, ` room`) && c.C.Roomid != 0 { fmt.Print("\n") - fmt.Println("当前直播间信息") + fmt.Println("当前直播间(" + strconv.Itoa(c.C.Roomid) + ")信息") { living := `未在直播` if c.C.Liveing { @@ -162,6 +168,21 @@ func Cmd() { if c.C.Stream_url != "" { fmt.Println(`直播Web服务:`, c.C.Stream_url) } + if reply.StreamOStatus(c.C.Roomid) { + fmt.Println(`正在录制当前房间`) + } else { + fmt.Println(`未在录制当前房间`) + } + + var array = reply.StreamOCommon(-1) + if len(array) > 1 { + fmt.Println(`正在录制的其他房间:`) + for _, v := range array { + fmt.Println("\t" + v.Uname + "(" + strconv.Itoa(v.Roomid) + ") " + v.Title) + } + } + fmt.Println("输入` rec` 来启停当前房间录制, 输入` rec房间号` 来启停其他录制") + fmt.Print("\n") continue diff --git a/Reply/F.go b/Reply/F.go index 77ef155..7416720 100644 --- a/Reply/F.go +++ b/Reply/F.go @@ -186,13 +186,13 @@ func init() { } //设定字幕文件名,为""时停止输出 -func Ass_f(file string, st time.Time) { +func Ass_f(save_path string, file string, st time.Time) { ass.file = file if file == "" { return } - if rel, err := filepath.Rel(streamO.config.save_path, ass.file); err == nil { + if rel, err := filepath.Rel(save_path, ass.file); err == nil { c.C.Log.Base(`Ass`).L(`I: `, "保存到", rel+".ass") } else { c.C.Log.Base(`Ass`).L(`I: `, "保存到", ass.file+".ass") @@ -247,26 +247,69 @@ func dtos(t time.Duration) string { //hls //https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming -var streamO = new(M4SStream) +var streamO psync.Map + +func StreamOCommon(roomid int) (array []c.Common) { + if roomid != -1 { //返回特定房间 + if v, ok := streamO.Load(roomid); ok { + return []c.Common{v.(*M4SStream).Common()} + } + } else { //返回所有 + streamO.Range(func(k, v interface{}) bool { + array = append(array, v.(*M4SStream).Common()) + return true + }) + } + return +} func init() { //使用带tag的消息队列在功能间传递消息 c.C.Danmu_Main_mq.Pull_tag(msgq.FuncMap{ `savestream`: func(data interface{}) bool { - if streamO.Status.Islive() { - streamO.Stop() + if roomid, ok := data.(int); ok { + if v, ok := streamO.Load(roomid); ok { + if v.(*M4SStream).Status.Islive() { + v.(*M4SStream).Stop() + streamO.Delete(roomid) + } + } else { + var ( + tmp = new(M4SStream) + common = c.C + ) + common.Roomid = roomid + tmp.LoadConfig(common, c.C.Log) + streamO.Store(roomid, tmp) + go tmp.Start() + } } else { - streamO.LoadConfig(&c.C.K_v, c.C.Log) - go streamO.Start() + flog.L(`E: `, `savestream必须为数字房间号`) } return false }, }) } -func StreamOStop() { - if streamO.Status.Islive() { - streamO.Stop() +func StreamOStatus(roomid int) bool { + v, ok := streamO.Load(roomid) + return ok && (v.(*M4SStream).Status.Islive() || v.(*M4SStream).exitSign.Islive()) +} + +func StreamOStop(roomid int) { + if roomid != -1 { // 针对某房间 + if v, ok := streamO.Load(roomid); ok { + if v.(*M4SStream).Status.Islive() { + v.(*M4SStream).Stop() + } + } + } else { //所有房间 + streamO.Range(func(_, v interface{}) bool { + if v.(*M4SStream).Status.Islive() { + v.(*M4SStream).Stop() + } + return true + }) } } @@ -967,7 +1010,13 @@ func init() { w.Header().Set("Connection", "keep-alive") w.Header().Set("Content-Transfer-Encoding", "binary") - if len(streamO.getFirstM4S()) == 0 { + // 获取当前房间的 + var currentStreamO *M4SStream + if v, ok := streamO.Load(c.C.Roomid); ok { + currentStreamO = v.(*M4SStream) + } + + if len(currentStreamO.getFirstM4S()) == 0 { w.Header().Set("Retry-After", "1") w.WriteHeader(http.StatusServiceUnavailable) return @@ -983,7 +1032,7 @@ func init() { } //写入hls头 - if _, err := w.Write(streamO.getFirstM4S()); err != nil { + if _, err := w.Write(currentStreamO.getFirstM4S()); err != nil { return } else if flushSupport { flusher.Flush() @@ -992,7 +1041,7 @@ func init() { cancel := make(chan struct{}) //hls切片 - streamO.Newst_m4s.Pull_tag(map[string]func(interface{}) bool{ + currentStreamO.Newst_m4s.Pull_tag(map[string]func(interface{}) bool{ `m4s`: func(data interface{}) bool { if b, ok := data.([]byte); ok { if _, err := w.Write(b); err != nil { diff --git a/Reply/Reply.go b/Reply/Reply.go index 7281c94..e04c95b 100644 --- a/Reply/Reply.go +++ b/Reply/Reply.go @@ -593,11 +593,13 @@ func (replyF) preparing(s string) { { //附加功能 obs结束 `savestream`结束 Obs_R(false) Obsf(false) - streamO.Stop() go ShowRevf() c.C.Liveing = false } if p.Sys().Type(roomid) == "float64" { + // 停止此房间录制 + StreamOStop(int(roomid.(float64))) + Gui_show(Itos([]interface{}{"房间", roomid, "下播了"}), "0room") msglog.L(`I: `, "房间", int(roomid.(float64)), "下播了") return @@ -618,7 +620,6 @@ func (replyF) live(s string) { { //附加功能 obs录播 Obsf(true) Obs_R(true) - go streamO.Start() } { c.C.Rev = 0.0 //营收 @@ -626,6 +627,14 @@ func (replyF) live(s string) { c.C.Live_Start_Time = time.Now() //开播h时间 } if p.Sys().Type(roomid) == "float64" { + //开始录制 + go func() { + if v, ok := c.C.K_v.LoadV(`直播流当前房间开播时停止其他流`).(bool); ok && v { + StreamOStop(-1) //停止其他房间录制 + } + c.C.Danmu_Main_mq.Push_tag(`savestream`, roomid) + }() + Gui_show(Itos([]interface{}{"房间", roomid, "开播了"}), "0room") msglog.L(`I: `, "房间", int(roomid.(float64)), "开播了") return diff --git a/Reply/stream.go b/Reply/stream.go index 2950a74..147cfd4 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -25,16 +25,17 @@ import ( ) type M4SStream struct { - Status *signal.Signal //IsLive()是否运行中 - exitSign *signal.Signal //IsLive()是否等待退出中 - log *log.Log_interface - config M4SStream_Config //配置 - stream_last_modified time.Time //流地址更新时间 - stream_expires int64 //流到期时间 - last_m4s *m4s_link_item //最后一个切片 - stream_hosts sync.Map //使用的流服务器 - Newst_m4s *msgq.Msgq //m4s消息 tag:m4s - first_m4s []byte //m4s起始块 + Status *signal.Signal //IsLive()是否运行中 + exitSign *signal.Signal //IsLive()是否等待退出中 + log *log.Log_interface //日志 + config M4SStream_Config //配置 + stream_last_modified time.Time //流地址更新时间 + stream_expires int64 //流到期时间 + last_m4s *m4s_link_item //最后一个切片 + stream_hosts sync.Map //使用的流服务器 + Newst_m4s *msgq.Msgq //m4s消息 tag:m4s + first_m4s []byte //m4s起始块 + common c.Common //通用配置副本 } type M4SStream_Config struct { @@ -64,25 +65,31 @@ func (t *m4s_link_item) getNo() (int, error) { return strconv.Atoi(base[:len(base)-4]) } -func (t *M4SStream) LoadConfig(kv *sync.Map, l *log.Log_interface) { +func (t *M4SStream) Common() c.Common { + return t.common +} + +func (t *M4SStream) LoadConfig(common c.Common, l *log.Log_interface) { //读取配置 - if path, ok := kv.LoadV("直播流保存位置").(string); ok { + if path, ok := common.K_v.LoadV("直播流保存位置").(string); ok { if path, err := filepath.Abs(path); err == nil { t.config.save_path = path + "/" } } - if v, ok := kv.LoadV(`直播hls流缓冲`).(float64); ok && v > 0 { + if v, ok := common.K_v.LoadV(`直播hls流缓冲`).(float64); ok && v > 0 { t.config.bufsize = int(v) } - if v, ok := kv.LoadV(`直播hls流均衡`).(bool); ok { + if v, ok := common.K_v.LoadV(`直播hls流均衡`).(bool); ok { t.config.banlance_host = v } - if v, ok := kv.LoadV(`直播流清晰度`).(float64); ok { + if v, ok := common.K_v.LoadV(`直播流清晰度`).(float64); ok { t.config.want_qn = int(v) } - if v, ok := kv.LoadV(`直播流类型`).(string); ok { + if v, ok := common.K_v.LoadV(`直播流类型`).(string); ok { t.config.want_type = v } + + t.common = common t.log = l.Base(`直播流保存`) } @@ -90,15 +97,15 @@ func (t *M4SStream) getFirstM4S() []byte { return t.first_m4s } -func (t *M4SStream) fetchCheckStream(tmpc *c.Common) bool { +func (t *M4SStream) fetchCheckStream() bool { // 获取流地址 - tmpc.Live_want_qn = t.config.want_qn - if F.Get(tmpc).Get(`Live`); len(tmpc.Live) == 0 { + t.common.Live_want_qn = t.config.want_qn + if F.Get(&t.common).Get(`Live`); len(t.common.Live) == 0 { return false } // 保存流地址过期时间 - if m3u8_url, err := url.Parse(tmpc.Live[0]); err != nil { + if m3u8_url, err := url.Parse(t.common.Live[0]); err != nil { t.log.L(`E: `, err.Error()) return false } else { @@ -108,17 +115,17 @@ func (t *M4SStream) fetchCheckStream(tmpc *c.Common) bool { // 检查是否可以获取 CookieM := make(map[string]string) - tmpc.Cookie.Range(func(k, v interface{}) bool { + t.common.Cookie.Range(func(k, v interface{}) bool { CookieM[k.(string)] = v.(string) return true }) var req = reqf.New() if e := req.Reqf(reqf.Rval{ - Url: tmpc.Live[0], + Url: t.common.Live[0], Retry: 10, SleepTime: 1000, - Proxy: tmpc.Proxy, + Proxy: t.common.Proxy, Header: map[string]string{ `Cookie`: reqf.Map_2_Cookies_String(CookieM), }, @@ -138,9 +145,9 @@ func (t *M4SStream) fetchCheckStream(tmpc *c.Common) bool { return true } -func (t *M4SStream) fetchParseM3U8(tmpc *c.Common) (m4s_links []*m4s_link_item, m3u8_addon []byte) { +func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []byte) { // 请求解析m3u8内容 - for _, v := range tmpc.Live { + for _, v := range t.common.Live { m3u8_url, err := url.Parse(v) if err != nil { t.log.L(`E: `, err.Error()) @@ -307,9 +314,9 @@ func (t *M4SStream) fetchParseM3U8(tmpc *c.Common) (m4s_links []*m4s_link_item, return } -func (t *M4SStream) saveStream(tmpc *c.Common) { +func (t *M4SStream) saveStream() { // 设置保存路径 - var save_path = t.config.save_path + strconv.Itoa(tmpc.Roomid) + "_" + time.Now().Format("2006_01_02_15-04-05-000") + `/` + var save_path = t.config.save_path + strconv.Itoa(t.common.Roomid) + "_" + time.Now().Format("2006_01_02_15-04-05-000") + `/` // 显示保存位置 if rel, err := filepath.Rel(t.config.save_path, save_path); err == nil { @@ -319,7 +326,7 @@ func (t *M4SStream) saveStream(tmpc *c.Common) { } // 获取流 - if strings.Contains(tmpc.Live[0], `m3u8`) { + if strings.Contains(t.common.Live[0], `m3u8`) { t.stream_expires = time.Now().Add(time.Minute * 2).Unix() // 流链接过期时间 // 同时下载数限制 @@ -360,7 +367,7 @@ func (t *M4SStream) saveStream(tmpc *c.Common) { ConnectTimeout: 2000, ReadTimeout: 1000, Timeout: 2000, - Proxy: tmpc.Proxy, + Proxy: t.common.Proxy, }); e != nil && !errors.Is(e, io.EOF) { link.status = 3 // 设置切片状态为下载失败 } else { @@ -405,11 +412,11 @@ func (t *M4SStream) saveStream(tmpc *c.Common) { // 刷新流地址 if time.Now().Unix()+60 > t.stream_expires { - t.fetchCheckStream(tmpc) + t.fetchCheckStream() } // 获取解析m3u8 - var m4s_links, m3u8_addon = t.fetchParseM3U8(tmpc) + var m4s_links, m3u8_addon = t.fetchParseM3U8() if len(m4s_links) == 0 { time.Sleep(time.Second) continue @@ -457,25 +464,24 @@ func (t *M4SStream) Start() { // 初始化切片消息 t.Newst_m4s = msgq.New(10) - var tmpc = c.C - // 主循环 for t.Status.Islive() { // 是否在直播 - F.Get(&tmpc).Get(`Liveing`) - if !tmpc.Liveing { + t.log.L(`I: `, t.common.Roomid) + F.Get(&t.common).Get(`Liveing`) + if !t.common.Liveing { t.log.L(`T: `, `未直播`) break } // 获取 and 检查流地址状态 - if !t.fetchCheckStream(&tmpc) { + if !t.fetchCheckStream() { time.Sleep(time.Second * 5) continue } // 设置均衡负载 - for _, v := range tmpc.Live { + for _, v := range t.common.Live { if url_struct, e := url.Parse(v); e == nil { t.stream_hosts.Store(url_struct.Hostname(), nil) } @@ -485,15 +491,16 @@ func (t *M4SStream) Start() { } // 保存流 - t.saveStream(&tmpc) + t.saveStream() } - t.log.L(`T: `, `结束`) + t.log.L(`T: `, `结束`+strconv.Itoa(t.common.Roomid)) t.exitSign.Done() } func (t *M4SStream) Stop() { t.exitSign = signal.Init() t.Status.Done() + t.log.L(`I: `, `正在等待切片下载...`) t.exitSign.Wait() } diff --git a/bili_danmu.go b/bili_danmu.go index 0e172b2..5d4f103 100644 --- a/bili_danmu.go +++ b/bili_danmu.go @@ -11,6 +11,7 @@ import ( c "github.com/qydysky/bili_danmu/CV" F "github.com/qydysky/bili_danmu/F" + Cmd "github.com/qydysky/bili_danmu/F/Cmd" reply "github.com/qydysky/bili_danmu/Reply" send "github.com/qydysky/bili_danmu/Send" @@ -80,7 +81,7 @@ func Demo(roomid ...int) { } } //命令行操作 切换房间 发送弹幕 - go F.Cmd() + go Cmd.Cmd() //使用带tag的消息队列在功能间传递消息 c.C.Danmu_Main_mq.Pull_tag(msgq.FuncMap{ @@ -270,7 +271,7 @@ func Demo(roomid ...int) { { //附加功能 进房间发送弹幕 直播流保存 营收 go reply.Entry_danmu() - c.C.Danmu_Main_mq.Push_tag(`savestream`, nil) + c.C.Danmu_Main_mq.Push_tag(`savestream`, c.C.Roomid) go reply.ShowRevf() //小心心 go F.F_x25Kn() @@ -310,7 +311,7 @@ func Demo(roomid ...int) { } } { //附加功能 直播流停止 - reply.StreamOStop() + reply.StreamOStop(-1) reply.Save_to_json(-1, []interface{}{`{}]`}) } p.Sys().Timeoutf(1) diff --git a/demo/config/config_K_v.json b/demo/config/config_K_v.json index 32c03ac..c2bcbb3 100644 --- a/demo/config/config_K_v.json +++ b/demo/config/config_K_v.json @@ -61,6 +61,7 @@ "直播hls流缓冲": 20, "直播hls流均衡-help":"true:使用所有hls服务器", "直播hls流均衡": true, + "直播流当前房间开播时停止其他流": true, "直播Web服务口":0, "ass-help": "只有保存直播流时才考虑生成ass,ass编码默认GB18030(可选utf-8)", "生成Ass弹幕": true,