]> 127.0.0.1 Git - bili_danmu/.git/commitdiff
Fix #84
authorqydysky <qydysky@foxmail.com>
Wed, 9 Aug 2023 17:19:28 +0000 (01:19 +0800)
committerqydysky <qydysky@foxmail.com>
Wed, 9 Aug 2023 17:19:28 +0000 (01:19 +0800)
F/F.go
F/F_test.go [new file with mode: 0644]
Reply/F.go
Reply/stream.go
go.mod
go.sum

diff --git a/F/F.go b/F/F.go
index 7ddf1b8b104462b46cda0ddcb8dc0ca37c062191..c7ca5f1033c36a45f7e7447854ae4e9c59ce7a63 100644 (file)
--- a/F/F.go
+++ b/F/F.go
@@ -136,3 +136,82 @@ func CookieCheck(key []string) (missKey []string) {
        }
        return
 }
+
+// just faster, use in right way
+//
+// eg. ParseQuery(`http://1.com/2?workspace=1`, "workspace=") => `1`
+func ParseQuery(rawURL, key string) string {
+       s := 0
+       for i := 0; i < len(rawURL); i++ {
+               if rawURL[i] == '?' {
+                       s = i + 1
+                       break
+               }
+       }
+
+       for i := s; i < len(rawURL); i++ {
+               for j := 0; i < len(rawURL) && j < len(key); j, i = j+1, i+1 {
+                       if rawURL[i] != key[j] {
+                               break
+                       } else if j == len(key)-1 {
+                               s = i + 1
+                               i = len(rawURL)
+                               break
+                       }
+               }
+       }
+
+       d := s
+       for ; d < len(rawURL); d++ {
+               if rawURL[d] == '&' || rawURL[d] == '#' {
+                       break
+               }
+       }
+
+       return rawURL[s:d]
+}
+
+// just faster, use in right way
+//
+// eg. ParseHost(`http://1.com/2`) => `1.com`
+func ParseHost(rawURL string) string {
+       s := 0
+       for i := 0; i < len(rawURL); i++ {
+               for j := 0; i < len(rawURL) && j < len("//"); j, i = j+1, i+1 {
+                       if rawURL[i] != "//"[j] {
+                               break
+                       } else if j == len("//")-1 {
+                               s = i + 1
+                               i = len(rawURL)
+                               break
+                       }
+               }
+       }
+
+       d := s
+       for i := s; i < len(rawURL); i++ {
+               if rawURL[i] == '/' {
+                       d = i
+                       break
+               }
+       }
+
+       return rawURL[s:d]
+}
+
+// just faster, use in right way
+//
+// eg. ResolveReferenceLast(`http://1.com/2`, `1`) => `http://1.com/1`
+func ResolveReferenceLast(rawURL, ref string) string {
+       s := 0
+       for i := 0; i < len(rawURL); i++ {
+               if rawURL[i] == '/' {
+                       s = i
+               }
+               if rawURL[i] == '?' || rawURL[i] == '#' {
+                       break
+               }
+       }
+
+       return rawURL[:s+1] + ref
+}
diff --git a/F/F_test.go b/F/F_test.go
new file mode 100644 (file)
index 0000000..9dacc25
--- /dev/null
@@ -0,0 +1,30 @@
+package F
+
+import (
+       "net/url"
+       "testing"
+)
+
+func Test2(t *testing.T) {
+       rawURL := "http://127.0.0.1:10841/1?workspace=/codefile/qydysky.code-workspace#12"
+       u, _ := url.Parse(rawURL)
+       if u.Host != ParseHost(rawURL) {
+               t.Fatal()
+       }
+       if u.Query().Get("workspace") != ParseQuery(rawURL, "workspace=") {
+               t.Log(u.Query().Get("workspace"))
+               t.Log(ParseQuery(rawURL, "workspace="))
+               t.Fatal()
+       }
+}
+
+func Test3(t *testing.T) {
+       rawURL := "http://127.0.0.1:10841/1?workspace=/codefile/qydysky.code-workspace#12"
+
+       u, _ := url.Parse(rawURL)
+       u1, _ := url.Parse("./2")
+
+       if u.ResolveReference(u1).String() != ResolveReferenceLast(rawURL, "2") {
+               t.Fatal()
+       }
+}
index 93a01df5e3c75fb1ab1f2f1fedb5442879bf97a9..57145b7770ae393ed8f239ef62c35de187964616 100644 (file)
@@ -8,6 +8,7 @@ import (
        "fmt"
        "io"
        "math"
+       "net"
        "net/http"
        "net/http/pprof"
        "net/url"
@@ -1399,6 +1400,7 @@ func init() {
                                w.Header().Set("Retry-After", "1")
                                w.WriteHeader(http.StatusServiceUnavailable)
                                flog.L(`E: `, `无指定路径`)
+                               return
                        }
 
                        if rpath != `/now/` {
@@ -1493,7 +1495,7 @@ func init() {
                                return
                        }
 
-                       w.WriteHeader(http.StatusOK)
+                       // w.WriteHeader(http.StatusOK)
 
                        // 推送数据
                        {
@@ -1505,7 +1507,10 @@ func init() {
                                        flog.L(`T: `, r.RemoteAddr, `断开直播流`)
                                        return nil
                                }
-                               if e := currentStreamO.PusherToHttp(w, r, startFunc, stopFunc); e != nil {
+
+                               conn, _ := r.Context().Value(c.C.SerF).(net.Conn)
+
+                               if e := currentStreamO.PusherToHttp(conn, w, r, startFunc, stopFunc); e != nil {
                                        flog.L(`W: `, e)
                                }
                        }
index 5d1793e0a84d8e7460a2270b1b1b5c35e7dcca94..228402d7a7706fbdbe4da19939a6622aa1470a9c 100644 (file)
@@ -10,6 +10,7 @@ import (
        "fmt"
        "io"
        "io/fs"
+       "net"
        "net/http"
        "net/url"
        "os"
@@ -258,8 +259,7 @@ func (t *M4SStream) fetchCheckStream() bool {
                }
 
                // 显示使用流服务器
-               u, _ := url.Parse(v.Url)
-               t.log.L(`I: `, `使用流服务器`, u.Host)
+               t.log.L(`I: `, `使用流服务器`, F.ParseHost(v.Url))
        }
 
        return len(t.common.Live) != 0
@@ -282,20 +282,14 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b
                        continue
                }
 
-               m3u8_url, err := url.Parse(v.Url)
-               if err != nil {
-                       e = err
-                       return
-               }
-
                // 设置请求参数
                rval := reqf.Rval{
-                       Url:     m3u8_url.String(),
+                       Url:     v.Url,
                        Retry:   2,
                        Timeout: 2000,
                        Proxy:   c.C.Proxy,
                        Header: map[string]string{
-                               `Host`:            m3u8_url.Host,
+                               `Host`:            F.ParseHost(v.Url),
                                `User-Agent`:      c.UA,
                                `Accept`:          `*/*`,
                                `Accept-Language`: `zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2`,
@@ -314,7 +308,7 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b
                if err := r.Reqf(rval); err != nil {
                        // 1min后重新启用
                        t.common.Live[k].DisableAuto()
-                       t.log.L("W: ", fmt.Sprintf("服务器 %s 发生故障 %s", m3u8_url.Host, err.Error()))
+                       t.log.L("W: ", fmt.Sprintf("服务器 %s 发生故障 %s", F.ParseHost(v.Url), err.Error()))
                        if t.common.ValidLive() == nil {
                                e = errors.New("全部流服务器发生故障")
                                break
@@ -339,6 +333,7 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b
 
                // base64解码
                if len(m3u8_respon) != 0 && !bytes.Contains(m3u8_respon, []byte("#")) {
+                       var err error
                        m3u8_respon, err = base64.StdEncoding.DecodeString(string(m3u8_respon))
                        if err != nil {
                                e = err
@@ -386,13 +381,6 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b
                                continue
                        }
 
-                       //获取切片地址
-                       u, err := url.Parse("./" + m4s_link + "?trid=" + m3u8_url.Query().Get("trid"))
-                       if err != nil {
-                               e = err
-                               return
-                       }
-
                        {
                                tmpBase := m4s_link
                                // fmt.Println(tmpBase, t.last_m4s != nil)
@@ -412,7 +400,8 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b
                        // fmt.Println("->", m4s_link)
                        //将切片添加到返回切片数组
                        p := t.getM4s()
-                       p.Url = m3u8_url.ResolveReference(u).String()
+                       //获取切片地址
+                       p.Url = F.ResolveReferenceLast(v.Url, m4s_link+"?trid="+F.ParseQuery(v.Url, "trid="))
                        p.Base = m4s_link
                        p.createdTime = time.Now()
                        tmp = append(tmp, p)
@@ -431,7 +420,7 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b
                        if timed > 5 && nos-noe == 0 {
                                // 1min后重新启用
                                t.common.Live[k].DisableAuto()
-                               t.log.L("W: ", fmt.Sprintf("服务器 %s 发生故障 %d 秒产出了 %d 切片", m3u8_url.Host, int(timed), nos-noe))
+                               t.log.L("W: ", fmt.Sprintf("服务器 %s 发生故障 %d 秒产出了 %d 切片", F.ParseHost(v.Url), int(timed), nos-noe))
                                if t.common.ValidLive() == nil {
                                        e = errors.New("全部切片服务器发生故障")
                                        break
@@ -497,17 +486,11 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b
                        // 补充m3u8
                        m3u8_addon = append([]byte("#EXTINF:1.00\n"+strconv.Itoa(guess_no)+".m4s\n"), m3u8_addon...)
 
-                       //获取切片地址
-                       u, err := url.Parse("./" + strconv.Itoa(guess_no) + `.m4s`)
-                       if err != nil {
-                               e = err
-                               return
-                       }
-
                        //将切片添加到返回切片数组前
                        p := t.getM4s()
-                       p.Url = m3u8_url.ResolveReference(u).String()
                        p.Base = strconv.Itoa(guess_no) + `.m4s`
+                       //获取切片地址
+                       p.Url = F.ResolveReferenceLast(v.Url, p.Base)
                        p.createdTime = time.Now()
                        m4s_links = append([]*m4s_link_item{p}, m4s_links...)
                }
@@ -1402,7 +1385,7 @@ func (t *M4SStream) PusherToFile(contextC context.Context, filepath string, star
 }
 
 // 流服务推送方法
-func (t *M4SStream) PusherToHttp(w http.ResponseWriter, r *http.Request, startFunc func(*M4SStream) error, stopFunc func(*M4SStream) error) error {
+func (t *M4SStream) PusherToHttp(conn net.Conn, w http.ResponseWriter, r *http.Request, startFunc func(*M4SStream) error, stopFunc func(*M4SStream) error) error {
        switch t.stream_type {
        case `m3u8`:
                fallthrough
@@ -1463,7 +1446,8 @@ func (t *M4SStream) PusherToHttp(w http.ResponseWriter, r *http.Request, startFu
        }
 
        //
-       cancelRec := t.Stream_msg.Pull_tag_async(map[string]func([]byte) bool{
+       var cancelRec func()
+       cancelRec = t.Stream_msg.Pull_tag_async(map[string]func([]byte) bool{
                `data`: func(b []byte) bool {
                        select {
                        case <-r.Context().Done():
@@ -1473,6 +1457,16 @@ func (t *M4SStream) PusherToHttp(w http.ResponseWriter, r *http.Request, startFu
                        if len(b) == 0 {
                                return true
                        }
+
+                       // 1s内写入失败,关闭conn,防止协程泄漏
+                       done := time.AfterFunc(time.Second, func() {
+                               cancelRec()
+                               if conn != nil {
+                                       println(conn.Close())
+                               }
+                       }).Stop
+                       defer done()
+
                        if _, err := w.Write(b); err != nil {
                                return true
                        } else if flushSupport {
diff --git a/go.mod b/go.mod
index e733fe0dac0f3ab5b7a58bc90b6a273dc383f34f..194913587f62093aa2bd93067c9cb07be38d6692 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@ go 1.20
 require (
        github.com/gotk3/gotk3 v0.6.2
        github.com/mdp/qrterminal/v3 v3.1.1
-       github.com/qydysky/part v0.28.1-0.20230808193421-14e125f1be9a
+       github.com/qydysky/part v0.28.1-0.20230809171140-df54e42857d2
        github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
        github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
        golang.org/x/text v0.12.0
diff --git a/go.sum b/go.sum
index 5f0880bf6c0ee788f22878737186bcc2439de96f..118a51b2b27b35b5ebc07a4fc6d97f253e9bcfc5 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -30,8 +30,8 @@ github.com/mdp/qrterminal/v3 v3.1.1/go.mod h1:5lJlXe7Jdr8wlPDdcsJttv1/knsRgzXASy
 github.com/miekg/dns v1.1.55 h1:GoQ4hpsj0nFLYe+bWiCToyrBEJXkQfOOIvFGFy0lEgo=
 github.com/miekg/dns v1.1.55/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
-github.com/qydysky/part v0.28.1-0.20230808193421-14e125f1be9a h1:9DVBL8FYsmBR/FoqwfH2lg848P13+09tU00qc7m0efc=
-github.com/qydysky/part v0.28.1-0.20230808193421-14e125f1be9a/go.mod h1:CdkAHZ+OxieG1sI4M6UowP9j0QQDnhtDtN4tWsylCPU=
+github.com/qydysky/part v0.28.1-0.20230809171140-df54e42857d2 h1:+jPSFEDiM0inIjTCtsMhn/NJruwe18iZQbtWqAL7Cgk=
+github.com/qydysky/part v0.28.1-0.20230809171140-df54e42857d2/go.mod h1:CdkAHZ+OxieG1sI4M6UowP9j0QQDnhtDtN4tWsylCPU=
 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
 github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=