]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
Improve flv解码错误退出
authorqydysky <qydysky@foxmail.com>
Thu, 9 Nov 2023 18:30:09 +0000 (02:30 +0800)
committerqydysky <qydysky@foxmail.com>
Thu, 9 Nov 2023 18:30:09 +0000 (02:30 +0800)
Reply/F.go
Reply/flvDecode.go
Reply/stream.go
go.mod
go.sum

index bfe072c9ac2341b9bb1d987afa0fbd8f9776a2a9..4bedb3abf6b58db09456d4d429bbbd38ac55752e 100644 (file)
@@ -294,7 +294,7 @@ func StreamOCommon(roomid int) (array []*c.Common) {
 // 获取实例的录制状态
 func StreamOStatus(roomid int) (Islive bool) {
        v, ok := streamO.Load(roomid)
-       return ok && (v.(*M4SStream).Status.Islive() || v.(*M4SStream).exitSign.Islive())
+       return ok && (!pctx.Done(v.(*M4SStream).Status) || v.(*M4SStream).exitSign.Islive())
 }
 
 // 开始实例
@@ -337,7 +337,7 @@ func StreamOStop(roomid int) {
                        if c.C.Roomid == _roomid {
                                return true
                        }
-                       if v.(*M4SStream).Status.Islive() {
+                       if !pctx.Done(v.(*M4SStream).Status) {
                                v.(*M4SStream).Stop()
                        }
                        streamO.Delete(_roomid)
@@ -345,7 +345,7 @@ func StreamOStop(roomid int) {
                })
        case -1: // 所有房间
                streamO.Range(func(k, v interface{}) bool {
-                       if v.(*M4SStream).Status.Islive() {
+                       if !pctx.Done(v.(*M4SStream).Status) {
                                v.(*M4SStream).Stop()
                        }
                        streamO.Delete(k)
@@ -353,7 +353,7 @@ func StreamOStop(roomid int) {
                })
        default: // 针对某房间
                if v, ok := streamO.Load(roomid); ok {
-                       if v.(*M4SStream).Status.Islive() {
+                       if !pctx.Done(v.(*M4SStream).Status) {
                                v.(*M4SStream).Stop()
                        }
                        streamO.Delete(roomid)
@@ -364,7 +364,7 @@ func StreamOStop(roomid int) {
 // 实例切断
 func StreamOCut(roomid int, title ...string) {
        if v, ok := streamO.Load(roomid); ok {
-               if v.(*M4SStream).Status.Islive() {
+               if !pctx.Done(v.(*M4SStream).Status) {
                        if len(title) != 0 {
                                v.(*M4SStream).common.Title = title[0]
                        }
@@ -1505,7 +1505,7 @@ func init() {
                        })
 
                        // 未准备好
-                       if currentStreamO == nil || !currentStreamO.Status.Islive() {
+                       if currentStreamO == nil || pctx.Done(currentStreamO.Status) {
                                w.Header().Set("Retry-After", "1")
                                w.WriteHeader(http.StatusNotFound)
                                return
index 9912c2dfc675babba498ff9c5441cd07a7a77d78..68757c9f68614b687ab174ef22c95eec3bedd420 100644 (file)
@@ -93,7 +93,9 @@ func Search_stream_tag(buf []byte, keyframe *slice.Buf[byte]) (front_buf []byte,
                        buf_offset = tag_offset + 1
                        last_available_offset = buf_offset
                        // fmt.Printf("tag_size error %x\n",buf[tag_offset:tag_offset+tag_header_size])
-                       continue //tag_size error
+                       // continue //tag_size error
+                       err = errors.New(`[decoder]tag_size error`)
+                       return
                }
 
                tag_size_check := int(F.Btoi32(buf[tag_offset+tag_header_size+tag_size:tag_offset+tag_header_size+tag_size+previou_tag_size], 0))
@@ -104,7 +106,9 @@ func Search_stream_tag(buf []byte, keyframe *slice.Buf[byte]) (front_buf []byte,
                        buf_offset = tag_offset + 1
                        last_available_offset = buf_offset
                        // fmt.Printf("tag_size_check error %x\n",buf[tag_offset:tag_offset+tag_header_size])
-                       continue //tag_size_check error
+                       // continue //tag_size_check error
+                       err = errors.New(`[decoder]tag_size_check error`)
+                       return
                }
 
                time_stamp := int(F.Btoi32([]byte{buf[tag_offset+7], buf[tag_offset+4], buf[tag_offset+5], buf[tag_offset+6]}, 0))
index 9166d99b459e78751be44baaebc970a9325186a3..06483ca5b14770d4c8a6980e4421ddbfff13deea 100644 (file)
@@ -40,7 +40,7 @@ import (
 )
 
 type M4SStream struct {
-       Status               *signal.Signal        //IsLive()是否运行中
+       Status               context.Context       //IsLive()是否运行中
        exitSign             *signal.Signal        //IsLive()是否等待退出中
        log                  *log.Log_interface    //日志
        config               M4SStream_Config      //配置
@@ -665,7 +665,7 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                }
 
                //结束退出
-               if !t.Status.Islive() {
+               if pctx.Done(t.Status) {
                        return
                }
 
@@ -694,21 +694,10 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                }
 
                // flv获取
-               cancelC, cancel := context.WithCancel(context.Background())
+               cancelC, cancel := context.WithCancel(t.Status)
+               errCtx := pctx.Value[error]{}
+               cancelC = errCtx.LinkCtx(cancelC)
                {
-                       go func() {
-                               tsc, tscf := t.Status.WaitC()
-                               defer tscf()
-
-                               select {
-                               //停止录制
-                               case <-tsc:
-                                       cancel()
-                               //当前连接终止
-                               case <-cancelC.Done():
-                               }
-                       }()
-
                        pipe := pio.NewPipe()
                        var (
                                leastReadUnix atomic.Int64
@@ -721,6 +710,8 @@ func (t *M4SStream) saveStreamFlv() (e error) {
 
                        // read timeout
                        go func() {
+                               defer cancel()
+
                                timer := time.NewTicker(time.Duration(readTO * int64(time.Second)))
                                defer timer.Stop()
 
@@ -731,15 +722,14 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                                        case curT := <-timer.C:
                                                if curT.Unix()-leastReadUnix.Load() > readTO {
                                                        t.log.L(`W: `, fmt.Sprintf("%vs未接收到有效数据", readTO))
+                                                       pctx.PutVal(cancelC, &errCtx, fmt.Errorf("%vs未接收到有效数据", readTO))
                                                        // 时间段内未接收到任何数据
-                                                       cancel()
                                                        return
                                                }
                                                if v, ok := c.C.K_v.LoadV(`直播流清晰度`).(float64); ok {
                                                        if t.config.want_qn != int(v) {
                                                                t.log.L(`I: `, "直播流清晰度改变:", t.common.Qn[t.config.want_qn], "=>", t.common.Qn[int(v)])
                                                                t.config.want_qn = int(v)
-                                                               cancel()
                                                                return
                                                        }
                                                }
@@ -749,28 +739,27 @@ func (t *M4SStream) saveStreamFlv() (e error) {
 
                        // read
                        go func() {
+                               defer cancel()
+
                                var (
                                        ticker   = time.NewTicker(time.Second)
                                        buff     = slice.New[byte]()
                                        keyframe = slice.New[byte]()
                                        buf      = make([]byte, 1<<16)
                                )
+                               defer ticker.Stop()
 
                                for {
                                        n, e := pipe.Read(buf)
                                        _ = buff.Append(buf[:n])
                                        if e != nil {
-                                               cancel()
+                                               pctx.PutVal(cancelC, &errCtx, e)
                                                break
                                        }
 
-                                       skip := true
                                        select {
                                        case <-ticker.C:
-                                               skip = false
                                        default:
-                                       }
-                                       if skip {
                                                continue
                                        }
 
@@ -781,6 +770,7 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                                                        if strings.Contains(e.Error(), `no found available tag`) {
                                                                continue
                                                        }
+                                                       pctx.PutVal(cancelC, &errCtx, errors.New("[decoder]"+e.Error()))
                                                        //丢弃所有数据
                                                        buff.Reset()
                                                }
@@ -798,7 +788,7 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                                                if keyframe.Size() != 0 {
                                                        if len(t.first_buf) == 0 {
                                                                t.log.L(`W: `, `flv未接收到起始段`)
-                                                               cancel()
+                                                               pctx.PutVal(cancelC, &errCtx, errors.New(`flv未接收到起始段`))
                                                                break
                                                        }
                                                        t.bootBufPush(keyframe.GetPureBuf())
@@ -813,11 +803,8 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                                                }
                                        }
                                }
-
                                buf = nil
                                buff.Reset()
-
-                               ticker.Stop()
                        }()
 
                        t.log.L(`I: `, `flv下载开始`)
@@ -846,12 +833,16 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                        if err := r.Wait(); err != nil && !errors.Is(err, io.EOF) {
                                if reqf.IsCancel(err) {
                                        t.log.L(`I: `, `flv下载停止`)
+                                       return
                                } else if err != nil && !reqf.IsTimeout(err) {
                                        e = err
                                        t.log.L(`E: `, `flv下载失败:`, err)
                                }
+                       } else if err := errCtx.Get(); err != nil && strings.HasPrefix(err.Error(), "[decoder]") {
+                               e = err
                        }
                }
+
                cancel()
 
                if v1, ok := t.common.K_v.LoadV(`flv断流续接`).(bool); ok && !v1 {
@@ -1087,7 +1078,7 @@ func (t *M4SStream) saveStreamM4s() (e error) {
                }
 
                // 停止录制
-               if !t.Status.Islive() {
+               if pctx.Done(t.Status) {
                        if len(download_seq) != 0 {
                                if time.Now().Unix() > t.stream_last_modified.Unix()+300 {
                                        e = errors.New("切片下载超时")
@@ -1168,7 +1159,7 @@ func (t *M4SStream) Start() bool {
        }
 
        // 状态检测与设置
-       if t.Status.Islive() {
+       if !pctx.Done(t.Status) {
                t.log.L(`T: `, `已存在实例`)
                return false
        }
@@ -1190,14 +1181,12 @@ func (t *M4SStream) Start() bool {
                }
        }
 
-       t.Status = signal.Init()
        go func() {
                t.log.L(`I: `, `初始化录制(`+strconv.Itoa(t.common.Roomid)+`)`)
 
                defer t.log.L(`I: `, `结束录制(`+strconv.Itoa(t.common.Roomid)+`)`)
                defer func() {
                        // use anonymous func avoid data race and unexpect sign wait
-                       t.Status.Done()
                        t.exitSign.Done()
                }()
 
@@ -1209,8 +1198,8 @@ func (t *M4SStream) Start() bool {
 
                // 设置事件
                // 当录制停止时,取消全部录制
-               mainCtx, mainCancel := context.WithCancel(context.Background())
-               mainCtx, done := pctx.WithWait(mainCtx, 0, time.Minute)
+               t.Status = pctx.CarryCancel(context.WithCancel(context.Background()))
+               mainCtx, done := pctx.WithWait(t.Status, 0, time.Minute)
                defer func() {
                        switch done() {
                        case pctx.ErrWaitTo:
@@ -1219,6 +1208,7 @@ func (t *M4SStream) Start() bool {
                                fallthrough
                        default:
                        }
+                       _ = pctx.CallCancel(t.Status)
                }()
 
                if t.Callback_stopRec != nil {
@@ -1232,7 +1222,7 @@ func (t *M4SStream) Start() bool {
                        if ms.Callback_stop != nil {
                                ms.Callback_stop(ms)
                        }
-                       mainCancel()
+                       _ = pctx.CallCancel(t.Status)
                        t.msg.ClearAll()
                        return true
                })
@@ -1351,7 +1341,7 @@ func (t *M4SStream) Start() bool {
                }
 
                // 主循环
-               for t.Status.Islive() {
+               for !pctx.Done(t.Status) {
                        // 是否在直播
                        F.Get(t.common).Get(`Liveing`)
                        if !t.common.Liveing {
@@ -1379,13 +1369,13 @@ func (t *M4SStream) Start() bool {
 }
 
 func (t *M4SStream) Stop() {
-       if !t.Status.Islive() {
+       if pctx.Done(t.Status) {
                t.log.L(`I: `, `正在等待下载完成...`)
                t.exitSign.Wait()
                return
        }
        t.exitSign = signal.Init()
-       t.Status.Done()
+       _ = pctx.CallCancel(t.Status)
        t.log.L(`I: `, `正在等待下载完成...`)
        t.exitSign.Wait()
        t.log.L(`I: `, `结束`)
diff --git a/go.mod b/go.mod
index 7070e1a8d7997371b440b8a602713b0066e6558e..2194820b79f4fd61d72a42350c2c69f79e8885e0 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@ go 1.21
 require (
        github.com/gotk3/gotk3 v0.6.2
        github.com/mdp/qrterminal/v3 v3.1.1
-       github.com/qydysky/part v0.28.1-0.20231101183328-e06a1ded4858
+       github.com/qydysky/part v0.28.1-0.20231109160627-cb7ab257995b
        github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
        github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
        golang.org/x/text v0.13.0
diff --git a/go.sum b/go.sum
index f3f22711f5b290979c6ea1aaf0cdd636f7d2fee4..c95cf71776df8c963a7a20b040a500dc995c3e2a 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -35,8 +35,8 @@ github.com/miekg/dns v1.1.56 h1:5imZaSeoRNvpM9SzWNhEcP9QliKiz20/dA2QabIGVnE=
 github.com/miekg/dns v1.1.56/go.mod h1:cRm6Oo2C8TY9ZS/TqsSrseAcncm74lfK5G+ikN2SWWY=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/qydysky/part v0.28.1-0.20231101183328-e06a1ded4858 h1:QgidAvZ6HBuwGO4QrP6kCHe+Ymd4vScOgwtMU9PUqVU=
-github.com/qydysky/part v0.28.1-0.20231101183328-e06a1ded4858/go.mod h1:twb1IuSmUJ3hllGLwWTBjXRkHjsgmiYi3B9H2ENgIf0=
+github.com/qydysky/part v0.28.1-0.20231109160627-cb7ab257995b h1:nKmP2PJdgpFBVWl9O92zKNS9k7MgBGXUGH0jdXpd7Xo=
+github.com/qydysky/part v0.28.1-0.20231109160627-cb7ab257995b/go.mod h1:twb1IuSmUJ3hllGLwWTBjXRkHjsgmiYi3B9H2ENgIf0=
 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
 github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=