"context"
"encoding/json"
"errors"
- "flag"
"fmt"
"io"
"net/http"
- "os"
- "os/signal"
- "slices"
"strings"
"time"
pctx "github.com/qydysky/part/ctx"
pfile "github.com/qydysky/part/file"
plog "github.com/qydysky/part/log"
- psys "github.com/qydysky/part/sys"
pweb "github.com/qydysky/part/web"
)
-func main() {
- // 保持唤醒
- var stop = psys.Sys().PreventSleep()
- defer stop.Done()
-
- // 获取config路径
- configP := flag.String("c", "main.json", "config")
- testP := flag.Int("t", 0, "test port")
- _ = flag.Bool("q", true, "no warn,error log")
- flag.Parse()
-
- // 日志初始化
- logger := plog.New(plog.Config{
- Stdout: true,
- Prefix_string: map[string]struct{}{
- `T:`: plog.On,
- `I:`: plog.On,
- `W:`: plog.On,
- `E:`: plog.On,
- },
- })
-
- if slices.Contains(os.Args[1:], "-q") {
- logger.L(`I:`, "简化输出")
- delete(logger.Config.Prefix_string, `E:`)
- delete(logger.Config.Prefix_string, `W:`)
- delete(logger.Config.Prefix_string, `T:`)
- }
-
- // 根ctx
- ctx, cancle := pctx.WithWait(context.Background(), 0, time.Minute*2)
-
- // 获取config
- configS := Config{}
- configF := pfile.New(*configP, 0, true)
- if !configF.IsExist() {
- logger.L(`E:`, "配置不存在")
- return
- }
- defer configF.Close()
-
- var buf = make([]byte, 1<<16)
-
- if e := loadConfig(buf, configF, &configS, logger); e != nil {
+// 加载
+func LoadPeriod(ctx context.Context, buf []byte, configF *pfile.File, configS *Config, logger *plog.Log_interface) {
+ if e := loadConfig(buf, configF, configS, logger); e != nil {
logger.L(`E:`, "配置加载", e)
}
-
- // 定时加载
- go LoadPeriod(ctx, buf, configF, &configS, logger)
-
- // 测试响应
- go Test(ctx, *testP, logger)
-
- go Run(ctx, &configS, logger)
-
- // ctrl+c退出
- var interrupt = make(chan os.Signal, 2)
- signal.Notify(interrupt, os.Interrupt)
- <-interrupt
- if errors.Is(cancle(), pctx.ErrWaitTo) {
- logger.L(`E:`, "退出超时")
- }
-}
-
-// 定时加载
-func LoadPeriod(ctx context.Context, buf []byte, configF *pfile.File, configS *Config, logger *plog.Log_interface) {
- ctx1, done1 := pctx.WaitCtx(ctx)
- defer done1()
// 定时加载config
- for {
- select {
- case <-time.After(time.Second * 10):
- if e := loadConfig(buf, configF, configS, logger); e != nil {
- logger.L(`E:`, "配置加载", e)
+ go func() {
+ ctx1, done1 := pctx.WaitCtx(ctx)
+ defer done1()
+ for {
+ select {
+ case <-time.After(time.Second * 10):
+ if e := loadConfig(buf, configF, configS, logger); e != nil {
+ logger.L(`E:`, "配置加载", e)
+ }
+ case <-ctx1.Done():
+ return
}
- case <-ctx1.Done():
- return
}
- }
+ }()
}
// 测试
ctx1, done1 := pctx.WaitCtx(ctx)
defer done1()
- back := backArray[time.Now().UnixMilli()%int64(len(backArray))]
+ now := time.Now()
+ backI := now.UnixMilli() % int64(len(backArray))
+
+ if !backArray[backI].IsLive() {
+ for backI = 0; backI < int64(len(backArray)); backI++ {
+ if backArray[backI].IsLive() {
+ break
+ }
+ }
+ if backI == int64(len(backArray)) {
+ pweb.WithStatusCode(w, http.StatusServiceUnavailable)
+ logger.L(`E:`, fmt.Sprintf("%s=> 全部后端失效", path))
+ return
+ }
+ }
- logger.L(`T:`, fmt.Sprintf("%s=>%s", path, back.To))
+ logger.L(`T:`, fmt.Sprintf("%s=>%s", path, backArray[backI].Name))
+ var e error
if r.Header.Get("Upgrade") == "websocket" {
- wsDealer(ctx1, w, r, path, back, logger)
+ e = wsDealer(ctx1, w, r, path, backArray[backI], logger)
} else {
- httpDealer(ctx1, w, r, path, back, logger)
+ e = httpDealer(ctx1, w, r, path, backArray[backI], logger)
+ }
+ if e != nil {
+ logger.L(`W:`, fmt.Sprintf("%s=>%s 后端失效", path, backArray[backI].Name))
+ backArray[backI].Disable()
}
})
}
return nil
}
-func httpDealer(ctx context.Context, w http.ResponseWriter, r *http.Request, routePath string, back *Back, logger *plog.Log_interface) {
+var (
+ ErrNoHttp = errors.New("ErrNoHttp")
+ ErrNoWs = errors.New("ErrNoWs")
+ ErrCopy = errors.New("ErrCopy")
+ ErrReqCreFail = errors.New("ErrReqCreFail")
+ ErrReqDoFail = errors.New("ErrReqDoFail")
+ ErrResDoFail = errors.New("ErrResDoFail")
+)
+
+func httpDealer(ctx context.Context, w http.ResponseWriter, r *http.Request, routePath string, back *Back, logger *plog.Log_interface) error {
url := back.To
if back.PathAdd {
url += r.URL.String()
if !strings.HasPrefix(url, "http") {
pweb.WithStatusCode(w, http.StatusServiceUnavailable)
logger.L(`E:`, fmt.Sprintf("%s=>%s %v", routePath, back.Name, "非http"))
- return
+ return ErrNoHttp
}
req, e := http.NewRequestWithContext(ctx, r.Method, url, r.Body)
if e != nil {
pweb.WithStatusCode(w, http.StatusServiceUnavailable)
logger.L(`E:`, fmt.Sprintf("%s=>%s %v", routePath, back.Name, e))
- return
+ return ErrReqCreFail
}
for k, v := range r.Header {
if e != nil {
pweb.WithStatusCode(w, http.StatusServiceUnavailable)
logger.L(`E:`, fmt.Sprintf("%s=>%s %v", routePath, back.Name, e))
- return
+ return ErrReqDoFail
}
for k, v := range resp.Header {
w.WriteHeader(resp.StatusCode)
if resp.StatusCode < 200 || resp.StatusCode == 204 || resp.StatusCode == 304 {
- return
+ return nil
}
- w = pweb.WithFlush(w)
+ defer resp.Body.Close()
if _, e = io.Copy(w, resp.Body); e != nil {
logger.L(`E:`, fmt.Sprintf("%s=>%s %v", routePath, back.Name, e))
+ return ErrCopy
}
- resp.Body.Close()
+ return nil
}
-func wsDealer(ctx context.Context, w http.ResponseWriter, r *http.Request, routePath string, back *Back, logger *plog.Log_interface) {
+func wsDealer(ctx context.Context, w http.ResponseWriter, r *http.Request, routePath string, back *Back, logger *plog.Log_interface) error {
url := back.To
if back.PathAdd {
url += r.URL.String()
if !strings.HasPrefix(url, "ws") {
pweb.WithStatusCode(w, http.StatusServiceUnavailable)
logger.L(`E:`, fmt.Sprintf("%s=>%s %v", routePath, back.Name, "非websocket"))
- return
+ return ErrNoWs
}
reqHeader := make(http.Header)
if res, resp, e := websocket.DefaultDialer.Dial(url, reqHeader); e != nil {
pweb.WithStatusCode(w, http.StatusServiceUnavailable)
logger.L(`E:`, fmt.Sprintf("%s=>%s %v", routePath, back.Name, e))
+ return ErrReqDoFail
} else {
for _, v := range back.ResHeader {
switch v.Action {
if req, e := (&websocket.Upgrader{}).Upgrade(w, r, resp.Header); e != nil {
pweb.WithStatusCode(w, http.StatusServiceUnavailable)
logger.L(`E:`, fmt.Sprintf("%s=>%s %v", routePath, back.Name, e))
+ return ErrResDoFail
} else {
- fin := make(chan struct{})
+ fin := make(chan error)
reqc := req.NetConn()
resc := res.NetConn()
+ defer func() {
+ reqc.Close()
+ resc.Close()
+ }()
go func() {
- _, _ = io.Copy(reqc, resc)
- fin <- struct{}{}
+ _, e := io.Copy(reqc, resc)
+ fin <- e
}()
go func() {
- _, _ = io.Copy(resc, reqc)
- fin <- struct{}{}
+ _, e := io.Copy(resc, reqc)
+ fin <- e
}()
select {
- case <-fin:
+ case e := <-fin:
+ logger.L(`E:`, fmt.Sprintf("%s=>%s %v", routePath, back.Name, e))
+ return ErrCopy
case <-ctx.Done():
+ return nil
}
- reqc.Close()
- resc.Close()
}
}
}
--- /dev/null
+package main
+
+import (
+ "context"
+ "errors"
+ "flag"
+ "os"
+ "os/signal"
+ "slices"
+ "time"
+
+ pfront "github.com/qydysky/front"
+ pctx "github.com/qydysky/part/ctx"
+ pfile "github.com/qydysky/part/file"
+ plog "github.com/qydysky/part/log"
+ psys "github.com/qydysky/part/sys"
+)
+
+func main() {
+ // 保持唤醒
+ var stop = psys.Sys().PreventSleep()
+ defer stop.Done()
+
+ // 获取config路径
+ configP := flag.String("c", "main.json", "config")
+ testP := flag.Int("t", 0, "test port")
+ _ = flag.Bool("q", true, "no warn,error log")
+ flag.Parse()
+
+ // 日志初始化
+ logger := plog.New(plog.Config{
+ Stdout: true,
+ Prefix_string: map[string]struct{}{
+ `T:`: plog.On,
+ `I:`: plog.On,
+ `W:`: plog.On,
+ `E:`: plog.On,
+ },
+ })
+
+ if slices.Contains(os.Args[1:], "-q") {
+ logger.L(`I:`, "简化输出")
+ delete(logger.Config.Prefix_string, `E:`)
+ delete(logger.Config.Prefix_string, `W:`)
+ delete(logger.Config.Prefix_string, `T:`)
+ }
+
+ // 根ctx
+ ctx, cancle := pctx.WithWait(context.Background(), 0, time.Minute*2)
+
+ // 获取config
+ configS := pfront.Config{}
+ configF := pfile.New(*configP, 0, true)
+ if !configF.IsExist() {
+ logger.L(`E:`, "配置不存在")
+ return
+ }
+ defer configF.Close()
+
+ var buf = make([]byte, 1<<16)
+
+ // 加载配置
+ pfront.LoadPeriod(ctx, buf, configF, &configS, logger)
+
+ // 测试响应
+ go pfront.Test(ctx, *testP, logger)
+
+ go pfront.Run(ctx, &configS, logger)
+
+ // ctrl+c退出
+ var interrupt = make(chan os.Signal, 2)
+ signal.Notify(interrupt, os.Interrupt)
+ <-interrupt
+ if errors.Is(cancle(), pctx.ErrWaitTo) {
+ logger.L(`E:`, "退出超时")
+ }
+}