diff --git a/common/config/environment.go b/common/config/environment.go index 8709d69a78263ab99501c4f6db78e78c47d2955b..77a962a51183aec139dd2a8ce640b467a32763d3 100644 --- a/common/config/environment.go +++ b/common/config/environment.go @@ -19,6 +19,7 @@ package config import ( "container/list" + "github.com/apache/dubbo-go/config_center" "strings" "sync" ) @@ -29,9 +30,10 @@ import ( // We just have config center configuration which can override configuration in consumer.yaml & provider.yaml. // But for add these features in future ,I finish the environment struct following Environment class in java. type Environment struct { - configCenterFirst bool - externalConfigs sync.Map - externalConfigMap sync.Map + configCenterFirst bool + externalConfigs sync.Map + externalConfigMap sync.Map + dynamicConfiguration config_center.DynamicConfiguration } var ( @@ -68,6 +70,14 @@ func (env *Environment) Configuration() *list.List { return list } +func (env *Environment) SetDynamicConfiguration(dc config_center.DynamicConfiguration) { + env.dynamicConfiguration = dc +} + +func (env *Environment) GetDynamicConfiguration() config_center.DynamicConfiguration { + return env.dynamicConfiguration +} + type InmemoryConfiguration struct { store *sync.Map } diff --git a/common/constant/default.go b/common/constant/default.go index e5c92fad4d3aa569b57e90d59f38c68834f4dba9..b7b8bb023708e69e6d182763449efc7932b739f7 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -22,6 +22,7 @@ const ( PROVIDER_PROTOCOL = "provider" //compatible with 2.6.x OVERRIDE_PROTOCOL = "override" + EMPTY_PROTOCOL = "empty" ) const ( @@ -56,6 +57,8 @@ const ( ) const ( - CONFIGURATORS_CATEGORY = "configurators" - DEFAULT_CATEGORY = "providers" + CONFIGURATORS_CATEGORY = "configurators" + DEFAULT_CATEGORY = "providers" + DYNAMIC_CONFIGURATORS_CATEGORY = "dynamicconfigurators" + APP_DYNAMIC_CONFIGURATORS_CATEGORY = "appdynamicconfigurators" ) diff --git a/common/constant/key.go b/common/constant/key.go index 13d65eb6f8d64f808d736566f4cc18c05cb9b1b4..e8db5713e38df7ef2b0caead60636a249156eb12 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -34,8 +34,8 @@ const ( ENABLED_KEY = "enabled" SIDE_KEY = "side" OVERRIDE_PROVIDERS_KEY = "providerAddresses" - BEAN_NAME_KEY = "bean.name" - GENERIC_KEY = "generic" + BEAN_NAME_KEY = "bean.name" + GENERIC_KEY = "generic" ) const ( @@ -98,6 +98,10 @@ const ( ConsumerConfigPrefix = "dubbo.consumer." ) +const ( + CONFIGURATORS_SUFFIX = ".configurators" +) + const ( NACOS_KEY = "nacos" NACOS_DEFAULT_ROLETYPE = 3 diff --git a/common/extension/configurator.go b/common/extension/configurator.go index 99cc8bdb985ac25e9f2f743342ab11064c7de5ca..f7e1a263cd7541397869060bfd4bcbb6c88f1687 100644 --- a/common/extension/configurator.go +++ b/common/extension/configurator.go @@ -18,33 +18,39 @@ package extension import ( - "github.com/apache/dubbo-go/cluster" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/config_center" ) var ( - configurator = make(map[string]func(url *common.URL) cluster.Configurator) + configurator = make(map[string]func(url *common.URL) config_center.Configurator) ) -func SetConfigurator(name string, v func(url *common.URL) cluster.Configurator) { +func SetConfigurator(name string, v func(url *common.URL) config_center.Configurator) { configurator[name] = v } -func GetConfigurator(name string, url *common.URL) cluster.Configurator { +func GetConfigurator(name string, url *common.URL) config_center.Configurator { if configurator[name] == nil { panic("config center for " + name + " is not existing, make sure you have import the package.") } return configurator[name](url) } -func SetDefaultConfigurator(v func(url *common.URL) cluster.Configurator) { +func SetDefaultConfigurator(v func(url *common.URL) config_center.Configurator) { configurator["default"] = v } -func GetDefaultConfigurator(url *common.URL) cluster.Configurator { +func GetDefaultConfigurator(url *common.URL) config_center.Configurator { if configurator["default"] == nil { panic("config center for default is not existing, make sure you have import the package.") } return configurator["default"](url) } +func GetDefaultConfiguratorFunc() func(url *common.URL) config_center.Configurator { + if configurator["default"] == nil { + panic("config center for default is not existing, make sure you have import the package.") + } + return configurator["default"] +} diff --git a/common/url.go b/common/url.go index 52472d463a0784b0cb45e667c92cfc38f89e83d0..4d9f6d1208cc5fa5dac052b43466c3246bb01e5a 100644 --- a/common/url.go +++ b/common/url.go @@ -288,6 +288,11 @@ func (c URL) ServiceKey() string { return buf.String() } +func (c *URL) EncodedServiceKey() string { + serviceKey := c.ServiceKey() + return strings.Replace(serviceKey, "/", "*", 1) +} + func (c URL) Context() context.Context { return c.ctx } diff --git a/config/base_config.go b/config/base_config.go index 54ad8aba368c7d9477faad6fbd97c5dccd32dca1..65cc0fe3f99d6189ad7776cf5062b341f34149fb 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -60,6 +60,7 @@ func (c *BaseConfig) prepareEnvironment() error { factory := extension.GetConfigCenterFactory(c.ConfigCenterConfig.Protocol) dynamicConfig, err := factory.GetDynamicConfiguration(c.configCenterUrl) + config.GetEnvInstance().SetDynamicConfiguration(dynamicConfig) if err != nil { logger.Errorf("Get dynamic configuration error , error message is %v", err) return perrors.WithStack(err) diff --git a/config_center/configuration_listener.go b/config_center/configuration_listener.go new file mode 100644 index 0000000000000000000000000000000000000000..5367ee72dfa71ffb5e2b66d78029fe2c92338e90 --- /dev/null +++ b/config_center/configuration_listener.go @@ -0,0 +1,22 @@ +package config_center + +import ( + "fmt" +) +import ( + "github.com/apache/dubbo-go/remoting" +) + +type ConfigurationListener interface { + Process(*ConfigChangeEvent) +} + +type ConfigChangeEvent struct { + Key string + Value interface{} + ConfigType remoting.EventType +} + +func (c ConfigChangeEvent) String() string { + return fmt.Sprintf("ConfigChangeEvent{key = %v , value = %v , changeType = %v}", c.Key, c.Value, c.ConfigType) +} diff --git a/config_center/configuration_parser.go b/config_center/configuration_parser.go deleted file mode 100644 index ab02789c9267786c900b57c312b1346ee2653fde..0000000000000000000000000000000000000000 --- a/config_center/configuration_parser.go +++ /dev/null @@ -1,24 +0,0 @@ -package config_center - -import ( - "github.com/magiconair/properties" -) -import ( - "github.com/apache/dubbo-go/common/logger" -) - -type ConfigurationParser interface { - Parse(string) (map[string]string, error) -} - -//for support properties file in config center -type DefaultConfigurationParser struct{} - -func (parser *DefaultConfigurationParser) Parse(content string) (map[string]string, error) { - properties, err := properties.LoadString(content) - if err != nil { - logger.Errorf("Parse the content {%v} in DefaultConfigurationParser error ,error message is {%v}", content, err) - return nil, err - } - return properties.Map(), nil -} diff --git a/cluster/configuration.go b/config_center/configurator.go similarity index 54% rename from cluster/configuration.go rename to config_center/configurator.go index ebe29a145d55a4a6ebd49f04af3bd04623b023dc..0b0b1ddee81c242e5031cad8809ff236251baeb6 100644 --- a/cluster/configuration.go +++ b/config_center/configurator.go @@ -1,6 +1,8 @@ -package cluster +package config_center -import "github.com/apache/dubbo-go/common" +import ( + "github.com/apache/dubbo-go/common" +) type Configurator interface { GetUrl() *common.URL diff --git a/config_center/configurator/mock.go b/config_center/configurator/mock.go new file mode 100644 index 0000000000000000000000000000000000000000..8547d562e1e815f6d5fff2c421c24ed62316f7fc --- /dev/null +++ b/config_center/configurator/mock.go @@ -0,0 +1,22 @@ +package configurator + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/config_center" +) + +func NewMockConfigurator(url *common.URL) config_center.Configurator { + return &mockConfigurator{configuratorUrl: url} +} + +type mockConfigurator struct { + configuratorUrl *common.URL +} + +func (c *mockConfigurator) GetUrl() *common.URL { + return c.configuratorUrl +} + +func (c *mockConfigurator) Configure(url *common.URL) { + +} diff --git a/cluster/configurator/override.go b/config_center/configurator/override.go similarity index 96% rename from cluster/configurator/override.go rename to config_center/configurator/override.go index 797665e51388fbe3e8c8c46024be4a1f08a67055..78ab3d8533cca43c0b15bfc4f089958948d07c59 100644 --- a/cluster/configurator/override.go +++ b/config_center/configurator/override.go @@ -1,19 +1,23 @@ package configurator import ( - "github.com/apache/dubbo-go/cluster" + "strings" +) +import ( + "github.com/dubbogo/gost/container" +) +import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/utils" - "github.com/dubbogo/gost/container" - "strings" + "github.com/apache/dubbo-go/config_center" ) func init() { extension.SetDefaultConfigurator(newConfigurator) } -func newConfigurator(url *common.URL) cluster.Configurator { +func newConfigurator(url *common.URL) config_center.Configurator { return &overrideConfigurator{configuratorUrl: url} } diff --git a/cluster/configurator/override_test.go b/config_center/configurator/override_test.go similarity index 100% rename from cluster/configurator/override_test.go rename to config_center/configurator/override_test.go diff --git a/config_center/dynamic_configuration.go b/config_center/dynamic_configuration.go index 3b829507b1cfbcb687b194ac971c9b827c1c3e8b..c3718ee4c9deabfcbec38e8a83361cd7a57202e0 100644 --- a/config_center/dynamic_configuration.go +++ b/config_center/dynamic_configuration.go @@ -21,7 +21,7 @@ import ( "time" ) import ( - "github.com/apache/dubbo-go/remoting" + "github.com/apache/dubbo-go/config_center/parser" ) ////////////////////////////////////////// @@ -31,10 +31,10 @@ const DEFAULT_GROUP = "dubbo" const DEFAULT_CONFIG_TIMEOUT = "10s" type DynamicConfiguration interface { - Parser() ConfigurationParser - SetParser(ConfigurationParser) - AddListener(string, remoting.ConfigurationListener, ...Option) - RemoveListener(string, remoting.ConfigurationListener, ...Option) + Parser() parser.ConfigurationParser + SetParser(parser.ConfigurationParser) + AddListener(string, ConfigurationListener, ...Option) + RemoveListener(string, ConfigurationListener, ...Option) GetConfig(string, ...Option) (string, error) GetConfigs(string, ...Option) (string, error) } diff --git a/config_center/mock_dynamic_config.go b/config_center/mock_dynamic_config.go index a6c7267a4fdd68fda0bde80f16edae1d97e58a51..7ecffca160a9bab76b1b7462c5b17db6eef82b44 100644 --- a/config_center/mock_dynamic_config.go +++ b/config_center/mock_dynamic_config.go @@ -18,11 +18,11 @@ package config_center import ( + "github.com/apache/dubbo-go/config_center/parser" "sync" ) import ( "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/remoting" ) type MockDynamicConfigurationFactory struct{} @@ -36,7 +36,7 @@ func (f *MockDynamicConfigurationFactory) GetDynamicConfiguration(url *common.UR var err error once.Do(func() { dynamicConfiguration = &mockDynamicConfiguration{} - dynamicConfiguration.SetParser(&DefaultConfigurationParser{}) + dynamicConfiguration.SetParser(&parser.DefaultConfigurationParser{}) dynamicConfiguration.content = ` dubbo.consumer.request_timeout=5s dubbo.consumer.connect_timeout=5s @@ -67,14 +67,14 @@ func (f *MockDynamicConfigurationFactory) GetDynamicConfiguration(url *common.UR } type mockDynamicConfiguration struct { - parser ConfigurationParser + parser parser.ConfigurationParser content string } -func (c *mockDynamicConfiguration) AddListener(key string, listener remoting.ConfigurationListener, opions ...Option) { +func (c *mockDynamicConfiguration) AddListener(key string, listener ConfigurationListener, opions ...Option) { } -func (c *mockDynamicConfiguration) RemoveListener(key string, listener remoting.ConfigurationListener, opions ...Option) { +func (c *mockDynamicConfiguration) RemoveListener(key string, listener ConfigurationListener, opions ...Option) { } func (c *mockDynamicConfiguration) GetConfig(key string, opts ...Option) (string, error) { @@ -87,9 +87,9 @@ func (c *mockDynamicConfiguration) GetConfigs(key string, opts ...Option) (strin return c.GetConfig(key, opts...) } -func (c *mockDynamicConfiguration) Parser() ConfigurationParser { +func (c *mockDynamicConfiguration) Parser() parser.ConfigurationParser { return c.parser } -func (c *mockDynamicConfiguration) SetParser(p ConfigurationParser) { +func (c *mockDynamicConfiguration) SetParser(p parser.ConfigurationParser) { c.parser = p } diff --git a/config_center/parser/configuration_parser.go b/config_center/parser/configuration_parser.go new file mode 100644 index 0000000000000000000000000000000000000000..34d5f267630817e5e7324cc8d7d0f430174cbef7 --- /dev/null +++ b/config_center/parser/configuration_parser.go @@ -0,0 +1,231 @@ +package parser + +import ( + "context" + "strconv" + "strings" +) +import ( + "github.com/magiconair/properties" + perrors "github.com/pkg/errors" + "gopkg.in/yaml.v2" +) +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" +) + +type ConfigurationParser interface { + Parse(string) (map[string]string, error) + ParseToUrls(content string) ([]*common.URL, error) +} + +//for support properties file in config center +type DefaultConfigurationParser struct{} + +func (parser *DefaultConfigurationParser) Parse(content string) (map[string]string, error) { + properties, err := properties.LoadString(content) + if err != nil { + logger.Errorf("Parse the content {%v} in DefaultConfigurationParser error ,error message is {%v}", content, err) + return nil, err + } + return properties.Map(), nil +} + +func (parser *DefaultConfigurationParser) ParseToUrls(content string) ([]*common.URL, error) { + config := ConfiguratorConfig{} + if err := yaml.Unmarshal([]byte(content), &config); err != nil { + return nil, err + } + scope := config.Scope + items := config.Configs + var allUrls []*common.URL + if scope == ScopeApplication { + for _, v := range items { + urls, err := appItemToUrls(v, config) + if err != nil { + return nil, err + } + allUrls = append(allUrls, urls...) + } + } else { + for _, v := range items { + urls, err := serviceItemToUrls(v, config) + if err != nil { + return nil, err + } + allUrls = append(allUrls, urls...) + } + } + return allUrls, nil +} +func serviceItemToUrls(item ConfigItem, config ConfiguratorConfig) ([]*common.URL, error) { + var addresses = item.Addresses + if len(addresses) == 0 { + addresses = append(addresses, constant.ANYHOST_VALUE) + } + var urls []*common.URL + for _, v := range addresses { + urlStr := constant.OVERRIDE_PROTOCOL + "://" + v + "/" + serviceStr, err := getServiceString(config.Key) + if err != nil { + return nil, perrors.WithStack(err) + } + urlStr = urlStr + serviceStr + paramStr, err := getParamString(item) + if err != nil { + return nil, perrors.WithStack(err) + } + urlStr = urlStr + paramStr + urlStr = urlStr + getEnabledString(item, config) + urlStr = urlStr + "&category=" + urlStr = urlStr + constant.DYNAMIC_CONFIGURATORS_CATEGORY + urlStr = urlStr + "&configVersion=" + urlStr = urlStr + config.ConfigVersion + apps := item.Applications + if len(apps) > 0 { + for _, v := range apps { + newUrlStr := urlStr + newUrlStr = newUrlStr + "&application" + newUrlStr = newUrlStr + v + url, err := common.NewURL(context.Background(), newUrlStr) + if err != nil { + perrors.WithStack(err) + } + urls = append(urls, &url) + } + } else { + url, err := common.NewURL(context.Background(), urlStr) + if err != nil { + perrors.WithStack(err) + } + urls = append(urls, &url) + } + } + return urls, nil +} +func appItemToUrls(item ConfigItem, config ConfiguratorConfig) ([]*common.URL, error) { + var addresses = item.Addresses + if len(addresses) == 0 { + addresses = append(addresses, constant.ANYHOST_VALUE) + } + var urls []*common.URL + for _, v := range addresses { + urlStr := constant.OVERRIDE_PROTOCOL + "://" + v + "/" + services := item.Services + if len(services) == 0 { + services = append(services, constant.ANY_VALUE) + } + for _, vs := range services { + serviceStr, err := getServiceString(vs) + if err != nil { + return nil, perrors.WithStack(err) + } + urlStr = urlStr + serviceStr + paramStr, err := getParamString(item) + if err != nil { + return nil, perrors.WithStack(err) + } + urlStr = urlStr + paramStr + urlStr = urlStr + "&application=" + urlStr = urlStr + config.Key + urlStr = urlStr + getEnabledString(item, config) + urlStr = urlStr + "&category=" + urlStr = urlStr + constant.APP_DYNAMIC_CONFIGURATORS_CATEGORY + urlStr = urlStr + "&configVersion=" + urlStr = urlStr + config.ConfigVersion + url, err := common.NewURL(context.Background(), urlStr) + if err != nil { + perrors.WithStack(err) + } + urls = append(urls, &url) + } + } + return urls, nil +} + +func getServiceString(service string) (string, error) { + if len(service) == 0 { + return "", perrors.New("service field in configuration is null.") + } + var serviceStr string + i := strings.Index(service, "/") + if i > 0 { + serviceStr = serviceStr + "group=" + serviceStr = serviceStr + service[0:i] + serviceStr = serviceStr + "&" + service = service[i+1:] + } + j := strings.Index(service, ":") + if j > 0 { + serviceStr = serviceStr + "version=" + serviceStr = serviceStr + service[j+1:] + serviceStr = serviceStr + "&" + service = service[0:j] + } + serviceStr = service + "?" + serviceStr + return serviceStr, nil +} + +func getParamString(item ConfigItem) (string, error) { + var retStr string + retStr = retStr + "category=" + retStr = retStr + constant.DYNAMIC_CONFIGURATORS_CATEGORY + if len(item.Side) > 0 { + retStr = retStr + "&side=" + retStr = retStr + item.Side + } + params := item.Parameters + if len(params) <= 0 { + return "", perrors.New("Invalid configurator rule, please specify at least one parameter " + + "you want to change in the rule.") + } + for k, v := range params { + retStr = retStr + "&" + retStr = retStr + k + retStr = retStr + "=" + retStr = retStr + v + } + + if len(item.ProviderAddresses) >= 0 { + retStr = retStr + "&" + retStr = retStr + constant.OVERRIDE_PROVIDERS_KEY + retStr = retStr + "=" + retStr = retStr + strings.Join(item.ProviderAddresses, ",") + } + + return retStr, nil +} +func getEnabledString(item ConfigItem, config ConfiguratorConfig) string { + retStr := "&enabled=" + if len(item.Type) == 0 || item.Type == GeneralType { + retStr = retStr + strconv.FormatBool(config.Enabled) + } else { + retStr = retStr + strconv.FormatBool(item.Enabled) + } + return retStr +} + +const ( + ScopeApplication = "application" + GeneralType = "general" +) + +type ConfiguratorConfig struct { + ConfigVersion string + Scope string + Key string + Enabled bool + Configs []ConfigItem +} +type ConfigItem struct { + Type string + Enabled bool + Addresses []string + ProviderAddresses []string + Services []string + Applications []string + Parameters map[string]string + Side string +} diff --git a/config_center/configuration_parser_test.go b/config_center/parser/configuration_parser_test.go similarity index 94% rename from config_center/configuration_parser_test.go rename to config_center/parser/configuration_parser_test.go index 3c84fd70b03df06184365e132a174dfc640da1c2..08490e8581f4563fdc0e26c10e5126586a131ceb 100644 --- a/config_center/configuration_parser_test.go +++ b/config_center/parser/configuration_parser_test.go @@ -1,4 +1,4 @@ -package config_center +package parser import ( "testing" diff --git a/config_center/zookeeper/factory.go b/config_center/zookeeper/factory.go index c1c7e27b14a5bbe651faddb0c2ff5341195f716e..ac10d2a76fdb20cb6f6689a5e3d2f9475c160dfb 100644 --- a/config_center/zookeeper/factory.go +++ b/config_center/zookeeper/factory.go @@ -18,6 +18,7 @@ package zookeeper import ( + "github.com/apache/dubbo-go/config_center/parser" "sync" ) import ( @@ -44,7 +45,7 @@ func (f *zookeeperDynamicConfigurationFactory) GetDynamicConfiguration(url *comm if err != nil { return nil, err } - dynamicConfiguration.SetParser(&config_center.DefaultConfigurationParser{}) + dynamicConfiguration.SetParser(&parser.DefaultConfigurationParser{}) return dynamicConfiguration, err } diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go index f2827b2bb693fc1943116686f8056c0edaeadc99..1858a5a83d95818658fd54f94c1d99b36610d058 100644 --- a/config_center/zookeeper/impl.go +++ b/config_center/zookeeper/impl.go @@ -18,6 +18,7 @@ package zookeeper import ( + "github.com/apache/dubbo-go/config_center/parser" "strings" "sync" "time" @@ -31,7 +32,6 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config_center" - "github.com/apache/dubbo-go/remoting" "github.com/apache/dubbo-go/remoting/zookeeper" ) @@ -48,7 +48,7 @@ type zookeeperDynamicConfiguration struct { listenerLock sync.Mutex listener *zookeeper.ZkEventListener cacheListener *CacheListener - parser config_center.ConfigurationParser + parser parser.ConfigurationParser } func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfiguration, error) { @@ -99,11 +99,11 @@ func newMockZookeeperDynamicConfiguration(url *common.URL, opts ...zookeeper.Opt } -func (c *zookeeperDynamicConfiguration) AddListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) { +func (c *zookeeperDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) { c.cacheListener.AddListener(key, listener) } -func (c *zookeeperDynamicConfiguration) RemoveListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) { +func (c *zookeeperDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) { c.cacheListener.RemoveListener(key, listener) } @@ -143,10 +143,10 @@ func (c *zookeeperDynamicConfiguration) GetConfigs(key string, opts ...config_ce return c.GetConfig(key, opts...) } -func (c *zookeeperDynamicConfiguration) Parser() config_center.ConfigurationParser { +func (c *zookeeperDynamicConfiguration) Parser() parser.ConfigurationParser { return c.parser } -func (c *zookeeperDynamicConfiguration) SetParser(p config_center.ConfigurationParser) { +func (c *zookeeperDynamicConfiguration) SetParser(p parser.ConfigurationParser) { c.parser = p } diff --git a/config_center/zookeeper/impl_test.go b/config_center/zookeeper/impl_test.go index 26b899e82d7f1878d13e7dab113524be09ebde34..b200b9cdbf4cb638bb4a8ae66d2fe5947f861046 100644 --- a/config_center/zookeeper/impl_test.go +++ b/config_center/zookeeper/impl_test.go @@ -19,6 +19,7 @@ package zookeeper import ( "context" "fmt" + "github.com/apache/dubbo-go/config_center/parser" "sync" "testing" ) @@ -37,7 +38,7 @@ import ( func initZkData(group string, t *testing.T) (*zk.TestCluster, *zookeeperDynamicConfiguration) { regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111") ts, reg, err := newMockZookeeperDynamicConfiguration(®url) - reg.SetParser(&config_center.DefaultConfigurationParser{}) + reg.SetParser(&parser.DefaultConfigurationParser{}) assert.NoError(t, err) diff --git a/config_center/zookeeper/listener.go b/config_center/zookeeper/listener.go index c79c05c9bcf8ee86d0921ac62e7812ed00c9f5c1..0874f0c2f78790c14fccaa0ed5571c7f30959c11 100644 --- a/config_center/zookeeper/listener.go +++ b/config_center/zookeeper/listener.go @@ -22,6 +22,7 @@ import ( "sync" ) import ( + "github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/remoting" ) @@ -33,21 +34,21 @@ type CacheListener struct { func NewCacheListener(rootPath string) *CacheListener { return &CacheListener{rootPath: rootPath} } -func (l *CacheListener) AddListener(key string, listener remoting.ConfigurationListener) { +func (l *CacheListener) AddListener(key string, listener config_center.ConfigurationListener) { // reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure // make a map[your type]struct{} like set in java - listeners, loaded := l.keyListeners.LoadOrStore(key, map[remoting.ConfigurationListener]struct{}{listener: struct{}{}}) + listeners, loaded := l.keyListeners.LoadOrStore(key, map[config_center.ConfigurationListener]struct{}{listener: struct{}{}}) if loaded { - listeners.(map[remoting.ConfigurationListener]struct{})[listener] = struct{}{} + listeners.(map[config_center.ConfigurationListener]struct{})[listener] = struct{}{} l.keyListeners.Store(key, listeners) } } -func (l *CacheListener) RemoveListener(key string, listener remoting.ConfigurationListener) { +func (l *CacheListener) RemoveListener(key string, listener config_center.ConfigurationListener) { listeners, loaded := l.keyListeners.Load(key) if loaded { - delete(listeners.(map[remoting.ConfigurationListener]struct{}), listener) + delete(listeners.(map[config_center.ConfigurationListener]struct{}), listener) } } @@ -59,8 +60,8 @@ func (l *CacheListener) DataChange(event remoting.Event) bool { key := l.pathToKey(event.Path) if key != "" { if listeners, ok := l.keyListeners.Load(key); ok { - for listener := range listeners.(map[remoting.ConfigurationListener]struct{}) { - listener.Process(&remoting.ConfigChangeEvent{Key: key, Value: event.Content, ConfigType: event.Action}) + for listener := range listeners.(map[config_center.ConfigurationListener]struct{}) { + listener.Process(&config_center.ConfigChangeEvent{Key: key, Value: event.Content, ConfigType: event.Action}) } return true } diff --git a/registry/base_configuration_listener.go b/registry/base_configuration_listener.go new file mode 100644 index 0000000000000000000000000000000000000000..f209049877900edfb3739f34be037354654dda0d --- /dev/null +++ b/registry/base_configuration_listener.go @@ -0,0 +1,76 @@ +package registry + +import ( + perrors "github.com/pkg/errors" +) +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/config" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/remoting" +) + +type BaseConfigurationListener struct { + configurators []config_center.Configurator + dynamicConfiguration config_center.DynamicConfiguration + defaultConfiguratorFunc func(url *common.URL) config_center.Configurator +} + +func (bcl *BaseConfigurationListener) Configurators() []config_center.Configurator { + return bcl.configurators +} +func (bcl *BaseConfigurationListener) InitWith(key string, listener config_center.ConfigurationListener, f func(url *common.URL) config_center.Configurator) error { + bcl.dynamicConfiguration = config.GetEnvInstance().GetDynamicConfiguration() + bcl.defaultConfiguratorFunc = f + bcl.dynamicConfiguration.AddListener(key, listener) + if rawConfig, err := bcl.dynamicConfiguration.GetConfig(key, config_center.WithGroup(constant.DUBBO)); err != nil { + return err + } else if len(rawConfig) > 0 { + bcl.genConfiguratorFromRawRule(rawConfig) + } + return nil +} + +func (bcl *BaseConfigurationListener) Process(event *config_center.ConfigChangeEvent) { + logger.Infof("Notification of overriding rule, change type is: %v , raw config content is:%v", event.ConfigType, event.Value) + if event.ConfigType == remoting.EventTypeDel { + bcl.configurators = nil + } else { + if err := bcl.genConfiguratorFromRawRule(event.Value.(string)); err != nil { + logger.Error(perrors.WithStack(err)) + } + } +} + +func (bcl *BaseConfigurationListener) genConfiguratorFromRawRule(rawConfig string) error { + urls, err := bcl.dynamicConfiguration.Parser().ParseToUrls(rawConfig) + if err != nil { + return perrors.WithMessage(err, "Failed to parse raw dynamic config and it will not take effect, the raw config is: "+ + rawConfig) + } + bcl.configurators = ToConfigurators(urls, bcl.defaultConfiguratorFunc) + return nil +} +func (bcl *BaseConfigurationListener) OverrideUrl(url *common.URL) { + for _, v := range bcl.configurators { + v.Configure(url) + } +} + +func ToConfigurators(urls []*common.URL, f func(url *common.URL) config_center.Configurator) []config_center.Configurator { + if len(urls) == 0 { + return nil + } + var configurators []config_center.Configurator + for _, url := range urls { + if url.Protocol == constant.EMPTY_PROTOCOL { + configurators = []config_center.Configurator{} + break + } + //TODO:anyhost_key judage + configurators = append(configurators, f(url)) + } + return configurators +} diff --git a/registry/directory/directory.go b/registry/directory/directory.go index cd479ed7d64572e5823d783e6812757a4d6129d6..43bc65742b9ddfbca16800ce3a833f35630c87b1 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -32,6 +32,7 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/protocolwrapper" "github.com/apache/dubbo-go/registry" @@ -219,6 +220,6 @@ type referenceConfigurationListener struct { url *common.URL } -func (l *referenceConfigurationListener) Process(event *remoting.ConfigChangeEvent) { +func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) { //l.directory.refreshInvokers(event) } diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index f31165d0a2e32c89b3d15df3df4e2048dadcb5e5..5c2113de0435ddb5fd924c9026e2bd1b7cbfc520 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -74,7 +74,7 @@ func TestSubscribe_Group(t *testing.T) { mockRegistry, _ := registry.NewMockRegistry(&common.URL{}) registryDirectory, _ := NewRegistryDirectory(®url, mockRegistry) - go registryDirectory.Subscribe(*common.NewURLWithOptions(common.WithPath("testservice"))) + go registryDirectory.Subscribe(common.NewURLWithOptions(common.WithPath("testservice"))) //for group1 urlmap := url.Values{} @@ -127,7 +127,7 @@ func normalRegistryDir() (*registryDirectory, *registry.MockRegistry) { mockRegistry, _ := registry.NewMockRegistry(&common.URL{}) registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry) - go registryDirectory.Subscribe(*common.NewURLWithOptions(common.WithPath("testservice"))) + go registryDirectory.Subscribe(common.NewURLWithOptions(common.WithPath("testservice"))) for i := 0; i < 3; i++ { mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: remoting.EventTypeAdd, Service: *common.NewURLWithOptions(common.WithPath("TEST"+strconv.FormatInt(int64(i), 10)), common.WithProtocol("dubbo"))}) } diff --git a/registry/mock_registry.go b/registry/mock_registry.go index 1fc700edb7cf37eb2613c90d458c58d278507faf..29922ef5bbfdd9b62b77d904a9e75f07db7b3e5a 100644 --- a/registry/mock_registry.go +++ b/registry/mock_registry.go @@ -18,7 +18,9 @@ package registry import ( + "github.com/apache/dubbo-go/common/logger" "go.uber.org/atomic" + "time" ) import ( @@ -53,9 +55,43 @@ func (r *MockRegistry) GetUrl() common.URL { return common.URL{} } -func (r *MockRegistry) Subscribe(common.URL) (Listener, error) { +func (r *MockRegistry) subscribe(*common.URL) (Listener, error) { return r.listener, nil } +func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) { + go func() { + for { + if !r.IsAvailable() { + logger.Warnf("event listener game over.") + time.Sleep(time.Duration(3) * time.Second) + return + } + + listener, err := r.subscribe(url) + if err != nil { + if !r.IsAvailable() { + logger.Warnf("event listener game over.") + return + } + time.Sleep(time.Duration(3) * time.Second) + continue + } + + for { + if serviceEvent, err := listener.Next(); err != nil { + listener.Close() + time.Sleep(time.Duration(3) * time.Second) + return + } else { + logger.Infof("update begin, service event: %v", serviceEvent.String()) + notifyListener.Notify(serviceEvent) + } + + } + + } + }() +} type listener struct { count int64 diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 58d17940aef125307cd2738e77bdf74739b5c45a..51d4d26a6696f51d892ffc8a1e5fbfd935335baa 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -18,7 +18,8 @@ package protocol import ( - "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/config_center" "sync" ) @@ -43,7 +44,10 @@ type registryProtocol struct { registries sync.Map //To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed. //providerurl <--> exporter - bounds sync.Map + bounds sync.Map + overrideListeners sync.Map + serviceConfigurationListeners sync.Map + providerConfigurationListener *providerConfigurationListener } func init() { @@ -51,9 +55,13 @@ func init() { } func newRegistryProtocol() *registryProtocol { + overrideListeners := sync.Map{} return ®istryProtocol{ - registries: sync.Map{}, - bounds: sync.Map{}, + overrideListeners: overrideListeners, + registries: sync.Map{}, + bounds: sync.Map{}, + serviceConfigurationListeners: sync.Map{}, + providerConfigurationListener: newProviderConfigurationListener(&overrideListeners), } } func getRegistry(regUrl *common.URL) registry.Registry { @@ -107,6 +115,14 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte providerUrl := getProviderUrl(invoker) overriderUrl := getSubscribedOverrideUrl(&providerUrl) + // Deprecated! subscribe to override rules in 2.6.x or before. + overrideSubscribeListener := newOverrideSubscribeListener(overriderUrl, invoker, proto) + proto.overrideListeners.Store(overriderUrl, overrideSubscribeListener) + proto.providerConfigurationListener.OverrideUrl(&providerUrl) + serviceConfigurationListener := newServiceConfigurationListener(overrideSubscribeListener, &providerUrl) + proto.serviceConfigurationListeners.Store(providerUrl.ServiceKey(), serviceConfigurationListener) + serviceConfigurationListener.OverrideUrl(&providerUrl) + var reg registry.Registry if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded { @@ -134,8 +150,6 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte logger.Infof("The exporter has not been cached, and will return a new exporter!") } - // Deprecated! subscribe to override rules in 2.6.x or before. - overrideSubscribeListener := &overrideSubscribeListener{url: overriderUrl, originInvoker: invoker, protocol: proto} reg.Subscribe(overriderUrl, overrideSubscribeListener) return cachedExporter.(protocol.Exporter) @@ -161,9 +175,12 @@ type overrideSubscribeListener struct { url *common.URL originInvoker protocol.Invoker protocol *registryProtocol - configurator cluster.Configurator + configurator config_center.Configurator } +func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Invoker, proto *registryProtocol) *overrideSubscribeListener { + return &overrideSubscribeListener{url: overriderUrl, originInvoker: invoker, protocol: proto} +} func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) { if isMatched(&(event.Service), nl.url) { nl.configurator = extension.GetDefaultConfigurator(&(event.Service)) @@ -175,7 +192,19 @@ func (nl *overrideSubscribeListener) doOverrideIfNecessary() { key := providerUrl.Key() if exporter, ok := nl.protocol.bounds.Load(key); ok { currentUrl := exporter.(protocol.Exporter).GetInvoker().GetUrl() + // Compatible with the 2.6.x nl.configurator.Configure(&providerUrl) + // provider application level management in 2.7.x + for _, v := range nl.protocol.providerConfigurationListener.Configurators() { + v.Configure(&providerUrl) + } + // provider service level management in 2.7.x + if serviceListener, ok := nl.protocol.serviceConfigurationListeners.Load(providerUrl.ServiceKey()); ok { + for _, v := range serviceListener.(*serviceConfigurationListener).Configurators() { + v.Configure(&providerUrl) + } + } + if currentUrl.String() == providerUrl.String() { newRegUrl := nl.originInvoker.GetUrl() setProviderUrl(&newRegUrl, &providerUrl) @@ -271,3 +300,42 @@ func (ivk *wrappedInvoker) GetUrl() common.URL { func (ivk *wrappedInvoker) getInvoker() protocol.Invoker { return ivk.invoker } + +type providerConfigurationListener struct { + registry.BaseConfigurationListener + overrideListeners *sync.Map +} + +func newProviderConfigurationListener(overrideListeners *sync.Map) *providerConfigurationListener { + listener := &providerConfigurationListener{} + listener.overrideListeners = overrideListeners + //TODO:error handler + _ = listener.BaseConfigurationListener.InitWith(config.GetProviderConfig().ApplicationConfig.Name+constant.CONFIGURATORS_SUFFIX, listener, extension.GetDefaultConfiguratorFunc()) + return listener +} + +func (listener *providerConfigurationListener) Process(event *config_center.ConfigChangeEvent) { + listener.BaseConfigurationListener.Process(event) + listener.overrideListeners.Range(func(key, value interface{}) bool { + value.(*overrideSubscribeListener).doOverrideIfNecessary() + return true + }) +} + +type serviceConfigurationListener struct { + registry.BaseConfigurationListener + overrideListener *overrideSubscribeListener + providerUrl *common.URL +} + +func newServiceConfigurationListener(overrideListener *overrideSubscribeListener, providerUrl *common.URL) *serviceConfigurationListener { + listener := &serviceConfigurationListener{overrideListener: overrideListener, providerUrl: providerUrl} + //TODO:error handler + _ = listener.BaseConfigurationListener.InitWith(providerUrl.EncodedServiceKey()+constant.CONFIGURATORS_SUFFIX, listener, extension.GetDefaultConfiguratorFunc()) + return &serviceConfigurationListener{overrideListener: overrideListener, providerUrl: providerUrl} +} + +func (listener *serviceConfigurationListener) Process(event *config_center.ConfigChangeEvent) { + listener.BaseConfigurationListener.Process(event) + listener.overrideListener.doOverrideIfNecessary() +} diff --git a/registry/protocol/protocol_test.go b/registry/protocol/protocol_test.go index 418f1f6779679b6eb93a74ff7689ca8e76f2c0af..72c0300dfcecdbb1bece198662c104f662517867 100644 --- a/registry/protocol/protocol_test.go +++ b/registry/protocol/protocol_test.go @@ -19,6 +19,8 @@ package protocol import ( "context" + "github.com/apache/dubbo-go/config_center" + "testing" ) @@ -29,13 +31,23 @@ import ( import ( cluster "github.com/apache/dubbo-go/cluster/cluster_impl" "github.com/apache/dubbo-go/common" + commonConfig "github.com/apache/dubbo-go/common/config" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/config_center/configurator" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/protocolwrapper" "github.com/apache/dubbo-go/registry" ) +func init() { + extension.SetDefaultConfigurator(configurator.NewMockConfigurator) + config.SetProviderConfig(config.ProviderConfig{ApplicationConfig: &config.ApplicationConfig{Name: "mock"}}) + factory := &config_center.MockDynamicConfigurationFactory{} + dc, _ := factory.GetDynamicConfiguration(&common.URL{}) + commonConfig.GetEnvInstance().SetDynamicConfiguration(dc) +} func referNormal(t *testing.T, regProtocol *registryProtocol) { extension.SetProtocol("registry", GetProtocol) extension.SetRegistry("mock", registry.NewMockRegistry) @@ -108,6 +120,7 @@ func exporterNormal(t *testing.T, regProtocol *registryProtocol) { } func TestExporter(t *testing.T) { + regProtocol := newRegistryProtocol() exporterNormal(t, regProtocol) } diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index 7d58cee1220b9aedba353d929ca1e936cf9366f2..688ede8b65f4622d41d3f58f05b07f793ddb2224 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -19,6 +19,7 @@ package zookeeper import ( "context" + "github.com/apache/dubbo-go/config_center" "strings" ) import ( @@ -34,10 +35,10 @@ import ( type RegistryDataListener struct { interestedURL []*common.URL - listener remoting.ConfigurationListener + listener config_center.ConfigurationListener } -func NewRegistryDataListener(listener remoting.ConfigurationListener) *RegistryDataListener { +func NewRegistryDataListener(listener config_center.ConfigurationListener) *RegistryDataListener { return &RegistryDataListener{listener: listener, interestedURL: []*common.URL{}} } func (l *RegistryDataListener) AddInterestedURL(url *common.URL) { @@ -59,7 +60,7 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { } for _, v := range l.interestedURL { if serviceURL.URLEqual(*v) { - l.listener.Process(&remoting.ConfigChangeEvent{Value: serviceURL, ConfigType: eventType.Action}) + l.listener.Process(&config_center.ConfigChangeEvent{Value: serviceURL, ConfigType: eventType.Action}) return true } } @@ -70,14 +71,14 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { type RegistryConfigurationListener struct { client *zk.ZookeeperClient registry *zkRegistry - events chan *remoting.ConfigChangeEvent + events chan *config_center.ConfigChangeEvent } func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener { reg.wg.Add(1) - return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *remoting.ConfigChangeEvent, 32)} + return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)} } -func (l *RegistryConfigurationListener) Process(configType *remoting.ConfigChangeEvent) { +func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) { l.events <- configType } diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go index 59273af554a63e5fc907ba5a30bb1e18bb22c0f5..f5401917e208f41d3ea84e47c46f535b344e2784 100644 --- a/remoting/etcdv3/listener.go +++ b/remoting/etcdv3/listener.go @@ -100,7 +100,7 @@ func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remotin logger.Infof("etcd get event (key{%s}) = event{EventNodeDataChanged}", event.Kv.Key) listener.DataChange(remoting.Event{ Path: string(event.Kv.Key), - Action: remoting.EvnetTypeUpdate, + Action: remoting.EventTypeUpdate, Content: string(event.Kv.Value), }) } diff --git a/remoting/listener.go b/remoting/listener.go index b94ba26980b50db99e766fcc8febb07d6b554274..8d1e357d37ff92e7bf60121133998dc1745c9af8 100644 --- a/remoting/listener.go +++ b/remoting/listener.go @@ -17,26 +17,14 @@ package remoting -import "fmt" - -type ConfigurationListener interface { - Process(*ConfigChangeEvent) -} +import ( + "fmt" +) type DataListener interface { DataChange(eventType Event) bool //bool is return for interface implement is interesting } -type ConfigChangeEvent struct { - Key string - Value interface{} - ConfigType EventType -} - -func (c ConfigChangeEvent) String() string { - return fmt.Sprintf("ConfigChangeEvent{key = %v , value = %v , changeType = %v}", c.Key, c.Value, c.ConfigType) -} - ////////////////////////////////////////// // event type //////////////////////////////////////////