From: qydysky Date: Wed, 3 Jan 2024 18:26:52 +0000 (+0800) Subject: 1 X-Git-Tag: v0.28.20240103182958 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=95cdae95fdfd2614da22f5d80fa5e4f4104359c7;p=part%2F.git 1 --- diff --git a/rpc/Rpc.go b/rpc/Rpc.go index 00a89ff..f2add44 100644 --- a/rpc/Rpc.go +++ b/rpc/Rpc.go @@ -4,32 +4,64 @@ import ( "bytes" "context" "encoding/gob" + "errors" "net/http" "net/rpc" web "github.com/qydysky/part/web" ) +var ( + ErrSerGob = errors.New("ErrSerGob") + ErrSerDecode = errors.New("ErrSerDecode") + ErrCliDecode = errors.New("ErrCliDecode") + ErrSerDeal = errors.New("ErrSerDeal") + ErrCliDeal = errors.New("ErrCliDeal") + ErrCliEncode = errors.New("ErrCliEncode") + ErrSerEncode = errors.New("ErrSerEncode") + ErrRegister = errors.New("ErrRegister") + ErrDial = errors.New("ErrDial") +) + type Gob struct { - Key string Data []byte + Err error } -func NewGob(k string) *Gob { - return &Gob{Key: k} -} +// func NewGob(ptr any) *Gob { +// t := &Gob{} +// var buf bytes.Buffer +// t.Err = gob.NewEncoder(&buf).Encode(ptr) +// t.Data = buf.Bytes() +// return t +// } -func (t *Gob) Encode(e any) (err error) { +func (t *Gob) encode(ptr any) *Gob { var buf bytes.Buffer - err = gob.NewEncoder(&buf).Encode(e) + t.Err = gob.NewEncoder(&buf).Encode(ptr) t.Data = buf.Bytes() - return + return t } -func (t *Gob) Decode(e any) (err error) { - return gob.NewDecoder(bytes.NewReader(t.Data)).Decode(e) +func (t *Gob) decode(ptr any) *Gob { + t.Err = gob.NewDecoder(bytes.NewReader(t.Data)).Decode(ptr) + return t } +// func (t *Gob) RpcDeal(host, path string) *Gob { +// if t.Err == nil { +// if c, e := rpc.DialHTTPPath("tcp", host, path); e != nil { +// t.Err = e +// } else { +// call := <-c.Go("DealGob.Deal", t, t, make(chan *rpc.Call, 1)).Done +// if call.Error != nil { +// t.Err = call.Error +// } +// } +// } +// return t +// } + type DealGob struct { deal func(*Gob, *Gob) error } @@ -42,58 +74,72 @@ func (t *DealGob) Deal(i *Gob, o *Gob) error { return t.deal(i, o) } -type Pob struct { - Host string `json:"host"` - Path string `json:"path"` - s *rpc.Server - c *rpc.Client +type Server struct { + webP web.WebPath + Shutdown func(ctx ...context.Context) } -func (t *Pob) Server(deal func(i, o *Gob) error) (shutdown func(ctx ...context.Context), err error) { - var path web.WebPath +func NewServer(host string) *Server { + ser := &Server{} webSync := web.NewSyncMap(&http.Server{ - Addr: t.Host, - }, &path) - shutdown = webSync.Shutdown - - t.s = rpc.NewServer() - if e := t.s.Register(newDealGob(deal)); e != nil { - err = e - return + Addr: host, + }, &ser.webP) + ser.Shutdown = webSync.Shutdown + return ser +} + +func Register[T, E any](t *Server, path string, deal func(it *T, ot *E) error) error { + s := rpc.NewServer() + if e := s.Register(newDealGob(func(i, o *Gob) error { + if i.Err != nil { + return errors.Join(ErrSerGob, i.Err) + } else { + var it T + var ot E + if e := i.decode(&it).Err; e != nil { + return errors.Join(ErrSerDecode, e) + } + if e := deal(&it, &ot); e != nil { + return errors.Join(ErrSerDeal, e) + } + if e := o.encode(&ot).Err; e != nil { + return errors.Join(ErrSerEncode, e) + } + return nil + } + })); e != nil { + return errors.Join(ErrRegister, e) } - - path.Store(t.Path, func(w http.ResponseWriter, r *http.Request) { - t.s.ServeHTTP(w, r) + t.webP.Store(path, func(w http.ResponseWriter, r *http.Request) { + s.ServeHTTP(w, r) }) - return -} - -func (t *Pob) Client() (pobClient *PobClient, err error) { - t.c, err = rpc.DialHTTPPath("tcp", t.Host, t.Path) - pobClient = &PobClient{t.c} - return -} - -type PobClient struct { - c *rpc.Client + return nil } -func (t *PobClient) Close() { - t.c.Close() +func UnRegister(t *Server, path string) { + t.webP.Store(path, nil) } -func (t *PobClient) CallIO(i, o *Gob) (err error) { - return t.c.Call("DealGob.Deal", i, o) -} - -func (t *PobClient) GoIO(i, o *Gob, done chan *rpc.Call) *rpc.Call { - return t.c.Go("DealGob.Deal", i, o, done) -} - -func (t *PobClient) Call(g *Gob) (err error) { - return t.c.Call("DealGob.Deal", g, g) -} - -func (t *PobClient) Go(g *Gob, done chan *rpc.Call) *rpc.Call { - return t.c.Go("DealGob.Deal", g, g, done) +func Call[T, E any](it *T, ot *E, host, path string) error { + var buf bytes.Buffer + if e := gob.NewEncoder(&buf).Encode(it); e != nil { + return errors.Join(ErrCliEncode, e) + } else { + if c, e := rpc.DialHTTPPath("tcp", host, path); e != nil { + return errors.Join(ErrDial, e) + } else { + t := &Gob{Data: buf.Bytes()} + call := <-c.Go("DealGob.Deal", t, t, make(chan *rpc.Call, 1)).Done + if call.Error != nil { + return errors.Join(ErrCliDeal, call.Error) + } + if t.Err != nil { + return errors.Join(ErrSerGob, t.Err) + } + if e := gob.NewDecoder(bytes.NewReader(t.Data)).Decode(ot); e != nil { + return errors.Join(ErrCliDecode, e) + } + return nil + } + } } diff --git a/rpc/Rpc_test.go b/rpc/Rpc_test.go index dadb0c6..247830c 100644 --- a/rpc/Rpc_test.go +++ b/rpc/Rpc_test.go @@ -1,62 +1,45 @@ package part import ( - "errors" - "log" "testing" "time" ) +type test1 struct { + Data test1_1 +} +type test1_1 struct { + Data int +} + +type test2 struct { + Data test2_1 +} +type test2_1 struct { + Data int +} + func TestMain(t *testing.T) { - pob := Pob{Host: "127.0.0.1:10902", Path: "/123"} - if shutdown, e := pob.Server(func(i, o *Gob) error { - switch i.Key { - case "+": - var ivv int - if e := i.Decode(&ivv); e != nil { - log.Fatal("d", e) - } - ivv += 1 - if e := o.Encode(ivv); e != nil { - log.Fatal("e", e) - } - default: - return errors.New("no key") - } + pob := NewServer("127.0.0.1:10902") + defer pob.Shutdown() + if e := Register(pob, "/123", func(i *int, o *test1) error { + *i += 1 + o.Data.Data = *i return nil }); e != nil { t.Fatal(e) - } else { - defer shutdown() } time.Sleep(time.Second) - if c, e := pob.Client(); e != nil { + var i int = 9 + var out test2 + + if e := Call(&i, &out, "127.0.0.1:10902", "/123"); e != nil { t.Fatal(e) - } else { - var gob = NewGob("+") - - var i int = 9 - if e := gob.Encode(&i); e != nil { - t.Fatal(e) - } - if e := gob.Decode(&i); e != nil { - t.Fatal(e) - } - if e := c.Call(gob); e != nil { - t.Fatal(e) - } else if gob.Key != "+" { - t.Fatal() - } else { - if e := gob.Decode(&i); e != nil { - t.Fatal(e) - } - if i != 10 { - t.Fatal() - } - } - c.Close() } + if out.Data.Data != 10 { + t.FailNow() + } }