diff --git a/protocol/grpc/grpc_protocol.go b/protocol/grpc/grpc_protocol.go index 16c61df0dbb8daf5082088ec0b3f543b4f5e77e8..95b25373b95b37a4114449f786de557b1e81afff 100644 --- a/protocol/grpc/grpc_protocol.go +++ b/protocol/grpc/grpc_protocol.go @@ -45,6 +45,7 @@ type GrpcProtocol struct { func NewGRPCProtocol() *GrpcProtocol { return &GrpcProtocol{ BaseProtocol: protocol.NewBaseProtocol(), + serverMap: make(map[string]*Server), } } @@ -60,7 +61,22 @@ func (gp *GrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter { } func (gp *GrpcProtocol) openServer(url common.URL) { - return + _, ok := gp.serverMap[url.Location] + if !ok { + _, ok := gp.ExporterMap().Load(url.ServiceKey()) + if !ok { + panic("[GrpcProtocol]" + url.Key() + "is not existing") + } + + gp.serverLock.Lock() + _, ok = gp.serverMap[url.Location] + if !ok { + srv := NewServer() + gp.serverMap[url.Location] = srv + srv.Start(url) + } + gp.serverLock.Unlock() + } } func (gp *GrpcProtocol) Refer(url common.URL, impl interface{}) protocol.Invoker { diff --git a/protocol/grpc/server.go b/protocol/grpc/server.go index 83a9e69f192082c24a0ec4f5bfdbf2e99367cd2d..cc70a9112d9cadd68b4ed175487654c2954246b6 100644 --- a/protocol/grpc/server.go +++ b/protocol/grpc/server.go @@ -18,7 +18,9 @@ limitations under the License. package grpc import ( + "fmt" "net" + "reflect" ) import ( @@ -27,6 +29,9 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/protocol" ) type Server struct { @@ -34,11 +39,15 @@ type Server struct { } func NewServer() *Server { + return &Server{} +} - return nil +type DubboGrpcService interface { + SetProxyImpl(impl protocol.Invoker) + GetProxyImpl() protocol.Invoker + ServiceDesc() *grpc.ServiceDesc } -// TODO: unimplemented func (s *Server) Start(url common.URL) { var ( addr string @@ -51,12 +60,34 @@ func (s *Server) Start(url common.URL) { } server := grpc.NewServer() + key := url.GetParam(constant.BEAN_NAME_KEY, "") + service := config.GetProviderService(key) + + ds, ok := service.(DubboGrpcService) + if !ok { + panic("illegal service type registered") + } + + m, ok := reflect.TypeOf(service).MethodByName("SetProxyImpl") + if !ok { + panic("method SetProxyImpl is necessary for grpc service") + } + + exporter, _ := grpcProtocol.ExporterMap().Load(url.ServiceKey()) + if exporter == nil { + panic(fmt.Sprintf("no exporter found for servicekey: %v", url.ServiceKey())) + } + invoker := exporter.(protocol.Exporter).GetInvoker() + if invoker == nil { + panic(fmt.Sprintf("no invoker found for servicekey: %v", url.ServiceKey())) + } + in := []reflect.Value{reflect.ValueOf(service)} + in = append(in, reflect.ValueOf(invoker)) + m.Func.Call(in) + + server.RegisterService(ds.ServiceDesc(), service) + s.grpcServer = server - // grpc-go 蹇呴』鎻愬墠娉ㄥ唽 - // ServiceDesc 杩欎釜淇℃伅闇€瑕佹湁 - // 闇€瑕佹壘涓€涓柟娉曘€� - //server.RegisterService() - // 鎯充釜鍔炴硶娉ㄥ唽涓� if err = server.Serve(lis); err != nil { panic(err) } diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 1defedc28a2d42183be8c2e5d77441d8831c1d30..24c4158e8cbe977e428ded523382288b4a93a0e1 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -370,6 +370,7 @@ func (r *zkRegistry) register(c common.URL) error { return perrors.Errorf("@c{%v} type is not referencer or provider", c) } + dubboPath = strings.ReplaceAll(dubboPath, "$", "%24") err = r.registerTempZookeeperNode(dubboPath, encodedURL) if err != nil {