From bb71e425534dc2949bdd760dc98ef5079a71b4f9 Mon Sep 17 00:00:00 2001 From: qydysky Date: Thu, 14 Dec 2023 22:33:27 +0800 Subject: [PATCH] 1 --- go.mod | 8 ++- go.sum | 8 +++ main.go | 160 +++++++++++++++++++------------------------------------- 3 files changed, 70 insertions(+), 106 deletions(-) diff --git a/go.mod b/go.mod index a192fb2..53c70c7 100644 --- a/go.mod +++ b/go.mod @@ -4,20 +4,26 @@ go 1.21.4 require ( github.com/gorilla/websocket v1.5.1 - github.com/qydysky/part v0.28.20231209070216 + github.com/qydysky/part v0.28.20231214142916 ) require ( + github.com/andybalholm/brotli v1.0.6 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/go-ole/go-ole v1.3.0 // indirect + github.com/klauspost/compress v1.17.3 // indirect + github.com/miekg/dns v1.1.57 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect + github.com/thedevsaddam/gojsonq/v2 v2.5.2 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.3 // indirect + golang.org/x/mod v0.14.0 // indirect golang.org/x/net v0.18.0 // indirect golang.org/x/sys v0.14.0 // indirect golang.org/x/text v0.14.0 // indirect + golang.org/x/tools v0.15.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 40bf98f..852ba96 100644 --- a/go.sum +++ b/go.sum @@ -13,20 +13,28 @@ github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/ github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= +github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA= +github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/miekg/dns v1.1.57 h1:Jzi7ApEIzwEPLHWRcafCN9LZSBbqQpxjt/wpgvg7wcM= +github.com/miekg/dns v1.1.57/go.mod h1:uqRjCRUuEAA6qsOiJvDd+CFo/vW+y5WR6SNmHE55hZk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/qydysky/part v0.28.20231209070216 h1:1hnOxnsd4sTBTA/nF2pLhyL+0DidpwHo5H08S1D7xHE= github.com/qydysky/part v0.28.20231209070216/go.mod h1:NyKyjpBCSjcHtKlC+fL5lCidm57UCnwEgufiBDs5yxA= +github.com/qydysky/part v0.28.20231214142916 h1:pr4AyzEdcByXYlAB6yMyLebSDFH1X5fVnRqsdZkYNqU= +github.com/qydysky/part v0.28.20231214142916/go.mod h1:NyKyjpBCSjcHtKlC+fL5lCidm57UCnwEgufiBDs5yxA= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/thedevsaddam/gojsonq/v2 v2.5.2 h1:CoMVaYyKFsVj6TjU6APqAhAvC07hTI6IQen8PHzHYY0= +github.com/thedevsaddam/gojsonq/v2 v2.5.2/go.mod h1:bv6Xa7kWy82uT0LnXPE2SzGqTj33TAEeR560MdJkiXs= github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= diff --git a/main.go b/main.go index d643ed2..4bbadc6 100644 --- a/main.go +++ b/main.go @@ -19,6 +19,7 @@ import ( "github.com/gorilla/websocket" pctx "github.com/qydysky/part/ctx" + pslice "github.com/qydysky/part/slice" pweb "github.com/qydysky/part/web" "golang.org/x/net/proxy" ) @@ -369,9 +370,7 @@ func wsDealer(ctx context.Context, w http.ResponseWriter, r *http.Request, route return e } - fmt.Println(reqHeader.Get("Sec-WebSocket-Key")) - - if res, resp, e := DialContext(websocket.DefaultDialer, context.Background(), url, reqHeader); e != nil { + if res, resp, e := DialContext(ctx, url, reqHeader); e != nil { return errors.Join(ErrReqDoFail, e) } else { defer res.Close() @@ -382,38 +381,25 @@ func wsDealer(ctx context.Context, w http.ResponseWriter, r *http.Request, route return e } - fmt.Println(resHeader.Get("Sec-Websocket-Accept")) - - if req, e := Upgrade(&websocket.Upgrader{}, w, r, resHeader); e != nil { + if req, e := Upgrade(w, r, resHeader); e != nil { return errors.Join(ErrResDoFail, e) } else { defer req.Close() - fin := make(chan error, 2) - reqc := req.NetConn() - resc := res.NetConn() - go func() { - ctx1, done1 := pctx.WaitCtx(ctx) - defer done1() - select { - case fin <- copyWsMsg(reqc, resc): - case <-ctx1.Done(): - fin <- nil + select { + case e := <-copyWsMsg(req, res): + if e != nil { + logger.Error(`E:`, fmt.Sprintf("%s=>%s s->c %v", routePath, back.Name, e)) + return errors.Join(ErrCopy, e) } - }() - go func() { - ctx1, done1 := pctx.WaitCtx(ctx) - defer done1() - select { - case fin <- copyWsMsg(resc, reqc): - case <-ctx1.Done(): - fin <- nil + case e := <-copyWsMsg(res, req): + if e != nil { + logger.Error(`E:`, fmt.Sprintf("%s=>%s c->s %v", routePath, back.Name, e)) + return errors.Join(ErrCopy, e) } - }() - if e := <-fin; e != nil { - logger.Error(`E:`, fmt.Sprintf("%s=>%s %v", routePath, back.Name, e)) - return errors.Join(ErrCopy, e) + case <-ctx.Done(): } + return nil } } @@ -443,15 +429,24 @@ func copyHeader(s, t http.Header, app []Header) error { return nil } -func copyWsMsg(dst io.Writer, src io.Reader) (e error) { - _, e = io.Copy(dst, src) - return +var copyBuf = pslice.NewBlocks[byte](16*1024, 100) + +func copyWsMsg(dst io.Writer, src io.Reader) <-chan error { + c := make(chan error, 1) + go func() { + if tmpbuf, put, e := copyBuf.Get(); e != nil { + c <- e + } else { + _, e := io.CopyBuffer(dst, src, tmpbuf) + put() + c <- e + } + }() + return c } -func DialContext(d *websocket.Dialer, ctx context.Context, urlStr string, requestHeader http.Header) (*websocket.Conn, *http.Response, error) { - if d == nil { - d = websocket.DefaultDialer - } +func DialContext(ctx context.Context, urlStr string, requestHeader http.Header) (net.Conn, *http.Response, error) { + d := websocket.DefaultDialer challengeKey := requestHeader.Get("Sec-WebSocket-Key") @@ -490,16 +485,7 @@ func DialContext(d *websocket.Dialer, ctx context.Context, urlStr string, reques // servers that depend on it. The Header.Set method is not used because the // method canonicalizes the header names. for k, vs := range requestHeader { - switch { - case k == "Host": - if len(vs) > 0 { - req.Host = vs[0] - } - case k == "Sec-Websocket-Protocol": - return nil, nil, errors.New("websocket: header not allowed: " + k) - default: - req.Header[k] = vs - } + req.Header[k] = vs } if d.HandshakeTimeout != 0 { @@ -593,14 +579,6 @@ func DialContext(d *websocket.Dialer, ctx context.Context, urlStr string, reques }) } - defer func() { - if netConn != nil { - if err := netConn.Close(); err != nil { - log.Printf("websocket: failed to close network connection: %v", err) - } - } - }() - if u.Scheme == "https" && d.NetDialTLSContext == nil { // If NetDialTLSContext is set, assume that the TLS handshake has already been done @@ -635,8 +613,6 @@ func DialContext(d *websocket.Dialer, ctx context.Context, urlStr string, reques br = bufio.NewReaderSize(netConn, d.ReadBufferSize) } - conn := newConn(netConn, false, d.ReadBufferSize, d.WriteBufferSize, d.WriteBufferPool, br, nil) - if err := req.Write(netConn); err != nil { return nil, nil, err } @@ -687,8 +663,7 @@ func DialContext(d *websocket.Dialer, ctx context.Context, urlStr string, reques if err := netConn.SetDeadline(time.Time{}); err != nil { return nil, nil, err } - netConn = nil // to avoid close in defer. - return conn, resp, nil + return netConn, resp, nil } type netDialerFunc func(network, addr string) (net.Conn, error) @@ -719,7 +694,7 @@ func generateChallengeKey() (string, error) func tokenListContainsValue(header http.Header, name string, value string) bool //go:linkname returnError github.com/gorilla/websocket.(*Upgrader).returnError -func returnError(u *websocket.Upgrader, w http.ResponseWriter, r *http.Request, status int, reason string) (*websocket.Conn, error) +// func returnError(u *websocket.Upgrader, w http.ResponseWriter, r *http.Request, status int, reason string) (*websocket.Conn, error) //go:linkname checkSameOrigin github.com/gorilla/websocket.checkSameOrigin func checkSameOrigin(r *http.Request) bool @@ -739,9 +714,6 @@ func bufioReaderSize(originalReader io.Reader, br *bufio.Reader) int //go:linkname bufioWriterBuffer github.com/gorilla/websocket.bufioWriterBuffer func bufioWriterBuffer(originalWriter io.Writer, bw *bufio.Writer) []byte -//go:linkname newConn github.com/gorilla/websocket.newConn -func newConn(conn net.Conn, isServer bool, readBufferSize, writeBufferSize int, writeBufferPool websocket.BufferPool, br *bufio.Reader, writeBuf []byte) *websocket.Conn - //go:linkname computeAcceptKey github.com/gorilla/websocket.computeAcceptKey func computeAcceptKey(challengeKey string) string @@ -752,38 +724,8 @@ const ( maxControlFramePayloadSize = 125 ) -func Upgrade(u *websocket.Upgrader, w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*websocket.Conn, error) { - const badHandshake = "websocket: the client is not using the websocket protocol: " - - if !tokenListContainsValue(r.Header, "Connection", "upgrade") { - return returnError(u, w, r, http.StatusBadRequest, badHandshake+"'upgrade' token not found in 'Connection' header") - } - - if !tokenListContainsValue(r.Header, "Upgrade", "websocket") { - return returnError(u, w, r, http.StatusBadRequest, badHandshake+"'websocket' token not found in 'Upgrade' header") - } - - if r.Method != http.MethodGet { - return returnError(u, w, r, http.StatusMethodNotAllowed, badHandshake+"request method is not GET") - } - - if !tokenListContainsValue(r.Header, "Sec-Websocket-Version", "13") { - return returnError(u, w, r, http.StatusBadRequest, "websocket: unsupported version: 13 not found in 'Sec-Websocket-Version' header") - } - - checkOrigin := u.CheckOrigin - if checkOrigin == nil { - checkOrigin = checkSameOrigin - } - if !checkOrigin(r) { - return returnError(u, w, r, http.StatusForbidden, "websocket: request origin not allowed by Upgrader.CheckOrigin") - } - - challengeKey := r.Header.Get("Sec-Websocket-Key") - if !isValidChallengeKey(challengeKey) { - return returnError(u, w, r, http.StatusBadRequest, "websocket: not a websocket handshake: 'Sec-WebSocket-Key' header must be Base64 encoded value of 16-byte in length") - } - +func Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (net.Conn, error) { + u := &websocket.Upgrader{} h, ok := w.(http.Hijacker) if !ok { return returnError(u, w, r, http.StatusInternalServerError, "websocket: response does not implement http.Hijacker") @@ -801,12 +743,6 @@ func Upgrade(u *websocket.Upgrader, w http.ResponseWriter, r *http.Request, resp return nil, errors.New("websocket: client sent data before handshake is complete") } - var br *bufio.Reader - if u.ReadBufferSize == 0 && bufioReaderSize(netConn, brw.Reader) > 256 { - // Reuse hijacked buffered reader as connection reader. - br = brw.Reader - } - buf := bufioWriterBuffer(netConn, brw.Writer) var writeBuf []byte @@ -823,8 +759,6 @@ func Upgrade(u *websocket.Upgrader, w http.ResponseWriter, r *http.Request, resp } } - c := newConn(netConn, true, u.ReadBufferSize, u.WriteBufferSize, u.WriteBufferPool, br, writeBuf) - // Use larger of hijacked buffer and connection write buffer for header. p := buf if len(writeBuf) > len(p) { @@ -834,9 +768,6 @@ func Upgrade(u *websocket.Upgrader, w http.ResponseWriter, r *http.Request, resp p = append(p, "HTTP/1.1 101 Switching Protocols\r\n"...) for k, vs := range responseHeader { - if k == "Sec-Websocket-Protocol" || k == "Sec-Websocket-Extensions" { - continue - } for _, v := range vs { p = append(p, k...) p = append(p, ": "...) @@ -884,5 +815,24 @@ func Upgrade(u *websocket.Upgrader, w http.ResponseWriter, r *http.Request, resp } } - return c, nil + return netConn, nil +} + +func returnError(u *websocket.Upgrader, w http.ResponseWriter, r *http.Request, status int, reason string) (net.Conn, error) { + err := HandshakeError{message: reason} + if u.Error != nil { + u.Error(w, r, status, err) + } else { + w.Header().Set("Sec-Websocket-Version", "13") + http.Error(w, http.StatusText(status), status) + } + return nil, err +} + +type HandshakeError struct { + message string +} + +func (t HandshakeError) Error() string { + return t.message } -- 2.39.2