github.com/miekg/dns v1.1.62/go.mod h1:mvDlcItzm+br7MToIKqkglaGhlFMHJ9DTNNWONWXbNQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/qydysky/part v0.28.20250319150042 h1:E15i7L84i61txZZ7XLmQPqc0DzT77RPeXLDlsd/lJHI=
-github.com/qydysky/part v0.28.20250319150042/go.mod h1:RHYTy8EbqCP6OioVf6BkvFcfWLNO0S220zl0DDlY84Y=
+github.com/qydysky/part v0.28.20250420054637 h1:/0ZWIZIWsvg6iSS26sjuGrzKeLl5BWEoHSI2cJEOHbk=
+github.com/qydysky/part v0.28.20250420054637/go.mod h1:wp71PQdKYcg9jn9yDDvqC4shS/kzejyvFqbfUxuHocY=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
golang.org/x/mod v0.22.0 h1:D4nJWe9zXqHOmWqj4VMOJhvzj7bEZg4wEYa759z1pH4=
golang.org/x/mod v0.22.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY=
-golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c=
-golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
-golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
-golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
+golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
+golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
+golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
+golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
-golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
-golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
-golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
+golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
+golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
+golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
+golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8=
golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
"os"
"os/signal"
"sync"
+ "sync/atomic"
"github.com/dustin/go-humanize"
"github.com/qydysky/part"
- pctx "github.com/qydysky/part/ctx"
+ pe "github.com/qydysky/part/errors"
file "github.com/qydysky/part/file"
)
log.Fatal(e)
return
} else {
+ ctx, cancle := context.WithCancel(context.Background())
+ wait := dealConfig(ctx, config)
// ctrl+c退出
var interrupt = make(chan os.Signal, 2)
signal.Notify(interrupt, os.Interrupt)
+ <-interrupt
+ cancle()
+ wait()
+ }
+}
- ctx := pctx.CarryCancel(context.WithCancel(context.Background()))
- msdChan, wait := dealConfig(ctx, config)
+type fm struct {
+ id atomic.Uint32
+ alive atomic.Int32
+}
- defer wait()
- defer func() {
- _ = pctx.CallCancel(ctx)
- }()
+func (t *fm) ErrorMsg(targetaddr, listenaddr string, e error) {
+ log.Default().Printf("ERROR %v => %v %v", listenaddr, targetaddr, pe.ErrorFormat(e, pe.ErrActionInLineFunc))
+}
+func (t *fm) WarnMsg(targetaddr, listenaddr string, e error) {
+ // log.Default().Printf("Warn %v => %v %v", listenaddr, targetaddr, pe.ErrorFormat(e, pe.ErrActionInLineFunc))
+}
+func (t *fm) AcceptMsg(remote net.Addr, targetaddr string) (close func()) {
+ current := t.id.Add(1)
+ if current >= 99 {
+ t.id.Store(0)
+ }
- for {
- select {
- case msg := <-msdChan:
- switch msg.fmsg.Type {
- case part.LisnMsg:
- log.Default().Printf("LISTEN %v => %v", msg.item.Listen, msg.item.To)
- case part.AcceptMsg:
- log.Default().Printf("ACCEPT %v => %v", (msg.fmsg.Msg).(net.Addr).String(), msg.item.To)
- case part.DenyMsg:
- log.Default().Printf("DENY %v => %v", (msg.fmsg.Msg).(net.Addr).String(), msg.item.To)
- case part.ErrorMsg:
- log.Default().Fatalf("ERROR %v => %v %v", msg.item.Listen, msg.item.To, msg.fmsg.Msg)
- default:
- }
- case <-interrupt:
- log.Default().Printf("CLOSE")
- return
- }
- }
+ log.Default().Printf("ACCEPT %d %d %v => %v", t.alive.Add(1), current, remote.Network()+"://"+remote.String(), targetaddr)
+ return func() {
+ log.Default().Printf("CONFIN %d %d %v => %v", t.alive.Add(-1), current, remote.Network()+"://"+remote.String(), targetaddr)
}
}
-
-type ConfigMsg struct {
- item ConfigItem
- fmsg part.ForwardMsg
+func (t *fm) DenyMsg(remote net.Addr, targetaddr string) {
+ current := t.id.Add(1)
+ if current >= 99 {
+ t.id.Store(0)
+ }
+ log.Default().Printf("DENY %d %v => %v", current, remote.Network()+"://"+remote.String(), targetaddr)
+}
+func (t *fm) LisnMsg(targetaddr, listenaddr string) {
+ log.Default().Printf("LISTEN %v => %v", listenaddr, targetaddr)
+}
+func (t *fm) ClosMsg(targetaddr, listenaddr string) {
+ log.Default().Printf("CLOSE %v => %v", listenaddr, targetaddr)
}
-func dealConfig(ctx context.Context, config Config) (msgChan chan ConfigMsg, WaitFin func()) {
- msgChan = make(chan ConfigMsg, 10)
+func dealConfig(ctx context.Context, config Config) (WaitFin func()) {
var wg sync.WaitGroup
wg.Add(len(config))
+ fmp := &fm{}
for _, v := range config {
- go func(ctx context.Context, item ConfigItem) {
+ go func(item ConfigItem) {
defer wg.Done()
- var msg_chan chan part.ForwardMsg
- var close func()
+ defer part.Forward(item.To, item.Listen, item.Accept, fmp)()
- close, msg_chan = part.Forward(item.To, item.Listen, item.Accept)
-
- go func() {
- <-ctx.Done()
- close()
- }()
- defer func() {
- _ = pctx.CallCancel(ctx)
- }()
-
- for {
- select {
- case msg := <-msg_chan:
- select {
- case msgChan <- ConfigMsg{item: item, fmsg: msg}:
- default:
- <-msgChan
- msgChan <- ConfigMsg{item: item, fmsg: msg}
- }
- if msg.Type == part.ErrorMsg {
- return
- }
- case <-ctx.Done():
- return
- }
- }
- }(ctx, v)
+ <-ctx.Done()
+ }(v)
}
-
- return msgChan, wg.Wait
+ return wg.Wait
}
"bufio"
"bytes"
"context"
+ "crypto/rand"
"errors"
- "log"
"net"
"testing"
"time"
"github.com/qydysky/part"
- pctx "github.com/qydysky/part/ctx"
)
func Test(t *testing.T) {
- ctx := pctx.CarryCancel(context.WithCancel(context.Background()))
- msdChan, wait := dealConfig(ctx, []ConfigItem{
+ ctx, cancle := context.WithCancel(context.Background())
+ wait := dealConfig(ctx, []ConfigItem{
+ // {
+ // Listen: "tcp://127.0.0.1:20000",
+ // To: "tcp://127.0.0.1:20001",
+ // Accept: []string{"127.0.0.2/32", "127.0.0.1/32"},
+ // },
+ // {
+ // Listen: "tcp://127.0.0.1:20002",
+ // To: "tcp://127.0.0.1:20003",
+ // Accept: []string{"127.0.0.2/32"},
+ // },
+ // {
+ // Listen: "udp://127.0.0.1:20000",
+ // To: "udp://127.0.0.1:20001",
+ // Accept: []string{"127.0.0.1/32"},
+ // },
+ // {
+ // Listen: "udp://127.0.0.1:20004",
+ // To: "udp://127.0.0.1:20005",
+ // Accept: []string{"127.0.0.2/32"},
+ // },
+ // {
+ // Listen: "udp://127.0.0.1:20006",
+ // To: "tcp://127.0.0.1:20007",
+ // Accept: []string{"127.0.0.1/32"},
+ // },
+ // {
+ // Listen: "tcp://127.0.0.1:20008",
+ // To: "udp://127.0.0.1:20009",
+ // Accept: []string{"127.0.0.1/32"},
+ // },
{
- Listen: "tcp://127.0.0.1:20000",
- To: "tcp://127.0.0.1:20001",
- Accept: []string{"127.0.0.2/32", "127.0.0.1/32"},
- },
- {
- Listen: "tcp://127.0.0.1:20002",
- To: "tcp://127.0.0.1:20003",
- Accept: []string{"127.0.0.2/32"},
- },
- {
- Listen: "udp://127.0.0.1:20000",
- To: "udp://127.0.0.1:20001",
+ Listen: "tcp://127.0.0.1:20012",
+ To: "udp://127.0.0.1:20013",
Accept: []string{"127.0.0.1/32"},
},
{
- Listen: "udp://127.0.0.1:20004",
- To: "udp://127.0.0.1:20005",
- Accept: []string{"127.0.0.2/32"},
- },
- {
- Listen: "udp://127.0.0.1:20006",
- To: "tcp://127.0.0.1:20007",
- Accept: []string{"127.0.0.1/32"},
- },
- {
- Listen: "tcp://127.0.0.1:20008",
- To: "udp://127.0.0.1:20009",
- Accept: []string{"127.0.0.1/32"},
- },
- {
- Listen: "tcp://127.0.0.1:20011",
- To: "tcp://127.0.0.1:20010",
+ Listen: "udp://127.0.0.1:20014",
+ To: "tcp://127.0.0.1:20012",
Accept: []string{"127.0.0.1/32"},
},
+ // {
+ // Listen: "tcp://127.0.0.1:20011",
+ // To: "tcp://127.0.0.1:20010",
+ // Accept: []string{"127.0.0.1/32"},
+ // },
})
- go func() {
- for {
- select {
- case msg := <-msdChan:
- switch msg.fmsg.Type {
- case part.LisnMsg:
- log.Default().Printf("LISTEN %v => %v", msg.item.Listen, msg.item.To)
- case part.AcceptMsg:
- log.Default().Printf("ACCEPT %v => %v", (msg.fmsg.Msg).(net.Addr).String(), msg.item.To)
- case part.DenyMsg:
- log.Default().Printf("DENY %v => %v", (msg.fmsg.Msg).(net.Addr).String(), msg.item.To)
- case part.ErrorMsg:
- log.Default().Fatalf("ERROR %v => %v %v", msg.item.Listen, msg.item.To, msg.fmsg.Msg)
- default:
- }
- case <-ctx.Done():
- log.Default().Printf("CLOSE")
- return
- }
- }
- }()
- defer wait()
- defer func() {
- _ = pctx.CallCancel(ctx)
- }()
-
time.Sleep(time.Second)
- for i := 0; i < 100; i++ {
- if e := tcpSer("127.0.0.1:20000", "127.0.0.1:20001"); e != nil {
- t.Fatal(e)
- }
- }
- if e := tcpSer("127.0.0.1:20002", "127.0.0.1:20003"); e == nil {
- t.Fatal(e)
- }
- if e := udpSer("127.0.0.1:20000", "127.0.0.1:20001"); e != nil {
- t.Fatal(e)
- }
- if e := udpSer("127.0.0.1:20004", "127.0.0.1:20005"); e == nil {
- t.Fatal(e)
- }
- if e := udp2tcpSer("127.0.0.1:20006", "127.0.0.1:20007"); e != nil {
- t.Fatal(e)
- }
- if e := tcp2udpSer("127.0.0.1:20008", "127.0.0.1:20009"); e != nil {
- t.Fatal(e)
- }
- if e := tcp2udpSer("127.0.0.1:20008", "127.0.0.1:20009"); e != nil {
+ // for i := 0; i < 100; i++ {
+ // if e := tcpSer("127.0.0.1:20000", "127.0.0.1:20001"); e != nil {
+ // t.Fatal(e)
+ // }
+ // }
+ // if e := tcpSer("127.0.0.1:20002", "127.0.0.1:20003"); e == nil {
+ // t.Fatal(e)
+ // }
+ // if e := udpSer("127.0.0.1:20000", "127.0.0.1:20001"); e != nil {
+ // t.Fatal(e)
+ // }
+ // if e := udpSer("127.0.0.1:20004", "127.0.0.1:20005"); e == nil {
+ // t.Fatal(e)
+ // }
+ // if e := tcp2udpSer("127.0.0.1:20008", "127.0.0.1:20009"); e != nil {
+ // t.Fatal(e)
+ // }
+ // if e := udp2tcpSer("127.0.0.1:20006", "127.0.0.1:20007"); e != nil {
+ // t.Fatal(e)
+ // }
+ if e := u2t2u("127.0.0.1:20014", "127.0.0.1:20013"); e != nil {
t.Fatal(e)
}
+ cancle()
+ wait()
}
func tcpSer(lis, to string) error {
}
defer conn.Close()
- _, err = bufio.NewReader(conn).ReadString('\n')
+ data, err := bufio.NewReader(conn).ReadBytes('\n')
if err != nil {
ec <- err
- return
+ } else if _, err := conn.Write(data); err != nil {
+ ec <- err
+ } else {
}
-
- // Print the data read from the connection to the terminal
-
- // Write back the same message to the client
- _, _ = conn.Write([]byte("Hello TCP Client\n"))
}()
}
- udpAddr, err := net.ResolveUDPAddr("udp", lis)
+ conn1, err := net.Dial("udp", lis)
if err != nil {
return err
}
- conn1, err := net.ListenUDP("udp", nil)
+ size := 20000
+ data := genData(size)
- if err != nil {
+ if n, err := conn1.Write(data); err != nil || n != size {
return err
}
- _, _ = conn1.WriteToUDP([]byte("Hello UDP Server\n"), udpAddr)
-
- var buf [512]byte
- _ = conn1.SetDeadline(time.Now().Add(time.Second))
- n, _, err := conn1.ReadFromUDP(buf[0:])
- if err != nil {
- return err
- }
-
- if string(buf[:n]) != "Hello TCP Client\n" {
- return errors.New("no match:" + string(buf[:n]))
+ // Read from the connection untill a new line is send
+ buf2 := make([]byte, 100000)
+ n, _ := conn1.Read(buf2)
+ if !bytes.Equal(data, buf2[:n]) {
+ return errors.New("no match")
}
select {
ec <- err
return
}
- defer conn.Close()
- _, err = bufio.NewReader(conn).ReadString('\n')
+ data := make([]byte, 10000)
+ n, err := conn.Read(data)
if err != nil {
ec <- err
- return
+ } else {
+ _, err := conn.Write(data[:n])
+ if err != nil {
+ ec <- err
+ }
}
-
- // Print the data read from the connection to the terminal
-
- // Write back the same message to the client
- _, _ = conn.Write([]byte("Hello UDP Client\n"))
}()
}
return err
}
+ size := 6666
+ data := genData(size)
+
// Send a message to the server
- _, err = conn.Write([]byte("Hello TCP Server\n"))
- if err != nil {
+ if n, err := conn.Write(data); err != nil || n != size {
return err
}
// Read from the connection untill a new line is send
- data, err := bufio.NewReader(conn).ReadString('\n')
+ data2, err := bufio.NewReader(conn).ReadBytes('\n')
if err != nil {
return err
}
- if string(data) != "Hello UDP Client\n" {
- return errors.New("no match:" + string(data))
+ if !bytes.Equal(data, data2) {
+ return errors.New("no match")
+ }
+
+ select {
+ case err := <-ec:
+ return err
+ default:
+ return nil
+ }
+}
+
+func genData(size int) []byte {
+ data := make([]byte, size)
+ if _, e := rand.Read(data); e != nil {
+ panic(e)
+ }
+ data = bytes.ReplaceAll(data, []byte{'\n'}, []byte{' '})
+ data[size-1] = '\n'
+ return data
+}
+
+func u2t2u(lis, to string) error {
+ ec := make(chan error, 10)
+ {
+ listener, err := part.NewUdpListener("udp", to)
+
+ if err != nil {
+ return err
+ }
+
+ defer listener.Close()
+
+ go func() {
+ conn, err := listener.Accept()
+ if err != nil {
+ ec <- err
+ return
+ }
+
+ data := make([]byte, 10000)
+ n, err := conn.Read(data)
+ if err != nil {
+ ec <- err
+ } else {
+ _, err := conn.Write(data[:n])
+ if err != nil {
+ ec <- err
+ }
+ }
+ }()
+ }
+
+ conn1, err := net.Dial("udp", lis)
+
+ if err != nil {
+ return err
+ }
+
+ size := 8888
+ data := genData(size)
+
+ if n, err := conn1.Write(data); err != nil || n != size {
+ return err
+ }
+
+ // Read from the connection untill a new line is send
+ buf2 := make([]byte, 10000)
+ n, _ := conn1.Read(buf2)
+ if !bytes.Equal(data, buf2[:n]) {
+ return errors.New("no match")
}
select {