From: qydysky Date: Wed, 9 Aug 2023 17:19:28 +0000 (+0800) Subject: Fix #84 X-Git-Tag: v0.10.11~1 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=744ab5b26adc8f25c059528f69287211ce2dedfc;p=bili_danmu%2F.git Fix #84 --- diff --git a/F/F.go b/F/F.go index 7ddf1b8..c7ca5f1 100644 --- a/F/F.go +++ b/F/F.go @@ -136,3 +136,82 @@ func CookieCheck(key []string) (missKey []string) { } return } + +// just faster, use in right way +// +// eg. ParseQuery(`http://1.com/2?workspace=1`, "workspace=") => `1` +func ParseQuery(rawURL, key string) string { + s := 0 + for i := 0; i < len(rawURL); i++ { + if rawURL[i] == '?' { + s = i + 1 + break + } + } + + for i := s; i < len(rawURL); i++ { + for j := 0; i < len(rawURL) && j < len(key); j, i = j+1, i+1 { + if rawURL[i] != key[j] { + break + } else if j == len(key)-1 { + s = i + 1 + i = len(rawURL) + break + } + } + } + + d := s + for ; d < len(rawURL); d++ { + if rawURL[d] == '&' || rawURL[d] == '#' { + break + } + } + + return rawURL[s:d] +} + +// just faster, use in right way +// +// eg. ParseHost(`http://1.com/2`) => `1.com` +func ParseHost(rawURL string) string { + s := 0 + for i := 0; i < len(rawURL); i++ { + for j := 0; i < len(rawURL) && j < len("//"); j, i = j+1, i+1 { + if rawURL[i] != "//"[j] { + break + } else if j == len("//")-1 { + s = i + 1 + i = len(rawURL) + break + } + } + } + + d := s + for i := s; i < len(rawURL); i++ { + if rawURL[i] == '/' { + d = i + break + } + } + + return rawURL[s:d] +} + +// just faster, use in right way +// +// eg. ResolveReferenceLast(`http://1.com/2`, `1`) => `http://1.com/1` +func ResolveReferenceLast(rawURL, ref string) string { + s := 0 + for i := 0; i < len(rawURL); i++ { + if rawURL[i] == '/' { + s = i + } + if rawURL[i] == '?' || rawURL[i] == '#' { + break + } + } + + return rawURL[:s+1] + ref +} diff --git a/F/F_test.go b/F/F_test.go new file mode 100644 index 0000000..9dacc25 --- /dev/null +++ b/F/F_test.go @@ -0,0 +1,30 @@ +package F + +import ( + "net/url" + "testing" +) + +func Test2(t *testing.T) { + rawURL := "http://127.0.0.1:10841/1?workspace=/codefile/qydysky.code-workspace#12" + u, _ := url.Parse(rawURL) + if u.Host != ParseHost(rawURL) { + t.Fatal() + } + if u.Query().Get("workspace") != ParseQuery(rawURL, "workspace=") { + t.Log(u.Query().Get("workspace")) + t.Log(ParseQuery(rawURL, "workspace=")) + t.Fatal() + } +} + +func Test3(t *testing.T) { + rawURL := "http://127.0.0.1:10841/1?workspace=/codefile/qydysky.code-workspace#12" + + u, _ := url.Parse(rawURL) + u1, _ := url.Parse("./2") + + if u.ResolveReference(u1).String() != ResolveReferenceLast(rawURL, "2") { + t.Fatal() + } +} diff --git a/Reply/F.go b/Reply/F.go index 93a01df..57145b7 100644 --- a/Reply/F.go +++ b/Reply/F.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "math" + "net" "net/http" "net/http/pprof" "net/url" @@ -1399,6 +1400,7 @@ func init() { w.Header().Set("Retry-After", "1") w.WriteHeader(http.StatusServiceUnavailable) flog.L(`E: `, `无指定路径`) + return } if rpath != `/now/` { @@ -1493,7 +1495,7 @@ func init() { return } - w.WriteHeader(http.StatusOK) + // w.WriteHeader(http.StatusOK) // 推送数据 { @@ -1505,7 +1507,10 @@ func init() { flog.L(`T: `, r.RemoteAddr, `断开直播流`) return nil } - if e := currentStreamO.PusherToHttp(w, r, startFunc, stopFunc); e != nil { + + conn, _ := r.Context().Value(c.C.SerF).(net.Conn) + + if e := currentStreamO.PusherToHttp(conn, w, r, startFunc, stopFunc); e != nil { flog.L(`W: `, e) } } diff --git a/Reply/stream.go b/Reply/stream.go index 5d1793e..228402d 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -10,6 +10,7 @@ import ( "fmt" "io" "io/fs" + "net" "net/http" "net/url" "os" @@ -258,8 +259,7 @@ func (t *M4SStream) fetchCheckStream() bool { } // 显示使用流服务器 - u, _ := url.Parse(v.Url) - t.log.L(`I: `, `使用流服务器`, u.Host) + t.log.L(`I: `, `使用流服务器`, F.ParseHost(v.Url)) } return len(t.common.Live) != 0 @@ -282,20 +282,14 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b continue } - m3u8_url, err := url.Parse(v.Url) - if err != nil { - e = err - return - } - // 设置请求参数 rval := reqf.Rval{ - Url: m3u8_url.String(), + Url: v.Url, Retry: 2, Timeout: 2000, Proxy: c.C.Proxy, Header: map[string]string{ - `Host`: m3u8_url.Host, + `Host`: F.ParseHost(v.Url), `User-Agent`: c.UA, `Accept`: `*/*`, `Accept-Language`: `zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2`, @@ -314,7 +308,7 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b if err := r.Reqf(rval); err != nil { // 1min后重新启用 t.common.Live[k].DisableAuto() - t.log.L("W: ", fmt.Sprintf("服务器 %s 发生故障 %s", m3u8_url.Host, err.Error())) + t.log.L("W: ", fmt.Sprintf("服务器 %s 发生故障 %s", F.ParseHost(v.Url), err.Error())) if t.common.ValidLive() == nil { e = errors.New("全部流服务器发生故障") break @@ -339,6 +333,7 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b // base64解码 if len(m3u8_respon) != 0 && !bytes.Contains(m3u8_respon, []byte("#")) { + var err error m3u8_respon, err = base64.StdEncoding.DecodeString(string(m3u8_respon)) if err != nil { e = err @@ -386,13 +381,6 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b continue } - //获取切片地址 - u, err := url.Parse("./" + m4s_link + "?trid=" + m3u8_url.Query().Get("trid")) - if err != nil { - e = err - return - } - { tmpBase := m4s_link // fmt.Println(tmpBase, t.last_m4s != nil) @@ -412,7 +400,8 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b // fmt.Println("->", m4s_link) //将切片添加到返回切片数组 p := t.getM4s() - p.Url = m3u8_url.ResolveReference(u).String() + //获取切片地址 + p.Url = F.ResolveReferenceLast(v.Url, m4s_link+"?trid="+F.ParseQuery(v.Url, "trid=")) p.Base = m4s_link p.createdTime = time.Now() tmp = append(tmp, p) @@ -431,7 +420,7 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b if timed > 5 && nos-noe == 0 { // 1min后重新启用 t.common.Live[k].DisableAuto() - t.log.L("W: ", fmt.Sprintf("服务器 %s 发生故障 %d 秒产出了 %d 切片", m3u8_url.Host, int(timed), nos-noe)) + t.log.L("W: ", fmt.Sprintf("服务器 %s 发生故障 %d 秒产出了 %d 切片", F.ParseHost(v.Url), int(timed), nos-noe)) if t.common.ValidLive() == nil { e = errors.New("全部切片服务器发生故障") break @@ -497,17 +486,11 @@ func (t *M4SStream) fetchParseM3U8() (m4s_links []*m4s_link_item, m3u8_addon []b // 补充m3u8 m3u8_addon = append([]byte("#EXTINF:1.00\n"+strconv.Itoa(guess_no)+".m4s\n"), m3u8_addon...) - //获取切片地址 - u, err := url.Parse("./" + strconv.Itoa(guess_no) + `.m4s`) - if err != nil { - e = err - return - } - //将切片添加到返回切片数组前 p := t.getM4s() - p.Url = m3u8_url.ResolveReference(u).String() p.Base = strconv.Itoa(guess_no) + `.m4s` + //获取切片地址 + p.Url = F.ResolveReferenceLast(v.Url, p.Base) p.createdTime = time.Now() m4s_links = append([]*m4s_link_item{p}, m4s_links...) } @@ -1402,7 +1385,7 @@ func (t *M4SStream) PusherToFile(contextC context.Context, filepath string, star } // 流服务推送方法 -func (t *M4SStream) PusherToHttp(w http.ResponseWriter, r *http.Request, startFunc func(*M4SStream) error, stopFunc func(*M4SStream) error) error { +func (t *M4SStream) PusherToHttp(conn net.Conn, w http.ResponseWriter, r *http.Request, startFunc func(*M4SStream) error, stopFunc func(*M4SStream) error) error { switch t.stream_type { case `m3u8`: fallthrough @@ -1463,7 +1446,8 @@ func (t *M4SStream) PusherToHttp(w http.ResponseWriter, r *http.Request, startFu } // - cancelRec := t.Stream_msg.Pull_tag_async(map[string]func([]byte) bool{ + var cancelRec func() + cancelRec = t.Stream_msg.Pull_tag_async(map[string]func([]byte) bool{ `data`: func(b []byte) bool { select { case <-r.Context().Done(): @@ -1473,6 +1457,16 @@ func (t *M4SStream) PusherToHttp(w http.ResponseWriter, r *http.Request, startFu if len(b) == 0 { return true } + + // 1s内写入失败,关闭conn,防止协程泄漏 + done := time.AfterFunc(time.Second, func() { + cancelRec() + if conn != nil { + println(conn.Close()) + } + }).Stop + defer done() + if _, err := w.Write(b); err != nil { return true } else if flushSupport { diff --git a/go.mod b/go.mod index e733fe0..1949135 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.20 require ( github.com/gotk3/gotk3 v0.6.2 github.com/mdp/qrterminal/v3 v3.1.1 - github.com/qydysky/part v0.28.1-0.20230808193421-14e125f1be9a + github.com/qydysky/part v0.28.1-0.20230809171140-df54e42857d2 github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966 golang.org/x/text v0.12.0 diff --git a/go.sum b/go.sum index 5f0880b..118a51b 100644 --- a/go.sum +++ b/go.sum @@ -30,8 +30,8 @@ github.com/mdp/qrterminal/v3 v3.1.1/go.mod h1:5lJlXe7Jdr8wlPDdcsJttv1/knsRgzXASy github.com/miekg/dns v1.1.55 h1:GoQ4hpsj0nFLYe+bWiCToyrBEJXkQfOOIvFGFy0lEgo= github.com/miekg/dns v1.1.55/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/qydysky/part v0.28.1-0.20230808193421-14e125f1be9a h1:9DVBL8FYsmBR/FoqwfH2lg848P13+09tU00qc7m0efc= -github.com/qydysky/part v0.28.1-0.20230808193421-14e125f1be9a/go.mod h1:CdkAHZ+OxieG1sI4M6UowP9j0QQDnhtDtN4tWsylCPU= +github.com/qydysky/part v0.28.1-0.20230809171140-df54e42857d2 h1:+jPSFEDiM0inIjTCtsMhn/NJruwe18iZQbtWqAL7Cgk= +github.com/qydysky/part v0.28.1-0.20230809171140-df54e42857d2/go.mod h1:CdkAHZ+OxieG1sI4M6UowP9j0QQDnhtDtN4tWsylCPU= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=