package reply
import (
- "bytes"
"errors"
+ "fmt"
+ "math"
- // "math"
-
- "github.com/dustin/go-humanize"
F "github.com/qydysky/bili_danmu/F"
slice "github.com/qydysky/part/slice"
)
const (
- flv_header_size = 9
- tag_header_size = 11
- previou_tag_size = 4
-
- video_tag = byte(0x09)
- audio_tag = byte(0x08)
- script_tag = byte(0x12)
+ flvHeaderSize = 9
+ tagHeaderSize = 11
+ previouTagSize = 4
+
+ streamId = byte(0x00)
+ videoTag = byte(0x09)
+ audioTag = byte(0x08)
+ scriptTag = byte(0x12)
)
var (
- flv_header_sign = []byte{0x46, 0x4c, 0x56}
+ flvHeaderSign = []byte{0x46, 0x4c, 0x56}
+
+ ErrInit = errors.New("ErrInit")
+ ErrNoInit = errors.New("ErrNoInit")
+ ErrNoFoundFlvHeader = errors.New("ErrNoFoundFlvHeader")
+ ErrNoFoundTagHeader = errors.New("ErrNoFoundTagHeader")
+ ErrTagSizeZero = errors.New("ErrTagSizeZero")
+ ErrStreamId = errors.New("ErrStreamId")
+ ErrTagSize = errors.New("ErrTagSize")
+ ErrSignLost = errors.New("ErrSignLost")
)
-// this fuction read []byte and return flv header and all complete keyframe if possible.
-// complete keyframe means the video and audio tags between two video key frames tag
-func Search_stream_tag(buf []byte, keyframe *slice.Buf[byte]) (front_buf []byte, last_available_offset int, err error) {
- if len(buf) > humanize.MByte*100 {
- err = errors.New("buf too large")
+type FlvDecoder struct {
+ Diff float64
+ init bool
+}
+
+func NewFlvDecoder() *FlvDecoder {
+ return &FlvDecoder{Diff: 100}
+}
+
+func (t *FlvDecoder) InitFlv(buf []byte) (frontBuf []byte, dropOffset int, err error) {
+ if t.init {
+ err = ErrInit
return
}
- //get flv header(9byte) + FirstTagSize(4byte)
- if header_offset := bytes.Index(buf, flv_header_sign); header_offset != -1 {
- if header_offset+flv_header_size+previou_tag_size > len(buf) {
- err = errors.New(`no found available tag`)
- return
- }
- front_buf = buf[header_offset : header_offset+flv_header_size+previou_tag_size]
- last_available_offset = header_offset + flv_header_size + previou_tag_size
+
+ var sign = 0x00
+ var tagNum = 0
+
+ if len(buf) < flvHeaderSize+previouTagSize {
+ return
}
- var (
- sign = 0x00
- keyframe_num = -1
- confirm_num = -1
- tag_num = 0
- )
+ if buf[0] != flvHeaderSign[0] || buf[1] != flvHeaderSign[1] || buf[2] != flvHeaderSign[2] {
+ err = ErrNoFoundFlvHeader
+ return
+ }
- defer func() {
- if sign != 0x07 {
- // if sign != 0x00 {
- // fmt.Printf("front_buf error:%x\n", sign)
- // }
- front_buf = front_buf[:0]
+ if buf[flvHeaderSize]|buf[flvHeaderSize+1]|buf[flvHeaderSize+2]|buf[flvHeaderSize+3] != 0 {
+ err = ErrTagSize
+ return
+ }
+
+ for bufOffset := flvHeaderSize + previouTagSize; bufOffset >= flvHeaderSize && bufOffset+tagHeaderSize < len(buf); {
+
+ if buf[bufOffset]&0b11000000 != 0 &&
+ buf[bufOffset]&0b00011111 != videoTag &&
+ buf[bufOffset]&0b00011111 != audioTag &&
+ buf[bufOffset]&0b00011111 != scriptTag {
+ err = ErrNoFoundTagHeader
+ return
}
- if bufl := keyframe.Size(); confirm_num != bufl {
- _ = keyframe.RemoveBack(bufl - confirm_num)
+
+ if buf[bufOffset+8]|buf[bufOffset+9]|buf[bufOffset+10] != streamId {
+ err = ErrStreamId
+ return
}
- }()
- for buf_offset := 0; buf_offset+tag_header_size < len(buf); {
+ tagSize := int(F.Btoi32([]byte{0x00, buf[bufOffset+1], buf[bufOffset+2], buf[bufOffset+3]}, 0))
+ if tagSize == 0 {
+ err = ErrTagSizeZero
+ return
+ }
- tag_offset := buf_offset + bytes.IndexAny(buf[buf_offset:], string([]byte{video_tag, audio_tag, script_tag}))
- if tag_offset == buf_offset-1 {
- err = errors.New(`no found available tag`)
- // fmt.Printf("last %x\n",buf[tag_offset:tag_offset+tag_header_size])
- return //no found available video,audio,script tag
+ if bufOffset+tagHeaderSize+tagSize+previouTagSize > len(buf) {
+ return
}
- if tag_offset+tag_header_size > len(buf) {
- // err = errors.New(`reach end when get tag header`)
- // fmt.Printf("last %x\n",buf[tag_offset:tag_offset+tag_header_size])
- return //buf end
+
+ tagSizeCheck := int(F.Btoi32(buf[bufOffset+tagHeaderSize+tagSize:bufOffset+tagHeaderSize+tagSize+previouTagSize], 0))
+ if tagNum != 0 && tagSizeCheck != tagSize+tagHeaderSize {
+ err = ErrTagSize
+ return
}
- streamid := int(F.Btoi32([]byte{0x00, buf[tag_offset+8], buf[tag_offset+9], buf[tag_offset+10]}, 0))
- if streamid != 0 {
- buf_offset = tag_offset + 1
- last_available_offset = buf_offset
- // fmt.Printf("streamid error %x\n",buf[tag_offset:tag_offset+tag_header_size])
- continue //streamid error
+ tagNum += 1
+
+ if sign != 0x07 { // ignore first video audio time_stamp
+ if (buf[bufOffset] == videoTag) && (sign&0x04 == 0x00) {
+ sign |= 0x04
+ } else if (buf[bufOffset] == audioTag) && (sign&0x02 == 0x00) {
+ sign |= 0x02
+ } else if (buf[bufOffset] == scriptTag) && (sign&0x01 == 0x00) {
+ sign |= 0x01
+ } else {
+ err = ErrSignLost
+ return
+ }
+ bufOffset += tagSizeCheck + previouTagSize
+
+ if sign == 0x07 {
+ frontBuf = append(frontBuf, buf[0:bufOffset]...) // copy
+ dropOffset = bufOffset
+ t.init = true
+ return
+ }
}
+ }
+ return
+}
+
+// this fuction read []byte and return flv header and all complete keyframe if possible.
+// complete keyframe means the video and audio tags between two video key frames tag
+func (t *FlvDecoder) SearchStreamTag(buf []byte, keyframe *slice.Buf[byte]) (dropOffset int, err error) {
+ if !t.init {
+ err = ErrNoInit
+ return
+ }
+
+ keyframe.Reset()
+
+ var (
+ keyframeOp = -1
+ lastAT int
+ lastVT int
+ )
- tag_size := int(F.Btoi32([]byte{0x00, buf[tag_offset+1], buf[tag_offset+2], buf[tag_offset+3]}, 0))
- if tag_offset+tag_header_size+tag_size+previou_tag_size > len(buf) {
- // err = errors.New(`reach end when get tag body`)
- // fmt.Printf("last %x\n",buf[tag_offset:tag_offset+tag_header_size])
- return //buf end
+ for bufOffset := 0; bufOffset >= 0 && bufOffset+tagHeaderSize < len(buf); {
+
+ if buf[bufOffset]&0b11000000 != 0 &&
+ buf[bufOffset]&0b00011111 != videoTag &&
+ buf[bufOffset]&0b00011111 != audioTag &&
+ buf[bufOffset]&0b00011111 != scriptTag {
+ err = ErrNoFoundTagHeader
+ return
}
- if tag_size == 0 {
- buf_offset = tag_offset + 1
- last_available_offset = buf_offset
- // fmt.Printf("tag_size error %x\n",buf[tag_offset:tag_offset+tag_header_size])
- // continue //tag_size error
- err = errors.New(`[decoder]tag_size error`)
+
+ if buf[bufOffset+8]|buf[bufOffset+9]|buf[bufOffset+10] != streamId {
+ err = ErrStreamId
return
}
- tag_size_check := int(F.Btoi32(buf[tag_offset+tag_header_size+tag_size:tag_offset+tag_header_size+tag_size+previou_tag_size], 0))
- if tag_num+tag_size_check == 0 {
- tag_size_check = tag_size + tag_header_size
+ tagSize := int(F.Btoi32([]byte{0x00, buf[bufOffset+1], buf[bufOffset+2], buf[bufOffset+3]}, 0))
+ if tagSize == 0 {
+ err = ErrTagSizeZero
+ return
}
- if tag_size_check != tag_size+tag_header_size {
- buf_offset = tag_offset + 1
- last_available_offset = buf_offset
- // fmt.Printf("tag_size_check error %x\n",buf[tag_offset:tag_offset+tag_header_size])
- // continue //tag_size_check error
- err = errors.New(`[decoder]tag_size_check error`)
+ if bufOffset+tagHeaderSize+tagSize+previouTagSize > len(buf) {
return
}
- time_stamp := int(F.Btoi32([]byte{buf[tag_offset+7], buf[tag_offset+4], buf[tag_offset+5], buf[tag_offset+6]}, 0))
-
- // show tag header
- // fmt.Printf("%x\n", buf[tag_offset:tag_offset+tag_header_size])
-
- tag_num += 1
-
- if time_stamp == 0 || sign != 0x00 { // ignore first video audio time_stamp
- if len(front_buf) != 0 {
- //first video audio script tag
- if (buf[tag_offset] == video_tag) && (sign&0x04 == 0x00) {
- sign |= 0x04
- front_buf = append(front_buf, buf[tag_offset:tag_offset+tag_size_check+previou_tag_size]...)
- } else if (buf[tag_offset] == audio_tag) && (sign&0x02 == 0x00) {
- sign |= 0x02
- front_buf = append(front_buf, buf[tag_offset:tag_offset+tag_size_check+previou_tag_size]...)
- } else if (buf[tag_offset] == script_tag) && (sign&0x01 == 0x00) {
- sign |= 0x01
- front_buf = append(front_buf, buf[tag_offset:tag_offset+tag_size_check+previou_tag_size]...)
- }
- }
- buf_offset = tag_offset + tag_size_check + previou_tag_size
- last_available_offset = buf_offset
- continue
+ tagSizeCheck := int(F.Btoi32(buf[bufOffset+tagHeaderSize+tagSize:bufOffset+tagHeaderSize+tagSize+previouTagSize], 0))
+ if tagSizeCheck != tagSize+tagHeaderSize {
+ err = ErrTagSize
+ return
}
- if buf[tag_offset] == video_tag {
- if buf[tag_offset+11]&0xf0 == 0x10 { //key frame
- keyframe_num += 1
- confirm_num = keyframe.Size()
- last_available_offset = tag_offset
+ timeStamp := int(F.Btoi32([]byte{buf[bufOffset+7], buf[bufOffset+4], buf[bufOffset+5], buf[bufOffset+6]}, 0))
+ switch {
+ case buf[bufOffset] == videoTag:
+ lastVT = timeStamp
+ case buf[bufOffset] == audioTag:
+ lastAT = timeStamp
+ default:
+ }
+ if lastAT != 0 && lastVT != 0 {
+ diff := math.Abs(float64(lastVT - lastAT))
+ if diff > t.Diff {
+ err = fmt.Errorf("时间戳不匹配 %v %v (或许应调整flv音视频时间戳容差ms>%f)", lastVT, lastAT, diff)
+ return
}
+ }
- if keyframe_num >= 0 {
- _ = keyframe.Append(buf[tag_offset : tag_offset+tag_size_check+previou_tag_size])
- }
- } else if buf[tag_offset] == audio_tag {
- if keyframe_num >= 0 {
- _ = keyframe.Append(buf[tag_offset : tag_offset+tag_size_check+previou_tag_size])
+ if buf[bufOffset] == videoTag && buf[bufOffset+11]&0xf0 == 0x10 { //key frame
+ if keyframeOp >= 0 {
+ err = keyframe.Append(buf[keyframeOp:bufOffset])
+ dropOffset = bufOffset
}
+ keyframeOp = bufOffset
}
-
- buf_offset = tag_offset + tag_size_check + previou_tag_size
+ bufOffset += tagSizeCheck + previouTagSize
}
return
}
+
+func (t *FlvDecoder) Parse(buf []byte, keyframe *slice.Buf[byte]) (frontBuf []byte, dropOffset int, err error) {
+ if !t.init {
+ frontBuf, dropOffset, err = t.InitFlv(buf)
+ } else {
+ dropOffset, err = t.SearchStreamTag(buf, keyframe)
+ }
+ return
+}
"sync/atomic"
"time"
+ "github.com/dustin/go-humanize"
c "github.com/qydysky/bili_danmu/CV"
F "github.com/qydysky/bili_danmu/F"
{
pipe := pio.NewPipe()
var (
- leastReadUnix atomic.Int64
- readTO int64 = 3
- useInterFlvHeader bool = false
+ leastReadUnix atomic.Int64
+ readTO int64 = 10
)
leastReadUnix.Store(time.Now().Unix())
if v, ok := t.common.K_v.LoadV(`flv断流超时s`).(float64); ok && int64(v) > readTO {
readTO = int64(v)
}
- if v, ok := t.common.K_v.LoadV(`flv使用内置头`).(bool); ok && v {
- useInterFlvHeader = v
- }
// read timeout
go func() {
defer cancel()
var (
- ticker = time.NewTicker(time.Second)
- buff = slice.New[byte]()
- keyframe = slice.New[byte]()
- buf = make([]byte, 1<<16)
+ buff = slice.New[byte]()
+ keyframe = slice.New[byte]()
+ buf = make([]byte, humanize.KByte)
+ flvDecoder = NewFlvDecoder()
+ bufSize = humanize.KByte * 1100
)
- defer ticker.Stop()
+
+ if v, ok := c.C.K_v.LoadV(`flv音视频时间戳容差ms`).(float64); ok && v > 100 {
+ flvDecoder.Diff = v
+ }
for {
- n, e := pipe.Read(buf)
- _ = buff.Append(buf[:n])
- if e != nil {
+ if n, e := pipe.Read(buf); e != nil {
pctx.PutVal(cancelC, &errCtx, e)
break
- }
-
- select {
- case <-ticker.C:
- default:
+ } else if e = buff.Append(buf[:n]); e != nil {
+ pctx.PutVal(cancelC, &errCtx, e)
+ break
+ } else if buff.Size() < bufSize {
continue
}
if !buff.IsEmpty() {
- keyframe.Reset()
+ // front_buf
buf, unlock := buff.GetPureBufRLock()
- front_buf, last_available_offset, e := Search_stream_tag(buf, keyframe)
+ frontBuf, dropOffset, e := flvDecoder.Parse(buf, keyframe)
unlock()
if e != nil {
- if strings.Contains(e.Error(), `no found available tag`) {
- continue
- }
+ t.log.L(`E: `, e)
pctx.PutVal(cancelC, &errCtx, errors.New("[decoder]"+e.Error()))
- //丢弃所有数据
- buff.Reset()
- }
- // 存在有效数据
- if len(front_buf) != 0 || keyframe.Size() != 0 {
- leastReadUnix.Store(time.Now().Unix())
+ break
}
- if len(front_buf) != 0 && len(t.first_buf) == 0 {
- t.first_buf = t.first_buf[:0]
- t.first_buf = append(t.first_buf, front_buf...)
- // fmt.Println("write front_buf")
- // t.Stream_msg.PushLock_tag(`data`, t.first_buf)
+
+ if len(frontBuf) != 0 {
+ t.first_buf = frontBuf
t.msg.Push_tag(`load`, t)
}
+
if keyframe.Size() != 0 {
- if len(t.first_buf) == 0 {
- if useInterFlvHeader {
- switch v.Codec {
- case "hevc":
- t.log.L(`W: `, `flv未接收到起始段,使用内置头`)
- t.first_buf = t.first_buf[:0]
- t.first_buf = append(t.first_buf, flvHeaderHevc...)
- t.msg.Push_tag(`load`, t)
- case "avc":
- t.log.L(`W: `, `flv未接收到起始段,使用内置头`)
- t.first_buf = t.first_buf[:0]
- t.first_buf = append(t.first_buf, flvHeader...)
- t.msg.Push_tag(`load`, t)
- default:
- }
- }
- if len(t.first_buf) == 0 {
- t.log.L(`W: `, `flv未接收到起始段`)
- pctx.PutVal(cancelC, &errCtx, errors.New(`flv未接收到起始段`))
- break
- }
- }
+ // 存在有效数据
+ leastReadUnix.Store(time.Now().Unix())
+
buf, unlock := keyframe.GetPureBufRLock()
t.bootBufPush(buf)
t.Stream_msg.PushLock_tag(`data`, buf)
unlock()
+
keyframe.Reset()
t.frameCount += 1
t.msg.Push_tag(`keyFrame`, t)
+ } else {
+ bufSize += humanize.KByte * 50
+ if bufSize > humanize.MByte*10 {
+ t.log.L(`E: `, `缓冲池过大`)
+ pctx.PutVal(cancelC, &errCtx, errors.New("缓冲池过大"))
+ break
+ }
}
- if last_available_offset > 1 {
- // fmt.Println("write Sync")
- _ = buff.RemoveFront(last_available_offset - 1)
+
+ if dropOffset > 0 {
+ _ = buff.RemoveFront(dropOffset)
}
}
}