return err
}
}
- defer t.msg.Push_tag(`stopRec`, t)
// 移除历史流
if err := t.removeStream(); err != nil {
}
// 保存到文件
- // 确保能接收到第一个帧才开始录制
if t.config.save_to_file {
- t.msg.Pull_tag_only(`firstFrame`, func(ms *M4SStream) (disable bool) {
+ // 确保能接收到第一个帧才开始录制
+ var cancelfirstFrame = t.msg.Pull_tag_only(`firstFrame`, func(ms *M4SStream) (disable bool) {
ms.msg.Push_tag(`cut`, ms)
return true
})
+ defer cancelfirstFrame()
}
// 获取流
t.log.L(`E: `, e)
}
+ // 退出当前方法时,结束录制
+ t.msg.Push_tag(`stopRec`, t)
return
}
func (t *M4SStream) saveStreamFlv() (e error) {
+ // 开始获取
+ r := t.reqPool.Get()
+ defer t.reqPool.Put(r)
+
+ CookieM := make(map[string]string)
+ t.common.Cookie.Range(func(k, v interface{}) bool {
+ CookieM[k.(string)] = v.(string)
+ return true
+ })
+
+ var surl *url.URL
+
+ // 找到可用流服务器
for {
v := t.common.ValidLive()
if v == nil {
return errors.New("未能找到可用流服务器")
}
- surl, err := url.Parse(v.Url)
+ var err error
+ surl, err = url.Parse(v.Url)
if err != nil {
t.log.L(`E: `, err)
e = err
return
}
- CookieM := make(map[string]string)
- t.common.Cookie.Range(func(k, v interface{}) bool {
- CookieM[k.(string)] = v.(string)
- return true
- })
-
- //开始获取
- r := t.reqPool.Get()
//检查
if e := r.Reqf(reqf.Rval{
Url: surl.String(),
continue
}
- cancelC, cancel := context.WithCancel(context.Background())
- {
- go func() {
- tsc, tscf := t.Status.WaitC()
- defer tscf()
+ break
+ }
- select {
- //停止录制
- case <-tsc:
- cancel()
- //当前连接终止
- case <-cancelC.Done():
- }
- }()
+ cancelC, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ {
+ go func() {
+ tsc, tscf := t.Status.WaitC()
+ defer tscf()
- pipe := pio.NewPipe()
- var (
- leastReadUnix atomic.Int64
- readTO int64 = 5
- )
- leastReadUnix.Store(time.Now().Unix())
+ select {
+ //停止录制
+ case <-tsc:
+ cancel()
+ //当前连接终止
+ case <-cancelC.Done():
+ }
+ }()
+
+ pipe := pio.NewPipe()
+ var (
+ leastReadUnix atomic.Int64
+ readTO int64 = 5
+ )
+ leastReadUnix.Store(time.Now().Unix())
- // read timeout
- go func() {
- timer := time.NewTicker(time.Duration(readTO * int64(time.Second)))
- defer timer.Stop()
+ // read timeout
+ go func() {
+ timer := time.NewTicker(time.Duration(readTO * int64(time.Second)))
+ defer timer.Stop()
- for {
- select {
- case <-cancelC.Done():
+ for {
+ select {
+ case <-cancelC.Done():
+ return
+ case curT := <-timer.C:
+ if curT.Unix()-leastReadUnix.Load() > readTO {
+ t.log.L(`W: `, fmt.Sprintf("%vs未接收到有效数据", readTO))
+ // 5s未接收到任何数据
+ cancel()
return
- case curT := <-timer.C:
- if curT.Unix()-leastReadUnix.Load() > readTO {
- t.log.L(`W: `, fmt.Sprintf("%vs未接收到有效数据", readTO))
- // 5s未接收到任何数据
+ }
+ if v, ok := c.C.K_v.LoadV(`直播流清晰度`).(float64); ok {
+ if t.config.want_qn != int(v) {
+ t.log.L(`I: `, "直播流清晰度改变:", t.common.Qn[t.config.want_qn], "=>", t.common.Qn[int(v)])
+ t.config.want_qn = int(v)
cancel()
return
}
- if v, ok := c.C.K_v.LoadV(`直播流清晰度`).(float64); ok {
- if t.config.want_qn != int(v) {
- t.log.L(`I: `, "直播流清晰度改变:", t.common.Qn[t.config.want_qn], "=>", t.common.Qn[int(v)])
- t.config.want_qn = int(v)
- cancel()
- return
- }
- }
}
}
- }()
-
- // read
- go func() {
- var (
- ticker = time.NewTicker(time.Second)
- buff = slice.New[byte]()
- keyframe = slice.New[byte]()
- buf = make([]byte, 1<<16)
- frameCount = 0
- )
-
- for {
- n, e := pipe.Read(buf)
- _ = buff.Append(buf[:n])
+ }
+ }()
+
+ // read
+ go func() {
+ var (
+ ticker = time.NewTicker(time.Second)
+ buff = slice.New[byte]()
+ keyframe = slice.New[byte]()
+ buf = make([]byte, 1<<16)
+ frameCount = 0
+ )
+
+ for {
+ n, e := pipe.Read(buf)
+ _ = buff.Append(buf[:n])
+ if e != nil {
+ cancel()
+ break
+ }
+
+ skip := true
+ select {
+ case <-ticker.C:
+ skip = false
+ default:
+ }
+ if skip {
+ continue
+ }
+
+ if !buff.IsEmpty() {
+ keyframe.Reset()
+ front_buf, last_available_offset, e := Search_stream_tag(buff.GetPureBuf(), keyframe)
if e != nil {
- cancel()
- break
+ if strings.Contains(e.Error(), `no found available tag`) {
+ continue
+ }
+ //丢弃所有数据
+ buff.Reset()
}
-
- skip := true
- select {
- case <-ticker.C:
- skip = false
- default:
+ // 存在有效数据
+ if len(front_buf) != 0 || keyframe.Size() != 0 {
+ leastReadUnix.Store(time.Now().Unix())
}
- if skip {
- continue
+ if len(front_buf) != 0 && len(t.first_buf) == 0 {
+ t.first_buf = make([]byte, len(front_buf))
+ copy(t.first_buf, front_buf)
+ // fmt.Println("write front_buf")
+ t.Stream_msg.PushLock_tag(`data`, t.first_buf)
+ t.msg.Push_tag(`load`, t)
}
-
- if !buff.IsEmpty() {
- keyframe.Reset()
- front_buf, last_available_offset, e := Search_stream_tag(buff.GetPureBuf(), keyframe)
- if e != nil {
- if strings.Contains(e.Error(), `no found available tag`) {
- continue
- }
- //丢弃所有数据
- buff.Reset()
- }
- // 存在有效数据
- if len(front_buf) != 0 || keyframe.Size() != 0 {
- leastReadUnix.Store(time.Now().Unix())
- }
- if len(front_buf) != 0 && len(t.first_buf) == 0 {
- t.first_buf = make([]byte, len(front_buf))
- copy(t.first_buf, front_buf)
- // fmt.Println("write front_buf")
- t.Stream_msg.PushLock_tag(`data`, t.first_buf)
- t.msg.Push_tag(`load`, t)
- }
- if keyframe.Size() != 0 {
- if len(t.first_buf) == 0 {
- t.log.L(`W: `, `flv未接收到起始段`)
- cancel()
- break
- }
- t.bootBufPush(keyframe.GetPureBuf())
- keyframe.Reset()
- t.Stream_msg.PushLock_tag(`data`, t.boot_buf)
- frameCount += 1
- if frameCount == 1 {
- t.msg.Push_tag(`firstFrame`, t)
- }
+ if keyframe.Size() != 0 {
+ if len(t.first_buf) == 0 {
+ t.log.L(`W: `, `flv未接收到起始段`)
+ cancel()
+ break
}
- if last_available_offset > 1 {
- // fmt.Println("write Sync")
- _ = buff.RemoveFront(last_available_offset - 1)
+ t.bootBufPush(keyframe.GetPureBuf())
+ keyframe.Reset()
+ t.Stream_msg.PushLock_tag(`data`, t.boot_buf)
+ frameCount += 1
+ if frameCount == 1 {
+ t.msg.Push_tag(`firstFrame`, t)
}
}
+ if last_available_offset > 1 {
+ // fmt.Println("write Sync")
+ _ = buff.RemoveFront(last_available_offset - 1)
+ }
}
+ }
- buf = nil
- buff.Reset()
-
- ticker.Stop()
- }()
-
- t.log.L(`I: `, `flv下载开始`)
-
- _ = r.Reqf(reqf.Rval{
- Ctx: cancelC,
- Url: surl.String(),
- SaveToPipe: pipe,
- NoResponse: true,
- Async: true,
- Proxy: t.common.Proxy,
- WriteLoopTO: int(readTO)*1000*2 + 1,
- Header: map[string]string{
- `Host`: surl.Host,
- `User-Agent`: `Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:103.0) Gecko/20100101 Firefox/103.0`,
- `Accept`: `*/*`,
- `Accept-Language`: `zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2`,
- `Origin`: `https://live.bilibili.com`,
- `Connection`: `keep-alive`,
- `Pragma`: `no-cache`,
- `Cache-Control`: `no-cache`,
- `Referer`: "https://live.bilibili.com/",
- `Cookie`: reqf.Map_2_Cookies_String(CookieM),
- },
- })
- if err := r.Wait(); err != nil && !errors.Is(err, io.EOF) {
- if reqf.IsCancel(err) {
- t.log.L(`I: `, `flv下载停止`)
- } else if err != nil && !reqf.IsTimeout(err) {
- e = err
- t.log.L(`E: `, `flv下载失败:`, err)
- }
+ buf = nil
+ buff.Reset()
+
+ ticker.Stop()
+ }()
+
+ t.log.L(`I: `, `flv下载开始`)
+
+ _ = r.Reqf(reqf.Rval{
+ Ctx: cancelC,
+ Url: surl.String(),
+ SaveToPipe: pipe,
+ NoResponse: true,
+ Async: true,
+ Proxy: t.common.Proxy,
+ WriteLoopTO: int(readTO)*1000*2 + 1,
+ Header: map[string]string{
+ `Host`: surl.Host,
+ `User-Agent`: `Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:103.0) Gecko/20100101 Firefox/103.0`,
+ `Accept`: `*/*`,
+ `Accept-Language`: `zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2`,
+ `Origin`: `https://live.bilibili.com`,
+ `Connection`: `keep-alive`,
+ `Pragma`: `no-cache`,
+ `Cache-Control`: `no-cache`,
+ `Referer`: "https://live.bilibili.com/",
+ `Cookie`: reqf.Map_2_Cookies_String(CookieM),
+ },
+ })
+ if err := r.Wait(); err != nil && !errors.Is(err, io.EOF) {
+ if reqf.IsCancel(err) {
+ t.log.L(`I: `, `flv下载停止`)
+ } else if err != nil && !reqf.IsTimeout(err) {
+ e = err
+ t.log.L(`E: `, `flv下载失败:`, err)
}
- v.DisableAuto()
}
- cancel()
- t.reqPool.Put(r)
- t.Stream_msg.PushLock_tag(`close`, nil)
}
+
+ return
}
func (t *M4SStream) saveStreamM4s() (e error) {
r := t.reqPool.Get()
defer t.reqPool.Put(r)
reqConfig := reqf.Rval{
- Url: link.Url,
- Timeout: 3000,
- Proxy: t.common.Proxy,
+ Url: link.Url,
+ Timeout: 3000,
+ WriteLoopTO: 5000,
+ Proxy: t.common.Proxy,
Header: map[string]string{
`Connection`: `close`,
},
download_seq = append(download_seq, m4s_links...)
}
- // 发送空字节会导致流服务终止
- t.Stream_msg.PushLock_tag(`close`, nil)
-
return
}
// 设置事件
// 当录制停止时,取消全部录制
- mainContextC, mainCancle := context.WithCancel(context.Background())
+ mainContextC, maincancel := context.WithCancel(context.Background())
if t.Callback_stopRec != nil {
- t.msg.Pull_tag_only("stopRec", func(ms *M4SStream) (disable bool) {
+ cancel := t.msg.Pull_tag_only(`stopRec`, func(ms *M4SStream) (disable bool) {
ms.Callback_stopRec(ms)
return false
})
+ defer cancel()
}
- t.msg.Pull_tag_only("stop", func(ms *M4SStream) (disable bool) {
+ cancel := t.msg.Pull_tag_only("stop", func(ms *M4SStream) (disable bool) {
if ms.Callback_stop != nil {
ms.Callback_stop(ms)
}
- mainCancle()
+ maincancel()
t.msg.ClearAll()
return true
})
+ defer cancel()
if t.config.save_to_file {
var fc funcCtrl.FlashFunc
- t.msg.Pull_tag_async(map[string]func(*M4SStream) (disable bool){
+ cancel := t.msg.Pull_tag_async(map[string]func(*M4SStream) (disable bool){
`cut`: func(ms *M4SStream) (disable bool) {
// 当cut时,取消上次录制
- contextC, cancle := context.WithCancel(mainContextC)
- fc.FlashWithCallback(cancle)
+ contextC, cancel := context.WithCancel(mainContextC)
+ fc.FlashWithCallback(cancel)
// 当stopRec时,取消录制
- ms.msg.Pull_tag_only("stopRec", func(_ *M4SStream) (disable bool) {
- cancle()
+ cancelMsg := ms.msg.Pull_tag_only(`stopRec`, func(_ *M4SStream) (disable bool) {
+ cancel()
return true
})
+ defer cancelMsg()
l := ms.log.Base_add(`文件保存`)
startf := func(_ *M4SStream) error {
}
duration := time.Since(startT)
+ // 结束,发送空值停止直播回放
+ t.Stream_msg.PushLock_tag(`data`, []byte{})
+
//指定房间录制回调
if v, ok := ms.common.K_v.LoadV("指定房间录制回调").([]any); ok && len(v) > 0 {
l := l.Base(`录制回调`)
return false
},
})
+ defer cancel()
}
- defer t.msg.Push_tag(`stop`, t)
-
// 主循环
for t.Status.Islive() {
// 是否在直播
}
t.log.L(`I: `, `结束录制(`+strconv.Itoa(t.common.Roomid)+`)`)
+
+ // 退出
+ t.msg.Push_tag(`stop`, t)
t.exitSign.Done()
}()
return true
}
_, _ = f.Write(t.getFirstBuf(), true)
- t.Stream_msg.Pull_tag_async(map[string]func([]byte) bool{
+ cancelRec := t.Stream_msg.Pull_tag_async(map[string]func([]byte) bool{
`data`: func(b []byte) bool {
select {
case <-contextC.Done():
},
})
<-contextC.Done()
+ cancelRec()
if e := stopFunc(t); e != nil {
return e
contextC, cancel := context.WithCancel(r.Context())
//
- t.Stream_msg.Pull_tag_async(map[string]func([]byte) bool{
+ cancelRec := t.Stream_msg.Pull_tag_async(map[string]func([]byte) bool{
`data`: func(b []byte) bool {
select {
case <-contextC.Done():
return true
},
})
-
<-contextC.Done()
+ cancelRec()
if e := stopFunc(t); e != nil {
return e
var stop = sys.Sys().PreventSleep()
defer stop.Done()
- var interrupt_chan = c.C.Danmu_Main_mq.Pull_tag_chan(`interrupt`, 2, context.Background())
+ // 用户中断
+ var cancelInterrupt, interrupt_chan = c.C.Danmu_Main_mq.Pull_tag_chan(`interrupt`, 2, context.Background())
+ defer cancelInterrupt()
//ctrl+c退出
go func() {
reply.SaveToJson.Init()
//使用带tag的消息队列在功能间传递消息
- c.C.Danmu_Main_mq.Pull_tag(msgq.FuncMap{
- `change_room`: func(_ any) bool { //房间改变
- c.C.Rev = 0.0 // 营收
- c.C.Renqi = 1 // 人气置1
- c.C.Watched = 0 // 观看人数
- c.C.OnlineNum = 0 // 在线人数
- c.C.GuardNum = 0 // 舰长数
- c.C.Note = `` // 分区排行
- c.C.Uname = `` // 主播id
- c.C.Title = ``
- c.C.Wearing_FansMedal = 0
- return false
- },
- `c.Rev_add`: func(data any) bool { //收入
- c.C.Rev += data.(float64)
- return false
- },
- `c.Renqi`: func(data any) bool { //人气更新
- if tmp, ok := data.(int); ok {
- c.C.Renqi = tmp
- }
- return false
- },
- `gtk_close`: func(_ any) bool { //gtk关闭信号
- c.C.Danmu_Main_mq.PushLock_tag(`interrupt`, nil)
- return false
- },
- `pm`: func(data any) bool { //私信
- if tmp, ok := data.(send.Pm_item); ok {
- if e := send.Send_pm(tmp.Uid, tmp.Msg); e != nil {
- danmulog.Base_add(`私信`).L(`E: `, e)
+ {
+ var cancelfunc = c.C.Danmu_Main_mq.Pull_tag(msgq.FuncMap{
+ `change_room`: func(_ any) bool { //房间改变
+ c.C.Rev = 0.0 // 营收
+ c.C.Renqi = 1 // 人气置1
+ c.C.Watched = 0 // 观看人数
+ c.C.OnlineNum = 0 // 在线人数
+ c.C.GuardNum = 0 // 舰长数
+ c.C.Note = `` // 分区排行
+ c.C.Uname = `` // 主播id
+ c.C.Title = ``
+ c.C.Wearing_FansMedal = 0
+ return false
+ },
+ `c.Rev_add`: func(data any) bool { //收入
+ c.C.Rev += data.(float64)
+ return false
+ },
+ `c.Renqi`: func(data any) bool { //人气更新
+ if tmp, ok := data.(int); ok {
+ c.C.Renqi = tmp
}
- }
- return false
- },
- })
+ return false
+ },
+ `gtk_close`: func(_ any) bool { //gtk关闭信号
+ c.C.Danmu_Main_mq.PushLock_tag(`interrupt`, nil)
+ return false
+ },
+ `pm`: func(data any) bool { //私信
+ if tmp, ok := data.(send.Pm_item); ok {
+ if e := send.Send_pm(tmp.Uid, tmp.Msg); e != nil {
+ danmulog.Base_add(`私信`).L(`E: `, e)
+ }
+ }
+ return false
+ },
+ })
+ defer cancelfunc()
+ }
for exit_sign := true; exit_sign; {
if c.C.Roomid == 0 {
fmt.Println("回车查看指令")
- ctx, cancle := context.WithCancel(context.Background())
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel1, c := c.C.Danmu_Main_mq.Pull_tag_chan(`change_room`, 1, ctx)
select {
- case <-c.C.Danmu_Main_mq.Pull_tag_chan(`change_room`, 1, ctx):
+ case <-c:
case <-interrupt_chan:
exit_sign = false
}
- cancle()
+ cancel1()
+ cancel()
} else {
fmt.Print("房间号: ", strconv.Itoa(c.C.Roomid), "\n")
}
Msg: F.HelloGen(c.C.Roomid, c.C.Token),
})
waitCheckAuth, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- wsmsg.Pull_tag_only(`rec`, func(wm *ws.WsMsg) (disable bool) {
+ doneAuth := wsmsg.Pull_tag_only(`rec`, func(wm *ws.WsMsg) (disable bool) {
if F.HelloChe(wm.Msg) {
cancel()
}
return true
})
<-waitCheckAuth.Done()
+ doneAuth()
if err := waitCheckAuth.Err(); errors.Is(err, context.DeadlineExceeded) {
danmulog.L(`E: `, "连接验证失败")
i += 1
reply.Gui_show(`房间标题: `+c.C.Title, `0room`)
}
- wsmsg.Pull_tag(map[string]func(*ws.WsMsg) (disable bool){
+ // 处理ws消息
+ var cancelDeal = wsmsg.Pull_tag(map[string]func(*ws.WsMsg) (disable bool){
`rec`: func(wm *ws.WsMsg) (disable bool) {
go reply.Reply(wm.Msg)
return false
//当前ws
{
- c.C.Danmu_Main_mq.Pull_tag(msgq.FuncMap{
+ // 处理各种指令
+ var cancelfunc = c.C.Danmu_Main_mq.Pull_tag(msgq.FuncMap{
`interrupt`: func(_ any) (disable bool) {
exitloop = true
exit_sign = false
},
})
- <-wsmsg.Pull_tag_chan(`exit`, 1, context.Background())
+ {
+ cancel, c := wsmsg.Pull_tag_chan(`exit`, 1, context.Background())
+ <-c
+ cancel()
+ }
+
+ cancelfunc()
time.Sleep(time.Second)
}
+ cancelDeal()
}
time.Sleep(time.Second)
}