From c5a763f0a734615d9e9d79b140bc98c9a35f9e9d Mon Sep 17 00:00:00 2001 From: qydysky Date: Sun, 21 Jan 2024 15:36:13 +0800 Subject: [PATCH] 1 --- Net.go | 477 +++++++++++++++++++++++------------------ component/Component.go | 289 +++++++++++++------------ 2 files changed, 408 insertions(+), 358 deletions(-) diff --git a/Net.go b/Net.go index a2b6e82..9b41d44 100644 --- a/Net.go +++ b/Net.go @@ -1,267 +1,318 @@ package part import ( - "net" - "sync" - "strings" - "errors" "bytes" - "net/url" + "errors" + "net" + "net/url" "os/exec" - "runtime" - "time" + "runtime" + "strings" + "sync" + "time" + "github.com/miekg/dns" ) -type netl struct{ - RV []interface{} - Dns dnsl +type netl struct { + RV []interface{} + Dns dnsl } -func Net() *netl{ +func Net() *netl { return &netl{} } -type dnsl struct{ - Server string +type dnsl struct { + Server string } func (this *netl) Nslookup(target string) error { c := dns.Client{} - m := dns.Msg{} - - m.SetQuestion(target+".", dns.TypeA) - - if this.Dns.Server == "" { - if e := this.GetLocalDns(); e != nil {return e} - } + m := dns.Msg{} + + m.SetQuestion(target+".", dns.TypeA) + + if this.Dns.Server == "" { + if e := this.GetLocalDns(); e != nil { + return e + } + } r, _, err := c.Exchange(&m, this.Dns.Server+":53") if err != nil { return err - } - if len(r.Answer) == 0 { + } + if len(r.Answer) == 0 { return errors.New("no answer") - } - - this.RV = append(this.RV, dns.Field(r.Answer[0],1)) - return nil + } + + this.RV = append(this.RV, dns.Field(r.Answer[0], 1)) + return nil } -func (*netl) TestDial(network,address string, Timeout int) bool { - conn, err := net.DialTimeout(network, address, time.Duration(Timeout)*time.Second) - if err != nil { +func (*netl) TestDial(network, address string, Timeout int) bool { + conn, err := net.DialTimeout(network, address, time.Duration(Timeout)*time.Second) + if err != nil { Logf().E(err.Error()) - return false - } - conn.Close() - return true + return false + } + conn.Close() + return true } const ( - ErrorMsg = iota - AcceptMsg - PortMsg + ErrorMsg = iota + AcceptMsg + DenyMsg + PortMsg ) -//when Type is ErrorMsg, Msg is set to error -//when Type is AcceptMsg, Msg is set to nil -//when Type is PortMsg, Msg is set to Listen Port(int) +// when Type is ErrorMsg, Msg is set to error +// when Type is AcceptMsg, Msg is set to net.Addr +// when Type is DenyMsg, Msg is set to net.Addr +// when Type is PortMsg, Msg is set to Listen Port(int) type ForwardMsg struct { - Type int - Msg interface{} + Type int + Msg interface{} } -func Forward(targetaddr,targetnetwork *string, listenaddr string) (closef func(),msg_chan chan ForwardMsg) { - //初始化消息通道 - msg_chan = make(chan ForwardMsg, 1000) - - //尝试监听 - listener, err := net.Listen("tcp", listenaddr + ":0") - if err != nil { - select{ - default:; - case msg_chan <- ForwardMsg{ - Type: ErrorMsg, - Msg: err, - }:; - } - return - } - - //初始化关闭方法 - closef = func(){ - *targetaddr, *targetnetwork = "", "" - listener.Close() - } - - //返回监听端口 - select{ - default:; - case msg_chan <- ForwardMsg{ - Type: PortMsg, - Msg: listener.Addr().(*net.TCPAddr).Port, - }:; - } - - //开始准备转发 - go func(listener net.Listener, targetaddr,targetnetwork *string, msg_chan chan ForwardMsg){ - defer close(msg_chan) - defer listener.Close() - - //tcp 桥 - tcpBridge2 := func (a, b net.Conn) { - - fin:=make(chan bool,1) - var wg sync.WaitGroup - - wg.Add(2) - go func(){ - defer func() { - a.Close() - b.Close() - fin <- true - wg.Done() - }() - - buf := make([]byte, 20480) - - for { - select { - case <-fin: - return; - default: - n, err := a.Read(buf) - - if err != nil {return} - b.Write(buf[:n]) - } - } - }() - - go func(){ - defer func() { - a.Close() - b.Close() - fin <- true - wg.Done() - }() - - buf := make([]byte, 20480) - - for { - select { - case <-fin: - return; - default: - n, err := b.Read(buf) - - if err != nil {return} - a.Write(buf[:n]) - } - } - }() - - wg.Wait() - } - - for { - proxyconn, err := listener.Accept() - if err != nil { - //返回Accept错误 - select{ - default:; - case msg_chan <- ForwardMsg{ - Type: ErrorMsg, - Msg: err, - }:; - } - continue - } - - //返回Accept - select{ - default:; - case msg_chan <- ForwardMsg{ - Type: AcceptMsg, - }:; - } - - if *targetaddr == "" || *targetnetwork == "" {break} - - targetconn, err := net.Dial(*targetnetwork, *targetaddr) - if err != nil { - select{ - default:; - case msg_chan <- ForwardMsg{ - Type: ErrorMsg, - Msg: err, - }:; - } - proxyconn.Close() - continue - } - - go tcpBridge2(proxyconn,targetconn) - } - }(listener, targetaddr, targetnetwork, msg_chan) - - return -} +func Forward(targetaddr, network, listenaddr string, acceptCIDRs []string) (closef func(), msg_chan chan ForwardMsg) { + //初始化消息通道 + msg_chan = make(chan ForwardMsg, 1000) + + //尝试监听 + listener, err := net.Listen(network, listenaddr) + if err != nil { + select { + default: + case msg_chan <- ForwardMsg{ + Type: ErrorMsg, + Msg: err, + }: + } + return + } + + closec := make(chan struct{}) + //初始化关闭方法 + closef = func() { + listener.Close() + close(closec) + } + + //返回监听端口 + select { + default: + case msg_chan <- ForwardMsg{ + Type: PortMsg, + Msg: listener.Addr().(*net.TCPAddr).Port, + }: + } + + matchfunc := []func(ip net.IP) bool{} + + for _, cidr := range acceptCIDRs { + if _, cidrx, err := net.ParseCIDR(cidr); err != nil { + select { + default: + case msg_chan <- ForwardMsg{ + Type: ErrorMsg, + Msg: err, + }: + } + return + } else { + matchfunc = append(matchfunc, cidrx.Contains) + } + } + + //开始准备转发 + go func(listener net.Listener, targetaddr, network string, msg_chan chan ForwardMsg) { + defer close(msg_chan) + defer listener.Close() + + //tcp 桥 + tcpBridge2 := func(a, b net.Conn) { + fin := make(chan bool, 1) + var wg sync.WaitGroup + + wg.Add(2) + go func() { + defer func() { + a.Close() + b.Close() + fin <- true + wg.Done() + }() + + buf := make([]byte, 20480) + for { + select { + case <-fin: + return + default: + n, err := a.Read(buf) + + if err != nil { + return + } + b.Write(buf[:n]) + } + } + }() + + go func() { + defer func() { + a.Close() + b.Close() + fin <- true + wg.Done() + }() + + buf := make([]byte, 20480) + + for { + select { + case <-fin: + return + default: + n, err := b.Read(buf) + + if err != nil { + return + } + a.Write(buf[:n]) + } + } + }() + + wg.Wait() + } + + for { + proxyconn, err := listener.Accept() + if err != nil { + //返回Accept错误 + select { + default: + case msg_chan <- ForwardMsg{ + Type: ErrorMsg, + Msg: err, + }: + } + continue + } + + ip := net.ParseIP(strings.Split(proxyconn.RemoteAddr().String(), ":")[0]) + + var accpet bool + for i := 0; i < len(matchfunc); i++ { + accpet = accpet || matchfunc[i](ip) + } + if !accpet { + //返回Deny + select { + default: + case msg_chan <- ForwardMsg{ + Type: DenyMsg, + Msg: proxyconn.RemoteAddr(), + }: + } + proxyconn.Close() + continue + } + + //返回Accept + select { + default: + case msg_chan <- ForwardMsg{ + Type: AcceptMsg, + Msg: proxyconn.RemoteAddr(), + }: + } + + select { + case <-closec: + default: + break + } + + targetconn, err := net.Dial(network, targetaddr) + if err != nil { + select { + default: + case msg_chan <- ForwardMsg{ + Type: ErrorMsg, + Msg: err, + }: + } + proxyconn.Close() + continue + } + + go tcpBridge2(proxyconn, targetconn) + } + }(listener, targetaddr, network, msg_chan) + + return +} func (this *netl) GetLocalDns() error { if runtime.GOOS == "windows" { - cmd := exec.Command("nslookup","127.0.0.1") + cmd := exec.Command("nslookup", "127.0.0.1") output, _ := cmd.CombinedOutput() - var ip []byte - loc_ip := bytes.Index(output,[]byte("Address:"))+8 + var ip []byte + loc_ip := bytes.Index(output, []byte("Address:")) + 8 - for brk:=1;brk>=0; { - tmp := bytes.IndexAny(output[loc_ip:],"1234567890.") + for brk := 1; brk >= 0; { + tmp := bytes.IndexAny(output[loc_ip:], "1234567890.") if tmp == 0 { - ip=append(ip,output[loc_ip]) - loc_ip=loc_ip+1 - }else{ - brk=brk-1 - loc_ip=loc_ip+tmp + ip = append(ip, output[loc_ip]) + loc_ip = loc_ip + 1 + } else { + brk = brk - 1 + loc_ip = loc_ip + tmp } } - + this.Dns.Server = string(ip) - return nil - }else if Checkfile().IsExist("/etc/resolv.conf") { - cmd := exec.Command("cat","/etc/resolv.conf") + return nil + } else if Checkfile().IsExist("/etc/resolv.conf") { + cmd := exec.Command("cat", "/etc/resolv.conf") output, _ := cmd.CombinedOutput() - var ip []byte - loc_ip := bytes.Index(output,[]byte("nameserver"))+10 - for brk:=1;brk>=0; { - tmp := bytes.IndexAny(output[loc_ip:],"1234567890.") + var ip []byte + loc_ip := bytes.Index(output, []byte("nameserver")) + 10 + for brk := 1; brk >= 0; { + tmp := bytes.IndexAny(output[loc_ip:], "1234567890.") if tmp == 0 { - ip=append(ip,output[loc_ip]) - loc_ip=loc_ip+1 - }else{ - brk=brk-1 - loc_ip=loc_ip+tmp + ip = append(ip, output[loc_ip]) + loc_ip = loc_ip + 1 + } else { + brk = brk - 1 + loc_ip = loc_ip + tmp } } - this.Dns.Server = string(ip) - return nil + this.Dns.Server = string(ip) + return nil } - Logf().E("[err]Dns: system: ",runtime.GOOS) + Logf().E("[err]Dns: system: ", runtime.GOOS) Logf().E("[err]Dns: none") return errors.New("1") } -func MasterDomain(url_s string) (string,error){ - if u,e := url.Parse(url_s);e != nil { - return "",e - } else { - host := u.Hostname() - list := strings.Split(host, ".") - if len(list) < 2 {return "",errors.New("invalid domain:"+host)} - return strings.Join(list[len(list)-2:], "."),nil - } - return "",nil +func MasterDomain(url_s string) (string, error) { + if u, e := url.Parse(url_s); e != nil { + return "", e + } else { + host := u.Hostname() + list := strings.Split(host, ".") + if len(list) < 2 { + return "", errors.New("invalid domain:" + host) + } + return strings.Join(list[len(list)-2:], "."), nil + } + return "", nil } diff --git a/component/Component.go b/component/Component.go index a50e751..f465562 100644 --- a/component/Component.go +++ b/component/Component.go @@ -3,7 +3,6 @@ package part import ( "context" "errors" - "sync" "sync/atomic" ) @@ -12,160 +11,160 @@ var ( ErrSelfDel = errors.New("ErrSelfDel") ) -type Component[T any] struct { +type Component[T, E any] struct { del atomic.Bool - deal func(ctx context.Context, ptr T) error + deal func(ctx context.Context, ptr T) (E, error) } -func NewComp[T any](deal func(ctx context.Context, ptr T) error) *Component[T] { - return &Component[T]{atomic.Bool{}, deal} +func NewComp[T, E any](deal func(ctx context.Context, ptr T) (ret E, err error)) *Component[T, E] { + return &Component[T, E]{atomic.Bool{}, deal} } -func (t *Component[T]) Run(ctx context.Context, ptr T) error { +func (t *Component[T, E]) Run(ctx context.Context, ptr T) (ret E, err error) { if t.del.Load() || t.deal == nil { - return nil + return } return t.deal(ctx, ptr) } -func (t *Component[T]) Del() { +func (t *Component[T, E]) Del() { t.del.Store(true) } -type Components[T any] struct { - lock sync.RWMutex - comps []*Component[T] -} - -func NewComps[T any](c ...*Component[T]) *Components[T] { - return &Components[T]{comps: c} -} - -func (t *Components[T]) Put(c ...*Component[T]) { - t.lock.Lock() - t.comps = append(t.comps, c...) - t.lock.Unlock() -} - -func (t *Components[T]) Del(c ...*Component[T]) { - t.lock.Lock() - for i := 0; i < len(t.comps); i++ { - for j := 0; j < len(c); j++ { - if t.comps[i] == c[j] { - copy(t.comps[i:], t.comps[i+1:]) - t.comps = t.comps[:len(t.comps)-1] - copy(c[i:], c[i+1:]) - c = c[:len(c)-1] - break - } - } - } - t.lock.Unlock() -} - -func (t *Components[T]) DelAll() { - t.lock.Lock() - clear(t.comps) - t.lock.Unlock() -} - -func (t *Components[T]) Run(ctx context.Context, ptr T) error { - var needDel bool - - t.lock.RLock() - defer func() { - t.lock.RUnlock() - if needDel { - t.lock.Lock() - for i := 0; i < len(t.comps); i++ { - if t.comps[i].del.Load() { - copy(t.comps[i:], t.comps[i+1:]) - t.comps = t.comps[:len(t.comps)-1] - } - } - t.lock.Unlock() - } - }() - - for i := 0; i < len(t.comps); i++ { - if t.comps[i].del.Load() || t.comps[i].deal == nil { - continue - } - e := t.comps[i].deal(ctx, ptr) - if errors.Is(e, ErrSelfDel) { - t.comps[i].del.Store(true) - needDel = true - } - if errors.Is(e, ErrStopRun) { - return e - } - } - - return nil -} - -func (t *Components[T]) Start(ctx context.Context, ptr T, concurrency ...int) error { - var needDel bool - - t.lock.RLock() - defer func() { - t.lock.RUnlock() - if needDel { - t.lock.Lock() - for i := 0; i < len(t.comps); i++ { - if t.comps[i].del.Load() { - copy(t.comps[i:], t.comps[i+1:]) - t.comps = t.comps[:len(t.comps)-1] - } - } - t.lock.Unlock() - } - }() - - var ( - wg sync.WaitGroup - con chan struct{} - err = make(chan error, len(t.comps)) - ) - if len(concurrency) > 0 { - con = make(chan struct{}, concurrency[0]) - } - wg.Add(len(t.comps)) - - for i := 0; i < len(t.comps); i++ { - if t.comps[i].del.Load() || t.comps[i].deal == nil { - wg.Done() - err <- nil - continue - } - if con != nil { - con <- struct{}{} - } - go func(i int) { - e := t.comps[i].deal(ctx, ptr) - if errors.Is(e, ErrSelfDel) { - t.comps[i].del.Store(true) - } - err <- e - wg.Done() - if con != nil { - <-con - } - }(i) - } - - wg.Wait() - - for { - select { - case e := <-err: - if errors.Is(e, ErrSelfDel) { - needDel = true - } else if e != nil { - return e - } - default: - return nil - } - } -} +// type Components[T any] struct { +// lock sync.RWMutex +// comps []*Component[T] +// } + +// func NewComps[T any](c ...*Component[T]) *Components[T] { +// return &Components[T]{comps: c} +// } + +// func (t *Components[T]) Put(c ...*Component[T]) { +// t.lock.Lock() +// t.comps = append(t.comps, c...) +// t.lock.Unlock() +// } + +// func (t *Components[T]) Del(c ...*Component[T]) { +// t.lock.Lock() +// for i := 0; i < len(t.comps); i++ { +// for j := 0; j < len(c); j++ { +// if t.comps[i] == c[j] { +// copy(t.comps[i:], t.comps[i+1:]) +// t.comps = t.comps[:len(t.comps)-1] +// copy(c[i:], c[i+1:]) +// c = c[:len(c)-1] +// break +// } +// } +// } +// t.lock.Unlock() +// } + +// func (t *Components[T]) DelAll() { +// t.lock.Lock() +// clear(t.comps) +// t.lock.Unlock() +// } + +// func (t *Components[T]) Run(ctx context.Context, ptr T) error { +// var needDel bool + +// t.lock.RLock() +// defer func() { +// t.lock.RUnlock() +// if needDel { +// t.lock.Lock() +// for i := 0; i < len(t.comps); i++ { +// if t.comps[i].del.Load() { +// copy(t.comps[i:], t.comps[i+1:]) +// t.comps = t.comps[:len(t.comps)-1] +// } +// } +// t.lock.Unlock() +// } +// }() + +// for i := 0; i < len(t.comps); i++ { +// if t.comps[i].del.Load() || t.comps[i].deal == nil { +// continue +// } +// e := t.comps[i].deal(ctx, ptr) +// if errors.Is(e, ErrSelfDel) { +// t.comps[i].del.Store(true) +// needDel = true +// } +// if errors.Is(e, ErrStopRun) { +// return e +// } +// } + +// return nil +// } + +// func (t *Components[T]) Start(ctx context.Context, ptr T, concurrency ...int) error { +// var needDel bool + +// t.lock.RLock() +// defer func() { +// t.lock.RUnlock() +// if needDel { +// t.lock.Lock() +// for i := 0; i < len(t.comps); i++ { +// if t.comps[i].del.Load() { +// copy(t.comps[i:], t.comps[i+1:]) +// t.comps = t.comps[:len(t.comps)-1] +// } +// } +// t.lock.Unlock() +// } +// }() + +// var ( +// wg sync.WaitGroup +// con chan struct{} +// err = make(chan error, len(t.comps)) +// ) +// if len(concurrency) > 0 { +// con = make(chan struct{}, concurrency[0]) +// } +// wg.Add(len(t.comps)) + +// for i := 0; i < len(t.comps); i++ { +// if t.comps[i].del.Load() || t.comps[i].deal == nil { +// wg.Done() +// err <- nil +// continue +// } +// if con != nil { +// con <- struct{}{} +// } +// go func(i int) { +// e := t.comps[i].deal(ctx, ptr) +// if errors.Is(e, ErrSelfDel) { +// t.comps[i].del.Store(true) +// } +// err <- e +// wg.Done() +// if con != nil { +// <-con +// } +// }(i) +// } + +// wg.Wait() + +// for { +// select { +// case e := <-err: +// if errors.Is(e, ErrSelfDel) { +// needDel = true +// } else if e != nil { +// return e +// } +// default: +// return nil +// } +// } +// } -- 2.39.2