From dea8c4197f2650ceab0653c2fad9a71e29c2eef4 Mon Sep 17 00:00:00 2001 From: qydysky Date: Thu, 12 May 2022 20:39:03 +0800 Subject: [PATCH] =?utf8?q?fix=20=E5=BC=80=E6=92=AD=E5=BD=95=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- Reply/F.go | 15 +- Reply/Reply.go | 41 ++--- Reply/stream.go | 366 +++++++++++++++++++++++-------------------- Reply/ws_msg/LIVE.go | 16 ++ 4 files changed, 242 insertions(+), 196 deletions(-) create mode 100644 Reply/ws_msg/LIVE.go diff --git a/Reply/F.go b/Reply/F.go index 636c718..5b455bc 100644 --- a/Reply/F.go +++ b/Reply/F.go @@ -291,10 +291,12 @@ func init() { Ass_f(ms.Current_save_path, ms.Current_save_path+"0", time.Now()) //开始ass } tmp.Callback_stop = func(ms *M4SStream) { + streamO.Delete(ms.common.Roomid) Ass_f("", "", time.Now()) //停止ass } - streamO.Store(item.Roomid, tmp) - go tmp.Start() + if tmp.Start() { + streamO.Store(item.Roomid, tmp) + } } else if !item.IsRec && ok { if v.(*M4SStream).Status.Islive() { v.(*M4SStream).Stop() @@ -1007,6 +1009,15 @@ func AutoSend_silver_gift() { //直播Web服务口 var StreamWs = websocket.New_server() +func SendStreamWs(msg string) { + msg = strings.ReplaceAll(msg, "\n", "") + msg = strings.ReplaceAll(msg, "\\", "\\\\") + StreamWs.Interface().Push_tag(`send`, websocket.Uinterface{ + Id: 0, + Data: []byte(`{"text":"` + msg + `"}`), + }) +} + func init() { flog := flog.Base_add(`直播Web服务`) if port_f, ok := c.C.K_v.LoadV(`直播Web服务口`).(float64); ok && port_f >= 0 { diff --git a/Reply/Reply.go b/Reply/Reply.go index 4d92a14..a1fe28d 100644 --- a/Reply/Reply.go +++ b/Reply/Reply.go @@ -16,7 +16,6 @@ import ( p "github.com/qydysky/part" mq "github.com/qydysky/part/msgq" pstrings "github.com/qydysky/part/strings" - pwebsocket "github.com/qydysky/part/websocket" ) var reply_log = c.C.Log.Base(`Reply`) @@ -702,8 +701,9 @@ func (replyF) preparing(s string) { func (replyF) live(s string) { msglog := msglog.Base_add("房") - if roomid := p.Json().GetValFromS(s, "roomid"); roomid == nil { - msglog.L(`E: `, "roomid", roomid) + var type_item ws_msg.LIVE + if err := json.Unmarshal([]byte(s), &type_item); err != nil { + msglog.L(`E: `, err) return } else { { //附加功能 obs录播 @@ -715,25 +715,20 @@ func (replyF) live(s string) { c.C.Liveing = true //直播i标志 c.C.Live_Start_Time = time.Now() //开播h时间 } - if p.Sys().Type(roomid) == "float64" { - //开始录制 - go func() { - if v, ok := c.C.K_v.LoadV(`仅保存当前直播间流`).(bool); ok && v { - StreamOStop(-1) //停止其他房间录制 - } - }() + //开始录制 + go func() { + if v, ok := c.C.K_v.LoadV(`仅保存当前直播间流`).(bool); ok && v { + StreamOStop(-1) //停止其他房间录制 + } + }() - c.C.Danmu_Main_mq.Push_tag(`savestream`, SavestreamO{ - Roomid: int(roomid.(float64)), - IsRec: true, - }) + c.C.Danmu_Main_mq.Push_tag(`savestream`, SavestreamO{ + Roomid: type_item.Roomid, + IsRec: true, + }) - Gui_show(Itos([]interface{}{"房间", roomid, "开播了"}), "0room") - msglog.L(`I: `, "房间", int(roomid.(float64)), "开播了") - return - } - Gui_show(Itos([]interface{}{"房间", roomid, "开播了"}), "0room") - msglog.L(`I: `, "房间", roomid, "开播了") + Gui_show(Itos([]interface{}{"房间", type_item.Roomid, "开播了"}), "0room") + msglog.L(`I: `, "房间", type_item.Roomid, "开播了") } } @@ -1251,10 +1246,8 @@ func Gui_show(m ...string) { if len(m) > 1 { uid = m[1] } - StreamWs.Interface().Push_tag(`send`, pwebsocket.Uinterface{ - Id: 0, - Data: []byte(`{"text":"` + strings.ReplaceAll(m[0], "\n", "") + `"}`), - }) + //直播流服务弹幕 + SendStreamWs(m[0]) Danmu_mq.Push_tag(`danmu`, Danmu_mq_t{ uid: uid, msg: m[0], diff --git a/Reply/stream.go b/Reply/stream.go index 66beb1e..19fbf07 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -35,6 +35,7 @@ type M4SStream struct { // stream_expires int64 //流到期时间 last_m4s *m4s_link_item //最后一个切片 stream_hosts sync.Map //使用的流服务器 + stream_type string //流类型 Newst_m4s *msgq.Msgq //m4s消息 tag:m4s first_m4s []byte //m4s起始块 common c.Common //通用配置副本 @@ -127,7 +128,14 @@ func (t *M4SStream) fetchCheckStream() bool { return false } - // 保存流地址过期时间 + // 保存流类型 + if strings.Contains(t.common.Live[0], `m3u8`) { + t.stream_type = "m3u8" + } else if strings.Contains(t.common.Live[0], `flv`) { + t.stream_type = "flv" + } + + // // 保存流地址过期时间 // if m3u8_url, err := url.Parse(t.common.Live[0]); err != nil { // t.log.L(`E: `, err.Error()) // return false @@ -377,232 +385,250 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b func (t *M4SStream) saveStream() { // 设置保存路径 t.Current_save_path = t.config.save_path + "/" + strconv.Itoa(t.common.Roomid) + "_" + time.Now().Format("2006_01_02_15-04-05-000") + `/` - var save_path = t.Current_save_path // 清除初始值 t.last_m4s = nil // 显示保存位置 - if rel, err := filepath.Rel(t.config.save_path, save_path); err == nil { - t.log.L(`I: `, "保存到", rel+`/0.m3u8`) + if rel, err := filepath.Rel(t.config.save_path, t.Current_save_path); err == nil { + t.log.L(`I: `, "保存到", rel+`/0.`+t.stream_type) } else { t.log.L(`W: `, err) } + t.log.L(`I: `, "流地址:", t.common.Stream_url) //开始,结束回调 t.Callback_start(t) defer t.Callback_stop(t) // 获取流 - if strings.Contains(t.common.Live[0], `m3u8`) { - // 同时下载数限制 - var download_limit = &funcCtrl.BlockFuncN{ - Max: 3, - } + switch t.stream_type { + case `m3u8`: + t.saveStreamM4s() + case `flv`: + t.saveStreamFlv() + default: + t.log.L(`E: `, `undefind stream type`) + } +} - // 下载循环 - for download_seq := []*m4s_link_item{}; ; { +func (t *M4SStream) saveStreamFlv() { + t.log.L(`E: `, `not support yet`) + t.Status.Done() +} - // 存在待下载切片 - if len(download_seq) != 0 { - // 设置限制计划 - download_limit.Plan(int64(len(download_seq))) - - // 下载切片 - for _, v := range download_seq { - go func(link *m4s_link_item, path string) { - defer download_limit.UnBlock() - download_limit.Block() - - // 已下载但还未移除的切片 - if link.status == 2 { - return - } +func (t *M4SStream) saveStreamM4s() { + // 同时下载数限制 + var download_limit = &funcCtrl.BlockFuncN{ + Max: 3, + } - link.status = 1 // 设置切片状态为正在下载 - - // 均衡负载 - if link_url, e := url.Parse(link.Url); e == nil { - if t.stream_hosts.Len() != 1 { - t.stream_hosts.Range(func(key, value interface{}) bool { - // 故障转移 - if link.status == 3 && link_url.Host == key.(string) { - return true - } - // 随机 - link_url.Host = key.(string) - return false - }) - } - link.Url = link_url.String() - } + // 下载循环 + for download_seq := []*m4s_link_item{}; ; { - req := t.reqPool.Get() - defer t.reqPool.Put(req) - r := req.Item.(*reqf.Req) - if e := r.Reqf(reqf.Rval{ - Url: link.Url, - SaveToPath: path + link.Base, - ConnectTimeout: 2000, - ReadTimeout: 1000, - Timeout: 2000, - Proxy: t.common.Proxy, - Header: map[string]string{ - `Connection`: `close`, - }, - }); e != nil && !errors.Is(e, io.EOF) { - if !reqf.IsTimeout(e) { - t.log.L(`E: `, `hls切片下载失败:`, e) - } - link.status = 3 // 设置切片状态为下载失败 - } else { - if usedt := r.UsedTime.Seconds(); usedt > 700 { - t.log.L(`I: `, `hls切片下载慢`, usedt, `ms`) - } - link.data = r.Respon - link.status = 2 // 设置切片状态为下载完成 - } - }(v, save_path) - } + // 存在待下载切片 + if len(download_seq) != 0 { + // 设置限制计划 + download_limit.Plan(int64(len(download_seq))) - // 等待队列下载完成 - download_limit.PlanDone() - } + // 下载切片 + for _, v := range download_seq { + go func(link *m4s_link_item, path string) { + defer download_limit.UnBlock() + download_limit.Block() - // 传递已下载切片 - { - for _, v := range download_seq { - if strings.Contains(v.Base, `h`) { - t.first_m4s = v.data + // 已下载但还未移除的切片 + if link.status == 2 { + return } - if v.status == 2 { - download_seq = download_seq[1:] - t.Newst_m4s.Push_tag(`m4s`, v.data) - } else { - break + link.status = 1 // 设置切片状态为正在下载 + + // 均衡负载 + if link_url, e := url.Parse(link.Url); e == nil { + if t.stream_hosts.Len() != 1 { + t.stream_hosts.Range(func(key, value interface{}) bool { + // 故障转移 + if link.status == 3 && link_url.Host == key.(string) { + return true + } + // 随机 + link_url.Host = key.(string) + return false + }) + } + link.Url = link_url.String() } - } - } - // 停止录制 - if !t.Status.Islive() { - if len(download_seq) != 0 { - if time.Now().Unix() > t.stream_last_modified.Unix()+300 { - t.log.L(`E: `, `切片下载超时`) + req := t.reqPool.Get() + defer t.reqPool.Put(req) + r := req.Item.(*reqf.Req) + if e := r.Reqf(reqf.Rval{ + Url: link.Url, + SaveToPath: path + link.Base, + ConnectTimeout: 2000, + ReadTimeout: 1000, + Timeout: 2000, + Proxy: t.common.Proxy, + Header: map[string]string{ + `Connection`: `close`, + }, + }); e != nil && !errors.Is(e, io.EOF) { + if !reqf.IsTimeout(e) { + t.log.L(`E: `, `hls切片下载失败:`, e) + } + link.status = 3 // 设置切片状态为下载失败 } else { - t.log.L(`I: `, `下载最后切片:`, len(download_seq)) - continue + if usedt := r.UsedTime.Seconds(); usedt > 700 { + t.log.L(`I: `, `hls切片下载慢`, usedt, `ms`) + } + link.data = r.Respon + link.status = 2 // 设置切片状态为下载完成 } - } - break + }(v, t.Current_save_path) } - // 刷新流地址 - // 偶尔刷新后的切片编号与原来不连续,故不再提前检查,直到流获取失败再刷新 - // if time.Now().Unix()+60 > t.stream_expires { - // t.stream_expires = time.Now().Add(time.Minute * 2).Unix() // 临时的流链接过期时间 - // go func() { - // if t.fetchCheckStream() { - // t.last_m4s = nil - // } - // }() - // } - - // 获取解析m3u8 - var m4s_links, m3u8_addon, err = t.fetchParseM3U8() - if err != nil { - t.log.L(`E: `, `获取解析m3u8发生错误`, err) - if len(download_seq) != 0 { - continue + // 等待队列下载完成 + download_limit.PlanDone() + } + + // 传递已下载切片 + { + for _, v := range download_seq { + if strings.Contains(v.Base, `h`) { + t.first_m4s = v.data } - if !reqf.IsTimeout(err) { + + if v.status == 2 { + download_seq = download_seq[1:] + t.Newst_m4s.Push_tag(`m4s`, v.data) + } else { break } } - if len(m4s_links) == 0 { - time.Sleep(time.Second) - continue - } + } - // 添加新切片到下载队列 - download_seq = append(download_seq, m4s_links...) + // 停止录制 + if !t.Status.Islive() { + if len(download_seq) != 0 { + if time.Now().Unix() > t.stream_last_modified.Unix()+300 { + t.log.L(`E: `, `切片下载超时`) + } else { + t.log.L(`I: `, `下载最后切片:`, len(download_seq)) + continue + } + } + break + } - // 添加m3u8字节 - p.File().FileWR(p.Filel{ - File: save_path + "0.m3u8.dtmp", - Loc: -1, - Context: []interface{}{m3u8_addon}, - }) + // 刷新流地址 + // 偶尔刷新后的切片编号与原来不连续,故不再提前检查,直到流获取失败再刷新 + // if time.Now().Unix()+60 > t.stream_expires { + // t.stream_expires = time.Now().Add(time.Minute * 2).Unix() // 临时的流链接过期时间 + // go func() { + // if t.fetchCheckStream() { + // t.last_m4s = nil + // } + // }() + // } + + // 获取解析m3u8 + var m4s_links, m3u8_addon, err = t.fetchParseM3U8() + if err != nil { + t.log.L(`E: `, `获取解析m3u8发生错误`, err) + if len(download_seq) != 0 { + continue + } + if !reqf.IsTimeout(err) { + break + } + } + if len(m4s_links) == 0 { + time.Sleep(time.Second) + continue } - // 发送空字节会导致流服务终止 - t.Newst_m4s.Push_tag(`m4s`, []byte{}) + // 添加新切片到下载队列 + download_seq = append(download_seq, m4s_links...) - // 结束 - if p.Checkfile().IsExist(save_path + "0.m3u8.dtmp") { - f := p.File() - f.FileWR(p.Filel{ - File: save_path + "0.m3u8.dtmp", - Loc: -1, - Context: []interface{}{"#EXT-X-ENDLIST"}, - }) - p.FileMove(save_path+"0.m3u8.dtmp", save_path+"0.m3u8") - } + // 添加m3u8字节 + p.File().FileWR(p.Filel{ + File: t.Current_save_path + "0.m3u8.dtmp", + Loc: -1, + Context: []interface{}{m3u8_addon}, + }) + } + + // 发送空字节会导致流服务终止 + t.Newst_m4s.Push_tag(`m4s`, []byte{}) + + // 结束 + if p.Checkfile().IsExist(t.Current_save_path + "0.m3u8.dtmp") { + f := p.File() + f.FileWR(p.Filel{ + File: t.Current_save_path + "0.m3u8.dtmp", + Loc: -1, + Context: []interface{}{"#EXT-X-ENDLIST"}, + }) + p.FileMove(t.Current_save_path+"0.m3u8.dtmp", t.Current_save_path+"0.m3u8") } } -func (t *M4SStream) Start() { +func (t *M4SStream) Start() bool { // 清晰度-1 or 路径存在问题 不保存 if t.config.want_qn == -1 || t.config.save_path == "" { - return + return false } // 状态检测与设置 if t.Status.Islive() { t.log.L(`T: `, `已存在实例`) - return + return false } - t.Status = signal.Init() - defer t.Status.Done() - // 初始化请求池 - t.reqPool = t.common.ReqPool + t.Status = signal.Init() + go func() { + defer t.Status.Done() - // 初始化切片消息 - t.Newst_m4s = msgq.New(15) + // 初始化请求池 + t.reqPool = t.common.ReqPool - // 主循环 - for t.Status.Islive() { - // 是否在直播 - F.Get(&t.common).Get(`Liveing`) - if !t.common.Liveing { - t.log.L(`W: `, `未直播`) - break - } + // 初始化切片消息 + t.Newst_m4s = msgq.New(15) - // 获取 and 检查流地址状态 - if !t.fetchCheckStream() { - time.Sleep(time.Second * 5) - continue - } + // 主循环 + for t.Status.Islive() { + // 是否在直播 + F.Get(&t.common).Get(`Liveing`) + if !t.common.Liveing { + t.log.L(`W: `, `未直播`) + break + } - // 设置均衡负载 - for _, v := range t.common.Live { - if url_struct, e := url.Parse(v); e == nil { - t.stream_hosts.Store(url_struct.Hostname(), nil) + // 获取 and 检查流地址状态 + if !t.fetchCheckStream() { + time.Sleep(time.Second * 5) + continue } - if !t.config.banlance_host { - break + + // 设置均衡负载 + for _, v := range t.common.Live { + if url_struct, e := url.Parse(v); e == nil { + t.stream_hosts.Store(url_struct.Hostname(), nil) + } + if !t.config.banlance_host { + break + } } - } - // 保存流 - t.saveStream() - } + // 保存流 + t.saveStream() + } - t.log.L(`I: `, `结束录制(`+strconv.Itoa(t.common.Roomid)+`)`) - t.exitSign.Done() + t.log.L(`I: `, `结束录制(`+strconv.Itoa(t.common.Roomid)+`)`) + t.exitSign.Done() + }() + return true } func (t *M4SStream) Stop() { diff --git a/Reply/ws_msg/LIVE.go b/Reply/ws_msg/LIVE.go new file mode 100644 index 0000000..6d8d5c6 --- /dev/null +++ b/Reply/ws_msg/LIVE.go @@ -0,0 +1,16 @@ +package part + +type LIVE struct { + Cmd string `json:"cmd"` + LiveKey string `json:"live_key"` + VoiceBackground string `json:"voice_background"` + SubSessionKey string `json:"sub_session_key"` + LivePlatform string `json:"live_platform"` + LiveModel int `json:"live_model"` + LiveTime int `json:"live_time"` + Roomid int `json:"roomid"` +} + +/* +{"cmd":"LIVE","live_key":"243098417424107244","voice_background":"","sub_session_key":"243098417424107244sub_time:1652355679","live_platform":"pc_link","live_model":0,"live_time":1652355679,"roomid":394988} +*/ -- 2.39.2