From 2997c11dee8f91d76f283319b6ffb0af3b7a2866 Mon Sep 17 00:00:00 2001 From: flycash <mingflycash@gmail.com> Date: Fri, 5 Jun 2020 23:34:27 +0800 Subject: [PATCH] refactor MetadataService --- common/extension/metadata_service.go | 42 +++++++++++++++++++ common/rpc_service_test.go | 6 +-- config/service_config.go | 4 +- .../mapping/dynamic/service_name_mapping.go | 5 ++- .../dynamic/service_name_mapping_test.go | 2 +- metadata/report/nacos/report.go | 2 +- metadata/report/nacos/report_test.go | 2 +- .../service/exporter/configurable/exporter.go | 21 +++++----- .../exporter/configurable/exporter_test.go | 2 +- metadata/service/inmemory/service.go | 20 ++++++--- metadata/service/inmemory/service_test.go | 15 +++---- metadata/service/remote/service.go | 32 ++++++++------ metadata/service/remote/service_test.go | 8 ++-- metadata/service/service.go | 14 +++++-- .../event_publishing_service_discovery.go | 7 ++++ registry/event/log_event_listener_test.go | 1 - .../metadata_service_url_params_customizer.go | 8 ++-- .../protocol_ports_metadata_customizer.go | 3 +- .../event/service_config_exported_event.go | 15 ++++++- .../event/service_name_mapping_listener.go | 14 +++++++ registry/event/service_revision_customizer.go | 5 +-- .../service_discovery_registry.go | 35 ++++++++++++++-- .../service_discovery_registry_test.go | 4 +- 23 files changed, 196 insertions(+), 71 deletions(-) create mode 100644 common/extension/metadata_service.go diff --git a/common/extension/metadata_service.go b/common/extension/metadata_service.go new file mode 100644 index 000000000..c6a165c48 --- /dev/null +++ b/common/extension/metadata_service.go @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "fmt" + + "github.com/apache/dubbo-go/metadata/service" +) + +var ( + // there will be two types: local or remote + metadataServiceInsMap = make(map[string]func() (service.MetadataService, error), 2) +) + +func SetMetadataService(msType string, creator func() (service.MetadataService, error)) { + metadataServiceInsMap[msType] = creator +} + +func GetMetadataService(msType string) (service.MetadataService, error) { + if creator, ok := metadataServiceInsMap[msType]; ok { + return creator() + } + panic(fmt.Sprintf("could not find the creator for metadataType: %s, please check whether you have imported relative packages, \n"+ + "local - github.com/apache/dubbo-go/metadata/service/inmemory, \n"+ + "remote - github.com/apache/dubbo-go/metadata/service/remote", msType)) +} diff --git a/common/rpc_service_test.go b/common/rpc_service_test.go index 19a1d7b03..2311205d0 100644 --- a/common/rpc_service_test.go +++ b/common/rpc_service_test.go @@ -137,7 +137,7 @@ func TestSuiteMethod(t *testing.T) { assert.True(t, ok) methodType := suiteMethod(method) method = methodType.Method() - assert.Equal(t, "func(*event.TestService, context.Context, interface {}, interface {}, interface {}) error", method.Type.String()) + assert.Equal(t, "func(*common.TestService, context.Context, interface {}, interface {}, interface {}) error", method.Type.String()) at := methodType.ArgsType() assert.Equal(t, "interface {}", at[0].String()) assert.Equal(t, "interface {}", at[1].String()) @@ -151,7 +151,7 @@ func TestSuiteMethod(t *testing.T) { assert.True(t, ok) methodType = suiteMethod(method) method = methodType.Method() - assert.Equal(t, "func(*event.TestService, interface {}, interface {}, interface {}) (interface {}, error)", method.Type.String()) + assert.Equal(t, "func(*common.TestService, interface {}, interface {}, interface {}) (interface {}, error)", method.Type.String()) at = methodType.ArgsType() assert.Equal(t, "interface {}", at[0].String()) assert.Equal(t, "interface {}", at[1].String()) @@ -164,7 +164,7 @@ func TestSuiteMethod(t *testing.T) { assert.True(t, ok) methodType = suiteMethod(method) method = methodType.Method() - assert.Equal(t, "func(*event.TestService) error", method.Type.String()) + assert.Equal(t, "func(*common.TestService) error", method.Type.String()) at = methodType.ArgsType() assert.Equal(t, 0, len(at)) assert.Nil(t, methodType.CtxType()) diff --git a/config/service_config.go b/config/service_config.go index 2b1e5a1a3..43c53bc9c 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -44,7 +44,9 @@ import ( "github.com/apache/dubbo-go/protocol/protocolwrapper" ) -// ServiceConfig ... +// ServiceConfig is a newest structure to support Dubbo 2.7.5 +// But I think it's not very necessary, +// we should think about how to reuse current ProviderConfig rather than use this type ServiceConfig struct { context context.Context id string diff --git a/metadata/mapping/dynamic/service_name_mapping.go b/metadata/mapping/dynamic/service_name_mapping.go index 56ef2a0f0..9f65adf73 100644 --- a/metadata/mapping/dynamic/service_name_mapping.go +++ b/metadata/mapping/dynamic/service_name_mapping.go @@ -21,7 +21,9 @@ import ( "strconv" "time" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" ) import ( @@ -55,7 +57,8 @@ type DynamicConfigurationServiceNameMapping struct { func (d *DynamicConfigurationServiceNameMapping) Map(serviceInterface string, group string, version string, protocol string) error { // metadata service is admin service, should not be mapped if constant.METADATA_SERVICE_NAME == serviceInterface { - return perrors.New("try to map the metadata service, will be ignored") + logger.Info("try to map the metadata service, will be ignored") + return nil } appName := config.GetApplicationConfig().Name diff --git a/metadata/mapping/dynamic/service_name_mapping_test.go b/metadata/mapping/dynamic/service_name_mapping_test.go index 647a15ae7..2896b0fd4 100644 --- a/metadata/mapping/dynamic/service_name_mapping_test.go +++ b/metadata/mapping/dynamic/service_name_mapping_test.go @@ -48,7 +48,7 @@ func TestDynamicConfigurationServiceNameMapping(t *testing.T) { protocol := "myProtocol" err = mapping.Map(intf, group, version, protocol) - assert.NotNil(t, err) + assert.Nil(t, err) intf = "MyService" err = mapping.Map(intf, group, version, protocol) assert.Nil(t, err) diff --git a/metadata/report/nacos/report.go b/metadata/report/nacos/report.go index 5eaee7bb2..6b838187b 100644 --- a/metadata/report/nacos/report.go +++ b/metadata/report/nacos/report.go @@ -181,7 +181,7 @@ func (n *nacosMetadataReport) getConfigAsArray(param vo.ConfigParam) []string { func (n *nacosMetadataReport) getConfig(param vo.ConfigParam) string { cfg, err := n.client.GetConfig(param) if err != nil { - logger.Errorf("Finding the configuration failed: %v", param) + logger.Errorf("Finding the configuration failed: %v, err: %v", param, err) } return cfg } diff --git a/metadata/report/nacos/report_test.go b/metadata/report/nacos/report_test.go index 711e6281a..f23ed8a8d 100644 --- a/metadata/report/nacos/report_test.go +++ b/metadata/report/nacos/report_test.go @@ -44,7 +44,7 @@ func TestNacosMetadataReport_CRUD(t *testing.T) { assert.Nil(t, err) serviceMi := newServiceMetadataIdentifier() - serviceUrl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + serviceUrl, _ := common.NewURL("registry://localhost:8848", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) err = rpt.SaveServiceMetadata(serviceMi, serviceUrl) assert.Nil(t, err) diff --git a/metadata/service/exporter/configurable/exporter.go b/metadata/service/exporter/configurable/exporter.go index ec3f8ec2d..1d134bdb3 100644 --- a/metadata/service/exporter/configurable/exporter.go +++ b/metadata/service/exporter/configurable/exporter.go @@ -33,7 +33,7 @@ import ( // MetadataServiceExporter is the ConfigurableMetadataServiceExporter which implement MetadataServiceExporter interface type MetadataServiceExporter struct { - serviceConfig *config.ServiceConfig + ServiceConfig *config.ServiceConfig lock sync.RWMutex metadataService service.MetadataService } @@ -56,42 +56,43 @@ func (exporter *MetadataServiceExporter) Export() error { } serviceConfig.InterfaceName = constant.METADATA_SERVICE_NAME serviceConfig.Group = config.GetApplicationConfig().Name - serviceConfig.Version = exporter.metadataService.Version() + // now the error will always be nil + serviceConfig.Version, _ = exporter.metadataService.Version() var err error func() { exporter.lock.Lock() defer exporter.lock.Unlock() - exporter.serviceConfig = serviceConfig - exporter.serviceConfig.Implement(exporter.metadataService) - err = exporter.serviceConfig.Export() + exporter.ServiceConfig = serviceConfig + exporter.ServiceConfig.Implement(exporter.metadataService) + err = exporter.ServiceConfig.Export() }() - logger.Infof("The MetadataService exports urls : %v ", exporter.serviceConfig.GetExportedUrls()) + logger.Infof("The MetadataService exports urls : %v ", exporter.ServiceConfig.GetExportedUrls()) return err } - logger.Warnf("The MetadataService has been exported : %v ", exporter.serviceConfig.GetExportedUrls()) + logger.Warnf("The MetadataService has been exported : %v ", exporter.ServiceConfig.GetExportedUrls()) return nil } // Unexport will unexport the metadataService func (exporter *MetadataServiceExporter) Unexport() { if exporter.IsExported() { - exporter.serviceConfig.Unexport() + exporter.ServiceConfig.Unexport() } } // GetExportedURLs will return the urls that export use. // Noticeļ¼The exported url is not same as url in registry , for example it lack the ip. func (exporter *MetadataServiceExporter) GetExportedURLs() []*common.URL { - return exporter.serviceConfig.GetExportedUrls() + return exporter.ServiceConfig.GetExportedUrls() } // isExported will return is metadataServiceExporter exported or not func (exporter *MetadataServiceExporter) IsExported() bool { exporter.lock.RLock() defer exporter.lock.RUnlock() - return exporter.serviceConfig != nil && exporter.serviceConfig.IsExport() + return exporter.ServiceConfig != nil && exporter.ServiceConfig.IsExport() } // generateMetadataProtocol will return a default ProtocolConfig diff --git a/metadata/service/exporter/configurable/exporter_test.go b/metadata/service/exporter/configurable/exporter_test.go index 364169b31..20a80ef70 100644 --- a/metadata/service/exporter/configurable/exporter_test.go +++ b/metadata/service/exporter/configurable/exporter_test.go @@ -53,7 +53,7 @@ func TestConfigurableExporter(t *testing.T) { SessionName: "server", }}) mockInitProviderWithSingleRegistry() - metadataService := inmemory.NewMetadataService() + metadataService, _ := inmemory.NewMetadataService() exported := NewMetadataServiceExporter(metadataService) assert.Equal(t, false, exported.IsExported()) assert.NoError(t, exported.Export()) diff --git a/metadata/service/inmemory/service.go b/metadata/service/inmemory/service.go index f7b7466a3..4327304ad 100644 --- a/metadata/service/inmemory/service.go +++ b/metadata/service/inmemory/service.go @@ -19,6 +19,9 @@ package inmemory import ( "sort" "sync" + + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config" ) import ( @@ -34,6 +37,10 @@ import ( "github.com/apache/dubbo-go/metadata/service" ) +func init() { + extension.SetMetadataService("local", NewMetadataService) +} + // version will be used by Version func const version = "1.0.0" @@ -47,13 +54,14 @@ type MetadataService struct { } // NewMetadataService: initiate a metadata service -func NewMetadataService() *MetadataService { +func NewMetadataService() (service.MetadataService, error) { return &MetadataService{ + BaseMetadataService: service.NewBaseMetadataService(config.GetApplicationConfig().Name), exportedServiceURLs: &sync.Map{}, subscribedServiceURLs: &sync.Map{}, serviceDefinitions: &sync.Map{}, lock: &sync.RWMutex{}, - } + }, nil } // Comparator is defined as Comparator for skip list to compare the URL @@ -228,11 +236,11 @@ func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) } // RefreshMetadata will always return true because it will be implement by remote service -func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) bool { - return true +func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) { + return true, nil } // Version will return the version of metadata service -func (mts *MetadataService) Version() string { - return version +func (mts *MetadataService) Version() (string, error) { + return version, nil } diff --git a/metadata/service/inmemory/service_test.go b/metadata/service/inmemory/service_test.go index fc0410ecc..048c286fd 100644 --- a/metadata/service/inmemory/service_test.go +++ b/metadata/service/inmemory/service_test.go @@ -32,7 +32,7 @@ import ( ) func TestMetadataService(t *testing.T) { - mts := NewMetadataService() + mts, _ := NewMetadataService() serviceName := "com.ikurento.user.UserProvider" group := "group1" version := "0.0.1" @@ -66,25 +66,20 @@ func TestMetadataService(t *testing.T) { assert.NoError(t, err) mts.ExportURL(u) list, _ := mts.GetExportedURLs(serviceName, group, version, protocol) - assert.Equal(t, uint64(3), list.Len()) - iter := list.IterAtPosition(0) - for iter.Next() { - comparator := iter.Value() - fmt.Println(comparator) - } + assert.Equal(t, 3, len(list)) mts.SubscribeURL(u) mts.SubscribeURL(u) list2, _ := mts.GetSubscribedURLs() - assert.Equal(t, uint64(1), list2.Len()) + assert.Equal(t, 1, len(list2)) mts.UnexportURL(u) list3, _ := mts.GetExportedURLs(serviceName, group, version, protocol) - assert.Equal(t, uint64(2), list3.Len()) + assert.Equal(t, 2, len(list3)) mts.UnsubscribeURL(u) list4, _ := mts.GetSubscribedURLs() - assert.Equal(t, uint64(0), list4.Len()) + assert.Equal(t, 0, len(list4)) userProvider := &definition.UserProvider{} common.ServiceMap.Register(serviceName, protocol, userProvider) diff --git a/metadata/service/remote/service.go b/metadata/service/remote/service.go index 6726b1000..2e8f66444 100644 --- a/metadata/service/remote/service.go +++ b/metadata/service/remote/service.go @@ -21,6 +21,8 @@ import ( "sync" "go.uber.org/atomic" + + "github.com/apache/dubbo-go/common/extension" ) import ( @@ -38,6 +40,10 @@ import ( // version will be used by Version func const version = "1.0.0" +func init() { + extension.SetMetadataService("remote", newMetadataService) +} + // MetadataService is a implement of metadata service which will delegate the remote metadata report // This is singleton type MetadataService struct { @@ -53,8 +59,8 @@ var ( metadataServiceInstance *MetadataService ) -// NewMetadataService will create a new remote MetadataService instance -func NewMetadataService() (*MetadataService, error) { +// newMetadataService will create a new remote MetadataService instance +func newMetadataService() (service.MetadataService, error) { var err error metadataServiceOnce.Do(func() { var mr *delegate.MetadataReport @@ -62,8 +68,11 @@ func NewMetadataService() (*MetadataService, error) { if err != nil { return } + // it will never return error + inms, _ := inmemory.NewMetadataService() metadataServiceInstance = &MetadataService{ - inMemoryMetadataService: inmemory.NewMetadataService(), + BaseMetadataService: service.NewBaseMetadataService(config.GetApplicationConfig().Name), + inMemoryMetadataService: inms.(*inmemory.MetadataService), delegateReport: mr, } }) @@ -140,14 +149,13 @@ func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) } // RefreshMetadata will refresh the exported & subscribed metadata to remote metadata report from the inmemory metadata service -func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) bool { - result := true +func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) { if len(exportedRevision) != 0 && exportedRevision != mts.exportedRevision.Load() { mts.exportedRevision.Store(exportedRevision) urls, err := mts.inMemoryMetadataService.GetExportedURLs(constant.ANY_VALUE, "", "", "") if err != nil { logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err) - result = false + return false, err } logger.Infof("urls length = %v", len(urls)) for _, u := range urls { @@ -155,7 +163,7 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR id.Revision = mts.exportedRevision.Load() if err := mts.delegateReport.SaveServiceMetadata(id, u); err != nil { logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err) - result = false + return false, err } } } @@ -165,7 +173,7 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR urls, err := mts.inMemoryMetadataService.GetSubscribedURLs() if err != nil { logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v+", err) - result = false + return false, err } if urls != nil && len(urls) > 0 { id := &identifier.SubscriberMetadataIdentifier{ @@ -176,14 +184,14 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR } if err := mts.delegateReport.SaveSubscribedData(id, urls); err != nil { logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err) - result = false + return false, err } } } - return result + return true, nil } // Version will return the remote service version -func (MetadataService) Version() string { - return version +func (MetadataService) Version() (string, error) { + return version, nil } diff --git a/metadata/service/remote/service_test.go b/metadata/service/remote/service_test.go index 308c631e4..2bf1c4c6c 100644 --- a/metadata/service/remote/service_test.go +++ b/metadata/service/remote/service_test.go @@ -97,16 +97,16 @@ func TestMetadataService(t *testing.T) { "mock://127.0.0.1:20000/?sync.report=true")) assert.NoError(t, err) instance.GetMetadataReportInstance(&u) - mts, err := NewMetadataService() + mts, err := newMetadataService() assert.NoError(t, err) - mts.setInMemoryMetadataService(mockInmemoryProc(t)) + mts.(*MetadataService).setInMemoryMetadataService(mockInmemoryProc(t)) mts.RefreshMetadata("0.0.1", "0.0.1") assert.Equal(t, 1, len(serviceMetadata)) assert.Equal(t, 1, len(subscribedMetadata)) } func mockInmemoryProc(t *testing.T) *inmemory.MetadataService { - mts := inmemory.NewMetadataService() + mts, _ := inmemory.NewMetadataService() serviceName := "com.ikurento.user.UserProvider" group := "group1" version := "0.0.1" @@ -135,5 +135,5 @@ func mockInmemoryProc(t *testing.T) *inmemory.MetadataService { serviceKey := definition.ServiceDescriperBuild(serviceName, group, version) def2, _ := mts.GetServiceDefinitionByServiceKey(serviceKey) assert.Equal(t, expected, def2) - return mts + return mts.(*inmemory.MetadataService) } diff --git a/metadata/service/service.go b/metadata/service/service.go index af9528c68..e05b634f8 100644 --- a/metadata/service/service.go +++ b/metadata/service/service.go @@ -20,7 +20,6 @@ package service import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/config" ) // MetadataService is used to define meta data related behaviors @@ -50,18 +49,25 @@ type MetadataService interface { // GetServiceDefinition will get the target service info store in metadata by service key GetServiceDefinitionByServiceKey(serviceKey string) (string, error) // RefreshMetadata will refresh the metadata - RefreshMetadata(exportedRevision string, subscribedRevision string) bool + RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) // Version will return the metadata service version - Version() string + Version() (string, error) } // BaseMetadataService is used for the event logic for struct who will implement interface MetadataService type BaseMetadataService struct { + serviceName string +} + +func NewBaseMetadataService(serviceName string) BaseMetadataService { + return BaseMetadataService{ + serviceName: serviceName, + } } // ServiceName can get the service's name in meta service , which is application name func (mts *BaseMetadataService) ServiceName() (string, error) { - return config.GetApplicationConfig().Name, nil + return mts.serviceName, nil } // Version will return the version of metadata service diff --git a/registry/event/event_publishing_service_discovery.go b/registry/event/event_publishing_service_discovery.go index 76fdf7237..496eb9b4a 100644 --- a/registry/event/event_publishing_service_discovery.go +++ b/registry/event/event_publishing_service_discovery.go @@ -20,6 +20,9 @@ package event import ( gxset "github.com/dubbogo/gost/container/set" gxpage "github.com/dubbogo/gost/page" + + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/metadata/service" ) import ( @@ -139,3 +142,7 @@ func (epsd *EventPublishingServiceDiscovery) executeWithEvents(beforeEvent obser } return nil } + +func getMetadataService() (service.MetadataService, error) { + return extension.GetMetadataService(config.GetApplicationConfig().MetadataType) +} diff --git a/registry/event/log_event_listener_test.go b/registry/event/log_event_listener_test.go index e7c0ba0df..f142168b6 100644 --- a/registry/event/log_event_listener_test.go +++ b/registry/event/log_event_listener_test.go @@ -26,6 +26,5 @@ import ( func TestLogEventListener(t *testing.T) { l := &logEventListener{} assert.Equal(t, 0, l.GetPriority()) - assert.Nil(t, l.GetEventType()) assert.Nil(t, l.OnEvent(&ServiceDiscoveryDestroyedEvent{})) } diff --git a/registry/event/metadata_service_url_params_customizer.go b/registry/event/metadata_service_url_params_customizer.go index c9c636448..e5ff2bc26 100644 --- a/registry/event/metadata_service_url_params_customizer.go +++ b/registry/event/metadata_service_url_params_customizer.go @@ -26,7 +26,6 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" - "github.com/apache/dubbo-go/metadata/service/remote" "github.com/apache/dubbo-go/registry" ) @@ -52,17 +51,18 @@ func (m *metadataServiceURLParamsMetadataCustomizer) GetPriority() int { } func (m *metadataServiceURLParamsMetadataCustomizer) Customize(instance registry.ServiceInstance) { - ms, err := remote.NewMetadataService() + ms, err := getMetadataService() if err != nil { logger.Errorf("could not find the metadata service", err) return } serviceName := constant.METADATA_SERVICE_NAME - version := ms.Version() + // error always is nil + version, _ := ms.Version() group := instance.GetServiceName() urls, err := ms.GetExportedURLs(serviceName, group, version, constant.ANY_VALUE) if err != nil || len(urls) == 0 { - logger.Errorf("could not find the exported urls", err) + logger.Info("could not find the exported urls", err) return } ps := m.convertToParams(urls) diff --git a/registry/event/protocol_ports_metadata_customizer.go b/registry/event/protocol_ports_metadata_customizer.go index 975463fe3..be2e38119 100644 --- a/registry/event/protocol_ports_metadata_customizer.go +++ b/registry/event/protocol_ports_metadata_customizer.go @@ -23,7 +23,6 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" - "github.com/apache/dubbo-go/metadata/service/remote" "github.com/apache/dubbo-go/registry" ) @@ -38,7 +37,7 @@ func (p *ProtocolPortsMetadataCustomizer) GetPriority() int { // Customize will func (p *ProtocolPortsMetadataCustomizer) Customize(instance registry.ServiceInstance) { - metadataService, err := remote.NewMetadataService() + metadataService, err := getMetadataService() if err != nil { logger.Errorf("Could not init the MetadataService", err) return diff --git a/registry/event/service_config_exported_event.go b/registry/event/service_config_exported_event.go index 371f72d0d..c3d8eee92 100644 --- a/registry/event/service_config_exported_event.go +++ b/registry/event/service_config_exported_event.go @@ -18,11 +18,24 @@ package event import ( + "time" + "github.com/apache/dubbo-go/common/observer" "github.com/apache/dubbo-go/config" ) type ServiceConfigExportedEvent struct { observer.BaseEvent - ServiceConfig config.ServiceConfig + ServiceConfig *config.ServiceConfig +} + +func NewServiceConfigExportedEvent(serviceConfig *config.ServiceConfig) *ServiceConfigExportedEvent { + return &ServiceConfigExportedEvent{ + BaseEvent: observer.BaseEvent{ + Source:serviceConfig, + Timestamp:time.Now(), + }, + ServiceConfig: serviceConfig, + } } + diff --git a/registry/event/service_name_mapping_listener.go b/registry/event/service_name_mapping_listener.go index 9ec7392b1..480c51cc5 100644 --- a/registry/event/service_name_mapping_listener.go +++ b/registry/event/service_name_mapping_listener.go @@ -20,6 +20,9 @@ package event import ( "reflect" + perrors "github.com/pkg/errors" + + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" "github.com/apache/dubbo-go/metadata/mapping" @@ -43,6 +46,17 @@ func (s *serviceNameMappingListener) GetPriority() int { func (s *serviceNameMappingListener) OnEvent(e observer.Event) error { if ex, ok := e.(*ServiceConfigExportedEvent); ok { sc := ex.ServiceConfig + urls := sc.GetExportedUrls() + + for _, u := range urls { + err := s.nameMapping.Map(u.GetParam(constant.INTERFACE_KEY, ""), + u.GetParam(constant.GROUP_KEY, ""), + u.GetParam(constant.Version, ""), + u.Protocol) + if err != nil { + return perrors.WithMessage(err, "could not map the service: "+u.String()) + } + } } return nil } diff --git a/registry/event/service_revision_customizer.go b/registry/event/service_revision_customizer.go index 51475d06c..f51a95581 100644 --- a/registry/event/service_revision_customizer.go +++ b/registry/event/service_revision_customizer.go @@ -26,7 +26,6 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" - "github.com/apache/dubbo-go/metadata/service/remote" "github.com/apache/dubbo-go/registry" ) @@ -46,7 +45,7 @@ func (e *exportedServicesRevisionMetadataCustomizer) GetPriority() int { } func (e *exportedServicesRevisionMetadataCustomizer) Customize(instance registry.ServiceInstance) { - ms, err := remote.NewMetadataService() + ms, err := getMetadataService() if err != nil { logger.Errorf("could not get metadata service", err) return @@ -74,7 +73,7 @@ func (e *subscribedServicesRevisionMetadataCustomizer) GetPriority() int { } func (e *subscribedServicesRevisionMetadataCustomizer) Customize(instance registry.ServiceInstance) { - ms, err := remote.NewMetadataService() + ms, err := getMetadataService() if err != nil { logger.Errorf("could not get metadata service", err) return diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 379578ddc..39b2b1c28 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -37,9 +37,10 @@ import ( "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/metadata/mapping" "github.com/apache/dubbo-go/metadata/service" + "github.com/apache/dubbo-go/metadata/service/exporter/configurable" "github.com/apache/dubbo-go/metadata/service/remote" "github.com/apache/dubbo-go/registry" - registryCommon "github.com/apache/dubbo-go/registry/event" + "github.com/apache/dubbo-go/registry/event" "github.com/apache/dubbo-go/registry/servicediscovery/proxy" "github.com/apache/dubbo-go/registry/servicediscovery/synthesizer" "github.com/apache/dubbo-go/remoting" @@ -72,6 +73,15 @@ type serviceDiscoveryRegistry struct { } func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) { + + // the metadata service is exported in DubboBootstrap of Java Dubbo + // but I don't want to introduce similar structure because we has less logic to do + // so I codes the related logic here. + // If necessary we need to think about moving there codes to somewhere else. + + // init and expose metadata service + initMetadataService() + serviceDiscovery, err := creatServiceDiscovery(url) if err != nil { return nil, err @@ -119,7 +129,7 @@ func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { if err != nil { return nil, perrors.WithMessage(err, "Create service discovery fialed") } - return registryCommon.NewEventPublishingServiceDiscovery(originServiceDiscovery), nil + return event.NewEventPublishingServiceDiscovery(originServiceDiscovery), nil } func parseServices(literalServices string) *gxset.HashSet { @@ -176,7 +186,14 @@ func (s *serviceDiscoveryRegistry) Register(url common.URL) error { if err != nil { return perrors.WithMessage(err, "could not create servcie instance, please check your service url") } - return s.serviceDiscovery.Register(ins) + err = s.serviceDiscovery.Register(ins) + if err != nil { + return perrors.WithMessage(err, "register the service failed") + } + return s.serviceNameMapping.Map(url.GetParam(constant.INTERFACE_KEY, ""), + url.GetParam(constant.GROUP_KEY, ""), + url.GetParam(constant.Version, ""), + url.Protocol) } func createInstance(url common.URL) (registry.ServiceInstance, error) { @@ -642,5 +659,17 @@ func (icn *InstanceChangeNotify) Notify(event observer.Event) { sdr := icn.serviceDiscoveryRegistry sdr.subscribe(sdr.url, icn.notify, se.ServiceName, se.Instances) } +} +func initMetadataService() { + ms, err := extension.GetMetadataService(config.GetApplicationConfig().MetadataType) + if err != nil { + logger.Errorf("could not init metadata service", err) + } + expt := configurable.NewMetadataServiceExporter(ms) + err = expt.Export() + if err != nil { + logger.Errorf("could not export the metadata service", err) + } + extension.GetGlobalDispatcher().Dispatch(event.NewServiceConfigExportedEvent(expt.(*configurable.MetadataServiceExporter).ServiceConfig)) } diff --git a/registry/servicediscovery/service_discovery_registry_test.go b/registry/servicediscovery/service_discovery_registry_test.go index 4aa52f55c..247cfd65e 100644 --- a/registry/servicediscovery/service_discovery_registry_test.go +++ b/registry/servicediscovery/service_discovery_registry_test.go @@ -28,10 +28,10 @@ var ( ) func TestServiceDiscoveryRegistry_Register(t *testing.T) { - //registryURL,_:=event.NewURL("in-memory://localhost:12345", + // registryURL,_:=event.NewURL("in-memory://localhost:12345", // event.WithParamsValue("registry-type","service"), // event.WithParamsValue("subscribed-services","a, b , c,d,e ,")) - //url,_:=event.NewURL("dubbo://192.168.0.102:20880/"+ SERVICE_INTERFACE + + // url,_:=event.NewURL("dubbo://192.168.0.102:20880/"+ SERVICE_INTERFACE + // "?&application=" + GROUP + // "&interface=" + SERVICE_INTERFACE + // "&group=" + GROUP + -- GitLab