diff --git a/common/constant/key.go b/common/constant/key.go index f57dcdb77d8011d158ba8248c2a6f1b8a5453055..9b00d021b7858df72edba9fbd4cc7802b7a2832b 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -277,4 +277,6 @@ const ( // SERVICE_DISCOVERY_KEY indicate which service discovery instance will be used SERVICE_DISCOVERY_KEY = "service_discovery" + LANGUAGE_KEY = "language" + GO_LANG = "golang" ) diff --git a/common/rpc_service.go b/common/rpc_service.go index 2eeb52bdfbea21825f7702a750cabe73a3a09bf6..1127e6c018204ee287ba3edfb50b30688a052dd5 100644 --- a/common/rpc_service.go +++ b/common/rpc_service.go @@ -343,6 +343,10 @@ func suiteMethod(method reflect.Method) *MethodType { argsType []reflect.Type ) + if mname == "Reference" { + return nil + } + if outNum != 1 && outNum != 2 { logger.Warnf("method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2", mname, mtype.String(), outNum) diff --git a/config/config_loader.go b/config/config_loader.go index 049687924a4745584bfa5e6b37544bf0bf5af427..90eb354cf53fe8009b73f67d35e0827e4b11ecbc 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -189,6 +189,7 @@ func loadProviderConfig() { logger.Errorf("[provider config center refresh] %#v", err) } checkRegistries(providerConfig.Registries, providerConfig.Registry) + for key, svs := range providerConfig.Services { rpcService := GetProviderService(key) if rpcService == nil { diff --git a/metadata/service/exporter/configurable/exporter.go b/metadata/service/exporter/configurable/exporter.go index 1d134bdb3d81f7cb3fd5f00dab5e5e663ed0cfec..e43fdda8861e1fe60010a19033eb4d7b03a38800 100644 --- a/metadata/service/exporter/configurable/exporter.go +++ b/metadata/service/exporter/configurable/exporter.go @@ -55,6 +55,8 @@ func (exporter *MetadataServiceExporter) Export() error { constant.DEFAULT_PROTOCOL: generateMetadataProtocol(), } serviceConfig.InterfaceName = constant.METADATA_SERVICE_NAME + // identify this is a golang server + serviceConfig.Params = map[string]string{constant.LANGUAGE_KEY: constant.GO_LANG} serviceConfig.Group = config.GetApplicationConfig().Name // now the error will always be nil serviceConfig.Version, _ = exporter.metadataService.Version() diff --git a/metadata/service/inmemory/metadata_service_proxy_factory.go b/metadata/service/inmemory/metadata_service_proxy_factory.go index de30d1b089f7b504053b36893d25e71997013634..0afadca8e87d7572452d534f688a66188996c878 100644 --- a/metadata/service/inmemory/metadata_service_proxy_factory.go +++ b/metadata/service/inmemory/metadata_service_proxy_factory.go @@ -41,9 +41,14 @@ func createProxy(ins registry.ServiceInstance) service.MetadataService { logger.Errorf("metadata service urls not found, %v", ins) return nil } - p := extension.GetProtocol(urls[0].Protocol) - invoker := p.Refer(*urls[0]) - return &MetadataServiceProxy{invkr: invoker} + + u := urls[0] + p := extension.GetProtocol(u.Protocol) + invoker := p.Refer(*u) + golang := u.GetParam(constant.LANGUAGE_KEY, "") + return &MetadataServiceProxy{invkr: invoker, + golangServer: golang == constant.GO_LANG, + } } // buildStandardMetadataServiceURL will use standard format to build the metadata service url. diff --git a/metadata/service/inmemory/service.go b/metadata/service/inmemory/service.go index b0782fa3deb190b0577f84ce5979284900da9965..ec626f7f43f6be71b6e5f8562628a48555a5c71d 100644 --- a/metadata/service/inmemory/service.go +++ b/metadata/service/inmemory/service.go @@ -143,7 +143,7 @@ func (mts *MetadataService) getAllService(services *sync.Map) []common.URL { urls := value.(*skip.SkipList) for i := uint64(0); i < urls.Len(); i++ { url := common.URL(urls.ByPosition(i).(Comparator)) - if url.GetParam(constant.INTERFACE_KEY, url.Path) != constant.SIMPLE_METADATA_SERVICE_NAME { + if url.GetParam(constant.INTERFACE_KEY, url.Path) != constant.METADATA_SERVICE_NAME { res = append(res, url) } } @@ -220,12 +220,12 @@ func (mts *MetadataService) PublishServiceDefinition(url common.URL) error { } // GetExportedURLs get all exported urls -func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]common.URL, error) { +func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) { if serviceInterface == constant.ANY_VALUE { - return mts.getAllService(mts.exportedServiceURLs), nil + return service.ConvertURLArrToIntfArr(mts.getAllService(mts.exportedServiceURLs)), nil } else { serviceKey := definition.ServiceDescriperBuild(serviceInterface, group, version) - return mts.getSpecifiedService(mts.exportedServiceURLs, serviceKey, protocol), nil + return service.ConvertURLArrToIntfArr(mts.getSpecifiedService(mts.exportedServiceURLs, serviceKey, protocol)), nil } } diff --git a/metadata/service/inmemory/service_proxy.go b/metadata/service/inmemory/service_proxy.go index bbe4b11017f626809ff99fae669be5e5f4f8d0d3..95ad680f738ac92880b5d4f32db07ca6728134f6 100644 --- a/metadata/service/inmemory/service_proxy.go +++ b/metadata/service/inmemory/service_proxy.go @@ -20,34 +20,59 @@ package inmemory import ( "context" "reflect" + "time" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/metadata/service" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" ) +// actually it's RPC stub +// this will only be used by client-side +// if the metadata service is "local" metadata service in server side, +// which means that metadata service is RPC service too. +// so in client-side, if we want to get the metadata information, +// we must call metadata service +// this is the stub, or proxy +// for now, only GetExportedURLs will be implemented type MetadataServiceProxy struct { - invkr protocol.Invoker + invkr protocol.Invoker + golangServer bool } -func (m *MetadataServiceProxy) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]common.URL, error) { +func (m *MetadataServiceProxy) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) { siV := reflect.ValueOf(serviceInterface) gV := reflect.ValueOf(group) vV := reflect.ValueOf(version) pV := reflect.ValueOf(protocol) - inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("getExportedURLs"), + // this is a strange logic + // we should notice that + // when we call java server, the method was register as "getExportedURLs" + // however, if we call golang server, the method was register as "GetExportedURLs" + // it's so tricky... + methodName := "getExportedURLs" + if m.golangServer { + methodName = "GetExportedURLs" + } + + inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName(methodName), invocation.WithArguments([]interface{}{siV.Interface(), gV.Interface(), vV.Interface(), pV.Interface()}), invocation.WithReply(reflect.ValueOf(&[]interface{}{}).Interface()), invocation.WithAttachments(map[string]string{constant.ASYNC_KEY: "false"}), invocation.WithParameterValues([]reflect.Value{siV, gV, vV, pV})) + start := time.Now() res := m.invkr.Invoke(context.Background(), inv) + end := time.Now() + logger.Infof("duration: ", (end.Sub(start)).String()) if res.Error() != nil { logger.Errorf("could not get the metadata service from remote provider: %v", res.Error()) + return []interface{}{}, nil } urlStrs := res.Result().(*[]interface{}) @@ -62,61 +87,65 @@ func (m *MetadataServiceProxy) GetExportedURLs(serviceInterface string, group st } ret = append(ret, u) } - return ret, nil + return service.ConvertURLArrToIntfArr(ret), nil } func (m *MetadataServiceProxy) Reference() string { - panic("implement me") + logger.Error("you should never invoke this implementation") + return constant.METADATA_SERVICE_NAME } func (m *MetadataServiceProxy) ServiceName() (string, error) { - panic("implement me") + logger.Error("you should never invoke this implementation") + return "", nil } func (m *MetadataServiceProxy) ExportURL(url common.URL) (bool, error) { - panic("implement me") + logger.Error("you should never invoke this implementation") + return false, nil } func (m *MetadataServiceProxy) UnexportURL(url common.URL) error { - panic("implement me") + logger.Error("you should never invoke this implementation") + return nil } func (m *MetadataServiceProxy) SubscribeURL(url common.URL) (bool, error) { - panic("implement me") + logger.Error("you should never invoke this implementation") + return false, nil } func (m *MetadataServiceProxy) UnsubscribeURL(url common.URL) error { - panic("implement me") + logger.Error("you should never invoke this implementation") + return nil } func (m *MetadataServiceProxy) PublishServiceDefinition(url common.URL) error { - panic("implement me") + logger.Error("you should never invoke this implementation") + return nil } func (m *MetadataServiceProxy) GetSubscribedURLs() ([]common.URL, error) { - panic("implement me") + logger.Error("you should never invoke this implementation") + return []common.URL{}, nil } func (m *MetadataServiceProxy) GetServiceDefinition(interfaceName string, group string, version string) (string, error) { - panic("implement me") + logger.Error("you should never invoke this implementation") + return "", nil } func (m *MetadataServiceProxy) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) { - panic("implement me") + logger.Error("you should never invoke this implementation") + return "", nil } func (m *MetadataServiceProxy) RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) { - panic("implement me") + logger.Error("you should never invoke this implementation") + return false, nil } func (m *MetadataServiceProxy) Version() (string, error) { - panic("implement me") -} - -type MetadataServiceStub struct { - GetExportedURLs func(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) -} - -func (m *MetadataServiceStub) Reference() string { - return constant.METADATA_SERVICE_NAME + logger.Error("you should never invoke this implementation") + return "", nil } diff --git a/metadata/service/remote/service.go b/metadata/service/remote/service.go index 19f0c021d3002ba8274a0909746da8f0dca79d3e..e1127f72db9d5700616d4daed44fe05909d57082 100644 --- a/metadata/service/remote/service.go +++ b/metadata/service/remote/service.go @@ -132,7 +132,7 @@ func (mts *MetadataService) PublishServiceDefinition(url common.URL) error { } // GetExportedURLs will be implemented by in memory service -func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]common.URL, error) { +func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) { return mts.inMemoryMetadataService.GetExportedURLs(serviceInterface, group, version, protocol) } @@ -162,9 +162,10 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR } logger.Infof("urls length = %v", len(urls)) for _, u := range urls { - id := identifier.NewServiceMetadataIdentifier(u) + + id := identifier.NewServiceMetadataIdentifier(u.(common.URL)) id.Revision = mts.exportedRevision.Load() - if err := mts.delegateReport.SaveServiceMetadata(id, u); err != nil { + if err := mts.delegateReport.SaveServiceMetadata(id, u.(common.URL)); err != nil { logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err) return false, err } diff --git a/metadata/service/remote/service_proxy.go b/metadata/service/remote/service_proxy.go index 090e8edc3d7ccd0167a27b7afb52becfde9914a5..8fe2e51d4d21973fcf0107477afd6f567f9ff02b 100644 --- a/metadata/service/remote/service_proxy.go +++ b/metadata/service/remote/service_proxy.go @@ -45,31 +45,31 @@ func (m *metadataServiceProxy) ServiceName() (string, error) { } func (m *metadataServiceProxy) ExportURL(url common.URL) (bool, error) { - logger.Errorf("you should never invoke this implementation") + logger.Error("you should never invoke this implementation") return true, nil } func (m *metadataServiceProxy) UnexportURL(url common.URL) error { - logger.Errorf("you should never invoke this implementation") + logger.Error("you should never invoke this implementation") return nil } func (m *metadataServiceProxy) SubscribeURL(url common.URL) (bool, error) { - logger.Errorf("you should never invoke this implementation") + logger.Error("you should never invoke this implementation") return true, nil } func (m *metadataServiceProxy) UnsubscribeURL(url common.URL) error { - logger.Errorf("you should never invoke this implementation") + logger.Error("you should never invoke this implementation") return nil } func (m *metadataServiceProxy) PublishServiceDefinition(url common.URL) error { - logger.Errorf("you should never invoke this implementation") + logger.Error("you should never invoke this implementation") return nil } -func (m *metadataServiceProxy) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]common.URL, error) { +func (m *metadataServiceProxy) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) { urls := m.report.GetExportedURLs(&identifier.ServiceMetadataIdentifier{ BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ ServiceInterface: serviceInterface, @@ -89,11 +89,11 @@ func (m *metadataServiceProxy) GetExportedURLs(serviceInterface string, group st } res = append(res, u) } - return res, nil + return service.ConvertURLArrToIntfArr(res), nil } func (m *metadataServiceProxy) GetSubscribedURLs() ([]common.URL, error) { - logger.Errorf("you should never invoke this implementation") + logger.Error("you should never invoke this implementation") return []common.URL{}, nil } @@ -115,7 +115,7 @@ func (m *metadataServiceProxy) GetServiceDefinitionByServiceKey(serviceKey strin } func (m *metadataServiceProxy) RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) { - logger.Errorf("you should never invoke this implementation") + logger.Error("you should never invoke this implementation") return true, nil } diff --git a/metadata/service/service.go b/metadata/service/service.go index fc76eca68f1584e22f2c3665bf609956a0650a06..d5673e355216216b87f640f41f36bdefddc8fae4 100644 --- a/metadata/service/service.go +++ b/metadata/service/service.go @@ -43,7 +43,8 @@ type MetadataService interface { PublishServiceDefinition(url common.URL) error // GetExportedURLs will get the target exported url in metadata // the url should be unique - GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]common.URL, error) + // due to dubbo-go only support return array []interface{} in RPCService, so we should declare the return type as []interface{} + GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) // GetExportedURLs will get the target subscribed url in metadata // the url should be unique GetSubscribedURLs() ([]common.URL, error) @@ -108,3 +109,15 @@ func getExportedServicesRevision(serviceInstance registry.ServiceInstance) strin metaData := serviceInstance.GetMetadata() return metaData[constant.EXPORTED_SERVICES_REVISION_PROPERTY_NAME] } + +func ConvertURLArrToIntfArr(urls []common.URL) []interface{} { + if len(urls) == 0 { + return []interface{}{} + } + + res := make([]interface{}, 0, len(urls)) + for _, u := range urls { + res = append(res, u) + } + return res +} \ No newline at end of file diff --git a/registry/event/metadata_service_url_params_customizer.go b/registry/event/metadata_service_url_params_customizer.go index 811d9308ee0dced26e0e35106b4e08f0436d09c9..9f791c8337fd2d1e78409786073bac3a2505ca0e 100644 --- a/registry/event/metadata_service_url_params_customizer.go +++ b/registry/event/metadata_service_url_params_customizer.go @@ -74,13 +74,14 @@ func (m *metadataServiceURLParamsMetadataCustomizer) Customize(instance registry instance.GetMetadata()[constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME] = string(str) } -func (m *metadataServiceURLParamsMetadataCustomizer) convertToParams(urls []common.URL) map[string]map[string]string { +func (m *metadataServiceURLParamsMetadataCustomizer) convertToParams(urls []interface{}) map[string]map[string]string { // usually there will be only one protocol res := make(map[string]map[string]string, 1) // those keys are useless - for _, u := range urls { + for _, ui := range urls { + u := ui.(common.URL) p := make(map[string]string, len(u.GetParams())) for k, v := range u.GetParams() { // we will ignore that diff --git a/registry/event/protocol_ports_metadata_customizer.go b/registry/event/protocol_ports_metadata_customizer.go index be2e381194f8b5d8f16ea54aef622434552d486a..eae3f82c200cdc994e44f1c03b3c683d18174bc2 100644 --- a/registry/event/protocol_ports_metadata_customizer.go +++ b/registry/event/protocol_ports_metadata_customizer.go @@ -21,6 +21,7 @@ import ( "encoding/json" "strconv" + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/registry" @@ -52,7 +53,8 @@ func (p *ProtocolPortsMetadataCustomizer) Customize(instance registry.ServiceIns return } - for _, u := range list { + for _, ui := range list { + u := ui.(common.URL) if len(u.Protocol) == 0 { continue } diff --git a/registry/event/service_revision_customizer.go b/registry/event/service_revision_customizer.go index f51a955810f50da55dfc7b60d28ff9299865e26c..a001f36f6255623d1563eb9bce80330c8174919d 100644 --- a/registry/event/service_revision_customizer.go +++ b/registry/event/service_revision_customizer.go @@ -26,6 +26,7 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/metadata/service" "github.com/apache/dubbo-go/registry" ) @@ -85,7 +86,7 @@ func (e *subscribedServicesRevisionMetadataCustomizer) Customize(instance regist logger.Errorf("could not find the subscribed url", err) } - revision := resolveRevision(urls) + revision := resolveRevision(service.ConvertURLArrToIntfArr(urls)) if len(revision) == 0 { revision = defaultRevision } @@ -96,13 +97,14 @@ func (e *subscribedServicesRevisionMetadataCustomizer) Customize(instance regist // so that we could use interface + method name as identifier and ignore the method params // per my understanding, it's enough because Dubbo actually ignore the url params. // please refer org.apache.dubbo.common.URL#toParameterString(java.lang.String...) -func resolveRevision(urls []common.URL) string { +func resolveRevision(urls []interface{}) string { if len(urls) == 0 { return "" } candidates := make([]string, 0, len(urls)) - for _, u := range urls { + for _, ui := range urls { + u := ui.(common.URL) sk := u.GetParam(constant.INTERFACE_KEY, "") for _, m := range u.Methods { // methods are part of candidates diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 526603bf1ca53eb535cec3ac144b725f75267e8d..a3899861ab6e3d468b8b8913c8507bb0626c1ef5 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -56,8 +56,7 @@ func init() { // It's completely different from other registry implementations // This implementation is based on ServiceDiscovery abstraction and ServiceNameMapping // In order to keep compatible with interface-level registry锛� -// 1. when we registry the service, we should create the mapping from service name to application name -// 2. when we sub +// this implementation is type serviceDiscoveryRegistry struct { lock sync.RWMutex url *common.URL @@ -72,13 +71,7 @@ type serviceDiscoveryRegistry struct { func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) { - // the metadata service is exported in DubboBootstrap of Java Dubbo - // but I don't want to introduce similar structure because we has less logic to do - // so I codes the related logic here. - // If necessary we need to think about moving there codes to somewhere else. - - // init and expose metadata service - initMetadataService() + tryInitMetadataService() serviceDiscovery, err := creatServiceDiscovery(url) if err != nil { @@ -419,7 +412,12 @@ func (s *serviceDiscoveryRegistry) getExportedUrlsByInst(serviceInstance registr logger.Errorf("get exported urls catch error:%s,instance:%+v", err.Error(), serviceInstance) return urls } - return result + + ret := make([]common.URL, 0, len(result)) + for _, ui := range result { + ret = append(ret, ui.(common.URL)) + } + return ret } func (s *serviceDiscoveryRegistry) prepareServiceRevisionExportedURLs(serviceInstances []registry.ServiceInstance) { @@ -663,17 +661,21 @@ func (icn *InstanceChangeNotify) Notify(event observer.Event) { } } -func initMetadataService() { +var exporting = false + +func tryInitMetadataService() { + ms, err := extension.GetMetadataService(config.GetApplicationConfig().MetadataType) if err != nil { logger.Errorf("could not init metadata service", err) } - // we don't need to expose the metadata service since this is a pure consumer application - if !config.IsProvider() { + if !config.IsProvider() || exporting { return } + exporting = true + expt := configurable.NewMetadataServiceExporter(ms) err = expt.Export()