From: qydysky Date: Fri, 18 Oct 2024 15:21:53 +0000 (+0800) Subject: Fix flv录制断流 X-Git-Tag: v0.14.21~4 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=af89df0d45a78b5a9798a3209c9c9fafdb094f21;p=bili_danmu%2F.git Fix flv录制断流 --- diff --git a/Reply/Msg.go b/Reply/Msg.go index 674353d..9885468 100644 --- a/Reply/Msg.go +++ b/Reply/Msg.go @@ -84,6 +84,7 @@ var Msg_map = map[string]func(replyF, string){ "HOUR_RANK_AWARDS": nil, "ROOM_RANK": nil, "ROOM_SHIELD": nil, + "USER_TOAST_MSG_V2": nil, //大航海购买信息 "USER_TOAST_MSG": replyF.user_toast_msg, //大航海购买信息 "WIN_ACTIVITY": replyF.win_activity, //活动 "SPECIAL_GIFT": nil, //replyF.special_gift, //节奏风暴 diff --git a/Reply/flvDecode.go b/Reply/flvDecode.go index 4d6ae80..8b5f782 100644 --- a/Reply/flvDecode.go +++ b/Reply/flvDecode.go @@ -1,160 +1,207 @@ package reply import ( - "bytes" "errors" + "fmt" + "math" - // "math" - - "github.com/dustin/go-humanize" F "github.com/qydysky/bili_danmu/F" slice "github.com/qydysky/part/slice" ) const ( - flv_header_size = 9 - tag_header_size = 11 - previou_tag_size = 4 - - video_tag = byte(0x09) - audio_tag = byte(0x08) - script_tag = byte(0x12) + flvHeaderSize = 9 + tagHeaderSize = 11 + previouTagSize = 4 + + streamId = byte(0x00) + videoTag = byte(0x09) + audioTag = byte(0x08) + scriptTag = byte(0x12) ) var ( - flv_header_sign = []byte{0x46, 0x4c, 0x56} + flvHeaderSign = []byte{0x46, 0x4c, 0x56} + + ErrInit = errors.New("ErrInit") + ErrNoInit = errors.New("ErrNoInit") + ErrNoFoundFlvHeader = errors.New("ErrNoFoundFlvHeader") + ErrNoFoundTagHeader = errors.New("ErrNoFoundTagHeader") + ErrTagSizeZero = errors.New("ErrTagSizeZero") + ErrStreamId = errors.New("ErrStreamId") + ErrTagSize = errors.New("ErrTagSize") + ErrSignLost = errors.New("ErrSignLost") ) -// this fuction read []byte and return flv header and all complete keyframe if possible. -// complete keyframe means the video and audio tags between two video key frames tag -func Search_stream_tag(buf []byte, keyframe *slice.Buf[byte]) (front_buf []byte, last_available_offset int, err error) { - if len(buf) > humanize.MByte*100 { - err = errors.New("buf too large") +type FlvDecoder struct { + Diff float64 + init bool +} + +func NewFlvDecoder() *FlvDecoder { + return &FlvDecoder{Diff: 100} +} + +func (t *FlvDecoder) InitFlv(buf []byte) (frontBuf []byte, dropOffset int, err error) { + if t.init { + err = ErrInit return } - //get flv header(9byte) + FirstTagSize(4byte) - if header_offset := bytes.Index(buf, flv_header_sign); header_offset != -1 { - if header_offset+flv_header_size+previou_tag_size > len(buf) { - err = errors.New(`no found available tag`) - return - } - front_buf = buf[header_offset : header_offset+flv_header_size+previou_tag_size] - last_available_offset = header_offset + flv_header_size + previou_tag_size + + var sign = 0x00 + var tagNum = 0 + + if len(buf) < flvHeaderSize+previouTagSize { + return } - var ( - sign = 0x00 - keyframe_num = -1 - confirm_num = -1 - tag_num = 0 - ) + if buf[0] != flvHeaderSign[0] || buf[1] != flvHeaderSign[1] || buf[2] != flvHeaderSign[2] { + err = ErrNoFoundFlvHeader + return + } - defer func() { - if sign != 0x07 { - // if sign != 0x00 { - // fmt.Printf("front_buf error:%x\n", sign) - // } - front_buf = front_buf[:0] + if buf[flvHeaderSize]|buf[flvHeaderSize+1]|buf[flvHeaderSize+2]|buf[flvHeaderSize+3] != 0 { + err = ErrTagSize + return + } + + for bufOffset := flvHeaderSize + previouTagSize; bufOffset >= flvHeaderSize && bufOffset+tagHeaderSize < len(buf); { + + if buf[bufOffset]&0b11000000 != 0 && + buf[bufOffset]&0b00011111 != videoTag && + buf[bufOffset]&0b00011111 != audioTag && + buf[bufOffset]&0b00011111 != scriptTag { + err = ErrNoFoundTagHeader + return } - if bufl := keyframe.Size(); confirm_num != bufl { - _ = keyframe.RemoveBack(bufl - confirm_num) + + if buf[bufOffset+8]|buf[bufOffset+9]|buf[bufOffset+10] != streamId { + err = ErrStreamId + return } - }() - for buf_offset := 0; buf_offset+tag_header_size < len(buf); { + tagSize := int(F.Btoi32([]byte{0x00, buf[bufOffset+1], buf[bufOffset+2], buf[bufOffset+3]}, 0)) + if tagSize == 0 { + err = ErrTagSizeZero + return + } - tag_offset := buf_offset + bytes.IndexAny(buf[buf_offset:], string([]byte{video_tag, audio_tag, script_tag})) - if tag_offset == buf_offset-1 { - err = errors.New(`no found available tag`) - // fmt.Printf("last %x\n",buf[tag_offset:tag_offset+tag_header_size]) - return //no found available video,audio,script tag + if bufOffset+tagHeaderSize+tagSize+previouTagSize > len(buf) { + return } - if tag_offset+tag_header_size > len(buf) { - // err = errors.New(`reach end when get tag header`) - // fmt.Printf("last %x\n",buf[tag_offset:tag_offset+tag_header_size]) - return //buf end + + tagSizeCheck := int(F.Btoi32(buf[bufOffset+tagHeaderSize+tagSize:bufOffset+tagHeaderSize+tagSize+previouTagSize], 0)) + if tagNum != 0 && tagSizeCheck != tagSize+tagHeaderSize { + err = ErrTagSize + return } - streamid := int(F.Btoi32([]byte{0x00, buf[tag_offset+8], buf[tag_offset+9], buf[tag_offset+10]}, 0)) - if streamid != 0 { - buf_offset = tag_offset + 1 - last_available_offset = buf_offset - // fmt.Printf("streamid error %x\n",buf[tag_offset:tag_offset+tag_header_size]) - continue //streamid error + tagNum += 1 + + if sign != 0x07 { // ignore first video audio time_stamp + if (buf[bufOffset] == videoTag) && (sign&0x04 == 0x00) { + sign |= 0x04 + } else if (buf[bufOffset] == audioTag) && (sign&0x02 == 0x00) { + sign |= 0x02 + } else if (buf[bufOffset] == scriptTag) && (sign&0x01 == 0x00) { + sign |= 0x01 + } else { + err = ErrSignLost + return + } + bufOffset += tagSizeCheck + previouTagSize + + if sign == 0x07 { + frontBuf = append(frontBuf, buf[0:bufOffset]...) // copy + dropOffset = bufOffset + t.init = true + return + } } + } + return +} + +// this fuction read []byte and return flv header and all complete keyframe if possible. +// complete keyframe means the video and audio tags between two video key frames tag +func (t *FlvDecoder) SearchStreamTag(buf []byte, keyframe *slice.Buf[byte]) (dropOffset int, err error) { + if !t.init { + err = ErrNoInit + return + } + + keyframe.Reset() + + var ( + keyframeOp = -1 + lastAT int + lastVT int + ) - tag_size := int(F.Btoi32([]byte{0x00, buf[tag_offset+1], buf[tag_offset+2], buf[tag_offset+3]}, 0)) - if tag_offset+tag_header_size+tag_size+previou_tag_size > len(buf) { - // err = errors.New(`reach end when get tag body`) - // fmt.Printf("last %x\n",buf[tag_offset:tag_offset+tag_header_size]) - return //buf end + for bufOffset := 0; bufOffset >= 0 && bufOffset+tagHeaderSize < len(buf); { + + if buf[bufOffset]&0b11000000 != 0 && + buf[bufOffset]&0b00011111 != videoTag && + buf[bufOffset]&0b00011111 != audioTag && + buf[bufOffset]&0b00011111 != scriptTag { + err = ErrNoFoundTagHeader + return } - if tag_size == 0 { - buf_offset = tag_offset + 1 - last_available_offset = buf_offset - // fmt.Printf("tag_size error %x\n",buf[tag_offset:tag_offset+tag_header_size]) - // continue //tag_size error - err = errors.New(`[decoder]tag_size error`) + + if buf[bufOffset+8]|buf[bufOffset+9]|buf[bufOffset+10] != streamId { + err = ErrStreamId return } - tag_size_check := int(F.Btoi32(buf[tag_offset+tag_header_size+tag_size:tag_offset+tag_header_size+tag_size+previou_tag_size], 0)) - if tag_num+tag_size_check == 0 { - tag_size_check = tag_size + tag_header_size + tagSize := int(F.Btoi32([]byte{0x00, buf[bufOffset+1], buf[bufOffset+2], buf[bufOffset+3]}, 0)) + if tagSize == 0 { + err = ErrTagSizeZero + return } - if tag_size_check != tag_size+tag_header_size { - buf_offset = tag_offset + 1 - last_available_offset = buf_offset - // fmt.Printf("tag_size_check error %x\n",buf[tag_offset:tag_offset+tag_header_size]) - // continue //tag_size_check error - err = errors.New(`[decoder]tag_size_check error`) + if bufOffset+tagHeaderSize+tagSize+previouTagSize > len(buf) { return } - time_stamp := int(F.Btoi32([]byte{buf[tag_offset+7], buf[tag_offset+4], buf[tag_offset+5], buf[tag_offset+6]}, 0)) - - // show tag header - // fmt.Printf("%x\n", buf[tag_offset:tag_offset+tag_header_size]) - - tag_num += 1 - - if time_stamp == 0 || sign != 0x00 { // ignore first video audio time_stamp - if len(front_buf) != 0 { - //first video audio script tag - if (buf[tag_offset] == video_tag) && (sign&0x04 == 0x00) { - sign |= 0x04 - front_buf = append(front_buf, buf[tag_offset:tag_offset+tag_size_check+previou_tag_size]...) - } else if (buf[tag_offset] == audio_tag) && (sign&0x02 == 0x00) { - sign |= 0x02 - front_buf = append(front_buf, buf[tag_offset:tag_offset+tag_size_check+previou_tag_size]...) - } else if (buf[tag_offset] == script_tag) && (sign&0x01 == 0x00) { - sign |= 0x01 - front_buf = append(front_buf, buf[tag_offset:tag_offset+tag_size_check+previou_tag_size]...) - } - } - buf_offset = tag_offset + tag_size_check + previou_tag_size - last_available_offset = buf_offset - continue + tagSizeCheck := int(F.Btoi32(buf[bufOffset+tagHeaderSize+tagSize:bufOffset+tagHeaderSize+tagSize+previouTagSize], 0)) + if tagSizeCheck != tagSize+tagHeaderSize { + err = ErrTagSize + return } - if buf[tag_offset] == video_tag { - if buf[tag_offset+11]&0xf0 == 0x10 { //key frame - keyframe_num += 1 - confirm_num = keyframe.Size() - last_available_offset = tag_offset + timeStamp := int(F.Btoi32([]byte{buf[bufOffset+7], buf[bufOffset+4], buf[bufOffset+5], buf[bufOffset+6]}, 0)) + switch { + case buf[bufOffset] == videoTag: + lastVT = timeStamp + case buf[bufOffset] == audioTag: + lastAT = timeStamp + default: + } + if lastAT != 0 && lastVT != 0 { + diff := math.Abs(float64(lastVT - lastAT)) + if diff > t.Diff { + err = fmt.Errorf("时间戳不匹配 %v %v (或许应调整flv音视频时间戳容差ms>%f)", lastVT, lastAT, diff) + return } + } - if keyframe_num >= 0 { - _ = keyframe.Append(buf[tag_offset : tag_offset+tag_size_check+previou_tag_size]) - } - } else if buf[tag_offset] == audio_tag { - if keyframe_num >= 0 { - _ = keyframe.Append(buf[tag_offset : tag_offset+tag_size_check+previou_tag_size]) + if buf[bufOffset] == videoTag && buf[bufOffset+11]&0xf0 == 0x10 { //key frame + if keyframeOp >= 0 { + err = keyframe.Append(buf[keyframeOp:bufOffset]) + dropOffset = bufOffset } + keyframeOp = bufOffset } - - buf_offset = tag_offset + tag_size_check + previou_tag_size + bufOffset += tagSizeCheck + previouTagSize } return } + +func (t *FlvDecoder) Parse(buf []byte, keyframe *slice.Buf[byte]) (frontBuf []byte, dropOffset int, err error) { + if !t.init { + frontBuf, dropOffset, err = t.InitFlv(buf) + } else { + dropOffset, err = t.SearchStreamTag(buf, keyframe) + } + return +} diff --git a/Reply/flvDecode_test.go b/Reply/flvDecode_test.go index 3535536..4af7cb4 100644 --- a/Reply/flvDecode_test.go +++ b/Reply/flvDecode_test.go @@ -25,6 +25,7 @@ func Test_FLVdeal(t *testing.T) { buf := make([]byte, humanize.MByte) buff := slice.New[byte](10 * humanize.MByte) max := 0 + flvDecoder := NewFlvDecoder() for c := 0; true; c++ { n, e := f.Read(buf) @@ -37,7 +38,11 @@ func Test_FLVdeal(t *testing.T) { max = s } keyframe := slice.New[byte]() - front_buf, last_available_offset, e := Search_stream_tag(buff.GetPureBuf(), keyframe) + front_buf, last_available_offset, e := flvDecoder.InitFlv(buff.GetPureBuf()) + if e != nil { + t.Fatal(e) + } + last_available_offset, e = flvDecoder.SearchStreamTag(buff.GetPureBuf()[last_available_offset:], keyframe) if e != nil { t.Fatal(e) } diff --git a/Reply/stream.go b/Reply/stream.go index 770a4fb..0dd982d 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -24,6 +24,7 @@ import ( "sync/atomic" "time" + "github.com/dustin/go-humanize" c "github.com/qydysky/bili_danmu/CV" F "github.com/qydysky/bili_danmu/F" @@ -797,17 +798,13 @@ func (t *M4SStream) saveStreamFlv() (e error) { { pipe := pio.NewPipe() var ( - leastReadUnix atomic.Int64 - readTO int64 = 3 - useInterFlvHeader bool = false + leastReadUnix atomic.Int64 + readTO int64 = 10 ) leastReadUnix.Store(time.Now().Unix()) if v, ok := t.common.K_v.LoadV(`flv断流超时s`).(float64); ok && int64(v) > readTO { readTO = int64(v) } - if v, ok := t.common.K_v.LoadV(`flv使用内置头`).(bool); ok && v { - useInterFlvHeader = v - } // read timeout go func() { @@ -843,85 +840,67 @@ func (t *M4SStream) saveStreamFlv() (e error) { defer cancel() var ( - ticker = time.NewTicker(time.Second) - buff = slice.New[byte]() - keyframe = slice.New[byte]() - buf = make([]byte, 1<<16) + buff = slice.New[byte]() + keyframe = slice.New[byte]() + buf = make([]byte, humanize.KByte) + flvDecoder = NewFlvDecoder() + bufSize = humanize.KByte * 1100 ) - defer ticker.Stop() + + if v, ok := c.C.K_v.LoadV(`flv音视频时间戳容差ms`).(float64); ok && v > 100 { + flvDecoder.Diff = v + } for { - n, e := pipe.Read(buf) - _ = buff.Append(buf[:n]) - if e != nil { + if n, e := pipe.Read(buf); e != nil { pctx.PutVal(cancelC, &errCtx, e) break - } - - select { - case <-ticker.C: - default: + } else if e = buff.Append(buf[:n]); e != nil { + pctx.PutVal(cancelC, &errCtx, e) + break + } else if buff.Size() < bufSize { continue } if !buff.IsEmpty() { - keyframe.Reset() + // front_buf buf, unlock := buff.GetPureBufRLock() - front_buf, last_available_offset, e := Search_stream_tag(buf, keyframe) + frontBuf, dropOffset, e := flvDecoder.Parse(buf, keyframe) unlock() if e != nil { - if strings.Contains(e.Error(), `no found available tag`) { - continue - } + t.log.L(`E: `, e) pctx.PutVal(cancelC, &errCtx, errors.New("[decoder]"+e.Error())) - //丢弃所有数据 - buff.Reset() - } - // 存在有效数据 - if len(front_buf) != 0 || keyframe.Size() != 0 { - leastReadUnix.Store(time.Now().Unix()) + break } - if len(front_buf) != 0 && len(t.first_buf) == 0 { - t.first_buf = t.first_buf[:0] - t.first_buf = append(t.first_buf, front_buf...) - // fmt.Println("write front_buf") - // t.Stream_msg.PushLock_tag(`data`, t.first_buf) + + if len(frontBuf) != 0 { + t.first_buf = frontBuf t.msg.Push_tag(`load`, t) } + if keyframe.Size() != 0 { - if len(t.first_buf) == 0 { - if useInterFlvHeader { - switch v.Codec { - case "hevc": - t.log.L(`W: `, `flv未接收到起始段,使用内置头`) - t.first_buf = t.first_buf[:0] - t.first_buf = append(t.first_buf, flvHeaderHevc...) - t.msg.Push_tag(`load`, t) - case "avc": - t.log.L(`W: `, `flv未接收到起始段,使用内置头`) - t.first_buf = t.first_buf[:0] - t.first_buf = append(t.first_buf, flvHeader...) - t.msg.Push_tag(`load`, t) - default: - } - } - if len(t.first_buf) == 0 { - t.log.L(`W: `, `flv未接收到起始段`) - pctx.PutVal(cancelC, &errCtx, errors.New(`flv未接收到起始段`)) - break - } - } + // 存在有效数据 + leastReadUnix.Store(time.Now().Unix()) + buf, unlock := keyframe.GetPureBufRLock() t.bootBufPush(buf) t.Stream_msg.PushLock_tag(`data`, buf) unlock() + keyframe.Reset() t.frameCount += 1 t.msg.Push_tag(`keyFrame`, t) + } else { + bufSize += humanize.KByte * 50 + if bufSize > humanize.MByte*10 { + t.log.L(`E: `, `缓冲池过大`) + pctx.PutVal(cancelC, &errCtx, errors.New("缓冲池过大")) + break + } } - if last_available_offset > 1 { - // fmt.Println("write Sync") - _ = buff.RemoveFront(last_available_offset - 1) + + if dropOffset > 0 { + _ = buff.RemoveFront(dropOffset) } } } diff --git a/demo/config/config_K_v.json b/demo/config/config_K_v.json index 68f7aac..bcf35e8 100644 --- a/demo/config/config_K_v.json +++ b/demo/config/config_K_v.json @@ -76,9 +76,10 @@ "直播流停用服务器-help": "正则字符串数组", "直播流停用服务器": [], "直播流接收n帧才保存": 3, - "flv断流超时s": 5, + "flv断流超时s": 10, "flv断流续接": true, - "flv使用内置头": false, + "flv音视频时间戳容差ms-help": "默认100,小于默认无效,调大可以允许较差的流,但可能会音画不同步", + "flv音视频时间戳容差ms": 100, "fmp4切片下载超时s": 3, "fmp4获取更多服务器": true, "fmp4跳过解码出错的帧-help": "fmp4跳过解码出错的帧、但可能导致关键帧时间上的跳越",