From b1f4c583458724201e9bb354a5d6d19e51e5652c Mon Sep 17 00:00:00 2001 From: qydysky Date: Fri, 10 Nov 2023 02:30:09 +0800 Subject: [PATCH] =?utf8?q?Improve=20flv=E8=A7=A3=E7=A0=81=E9=94=99?= =?utf8?q?=E8=AF=AF=E9=80=80=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- Reply/F.go | 12 ++++----- Reply/flvDecode.go | 8 ++++-- Reply/stream.go | 64 +++++++++++++++++++--------------------------- go.mod | 2 +- go.sum | 4 +-- 5 files changed, 42 insertions(+), 48 deletions(-) diff --git a/Reply/F.go b/Reply/F.go index bfe072c..4bedb3a 100644 --- a/Reply/F.go +++ b/Reply/F.go @@ -294,7 +294,7 @@ func StreamOCommon(roomid int) (array []*c.Common) { // 获取实例的录制状态 func StreamOStatus(roomid int) (Islive bool) { v, ok := streamO.Load(roomid) - return ok && (v.(*M4SStream).Status.Islive() || v.(*M4SStream).exitSign.Islive()) + return ok && (!pctx.Done(v.(*M4SStream).Status) || v.(*M4SStream).exitSign.Islive()) } // 开始实例 @@ -337,7 +337,7 @@ func StreamOStop(roomid int) { if c.C.Roomid == _roomid { return true } - if v.(*M4SStream).Status.Islive() { + if !pctx.Done(v.(*M4SStream).Status) { v.(*M4SStream).Stop() } streamO.Delete(_roomid) @@ -345,7 +345,7 @@ func StreamOStop(roomid int) { }) case -1: // 所有房间 streamO.Range(func(k, v interface{}) bool { - if v.(*M4SStream).Status.Islive() { + if !pctx.Done(v.(*M4SStream).Status) { v.(*M4SStream).Stop() } streamO.Delete(k) @@ -353,7 +353,7 @@ func StreamOStop(roomid int) { }) default: // 针对某房间 if v, ok := streamO.Load(roomid); ok { - if v.(*M4SStream).Status.Islive() { + if !pctx.Done(v.(*M4SStream).Status) { v.(*M4SStream).Stop() } streamO.Delete(roomid) @@ -364,7 +364,7 @@ func StreamOStop(roomid int) { // 实例切断 func StreamOCut(roomid int, title ...string) { if v, ok := streamO.Load(roomid); ok { - if v.(*M4SStream).Status.Islive() { + if !pctx.Done(v.(*M4SStream).Status) { if len(title) != 0 { v.(*M4SStream).common.Title = title[0] } @@ -1505,7 +1505,7 @@ func init() { }) // 未准备好 - if currentStreamO == nil || !currentStreamO.Status.Islive() { + if currentStreamO == nil || pctx.Done(currentStreamO.Status) { w.Header().Set("Retry-After", "1") w.WriteHeader(http.StatusNotFound) return diff --git a/Reply/flvDecode.go b/Reply/flvDecode.go index 9912c2d..68757c9 100644 --- a/Reply/flvDecode.go +++ b/Reply/flvDecode.go @@ -93,7 +93,9 @@ func Search_stream_tag(buf []byte, keyframe *slice.Buf[byte]) (front_buf []byte, buf_offset = tag_offset + 1 last_available_offset = buf_offset // fmt.Printf("tag_size error %x\n",buf[tag_offset:tag_offset+tag_header_size]) - continue //tag_size error + // continue //tag_size error + err = errors.New(`[decoder]tag_size error`) + return } tag_size_check := int(F.Btoi32(buf[tag_offset+tag_header_size+tag_size:tag_offset+tag_header_size+tag_size+previou_tag_size], 0)) @@ -104,7 +106,9 @@ func Search_stream_tag(buf []byte, keyframe *slice.Buf[byte]) (front_buf []byte, buf_offset = tag_offset + 1 last_available_offset = buf_offset // fmt.Printf("tag_size_check error %x\n",buf[tag_offset:tag_offset+tag_header_size]) - continue //tag_size_check error + // continue //tag_size_check error + err = errors.New(`[decoder]tag_size_check error`) + return } time_stamp := int(F.Btoi32([]byte{buf[tag_offset+7], buf[tag_offset+4], buf[tag_offset+5], buf[tag_offset+6]}, 0)) diff --git a/Reply/stream.go b/Reply/stream.go index 9166d99..06483ca 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -40,7 +40,7 @@ import ( ) type M4SStream struct { - Status *signal.Signal //IsLive()是否运行中 + Status context.Context //IsLive()是否运行中 exitSign *signal.Signal //IsLive()是否等待退出中 log *log.Log_interface //日志 config M4SStream_Config //配置 @@ -665,7 +665,7 @@ func (t *M4SStream) saveStreamFlv() (e error) { } //结束退出 - if !t.Status.Islive() { + if pctx.Done(t.Status) { return } @@ -694,21 +694,10 @@ func (t *M4SStream) saveStreamFlv() (e error) { } // flv获取 - cancelC, cancel := context.WithCancel(context.Background()) + cancelC, cancel := context.WithCancel(t.Status) + errCtx := pctx.Value[error]{} + cancelC = errCtx.LinkCtx(cancelC) { - go func() { - tsc, tscf := t.Status.WaitC() - defer tscf() - - select { - //停止录制 - case <-tsc: - cancel() - //当前连接终止 - case <-cancelC.Done(): - } - }() - pipe := pio.NewPipe() var ( leastReadUnix atomic.Int64 @@ -721,6 +710,8 @@ func (t *M4SStream) saveStreamFlv() (e error) { // read timeout go func() { + defer cancel() + timer := time.NewTicker(time.Duration(readTO * int64(time.Second))) defer timer.Stop() @@ -731,15 +722,14 @@ func (t *M4SStream) saveStreamFlv() (e error) { case curT := <-timer.C: if curT.Unix()-leastReadUnix.Load() > readTO { t.log.L(`W: `, fmt.Sprintf("%vs未接收到有效数据", readTO)) + pctx.PutVal(cancelC, &errCtx, fmt.Errorf("%vs未接收到有效数据", readTO)) // 时间段内未接收到任何数据 - cancel() return } if v, ok := c.C.K_v.LoadV(`直播流清晰度`).(float64); ok { if t.config.want_qn != int(v) { t.log.L(`I: `, "直播流清晰度改变:", t.common.Qn[t.config.want_qn], "=>", t.common.Qn[int(v)]) t.config.want_qn = int(v) - cancel() return } } @@ -749,28 +739,27 @@ func (t *M4SStream) saveStreamFlv() (e error) { // read go func() { + defer cancel() + var ( ticker = time.NewTicker(time.Second) buff = slice.New[byte]() keyframe = slice.New[byte]() buf = make([]byte, 1<<16) ) + defer ticker.Stop() for { n, e := pipe.Read(buf) _ = buff.Append(buf[:n]) if e != nil { - cancel() + pctx.PutVal(cancelC, &errCtx, e) break } - skip := true select { case <-ticker.C: - skip = false default: - } - if skip { continue } @@ -781,6 +770,7 @@ func (t *M4SStream) saveStreamFlv() (e error) { if strings.Contains(e.Error(), `no found available tag`) { continue } + pctx.PutVal(cancelC, &errCtx, errors.New("[decoder]"+e.Error())) //丢弃所有数据 buff.Reset() } @@ -798,7 +788,7 @@ func (t *M4SStream) saveStreamFlv() (e error) { if keyframe.Size() != 0 { if len(t.first_buf) == 0 { t.log.L(`W: `, `flv未接收到起始段`) - cancel() + pctx.PutVal(cancelC, &errCtx, errors.New(`flv未接收到起始段`)) break } t.bootBufPush(keyframe.GetPureBuf()) @@ -813,11 +803,8 @@ func (t *M4SStream) saveStreamFlv() (e error) { } } } - buf = nil buff.Reset() - - ticker.Stop() }() t.log.L(`I: `, `flv下载开始`) @@ -846,12 +833,16 @@ func (t *M4SStream) saveStreamFlv() (e error) { if err := r.Wait(); err != nil && !errors.Is(err, io.EOF) { if reqf.IsCancel(err) { t.log.L(`I: `, `flv下载停止`) + return } else if err != nil && !reqf.IsTimeout(err) { e = err t.log.L(`E: `, `flv下载失败:`, err) } + } else if err := errCtx.Get(); err != nil && strings.HasPrefix(err.Error(), "[decoder]") { + e = err } } + cancel() if v1, ok := t.common.K_v.LoadV(`flv断流续接`).(bool); ok && !v1 { @@ -1087,7 +1078,7 @@ func (t *M4SStream) saveStreamM4s() (e error) { } // 停止录制 - if !t.Status.Islive() { + if pctx.Done(t.Status) { if len(download_seq) != 0 { if time.Now().Unix() > t.stream_last_modified.Unix()+300 { e = errors.New("切片下载超时") @@ -1168,7 +1159,7 @@ func (t *M4SStream) Start() bool { } // 状态检测与设置 - if t.Status.Islive() { + if !pctx.Done(t.Status) { t.log.L(`T: `, `已存在实例`) return false } @@ -1190,14 +1181,12 @@ func (t *M4SStream) Start() bool { } } - t.Status = signal.Init() go func() { t.log.L(`I: `, `初始化录制(`+strconv.Itoa(t.common.Roomid)+`)`) defer t.log.L(`I: `, `结束录制(`+strconv.Itoa(t.common.Roomid)+`)`) defer func() { // use anonymous func avoid data race and unexpect sign wait - t.Status.Done() t.exitSign.Done() }() @@ -1209,8 +1198,8 @@ func (t *M4SStream) Start() bool { // 设置事件 // 当录制停止时,取消全部录制 - mainCtx, mainCancel := context.WithCancel(context.Background()) - mainCtx, done := pctx.WithWait(mainCtx, 0, time.Minute) + t.Status = pctx.CarryCancel(context.WithCancel(context.Background())) + mainCtx, done := pctx.WithWait(t.Status, 0, time.Minute) defer func() { switch done() { case pctx.ErrWaitTo: @@ -1219,6 +1208,7 @@ func (t *M4SStream) Start() bool { fallthrough default: } + _ = pctx.CallCancel(t.Status) }() if t.Callback_stopRec != nil { @@ -1232,7 +1222,7 @@ func (t *M4SStream) Start() bool { if ms.Callback_stop != nil { ms.Callback_stop(ms) } - mainCancel() + _ = pctx.CallCancel(t.Status) t.msg.ClearAll() return true }) @@ -1351,7 +1341,7 @@ func (t *M4SStream) Start() bool { } // 主循环 - for t.Status.Islive() { + for !pctx.Done(t.Status) { // 是否在直播 F.Get(t.common).Get(`Liveing`) if !t.common.Liveing { @@ -1379,13 +1369,13 @@ func (t *M4SStream) Start() bool { } func (t *M4SStream) Stop() { - if !t.Status.Islive() { + if pctx.Done(t.Status) { t.log.L(`I: `, `正在等待下载完成...`) t.exitSign.Wait() return } t.exitSign = signal.Init() - t.Status.Done() + _ = pctx.CallCancel(t.Status) t.log.L(`I: `, `正在等待下载完成...`) t.exitSign.Wait() t.log.L(`I: `, `结束`) diff --git a/go.mod b/go.mod index 7070e1a..2194820 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.21 require ( github.com/gotk3/gotk3 v0.6.2 github.com/mdp/qrterminal/v3 v3.1.1 - github.com/qydysky/part v0.28.1-0.20231101183328-e06a1ded4858 + github.com/qydysky/part v0.28.1-0.20231109160627-cb7ab257995b github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966 golang.org/x/text v0.13.0 diff --git a/go.sum b/go.sum index f3f2271..c95cf71 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,8 @@ github.com/miekg/dns v1.1.56 h1:5imZaSeoRNvpM9SzWNhEcP9QliKiz20/dA2QabIGVnE= github.com/miekg/dns v1.1.56/go.mod h1:cRm6Oo2C8TY9ZS/TqsSrseAcncm74lfK5G+ikN2SWWY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/qydysky/part v0.28.1-0.20231101183328-e06a1ded4858 h1:QgidAvZ6HBuwGO4QrP6kCHe+Ymd4vScOgwtMU9PUqVU= -github.com/qydysky/part v0.28.1-0.20231101183328-e06a1ded4858/go.mod h1:twb1IuSmUJ3hllGLwWTBjXRkHjsgmiYi3B9H2ENgIf0= +github.com/qydysky/part v0.28.1-0.20231109160627-cb7ab257995b h1:nKmP2PJdgpFBVWl9O92zKNS9k7MgBGXUGH0jdXpd7Xo= +github.com/qydysky/part v0.28.1-0.20231109160627-cb7ab257995b/go.mod h1:twb1IuSmUJ3hllGLwWTBjXRkHjsgmiYi3B9H2ENgIf0= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= -- 2.39.2