]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
Fix 直播流回放有概率导致goroutine泄漏 #84
authorqydysky <qydysky@foxmail.com>
Fri, 1 Sep 2023 15:40:24 +0000 (23:40 +0800)
committerqydysky <qydysky@foxmail.com>
Fri, 1 Sep 2023 15:40:24 +0000 (23:40 +0800)
Reply/F.go
Reply/F/comp.go
Reply/F/reSetMp4TimeStamp/reSetMp4TimeStamp.go
Reply/stream.go
demo/config/config_K_v.json

index 2dab9e53972e8c843d95826d1ba7f321693bb75b..e68ae065818492dd4bfaf2af5190a43d4546e0ee 100644 (file)
@@ -1526,6 +1526,13 @@ func init() {
 
                                conn, _ := r.Context().Value(c.C.SerF).(net.Conn)
 
+                               // 在客户端存在某种代理时,将有可能无法监测到客户端关闭,这有可能导致goroutine泄漏
+                               if to, ok := c.C.K_v.LoadV(`直播流回放限时min`).(float64); ok && to > 0 {
+                                       if e := conn.SetDeadline(time.Now().Add(time.Duration(int(time.Minute) * int(to)))); e != nil {
+                                               flog.L(`W: `, `设置直播流回放限时min错误`, e)
+                                       }
+                               }
+
                                if e := currentStreamO.PusherToHttp(conn, w, r, startFunc, stopFunc); e != nil {
                                        flog.L(`W: `, e)
                                }
index afa099e56cecee4517681bf151ac5fffe9bbfd18..3f3f9781ef97a076bb1dd08a58fdc747c37e9521 100644 (file)
@@ -3,7 +3,6 @@ package f
 import (
        "github.com/qydysky/bili_danmu/Reply/F/danmuXml"
        "github.com/qydysky/bili_danmu/Reply/F/liveOver"
-       "github.com/qydysky/bili_danmu/Reply/F/reSetMp4TimeStamp"
        comp "github.com/qydysky/part/component"
 )
 
@@ -11,7 +10,7 @@ func init() {
        var linkMap = map[string][]string{
                "github.com/qydysky/bili_danmu/Reply.startRecDanmu.stop": {
                        comp.Sign[danmuXml.Sign](`toXml`),
-                       comp.Sign[reSetMp4TimeStamp.Sign](`resetTS`),
+                       // comp.Sign[reSetMp4TimeStamp.Sign](`resetTS`),
                },
                "github.com/qydysky/bili_danmu/Reply.SerF.player.ws": {
                        comp.Sign[danmuXml.Sign](`toXml`),
index a4a0959789b6cc6f4fac9e5a2b636ae18a6be0eb..9198ccca67541d0c1f847340c023e4a7fbe14199 100644 (file)
@@ -41,26 +41,11 @@ func resetTS(ctx context.Context, ptr *string) error {
        var tfhdBuf = make([]byte, 12)
        var boxBuf = make([]byte, 4)
        var trackBuf = make([]byte, 4)
-       var timescaleBuf = make([]byte, 4)
-       var timescale int32
+       var mdhdBuf = make([]byte, 4)
+       var timescale = make(map[int32]int32)
        var opTs = make(map[int32]int)
        var cuTs = make(map[int32]int)
 
-       if e := f.SeekUntil([]byte("mvhd"), file.AtCurrent, 1<<17, 1<<22); e != nil && !errors.Is(e, file.ErrMaxReadSizeReach) {
-               return e
-       }
-       if _, e := f.Read(boxBuf); e != nil {
-               return e
-       } else if !bytes.Equal(boxBuf, []byte("mvhd")) {
-               return fmt.Errorf("wrong box:%v", string(boxBuf))
-       }
-       _ = f.SeekIndex(12, file.AtCurrent)
-       if _, e := f.Read(timescaleBuf); e != nil {
-               return e
-       }
-       timescale = btoi32(timescaleBuf, 0)
-       fmt.Printf("resetTS timescale:%v\n", timescale)
-
        for {
                if e := f.SeekUntil([]byte("tkhd"), file.AtCurrent, 1<<17, 1<<22); e != nil {
                        if errors.Is(e, file.ErrMaxReadSizeReach) {
@@ -81,13 +66,31 @@ func resetTS(ctx context.Context, ptr *string) error {
                        return e
                }
                trackId := btoi32(trackBuf, 0)
-               fmt.Printf("trackId %v\n", trackId)
+
+               if e := f.SeekUntil([]byte("mdhd"), file.AtCurrent, 1<<17, 1<<22); e != nil {
+                       if errors.Is(e, file.ErrMaxReadSizeReach) {
+                               break
+                       }
+                       if errors.Is(e, io.EOF) {
+                               break
+                       }
+                       return e
+               }
+               if _, e := f.Read(boxBuf); e != nil {
+                       return e
+               } else if !bytes.Equal(boxBuf, []byte("mdhd")) {
+                       return fmt.Errorf("wrong box:%v", string(boxBuf))
+               }
+               _ = f.SeekIndex(12, file.AtCurrent)
+               if _, e := f.Read(mdhdBuf); e != nil {
+                       return e
+               }
+
                opTs[trackId] = -1
                cuTs[trackId] = 0
+               timescale[trackId] = btoi32(mdhdBuf, 0)
        }
 
-       // fmt.Println("resetTimeStamp Druation %v(%v-%v)", time.Duration(int(time.Second)*(cuTS-opTs)), cuTS, opTs)
-
        _ = f.SeekIndex(0, file.AtOrigin)
 
        for {
@@ -151,26 +154,27 @@ func resetTS(ctx context.Context, ptr *string) error {
        for k, v := range opTs {
                fmt.Printf("track %v opTs:%v cuTS:%v\n", k, v, cuTs[k])
        }
-       // fmt.Println("resetTimeStamp Druation %v(%v-%v)", time.Duration(int(time.Second)*(cuTS-opTs)), cuTS, opTs)
-
-       var duration int32
-       for k, v := range opTs {
-               duration = int32(cuTs[k]-v) / timescale
-               break
-       }
-       fmt.Printf("resetTS dur:%v\n", duration)
 
-       _ = f.SeekIndex(0, file.AtOrigin)
-
-       if e := f.SeekUntil([]byte("mvhd"), file.AtCurrent, 1<<17, 1<<22); !errors.Is(e, file.ErrMaxReadSizeReach) {
-               return e
-       }
-       _ = f.SeekIndex(20, file.AtCurrent)
-       if _, e := f.Write(itob32(duration), false); e != nil {
-               return e
+       // reset timestamp
+       // write mvhd
+       {
+               var duration int32
+               for k, v := range opTs {
+                       duration = int32(cuTs[k]-v) / timescale[k]
+                       break
+               }
+               _ = f.SeekIndex(0, file.AtOrigin)
+               if e := f.SeekUntil([]byte("mvhd"), file.AtCurrent, 1<<17, 1<<22); e != nil {
+                       return e
+               }
+               _ = f.SeekIndex(20, file.AtCurrent)
+               fmt.Printf("mvhd %v \n", duration)
+               if _, e := f.Write(itob32(duration), false); e != nil {
+                       return e
+               }
        }
 
-       // write tkhd
+       // write tkhd mdhd
        _ = f.SeekIndex(0, file.AtOrigin)
        for i := 0; i < len(opTs); i++ {
                if e := f.SeekUntil([]byte("tkhd"), file.AtCurrent, 1<<17, 1<<20); e != nil {
@@ -188,7 +192,27 @@ func resetTS(ctx context.Context, ptr *string) error {
                }
                trackID := btoi32(trackBuf, 0)
                _ = f.SeekIndex(4, file.AtCurrent)
-               if _, e := f.Write(itob32(int32(cuTs[trackID]-opTs[trackID])/timescale), false); e != nil {
+               fmt.Printf("tkhd %v \n", int32(cuTs[trackID]-opTs[trackID])/timescale[trackID])
+               if _, e := f.Write(itob32(int32(cuTs[trackID]-opTs[trackID])/timescale[trackID]), false); e != nil {
+                       return e
+               }
+
+               if e := f.SeekUntil([]byte("mdhd"), file.AtCurrent, 1<<17, 1<<22); e != nil {
+                       if errors.Is(e, file.ErrMaxReadSizeReach) {
+                               continue
+                       }
+                       if errors.Is(e, io.EOF) {
+                               break
+                       }
+                       return e
+               }
+               if _, e := f.Read(boxBuf); e != nil {
+                       return e
+               } else if !bytes.Equal(boxBuf, []byte("mdhd")) {
+                       return fmt.Errorf("wrong box:%v", string(boxBuf))
+               }
+               _ = f.SeekIndex(16, file.AtCurrent)
+               if _, e := f.Write(itob32(0), false); e != nil {
                        return e
                }
        }
index 56d8c79f50b63c43905a373c3846b11f1129eb63..99f739f66bbdecd335a311588e3dba15b83bdac4 100644 (file)
@@ -35,6 +35,7 @@ import (
        signal "github.com/qydysky/part/signal"
        slice "github.com/qydysky/part/slice"
        pstring "github.com/qydysky/part/strings"
+       pweb "github.com/qydysky/part/web"
 )
 
 type M4SStream struct {
@@ -1400,6 +1401,8 @@ func (t *M4SStream) PusherToFile(contextC context.Context, filepath string, star
 }
 
 // 流服务推送方法
+//
+// 在客户端存在某种代理时,将有可能无法监测到客户端关闭,这有可能导致goroutine泄漏
 func (t *M4SStream) PusherToHttp(conn net.Conn, w http.ResponseWriter, r *http.Request, startFunc func(*M4SStream) error, stopFunc func(*M4SStream) error) error {
        switch t.stream_type {
        case `m3u8`:
@@ -1417,10 +1420,7 @@ func (t *M4SStream) PusherToHttp(conn net.Conn, w http.ResponseWriter, r *http.R
                return e
        }
 
-       flusher, flushSupport := w.(http.Flusher)
-       if flushSupport {
-               flusher.Flush()
-       }
+       w = pweb.WithFlush(w)
 
        //写入头
        {
@@ -1435,8 +1435,6 @@ func (t *M4SStream) PusherToHttp(conn net.Conn, w http.ResponseWriter, r *http.R
                        if len(t.getFirstBuf()) != 0 {
                                if _, err := w.Write(t.getFirstBuf()); err != nil {
                                        return err
-                               } else if flushSupport {
-                                       flusher.Flush()
                                }
                                break
                        }
@@ -1455,14 +1453,9 @@ func (t *M4SStream) PusherToHttp(conn net.Conn, w http.ResponseWriter, r *http.R
                if _, err := w.Write(t.boot_buf); err != nil {
                        return err
                }
-               if flushSupport {
-                       flusher.Flush()
-               }
        }
 
-       //
-       var cancelRec func()
-       cancelRec = t.Stream_msg.Pull_tag_async(map[string]func([]byte) bool{
+       var cancelRec = t.Stream_msg.Pull_tag_async(map[string]func([]byte) bool{
                `data`: func(b []byte) bool {
                        select {
                        case <-r.Context().Done():
@@ -1472,20 +1465,10 @@ func (t *M4SStream) PusherToHttp(conn net.Conn, w http.ResponseWriter, r *http.R
                        if len(b) == 0 {
                                return true
                        }
-
-                       // 1s内写入失败,关闭conn,防止协程泄漏
-                       done := time.AfterFunc(time.Second, func() {
-                               cancelRec()
-                               if conn != nil {
-                                       conn.Close()
-                               }
-                       }).Stop
-                       defer done()
-
-                       if _, err := w.Write(b); err != nil {
+                       if n, err := w.Write(b); err != nil || n == 0 {
+                               return true
+                       } else if e := conn.SetWriteDeadline(time.Now().Add(time.Second * 10)); e != nil {
                                return true
-                       } else if flushSupport {
-                               flusher.Flush()
                        }
                        return false
                },
@@ -1493,6 +1476,7 @@ func (t *M4SStream) PusherToHttp(conn net.Conn, w http.ResponseWriter, r *http.R
                        return true
                },
        })
+
        <-r.Context().Done()
        cancelRec()
 
index e6f7d412e91ffcd9f7e61bf4ab33147f87fe2fd2..14d83c75ee9f1537d063782022921a1564454487 100644 (file)
     "弹幕回放": true,
     "直播流回放速率-help": "速率为每秒速率 例最小值(1 MB)",
     "直播流回放速率": "2 MB",
+    "直播流回放限时min": 60,
     "直播流回放连接限制-help": "限制回放连接数,<0无限制,=0禁止,>0最大数量",
     "直播流回放连接限制": [
         {