demo/cpu.pprof
demo/qr.png
demo/live
+demo/main.exe
-package F
+package Cmd
import (
"bufio"
"time"
c "github.com/qydysky/bili_danmu/CV"
+ F "github.com/qydysky/bili_danmu/F"
+ reply "github.com/qydysky/bili_danmu/Reply"
send "github.com/qydysky/bili_danmu/Send"
)
cmdlog.L(`W: `, "不支持功能键")
} else if inputs[0] == 32 { // 开头
//录制切换
- if strings.Contains(inputs, ` rec`) && c.C.Roomid != 0 {
- if !c.C.Liveing {
- cmdlog.L(`W: `, "不能切换录制状态,未在直播")
- continue
+ if strings.Contains(inputs, ` rec`) {
+ if len(inputs) > 4 {
+ if room, err := strconv.Atoi(inputs[4:]); err == nil {
+ c.C.Danmu_Main_mq.Push_tag(`savestream`, room)
+ continue
+ }
+ cmdlog.L(`W: `, "输入错误", inputs)
+ } else {
+ c.C.Danmu_Main_mq.Push_tag(`savestream`, c.C.Roomid)
}
- c.C.Danmu_Main_mq.Push_tag(`savestream`, nil)
continue
}
//直播间切换
cmdlog.L(`W: `, "输入错误", inputs)
continue
}
- for k, v := range Feed_list() {
+ for k, v := range F.Feed_list() {
liveList[` live`+strconv.Itoa(k)] = v.Roomid
- fmt.Printf("%d\t%s\n\t\t\t%s\n", k, v.Uname, v.Title)
+ fmt.Printf("%d\t%s(%d)\n\t\t\t%s\n", k, v.Uname, v.Roomid, v.Title)
}
fmt.Println("回复' live(序号)'进入直播间")
fmt.Print("\n")
continue
}
//获取cookie
- Get(&c.C).Get(`Cookie`)
+ F.Get(&c.C).Get(`Cookie`)
continue
}
continue
}
//获取小心心
- go F_x25Kn()
+ go F.F_x25Kn()
continue
}
}
fmt.Print("\n")
- for k, v := range SearchUP(inputs[7:]) {
+ for k, v := range F.SearchUP(inputs[7:]) {
liveList[` live`+strconv.Itoa(k)] = v.Roomid
if v.Is_live {
fmt.Printf("%d\t%s\t%s\n", k, `☁`, v.Uname)
//当前直播间信息
if strings.Contains(inputs, ` room`) && c.C.Roomid != 0 {
fmt.Print("\n")
- fmt.Println("当前直播间信息")
+ fmt.Println("当前直播间(" + strconv.Itoa(c.C.Roomid) + ")信息")
{
living := `未在直播`
if c.C.Liveing {
if c.C.Stream_url != "" {
fmt.Println(`直播Web服务:`, c.C.Stream_url)
}
+ if reply.StreamOStatus(c.C.Roomid) {
+ fmt.Println(`正在录制当前房间`)
+ } else {
+ fmt.Println(`未在录制当前房间`)
+ }
+
+ var array = reply.StreamOCommon(-1)
+ if len(array) > 1 {
+ fmt.Println(`正在录制的其他房间:`)
+ for _, v := range array {
+ fmt.Println("\t" + v.Uname + "(" + strconv.Itoa(v.Roomid) + ") " + v.Title)
+ }
+ }
+ fmt.Println("输入` rec` 来启停当前房间录制, 输入` rec房间号` 来启停其他录制")
+
fmt.Print("\n")
continue
}
//设定字幕文件名,为""时停止输出
-func Ass_f(file string, st time.Time) {
+func Ass_f(save_path string, file string, st time.Time) {
ass.file = file
if file == "" {
return
}
- if rel, err := filepath.Rel(streamO.config.save_path, ass.file); err == nil {
+ if rel, err := filepath.Rel(save_path, ass.file); err == nil {
c.C.Log.Base(`Ass`).L(`I: `, "保存到", rel+".ass")
} else {
c.C.Log.Base(`Ass`).L(`I: `, "保存到", ass.file+".ass")
//hls
//https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming
-var streamO = new(M4SStream)
+var streamO psync.Map
+
+func StreamOCommon(roomid int) (array []c.Common) {
+ if roomid != -1 { //返回特定房间
+ if v, ok := streamO.Load(roomid); ok {
+ return []c.Common{v.(*M4SStream).Common()}
+ }
+ } else { //返回所有
+ streamO.Range(func(k, v interface{}) bool {
+ array = append(array, v.(*M4SStream).Common())
+ return true
+ })
+ }
+ return
+}
func init() {
//使用带tag的消息队列在功能间传递消息
c.C.Danmu_Main_mq.Pull_tag(msgq.FuncMap{
`savestream`: func(data interface{}) bool {
- if streamO.Status.Islive() {
- streamO.Stop()
+ if roomid, ok := data.(int); ok {
+ if v, ok := streamO.Load(roomid); ok {
+ if v.(*M4SStream).Status.Islive() {
+ v.(*M4SStream).Stop()
+ streamO.Delete(roomid)
+ }
+ } else {
+ var (
+ tmp = new(M4SStream)
+ common = c.C
+ )
+ common.Roomid = roomid
+ tmp.LoadConfig(common, c.C.Log)
+ streamO.Store(roomid, tmp)
+ go tmp.Start()
+ }
} else {
- streamO.LoadConfig(&c.C.K_v, c.C.Log)
- go streamO.Start()
+ flog.L(`E: `, `savestream必须为数字房间号`)
}
return false
},
})
}
-func StreamOStop() {
- if streamO.Status.Islive() {
- streamO.Stop()
+func StreamOStatus(roomid int) bool {
+ v, ok := streamO.Load(roomid)
+ return ok && (v.(*M4SStream).Status.Islive() || v.(*M4SStream).exitSign.Islive())
+}
+
+func StreamOStop(roomid int) {
+ if roomid != -1 { // 针对某房间
+ if v, ok := streamO.Load(roomid); ok {
+ if v.(*M4SStream).Status.Islive() {
+ v.(*M4SStream).Stop()
+ }
+ }
+ } else { //所有房间
+ streamO.Range(func(_, v interface{}) bool {
+ if v.(*M4SStream).Status.Islive() {
+ v.(*M4SStream).Stop()
+ }
+ return true
+ })
}
}
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Content-Transfer-Encoding", "binary")
- if len(streamO.getFirstM4S()) == 0 {
+ // 获取当前房间的
+ var currentStreamO *M4SStream
+ if v, ok := streamO.Load(c.C.Roomid); ok {
+ currentStreamO = v.(*M4SStream)
+ }
+
+ if len(currentStreamO.getFirstM4S()) == 0 {
w.Header().Set("Retry-After", "1")
w.WriteHeader(http.StatusServiceUnavailable)
return
}
//写入hls头
- if _, err := w.Write(streamO.getFirstM4S()); err != nil {
+ if _, err := w.Write(currentStreamO.getFirstM4S()); err != nil {
return
} else if flushSupport {
flusher.Flush()
cancel := make(chan struct{})
//hls切片
- streamO.Newst_m4s.Pull_tag(map[string]func(interface{}) bool{
+ currentStreamO.Newst_m4s.Pull_tag(map[string]func(interface{}) bool{
`m4s`: func(data interface{}) bool {
if b, ok := data.([]byte); ok {
if _, err := w.Write(b); err != nil {
{ //附加功能 obs结束 `savestream`结束
Obs_R(false)
Obsf(false)
- streamO.Stop()
go ShowRevf()
c.C.Liveing = false
}
if p.Sys().Type(roomid) == "float64" {
+ // 停止此房间录制
+ StreamOStop(int(roomid.(float64)))
+
Gui_show(Itos([]interface{}{"房间", roomid, "下播了"}), "0room")
msglog.L(`I: `, "房间", int(roomid.(float64)), "下播了")
return
{ //附加功能 obs录播
Obsf(true)
Obs_R(true)
- go streamO.Start()
}
{
c.C.Rev = 0.0 //营收
c.C.Live_Start_Time = time.Now() //开播h时间
}
if p.Sys().Type(roomid) == "float64" {
+ //开始录制
+ go func() {
+ if v, ok := c.C.K_v.LoadV(`直播流当前房间开播时停止其他流`).(bool); ok && v {
+ StreamOStop(-1) //停止其他房间录制
+ }
+ c.C.Danmu_Main_mq.Push_tag(`savestream`, roomid)
+ }()
+
Gui_show(Itos([]interface{}{"房间", roomid, "开播了"}), "0room")
msglog.L(`I: `, "房间", int(roomid.(float64)), "开播了")
return
)
type M4SStream struct {
- Status *signal.Signal //IsLive()是否运行中
- exitSign *signal.Signal //IsLive()是否等待退出中
- log *log.Log_interface
- config M4SStream_Config //配置
- stream_last_modified time.Time //流地址更新时间
- stream_expires int64 //流到期时间
- last_m4s *m4s_link_item //最后一个切片
- stream_hosts sync.Map //使用的流服务器
- Newst_m4s *msgq.Msgq //m4s消息 tag:m4s
- first_m4s []byte //m4s起始块
+ Status *signal.Signal //IsLive()是否运行中
+ exitSign *signal.Signal //IsLive()是否等待退出中
+ log *log.Log_interface //日志
+ config M4SStream_Config //配置
+ stream_last_modified time.Time //流地址更新时间
+ stream_expires int64 //流到期时间
+ last_m4s *m4s_link_item //最后一个切片
+ stream_hosts sync.Map //使用的流服务器
+ Newst_m4s *msgq.Msgq //m4s消息 tag:m4s
+ first_m4s []byte //m4s起始块
+ common c.Common //通用配置副本
}
type M4SStream_Config struct {
return strconv.Atoi(base[:len(base)-4])
}
-func (t *M4SStream) LoadConfig(kv *sync.Map, l *log.Log_interface) {
+func (t *M4SStream) Common() c.Common {
+ return t.common
+}
+
+func (t *M4SStream) LoadConfig(common c.Common, l *log.Log_interface) {
//读取配置
- if path, ok := kv.LoadV("直播流保存位置").(string); ok {
+ if path, ok := common.K_v.LoadV("直播流保存位置").(string); ok {
if path, err := filepath.Abs(path); err == nil {
t.config.save_path = path + "/"
}
}
- if v, ok := kv.LoadV(`直播hls流缓冲`).(float64); ok && v > 0 {
+ if v, ok := common.K_v.LoadV(`直播hls流缓冲`).(float64); ok && v > 0 {
t.config.bufsize = int(v)
}
- if v, ok := kv.LoadV(`直播hls流均衡`).(bool); ok {
+ if v, ok := common.K_v.LoadV(`直播hls流均衡`).(bool); ok {
t.config.banlance_host = v
}
- if v, ok := kv.LoadV(`直播流清晰度`).(float64); ok {
+ if v, ok := common.K_v.LoadV(`直播流清晰度`).(float64); ok {
t.config.want_qn = int(v)
}
- if v, ok := kv.LoadV(`直播流类型`).(string); ok {
+ if v, ok := common.K_v.LoadV(`直播流类型`).(string); ok {
t.config.want_type = v
}
+
+ t.common = common
t.log = l.Base(`直播流保存`)
}
return t.first_m4s
}
-func (t *M4SStream) fetchCheckStream(tmpc *c.Common) bool {
+func (t *M4SStream) fetchCheckStream() bool {
// 获取流地址
- tmpc.Live_want_qn = t.config.want_qn
- if F.Get(tmpc).Get(`Live`); len(tmpc.Live) == 0 {
+ t.common.Live_want_qn = t.config.want_qn
+ if F.Get(&t.common).Get(`Live`); len(t.common.Live) == 0 {
return false
}
// 保存流地址过期时间
- if m3u8_url, err := url.Parse(tmpc.Live[0]); err != nil {
+ if m3u8_url, err := url.Parse(t.common.Live[0]); err != nil {
t.log.L(`E: `, err.Error())
return false
} else {
// 检查是否可以获取
CookieM := make(map[string]string)
- tmpc.Cookie.Range(func(k, v interface{}) bool {
+ t.common.Cookie.Range(func(k, v interface{}) bool {
CookieM[k.(string)] = v.(string)
return true
})
var req = reqf.New()
if e := req.Reqf(reqf.Rval{
- Url: tmpc.Live[0],
+ Url: t.common.Live[0],
Retry: 10,
SleepTime: 1000,
- Proxy: tmpc.Proxy,
+ Proxy: t.common.Proxy,
Header: map[string]string{
`Cookie`: reqf.Map_2_Cookies_String(CookieM),
},
return true
}
-func (t *M4SStream) fetchParseM3U8(tmpc *c.Common) (m4s_links []*m4s_link_item, m3u8_addon []byte) {
+func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []byte) {
// 请求解析m3u8内容
- for _, v := range tmpc.Live {
+ for _, v := range t.common.Live {
m3u8_url, err := url.Parse(v)
if err != nil {
t.log.L(`E: `, err.Error())
return
}
-func (t *M4SStream) saveStream(tmpc *c.Common) {
+func (t *M4SStream) saveStream() {
// 设置保存路径
- var save_path = t.config.save_path + strconv.Itoa(tmpc.Roomid) + "_" + time.Now().Format("2006_01_02_15-04-05-000") + `/`
+ var save_path = t.config.save_path + strconv.Itoa(t.common.Roomid) + "_" + time.Now().Format("2006_01_02_15-04-05-000") + `/`
// 显示保存位置
if rel, err := filepath.Rel(t.config.save_path, save_path); err == nil {
}
// 获取流
- if strings.Contains(tmpc.Live[0], `m3u8`) {
+ if strings.Contains(t.common.Live[0], `m3u8`) {
t.stream_expires = time.Now().Add(time.Minute * 2).Unix() // 流链接过期时间
// 同时下载数限制
ConnectTimeout: 2000,
ReadTimeout: 1000,
Timeout: 2000,
- Proxy: tmpc.Proxy,
+ Proxy: t.common.Proxy,
}); e != nil && !errors.Is(e, io.EOF) {
link.status = 3 // 设置切片状态为下载失败
} else {
// 刷新流地址
if time.Now().Unix()+60 > t.stream_expires {
- t.fetchCheckStream(tmpc)
+ t.fetchCheckStream()
}
// 获取解析m3u8
- var m4s_links, m3u8_addon = t.fetchParseM3U8(tmpc)
+ var m4s_links, m3u8_addon = t.fetchParseM3U8()
if len(m4s_links) == 0 {
time.Sleep(time.Second)
continue
// 初始化切片消息
t.Newst_m4s = msgq.New(10)
- var tmpc = c.C
-
// 主循环
for t.Status.Islive() {
// 是否在直播
- F.Get(&tmpc).Get(`Liveing`)
- if !tmpc.Liveing {
+ t.log.L(`I: `, t.common.Roomid)
+ F.Get(&t.common).Get(`Liveing`)
+ if !t.common.Liveing {
t.log.L(`T: `, `未直播`)
break
}
// 获取 and 检查流地址状态
- if !t.fetchCheckStream(&tmpc) {
+ if !t.fetchCheckStream() {
time.Sleep(time.Second * 5)
continue
}
// 设置均衡负载
- for _, v := range tmpc.Live {
+ for _, v := range t.common.Live {
if url_struct, e := url.Parse(v); e == nil {
t.stream_hosts.Store(url_struct.Hostname(), nil)
}
}
// 保存流
- t.saveStream(&tmpc)
+ t.saveStream()
}
- t.log.L(`T: `, `结束`)
+ t.log.L(`T: `, `结束`+strconv.Itoa(t.common.Roomid))
t.exitSign.Done()
}
func (t *M4SStream) Stop() {
t.exitSign = signal.Init()
t.Status.Done()
+ t.log.L(`I: `, `正在等待切片下载...`)
t.exitSign.Wait()
}
c "github.com/qydysky/bili_danmu/CV"
F "github.com/qydysky/bili_danmu/F"
+ Cmd "github.com/qydysky/bili_danmu/F/Cmd"
reply "github.com/qydysky/bili_danmu/Reply"
send "github.com/qydysky/bili_danmu/Send"
}
}
//命令行操作 切换房间 发送弹幕
- go F.Cmd()
+ go Cmd.Cmd()
//使用带tag的消息队列在功能间传递消息
c.C.Danmu_Main_mq.Pull_tag(msgq.FuncMap{
{ //附加功能 进房间发送弹幕 直播流保存 营收
go reply.Entry_danmu()
- c.C.Danmu_Main_mq.Push_tag(`savestream`, nil)
+ c.C.Danmu_Main_mq.Push_tag(`savestream`, c.C.Roomid)
go reply.ShowRevf()
//小心心
go F.F_x25Kn()
}
}
{ //附加功能 直播流停止
- reply.StreamOStop()
+ reply.StreamOStop(-1)
reply.Save_to_json(-1, []interface{}{`{}]`})
}
p.Sys().Timeoutf(1)
"直播hls流缓冲": 20,
"直播hls流均衡-help":"true:使用所有hls服务器",
"直播hls流均衡": true,
+ "直播流当前房间开播时停止其他流": true,
"直播Web服务口":0,
"ass-help": "只有保存直播流时才考虑生成ass,ass编码默认GB18030(可选utf-8)",
"生成Ass弹幕": true,