From 24dffb978e8ff1bf42baf8fbb26e82d97d951452 Mon Sep 17 00:00:00 2001 From: qydysky Date: Mon, 15 Aug 2022 23:42:36 +0800 Subject: [PATCH] tidy --- Reply/F.go | 75 +++++++----------------- Reply/stream.go | 104 +++++++++++++++++++++++++++++++-- demo/config/config_K_v.json | 12 ++-- demo/html/artPlayer/index.html | 2 +- demo/main.go | 23 ++++---- 5 files changed, 140 insertions(+), 76 deletions(-) diff --git a/Reply/F.go b/Reply/F.go index 03c4112..a8cd4e3 100644 --- a/Reply/F.go +++ b/Reply/F.go @@ -1034,8 +1034,23 @@ func init() { s := web.New(&http.Server{ Addr: addr, }) - var ( - root = func(w http.ResponseWriter, r *http.Request) { + + s.Handle(map[string]func(http.ResponseWriter, *http.Request){ + `/`: func(w http.ResponseWriter, r *http.Request) { + if v, ok := c.C.K_v.LoadV(`直播流保存位置`).(string); ok && v != "" { + http.FileServer(http.Dir(v)).ServeHTTP(w, r) + } else { + flog.L(`W: `, `直播流保存位置无效`) + } + }, + `/now/`: func(w http.ResponseWriter, r *http.Request) { + var path string = r.URL.Path[4:] + if path == `` { + path = `index.html` + } + http.ServeFile(w, r, "html/artPlayer/"+path) + }, + `/mp4`: func(w http.ResponseWriter, r *http.Request) { //header w.Header().Set("Access-Control-Allow-Credentials", "true") w.Header().Set("Access-Control-Allow-Headers", "*") @@ -1050,66 +1065,18 @@ func init() { currentStreamO = v.(*M4SStream) } - if len(currentStreamO.getFirstM4S()) == 0 { + // 未准备好 + if !currentStreamO.Status.Islive() { w.Header().Set("Retry-After", "1") w.WriteHeader(http.StatusServiceUnavailable) return } - // path = base_dir+path - w.Header().Set("Content-Type", "video/mp4") w.WriteHeader(http.StatusOK) - flusher, flushSupport := w.(http.Flusher) - if flushSupport { - flusher.Flush() - } - - //写入hls头 - if _, err := w.Write(currentStreamO.getFirstM4S()); err != nil { - return - } else if flushSupport { - flusher.Flush() - } - - cancel := make(chan struct{}) - - //hls切片 - currentStreamO.Newst_m4s.Pull_tag(map[string]func(interface{}) bool{ - `m4s`: func(data interface{}) bool { - if b, ok := data.([]byte); ok { - if len(b) == 0 { - close(cancel) - return true - } - if _, err := w.Write(b); err != nil { - close(cancel) - return true - } else if flushSupport { - flusher.Flush() - } - } - return false - }, - `close`: func(data interface{}) bool { - close(cancel) - return true - }, - }) - - <-cancel - } - ) - - s.Handle(map[string]func(http.ResponseWriter, *http.Request){ - `/`: func(w http.ResponseWriter, r *http.Request) { - var path string = r.URL.Path[1:] - if path == `` { - path = `index.html` - } - http.ServeFile(w, r, "html/artPlayer/"+path) + // 推送数据 + currentStreamO.Pusher(w, r) }, - `/mp4`: root, `/ws`: func(w http.ResponseWriter, r *http.Request) { //获取通道 conn := StreamWs.WS(w, r) diff --git a/Reply/stream.go b/Reply/stream.go index 5ce9875..2cb5140 100644 --- a/Reply/stream.go +++ b/Reply/stream.go @@ -33,11 +33,11 @@ type M4SStream struct { config M4SStream_Config //配置 stream_last_modified time.Time //流地址更新时间 // stream_expires int64 //流到期时间 - last_m4s *m4s_link_item //最后一个切片 stream_hosts sync.Map //使用的流服务器 stream_type string //流类型 - Newst_m4s *msgq.Msgq //m4s消息 tag:m4s + Stream_msg *msgq.Msgq //流数据消息 tag:data first_m4s []byte //m4s起始块 + last_m4s *m4s_link_item //最后一个切片 common c.Common //通用配置副本 Current_save_path string //明确的直播流保存目录 Callback_start func(*M4SStream) //开始的回调 @@ -425,6 +425,29 @@ func (t *M4SStream) saveStreamM4s() { Max: 3, } + // 直播流切片缓冲 + var ( + streamWebCache chan []byte + streamWebCacheLen int + ) + if v, ok := t.common.K_v.LoadV(`直播Web缓冲长度`).(float64); ok && v != 0 { + streamWebCacheLen = int(v) + streamWebCache = make(chan []byte, streamWebCacheLen) + defer close(streamWebCache) + go func() { + for { + if len(streamWebCache) <= streamWebCacheLen/2 { + time.Sleep(time.Second) + } + if data := <-streamWebCache; len(data) != 0 { + t.Stream_msg.Push_tag(`data`, data) + } else { + return + } + } + }() + } + // 下载循环 for download_seq := []*m4s_link_item{}; ; { @@ -503,7 +526,14 @@ func (t *M4SStream) saveStreamM4s() { if v.status == 2 { download_seq = download_seq[1:] - t.Newst_m4s.Push_tag(`m4s`, v.data) + if streamWebCache != nil { + if streamWebCacheLen == len(streamWebCache) { + <-streamWebCache + } + streamWebCache <- v.data + } else { + t.Stream_msg.Push_tag(`data`, v.data) + } } else { break } @@ -562,7 +592,7 @@ func (t *M4SStream) saveStreamM4s() { } // 发送空字节会导致流服务终止 - t.Newst_m4s.Push_tag(`m4s`, []byte{}) + t.Stream_msg.Push_tag(`data`, []byte{}) // 结束 if p.Checkfile().IsExist(t.Current_save_path + "0.m3u8.dtmp") { @@ -596,7 +626,7 @@ func (t *M4SStream) Start() bool { t.reqPool = t.common.ReqPool // 初始化切片消息 - t.Newst_m4s = msgq.New(15) + t.Stream_msg = msgq.New(15) // 主循环 for t.Status.Islive() { @@ -642,3 +672,67 @@ func (t *M4SStream) Stop() { t.log.L(`I: `, `正在等待切片下载...`) t.exitSign.Wait() } + +// 流服务推送方法 +func (t *M4SStream) Pusher(w http.ResponseWriter, r *http.Request) { + switch t.stream_type { + case `m3u8`: + t.pusherM4s(w, r) + case `flv`: + t.pusherFlv(w, r) + default: + t.log.L(`E: `, `no support stream_type`) + } +} + +func (t *M4SStream) pusherM4s(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "video/mp4") + + flusher, flushSupport := w.(http.Flusher) + if flushSupport { + flusher.Flush() + } + + //写入hls头 + if _, err := w.Write(t.getFirstM4S()); err != nil { + return + } else if flushSupport { + flusher.Flush() + } + + cancel := make(chan struct{}) + + //hls切片 + t.Stream_msg.Pull_tag(map[string]func(interface{}) bool{ + `data`: func(data interface{}) bool { + if b, ok := data.([]byte); ok { + if len(b) == 0 { + close(cancel) + return true + } + if _, err := w.Write(b); err != nil { + close(cancel) + return true + } else if flushSupport { + flusher.Flush() + } + } + return false + }, + `close`: func(data interface{}) bool { + close(cancel) + return true + }, + }) + + <-cancel +} + +func (t *M4SStream) pusherFlv(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "video/x-flv") + + flusher, flushSupport := w.(http.Flusher) + if flushSupport { + flusher.Flush() + } +} diff --git a/demo/config/config_K_v.json b/demo/config/config_K_v.json index d9708c9..0d3540f 100644 --- a/demo/config/config_K_v.json +++ b/demo/config/config_K_v.json @@ -6,7 +6,7 @@ "弹幕私信": "", "弹幕私信(额外)": "[弹幕机测试 额外]:弹幕", "TTS_配置-help": "将会运行[TTS_使用程序路径 获取的音频路径 TTS_使用程序参数]", - "TTS_总开关": true, + "TTS_总开关": false, "TTS_服务器-help": "baidu:百度翻译合成 youdao:有道TTS xf:讯飞TTS", "TTS_服务器": "baidu", "TTS_服务器_youdaoId": "", @@ -17,7 +17,7 @@ "TTS_服务器_xfVoice-help": "讯飞发音人 xiaoyan:小燕甜美女声 aisjiuxu:许久亲切男声 aisxping:小萍知性女声 aisjinger:小婧亲切女声 aisbabyxu:许小宝可爱童声 random:随机", "TTS_服务器_xfVoice": "random", "TTS_使用程序路径": "ffplay", - "TTS_使用程序参数": "-autoexit -nodisp -volume 60", + "TTS_使用程序参数": "-autoexit -nodisp -volume 10", "弹幕-help": "弹幕相关", "弹幕_礼物金额显示阈值": 20, "gtk": "GTK相关", @@ -28,7 +28,7 @@ "save_to_json": "", "get_xiao_xinxin-help": "获取小心心", "get_xiao_xinxin": true, - "小心心端口": 0, + "小心心端口": 10000, "自动打开小心心浏览器": true, "小心心nodjs加密服务地址-help": "请查看项目README", "小心心nodjs加密服务地址": "", @@ -58,16 +58,18 @@ "Gtk弹幕窗": false, "调用obs": false, "直播流清晰度-help": "清晰度可选-1:不保存 0:默认 10000:原画 800:4K 401:蓝光(杜比) 400:蓝光 250:超清 150:高清 80:流畅,无提供所选清晰度时,使用低一档清晰度", - "直播流清晰度": 0, + "直播流清晰度": 10000, "直播流类型-help": "flv or hls", "直播流类型": "hls", - "直播流保存位置": "./live", + "直播流保存位置": "E:\\test\\", "直播hls流缓冲": 20, "直播hls流均衡-help":"true:使用所有hls服务器", "直播hls流均衡": true, "仅保存当前直播间流-help": "启用此项,才会保存Ass", "仅保存当前直播间流": true, "直播Web服务口":0, + "直播Web缓冲长度-help":"非负整数,越长直播流延迟越高越流畅内存占用越高", + "直播Web缓冲长度":10, "ass-help": "只有保存直播流时才考虑生成ass,ass编码默认GB18030(可选utf-8)", "生成Ass弹幕": true, "Ass编码": "GB18030", diff --git a/demo/html/artPlayer/index.html b/demo/html/artPlayer/index.html index 584fdcc..5488e38 100644 --- a/demo/html/artPlayer/index.html +++ b/demo/html/artPlayer/index.html @@ -14,6 +14,6 @@
- + \ No newline at end of file diff --git a/demo/main.go b/demo/main.go index 1c87551..a0a5dcf 100644 --- a/demo/main.go +++ b/demo/main.go @@ -6,10 +6,11 @@ import ( // "runtime/pprof" "os" // "log" - // "net/http" "fmt" + // "net/http" // _ "net/http/pprof" "runtime/debug" + // "github.com/skratchdot/open-golang/open" q "github.com/qydysky/bili_danmu" ) @@ -22,29 +23,29 @@ func main() { // open.Run("http://127.0.0.1:8899/debug/pprof/goroutine?debug=2") // time.Sleep(time.Duration(3)*time.Second) // }() - go func(){ - fmt.Printf("PID:%d\n",os.Getpid()) - for{ + go func() { + fmt.Printf("PID:%d\n", os.Getpid()) + for { View() - time.Sleep(time.Duration(60)*time.Second) + time.Sleep(time.Duration(60) * time.Second) { debug.FreeOSMemory() } } }() // f, err := os.OpenFile("cpu.pprof", os.O_RDWR|os.O_CREATE, 0644) - // if err != nil { - // log.Fatal(err) + // if err != nil { + // log.Fatal(err) // } - // defer f.Close() - // pprof.StartCPUProfile(f) + // defer f.Close() + // pprof.StartCPUProfile(f) q.Start() // pprof.StopCPUProfile() } -func View(){ +func View() { // var memStats runtime.MemStats // runtime.ReadMemStats(&memStats) // fmt.Printf("=====\n") @@ -52,4 +53,4 @@ func View(){ // fmt.Printf("GC次数:%v \n",memStats.NumGC) // fmt.Printf("堆 :%v %v MB\n",memStats.HeapInuse/1024e2/8,(memStats.HeapIdle - memStats.HeapReleased)/1024e2/8) // fmt.Printf("=====\n") -} \ No newline at end of file +} -- 2.39.2