]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
Fix hls模式panic #81 flv模式间断后的数据未保存 #80
authorqydysky <qydysky@foxmail.com>
Sun, 30 Jul 2023 11:37:57 +0000 (19:37 +0800)
committerqydysky <qydysky@foxmail.com>
Sun, 30 Jul 2023 11:37:57 +0000 (19:37 +0800)
Reply/stream.go
bili_danmu.go
go.mod
go.sum

index 05d9825831c8e05cd2ba60c483e5070e1e24d840..87a8194104df04553a048c575e80eeef1c9fa74c 100644 (file)
@@ -594,7 +594,6 @@ func (t *M4SStream) saveStream() (e error) {
                        return err
                }
        }
-       defer t.msg.Push_tag(`stopRec`, t)
 
        // 移除历史流
        if err := t.removeStream(); err != nil {
@@ -602,12 +601,13 @@ func (t *M4SStream) saveStream() (e error) {
        }
 
        // 保存到文件
-       // 确保能接收到第一个帧才开始录制
        if t.config.save_to_file {
-               t.msg.Pull_tag_only(`firstFrame`, func(ms *M4SStream) (disable bool) {
+               // 确保能接收到第一个帧才开始录制
+               var cancelfirstFrame = t.msg.Pull_tag_only(`firstFrame`, func(ms *M4SStream) (disable bool) {
                        ms.msg.Push_tag(`cut`, ms)
                        return true
                })
+               defer cancelfirstFrame()
        }
 
        // 获取流
@@ -621,17 +621,33 @@ func (t *M4SStream) saveStream() (e error) {
                t.log.L(`E: `, e)
        }
 
+       // 退出当前方法时,结束录制
+       t.msg.Push_tag(`stopRec`, t)
        return
 }
 
 func (t *M4SStream) saveStreamFlv() (e error) {
+       // 开始获取
+       r := t.reqPool.Get()
+       defer t.reqPool.Put(r)
+
+       CookieM := make(map[string]string)
+       t.common.Cookie.Range(func(k, v interface{}) bool {
+               CookieM[k.(string)] = v.(string)
+               return true
+       })
+
+       var surl *url.URL
+
+       // 找到可用流服务器
        for {
                v := t.common.ValidLive()
                if v == nil {
                        return errors.New("未能找到可用流服务器")
                }
 
-               surl, err := url.Parse(v.Url)
+               var err error
+               surl, err = url.Parse(v.Url)
                if err != nil {
                        t.log.L(`E: `, err)
                        e = err
@@ -644,14 +660,6 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                        return
                }
 
-               CookieM := make(map[string]string)
-               t.common.Cookie.Range(func(k, v interface{}) bool {
-                       CookieM[k.(string)] = v.(string)
-                       return true
-               })
-
-               //开始获取
-               r := t.reqPool.Get()
                //检查
                if e := r.Reqf(reqf.Rval{
                        Url:              surl.String(),
@@ -677,169 +685,170 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                        continue
                }
 
-               cancelC, cancel := context.WithCancel(context.Background())
-               {
-                       go func() {
-                               tsc, tscf := t.Status.WaitC()
-                               defer tscf()
+               break
+       }
 
-                               select {
-                               //停止录制
-                               case <-tsc:
-                                       cancel()
-                               //当前连接终止
-                               case <-cancelC.Done():
-                               }
-                       }()
+       cancelC, cancel := context.WithCancel(context.Background())
+       defer cancel()
+       {
+               go func() {
+                       tsc, tscf := t.Status.WaitC()
+                       defer tscf()
 
-                       pipe := pio.NewPipe()
-                       var (
-                               leastReadUnix atomic.Int64
-                               readTO        int64 = 5
-                       )
-                       leastReadUnix.Store(time.Now().Unix())
+                       select {
+                       //停止录制
+                       case <-tsc:
+                               cancel()
+                       //当前连接终止
+                       case <-cancelC.Done():
+                       }
+               }()
+
+               pipe := pio.NewPipe()
+               var (
+                       leastReadUnix atomic.Int64
+                       readTO        int64 = 5
+               )
+               leastReadUnix.Store(time.Now().Unix())
 
-                       // read timeout
-                       go func() {
-                               timer := time.NewTicker(time.Duration(readTO * int64(time.Second)))
-                               defer timer.Stop()
+               // read timeout
+               go func() {
+                       timer := time.NewTicker(time.Duration(readTO * int64(time.Second)))
+                       defer timer.Stop()
 
-                               for {
-                                       select {
-                                       case <-cancelC.Done():
+                       for {
+                               select {
+                               case <-cancelC.Done():
+                                       return
+                               case curT := <-timer.C:
+                                       if curT.Unix()-leastReadUnix.Load() > readTO {
+                                               t.log.L(`W: `, fmt.Sprintf("%vs未接收到有效数据", readTO))
+                                               // 5s未接收到任何数据
+                                               cancel()
                                                return
-                                       case curT := <-timer.C:
-                                               if curT.Unix()-leastReadUnix.Load() > readTO {
-                                                       t.log.L(`W: `, fmt.Sprintf("%vs未接收到有效数据", readTO))
-                                                       // 5s未接收到任何数据
+                                       }
+                                       if v, ok := c.C.K_v.LoadV(`直播流清晰度`).(float64); ok {
+                                               if t.config.want_qn != int(v) {
+                                                       t.log.L(`I: `, "直播流清晰度改变:", t.common.Qn[t.config.want_qn], "=>", t.common.Qn[int(v)])
+                                                       t.config.want_qn = int(v)
                                                        cancel()
                                                        return
                                                }
-                                               if v, ok := c.C.K_v.LoadV(`直播流清晰度`).(float64); ok {
-                                                       if t.config.want_qn != int(v) {
-                                                               t.log.L(`I: `, "直播流清晰度改变:", t.common.Qn[t.config.want_qn], "=>", t.common.Qn[int(v)])
-                                                               t.config.want_qn = int(v)
-                                                               cancel()
-                                                               return
-                                                       }
-                                               }
                                        }
                                }
-                       }()
-
-                       // read
-                       go func() {
-                               var (
-                                       ticker     = time.NewTicker(time.Second)
-                                       buff       = slice.New[byte]()
-                                       keyframe   = slice.New[byte]()
-                                       buf        = make([]byte, 1<<16)
-                                       frameCount = 0
-                               )
-
-                               for {
-                                       n, e := pipe.Read(buf)
-                                       _ = buff.Append(buf[:n])
+                       }
+               }()
+
+               // read
+               go func() {
+                       var (
+                               ticker     = time.NewTicker(time.Second)
+                               buff       = slice.New[byte]()
+                               keyframe   = slice.New[byte]()
+                               buf        = make([]byte, 1<<16)
+                               frameCount = 0
+                       )
+
+                       for {
+                               n, e := pipe.Read(buf)
+                               _ = buff.Append(buf[:n])
+                               if e != nil {
+                                       cancel()
+                                       break
+                               }
+
+                               skip := true
+                               select {
+                               case <-ticker.C:
+                                       skip = false
+                               default:
+                               }
+                               if skip {
+                                       continue
+                               }
+
+                               if !buff.IsEmpty() {
+                                       keyframe.Reset()
+                                       front_buf, last_available_offset, e := Search_stream_tag(buff.GetPureBuf(), keyframe)
                                        if e != nil {
-                                               cancel()
-                                               break
+                                               if strings.Contains(e.Error(), `no found available tag`) {
+                                                       continue
+                                               }
+                                               //丢弃所有数据
+                                               buff.Reset()
                                        }
-
-                                       skip := true
-                                       select {
-                                       case <-ticker.C:
-                                               skip = false
-                                       default:
+                                       // 存在有效数据
+                                       if len(front_buf) != 0 || keyframe.Size() != 0 {
+                                               leastReadUnix.Store(time.Now().Unix())
                                        }
-                                       if skip {
-                                               continue
+                                       if len(front_buf) != 0 && len(t.first_buf) == 0 {
+                                               t.first_buf = make([]byte, len(front_buf))
+                                               copy(t.first_buf, front_buf)
+                                               // fmt.Println("write front_buf")
+                                               t.Stream_msg.PushLock_tag(`data`, t.first_buf)
+                                               t.msg.Push_tag(`load`, t)
                                        }
-
-                                       if !buff.IsEmpty() {
-                                               keyframe.Reset()
-                                               front_buf, last_available_offset, e := Search_stream_tag(buff.GetPureBuf(), keyframe)
-                                               if e != nil {
-                                                       if strings.Contains(e.Error(), `no found available tag`) {
-                                                               continue
-                                                       }
-                                                       //丢弃所有数据
-                                                       buff.Reset()
-                                               }
-                                               // 存在有效数据
-                                               if len(front_buf) != 0 || keyframe.Size() != 0 {
-                                                       leastReadUnix.Store(time.Now().Unix())
-                                               }
-                                               if len(front_buf) != 0 && len(t.first_buf) == 0 {
-                                                       t.first_buf = make([]byte, len(front_buf))
-                                                       copy(t.first_buf, front_buf)
-                                                       // fmt.Println("write front_buf")
-                                                       t.Stream_msg.PushLock_tag(`data`, t.first_buf)
-                                                       t.msg.Push_tag(`load`, t)
-                                               }
-                                               if keyframe.Size() != 0 {
-                                                       if len(t.first_buf) == 0 {
-                                                               t.log.L(`W: `, `flv未接收到起始段`)
-                                                               cancel()
-                                                               break
-                                                       }
-                                                       t.bootBufPush(keyframe.GetPureBuf())
-                                                       keyframe.Reset()
-                                                       t.Stream_msg.PushLock_tag(`data`, t.boot_buf)
-                                                       frameCount += 1
-                                                       if frameCount == 1 {
-                                                               t.msg.Push_tag(`firstFrame`, t)
-                                                       }
+                                       if keyframe.Size() != 0 {
+                                               if len(t.first_buf) == 0 {
+                                                       t.log.L(`W: `, `flv未接收到起始段`)
+                                                       cancel()
+                                                       break
                                                }
-                                               if last_available_offset > 1 {
-                                                       // fmt.Println("write Sync")
-                                                       _ = buff.RemoveFront(last_available_offset - 1)
+                                               t.bootBufPush(keyframe.GetPureBuf())
+                                               keyframe.Reset()
+                                               t.Stream_msg.PushLock_tag(`data`, t.boot_buf)
+                                               frameCount += 1
+                                               if frameCount == 1 {
+                                                       t.msg.Push_tag(`firstFrame`, t)
                                                }
                                        }
+                                       if last_available_offset > 1 {
+                                               // fmt.Println("write Sync")
+                                               _ = buff.RemoveFront(last_available_offset - 1)
+                                       }
                                }
+                       }
 
-                               buf = nil
-                               buff.Reset()
-
-                               ticker.Stop()
-                       }()
-
-                       t.log.L(`I: `, `flv下载开始`)
-
-                       _ = r.Reqf(reqf.Rval{
-                               Ctx:         cancelC,
-                               Url:         surl.String(),
-                               SaveToPipe:  pipe,
-                               NoResponse:  true,
-                               Async:       true,
-                               Proxy:       t.common.Proxy,
-                               WriteLoopTO: int(readTO)*1000*2 + 1,
-                               Header: map[string]string{
-                                       `Host`:            surl.Host,
-                                       `User-Agent`:      `Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:103.0) Gecko/20100101 Firefox/103.0`,
-                                       `Accept`:          `*/*`,
-                                       `Accept-Language`: `zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2`,
-                                       `Origin`:          `https://live.bilibili.com`,
-                                       `Connection`:      `keep-alive`,
-                                       `Pragma`:          `no-cache`,
-                                       `Cache-Control`:   `no-cache`,
-                                       `Referer`:         "https://live.bilibili.com/",
-                                       `Cookie`:          reqf.Map_2_Cookies_String(CookieM),
-                               },
-                       })
-                       if err := r.Wait(); err != nil && !errors.Is(err, io.EOF) {
-                               if reqf.IsCancel(err) {
-                                       t.log.L(`I: `, `flv下载停止`)
-                               } else if err != nil && !reqf.IsTimeout(err) {
-                                       e = err
-                                       t.log.L(`E: `, `flv下载失败:`, err)
-                               }
+                       buf = nil
+                       buff.Reset()
+
+                       ticker.Stop()
+               }()
+
+               t.log.L(`I: `, `flv下载开始`)
+
+               _ = r.Reqf(reqf.Rval{
+                       Ctx:         cancelC,
+                       Url:         surl.String(),
+                       SaveToPipe:  pipe,
+                       NoResponse:  true,
+                       Async:       true,
+                       Proxy:       t.common.Proxy,
+                       WriteLoopTO: int(readTO)*1000*2 + 1,
+                       Header: map[string]string{
+                               `Host`:            surl.Host,
+                               `User-Agent`:      `Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:103.0) Gecko/20100101 Firefox/103.0`,
+                               `Accept`:          `*/*`,
+                               `Accept-Language`: `zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2`,
+                               `Origin`:          `https://live.bilibili.com`,
+                               `Connection`:      `keep-alive`,
+                               `Pragma`:          `no-cache`,
+                               `Cache-Control`:   `no-cache`,
+                               `Referer`:         "https://live.bilibili.com/",
+                               `Cookie`:          reqf.Map_2_Cookies_String(CookieM),
+                       },
+               })
+               if err := r.Wait(); err != nil && !errors.Is(err, io.EOF) {
+                       if reqf.IsCancel(err) {
+                               t.log.L(`I: `, `flv下载停止`)
+                       } else if err != nil && !reqf.IsTimeout(err) {
+                               e = err
+                               t.log.L(`E: `, `flv下载失败:`, err)
                        }
-                       v.DisableAuto()
                }
-               cancel()
-               t.reqPool.Put(r)
-               t.Stream_msg.PushLock_tag(`close`, nil)
        }
+
+       return
 }
 
 func (t *M4SStream) saveStreamM4s() (e error) {
@@ -917,9 +926,10 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                                        r := t.reqPool.Get()
                                        defer t.reqPool.Put(r)
                                        reqConfig := reqf.Rval{
-                                               Url:     link.Url,
-                                               Timeout: 3000,
-                                               Proxy:   t.common.Proxy,
+                                               Url:         link.Url,
+                                               Timeout:     3000,
+                                               WriteLoopTO: 5000,
+                                               Proxy:       t.common.Proxy,
                                                Header: map[string]string{
                                                        `Connection`: `close`,
                                                },
@@ -1108,9 +1118,6 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                download_seq = append(download_seq, m4s_links...)
        }
 
-       // 发送空字节会导致流服务终止
-       t.Stream_msg.PushLock_tag(`close`, nil)
-
        return
 }
 
@@ -1157,35 +1164,38 @@ func (t *M4SStream) Start() bool {
 
                // 设置事件
                // 当录制停止时,取消全部录制
-               mainContextC, mainCancle := context.WithCancel(context.Background())
+               mainContextC, maincancel := context.WithCancel(context.Background())
                if t.Callback_stopRec != nil {
-                       t.msg.Pull_tag_only("stopRec", func(ms *M4SStream) (disable bool) {
+                       cancel := t.msg.Pull_tag_only(`stopRec`, func(ms *M4SStream) (disable bool) {
                                ms.Callback_stopRec(ms)
                                return false
                        })
+                       defer cancel()
                }
-               t.msg.Pull_tag_only("stop", func(ms *M4SStream) (disable bool) {
+               cancel := t.msg.Pull_tag_only("stop", func(ms *M4SStream) (disable bool) {
                        if ms.Callback_stop != nil {
                                ms.Callback_stop(ms)
                        }
-                       mainCancle()
+                       maincancel()
                        t.msg.ClearAll()
                        return true
                })
+               defer cancel()
 
                if t.config.save_to_file {
                        var fc funcCtrl.FlashFunc
-                       t.msg.Pull_tag_async(map[string]func(*M4SStream) (disable bool){
+                       cancel := t.msg.Pull_tag_async(map[string]func(*M4SStream) (disable bool){
                                `cut`: func(ms *M4SStream) (disable bool) {
                                        // 当cut时,取消上次录制
-                                       contextC, cancle := context.WithCancel(mainContextC)
-                                       fc.FlashWithCallback(cancle)
+                                       contextC, cancel := context.WithCancel(mainContextC)
+                                       fc.FlashWithCallback(cancel)
 
                                        // 当stopRec时,取消录制
-                                       ms.msg.Pull_tag_only("stopRec", func(_ *M4SStream) (disable bool) {
-                                               cancle()
+                                       cancelMsg := ms.msg.Pull_tag_only(`stopRec`, func(_ *M4SStream) (disable bool) {
+                                               cancel()
                                                return true
                                        })
+                                       defer cancelMsg()
 
                                        l := ms.log.Base_add(`文件保存`)
                                        startf := func(_ *M4SStream) error {
@@ -1231,6 +1241,9 @@ func (t *M4SStream) Start() bool {
                                        }
                                        duration := time.Since(startT)
 
+                                       // 结束,发送空值停止直播回放
+                                       t.Stream_msg.PushLock_tag(`data`, []byte{})
+
                                        //指定房间录制回调
                                        if v, ok := ms.common.K_v.LoadV("指定房间录制回调").([]any); ok && len(v) > 0 {
                                                l := l.Base(`录制回调`)
@@ -1265,10 +1278,9 @@ func (t *M4SStream) Start() bool {
                                        return false
                                },
                        })
+                       defer cancel()
                }
 
-               defer t.msg.Push_tag(`stop`, t)
-
                // 主循环
                for t.Status.Islive() {
                        // 是否在直播
@@ -1292,6 +1304,9 @@ func (t *M4SStream) Start() bool {
                }
 
                t.log.L(`I: `, `结束录制(`+strconv.Itoa(t.common.Roomid)+`)`)
+
+               // 退出
+               t.msg.Push_tag(`stop`, t)
                t.exitSign.Done()
        }()
        return true
@@ -1319,7 +1334,7 @@ func (t *M4SStream) PusherToFile(contextC context.Context, filepath string, star
        }
 
        _, _ = f.Write(t.getFirstBuf(), true)
-       t.Stream_msg.Pull_tag_async(map[string]func([]byte) bool{
+       cancelRec := t.Stream_msg.Pull_tag_async(map[string]func([]byte) bool{
                `data`: func(b []byte) bool {
                        select {
                        case <-contextC.Done():
@@ -1337,6 +1352,7 @@ func (t *M4SStream) PusherToFile(contextC context.Context, filepath string, star
                },
        })
        <-contextC.Done()
+       cancelRec()
 
        if e := stopFunc(t); e != nil {
                return e
@@ -1388,7 +1404,7 @@ func (t *M4SStream) PusherToHttp(w http.ResponseWriter, r *http.Request, startFu
        contextC, cancel := context.WithCancel(r.Context())
 
        //
-       t.Stream_msg.Pull_tag_async(map[string]func([]byte) bool{
+       cancelRec := t.Stream_msg.Pull_tag_async(map[string]func([]byte) bool{
                `data`: func(b []byte) bool {
                        select {
                        case <-contextC.Done():
@@ -1412,8 +1428,8 @@ func (t *M4SStream) PusherToHttp(w http.ResponseWriter, r *http.Request, startFu
                        return true
                },
        })
-
        <-contextC.Done()
+       cancelRec()
 
        if e := stopFunc(t); e != nil {
                return e
index 1a640e53916e21134791f2cf889e59d20c86aaab..78d7ffcead01d47ced9297aee0fb67a519477b70 100644 (file)
@@ -43,7 +43,9 @@ func Start() {
        var stop = sys.Sys().PreventSleep()
        defer stop.Done()
 
-       var interrupt_chan = c.C.Danmu_Main_mq.Pull_tag_chan(`interrupt`, 2, context.Background())
+       // 用户中断
+       var cancelInterrupt, interrupt_chan = c.C.Danmu_Main_mq.Pull_tag_chan(`interrupt`, 2, context.Background())
+       defer cancelInterrupt()
 
        //ctrl+c退出
        go func() {
@@ -89,53 +91,58 @@ func Start() {
                reply.SaveToJson.Init()
 
                //使用带tag的消息队列在功能间传递消息
-               c.C.Danmu_Main_mq.Pull_tag(msgq.FuncMap{
-                       `change_room`: func(_ any) bool { //房间改变
-                               c.C.Rev = 0.0     // 营收
-                               c.C.Renqi = 1     // 人气置1
-                               c.C.Watched = 0   // 观看人数
-                               c.C.OnlineNum = 0 // 在线人数
-                               c.C.GuardNum = 0  // 舰长数
-                               c.C.Note = ``     // 分区排行
-                               c.C.Uname = ``    // 主播id
-                               c.C.Title = ``
-                               c.C.Wearing_FansMedal = 0
-                               return false
-                       },
-                       `c.Rev_add`: func(data any) bool { //收入
-                               c.C.Rev += data.(float64)
-                               return false
-                       },
-                       `c.Renqi`: func(data any) bool { //人气更新
-                               if tmp, ok := data.(int); ok {
-                                       c.C.Renqi = tmp
-                               }
-                               return false
-                       },
-                       `gtk_close`: func(_ any) bool { //gtk关闭信号
-                               c.C.Danmu_Main_mq.PushLock_tag(`interrupt`, nil)
-                               return false
-                       },
-                       `pm`: func(data any) bool { //私信
-                               if tmp, ok := data.(send.Pm_item); ok {
-                                       if e := send.Send_pm(tmp.Uid, tmp.Msg); e != nil {
-                                               danmulog.Base_add(`私信`).L(`E: `, e)
+               {
+                       var cancelfunc = c.C.Danmu_Main_mq.Pull_tag(msgq.FuncMap{
+                               `change_room`: func(_ any) bool { //房间改变
+                                       c.C.Rev = 0.0     // 营收
+                                       c.C.Renqi = 1     // 人气置1
+                                       c.C.Watched = 0   // 观看人数
+                                       c.C.OnlineNum = 0 // 在线人数
+                                       c.C.GuardNum = 0  // 舰长数
+                                       c.C.Note = ``     // 分区排行
+                                       c.C.Uname = ``    // 主播id
+                                       c.C.Title = ``
+                                       c.C.Wearing_FansMedal = 0
+                                       return false
+                               },
+                               `c.Rev_add`: func(data any) bool { //收入
+                                       c.C.Rev += data.(float64)
+                                       return false
+                               },
+                               `c.Renqi`: func(data any) bool { //人气更新
+                                       if tmp, ok := data.(int); ok {
+                                               c.C.Renqi = tmp
                                        }
-                               }
-                               return false
-                       },
-               })
+                                       return false
+                               },
+                               `gtk_close`: func(_ any) bool { //gtk关闭信号
+                                       c.C.Danmu_Main_mq.PushLock_tag(`interrupt`, nil)
+                                       return false
+                               },
+                               `pm`: func(data any) bool { //私信
+                                       if tmp, ok := data.(send.Pm_item); ok {
+                                               if e := send.Send_pm(tmp.Uid, tmp.Msg); e != nil {
+                                                       danmulog.Base_add(`私信`).L(`E: `, e)
+                                               }
+                                       }
+                                       return false
+                               },
+                       })
+                       defer cancelfunc()
+               }
 
                for exit_sign := true; exit_sign; {
                        if c.C.Roomid == 0 {
                                fmt.Println("回车查看指令")
-                               ctx, cancle := context.WithCancel(context.Background())
+                               ctx, cancel := context.WithCancel(context.Background())
+                               cancel1, c := c.C.Danmu_Main_mq.Pull_tag_chan(`change_room`, 1, ctx)
                                select {
-                               case <-c.C.Danmu_Main_mq.Pull_tag_chan(`change_room`, 1, ctx):
+                               case <-c:
                                case <-interrupt_chan:
                                        exit_sign = false
                                }
-                               cancle()
+                               cancel1()
+                               cancel()
                        } else {
                                fmt.Print("房间号: ", strconv.Itoa(c.C.Roomid), "\n")
                        }
@@ -221,13 +228,14 @@ func Start() {
                                                Msg: F.HelloGen(c.C.Roomid, c.C.Token),
                                        })
                                        waitCheckAuth, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-                                       wsmsg.Pull_tag_only(`rec`, func(wm *ws.WsMsg) (disable bool) {
+                                       doneAuth := wsmsg.Pull_tag_only(`rec`, func(wm *ws.WsMsg) (disable bool) {
                                                if F.HelloChe(wm.Msg) {
                                                        cancel()
                                                }
                                                return true
                                        })
                                        <-waitCheckAuth.Done()
+                                       doneAuth()
                                        if err := waitCheckAuth.Err(); errors.Is(err, context.DeadlineExceeded) {
                                                danmulog.L(`E: `, "连接验证失败")
                                                i += 1
@@ -242,7 +250,8 @@ func Start() {
                                        reply.Gui_show(`房间标题: `+c.C.Title, `0room`)
                                }
 
-                               wsmsg.Pull_tag(map[string]func(*ws.WsMsg) (disable bool){
+                               // 处理ws消息
+                               var cancelDeal = wsmsg.Pull_tag(map[string]func(*ws.WsMsg) (disable bool){
                                        `rec`: func(wm *ws.WsMsg) (disable bool) {
                                                go reply.Reply(wm.Msg)
                                                return false
@@ -286,7 +295,8 @@ func Start() {
 
                                //当前ws
                                {
-                                       c.C.Danmu_Main_mq.Pull_tag(msgq.FuncMap{
+                                       // 处理各种指令
+                                       var cancelfunc = c.C.Danmu_Main_mq.Pull_tag(msgq.FuncMap{
                                                `interrupt`: func(_ any) (disable bool) {
                                                        exitloop = true
                                                        exit_sign = false
@@ -349,9 +359,16 @@ func Start() {
                                                },
                                        })
 
-                                       <-wsmsg.Pull_tag_chan(`exit`, 1, context.Background())
+                                       {
+                                               cancel, c := wsmsg.Pull_tag_chan(`exit`, 1, context.Background())
+                                               <-c
+                                               cancel()
+                                       }
+
+                                       cancelfunc()
                                        time.Sleep(time.Second)
                                }
+                               cancelDeal()
                        }
                        time.Sleep(time.Second)
                }
diff --git a/go.mod b/go.mod
index 9417fc906f2838c9b46270ad66cb034b7561e2e8..69037cb6e1671e8c00ee19be3c06c12039223dc8 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@ go 1.20
 require (
        github.com/gotk3/gotk3 v0.6.2
        github.com/mdp/qrterminal/v3 v3.1.1
-       github.com/qydysky/part v0.28.1-0.20230729035618-c0b38ec57f9e
+       github.com/qydysky/part v0.28.1-0.20230730070413-f9f4c80723b0
        github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
        github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
        golang.org/x/text v0.11.0
diff --git a/go.sum b/go.sum
index 1468bcf7493a3dc5c639a35b450661cff789fc50..c687a34ff6fcb889afdbcded9328a3b75a454847 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -30,8 +30,8 @@ github.com/mdp/qrterminal/v3 v3.1.1/go.mod h1:5lJlXe7Jdr8wlPDdcsJttv1/knsRgzXASy
 github.com/miekg/dns v1.1.55 h1:GoQ4hpsj0nFLYe+bWiCToyrBEJXkQfOOIvFGFy0lEgo=
 github.com/miekg/dns v1.1.55/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
-github.com/qydysky/part v0.28.1-0.20230729035618-c0b38ec57f9e h1:tsa7lAs8HF5YIndBEMx++NepFRoL1mt12ZvxIKLKXT4=
-github.com/qydysky/part v0.28.1-0.20230729035618-c0b38ec57f9e/go.mod h1:CdkAHZ+OxieG1sI4M6UowP9j0QQDnhtDtN4tWsylCPU=
+github.com/qydysky/part v0.28.1-0.20230730070413-f9f4c80723b0 h1:TnGDvjsK3D6GS2tt8PPGIFvzAyCoV7Jr1aF0U8EzWt0=
+github.com/qydysky/part v0.28.1-0.20230730070413-f9f4c80723b0/go.mod h1:CdkAHZ+OxieG1sI4M6UowP9j0QQDnhtDtN4tWsylCPU=
 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
 github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=