diff --git a/README_CN.md b/README_CN.md index 552685c7bba9888e089f635f5812d93355e31715..b76d8983deae427f9317c4f930f0e06da479f484 100644 --- a/README_CN.md +++ b/README_CN.md @@ -180,7 +180,7 @@ go test ./... -coverprofile=coverage.txt -covermode=atomic ## [User List](https://github.com/apache/dubbo-go/issues/2) -若你正在使用 [apache/dubbo-go](github.com/apache/dubbo-go) 且认为其有用或者向对其做改进,请忝列贵司信息于 [用户列表](https://github.com/apache/dubbo-go/issues/2),以便我们知晓之。 +若你正在使用 [apache/dubbo-go](github.com/apache/dubbo-go) 且认为其有用或者想对其做改进,请添列贵司信息于 [用户列表](https://github.com/apache/dubbo-go/issues/2),以便我们知晓。 <div> <table> diff --git a/cluster/router/tag/router_rule.go b/cluster/router/tag/router_rule.go index 5ca9281a2d677d7c34be73a088dafe17a1b30e9e..78479c0f1d4c2468c73083ec1f324f4ec86e31e2 100644 --- a/cluster/router/tag/router_rule.go +++ b/cluster/router/tag/router_rule.go @@ -56,7 +56,7 @@ func getRule(rawRule string) (*RouterRule, error) { return r, nil } -// init use for flatten yaml tags data to @addressToTagNames and @tagNameToAddresses +// init use for flattening tags data to @addressToTagNames and @tagNameToAddresses func (t *RouterRule) init() { t.addressToTagNames = make(map[string][]string, 2*len(t.Tags)) t.tagNameToAddresses = make(map[string][]string, len(t.Tags)) @@ -68,7 +68,6 @@ func (t *RouterRule) init() { } } -// getAddresses gets all tag addresses func (t *RouterRule) getAddresses() []string { var result = make([]string, 0, 2*len(t.Tags)) for _, tag := range t.Tags { @@ -77,7 +76,6 @@ func (t *RouterRule) getAddresses() []string { return result } -// getTagNames gets all tag names func (t *RouterRule) getTagNames() []string { var result = make([]string, 0, len(t.Tags)) for _, tag := range t.Tags { diff --git a/cluster/router/tag/router_rule_test.go b/cluster/router/tag/router_rule_test.go index 4e0f5b729ee3b74c60ff0e97b98ea0b26763bb89..2df65193f9d0cf607258f3080e22b42cd6e9b16a 100644 --- a/cluster/router/tag/router_rule_test.go +++ b/cluster/router/tag/router_rule_test.go @@ -22,56 +22,19 @@ import ( ) import ( - "github.com/stretchr/testify/suite" + "github.com/stretchr/testify/assert" ) -type RuleTestSuite struct { - suite.Suite - rule *RouterRule -} - -func (suite *RuleTestSuite) SetupTest() { - var err error +func TestGetRule(t *testing.T) { yml := ` scope: application +runtime: true force: true -runtime: false -enabled: true -priority: 1 -key: demo-provider -tags: - - name: tag1 - addresses: [ip1, ip2] - - name: tag2 - addresses: [ip3, ip4] ` - suite.rule, err = getRule(yml) - suite.Nil(err) -} - -func (suite *RuleTestSuite) TestGetRule() { - var err error - suite.Equal(true, suite.rule.Force) - suite.Equal(false, suite.rule.Runtime) - suite.Equal("application", suite.rule.Scope) - suite.Equal(1, suite.rule.Priority) - suite.Equal("demo-provider", suite.rule.Key) - suite.Nil(err) -} - -func (suite *RuleTestSuite) TestGetTagNames() { - suite.Equal([]string{"tag1", "tag2"}, suite.rule.getTagNames()) -} - -func (suite *RuleTestSuite) TestGetAddresses() { - suite.Equal([]string{"ip1", "ip2", "ip3", "ip4"}, suite.rule.getAddresses()) -} - -func (suite *RuleTestSuite) TestHasTag() { - suite.Equal(true, suite.rule.hasTag("tag1")) - suite.Equal(false, suite.rule.hasTag("tag404")) -} - -func TestRuleTestSuite(t *testing.T) { - suite.Run(t, new(RuleTestSuite)) + rule, e := getRule(yml) + assert.Nil(t, e) + assert.NotNil(t, rule) + assert.Equal(t, true, rule.Force) + assert.Equal(t, true, rule.Runtime) + assert.Equal(t, "application", rule.Scope) } diff --git a/cluster/router/tag/tag_router.go b/cluster/router/tag/tag_router.go index 07ca8f1136883779ea4a1247a9be5a2a46a6bd87..ef222ac9634124d3156b616bf94c7e071e2a5b90 100644 --- a/cluster/router/tag/tag_router.go +++ b/cluster/router/tag/tag_router.go @@ -64,10 +64,6 @@ func (c *tagRouter) isEnabled() bool { return c.enabled } -func (c *tagRouter) SetApplication(app string) { - c.application = app -} - func (c *tagRouter) tagRouterRuleCopy() RouterRule { routerRule := *c.tagRouterRule return routerRule @@ -79,6 +75,7 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati result []protocol.Invoker addresses []string ) + if !c.isEnabled() || len(invokers) == 0 { return invokers } @@ -191,7 +188,7 @@ func (c *tagRouter) Priority() int64 { return c.priority } -// filterUsingStaticTag gets a list of invoker using static tag, If there's no dynamic tag rule being set, use static tag in URL +// filterUsingStaticTag gets a list of invoker using static tag func filterUsingStaticTag(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { if tag, ok := invocation.Attachments()[constant.Tagkey]; ok { result := make([]protocol.Invoker, 0, 8) diff --git a/cluster/router/tag/tag_router_test.go b/cluster/router/tag/tag_router_test.go index 5ea28c3799d17aaac5da4c994e9a614941e412b6..6b9a5e08f133403565d8b2dd38e03a789c5e95e7 100644 --- a/cluster/router/tag/tag_router_test.go +++ b/cluster/router/tag/tag_router_test.go @@ -70,7 +70,8 @@ const ( ) var ( - zkFormat = "zookeeper://%s:%d" + zkFormat = "zookeeper://%s:%d" + conditionFormat = "condition://%s/com.foo.BarService" ) // MockInvoker is only mock the Invoker to support test tagRouter diff --git a/common/constant/default.go b/common/constant/default.go index c1c404e089ea90899d2b599b01cd5980c3e92ab1..629aa32392a0151046eaaea67287618eae02158d 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -37,6 +37,7 @@ const ( DEFAULT_RETRIES_INT = 2 DEFAULT_PROTOCOL = "dubbo" DEFAULT_REG_TIMEOUT = "10s" + DEFAULT_REG_TTL = "15m" DEFAULT_CLUSTER = "failover" DEFAULT_FAILBACK_TIMES = "3" DEFAULT_FAILBACK_TIMES_INT = 3 diff --git a/common/constant/key.go b/common/constant/key.go index a86e797ae5af1ca7f8aeab82d10fb85e661451b5..72072ddb155edeff1ca05fbfe6ae132f0e5576e9 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -97,6 +97,7 @@ const ( ROLE_KEY = "registry.role" REGISTRY_DEFAULT_KEY = "registry.default" REGISTRY_TIMEOUT_KEY = "registry.timeout" + REGISTRY_TTL_KEY = "registry.ttl" ) const ( diff --git a/config/base_config.go b/config/base_config.go index 0ba5bc7ef98cb30a13890b93a659c467adcbf73b..22a0832731daff6c9957d4913a3784c9b268b11f 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -29,11 +29,8 @@ import ( ) import ( - "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/config" - "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" - "github.com/apache/dubbo-go/config_center" ) type multiConfiger interface { @@ -52,7 +49,6 @@ type BaseConfig struct { // application config ApplicationConfig *ApplicationConfig `yaml:"application" json:"application,omitempty" property:"application"` - configCenterUrl *common.URL prefix string fatherConfig interface{} EventDispatcherType string `default:"direct" yaml:"event_dispatcher_type" json:"event_dispatcher_type,omitempty"` @@ -72,74 +68,6 @@ func (c *BaseConfig) GetRemoteConfig(name string) (config *RemoteConfig, ok bool return } -// startConfigCenter will start the config center. -// it will prepare the environment -func (c *BaseConfig) startConfigCenter() error { - url, err := common.NewURL(c.ConfigCenterConfig.Address, - common.WithProtocol(c.ConfigCenterConfig.Protocol), common.WithParams(c.ConfigCenterConfig.GetUrlMap())) - if err != nil { - return err - } - c.configCenterUrl = &url - if c.prepareEnvironment() != nil { - return perrors.WithMessagef(err, "start config center error!") - } - // c.fresh() - return err -} - -func (c *BaseConfig) prepareEnvironment() error { - factory := extension.GetConfigCenterFactory(c.ConfigCenterConfig.Protocol) - dynamicConfig, err := factory.GetDynamicConfiguration(c.configCenterUrl) - config.GetEnvInstance().SetDynamicConfiguration(dynamicConfig) - if err != nil { - logger.Errorf("Get dynamic configuration error , error message is %v", err) - return perrors.WithStack(err) - } - content, err := dynamicConfig.GetProperties(c.ConfigCenterConfig.ConfigFile, config_center.WithGroup(c.ConfigCenterConfig.Group)) - if err != nil { - logger.Errorf("Get config content in dynamic configuration error , error message is %v", err) - return perrors.WithStack(err) - } - var appGroup string - var appContent string - if providerConfig != nil && providerConfig.ApplicationConfig != nil && - reflect.ValueOf(c.fatherConfig).Elem().Type().Name() == "ProviderConfig" { - appGroup = providerConfig.ApplicationConfig.Name - } else if consumerConfig != nil && consumerConfig.ApplicationConfig != nil && - reflect.ValueOf(c.fatherConfig).Elem().Type().Name() == "ConsumerConfig" { - appGroup = consumerConfig.ApplicationConfig.Name - } - - if len(appGroup) != 0 { - configFile := c.ConfigCenterConfig.AppConfigFile - if len(configFile) == 0 { - configFile = c.ConfigCenterConfig.ConfigFile - } - appContent, err = dynamicConfig.GetProperties(configFile, config_center.WithGroup(appGroup)) - if err != nil { - return perrors.WithStack(err) - } - } - // global config file - mapContent, err := dynamicConfig.Parser().Parse(content) - if err != nil { - return perrors.WithStack(err) - } - config.GetEnvInstance().UpdateExternalConfigMap(mapContent) - - // appGroup config file - if len(appContent) != 0 { - appMapConent, err := dynamicConfig.Parser().Parse(appContent) - if err != nil { - return perrors.WithStack(err) - } - config.GetEnvInstance().UpdateAppExternalConfigMap(appMapConent) - } - - return nil -} - func getKeyPrefix(val reflect.Value) []string { var ( prefix string diff --git a/config/base_config_test.go b/config/base_config_test.go index 6db6a8dcb84de3fdefe94cce87338b9efe28246c..849a9c4586c0c8cd2d74e3dd1011aaab466f0e93 100644 --- a/config/base_config_test.go +++ b/config/base_config_test.go @@ -28,8 +28,6 @@ import ( import ( "github.com/apache/dubbo-go/common/config" - "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/config_center" _ "github.com/apache/dubbo-go/config_center/apollo" ) @@ -282,23 +280,6 @@ func TestRefreshProvider(t *testing.T) { assert.Equal(t, "20001", father.Protocols["jsonrpc1"].Port) } -func TestStartConfigCenter(t *testing.T) { - extension.SetConfigCenterFactory("mock", func() config_center.DynamicConfigurationFactory { - return &config_center.MockDynamicConfigurationFactory{} - }) - c := &BaseConfig{ConfigCenterConfig: &ConfigCenterConfig{ - Protocol: "mock", - Address: "172.0.0.1", - Group: "dubbo", - ConfigFile: "mockDubbo.properties", - }} - err := c.startConfigCenter() - assert.NoError(t, err) - b, v := config.GetEnvInstance().Configuration().Back().Value.(*config.InmemoryConfiguration).GetProperty("dubbo.application.organization") - assert.True(t, b) - assert.Equal(t, "ikurento.com", v) -} - func TestInitializeStruct(t *testing.T) { testConsumerConfig := &ConsumerConfig{} tp := reflect.TypeOf(ConsumerConfig{}) diff --git a/config/config_center_config.go b/config/config_center_config.go index c9133dc26df0b05e3bb61df0f612d0e2914e98bb..0fc4007940d9b1ac2456c9b2d379493bb5d8edb0 100644 --- a/config/config_center_config.go +++ b/config/config_center_config.go @@ -20,6 +20,7 @@ package config import ( "context" "net/url" + "reflect" "time" ) @@ -28,7 +29,13 @@ import ( ) import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/config" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config_center" + perrors "github.com/pkg/errors" ) // ConfigCenterConfig is configuration for config center @@ -52,6 +59,7 @@ type ConfigCenterConfig struct { AppConfigFile string `default:"dubbo.properties" yaml:"app_config_file" json:"app_config_file,omitempty"` AppId string `default:"dubbo" yaml:"app_id" json:"app_id,omitempty"` TimeoutStr string `yaml:"timeout" json:"timeout,omitempty"` + RemoteRef string `required:"false" yaml:"remote_ref" json:"remote_ref,omitempty"` timeout time.Duration } @@ -77,3 +85,94 @@ func (c *ConfigCenterConfig) GetUrlMap() url.Values { urlMap.Set(constant.CONFIG_LOG_DIR_KEY, c.LogDir) return urlMap } + +type configCenter struct { +} + +// toURL will compatible with baseConfig.ConfigCenterConfig.Address and baseConfig.ConfigCenterConfig.RemoteRef before 1.6.0 +// After 1.6.0 will not compatible, only baseConfig.ConfigCenterConfig.RemoteRef +func (b *configCenter) toURL(baseConfig BaseConfig) (common.URL, error) { + if len(baseConfig.ConfigCenterConfig.Address) > 0 { + return common.NewURL(baseConfig.ConfigCenterConfig.Address, + common.WithProtocol(baseConfig.ConfigCenterConfig.Protocol), common.WithParams(baseConfig.ConfigCenterConfig.GetUrlMap())) + } + + remoteRef := baseConfig.ConfigCenterConfig.RemoteRef + rc, ok := baseConfig.GetRemoteConfig(remoteRef) + + if !ok { + return common.URL{}, perrors.New("Could not find out the remote ref config, name: " + remoteRef) + } + + newURL, err := rc.toURL() + if err == nil { + newURL.SetParams(baseConfig.ConfigCenterConfig.GetUrlMap()) + } + return newURL, err +} + +// startConfigCenter will start the config center. +// it will prepare the environment +func (b *configCenter) startConfigCenter(baseConfig BaseConfig) error { + url, err := b.toURL(baseConfig) + if err != nil { + return err + } + if err = b.prepareEnvironment(baseConfig, &url); err != nil { + return perrors.WithMessagef(err, "start config center error!") + } + // c.fresh() + return nil +} + +func (b *configCenter) prepareEnvironment(baseConfig BaseConfig, configCenterUrl *common.URL) error { + factory := extension.GetConfigCenterFactory(configCenterUrl.Protocol) + dynamicConfig, err := factory.GetDynamicConfiguration(configCenterUrl) + if err != nil { + logger.Errorf("Get dynamic configuration error , error message is %v", err) + return perrors.WithStack(err) + } + config.GetEnvInstance().SetDynamicConfiguration(dynamicConfig) + content, err := dynamicConfig.GetProperties(baseConfig.ConfigCenterConfig.ConfigFile, config_center.WithGroup(baseConfig.ConfigCenterConfig.Group)) + if err != nil { + logger.Errorf("Get config content in dynamic configuration error , error message is %v", err) + return perrors.WithStack(err) + } + var appGroup string + var appContent string + if providerConfig != nil && providerConfig.ApplicationConfig != nil && + reflect.ValueOf(baseConfig.fatherConfig).Elem().Type().Name() == "ProviderConfig" { + appGroup = providerConfig.ApplicationConfig.Name + } else if consumerConfig != nil && consumerConfig.ApplicationConfig != nil && + reflect.ValueOf(baseConfig.fatherConfig).Elem().Type().Name() == "ConsumerConfig" { + appGroup = consumerConfig.ApplicationConfig.Name + } + + if len(appGroup) != 0 { + configFile := baseConfig.ConfigCenterConfig.AppConfigFile + if len(configFile) == 0 { + configFile = baseConfig.ConfigCenterConfig.ConfigFile + } + appContent, err = dynamicConfig.GetProperties(configFile, config_center.WithGroup(appGroup)) + if err != nil { + return perrors.WithStack(err) + } + } + // global config file + mapContent, err := dynamicConfig.Parser().Parse(content) + if err != nil { + return perrors.WithStack(err) + } + config.GetEnvInstance().UpdateExternalConfigMap(mapContent) + + // appGroup config file + if len(appContent) != 0 { + appMapConent, err := dynamicConfig.Parser().Parse(appContent) + if err != nil { + return perrors.WithStack(err) + } + config.GetEnvInstance().UpdateAppExternalConfigMap(appMapConent) + } + + return nil +} diff --git a/config/config_center_config_test.go b/config/config_center_config_test.go new file mode 100644 index 0000000000000000000000000000000000000000..2299167bb62a019f6854ea48d61442f5bde7e646 --- /dev/null +++ b/config/config_center_config_test.go @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common/config" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config_center" +) + +func TestStartConfigCenter(t *testing.T) { + extension.SetConfigCenterFactory("mock", func() config_center.DynamicConfigurationFactory { + return &config_center.MockDynamicConfigurationFactory{} + }) + baseConfig := &BaseConfig{ConfigCenterConfig: &ConfigCenterConfig{ + Protocol: "mock", + Address: "172.0.0.1", + Group: "dubbo", + ConfigFile: "mockDubbo.properties", + }} + + c := &configCenter{} + err := c.startConfigCenter(*baseConfig) + assert.NoError(t, err) + b, v := config.GetEnvInstance().Configuration().Back().Value.(*config.InmemoryConfiguration).GetProperty("dubbo.application.organization") + assert.True(t, b) + assert.Equal(t, "ikurento.com", v) +} + +func TestStartConfigCenterWithRemoteRef(t *testing.T) { + extension.SetConfigCenterFactory("mock", func() config_center.DynamicConfigurationFactory { + return &config_center.MockDynamicConfigurationFactory{} + }) + m := make(map[string]*RemoteConfig) + m["mock"] = &RemoteConfig{Protocol: "mock", Address: "172.0.0.1"} + baseConfig := &BaseConfig{ + Remotes: m, + ConfigCenterConfig: &ConfigCenterConfig{ + Group: "dubbo", + RemoteRef: "mock", + ConfigFile: "mockDubbo.properties", + }} + + c := &configCenter{} + err := c.startConfigCenter(*baseConfig) + assert.NoError(t, err) + b, v := config.GetEnvInstance().Configuration().Back().Value.(*config.InmemoryConfiguration).GetProperty("dubbo.application.organization") + assert.True(t, b) + assert.Equal(t, "ikurento.com", v) +} + +func TestStartConfigCenterWithRemoteRefError(t *testing.T) { + extension.SetConfigCenterFactory("mock", func() config_center.DynamicConfigurationFactory { + return &config_center.MockDynamicConfigurationFactory{} + }) + m := make(map[string]*RemoteConfig) + m["mock"] = &RemoteConfig{Address: "172.0.0.1"} + baseConfig := &BaseConfig{ + Remotes: m, + ConfigCenterConfig: &ConfigCenterConfig{ + Protocol: "mock", + Group: "dubbo", + RemoteRef: "mock", + ConfigFile: "mockDubbo.properties", + }} + + c := &configCenter{} + err := c.startConfigCenter(*baseConfig) + assert.Error(t, err) +} diff --git a/config/consumer_config.go b/config/consumer_config.go index 48f29f0e70028a7c057ee3831b45afa72446f3d0..177531209225b7a4b25751352d9777c0d4ee260c 100644 --- a/config/consumer_config.go +++ b/config/consumer_config.go @@ -41,7 +41,8 @@ import ( // ConsumerConfig is Consumer default configuration type ConsumerConfig struct { BaseConfig `yaml:",inline"` - Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` + configCenter + Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` // client Connect_Timeout string `default:"100ms" yaml:"connect_timeout" json:"connect_timeout,omitempty" property:"connect_timeout"` ConnectTimeout time.Duration @@ -125,13 +126,6 @@ func ConsumerInit(confConFile string) error { func configCenterRefreshConsumer() error { //fresh it var err error - if consumerConfig.ConfigCenterConfig != nil { - consumerConfig.SetFatherConfig(consumerConfig) - if err = consumerConfig.startConfigCenter(); err != nil { - return perrors.Errorf("start config center error , error message is {%v}", perrors.WithStack(err)) - } - consumerConfig.fresh() - } if consumerConfig.Request_Timeout != "" { if consumerConfig.RequestTimeout, err = time.ParseDuration(consumerConfig.Request_Timeout); err != nil { return perrors.WithMessagef(err, "time.ParseDuration(Request_Timeout{%#v})", consumerConfig.Request_Timeout) @@ -142,5 +136,12 @@ func configCenterRefreshConsumer() error { return perrors.WithMessagef(err, "time.ParseDuration(Connect_Timeout{%#v})", consumerConfig.Connect_Timeout) } } + if consumerConfig.ConfigCenterConfig != nil { + consumerConfig.SetFatherConfig(consumerConfig) + if err = consumerConfig.startConfigCenter((*consumerConfig).BaseConfig); err != nil { + return perrors.Errorf("start config center error , error message is {%v}", perrors.WithStack(err)) + } + consumerConfig.fresh() + } return nil } diff --git a/config/provider_config.go b/config/provider_config.go index 7cd3c1e98bfb8c35abb2b414b782ec709d0a8d0d..c710e48dc233a62837b31a89828e9c612eaff093 100644 --- a/config/provider_config.go +++ b/config/provider_config.go @@ -37,7 +37,8 @@ import ( // ProviderConfig is the default configuration of service provider type ProviderConfig struct { - BaseConfig `yaml:",inline"` + BaseConfig `yaml:",inline"` + configCenter Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty" property:"proxy_factory"` Services map[string]*ServiceConfig `yaml:"services" json:"services,omitempty" property:"services"` @@ -101,7 +102,7 @@ func configCenterRefreshProvider() error { // fresh it if providerConfig.ConfigCenterConfig != nil { providerConfig.fatherConfig = providerConfig - if err := providerConfig.startConfigCenter(); err != nil { + if err := providerConfig.startConfigCenter((*providerConfig).BaseConfig); err != nil { return perrors.Errorf("start config center error , error message is {%v}", perrors.WithStack(err)) } providerConfig.fresh() diff --git a/config/registry_config.go b/config/registry_config.go index ef527c827e9dac4cd2762f579d30254e9e51150f..703606b836906a8dbe9964e74da0edbe76991f48 100644 --- a/config/registry_config.go +++ b/config/registry_config.go @@ -39,6 +39,7 @@ type RegistryConfig struct { // I changed "type" to "protocol" ,the same as "protocol" field in java class RegistryConfig TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second Group string `yaml:"group" json:"group,omitempty" property:"group"` + TTL string `yaml:"ttl" default:"10m" json:"ttl,omitempty" property:"ttl"` // unit: minute // for registry Address string `yaml:"address" json:"address,omitempty" property:"address"` Username string `yaml:"username" json:"username,omitempty" property:"username"` @@ -118,6 +119,7 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values { urlMap.Set(constant.ROLE_KEY, strconv.Itoa(int(roleType))) urlMap.Set(constant.REGISTRY_KEY, c.Protocol) urlMap.Set(constant.REGISTRY_TIMEOUT_KEY, c.TimeoutStr) + urlMap.Set(constant.REGISTRY_TTL_KEY, c.TTL) for k, v := range c.Params { urlMap.Set(k, v) } diff --git a/config/remote_config.go b/config/remote_config.go index 5e0330c571715d99e63688ee944c61f8e48117bb..55380dd5a05b47b5b4677b32daf73b37376673d0 100644 --- a/config/remote_config.go +++ b/config/remote_config.go @@ -22,6 +22,11 @@ import ( ) import ( + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/logger" ) @@ -30,6 +35,7 @@ import ( // so that other module, like config center, registry could reuse the config // but now, only metadata report, metadata service, service discovery use this structure type RemoteConfig struct { + Protocol string `yaml:"protocol" json:"protocol,omitempty"` Address string `yaml:"address" json:"address,omitempty"` TimeoutStr string `default:"5s" yaml:"timeout" json:"timeout,omitempty"` Username string `yaml:"username" json:"username,omitempty" property:"username"` @@ -56,3 +62,15 @@ func (rc *RemoteConfig) GetParam(key string, def string) string { } return param } + +func (rc *RemoteConfig) toURL() (common.URL, error) { + if len(rc.Protocol) == 0 { + return common.URL{}, perrors.Errorf("Must provide protocol in RemoteConfig.") + } + return common.NewURL(rc.Address, + common.WithUsername(rc.Username), + common.WithPassword(rc.Password), + common.WithLocation(rc.Address), + common.WithProtocol(rc.Protocol), + ) +} diff --git a/config_center/nacos/client.go b/config_center/nacos/client.go index 6fe5c4d7df28a7693c732543140ed74f959dc77e..9a09b713fad11afdc06310a2d0072454342ccb0b 100644 --- a/config_center/nacos/client.go +++ b/config_center/nacos/client.go @@ -167,7 +167,6 @@ func initNacosConfigClient(nacosAddrs []string, timeout time.Duration, url commo "serverConfigs": svrConfList, "clientConfig": nacosconst.ClientConfig{ TimeoutMs: uint64(int32(timeout / time.Millisecond)), - ListenInterval: uint64(int32(timeout / time.Millisecond)), NotLoadCacheAtStart: true, LogDir: url.GetParam(constant.NACOS_LOG_DIR_KEY, logDir), CacheDir: url.GetParam(constant.NACOS_CACHE_DIR_KEY, ""), diff --git a/go.mod b/go.mod index 7a472daef7481be390c3debfa00b1d1cd5207c47..c19627378261221b8a9565730ed62f7cc367e5e2 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( github.com/mitchellh/hashstructure v1.0.0 // indirect github.com/mitchellh/mapstructure v1.2.3 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd - github.com/nacos-group/nacos-sdk-go v0.3.3-0.20200617023039-50c7537d6a5f + github.com/nacos-group/nacos-sdk-go v1.0.0 github.com/opentracing/opentracing-go v1.1.0 github.com/pierrec/lz4 v2.2.6+incompatible // indirect github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index 424a11087bb9cad0c62488e8e3e96d64feec6d7d..aa6ecc86e25276eae2747aa5291f718713d164bd 100644 --- a/go.sum +++ b/go.sum @@ -516,8 +516,8 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= -github.com/nacos-group/nacos-sdk-go v0.3.3-0.20200617023039-50c7537d6a5f h1:gid5/0AkHvINWK69Fgbidb3BVIXqlf1YEm7wO0NVPsw= -github.com/nacos-group/nacos-sdk-go v0.3.3-0.20200617023039-50c7537d6a5f/go.mod h1:fti1GlX/EB6RDKvzK/P7Vuibqj0JMPJHQwrcTU1tLXk= +github.com/nacos-group/nacos-sdk-go v1.0.0 h1:CufUF7DZca2ZzIrJtMMCDih1sA58BWCglArLMCZArUc= +github.com/nacos-group/nacos-sdk-go v1.0.0/go.mod h1:hlAPn3UdzlxIlSILAyOXKxjFSvDJ9oLzTJ9hLAK1KzA= github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2 h1:BQ1HW7hr4IVovMwWg0E0PYcyW8CzqDcVmaew9cujU4s= github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2/go.mod h1:TLb2Sg7HQcgGdloNxkrmtgDNR9uVYF3lfdFIN4Ro6Sk= github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw= diff --git a/protocol/rest/server/server_impl/go_restful_server.go b/protocol/rest/server/server_impl/go_restful_server.go index c7d971fcaa5ada0ba02cc436b5ae6705793887ef..6fb9ee8daa7383580b9144ea25954f8ead974dcc 100644 --- a/protocol/rest/server/server_impl/go_restful_server.go +++ b/protocol/rest/server/server_impl/go_restful_server.go @@ -48,8 +48,8 @@ var filterSlice []restful.FilterFunction // GoRestfulServer a rest server implement by go-restful type GoRestfulServer struct { - srv *http.Server - container *restful.Container + srv *http.Server + ws *restful.WebService } // NewGoRestfulServer a constructor of GoRestfulServer @@ -60,13 +60,17 @@ func NewGoRestfulServer() server.RestServer { // Start go-restful server // It will add all go-restful filters func (grs *GoRestfulServer) Start(url common.URL) { - grs.container = restful.NewContainer() + container := restful.NewContainer() for _, filter := range filterSlice { - grs.container.Filter(filter) + container.Filter(filter) } grs.srv = &http.Server{ - Handler: grs.container, + Handler: container, } + grs.ws = &restful.WebService{} + grs.ws.Path("/") + grs.ws.SetDynamicRoutes(true) + container.Add(grs.ws) ln, err := net.Listen("tcp", url.Location) if err != nil { panic(perrors.New(fmt.Sprintf("Restful Server start error:%v", err))) @@ -83,23 +87,21 @@ func (grs *GoRestfulServer) Start(url common.URL) { // Publish a http api in go-restful server // The routeFunc should be invoked when the server receive a request func (grs *GoRestfulServer) Deploy(restMethodConfig *config.RestMethodConfig, routeFunc func(request server.RestServerRequest, response server.RestServerResponse)) { - ws := &restful.WebService{} + rf := func(req *restful.Request, resp *restful.Response) { routeFunc(NewGoRestfulRequestAdapter(req), resp) } - ws.Path(restMethodConfig.Path). + grs.ws.Route(grs.ws.Method(restMethodConfig.MethodType). Produces(strings.Split(restMethodConfig.Produces, ",")...). Consumes(strings.Split(restMethodConfig.Consumes, ",")...). - Route(ws.Method(restMethodConfig.MethodType).To(rf)) - grs.container.Add(ws) - + Path(restMethodConfig.Path).To(rf)) } // Delete a http api in go-restful server func (grs *GoRestfulServer) UnDeploy(restMethodConfig *config.RestMethodConfig) { ws := new(restful.WebService) ws.Path(restMethodConfig.Path) - err := grs.container.Remove(ws) + err := grs.ws.RemoveRoute(restMethodConfig.Path, restMethodConfig.MethodType) if err != nil { logger.Warnf("[Go restful] Remove web service error:%v", err) } diff --git a/protocol/rest/server/server_impl/go_restful_server_test.go b/protocol/rest/server/server_impl/go_restful_server_test.go new file mode 100644 index 0000000000000000000000000000000000000000..b1e66063bf11f1fa805a83b0633a0e1aa8b38b0f --- /dev/null +++ b/protocol/rest/server/server_impl/go_restful_server_test.go @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package server_impl + +import ( + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol/rest/config" + "github.com/apache/dubbo-go/protocol/rest/server" +) + +func TestGoRestfulServerDeploySameUrl(t *testing.T) { + grs := NewGoRestfulServer() + url, err := common.NewURL("http://127.0.0.1:43121") + assert.NoError(t, err) + grs.Start(url) + rmc := &config.RestMethodConfig{ + Produces: "*/*", + Consumes: "*/*", + MethodType: "POST", + Path: "/test", + } + f := func(request server.RestServerRequest, response server.RestServerResponse) {} + grs.Deploy(rmc, f) + rmc1 := &config.RestMethodConfig{ + Produces: "*/*", + Consumes: "*/*", + MethodType: "GET", + Path: "/test", + } + grs.Deploy(rmc1, f) + grs.UnDeploy(rmc) + grs.UnDeploy(rmc1) + grs.Destroy() +} diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index 51d3e2f56abac8e4ab8b966870f1ff5bb79c4171..411090820c7682ab9c3b5576ea8ad5207c2c899f 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -59,6 +59,7 @@ func init() { type nacosRegistry struct { *common.URL namingClient naming_client.INamingClient + registryUrls []common.URL } func getCategory(url common.URL) string { @@ -128,6 +129,36 @@ func (nr *nacosRegistry) Register(url common.URL) error { if !isRegistry { return perrors.New("registry [" + serviceName + "] to nacos failed") } + nr.registryUrls = append(nr.registryUrls, url) + return nil +} + +func createDeregisterParam(url common.URL, serviceName string) vo.DeregisterInstanceParam { + if len(url.Ip) == 0 { + url.Ip = localIP + } + if len(url.Port) == 0 || url.Port == "0" { + url.Port = "80" + } + port, _ := strconv.Atoi(url.Port) + return vo.DeregisterInstanceParam{ + Ip: url.Ip, + Port: uint64(port), + ServiceName: serviceName, + Ephemeral: true, + } +} + +func (nr *nacosRegistry) DeRegister(url common.URL) error { + serviceName := getServiceName(url) + param := createDeregisterParam(url, serviceName) + isDeRegistry, err := nr.namingClient.DeregisterInstance(param) + if err != nil { + return err + } + if !isDeRegistry { + return perrors.New("DeRegistry [" + serviceName + "] to nacos failed") + } return nil } @@ -193,6 +224,13 @@ func (nr *nacosRegistry) IsAvailable() bool { // nolint func (nr *nacosRegistry) Destroy() { + for _, url := range nr.registryUrls { + err := nr.DeRegister(url) + logger.Infof("DeRegister Nacos URL:%+v", url) + if err != nil { + logger.Errorf("Deregister URL:%+v err:%v", url, err.Error()) + } + } return } @@ -209,6 +247,7 @@ func newNacosRegistry(url *common.URL) (registry.Registry, error) { registry := &nacosRegistry{ URL: url, namingClient: client, + registryUrls: []common.URL{}, } return registry, nil } diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index 63d92d70fd5e1a00f0ce1ca95b1926fb9c36c84b..0e5ad8e6990856aeb0dfdde72f9c3f7fdae3e985 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -60,11 +60,20 @@ type nacosServiceDiscovery struct { // namingClient is the Nacos' client namingClient naming_client.INamingClient + // cache registry instances + registryInstances []registry.ServiceInstance } // Destroy will close the service discovery. // Actually, it only marks the naming client as null and then return func (n *nacosServiceDiscovery) Destroy() error { + for _, inst := range n.registryInstances { + err := n.Unregister(inst) + logger.Infof("Unregister nacos instance:%+v", inst) + if err != nil { + logger.Errorf("Unregister nacos instance:%+v, err:%+v", inst, err) + } + } n.namingClient = nil return nil } @@ -76,6 +85,7 @@ func (n *nacosServiceDiscovery) Register(instance registry.ServiceInstance) erro if err != nil || !ok { return perrors.WithMessage(err, "Could not register the instance. "+instance.GetServiceName()) } + n.registryInstances = append(n.registryInstances, instance) return nil } @@ -118,8 +128,8 @@ func (n *nacosServiceDiscovery) GetServices() *gxset.HashSet { return res } - for _, e := range services { - res.Add(e.Name) + for _, e := range services.Doms { + res.Add(e) } return res } @@ -334,8 +344,9 @@ func newNacosServiceDiscovery(name string) (registry.ServiceDiscovery, error) { descriptor := fmt.Sprintf("nacos-service-discovery[%s]", remoteConfig.Address) return &nacosServiceDiscovery{ - group: group, - namingClient: client, - descriptor: descriptor, + group: group, + namingClient: client, + descriptor: descriptor, + registryInstances: []registry.ServiceInstance{}, }, nil } diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 8f2ac1023b8ad34938b9996b480e3bbc4adbaaea..e8ee51beb70b5a08ec60b213c5342ef52972c59f 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -243,6 +243,8 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen var zkListener *RegistryConfigurationListener dataListener := r.dataListener + ttl := r.GetParam(constant.REGISTRY_TTL_KEY, constant.DEFAULT_REG_TTL) + conf.SetParam(constant.REGISTRY_TTL_KEY, ttl) dataListener.mutex.Lock() defer dataListener.mutex.Unlock() if r.dataListener.subscribed[conf.ServiceKey()] != nil { diff --git a/remoting/kubernetes/listener_test.go b/remoting/kubernetes/listener_test.go index 1f398485b2f16defddf44ce1a08a7ecfd9760dd1..0b05b6e6e09501dfd25164e865a3eb6eef91dc9f 100644 --- a/remoting/kubernetes/listener_test.go +++ b/remoting/kubernetes/listener_test.go @@ -19,6 +19,7 @@ package kubernetes import ( "testing" + "time" ) import ( @@ -87,6 +88,7 @@ func TestListener(t *testing.T) { listener := NewEventListener(c) dataListener := &mockDataListener{client: c, changedData: changedData, rc: make(chan remoting.Event)} listener.ListenServiceEvent("/dubbo", dataListener) + time.Sleep(1e9) for _, tc := range tests { diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 9a4874db24696d90e4fcc7d9d987f5888f1be599..4f50c18ab61ba6faf373dfd0f831c14ae7ab6d5d 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -37,6 +37,10 @@ import ( "github.com/apache/dubbo-go/remoting" ) +var ( + defaultTTL = 15 * time.Minute +) + // nolint type ZkEventListener struct { client *ZookeeperClient @@ -197,10 +201,20 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen var ( failTimes int + ttl time.Duration event chan struct{} zkEvent zk.Event ) event = make(chan struct{}, 4) + ttl = defaultTTL + if conf != nil { + timeout, err := time.ParseDuration(conf.GetParam(constant.REGISTRY_TTL_KEY, constant.DEFAULT_REG_TTL)) + if err == nil { + ttl = timeout + } else { + logger.Warnf("wrong configuration for registry ttl, error:=%+v, using default value %v instead", err, defaultTTL) + } + } defer close(event) for { // get current children for a zkPath @@ -302,18 +316,29 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen }(dubboPath, listener) } } - select { - case zkEvent = <-childEventCh: - logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", - zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err) - if zkEvent.Type != zk.EventNodeChildrenChanged { - continue + // Periodically update provider information + ticker := time.NewTicker(ttl) + WATCH: + for { + select { + case <-ticker.C: + l.handleZkNodeEvent(zkEvent.Path, children, listener) + case zkEvent = <-childEventCh: + logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", + zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err) + ticker.Stop() + if zkEvent.Type != zk.EventNodeChildrenChanged { + break WATCH + } + l.handleZkNodeEvent(zkEvent.Path, children, listener) + break WATCH + case <-l.client.Done(): + logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) + ticker.Stop() + return } - l.handleZkNodeEvent(zkEvent.Path, children, listener) - case <-l.client.Done(): - logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) - return } + } }