From fc234b9911588a0acf0c8f54a073698bcf3050bb Mon Sep 17 00:00:00 2001 From: qydysky Date: Fri, 1 Sep 2023 23:40:24 +0800 Subject: [PATCH] =?utf8?q?Fix=20=20=E7=9B=B4=E6=92=AD=E6=B5=81=E5=9B=9E?= =?utf8?q?=E6=94=BE=E6=9C=89=E6=A6=82=E7=8E=87=E5=AF=BC=E8=87=B4goroutine?= =?utf8?q?=E6=B3=84=E6=BC=8F=20#84?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- Reply/F.go | 7 ++ Reply/F/comp.go | 3 +- .../F/reSetMp4TimeStamp/reSetMp4TimeStamp.go | 100 +++++++++++------- Reply/stream.go | 34 ++---- demo/config/config_K_v.json | 1 + 5 files changed, 80 insertions(+), 65 deletions(-) diff --git a/Reply/F.go b/Reply/F.go index 2dab9e5..e68ae06 100644 --- a/Reply/F.go +++ b/Reply/F.go @@ -1526,6 +1526,13 @@ func init() { conn, _ := r.Context().Value(c.C.SerF).(net.Conn) + // 在客户端存在某种代理时,将有可能无法监测到客户端关闭,这有可能导致goroutine泄漏 + if to, ok := c.C.K_v.LoadV(`直播流回放限时min`).(float64); ok && to > 0 { + if e := conn.SetDeadline(time.Now().Add(time.Duration(int(time.Minute) * int(to)))); e != nil { + flog.L(`W: `, `设置直播流回放限时min错误`, e) + } + } + if e := currentStreamO.PusherToHttp(conn, w, r, startFunc, stopFunc); e != nil { flog.L(`W: `, e) } diff --git a/Reply/F/comp.go b/Reply/F/comp.go index afa099e..3f3f978 100644 --- a/Reply/F/comp.go +++ b/Reply/F/comp.go @@ -3,7 +3,6 @@ package f import ( "github.com/qydysky/bili_danmu/Reply/F/danmuXml" "github.com/qydysky/bili_danmu/Reply/F/liveOver" - "github.com/qydysky/bili_danmu/Reply/F/reSetMp4TimeStamp" comp "github.com/qydysky/part/component" ) @@ -11,7 +10,7 @@ func init() { var linkMap = map[string][]string{ "github.com/qydysky/bili_danmu/Reply.startRecDanmu.stop": { comp.Sign[danmuXml.Sign](`toXml`), - comp.Sign[reSetMp4TimeStamp.Sign](`resetTS`), + // comp.Sign[reSetMp4TimeStamp.Sign](`resetTS`), }, "github.com/qydysky/bili_danmu/Reply.SerF.player.ws": { comp.Sign[danmuXml.Sign](`toXml`), diff --git a/Reply/F/reSetMp4TimeStamp/reSetMp4TimeStamp.go b/Reply/F/reSetMp4TimeStamp/reSetMp4TimeStamp.go index a4a0959..9198ccc 100644 --- a/Reply/F/reSetMp4TimeStamp/reSetMp4TimeStamp.go +++ b/Reply/F/reSetMp4TimeStamp/reSetMp4TimeStamp.go @@ -41,26 +41,11 @@ func resetTS(ctx context.Context, ptr *string) error { var tfhdBuf = make([]byte, 12) var boxBuf = make([]byte, 4) var trackBuf = make([]byte, 4) - var timescaleBuf = make([]byte, 4) - var timescale int32 + var mdhdBuf = make([]byte, 4) + var timescale = make(map[int32]int32) var opTs = make(map[int32]int) var cuTs = make(map[int32]int) - if e := f.SeekUntil([]byte("mvhd"), file.AtCurrent, 1<<17, 1<<22); e != nil && !errors.Is(e, file.ErrMaxReadSizeReach) { - return e - } - if _, e := f.Read(boxBuf); e != nil { - return e - } else if !bytes.Equal(boxBuf, []byte("mvhd")) { - return fmt.Errorf("wrong box:%v", string(boxBuf)) - } - _ = f.SeekIndex(12, file.AtCurrent) - if _, e := f.Read(timescaleBuf); e != nil { - return e - } - timescale = btoi32(timescaleBuf, 0) - fmt.Printf("resetTS timescale:%v\n", timescale) - for { if e := f.SeekUntil([]byte("tkhd"), file.AtCurrent, 1<<17, 1<<22); e != nil { if errors.Is(e, file.ErrMaxReadSizeReach) { @@ -81,13 +66,31 @@ func resetTS(ctx context.Context, ptr *string) error { return e } trackId := btoi32(trackBuf, 0) - fmt.Printf("trackId %v\n", trackId) + + if e := f.SeekUntil([]byte("mdhd"), file.AtCurrent, 1<<17, 1<<22); e != nil { + if errors.Is(e, file.ErrMaxReadSizeReach) { + break + } + if errors.Is(e, io.EOF) { + break + } + return e + } + if _, e := f.Read(boxBuf); e != nil { + return e + } else if !bytes.Equal(boxBuf, []byte("mdhd")) { + return fmt.Errorf("wrong box:%v", string(boxBuf)) + } + _ = f.SeekIndex(12, file.AtCurrent) + if _, e := f.Read(mdhdBuf); e != nil { + return e + } + opTs[trackId] = -1 cuTs[trackId] = 0 + timescale[trackId] = btoi32(mdhdBuf, 0) } - // fmt.Println("resetTimeStamp Druation %v(%v-%v)", time.Duration(int(time.Second)*(cuTS-opTs)), cuTS, opTs) - _ = f.SeekIndex(0, file.AtOrigin) for { @@ -151,26 +154,27 @@ func resetTS(ctx context.Context, ptr *string) error { for k, v := range opTs { fmt.Printf("track %v opTs:%v cuTS:%v\n", k, v, cuTs[k]) } - // fmt.Println("resetTimeStamp Druation %v(%v-%v)", time.Duration(int(time.Second)*(cuTS-opTs)), cuTS, opTs) - - var duration int32 - for k, v := range opTs { - duration = int32(cuTs[k]-v) / timescale - break - } - fmt.Printf("resetTS dur:%v\n", duration) - _ = f.SeekIndex(0, file.AtOrigin) - - if e := f.SeekUntil([]byte("mvhd"), file.AtCurrent, 1<<17, 1<<22); !errors.Is(e, file.ErrMaxReadSizeReach) { - return e - } - _ = f.SeekIndex(20, file.AtCurrent) - if _, e := f.Write(itob32(duration), false); e != nil { - return e + // reset timestamp + // write mvhd + { + var duration int32 + for k, v := range opTs { + duration = int32(cuTs[k]-v) / timescale[k] + break + } + _ = f.SeekIndex(0, file.AtOrigin) + if e := f.SeekUntil([]byte("mvhd"), file.AtCurrent, 1<<17, 1<<22); e != nil { + return e + } + _ = f.SeekIndex(20, file.AtCurrent) + fmt.Printf("mvhd %v \n", duration) + if _, e := f.Write(itob32(duration), false); e != nil { + return e + } } - // write tkhd + // write tkhd mdhd _ = f.SeekIndex(0, file.AtOrigin) for i := 0; i < len(opTs); i++ { if e := f.SeekUntil([]byte("tkhd"), file.AtCurrent, 1<<17, 1<<20); e != nil { @@ -188,7 +192,27 @@ func resetTS(ctx context.Context, ptr *string) error { } trackID := btoi32(trackBuf, 0) _ = f.SeekIndex(4, file.AtCurrent) - if _, e := f.Write(itob32(int32(cuTs[trackID]-opTs[trackID])/timescale), false); e != nil { + fmt.Printf("tkhd %v \n", int32(cuTs[trackID]-opTs[trackID])/timescale[trackID]) + if _, e := f.Write(itob32(int32(cuTs[trackID]-opTs[trackID])/timescale[trackID]), false); e != nil { + return e + } + + if e := f.SeekUntil([]byte("mdhd"), file.AtCurrent, 1<<17, 1<<22); e != nil { + if errors.Is(e, file.ErrMaxReadSizeReach) { + continue + } + if errors.Is(e, io.EOF) { + break + } + return e + } + if _, e := f.Read(boxBuf); e != nil { + return e + } else if !bytes.Equal(boxBuf, []byte("mdhd")) { + return fmt.Errorf("wrong box:%v", string(boxBuf)) + } + _ = f.SeekIndex(16, file.AtCurrent) + if _, e := f.Write(itob32(0), false); e != nil { return e } } diff --git a/Reply/stream.go b/Reply/stream.go index 56d8c79..99f739f 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -35,6 +35,7 @@ import ( signal "github.com/qydysky/part/signal" slice "github.com/qydysky/part/slice" pstring "github.com/qydysky/part/strings" + pweb "github.com/qydysky/part/web" ) type M4SStream struct { @@ -1400,6 +1401,8 @@ func (t *M4SStream) PusherToFile(contextC context.Context, filepath string, star } // 流服务推送方法 +// +// 在客户端存在某种代理时,将有可能无法监测到客户端关闭,这有可能导致goroutine泄漏 func (t *M4SStream) PusherToHttp(conn net.Conn, w http.ResponseWriter, r *http.Request, startFunc func(*M4SStream) error, stopFunc func(*M4SStream) error) error { switch t.stream_type { case `m3u8`: @@ -1417,10 +1420,7 @@ func (t *M4SStream) PusherToHttp(conn net.Conn, w http.ResponseWriter, r *http.R return e } - flusher, flushSupport := w.(http.Flusher) - if flushSupport { - flusher.Flush() - } + w = pweb.WithFlush(w) //写入头 { @@ -1435,8 +1435,6 @@ func (t *M4SStream) PusherToHttp(conn net.Conn, w http.ResponseWriter, r *http.R if len(t.getFirstBuf()) != 0 { if _, err := w.Write(t.getFirstBuf()); err != nil { return err - } else if flushSupport { - flusher.Flush() } break } @@ -1455,14 +1453,9 @@ func (t *M4SStream) PusherToHttp(conn net.Conn, w http.ResponseWriter, r *http.R if _, err := w.Write(t.boot_buf); err != nil { return err } - if flushSupport { - flusher.Flush() - } } - // - var cancelRec func() - cancelRec = t.Stream_msg.Pull_tag_async(map[string]func([]byte) bool{ + var cancelRec = t.Stream_msg.Pull_tag_async(map[string]func([]byte) bool{ `data`: func(b []byte) bool { select { case <-r.Context().Done(): @@ -1472,20 +1465,10 @@ func (t *M4SStream) PusherToHttp(conn net.Conn, w http.ResponseWriter, r *http.R if len(b) == 0 { return true } - - // 1s内写入失败,关闭conn,防止协程泄漏 - done := time.AfterFunc(time.Second, func() { - cancelRec() - if conn != nil { - conn.Close() - } - }).Stop - defer done() - - if _, err := w.Write(b); err != nil { + if n, err := w.Write(b); err != nil || n == 0 { + return true + } else if e := conn.SetWriteDeadline(time.Now().Add(time.Second * 10)); e != nil { return true - } else if flushSupport { - flusher.Flush() } return false }, @@ -1493,6 +1476,7 @@ func (t *M4SStream) PusherToHttp(conn net.Conn, w http.ResponseWriter, r *http.R return true }, }) + <-r.Context().Done() cancelRec() diff --git a/demo/config/config_K_v.json b/demo/config/config_K_v.json index e6f7d41..14d83c7 100644 --- a/demo/config/config_K_v.json +++ b/demo/config/config_K_v.json @@ -123,6 +123,7 @@ "弹幕回放": true, "直播流回放速率-help": "速率为每秒速率 例最小值(1 MB)", "直播流回放速率": "2 MB", + "直播流回放限时min": 60, "直播流回放连接限制-help": "限制回放连接数,<0无限制,=0禁止,>0最大数量", "直播流回放连接限制": [ { -- 2.39.2