From b29d3732132244579841408853b97fc25aaa1a52 Mon Sep 17 00:00:00 2001 From: qydysky <32743305+qydysky@users.noreply.github.com> Date: Tue, 10 Jan 2023 01:15:06 +0800 Subject: [PATCH] =?utf8?q?=E9=87=8D=E5=86=99flv=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- Reply/fmp4Decode.go | 50 ++++++++++++++++++++++----------------------- Reply/stream.go | 47 +++++++++++++++++++++++++++--------------- 2 files changed, 55 insertions(+), 42 deletions(-) diff --git a/Reply/fmp4Decode.go b/Reply/fmp4Decode.go index 74b0185..3d53c24 100644 --- a/Reply/fmp4Decode.go +++ b/Reply/fmp4Decode.go @@ -114,6 +114,31 @@ func (t *Fmp4Decoder) Seach_stream_fmp4(buf []byte, keyframes *bufB) (cu int, er haveKeyframe bool bufModified = t.buf.getModifiedTime() maxSequenceNumber int + + //get timeStamp + get_timeStamp = func(tfdt int) (ts timeStamp) { + switch buf[tfdt+8] { + case 0: + ts.data = buf[tfdt+16 : tfdt+20] + ts.timeStamp = int(F.Btoi(buf, tfdt+16, 4)) + case 1: + ts.data = buf[tfdt+12 : tfdt+20] + ts.timeStamp = int(F.Btoi64(buf, tfdt+12)) + } + return + } + + //get track type + get_track_type = func(tfhd, tfdt int) (ts timeStamp, handlerType byte) { + track, ok := t.traks[int(F.Btoi(buf, tfhd+12, 4))] + if ok { + ts := get_timeStamp(tfdt) + ts.handlerType = track.handlerType + ts.timescale = track.timescale + return ts, track.handlerType + } + return + } ) err = deal(buf, @@ -144,31 +169,6 @@ func (t *Fmp4Decoder) Seach_stream_fmp4(buf []byte, keyframes *bufB) (cu int, er } else { maxSequenceNumber = moofSN } - - //get timeStamp - var get_timeStamp = func(tfdt int) (ts timeStamp) { - switch buf[tfdt+8] { - case 0: - ts.data = buf[tfdt+16 : tfdt+20] - ts.timeStamp = int(F.Btoi(buf, tfdt+16, 4)) - case 1: - ts.data = buf[tfdt+12 : tfdt+20] - ts.timeStamp = int(F.Btoi64(buf, tfdt+12)) - } - return - } - - //get track type - var get_track_type = func(tfhd, tfdt int) (ts timeStamp, handlerType byte) { - track, ok := t.traks[int(F.Btoi(buf, tfhd+12, 4))] - if ok { - ts := get_timeStamp(tfdt) - ts.handlerType = track.handlerType - ts.timescale = track.timescale - return ts, track.handlerType - } - return - } { ts, handlerType := get_track_type(m[3].i, m[4].i) switch handlerType { diff --git a/Reply/stream.go b/Reply/stream.go index b32c511..d70eca3 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -532,20 +532,38 @@ func (t *M4SStream) saveStreamFlv() (e error) { // read go func() { - var buff []byte - var buf = make([]byte, 1<<16) + var ( + ticker = time.NewTicker(time.Second) + buff bufB + buf = make([]byte, 1<<16) + ) for { n, e := rc.Read(buf) + buff.append(buf[:n]) + if e != nil { + out.Close() + t.Stream_msg.Push_tag(`close`, nil) + break + } leastReadUnix = time.Now().Unix() - buff = append(buff, buf[:n]...) - if n > 0 { - front_buf, keyframe, last_avilable_offset, e := Seach_stream_tag(buff) + + skip := true + select { + case <-ticker.C: + skip = false + default: + } + if skip { + continue + } + + if !buff.isEmpty() { + front_buf, keyframe, last_avilable_offset, e := Seach_stream_tag(buff.getCopyBuf()) if e != nil { if strings.Contains(e.Error(), `no found available tag`) { continue } } - if len(front_buf)+len(keyframe) != 0 { if len(front_buf) != 0 { t.first_buf = front_buf @@ -559,22 +577,17 @@ func (t *M4SStream) saveStreamFlv() (e error) { t.bootBufPush(frame) t.Stream_msg.Push_tag(`data`, frame) } - if last_avilable_offset != 0 { - // fmt.Println("write Sync") - buff = buff[last_avilable_offset-1:] - out.Sync() - } } - } - if e != nil { - out.Close() - t.Stream_msg.Push_tag(`close`, nil) - break + if last_avilable_offset > 1 { + // fmt.Println("write Sync") + buff.removeFront(last_avilable_offset - 1) + out.Sync() + } } } buf = nil - buff = nil + buff.reset() }() CookieM := make(map[string]string) -- 2.39.2