]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
Fix flv录制断流
authorqydysky <qydysky@foxmail.com>
Fri, 18 Oct 2024 15:21:53 +0000 (23:21 +0800)
committerqydysky <qydysky@foxmail.com>
Fri, 18 Oct 2024 15:21:53 +0000 (23:21 +0800)
Reply/Msg.go
Reply/flvDecode.go
Reply/flvDecode_test.go
Reply/stream.go
demo/config/config_K_v.json

index 674353d551576525962f8b17c344ceb744790fcf..98854684dc8148c2596371db730f395dc5cdfdf1 100644 (file)
@@ -84,6 +84,7 @@ var Msg_map = map[string]func(replyF, string){
        "HOUR_RANK_AWARDS":                  nil,
        "ROOM_RANK":                         nil,
        "ROOM_SHIELD":                       nil,
+       "USER_TOAST_MSG_V2":                 nil,                       //大航海购买信息
        "USER_TOAST_MSG":                    replyF.user_toast_msg,     //大航海购买信息
        "WIN_ACTIVITY":                      replyF.win_activity,       //活动
        "SPECIAL_GIFT":                      nil,                       //replyF.special_gift,       //节奏风暴
index 4d6ae80c865d3efadd955c16c3f368dc1f2ad87f..8b5f7829baa71ddf2b08243b3060f259a353cba0 100644 (file)
 package reply
 
 import (
-       "bytes"
        "errors"
+       "fmt"
+       "math"
 
-       // "math"
-
-       "github.com/dustin/go-humanize"
        F "github.com/qydysky/bili_danmu/F"
        slice "github.com/qydysky/part/slice"
 )
 
 const (
-       flv_header_size  = 9
-       tag_header_size  = 11
-       previou_tag_size = 4
-
-       video_tag  = byte(0x09)
-       audio_tag  = byte(0x08)
-       script_tag = byte(0x12)
+       flvHeaderSize  = 9
+       tagHeaderSize  = 11
+       previouTagSize = 4
+
+       streamId  = byte(0x00)
+       videoTag  = byte(0x09)
+       audioTag  = byte(0x08)
+       scriptTag = byte(0x12)
 )
 
 var (
-       flv_header_sign = []byte{0x46, 0x4c, 0x56}
+       flvHeaderSign = []byte{0x46, 0x4c, 0x56}
+
+       ErrInit             = errors.New("ErrInit")
+       ErrNoInit           = errors.New("ErrNoInit")
+       ErrNoFoundFlvHeader = errors.New("ErrNoFoundFlvHeader")
+       ErrNoFoundTagHeader = errors.New("ErrNoFoundTagHeader")
+       ErrTagSizeZero      = errors.New("ErrTagSizeZero")
+       ErrStreamId         = errors.New("ErrStreamId")
+       ErrTagSize          = errors.New("ErrTagSize")
+       ErrSignLost         = errors.New("ErrSignLost")
 )
 
-// this fuction read []byte and return flv header and all complete keyframe if possible.
-// complete keyframe means the video and audio tags between two video key frames tag
-func Search_stream_tag(buf []byte, keyframe *slice.Buf[byte]) (front_buf []byte, last_available_offset int, err error) {
-       if len(buf) > humanize.MByte*100 {
-               err = errors.New("buf too large")
+type FlvDecoder struct {
+       Diff float64
+       init bool
+}
+
+func NewFlvDecoder() *FlvDecoder {
+       return &FlvDecoder{Diff: 100}
+}
+
+func (t *FlvDecoder) InitFlv(buf []byte) (frontBuf []byte, dropOffset int, err error) {
+       if t.init {
+               err = ErrInit
                return
        }
-       //get flv header(9byte) + FirstTagSize(4byte)
-       if header_offset := bytes.Index(buf, flv_header_sign); header_offset != -1 {
-               if header_offset+flv_header_size+previou_tag_size > len(buf) {
-                       err = errors.New(`no found available tag`)
-                       return
-               }
-               front_buf = buf[header_offset : header_offset+flv_header_size+previou_tag_size]
-               last_available_offset = header_offset + flv_header_size + previou_tag_size
+
+       var sign = 0x00
+       var tagNum = 0
+
+       if len(buf) < flvHeaderSize+previouTagSize {
+               return
        }
 
-       var (
-               sign         = 0x00
-               keyframe_num = -1
-               confirm_num  = -1
-               tag_num      = 0
-       )
+       if buf[0] != flvHeaderSign[0] || buf[1] != flvHeaderSign[1] || buf[2] != flvHeaderSign[2] {
+               err = ErrNoFoundFlvHeader
+               return
+       }
 
-       defer func() {
-               if sign != 0x07 {
-                       // if sign != 0x00 {
-                       // fmt.Printf("front_buf error:%x\n", sign)
-                       // }
-                       front_buf = front_buf[:0]
+       if buf[flvHeaderSize]|buf[flvHeaderSize+1]|buf[flvHeaderSize+2]|buf[flvHeaderSize+3] != 0 {
+               err = ErrTagSize
+               return
+       }
+
+       for bufOffset := flvHeaderSize + previouTagSize; bufOffset >= flvHeaderSize && bufOffset+tagHeaderSize < len(buf); {
+
+               if buf[bufOffset]&0b11000000 != 0 &&
+                       buf[bufOffset]&0b00011111 != videoTag &&
+                       buf[bufOffset]&0b00011111 != audioTag &&
+                       buf[bufOffset]&0b00011111 != scriptTag {
+                       err = ErrNoFoundTagHeader
+                       return
                }
-               if bufl := keyframe.Size(); confirm_num != bufl {
-                       _ = keyframe.RemoveBack(bufl - confirm_num)
+
+               if buf[bufOffset+8]|buf[bufOffset+9]|buf[bufOffset+10] != streamId {
+                       err = ErrStreamId
+                       return
                }
-       }()
 
-       for buf_offset := 0; buf_offset+tag_header_size < len(buf); {
+               tagSize := int(F.Btoi32([]byte{0x00, buf[bufOffset+1], buf[bufOffset+2], buf[bufOffset+3]}, 0))
+               if tagSize == 0 {
+                       err = ErrTagSizeZero
+                       return
+               }
 
-               tag_offset := buf_offset + bytes.IndexAny(buf[buf_offset:], string([]byte{video_tag, audio_tag, script_tag}))
-               if tag_offset == buf_offset-1 {
-                       err = errors.New(`no found available tag`)
-                       // fmt.Printf("last %x\n",buf[tag_offset:tag_offset+tag_header_size])
-                       return //no found available video,audio,script tag
+               if bufOffset+tagHeaderSize+tagSize+previouTagSize > len(buf) {
+                       return
                }
-               if tag_offset+tag_header_size > len(buf) {
-                       // err = errors.New(`reach end when get tag header`)
-                       // fmt.Printf("last %x\n",buf[tag_offset:tag_offset+tag_header_size])
-                       return //buf end
+
+               tagSizeCheck := int(F.Btoi32(buf[bufOffset+tagHeaderSize+tagSize:bufOffset+tagHeaderSize+tagSize+previouTagSize], 0))
+               if tagNum != 0 && tagSizeCheck != tagSize+tagHeaderSize {
+                       err = ErrTagSize
+                       return
                }
 
-               streamid := int(F.Btoi32([]byte{0x00, buf[tag_offset+8], buf[tag_offset+9], buf[tag_offset+10]}, 0))
-               if streamid != 0 {
-                       buf_offset = tag_offset + 1
-                       last_available_offset = buf_offset
-                       // fmt.Printf("streamid error %x\n",buf[tag_offset:tag_offset+tag_header_size])
-                       continue //streamid error
+               tagNum += 1
+
+               if sign != 0x07 { // ignore first video audio time_stamp
+                       if (buf[bufOffset] == videoTag) && (sign&0x04 == 0x00) {
+                               sign |= 0x04
+                       } else if (buf[bufOffset] == audioTag) && (sign&0x02 == 0x00) {
+                               sign |= 0x02
+                       } else if (buf[bufOffset] == scriptTag) && (sign&0x01 == 0x00) {
+                               sign |= 0x01
+                       } else {
+                               err = ErrSignLost
+                               return
+                       }
+                       bufOffset += tagSizeCheck + previouTagSize
+
+                       if sign == 0x07 {
+                               frontBuf = append(frontBuf, buf[0:bufOffset]...) // copy
+                               dropOffset = bufOffset
+                               t.init = true
+                               return
+                       }
                }
+       }
+       return
+}
+
+// this fuction read []byte and return flv header and all complete keyframe if possible.
+// complete keyframe means the video and audio tags between two video key frames tag
+func (t *FlvDecoder) SearchStreamTag(buf []byte, keyframe *slice.Buf[byte]) (dropOffset int, err error) {
+       if !t.init {
+               err = ErrNoInit
+               return
+       }
+
+       keyframe.Reset()
+
+       var (
+               keyframeOp = -1
+               lastAT     int
+               lastVT     int
+       )
 
-               tag_size := int(F.Btoi32([]byte{0x00, buf[tag_offset+1], buf[tag_offset+2], buf[tag_offset+3]}, 0))
-               if tag_offset+tag_header_size+tag_size+previou_tag_size > len(buf) {
-                       // err = errors.New(`reach end when get tag body`)
-                       // fmt.Printf("last %x\n",buf[tag_offset:tag_offset+tag_header_size])
-                       return //buf end
+       for bufOffset := 0; bufOffset >= 0 && bufOffset+tagHeaderSize < len(buf); {
+
+               if buf[bufOffset]&0b11000000 != 0 &&
+                       buf[bufOffset]&0b00011111 != videoTag &&
+                       buf[bufOffset]&0b00011111 != audioTag &&
+                       buf[bufOffset]&0b00011111 != scriptTag {
+                       err = ErrNoFoundTagHeader
+                       return
                }
-               if tag_size == 0 {
-                       buf_offset = tag_offset + 1
-                       last_available_offset = buf_offset
-                       // fmt.Printf("tag_size error %x\n",buf[tag_offset:tag_offset+tag_header_size])
-                       // continue //tag_size error
-                       err = errors.New(`[decoder]tag_size error`)
+
+               if buf[bufOffset+8]|buf[bufOffset+9]|buf[bufOffset+10] != streamId {
+                       err = ErrStreamId
                        return
                }
 
-               tag_size_check := int(F.Btoi32(buf[tag_offset+tag_header_size+tag_size:tag_offset+tag_header_size+tag_size+previou_tag_size], 0))
-               if tag_num+tag_size_check == 0 {
-                       tag_size_check = tag_size + tag_header_size
+               tagSize := int(F.Btoi32([]byte{0x00, buf[bufOffset+1], buf[bufOffset+2], buf[bufOffset+3]}, 0))
+               if tagSize == 0 {
+                       err = ErrTagSizeZero
+                       return
                }
-               if tag_size_check != tag_size+tag_header_size {
-                       buf_offset = tag_offset + 1
-                       last_available_offset = buf_offset
-                       // fmt.Printf("tag_size_check error %x\n",buf[tag_offset:tag_offset+tag_header_size])
-                       // continue //tag_size_check error
-                       err = errors.New(`[decoder]tag_size_check error`)
+               if bufOffset+tagHeaderSize+tagSize+previouTagSize > len(buf) {
                        return
                }
 
-               time_stamp := int(F.Btoi32([]byte{buf[tag_offset+7], buf[tag_offset+4], buf[tag_offset+5], buf[tag_offset+6]}, 0))
-
-               // show tag header
-               // fmt.Printf("%x\n", buf[tag_offset:tag_offset+tag_header_size])
-
-               tag_num += 1
-
-               if time_stamp == 0 || sign != 0x00 { // ignore first video audio time_stamp
-                       if len(front_buf) != 0 {
-                               //first video audio script tag
-                               if (buf[tag_offset] == video_tag) && (sign&0x04 == 0x00) {
-                                       sign |= 0x04
-                                       front_buf = append(front_buf, buf[tag_offset:tag_offset+tag_size_check+previou_tag_size]...)
-                               } else if (buf[tag_offset] == audio_tag) && (sign&0x02 == 0x00) {
-                                       sign |= 0x02
-                                       front_buf = append(front_buf, buf[tag_offset:tag_offset+tag_size_check+previou_tag_size]...)
-                               } else if (buf[tag_offset] == script_tag) && (sign&0x01 == 0x00) {
-                                       sign |= 0x01
-                                       front_buf = append(front_buf, buf[tag_offset:tag_offset+tag_size_check+previou_tag_size]...)
-                               }
-                       }
-                       buf_offset = tag_offset + tag_size_check + previou_tag_size
-                       last_available_offset = buf_offset
-                       continue
+               tagSizeCheck := int(F.Btoi32(buf[bufOffset+tagHeaderSize+tagSize:bufOffset+tagHeaderSize+tagSize+previouTagSize], 0))
+               if tagSizeCheck != tagSize+tagHeaderSize {
+                       err = ErrTagSize
+                       return
                }
 
-               if buf[tag_offset] == video_tag {
-                       if buf[tag_offset+11]&0xf0 == 0x10 { //key frame
-                               keyframe_num += 1
-                               confirm_num = keyframe.Size()
-                               last_available_offset = tag_offset
+               timeStamp := int(F.Btoi32([]byte{buf[bufOffset+7], buf[bufOffset+4], buf[bufOffset+5], buf[bufOffset+6]}, 0))
+               switch {
+               case buf[bufOffset] == videoTag:
+                       lastVT = timeStamp
+               case buf[bufOffset] == audioTag:
+                       lastAT = timeStamp
+               default:
+               }
+               if lastAT != 0 && lastVT != 0 {
+                       diff := math.Abs(float64(lastVT - lastAT))
+                       if diff > t.Diff {
+                               err = fmt.Errorf("时间戳不匹配 %v %v (或许应调整flv音视频时间戳容差ms>%f)", lastVT, lastAT, diff)
+                               return
                        }
+               }
 
-                       if keyframe_num >= 0 {
-                               _ = keyframe.Append(buf[tag_offset : tag_offset+tag_size_check+previou_tag_size])
-                       }
-               } else if buf[tag_offset] == audio_tag {
-                       if keyframe_num >= 0 {
-                               _ = keyframe.Append(buf[tag_offset : tag_offset+tag_size_check+previou_tag_size])
+               if buf[bufOffset] == videoTag && buf[bufOffset+11]&0xf0 == 0x10 { //key frame
+                       if keyframeOp >= 0 {
+                               err = keyframe.Append(buf[keyframeOp:bufOffset])
+                               dropOffset = bufOffset
                        }
+                       keyframeOp = bufOffset
                }
-
-               buf_offset = tag_offset + tag_size_check + previou_tag_size
+               bufOffset += tagSizeCheck + previouTagSize
        }
 
        return
 }
+
+func (t *FlvDecoder) Parse(buf []byte, keyframe *slice.Buf[byte]) (frontBuf []byte, dropOffset int, err error) {
+       if !t.init {
+               frontBuf, dropOffset, err = t.InitFlv(buf)
+       } else {
+               dropOffset, err = t.SearchStreamTag(buf, keyframe)
+       }
+       return
+}
index 353553614b9bf8f8aaee2ab54d0ec45b6c9348d2..4af7cb49038eb5b172163e60285a520af0e1b9d9 100644 (file)
@@ -25,6 +25,7 @@ func Test_FLVdeal(t *testing.T) {
        buf := make([]byte, humanize.MByte)
        buff := slice.New[byte](10 * humanize.MByte)
        max := 0
+       flvDecoder := NewFlvDecoder()
 
        for c := 0; true; c++ {
                n, e := f.Read(buf)
@@ -37,7 +38,11 @@ func Test_FLVdeal(t *testing.T) {
                        max = s
                }
                keyframe := slice.New[byte]()
-               front_buf, last_available_offset, e := Search_stream_tag(buff.GetPureBuf(), keyframe)
+               front_buf, last_available_offset, e := flvDecoder.InitFlv(buff.GetPureBuf())
+               if e != nil {
+                       t.Fatal(e)
+               }
+               last_available_offset, e = flvDecoder.SearchStreamTag(buff.GetPureBuf()[last_available_offset:], keyframe)
                if e != nil {
                        t.Fatal(e)
                }
index 770a4fb228998ab82a32ddae3fe9fbe50c5909af..0dd982d2521f041b4e0319bfd70523a5bdf4c153 100644 (file)
@@ -24,6 +24,7 @@ import (
        "sync/atomic"
        "time"
 
+       "github.com/dustin/go-humanize"
        c "github.com/qydysky/bili_danmu/CV"
        F "github.com/qydysky/bili_danmu/F"
 
@@ -797,17 +798,13 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                {
                        pipe := pio.NewPipe()
                        var (
-                               leastReadUnix     atomic.Int64
-                               readTO            int64 = 3
-                               useInterFlvHeader bool  = false
+                               leastReadUnix atomic.Int64
+                               readTO        int64 = 10
                        )
                        leastReadUnix.Store(time.Now().Unix())
                        if v, ok := t.common.K_v.LoadV(`flv断流超时s`).(float64); ok && int64(v) > readTO {
                                readTO = int64(v)
                        }
-                       if v, ok := t.common.K_v.LoadV(`flv使用内置头`).(bool); ok && v {
-                               useInterFlvHeader = v
-                       }
 
                        // read timeout
                        go func() {
@@ -843,85 +840,67 @@ func (t *M4SStream) saveStreamFlv() (e error) {
                                defer cancel()
 
                                var (
-                                       ticker   = time.NewTicker(time.Second)
-                                       buff     = slice.New[byte]()
-                                       keyframe = slice.New[byte]()
-                                       buf      = make([]byte, 1<<16)
+                                       buff       = slice.New[byte]()
+                                       keyframe   = slice.New[byte]()
+                                       buf        = make([]byte, humanize.KByte)
+                                       flvDecoder = NewFlvDecoder()
+                                       bufSize    = humanize.KByte * 1100
                                )
-                               defer ticker.Stop()
+
+                               if v, ok := c.C.K_v.LoadV(`flv音视频时间戳容差ms`).(float64); ok && v > 100 {
+                                       flvDecoder.Diff = v
+                               }
 
                                for {
-                                       n, e := pipe.Read(buf)
-                                       _ = buff.Append(buf[:n])
-                                       if e != nil {
+                                       if n, e := pipe.Read(buf); e != nil {
                                                pctx.PutVal(cancelC, &errCtx, e)
                                                break
-                                       }
-
-                                       select {
-                                       case <-ticker.C:
-                                       default:
+                                       } else if e = buff.Append(buf[:n]); e != nil {
+                                               pctx.PutVal(cancelC, &errCtx, e)
+                                               break
+                                       } else if buff.Size() < bufSize {
                                                continue
                                        }
 
                                        if !buff.IsEmpty() {
-                                               keyframe.Reset()
+                                               // front_buf
                                                buf, unlock := buff.GetPureBufRLock()
-                                               front_buf, last_available_offset, e := Search_stream_tag(buf, keyframe)
+                                               frontBuf, dropOffset, e := flvDecoder.Parse(buf, keyframe)
                                                unlock()
                                                if e != nil {
-                                                       if strings.Contains(e.Error(), `no found available tag`) {
-                                                               continue
-                                                       }
+                                                       t.log.L(`E: `, e)
                                                        pctx.PutVal(cancelC, &errCtx, errors.New("[decoder]"+e.Error()))
-                                                       //丢弃所有数据
-                                                       buff.Reset()
-                                               }
-                                               // 存在有效数据
-                                               if len(front_buf) != 0 || keyframe.Size() != 0 {
-                                                       leastReadUnix.Store(time.Now().Unix())
+                                                       break
                                                }
-                                               if len(front_buf) != 0 && len(t.first_buf) == 0 {
-                                                       t.first_buf = t.first_buf[:0]
-                                                       t.first_buf = append(t.first_buf, front_buf...)
-                                                       // fmt.Println("write front_buf")
-                                                       // t.Stream_msg.PushLock_tag(`data`, t.first_buf)
+
+                                               if len(frontBuf) != 0 {
+                                                       t.first_buf = frontBuf
                                                        t.msg.Push_tag(`load`, t)
                                                }
+
                                                if keyframe.Size() != 0 {
-                                                       if len(t.first_buf) == 0 {
-                                                               if useInterFlvHeader {
-                                                                       switch v.Codec {
-                                                                       case "hevc":
-                                                                               t.log.L(`W: `, `flv未接收到起始段,使用内置头`)
-                                                                               t.first_buf = t.first_buf[:0]
-                                                                               t.first_buf = append(t.first_buf, flvHeaderHevc...)
-                                                                               t.msg.Push_tag(`load`, t)
-                                                                       case "avc":
-                                                                               t.log.L(`W: `, `flv未接收到起始段,使用内置头`)
-                                                                               t.first_buf = t.first_buf[:0]
-                                                                               t.first_buf = append(t.first_buf, flvHeader...)
-                                                                               t.msg.Push_tag(`load`, t)
-                                                                       default:
-                                                                       }
-                                                               }
-                                                               if len(t.first_buf) == 0 {
-                                                                       t.log.L(`W: `, `flv未接收到起始段`)
-                                                                       pctx.PutVal(cancelC, &errCtx, errors.New(`flv未接收到起始段`))
-                                                                       break
-                                                               }
-                                                       }
+                                                       // 存在有效数据
+                                                       leastReadUnix.Store(time.Now().Unix())
+
                                                        buf, unlock := keyframe.GetPureBufRLock()
                                                        t.bootBufPush(buf)
                                                        t.Stream_msg.PushLock_tag(`data`, buf)
                                                        unlock()
+
                                                        keyframe.Reset()
                                                        t.frameCount += 1
                                                        t.msg.Push_tag(`keyFrame`, t)
+                                               } else {
+                                                       bufSize += humanize.KByte * 50
+                                                       if bufSize > humanize.MByte*10 {
+                                                               t.log.L(`E: `, `缓冲池过大`)
+                                                               pctx.PutVal(cancelC, &errCtx, errors.New("缓冲池过大"))
+                                                               break
+                                                       }
                                                }
-                                               if last_available_offset > 1 {
-                                                       // fmt.Println("write Sync")
-                                                       _ = buff.RemoveFront(last_available_offset - 1)
+
+                                               if dropOffset > 0 {
+                                                       _ = buff.RemoveFront(dropOffset)
                                                }
                                        }
                                }
index 68f7aac12031d5f307fe345437340d6da5f3f501..bcf35e833655e860a433cd057a1cda78595736cb 100644 (file)
     "直播流停用服务器-help": "正则字符串数组",
     "直播流停用服务器": [],
     "直播流接收n帧才保存": 3,
-    "flv断流超时s": 5,
+    "flv断流超时s": 10,
     "flv断流续接": true,
-    "flv使用内置头": false,
+    "flv音视频时间戳容差ms-help": "默认100,小于默认无效,调大可以允许较差的流,但可能会音画不同步",
+    "flv音视频时间戳容差ms": 100,
     "fmp4切片下载超时s": 3,
     "fmp4获取更多服务器": true,
     "fmp4跳过解码出错的帧-help": "fmp4跳过解码出错的帧、但可能导致关键帧时间上的跳越",