From 534226c92fe2474f80e19eddadb39cbac674ad9e Mon Sep 17 00:00:00 2001 From: qydysky <32743305+qydysky@users.noreply.github.com> Date: Mon, 9 Jan 2023 04:32:41 +0800 Subject: [PATCH] =?utf8?q?=E9=87=8D=E5=86=99fmp4=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- Reply/Msg.go | 1 + Reply/Reply.go | 15 + Reply/flvDecode_test.go | 1 - Reply/fmp4Decode.go | 508 +++++++++++++-------------- Reply/fmp4Decode_test.go | 25 ++ Reply/sliceBuf.go | 177 +++++++--- Reply/stream.go | 136 ++++--- Reply/ws_msg/POPULAR_RANK_CHANGED.go | 14 + demo/config/config_disable_msg.json | 3 +- go.mod | 3 + go.sum | 34 +- 11 files changed, 515 insertions(+), 402 deletions(-) delete mode 100644 Reply/flvDecode_test.go create mode 100644 Reply/fmp4Decode_test.go create mode 100644 Reply/ws_msg/POPULAR_RANK_CHANGED.go diff --git a/Reply/Msg.go b/Reply/Msg.go index 92f4b14..b88da2c 100644 --- a/Reply/Msg.go +++ b/Reply/Msg.go @@ -114,6 +114,7 @@ var Msg_map = map[string]func(replyF, string){ "LIKE_INFO_V3_NOTICE": nil, "LIVE_INTERACTIVE_GAME": nil, "LIVE_MULTI_VIEW_CHANGE": nil, + "POPULAR_RANK_CHANGED": nil, //replyF.popular_rank_changed, // Msg-人气排名 } // 屏蔽不需要的消息 diff --git a/Reply/Reply.go b/Reply/Reply.go index 3ec4ead..baedf09 100644 --- a/Reply/Reply.go +++ b/Reply/Reply.go @@ -495,6 +495,20 @@ func (replyF) little_tips(s string) { msglog.Base_add("房").L(`I: `, s) } +// Msg-人气排名 +func (replyF) popular_rank_changed(s string) { + var type_item ws_msg.POPULAR_RANK_CHANGED + + if e := json.Unmarshal([]byte(s), &type_item); e != nil { + msglog.L(`E: `, e) + } + s = fmt.Sprintf("人气排行 %d", type_item.Data.Rank) + + Gui_show(s, "0room") + + msglog.Base_add("房").L(`I: `, s) +} + // Msg-开始了视频连线 func (replyF) video_connection_join_start(s string) { msglog := msglog.Base_add("房").Log_show_control(false) @@ -1124,6 +1138,7 @@ type Danmu_mq_t struct { var Danmu_mq = mq.New(10) +// 消息显示 func Gui_show(m ...string) { //m[0]:msg m[1]:uid uid := "" diff --git a/Reply/flvDecode_test.go b/Reply/flvDecode_test.go deleted file mode 100644 index 98cf0bf..0000000 --- a/Reply/flvDecode_test.go +++ /dev/null @@ -1 +0,0 @@ -package reply diff --git a/Reply/fmp4Decode.go b/Reply/fmp4Decode.go index b81dcc1..74b0185 100644 --- a/Reply/fmp4Decode.go +++ b/Reply/fmp4Decode.go @@ -3,328 +3,314 @@ package reply import ( "bytes" "errors" + "io" F "github.com/qydysky/bili_danmu/F" ) +var boxs map[string]bool + +func init() { + boxs = make(map[string]bool) + //isPureBox? || need to skip? + boxs["ftyp"] = true + boxs["moov"] = false + boxs["mvhd"] = true + boxs["trak"] = false + boxs["tkhd"] = true + boxs["mdia"] = false + boxs["mdhd"] = true + boxs["hdlr"] = true + boxs["minf"] = false || true + boxs["mvex"] = false || true + boxs["moof"] = false + boxs["mfhd"] = true + boxs["traf"] = false + boxs["tfhd"] = true + boxs["tfdt"] = true + boxs["trun"] = true + boxs["mdat"] = true +} + +type ie struct { + n string // box name + i int // start index + e int // end index +} + type trak struct { timescale int trackID int handlerType byte } +type timeStamp struct { + timeStamp int + data []byte + timescale int + handlerType byte +} + +func (t *timeStamp) getT() float64 { + return float64(t.timeStamp) / float64(t.timescale) +} + type Fmp4Decoder struct { traks map[int]trak + buf bufB } -func (t *Fmp4Decoder) Init_fmp4(buf []byte) ([]byte, error) { - var ( - cu int - lastMoovI int - lastMoovE int - ) - - //ftyp - ftypI := bytes.Index(buf[cu:], []byte("ftyp")) - if ftypI == -1 { - return nil, errors.New("未找到ftyp包") - } - ftypI = cu + ftypI - 4 - ftypE := ftypI + int(F.Btoi(buf, ftypI, 4)) - if ftypE > len(buf) { - return nil, errors.New("ftyp包破损") +func (t *Fmp4Decoder) Init_fmp4(buf []byte) (b []byte, err error) { + var ftypI, ftypE, moovI, moovE int + + err = deal(buf, + []string{"ftyp", "moov"}, + func(m []*ie) bool { + ftypI = m[0].i + ftypE = m[0].e + moovI = m[1].i + moovE = m[1].e + return true + }) + + if err != nil { + return nil, err } - cu = ftypI - - for cu < len(buf) { - //moov - moovI := bytes.Index(buf[cu:], []byte("moov")) - if moovI == -1 { - break - } - moovI = cu + moovI - 4 - moovE := moovI + int(F.Btoi(buf, moovI, 4)) - if moovE > len(buf) { - return nil, errors.New("moov包破损") - } - cu = moovI - - lastMoovI = moovI - lastMoovE = moovE - - for cu < moovE { - //trak - trakI := bytes.Index(buf[cu:], []byte("trak")) - if trakI == -1 { - break - } - trakI = cu + trakI - 4 - trakE := trakI + int(F.Btoi(buf, trakI, 4)) - if trakE > moovE { - return nil, errors.New("trak包破损") - } - cu = trakI - //tkhd - tkhdI := bytes.Index(buf[cu:], []byte("tkhd")) - if tkhdI == -1 { - return nil, errors.New("未找到tkhd包") - } - tkhdI = cu + tkhdI - 4 - tkhdE := tkhdI + int(F.Btoi(buf, tkhdI, 4)) - if tkhdE > trakE { - return nil, errors.New("tkhd包破损") - } - cu = tkhdI - - //mdia - mdiaI := bytes.Index(buf[cu:], []byte("mdia")) - if mdiaI == -1 { - return nil, errors.New("未找到mdia包") - } - mdiaI = cu + mdiaI - 4 - mdiaE := mdiaI + int(F.Btoi(buf, mdiaI, 4)) - if mdiaE > trakE { - return nil, errors.New("mdia包破损") - } - cu = mdiaI - - //mdhd - mdhdI := bytes.Index(buf[cu:], []byte("mdhd")) - if mdhdI == -1 { - return nil, errors.New("未找到mdhd包") - } - mdhdI = cu + mdhdI - 4 - mdhdE := mdhdI + int(F.Btoi(buf, mdhdI, 4)) - if mdhdE > mdiaE { - return nil, errors.New("mdhd包破损") - } - cu = mdhdI - - //hdlr - hdlrI := bytes.Index(buf[cu:], []byte("hdlr")) - if hdlrI == -1 { - return nil, errors.New("未找到hdlr包") - } - hdlrI = cu + hdlrI - 4 - hdlrE := hdlrI + int(F.Btoi(buf, hdlrI, 4)) - if hdlrE > mdiaE { - return nil, errors.New("hdlr包破损") - } - cu = hdlrI - - tackId := int(F.Btoi(buf, tkhdI+20, 4)) + err = deal(buf, + []string{"tkhd", "mdia", "mdhd", "hdlr"}, + func(m []*ie) bool { + tackId := int(F.Btoi(buf, m[0].i+20, 4)) if t.traks == nil { t.traks = make(map[int]trak) } t.traks[tackId] = trak{ trackID: tackId, - timescale: int(F.Btoi(buf, mdhdI+20, 4)), - handlerType: buf[hdlrI+16], + timescale: int(F.Btoi(buf, m[2].i+20, 4)), + handlerType: buf[m[3].i+16], } - } + return false + }) + + if err != nil { + return nil, err } + if len(t.traks) == 0 { - return nil, errors.New("未找到trak包") + return nil, errors.New("未找到任何trak包") } - return append(buf[ftypI:ftypE], buf[lastMoovI:lastMoovE]...), nil + + return append(buf[ftypI:ftypE], buf[moovI:moovE]...), nil } -func (t *Fmp4Decoder) Seach_stream_fmp4(buf []byte) (keyframes [][]byte, last_avilable_offset int, err error) { +func (t *Fmp4Decoder) Seach_stream_fmp4(buf []byte, keyframes *bufB) (cu int, err error) { if len(t.traks) == 0 { err = errors.New("未初始化traks") return } + t.buf.reset() var ( - cu int - haveKeyframe bool - keyframe []byte - frameTime int + haveKeyframe bool + bufModified = t.buf.getModifiedTime() + maxSequenceNumber int ) - for cu < len(buf) { - //moof - moofI := bytes.Index(buf[cu:], []byte("moof")) - if moofI == -1 { - break - } - moofI = cu + moofI - 4 - moofE := moofI + int(F.Btoi(buf, moofI, 4)) - if moofE > len(buf) { - break - } - cu = moofI - - var ( - iskeyFrame bool - videoTime float64 - audioTime float64 - audioTimeIndex int - audioTimeSize int - audioTimeScale int - ) - - for cu < moofE { - //traf - trafI := bytes.Index(buf[cu:], []byte("traf")) - if trafI == -1 { - break - } - trafI = cu + trafI - 4 - trafE := trafI + int(F.Btoi(buf, trafI, 4)) - if trafE > moofE { - break - } - cu = trafI + err = deal(buf, + []string{"moof", "mfhd", + "traf", "tfhd", "tfdt", "trun", + "traf", "tfhd", "tfdt", "trun", + "mdat"}, + func(m []*ie) bool { + var ( + keyframeMoof = buf[m[5].i+20] == byte(0x02) || buf[m[9].i+20] == byte(0x02) + moofSN = int(F.Btoi(buf, m[1].i+12, 4)) + video timeStamp + audio timeStamp + ) - //tfhd - tfhdI := bytes.Index(buf[cu:], []byte("tfhd")) - if tfhdI == -1 { - err = errors.New("未找到tfhd包") - break + // fmt.Println(moofSN, "frame", keyframeMoof, t.buf.size(), m[0].i, m[10].n, m[10].e) + + //is sn error? + if maxSequenceNumber == 0 { + maxSequenceNumber = moofSN + } else if moofSN == maxSequenceNumber { + return false + } else if moofSN != maxSequenceNumber+1 { + t.buf.reset() + haveKeyframe = false + cu = m[0].i + return false + } else { + maxSequenceNumber = moofSN } - tfhdI = cu + tfhdI - 4 - tfhdE := tfhdI + int(F.Btoi(buf, tfhdI, 4)) - if tfhdE > trafE { - err = errors.New("tfhd包破损") - break + + //get timeStamp + var get_timeStamp = func(tfdt int) (ts timeStamp) { + switch buf[tfdt+8] { + case 0: + ts.data = buf[tfdt+16 : tfdt+20] + ts.timeStamp = int(F.Btoi(buf, tfdt+16, 4)) + case 1: + ts.data = buf[tfdt+12 : tfdt+20] + ts.timeStamp = int(F.Btoi64(buf, tfdt+12)) + } + return } - cu = tfhdI - //tfdt - tfdtI := bytes.Index(buf[cu:], []byte("tfdt")) - if tfdtI == -1 { - err = errors.New("未找到tfdt包") - break + //get track type + var get_track_type = func(tfhd, tfdt int) (ts timeStamp, handlerType byte) { + track, ok := t.traks[int(F.Btoi(buf, tfhd+12, 4))] + if ok { + ts := get_timeStamp(tfdt) + ts.handlerType = track.handlerType + ts.timescale = track.timescale + return ts, track.handlerType + } + return } - tfdtI = cu + tfdtI - 4 - tfdtE := tfdtI + int(F.Btoi(buf, tfdtI, 4)) - if tfdtE > trafE { - err = errors.New("tfdt包破损") - break + { + ts, handlerType := get_track_type(m[3].i, m[4].i) + switch handlerType { + case 'v': + video = ts + case 's': + audio = ts + } + } + { + ts, handlerType := get_track_type(m[7].i, m[8].i) + switch handlerType { + case 'v': + video = ts + case 's': + audio = ts + } } - cu = tfdtI - //trun - trunI := bytes.Index(buf[cu:], []byte("trun")) - if trunI == -1 { - err = errors.New("未找到trun包") - break + //deal frame + if keyframeMoof { + //sync audio timeStamp + if audio.getT() != video.getT() { + date := F.Itob64(int64(video.getT() * float64(audio.timescale))) + copy(audio.data, date) + } + if t.buf.hadModified(bufModified) && !t.buf.isEmpty() { + keyframes.append(t.buf.getPureBuf()) + cu = m[0].i + t.buf.reset() + } + haveKeyframe = true + } else if !haveKeyframe { + cu = m[10].e } - trunI = cu + trunI - 4 - trunE := trunI + int(F.Btoi(buf, trunI, 4)) - if trunE > trafE { - err = errors.New("trun包破损") - break + if haveKeyframe { + t.buf.append(buf[m[0].i:m[10].e]) } - cu = trunI + return false + }) - var ( - timeStamp int - timeStampIndex int - timeSize int - ) - switch buf[tfdtI+8] { - case 0: - timeSize = 4 - timeStampIndex = tfdtI + 16 - timeStamp = int(F.Btoi(buf, tfdtI+16, 4)) - case 1: - timeSize = 8 - timeStampIndex = tfdtI + 12 - timeStamp = int(F.Btoi64(buf, tfdtI+12)) - } + if len(buf) > 1024*1024*20 { + err = errors.New("buf超过20M") + } - track, ok := t.traks[int(F.Btoi(buf, tfhdI+12, 4))] - if !ok { - err = errors.New("找不到trak") - // log.Default().Println(`cant find trak`, int(F.Btoi(buf, tfhdI+12))) - continue - } + return +} - switch track.handlerType { - case 'v': - videoTime = float64(timeStamp) / float64(track.timescale) - case 's': - audioTimeIndex = timeStampIndex - audioTimeSize = timeSize - audioTimeScale = track.timescale - audioTime = float64(timeStamp) / float64(track.timescale) - } +func deal(buf []byte, boxName []string, f func([]*ie) (breakloop bool)) (err error) { - if !iskeyFrame && buf[trunI+20] == byte(0x02) { - iskeyFrame = true - } + m, e := decode(buf, boxName[0]) + if len(m) == 0 { + return errors.New("未找到box") + } + if e != nil { + err = e + } - if track.handlerType == 'v' { - if timeStamp < frameTime { - // log.Default().Println("时间戳异常 忽略到下个关键帧") - iskeyFrame = false - haveKeyframe = false - keyframe = []byte{} + var matchCount = 0 + for cu := 0; cu < len(m); cu++ { + if m[cu].n == boxName[matchCount] { + matchCount += 1 + if matchCount == len(boxName) { + var ies []*ie + + for k, v := range boxName { + ies = append(ies, &ie{ + n: v, + i: m[cu-(matchCount-1)+k].i, + e: m[cu-(matchCount-1)+k].e, + }) + } + + if f(ies) { break } - frameTime = timeStamp + matchCount = 0 } + } else { + matchCount = 0 } + } - if err != nil { - break - } + return +} - //change audio timeStamp - if audioTime != videoTime { - // err = errors.New("重新设置音频时间戳") - switch audioTimeSize { - case 4: - // log.Default().Println("set audio to:", int32(videoTime*float64(audioTimeScale))) - date := F.Itob32(int32(videoTime * float64(audioTimeScale))) - copy(buf[audioTimeIndex:], date) - case 8: - // log.Default().Println("set audio to:", int64(videoTime*float64(audioTimeScale))) - date := F.Itob64(int64(videoTime * float64(audioTimeScale))) - copy(buf[audioTimeIndex:], date) - } - } +func decode(buf []byte, reSyncboxName string) (m []ie, err error) { + var cu int - if iskeyFrame { - haveKeyframe = true - last_avilable_offset = moofI - if len(keyframe) != 0 { - keyframes = append(keyframes, keyframe) + for cu < len(buf) { + boxName, i, e, E := searchBox(buf, &cu) + if E != nil { + err = E + if reSyncI := bytes.Index(buf[cu:], []byte(reSyncboxName)); reSyncI != -1 { + cu += reSyncI - 4 + m = []ie{} + continue } - keyframe = []byte{} + err = errors.New(E.Error() + " > 未能reSync") + return } - //mdat - mdatI := bytes.Index(buf[cu:], []byte("mdat")) - if moofI == -1 { - err = errors.New("未找到mdat包") - break - } - mdatI = cu + mdatI - 4 - mdatE := mdatI + int(F.Btoi(buf, mdatI, 4)) - if mdatE > len(buf) { - // err = errors.New("mdat包破损") - break - } - cu = mdatI - - if !iskeyFrame && !haveKeyframe { - // 之前并没有关键帧,丢弃 - last_avilable_offset = cu - continue - } - - keyframe = append(keyframe, buf[moofI:mdatE]...) + m = append(m, ie{ + n: boxName, + i: i, + e: e, + }) } - if cu == 0 { - err = errors.New("未找到moof") - } - if len(buf)-last_avilable_offset > 1024*1024*20 { - err = errors.New("buf超过20M") + return +} + +func searchBox(buf []byte, cu *int) (boxName string, i int, e int, err error) { + i = *cu + e = i + int(F.Btoi(buf, *cu, 4)) + boxName = string(buf[*cu+4 : *cu+8]) + isPureBoxOrNeedSkip, ok := boxs[boxName] + if !ok { + err = errors.New("未知包: " + boxName) + } else if e > len(buf) { + err = io.EOF + } else if isPureBoxOrNeedSkip { + *cu = e + } else { + *cu += 8 } return } + +// func testBox(buf []byte, no string) { +// fmt.Println("testBox", "===>") +// err := deal(buf, +// []string{"moof", "mfhd", +// "traf", "tfhd", "tfdt", "trun", +// "traf", "tfhd", "tfdt", "trun", +// "mdat"}, +// func(m []*ie) bool { +// moofSN := int(F.Btoi(buf, m[1].i+12, 4)) +// keyframeMoof := buf[m[5].i+20] == byte(0x02) || buf[m[9].i+20] == byte(0x02) +// fmt.Println(moofSN, "frame", keyframeMoof, m[0].i, m[10].n, m[10].e) +// return false +// }) +// fmt.Println("err", err) +// fmt.Println("testBox", "<===") +// } diff --git a/Reply/fmp4Decode_test.go b/Reply/fmp4Decode_test.go new file mode 100644 index 0000000..73dfe13 --- /dev/null +++ b/Reply/fmp4Decode_test.go @@ -0,0 +1,25 @@ +package reply + +import ( + _ "embed" + "testing" + + F "github.com/qydysky/bili_danmu/F" +) + +var buf []byte + +func Test_deal(t *testing.T) { + err := deal(buf, + []string{"moof", "mfhd", + "traf", "tfhd", "tfdt", "trun", + "traf", "tfhd", "tfdt", "trun", + "mdat"}, + func(m []*ie) bool { + moofSN := int(F.Btoi(buf, m[1].i+12, 4)) + keyframeMoof := buf[m[5].i+20] == byte(0x02) || buf[m[9].i+20] == byte(0x02) + t.Log(moofSN, "frame", keyframeMoof, m[0].i, m[10].n, m[10].e) + return false + }) + t.Log("err", err) +} diff --git a/Reply/sliceBuf.go b/Reply/sliceBuf.go index cf40af7..1831f29 100644 --- a/Reply/sliceBuf.go +++ b/Reply/sliceBuf.go @@ -1,59 +1,132 @@ package reply -// 线程不安全的[]byte操作 -// 需保证append到getSlice的[]byte已被使用之间,对象[]byte未改动 -type bufI struct { - b []int - e []int - size int - buf []byte - useDirect bool //true:直接使用源[]byte,仅适用于连续的append - useBufPool bool //true:使用内部buf,当getSlice调用时,上次getSlice输出[]byte失效 - //useDirect、useBufPool都为false:每次都返回新创建的[]byte -} - -func (t *bufI) reset() { - t.b = []int{} - t.e = []int{} - t.size = 0 -} - -func (t *bufI) append(b, e int) { - if len(t.e) > 0 && t.e[len(t.e)-1] == b { - t.e[len(t.e)-1] = e - } else { - t.b = append(t.b, b) - t.e = append(t.e, e) - } - t.size += e - b -} - -func (t *bufI) getSlice(buf []byte) []byte { - if t.useDirect && len(t.b) == 1 { - return buf[t.b[0]:t.e[0]] - } else if t.useBufPool { - if len(t.buf) == 0 { - t.buf = make([]byte, t.size) - } else if len(t.buf) < t.size { - t.buf = append(t.buf, make([]byte, t.size-len(t.buf))...) - } else if diff := len(t.buf) - t.size; diff > 0 { - t.buf = t.buf[:t.size+diff/2] - } - i := 0 - for k, bi := range t.b { - i += copy(t.buf[i:], buf[bi:t.e[k]]) - } - return t.buf[:i] +import ( + "sync" + "time" +) + +type bufB struct { + bufsize int + modifiedTime time.Time + buf []byte + sync.RWMutex +} + +func (t *bufB) size() int { + t.RLock() + defer t.RUnlock() + + return t.bufsize +} + +func (t *bufB) isEmpty() bool { + t.RLock() + defer t.RUnlock() + + return t.bufsize == 0 +} + +func (t *bufB) reset() { + t.Lock() + defer t.Unlock() + + t.bufsize = 0 +} + +func (t *bufB) append(data []byte) { + t.Lock() + defer t.Unlock() + + if len(t.buf) == 0 { + t.buf = make([]byte, len(data)) } else { - var b = make([]byte, t.size) - if len(t.b) == 1 { - copy(b, buf[t.b[0]:t.e[0]]) + diff := len(t.buf) - t.bufsize - len(data) + if diff < 0 { + t.buf = append(t.buf, make([]byte, -diff)...) } else { - i := 0 - for k, bi := range t.b { - i += copy(b[i:], buf[bi:t.e[k]]) - } + t.buf = t.buf[:t.bufsize+len(data)] } - return b } + t.bufsize += copy(t.buf[t.bufsize:], data) + t.modifiedTime = time.Now() +} + +func (t *bufB) removeFront(n int) { + if n <= 0 { + return + } + + t.Lock() + defer t.Unlock() + + if t.bufsize == 0 { + return + } else if t.bufsize < n { + panic("尝试移除的数值大于长度") + } else if t.bufsize == n { + t.bufsize = 0 + } else { + t.bufsize = copy(t.buf, t.buf[n:t.bufsize]) + } + + t.modifiedTime = time.Now() +} + +func (t *bufB) removeBack(n int) { + if n <= 0 { + return + } + + t.Lock() + defer t.Unlock() + + if t.bufsize == 0 { + return + } else if t.bufsize < n { + panic("尝试移除的数值大于长度") + } else if t.bufsize == n { + t.bufsize = 0 + } else { + t.bufsize -= n + } + + t.modifiedTime = time.Now() +} + +func (t *bufB) setModifiedTime() { + t.Lock() + defer t.Unlock() + + t.modifiedTime = time.Now() +} + +func (t *bufB) getModifiedTime() time.Time { + t.RLock() + defer t.RUnlock() + + return t.modifiedTime +} + +func (t *bufB) hadModified(mt time.Time) bool { + t.RLock() + defer t.RUnlock() + + return !t.modifiedTime.Equal(mt) +} + +// // 通常情况下使用getCopyBuf替代 +func (t *bufB) getPureBuf() (buf []byte) { + t.RLock() + defer t.RUnlock() + + return t.buf[:t.bufsize] +} + +func (t *bufB) getCopyBuf() (buf []byte) { + t.RLock() + defer t.RUnlock() + + buf = make([]byte, t.bufsize) + copy(buf, t.buf[:t.bufsize]) + return } diff --git a/Reply/stream.go b/Reply/stream.go index 7054b67..b32c511 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -607,7 +607,7 @@ func (t *M4SStream) saveStreamFlv() (e error) { if e := r.Wait(); e != nil && !errors.Is(err, io.EOF) { if reqf.IsCancel(err) { t.log.L(`I: `, `flv下载停止`) - } else if !reqf.IsTimeout(err) { + } else if err != nil && !reqf.IsTimeout(err) { e = err t.log.L(`E: `, `flv下载失败:`, err) } @@ -640,8 +640,10 @@ func (t *M4SStream) saveStreamM4s() (e error) { // var ( - buf []byte - fmp4Decoder = &Fmp4Decoder{} + buf bufB + fmp4KeyFrames bufB + fmp4KeyFramesBuf []byte + fmp4Decoder = &Fmp4Decoder{} ) // 下载循环 @@ -719,10 +721,11 @@ func (t *M4SStream) saveStreamM4s() (e error) { link.status = 3 // 设置切片状态为下载失败 } } else { - if usedt := r.UsedTime.Seconds(); usedt > 700 { - t.log.L(`I: `, `hls切片下载慢`, usedt, `ms`) - } - link.data = r.Respon + // if usedt := r.UsedTime.Seconds(); usedt > 700 { + // t.log.L(`I: `, `hls切片下载慢`, usedt, `ms`) + // } + link.data = make([]byte, len(r.Respon)) + copy(link.data, r.Respon) link.status = 2 // 设置切片状态为下载完成 } }(v, t.Current_save_path) @@ -733,64 +736,85 @@ func (t *M4SStream) saveStreamM4s() (e error) { } // 传递已下载切片 - { - for _, v := range download_seq { - if v.status == 2 { - if strings.Contains(v.Base, `h`) { - if header, e := fmp4Decoder.Init_fmp4(v.data); e != nil { - t.log.L(`E: `, e, ` 重试!`) - v.status = 3 - break - } else { - for _, trak := range fmp4Decoder.traks { - t.log.L(`T: `, "找到trak:", string(trak.handlerType), trak.trackID, trak.timescale) - } - t.first_buf = header - if out != nil { - out.Write(t.first_buf, true) - out.Sync() - } - } - download_seq = download_seq[1:] - continue - } else if t.first_buf == nil { - download_seq = download_seq[1:] - continue - } + for k := 0; k < len(download_seq); k++ { + v := download_seq[k] - download_seq = download_seq[1:] - buf = append(buf, v.data...) + if v.status != 2 { + if v.tryDownCount >= 4 { + //下载了4次,任未下载成功,忽略此块 + download_seq = append(download_seq[:k], download_seq[k+1:]...) + k -= 1 + continue + } else { + break + } + } - fmp4KeyFrames, last_avilable_offset, err := fmp4Decoder.Seach_stream_fmp4(buf) - if err != nil { - t.log.L(`E: `, err) - if err.Error() == "未初始化traks" { - e = err - return - } - //丢弃所有数据 - last_avilable_offset = len(buf) + if strings.Contains(v.Base, `h`) { + if header, e := fmp4Decoder.Init_fmp4(v.data); e != nil { + t.log.L(`E: `, e, `重试!`) + v.status = 3 + break + } else { + for _, trak := range fmp4Decoder.traks { + // fmt.Println(`T: `, "找到trak:", string(trak.handlerType), trak.trackID, trak.timescale) + t.log.L(`T: `, "找到trak:", string(trak.handlerType), trak.trackID, trak.timescale) } - - for _, fmp4KeyFrame := range fmp4KeyFrames { - t.bootBufPush(fmp4KeyFrame) - t.Stream_msg.Push_tag(`data`, fmp4KeyFrame) - if out != nil { - out.Write(fmp4KeyFrame, true) - out.Sync() - } + t.first_buf = header + if out != nil { + out.Write(t.first_buf, true) + out.Sync() } + } + download_seq = append(download_seq[:k], download_seq[k+1:]...) + k -= 1 + continue + } else if t.first_buf == nil { + download_seq = append(download_seq[:k], download_seq[k+1:]...) + k -= 1 + continue + } + + buf.append(v.data) + download_seq = append(download_seq[:k], download_seq[k+1:]...) + k -= 1 + + last_avilable_offset, err := fmp4Decoder.Seach_stream_fmp4(buf.getCopyBuf(), &fmp4KeyFrames) + if err != nil { + if !errors.Is(err, io.EOF) { + t.log.L(`E: `, err) + + // no, _ := v.getNo() + // file.New("error/"+strconv.Itoa(no)+".m4s", 0, true).Write(buf.getCopyBuf(), true) + // file.New("error/"+strconv.Itoa(no)+"S.m4s", 0, true).Write(v.data, true) - if last_avilable_offset > 0 { - buf = buf[last_avilable_offset:] + if err.Error() == "未初始化traks" { + e = err + return } - } else if v.tryDownCount >= 4 { - //下载了4次,任未下载成功,忽略此块 - download_seq = download_seq[1:] + //丢弃所有数据 + buf.reset() } else { - break + fmp4KeyFrames.reset() + last_avilable_offset = 0 } } + + // no, _ := v.getNo() + // fmt.Println(no, "fmp4KeyFrames", len(fmp4KeyFrames), last_avilable_offset, err) + + if !fmp4KeyFrames.isEmpty() { + fmp4KeyFramesBuf = fmp4KeyFrames.getCopyBuf() + fmp4KeyFrames.reset() + t.bootBufPush(fmp4KeyFramesBuf) + t.Stream_msg.Push_tag(`data`, fmp4KeyFramesBuf) + if out != nil { + out.Write(fmp4KeyFramesBuf, true) + out.Sync() + } + } + + buf.removeFront(last_avilable_offset) } // 停止录制 diff --git a/Reply/ws_msg/POPULAR_RANK_CHANGED.go b/Reply/ws_msg/POPULAR_RANK_CHANGED.go new file mode 100644 index 0000000..f754111 --- /dev/null +++ b/Reply/ws_msg/POPULAR_RANK_CHANGED.go @@ -0,0 +1,14 @@ +package part + +type POPULAR_RANK_CHANGED struct { + Cmd string `json:"cmd"` + Data struct { + UID int `json:"uid"` + Rank int `json:"rank"` + Countdown int `json:"countdown"` + Timestamp int `json:"timestamp"` + CacheKey string `json:"cache_key"` + } `json:"data"` +} + +// {"cmd":"POPULAR_RANK_CHANGED","data":{"uid":13046,"rank":77,"countdown":1421,"timestamp":1672662980,"cache_key":"rank_change:e759ceed19234dbd9517829adb9b0b6c"}} diff --git a/demo/config/config_disable_msg.json b/demo/config/config_disable_msg.json index 511d3e6..4d1754a 100644 --- a/demo/config/config_disable_msg.json +++ b/demo/config/config_disable_msg.json @@ -3,5 +3,6 @@ "PK_BATTLE_PRE":false, "LIVE_INTERACTIVE_GAME":false, "room_admin_entrance":false, - "LIKE_INFO_V3_CLICK":false + "LIKE_INFO_V3_CLICK":false, + "PK_BATTLE_ENTRANCE": false } \ No newline at end of file diff --git a/go.mod b/go.mod index 2626407..e05a0c8 100644 --- a/go.mod +++ b/go.mod @@ -15,11 +15,13 @@ require ( require ( github.com/andybalholm/brotli v1.0.4 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/klauspost/compress v1.15.11 // indirect github.com/miekg/dns v1.1.50 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect github.com/stretchr/testify v1.8.1 // indirect github.com/thedevsaddam/gojsonq/v2 v2.5.2 // indirect @@ -30,5 +32,6 @@ require ( golang.org/x/net v0.0.0-20221014081412-f15817d10f9b // indirect golang.org/x/sys v0.0.0-20221013171732-95e765b1cc43 // indirect golang.org/x/tools v0.1.12 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect rsc.io/qr v0.2.0 // indirect ) diff --git a/go.sum b/go.sum index 4fc02e0..4012266 100644 --- a/go.sum +++ b/go.sum @@ -3,8 +3,8 @@ github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/christopher-dG/go-obs-websocket v0.0.0-20200720193653-c4fed10356a5 h1:UFBgEMSPv6a2vgzowHOPphVit+ZBNQ3+4Q+dEBgwIww= github.com/christopher-dG/go-obs-websocket v0.0.0-20200720193653-c4fed10356a5/go.mod h1:P5w+dDqQEbCMFAkmucNcEQ6xgAt/NP+Aw58OQfY/H/o= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= @@ -34,36 +34,8 @@ github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyua github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/qydysky/part v0.10.8 h1:3VSWkV1SaIUXfUdsOFn04N7wNfPTOYEScrkuni/0e1M= -github.com/qydysky/part v0.10.8/go.mod h1:B3GD/j5jmvwfKtnzDWqRYFqnwOXEyoUg/jShFk1yQSM= -github.com/qydysky/part v0.10.9 h1:WkmHRkmPYDCmgWGRFH1A8PcqkHJ+vmC2Ge0jktkgdYY= -github.com/qydysky/part v0.10.9/go.mod h1:B3GD/j5jmvwfKtnzDWqRYFqnwOXEyoUg/jShFk1yQSM= -github.com/qydysky/part v0.10.10 h1:R0GhqB1d1oFuOywJ1lFO8pqgBgZBa981sAF6nrzCqRw= -github.com/qydysky/part v0.10.10/go.mod h1:B3GD/j5jmvwfKtnzDWqRYFqnwOXEyoUg/jShFk1yQSM= -github.com/qydysky/part v0.10.11 h1:6KeTx3accgmzv+HwMP8mcYmWhWxAO7wxyPisQWJTfS4= -github.com/qydysky/part v0.10.11/go.mod h1:B3GD/j5jmvwfKtnzDWqRYFqnwOXEyoUg/jShFk1yQSM= -github.com/qydysky/part v0.10.12 h1:i5eMB/AMncs0pqWD3z1E8FivxiyDGGrsIQtKbkOjW/Y= -github.com/qydysky/part v0.10.12/go.mod h1:B3GD/j5jmvwfKtnzDWqRYFqnwOXEyoUg/jShFk1yQSM= -github.com/qydysky/part v0.10.13 h1:X2pzesWFRTyEw164sP3AaijYiRH3WoaUcldzklrmcI8= -github.com/qydysky/part v0.10.13/go.mod h1:B3GD/j5jmvwfKtnzDWqRYFqnwOXEyoUg/jShFk1yQSM= -github.com/qydysky/part v0.10.14 h1:Pf13/0mbl7Gc0qRnJXUDnzALqJKRDL88ZbQm5JI2nS0= -github.com/qydysky/part v0.10.14/go.mod h1:B3GD/j5jmvwfKtnzDWqRYFqnwOXEyoUg/jShFk1yQSM= -github.com/qydysky/part v0.10.15 h1:vKGhzhY5HGJGOzygkbBbEgs0Nmc6ddNNhqLf/a4fxQk= -github.com/qydysky/part v0.10.15/go.mod h1:B3GD/j5jmvwfKtnzDWqRYFqnwOXEyoUg/jShFk1yQSM= -github.com/qydysky/part v0.10.16 h1:mzrCQEPkQOH4MK+vdPAvJboYhQYBdiKbw2QdvDKinX0= -github.com/qydysky/part v0.10.16/go.mod h1:B3GD/j5jmvwfKtnzDWqRYFqnwOXEyoUg/jShFk1yQSM= github.com/qydysky/part v0.10.17 h1:xcMgJaEvPlOPAEfOniTZZM/pDiafWW3FA5ZQXNPthpI= github.com/qydysky/part v0.10.17/go.mod h1:B3GD/j5jmvwfKtnzDWqRYFqnwOXEyoUg/jShFk1yQSM= -github.com/qydysky/part v0.10.18 h1:GGHuGBsKI6h41EzzVml3FMbqDS1dh88wSDmQBfq2jtw= -github.com/qydysky/part v0.10.18/go.mod h1:BG0tulTKW58jSkC0EZ0MrxDHe+gkPULfGNzksiGCayw= -github.com/qydysky/part v0.18.19 h1:s4CL+ljiGhySX633x0ohN6NA2c7T/BWg5YVpjm4xo30= -github.com/qydysky/part v0.18.19/go.mod h1:BG0tulTKW58jSkC0EZ0MrxDHe+gkPULfGNzksiGCayw= -github.com/qydysky/part v0.19.0 h1:H0esXkedZKrYdEEACC76O+DL9TZDkApow67RYLSyT0U= -github.com/qydysky/part v0.19.0/go.mod h1:BG0tulTKW58jSkC0EZ0MrxDHe+gkPULfGNzksiGCayw= -github.com/qydysky/part v0.19.1 h1:3AdjJtm5Q594Jd/My1E1wQIiRPXgmS+KM1DJhofLr1M= -github.com/qydysky/part v0.19.1/go.mod h1:BG0tulTKW58jSkC0EZ0MrxDHe+gkPULfGNzksiGCayw= -github.com/qydysky/part v0.19.2 h1:peR1UBrBgnjB63nv5F100oJ72hRoJnn8cuZPXDiGZOM= -github.com/qydysky/part v0.19.2/go.mod h1:BG0tulTKW58jSkC0EZ0MrxDHe+gkPULfGNzksiGCayw= github.com/qydysky/part v0.20.0 h1:JkAdTrGwXjbL8FJuiinKK8Vrd2HU/rcRD+Bdx4RpGGw= github.com/qydysky/part v0.20.0/go.mod h1:BG0tulTKW58jSkC0EZ0MrxDHe+gkPULfGNzksiGCayw= github.com/shirou/gopsutil v3.20.12+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= @@ -76,9 +48,9 @@ github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966/go.mod h1:s github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/thedevsaddam/gojsonq v2.3.0+incompatible/go.mod h1:RBcQaITThgJAAYKH7FNp2onYodRz8URfsuEGpAch0NA= github.com/thedevsaddam/gojsonq/v2 v2.5.2 h1:CoMVaYyKFsVj6TjU6APqAhAvC07hTI6IQen8PHzHYY0= @@ -147,8 +119,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= rsc.io/qr v0.2.0 h1:6vBLea5/NRMVTz8V66gipeLycZMl/+UlFmk8DvqQ6WY= rsc.io/qr v0.2.0/go.mod h1:IF+uZjkb9fqyeF/4tlBoynqmQxUoPfWEKh921coOuXs= -- 2.39.2