From: qydysky Date: Sun, 3 Dec 2023 03:10:53 +0000 (+0800) Subject: 1 X-Git-Tag: v0.1.20231203031113 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=4109d22ce7b50c7fb888e374bffe2bccb6747882;p=front%2F.git 1 --- diff --git a/config.go b/config.go index 837779b..2598236 100644 --- a/config.go +++ b/config.go @@ -35,7 +35,8 @@ func (t *Route) SwapSign() bool { func (t *Route) GenBack() []*Back { var backLink []*Back - for _, back := range t.Back { + for i := 0; i < len(t.Back); i++ { + back := &t.Back[i] tmpBack := Back{ Name: back.Name, To: back.To, diff --git a/main.go b/main.go index 7adf04d..a72a832 100644 --- a/main.go +++ b/main.go @@ -4,13 +4,9 @@ import ( "context" "encoding/json" "errors" - "flag" "fmt" "io" "net/http" - "os" - "os/signal" - "slices" "strings" "time" @@ -18,89 +14,29 @@ import ( pctx "github.com/qydysky/part/ctx" pfile "github.com/qydysky/part/file" plog "github.com/qydysky/part/log" - psys "github.com/qydysky/part/sys" pweb "github.com/qydysky/part/web" ) -func main() { - // 保持唤醒 - var stop = psys.Sys().PreventSleep() - defer stop.Done() - - // 获取config路径 - configP := flag.String("c", "main.json", "config") - testP := flag.Int("t", 0, "test port") - _ = flag.Bool("q", true, "no warn,error log") - flag.Parse() - - // 日志初始化 - logger := plog.New(plog.Config{ - Stdout: true, - Prefix_string: map[string]struct{}{ - `T:`: plog.On, - `I:`: plog.On, - `W:`: plog.On, - `E:`: plog.On, - }, - }) - - if slices.Contains(os.Args[1:], "-q") { - logger.L(`I:`, "简化输出") - delete(logger.Config.Prefix_string, `E:`) - delete(logger.Config.Prefix_string, `W:`) - delete(logger.Config.Prefix_string, `T:`) - } - - // 根ctx - ctx, cancle := pctx.WithWait(context.Background(), 0, time.Minute*2) - - // 获取config - configS := Config{} - configF := pfile.New(*configP, 0, true) - if !configF.IsExist() { - logger.L(`E:`, "配置不存在") - return - } - defer configF.Close() - - var buf = make([]byte, 1<<16) - - if e := loadConfig(buf, configF, &configS, logger); e != nil { +// 加载 +func LoadPeriod(ctx context.Context, buf []byte, configF *pfile.File, configS *Config, logger *plog.Log_interface) { + if e := loadConfig(buf, configF, configS, logger); e != nil { logger.L(`E:`, "配置加载", e) } - - // 定时加载 - go LoadPeriod(ctx, buf, configF, &configS, logger) - - // 测试响应 - go Test(ctx, *testP, logger) - - go Run(ctx, &configS, logger) - - // ctrl+c退出 - var interrupt = make(chan os.Signal, 2) - signal.Notify(interrupt, os.Interrupt) - <-interrupt - if errors.Is(cancle(), pctx.ErrWaitTo) { - logger.L(`E:`, "退出超时") - } -} - -// 定时加载 -func LoadPeriod(ctx context.Context, buf []byte, configF *pfile.File, configS *Config, logger *plog.Log_interface) { - ctx1, done1 := pctx.WaitCtx(ctx) - defer done1() // 定时加载config - for { - select { - case <-time.After(time.Second * 10): - if e := loadConfig(buf, configF, configS, logger); e != nil { - logger.L(`E:`, "配置加载", e) + go func() { + ctx1, done1 := pctx.WaitCtx(ctx) + defer done1() + for { + select { + case <-time.After(time.Second * 10): + if e := loadConfig(buf, configF, configS, logger); e != nil { + logger.L(`E:`, "配置加载", e) + } + case <-ctx1.Done(): + return } - case <-ctx1.Done(): - return } - } + }() } // 测试 @@ -227,21 +163,49 @@ func applyConfig(ctx context.Context, configS *Config, routeP *pweb.WebPath, log ctx1, done1 := pctx.WaitCtx(ctx) defer done1() - back := backArray[time.Now().UnixMilli()%int64(len(backArray))] + now := time.Now() + backI := now.UnixMilli() % int64(len(backArray)) + + if !backArray[backI].IsLive() { + for backI = 0; backI < int64(len(backArray)); backI++ { + if backArray[backI].IsLive() { + break + } + } + if backI == int64(len(backArray)) { + pweb.WithStatusCode(w, http.StatusServiceUnavailable) + logger.L(`E:`, fmt.Sprintf("%s=> 全部后端失效", path)) + return + } + } - logger.L(`T:`, fmt.Sprintf("%s=>%s", path, back.To)) + logger.L(`T:`, fmt.Sprintf("%s=>%s", path, backArray[backI].Name)) + var e error if r.Header.Get("Upgrade") == "websocket" { - wsDealer(ctx1, w, r, path, back, logger) + e = wsDealer(ctx1, w, r, path, backArray[backI], logger) } else { - httpDealer(ctx1, w, r, path, back, logger) + e = httpDealer(ctx1, w, r, path, backArray[backI], logger) + } + if e != nil { + logger.L(`W:`, fmt.Sprintf("%s=>%s 后端失效", path, backArray[backI].Name)) + backArray[backI].Disable() } }) } return nil } -func httpDealer(ctx context.Context, w http.ResponseWriter, r *http.Request, routePath string, back *Back, logger *plog.Log_interface) { +var ( + ErrNoHttp = errors.New("ErrNoHttp") + ErrNoWs = errors.New("ErrNoWs") + ErrCopy = errors.New("ErrCopy") + ErrReqCreFail = errors.New("ErrReqCreFail") + ErrReqDoFail = errors.New("ErrReqDoFail") + ErrResDoFail = errors.New("ErrResDoFail") +) + +func httpDealer(ctx context.Context, w http.ResponseWriter, r *http.Request, routePath string, back *Back, logger *plog.Log_interface) error { url := back.To if back.PathAdd { url += r.URL.String() @@ -250,14 +214,14 @@ func httpDealer(ctx context.Context, w http.ResponseWriter, r *http.Request, rou if !strings.HasPrefix(url, "http") { pweb.WithStatusCode(w, http.StatusServiceUnavailable) logger.L(`E:`, fmt.Sprintf("%s=>%s %v", routePath, back.Name, "非http")) - return + return ErrNoHttp } req, e := http.NewRequestWithContext(ctx, r.Method, url, r.Body) if e != nil { pweb.WithStatusCode(w, http.StatusServiceUnavailable) logger.L(`E:`, fmt.Sprintf("%s=>%s %v", routePath, back.Name, e)) - return + return ErrReqCreFail } for k, v := range r.Header { @@ -281,7 +245,7 @@ func httpDealer(ctx context.Context, w http.ResponseWriter, r *http.Request, rou if e != nil { pweb.WithStatusCode(w, http.StatusServiceUnavailable) logger.L(`E:`, fmt.Sprintf("%s=>%s %v", routePath, back.Name, e)) - return + return ErrReqDoFail } for k, v := range resp.Header { @@ -304,17 +268,18 @@ func httpDealer(ctx context.Context, w http.ResponseWriter, r *http.Request, rou w.WriteHeader(resp.StatusCode) if resp.StatusCode < 200 || resp.StatusCode == 204 || resp.StatusCode == 304 { - return + return nil } - w = pweb.WithFlush(w) + defer resp.Body.Close() if _, e = io.Copy(w, resp.Body); e != nil { logger.L(`E:`, fmt.Sprintf("%s=>%s %v", routePath, back.Name, e)) + return ErrCopy } - resp.Body.Close() + return nil } -func wsDealer(ctx context.Context, w http.ResponseWriter, r *http.Request, routePath string, back *Back, logger *plog.Log_interface) { +func wsDealer(ctx context.Context, w http.ResponseWriter, r *http.Request, routePath string, back *Back, logger *plog.Log_interface) error { url := back.To if back.PathAdd { url += r.URL.String() @@ -323,7 +288,7 @@ func wsDealer(ctx context.Context, w http.ResponseWriter, r *http.Request, route if !strings.HasPrefix(url, "ws") { pweb.WithStatusCode(w, http.StatusServiceUnavailable) logger.L(`E:`, fmt.Sprintf("%s=>%s %v", routePath, back.Name, "非websocket")) - return + return ErrNoWs } reqHeader := make(http.Header) @@ -342,6 +307,7 @@ func wsDealer(ctx context.Context, w http.ResponseWriter, r *http.Request, route if res, resp, e := websocket.DefaultDialer.Dial(url, reqHeader); e != nil { pweb.WithStatusCode(w, http.StatusServiceUnavailable) logger.L(`E:`, fmt.Sprintf("%s=>%s %v", routePath, back.Name, e)) + return ErrReqDoFail } else { for _, v := range back.ResHeader { switch v.Action { @@ -359,24 +325,30 @@ func wsDealer(ctx context.Context, w http.ResponseWriter, r *http.Request, route if req, e := (&websocket.Upgrader{}).Upgrade(w, r, resp.Header); e != nil { pweb.WithStatusCode(w, http.StatusServiceUnavailable) logger.L(`E:`, fmt.Sprintf("%s=>%s %v", routePath, back.Name, e)) + return ErrResDoFail } else { - fin := make(chan struct{}) + fin := make(chan error) reqc := req.NetConn() resc := res.NetConn() + defer func() { + reqc.Close() + resc.Close() + }() go func() { - _, _ = io.Copy(reqc, resc) - fin <- struct{}{} + _, e := io.Copy(reqc, resc) + fin <- e }() go func() { - _, _ = io.Copy(resc, reqc) - fin <- struct{}{} + _, e := io.Copy(resc, reqc) + fin <- e }() select { - case <-fin: + case e := <-fin: + logger.L(`E:`, fmt.Sprintf("%s=>%s %v", routePath, back.Name, e)) + return ErrCopy case <-ctx.Done(): + return nil } - reqc.Close() - resc.Close() } } } diff --git a/main/main.go b/main/main.go new file mode 100644 index 0000000..71e506a --- /dev/null +++ b/main/main.go @@ -0,0 +1,77 @@ +package main + +import ( + "context" + "errors" + "flag" + "os" + "os/signal" + "slices" + "time" + + pfront "github.com/qydysky/front" + pctx "github.com/qydysky/part/ctx" + pfile "github.com/qydysky/part/file" + plog "github.com/qydysky/part/log" + psys "github.com/qydysky/part/sys" +) + +func main() { + // 保持唤醒 + var stop = psys.Sys().PreventSleep() + defer stop.Done() + + // 获取config路径 + configP := flag.String("c", "main.json", "config") + testP := flag.Int("t", 0, "test port") + _ = flag.Bool("q", true, "no warn,error log") + flag.Parse() + + // 日志初始化 + logger := plog.New(plog.Config{ + Stdout: true, + Prefix_string: map[string]struct{}{ + `T:`: plog.On, + `I:`: plog.On, + `W:`: plog.On, + `E:`: plog.On, + }, + }) + + if slices.Contains(os.Args[1:], "-q") { + logger.L(`I:`, "简化输出") + delete(logger.Config.Prefix_string, `E:`) + delete(logger.Config.Prefix_string, `W:`) + delete(logger.Config.Prefix_string, `T:`) + } + + // 根ctx + ctx, cancle := pctx.WithWait(context.Background(), 0, time.Minute*2) + + // 获取config + configS := pfront.Config{} + configF := pfile.New(*configP, 0, true) + if !configF.IsExist() { + logger.L(`E:`, "配置不存在") + return + } + defer configF.Close() + + var buf = make([]byte, 1<<16) + + // 加载配置 + pfront.LoadPeriod(ctx, buf, configF, &configS, logger) + + // 测试响应 + go pfront.Test(ctx, *testP, logger) + + go pfront.Run(ctx, &configS, logger) + + // ctrl+c退出 + var interrupt = make(chan os.Signal, 2) + signal.Notify(interrupt, os.Interrupt) + <-interrupt + if errors.Is(cancle(), pctx.ErrWaitTo) { + logger.L(`E:`, "退出超时") + } +} diff --git a/main.json b/main/main.json similarity index 93% rename from main.json rename to main/main.json index ff74276..b92d062 100644 --- a/main.json +++ b/main/main.json @@ -10,6 +10,7 @@ "to": "http://127.0.0.1:13000", "weight": 1, "pathAdd": false, + "errBanSec": 10, "resHeader":[ { "action": "set",