diff --git a/config/config_loader.go b/config/config_loader.go index 0922a7a28569bdbc97a39d2f7b3f9d1e2673f004..b79c2e824988e2353b032d3ac1992740d0eca2b9 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -68,11 +68,15 @@ func init() { if errCon := ConsumerInit(confConFile); errCon != nil { log.Printf("[consumerInit] %#v", errCon) consumerConfig = nil + } else { + baseConfig = &consumerConfig.BaseConfig } if errPro := ProviderInit(confProFile); errPro != nil { log.Printf("[providerInit] %#v", errPro) providerConfig = nil + } else { + baseConfig = &providerConfig.BaseConfig } } @@ -282,7 +286,7 @@ func GetProviderConfig() ProviderConfig { defer configAccessMutex.Unlock() if providerConfig == nil { logger.Warnf("creating empty provider config. You should see this log only once.") - providerConfig = &ProviderConfig{} + return ProviderConfig{} } } return *providerConfig @@ -300,13 +304,14 @@ func GetConsumerConfig() ConsumerConfig { defer configAccessMutex.Unlock() if consumerConfig == nil { logger.Warnf("creating empty consumer config. You should see this log only once.") - consumerConfig = &ConsumerConfig{} + return ConsumerConfig{} } } return *consumerConfig } func GetBaseConfig() *BaseConfig { + if baseConfig == nil { baseConfigOnce.Do(func() { baseConfig = &BaseConfig{ diff --git a/config/metadata_report_config.go b/config/metadata_report_config.go index 41fb6b4769e59784d8d18c3f82b956fd029d4ff7..caf7c55c28a02cdfe1afcefa7861770b81944c92 100644 --- a/config/metadata_report_config.go +++ b/config/metadata_report_config.go @@ -34,13 +34,10 @@ import ( // MethodConfig ... type MetadataReportConfig struct { - Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"` - Address string `yaml:"address" json:"address,omitempty" property:"address"` - Username string `yaml:"username" json:"username,omitempty" property:"username"` - Password string `yaml:"password" json:"password,omitempty" property:"password"` - Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` - TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second - Group string `yaml:"group" json:"group,omitempty" property:"group"` + Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"` + RemoteRef string `required:"true" yaml:"remote_ref" json:"remote_ref,omitempty"` + Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` + Group string `yaml:"group" json:"group,omitempty" property:"group"` } // Prefix ... @@ -70,18 +67,24 @@ func (c *MetadataReportConfig) ToUrl() (*common.URL, error) { } } - url, err := common.NewURL(c.Address, + rc, ok := GetBaseConfig().GetRemoteConfig(c.RemoteRef) + + if !ok { + return nil, perrors.New("Could not find out the remote ref config, name: " + c.RemoteRef) + } + + res, err := common.NewURL(rc.Address, common.WithParams(urlMap), - common.WithUsername(c.Username), - common.WithPassword(c.Password), - common.WithLocation(c.Address), + common.WithUsername(rc.Username), + common.WithPassword(rc.Password), + common.WithLocation(rc.Address), common.WithProtocol(c.Protocol), ) - if err != nil || len(url.Protocol) == 0 { + if err != nil || len(res.Protocol) == 0 { return nil, perrors.New("Invalid MetadataReportConfig.") } - url.SetParam("metadata", url.Protocol) - return &url, nil + res.SetParam("metadata", res.Protocol) + return &res, nil } func (c *MetadataReportConfig) IsValid() bool { @@ -94,10 +97,8 @@ func startMetadataReport(metadataType string, metadataReportConfig *MetadataRepo return nil } - if metadataType == constant.METACONFIG_REMOTE { - return perrors.New("No MetadataConfig found, you must specify the remote Metadata Center address when 'metadata=remote' is enabled.") - } else if metadataType == constant.METACONFIG_REMOTE && len(metadataReportConfig.Address) == 0 { - return perrors.New("MetadataConfig address can not be empty.") + if metadataType == constant.METACONFIG_REMOTE && len(metadataReportConfig.RemoteRef) == 0 { + return perrors.New("MetadataConfig remote ref can not be empty.") } if url, err := metadataReportConfig.ToUrl(); err == nil { diff --git a/config/metadata_report_config_test.go b/config/metadata_report_config_test.go index 635feecc2d433366534566d184e058eb54a881ed..1c585ee79d58826b227df52574b3403639856306 100644 --- a/config/metadata_report_config_test.go +++ b/config/metadata_report_config_test.go @@ -24,12 +24,15 @@ import ( ) func TestMetadataReportConfig_ToUrl(t *testing.T) { - metadataReportConfig := MetadataReportConfig{ - Protocol: "mock", + GetBaseConfig().Remotes["mock"] = &RemoteConfig{ Address: "127.0.0.1:2181", Username: "test", Password: "test", TimeoutStr: "3s", + } + metadataReportConfig := MetadataReportConfig{ + Protocol: "mock", + RemoteRef: "mock", Params: map[string]string{ "k": "v", }, diff --git a/config/provider_config.go b/config/provider_config.go index 81f20e864d6e10424c66ee60f501bd41ec6db12f..f0d3c4cf7f76a7d57ed0cfe4b6def2965003b868 100644 --- a/config/provider_config.go +++ b/config/provider_config.go @@ -87,15 +87,15 @@ func ProviderInit(confProFile string) error { } providerConfig.fileStream = bytes.NewBuffer(fileStream) - //set method interfaceId & interfaceName + // set method interfaceId & interfaceName for k, v := range providerConfig.Services { - //set id for reference + // set id for reference for _, n := range providerConfig.Services[k].Methods { n.InterfaceName = v.InterfaceName n.InterfaceId = k } } - //start the metadata report if config set + // start the metadata report if config set if err := startMetadataReport(providerConfig.ApplicationConfig.MetadataType, providerConfig.MetadataReportConfig); err != nil { return perrors.WithMessagef(err, "Provider starts metadata report error, and the error is {%#v}", err) } @@ -105,7 +105,7 @@ func ProviderInit(confProFile string) error { } func configCenterRefreshProvider() error { - //fresh it + // fresh it if providerConfig.ConfigCenterConfig != nil { providerConfig.fatherConfig = providerConfig if err := providerConfig.startConfigCenter(); err != nil { diff --git a/config/remote_config.go b/config/remote_config.go index ed9dab37a43954ad1fd5061748a0419d21d4304d..4d17cc1c41c8033d550be7b2f0159485eabcec62 100644 --- a/config/remote_config.go +++ b/config/remote_config.go @@ -19,12 +19,24 @@ package config import ( "time" + + "github.com/apache/dubbo-go/common/logger" ) type RemoteConfig struct { - Address string `yaml:"address" json:"address,omitempty"` - Timeout time.Duration `default:"10s" yaml:"timeout" json:"timeout,omitempty"` - Params map[string]string `yaml:"params" json:"address,omitempty"` + Address string `yaml:"address" json:"address,omitempty"` + TimeoutStr string `default:"5s" yaml:"timeout" json:"timeout,omitempty"` + Username string `yaml:"username" json:"username,omitempty" property:"username"` + Password string `yaml:"password" json:"password,omitempty" property:"password"` + Params map[string]string `yaml:"params" json:"address,omitempty"` +} + +func (rc *RemoteConfig) Timeout() time.Duration { + if res, err := time.ParseDuration(rc.TimeoutStr); err == nil { + return res + } + logger.Errorf("Could not parse the timeout string to Duration: %s, the default value will be returned", rc.TimeoutStr) + return 5 * time.Second } // GetParam will return the value of the key. If not found, def will be return; diff --git a/metadata/mapping/dynamic/service_name_mapping.go b/metadata/mapping/dynamic/service_name_mapping.go index 49b7747065a06ea5b9c4aab06224e6aa5b11d564..3d502b618481638e90cca10e437b1a0f325eb791 100644 --- a/metadata/mapping/dynamic/service_name_mapping.go +++ b/metadata/mapping/dynamic/service_name_mapping.go @@ -18,6 +18,7 @@ package dynamic import ( + "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/metadata/mapping" "strconv" "sync" @@ -41,6 +42,10 @@ const ( slash = "/" ) +func init() { + extension.SetServiceNameMapping("dynamic", GetServiceNameMappingInstance) +} + // DynamicConfigurationServiceNameMapping is the implementation based on config center type DynamicConfigurationServiceNameMapping struct { dc config_center.DynamicConfiguration diff --git a/metadata/report/delegate/delegate_report.go b/metadata/report/delegate/delegate_report.go index cb7e42030b2dec32b0537b20e2f825e638f228d0..4bc781a7f54a19c789dd8a0f7bd2f13bf09fc353 100644 --- a/metadata/report/delegate/delegate_report.go +++ b/metadata/report/delegate/delegate_report.go @@ -130,7 +130,7 @@ func NewMetadataReport() (*MetadataReport, error) { scheduler := gocron.NewScheduler(time.UTC) _, err := scheduler.Every(1).Day().Do( func() { - logger.Info("start to publish all metadata in metadata report %v.", url) + logger.Info("start to publish all metadata in metadata report %s.", url.String()) bmr.allMetadataReportsLock.RLock() bmr.doHandlerMetadataCollection(bmr.allMetadataReports) bmr.allMetadataReportsLock.RUnlock() diff --git a/metadata/service/remote/service.go b/metadata/service/remote/service.go index f55c482ad846d801e57e2a98436161c6c70165c4..cd8a2dc837caaef341d34a720070ce4e4e6f6bcd 100644 --- a/metadata/service/remote/service.go +++ b/metadata/service/remote/service.go @@ -65,7 +65,7 @@ func (mts *MetadataService) setInMemoryMetadataService(metadata *inmemory.Metada // ExportURL will be implemented by in memory service func (mts *MetadataService) ExportURL(url common.URL) (bool, error) { - return true, nil + return mts.inMemoryMetadataService.ExportURL(url) } // UnexportURL @@ -76,13 +76,13 @@ func (mts *MetadataService) UnexportURL(url common.URL) error { } // SubscribeURL will be implemented by in memory service -func (MetadataService) SubscribeURL(url common.URL) (bool, error) { - return true, nil +func (mts *MetadataService) SubscribeURL(url common.URL) (bool, error) { + return mts.inMemoryMetadataService.SubscribeURL(url) } // UnsubscribeURL will be implemented by in memory service -func (MetadataService) UnsubscribeURL(url common.URL) error { - return nil +func (mts *MetadataService) UnsubscribeURL(url common.URL) error { + return mts.UnsubscribeURL(url) } // PublishServiceDefinition will call remote metadata's StoreProviderMetadata to store url info and service definition diff --git a/registry/nacos/service_discovery_test.go b/registry/nacos/service_discovery_test.go index ca0f24976e988af63297c524101c24d40e55b1f8..633a1d41c81ed88b40db3a637333e6795abed871 100644 --- a/registry/nacos/service_discovery_test.go +++ b/registry/nacos/service_discovery_test.go @@ -19,7 +19,6 @@ package nacos import ( "testing" - "time" "github.com/apache/dubbo-go/config" ) @@ -59,8 +58,8 @@ func Test_newNacosServiceDiscovery(t *testing.T) { assert.NotNil(t, err) config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{ - Address: "console.nacos.io:80", - Timeout: 10 * time.Second, + Address: "console.nacos.io:80", + TimeoutStr: "10s", } res, err := newNacosServiceDiscovery(name) @@ -168,7 +167,7 @@ func prepareData() { } config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{ - Address: "console.nacos.io:80", - Timeout: 10 * time.Second, + Address: "console.nacos.io:80", + TimeoutStr: "10s", } } diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 6b25bc60ef1e8cae5ed0e0c1ae3cbe492df1c107..33d22e7339cb98552ec11c08b34c32ed444263c7 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -33,9 +33,10 @@ import ( "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/observer" + "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/metadata/mapping" "github.com/apache/dubbo-go/metadata/service" - "github.com/apache/dubbo-go/metadata/service/inmemory" + "github.com/apache/dubbo-go/metadata/service/remote" "github.com/apache/dubbo-go/registry" "github.com/apache/dubbo-go/registry/servicediscovery/proxy" "github.com/apache/dubbo-go/registry/servicediscovery/synthesizer" @@ -76,8 +77,10 @@ func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) { subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, "")) subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer() serviceNameMapping := extension.GetServiceNameMapping(url.GetParam(constant.SERVICE_NAME_MAPPING_KEY, "")) - // TODO it's need to get implement by factory - metaDataService := inmemory.NewMetadataService() + metaDataService, err := remote.NewMetadataService() + if err != nil { + return nil, perrors.WithMessage(err, "could not init metadata service") + } return &serviceDiscoveryRegistry{ url: url, serviceDiscovery: serviceDiscovery, @@ -91,15 +94,26 @@ func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) { } func (s *serviceDiscoveryRegistry) UnRegister(url common.URL) error { - panic("implement me") + if !shouldRegister(url) { + return nil + } + return s.metaDataService.UnexportURL(url) } -func (s *serviceDiscoveryRegistry) UnSubscribe(*common.URL, registry.NotifyListener) error { - panic("implement me") +func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener registry.NotifyListener) error { + if !shouldSubscribe(*url) { + return nil + } + return s.metaDataService.UnsubscribeURL(*url) } func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { - return extension.GetServiceDiscovery(url.Protocol, "TODO") + sdcName := url.GetParam(constant.SERVICE_DISCOVERY_KEY, "") + sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(sdcName) + if !ok { + return nil, perrors.Errorf("The service discovery with name: %s is not found", sdcName) + } + return extension.GetServiceDiscovery(sdc.Protocol, sdcName) } func parseServices(literalServices string) *gxset.HashSet { @@ -125,6 +139,7 @@ func (s *serviceDiscoveryRegistry) GetUrl() common.URL { } func (s *serviceDiscoveryRegistry) IsAvailable() bool { + // TODO(whether available depends on metadata service and service discovery) return true } diff --git a/remoting/nacos/builder.go b/remoting/nacos/builder.go index 170f23f60cfb0ed12e311dd6011c3a177e7f93f6..16325f88cc7af959f5a129d158c54cf9db5b81f6 100644 --- a/remoting/nacos/builder.go +++ b/remoting/nacos/builder.go @@ -118,7 +118,7 @@ func NewNacosClient(rc *config.RemoteConfig) (naming_client.INamingClient, error configMap["serverConfigs"] = serverConfigs var clientConfig nacosConstant.ClientConfig - timeout := rc.Timeout + timeout := rc.Timeout() clientConfig.TimeoutMs = uint64(timeout.Nanoseconds() / constant.MsToNanoRate) clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs clientConfig.CacheDir = rc.GetParam(constant.NACOS_CACHE_DIR_KEY, "") diff --git a/remoting/nacos/builder_test.go b/remoting/nacos/builder_test.go index bbfadef71bf1fe98446dbe0eaa34540b3504ba52..61d13ef26f9f1d17173bbeb11468f9babdade2f5 100644 --- a/remoting/nacos/builder_test.go +++ b/remoting/nacos/builder_test.go @@ -19,7 +19,6 @@ package nacos import ( "testing" - "time" ) import ( @@ -43,7 +42,7 @@ func TestNewNacosClient(t *testing.T) { assert.NotNil(t, err) rc.Address = "console.nacos.io:80" - rc.Timeout = 10 * time.Second + rc.TimeoutStr = "10s" client, err = NewNacosClient(rc) assert.NotNil(t, client) assert.Nil(t, err)