diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go index b2011977140f34d7e86d1414b7ced2d7cda23e9b..1d59b51cc36858b80fb43c1d76e368e89e26ae36 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base_directory.go @@ -42,6 +42,9 @@ func NewBaseDirectory(url *common.URL) BaseDirectory { func (dir *BaseDirectory) GetUrl() common.URL { return *dir.url } +func (dir *BaseDirectory) GetDirectoryUrl() *common.URL { + return dir.url +} func (dir *BaseDirectory) Destroy(doDestroy func()) { if dir.destroyed.CAS(false, true) { diff --git a/common/constant/default.go b/common/constant/default.go index b7b8bb023708e69e6d182763449efc7932b739f7..9144249ef6d2b04a1e00d2eac210be814e74025d 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -23,6 +23,7 @@ const ( //compatible with 2.6.x OVERRIDE_PROTOCOL = "override" EMPTY_PROTOCOL = "empty" + ROUTER_PROTOCOL = "router" ) const ( @@ -58,7 +59,9 @@ const ( const ( CONFIGURATORS_CATEGORY = "configurators" - DEFAULT_CATEGORY = "providers" + ROUTER_CATEGORY = "category" + DEFAULT_CATEGORY = PROVIDER_CATEGORY DYNAMIC_CONFIGURATORS_CATEGORY = "dynamicconfigurators" APP_DYNAMIC_CONFIGURATORS_CATEGORY = "appdynamicconfigurators" + PROVIDER_CATEGORY = "providers" ) diff --git a/common/url.go b/common/url.go index aeae47b840eb0c959c1b4ddbdb5f075e4eaa2fd2..43c6431c4b16530c2e29050ee2e78d267c50f359 100644 --- a/common/url.go +++ b/common/url.go @@ -454,8 +454,8 @@ func (c URL) ToMap() map[string]string { // configuration > reference config >service config // in this function we should merge the reference local url config into the service url from registry. //TODO configuration merge, in the future , the configuration center's config should merge too. -func MergeUrl(serviceUrl URL, referenceUrl *URL) URL { - mergedUrl := serviceUrl +func MergeUrl(serviceUrl *URL, referenceUrl *URL) *URL { + mergedUrl := serviceUrl.Clone() var methodConfigMergeFcn = []func(method string){} //iterator the referenceUrl if serviceUrl not have the key ,merge in diff --git a/common/url_test.go b/common/url_test.go index 0f23c5a1d0f6d090368c1abccd9ab4093f1d5d25..2f03f1080c1156e0252dba411477244e8ebe3f37 100644 --- a/common/url_test.go +++ b/common/url_test.go @@ -227,7 +227,7 @@ func TestMergeUrl(t *testing.T) { referenceUrl, _ := NewURL(context.TODO(), "mock1://127.0.0.1:1111", WithParams(referenceUrlParams)) serviceUrl, _ := NewURL(context.TODO(), "mock2://127.0.0.1:20000", WithParams(serviceUrlParams)) - mergedUrl := MergeUrl(serviceUrl, &referenceUrl) + mergedUrl := MergeUrl(&serviceUrl, &referenceUrl) assert.Equal(t, "random", mergedUrl.GetParam(constant.CLUSTER_KEY, "")) assert.Equal(t, "1", mergedUrl.GetParam("test2", "")) assert.Equal(t, "1", mergedUrl.GetParam("test3", "")) diff --git a/config/reference_config.go b/config/reference_config.go index f90e3aabd3a68b8dc7a7509331d301ea3a252f92..b3a3bae447d370be281d1786131521e8ba59a22f 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -97,8 +97,8 @@ func (refconfig *ReferenceConfig) Refer() { serviceUrl.Path = "/" + refconfig.id } // merge url need to do - newUrl := common.MergeUrl(serviceUrl, url) - refconfig.urls = append(refconfig.urls, &newUrl) + newUrl := common.MergeUrl(&serviceUrl, url) + refconfig.urls = append(refconfig.urls, newUrl) } } diff --git a/config/service_config.go b/config/service_config.go index 5430f8a7c3c8702117cceefb10171e681a4f29d9..05cdc84f5b48528788ccc94d443ea182fbd0f263 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -111,7 +111,6 @@ func (srvconfig *ServiceConfig) Export() error { common.WithParams(urlMap), common.WithParamsValue(constant.BEAN_NAME_KEY, srvconfig.id), common.WithMethods(strings.Split(methods, ","))) - if len(regUrls) > 0 { for _, regUrl := range regUrls { regUrl.SubURL = url diff --git a/config_center/mock_dynamic_config.go b/config_center/mock_dynamic_config.go index 7ecffca160a9bab76b1b7462c5b17db6eef82b44..7045c48ec9027b55d5edecd3aa8d3acb1c66da8f 100644 --- a/config_center/mock_dynamic_config.go +++ b/config_center/mock_dynamic_config.go @@ -18,24 +18,27 @@ package config_center import ( - "github.com/apache/dubbo-go/config_center/parser" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/remoting" + "gopkg.in/yaml.v2" "sync" ) import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/config_center/parser" ) type MockDynamicConfigurationFactory struct{} var ( once sync.Once - dynamicConfiguration *mockDynamicConfiguration + dynamicConfiguration *MockDynamicConfiguration ) func (f *MockDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (DynamicConfiguration, error) { var err error once.Do(func() { - dynamicConfiguration = &mockDynamicConfiguration{} + dynamicConfiguration = &MockDynamicConfiguration{listener: map[string]ConfigurationListener{}} dynamicConfiguration.SetParser(&parser.DefaultConfigurationParser{}) dynamicConfiguration.content = ` dubbo.consumer.request_timeout=5s @@ -66,30 +69,74 @@ func (f *MockDynamicConfigurationFactory) GetDynamicConfiguration(url *common.UR } -type mockDynamicConfiguration struct { - parser parser.ConfigurationParser - content string +type MockDynamicConfiguration struct { + parser parser.ConfigurationParser + content string + listener map[string]ConfigurationListener } -func (c *mockDynamicConfiguration) AddListener(key string, listener ConfigurationListener, opions ...Option) { +func (c *MockDynamicConfiguration) AddListener(key string, listener ConfigurationListener, opions ...Option) { + c.listener[key] = listener } -func (c *mockDynamicConfiguration) RemoveListener(key string, listener ConfigurationListener, opions ...Option) { +func (c *MockDynamicConfiguration) RemoveListener(key string, listener ConfigurationListener, opions ...Option) { } -func (c *mockDynamicConfiguration) GetConfig(key string, opts ...Option) (string, error) { +func (c *MockDynamicConfiguration) GetConfig(key string, opts ...Option) (string, error) { return c.content, nil } //For zookeeper, getConfig and getConfigs have the same meaning. -func (c *mockDynamicConfiguration) GetConfigs(key string, opts ...Option) (string, error) { +func (c *MockDynamicConfiguration) GetConfigs(key string, opts ...Option) (string, error) { return c.GetConfig(key, opts...) } -func (c *mockDynamicConfiguration) Parser() parser.ConfigurationParser { +func (c *MockDynamicConfiguration) Parser() parser.ConfigurationParser { return c.parser } -func (c *mockDynamicConfiguration) SetParser(p parser.ConfigurationParser) { +func (c *MockDynamicConfiguration) SetParser(p parser.ConfigurationParser) { c.parser = p } + +func (c *MockDynamicConfiguration) MockServiceConfigEvent() { + config := &parser.ConfiguratorConfig{ + ConfigVersion: "2.7.1", + Scope: parser.GeneralType, + Key: "org.apache.dubbo-go.mockService", + Enabled: true, + Configs: []parser.ConfigItem{ + {Type: parser.GeneralType, + Enabled: true, + Addresses: []string{"0.0.0.0"}, + Services: []string{"org.apache.dubbo-go.mockService"}, + Side: "provider", + Parameters: map[string]string{"cluster": "mock1"}, + }, + }, + } + value, _ := yaml.Marshal(config) + key := "group*org.apache.dubbo-go.mockService:1.0.0" + constant.CONFIGURATORS_SUFFIX + c.listener[key].Process(&ConfigChangeEvent{Key: key, Value: string(value), ConfigType: remoting.EventTypeAdd}) +} + +func (c *MockDynamicConfiguration) MockApplicationConfigEvent() { + config := &parser.ConfiguratorConfig{ + ConfigVersion: "2.7.1", + Scope: parser.ScopeApplication, + Key: "org.apache.dubbo-go.mockService", + Enabled: true, + Configs: []parser.ConfigItem{ + {Type: parser.ScopeApplication, + Enabled: true, + Addresses: []string{"0.0.0.0"}, + Services: []string{"org.apache.dubbo-go.mockService"}, + Side: "provider", + Parameters: map[string]string{"cluster": "mock1"}, + }, + }, + } + value, _ := yaml.Marshal(config) + key := "test-application" + constant.CONFIGURATORS_SUFFIX + c.listener[key].Process(&ConfigChangeEvent{Key: key, Value: string(value), ConfigType: remoting.EventTypeAdd}) +} diff --git a/config_center/parser/configuration_parser.go b/config_center/parser/configuration_parser.go index 34d5f267630817e5e7324cc8d7d0f430174cbef7..bc0eb1182dc2e6d46cdc947138efa526ad80e2d2 100644 --- a/config_center/parser/configuration_parser.go +++ b/config_center/parser/configuration_parser.go @@ -137,7 +137,7 @@ func appItemToUrls(item ConfigItem, config ConfiguratorConfig) ([]*common.URL, e urlStr = urlStr + config.ConfigVersion url, err := common.NewURL(context.Background(), urlStr) if err != nil { - perrors.WithStack(err) + return nil, perrors.WithStack(err) } urls = append(urls, &url) } @@ -213,19 +213,19 @@ const ( ) type ConfiguratorConfig struct { - ConfigVersion string - Scope string - Key string - Enabled bool - Configs []ConfigItem + ConfigVersion string `yaml:"configVersion"` + Scope string `yaml:"scope"` + Key string `yaml:"key"` + Enabled bool `yaml:"enabled"` + Configs []ConfigItem `yaml:"configs"` } type ConfigItem struct { - Type string - Enabled bool - Addresses []string - ProviderAddresses []string - Services []string - Applications []string - Parameters map[string]string - Side string + Type string `yaml:"type"` + Enabled bool `yaml:"enabled"` + Addresses []string `yaml:"addresses"` + ProviderAddresses []string `yaml:"providerAddresses"` + Services []string `yaml:"services"` + Applications []string `yaml:"applications"` + Parameters map[string]string `yaml:"parameters"` + Side string `yaml:"side"` } diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 3ea5b36fcd7c0a0b183d9b129ce227f3886c1ae5..407890678bf06f55c1f19353e2ba79b6fef2dd7c 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -32,11 +32,12 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/config_center" + _ "github.com/apache/dubbo-go/config_center/configurator" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/protocolwrapper" "github.com/apache/dubbo-go/registry" - "github.com/apache/dubbo-go/remoting" ) const ( @@ -51,11 +52,15 @@ type Option func(*Options) type registryDirectory struct { directory.BaseDirectory - cacheInvokers []protocol.Invoker - listenerLock sync.Mutex - serviceType string - registry registry.Registry - cacheInvokersMap *sync.Map //use sync.map + cacheInvokers []protocol.Invoker + listenerLock sync.Mutex + serviceType string + registry registry.Registry + cacheInvokersMap *sync.Map //use sync.map + cacheOriginUrl *common.URL + configurators []config_center.Configurator + consumerConfigurationListener *consumerConfigurationListener + referenceConfigurationListener *referenceConfigurationListener Options } @@ -70,28 +75,27 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...O if url.SubURL == nil { return nil, perrors.Errorf("url is invalid, suburl can not be nil") } - return ®istryDirectory{ + dir := ®istryDirectory{ BaseDirectory: directory.NewBaseDirectory(url), cacheInvokers: []protocol.Invoker{}, cacheInvokersMap: &sync.Map{}, serviceType: url.SubURL.Service(), registry: registry, Options: options, - }, nil + } + dir.consumerConfigurationListener = newConsumerConfigurationListener(dir) + return dir, nil } //subscibe from registry func (dir *registryDirectory) Subscribe(url *common.URL) { - notifyListener := ¬ifyListener{dir} - dir.registry.Subscribe(url, notifyListener) -} - -type notifyListener struct { - dir *registryDirectory + dir.consumerConfigurationListener.addNotifyListener(dir) + dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url) + dir.registry.Subscribe(url, dir) } -func (nl *notifyListener) Notify(event *registry.ServiceEvent) { - go nl.dir.update(event) +func (dir *registryDirectory) Notify(event *registry.ServiceEvent) { + go dir.update(event) } //subscribe service from registry, and update the cacheServices @@ -105,21 +109,33 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) { } func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { - - switch res.Action { - case remoting.EventTypeAdd: - //dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL) - dir.cacheInvoker(res.Service) - case remoting.EventTypeDel: - //dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL) - dir.uncacheInvoker(res.Service) - logger.Infof("selector delete service url{%s}", res.Service) - default: - return + var url *common.URL + //judge is override or others + if res != nil { + url = &res.Service + //1.for override url in 2.6.x + if url.Protocol == constant.OVERRIDE_PROTOCOL || + url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.CONFIGURATORS_CATEGORY { + dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(url)) + } else if url.Protocol == constant.ROUTER_PROTOCOL || //2.for router + url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY { + //TODO: router + } } - + // + //switch res.Action { + //case remoting.EventTypeAdd: + // //dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL) + // dir.cacheInvoker(&res.Service) + //case remoting.EventTypeDel: + // //dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL) + // dir.uncacheInvoker(&res.Service) + // logger.Infof("selector delete service url{%s}", res.Service) + //default: + // return + //} + dir.cacheInvoker(url) newInvokers := dir.toGroupInvokers() - dir.listenerLock.Lock() defer dir.listenerLock.Unlock() dir.cacheInvokers = newInvokers @@ -160,22 +176,33 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker { return groupInvokersList } -func (dir *registryDirectory) uncacheInvoker(url common.URL) { +func (dir *registryDirectory) uncacheInvoker(url *common.URL) { logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key()) dir.cacheInvokersMap.Delete(url.Key()) } -func (dir *registryDirectory) cacheInvoker(url common.URL) { - referenceUrl := dir.GetUrl().SubURL +func (dir *registryDirectory) cacheInvoker(url *common.URL) { + dir.overrideUrl(dir.GetDirectoryUrl()) + referenceUrl := dir.GetDirectoryUrl().SubURL + + if url == nil && dir.cacheOriginUrl != nil { + url = dir.cacheOriginUrl + } else { + dir.cacheOriginUrl = url + } + if url == nil { + logger.Error("URL is nil ,pls check if service url is subscribe successfully!") + return + } //check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" { - url = common.MergeUrl(url, referenceUrl) - - if _, ok := dir.cacheInvokersMap.Load(url.Key()); !ok { + newUrl := common.MergeUrl(url, referenceUrl) + dir.overrideUrl(newUrl) + if _, ok := dir.cacheInvokersMap.Load(newUrl.Key()); !ok { logger.Debugf("service will be added in cache invokers: invokers key is %s!", url.Key()) - newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(url) + newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl) if newInvoker != nil { - dir.cacheInvokersMap.Store(url.Key(), newInvoker) + dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker) } } } @@ -209,12 +236,50 @@ func (dir *registryDirectory) Destroy() { dir.cacheInvokers = []protocol.Invoker{} }) } +func (dir *registryDirectory) overrideUrl(targetUrl *common.URL) { + doOverrideUrl(dir.configurators, targetUrl) + doOverrideUrl(dir.consumerConfigurationListener.Configurators(), targetUrl) + doOverrideUrl(dir.referenceConfigurationListener.Configurators(), targetUrl) +} + +func doOverrideUrl(configurators []config_center.Configurator, targetUrl *common.URL) { + for _, v := range configurators { + v.Configure(targetUrl) + } +} type referenceConfigurationListener struct { + registry.BaseConfigurationListener directory *registryDirectory url *common.URL } +func newReferenceConfigurationListener(dir *registryDirectory, url *common.URL) *referenceConfigurationListener { + listener := &referenceConfigurationListener{directory: dir, url: url} + listener.InitWith(url.EncodedServiceKey()+constant.CONFIGURATORS_SUFFIX, listener, extension.GetDefaultConfiguratorFunc()) + return listener +} + func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) { - //l.directory.refreshInvokers(event) + l.BaseConfigurationListener.Process(event) + l.directory.refreshInvokers(nil) +} + +type consumerConfigurationListener struct { + registry.BaseConfigurationListener + listeners []registry.NotifyListener + directory *registryDirectory +} + +func newConsumerConfigurationListener(dir *registryDirectory) *consumerConfigurationListener { + listener := &consumerConfigurationListener{directory: dir} + listener.InitWith(config.GetConsumerConfig().ApplicationConfig.Name+constant.CONFIGURATORS_SUFFIX, listener, extension.GetDefaultConfiguratorFunc()) + return listener +} +func (l *consumerConfigurationListener) addNotifyListener(listener registry.NotifyListener) { + l.listeners = append(l.listeners, listener) +} +func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) { + l.BaseConfigurationListener.Process(event) + l.directory.refreshInvokers(nil) } diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index 5c2113de0435ddb5fd924c9026e2bd1b7cbfc520..e606720554fdcd52728b690a1d93d7cc8c03cd03 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -34,12 +34,16 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/protocol/invocation" "github.com/apache/dubbo-go/protocol/protocolwrapper" "github.com/apache/dubbo-go/registry" "github.com/apache/dubbo-go/remoting" ) +func init() { + config.SetConsumerConfig(config.ConsumerConfig{ApplicationConfig: &config.ApplicationConfig{Name: "test-application"}}) +} func TestSubscribe(t *testing.T) { registryDirectory, _ := normalRegistryDir() @@ -47,14 +51,15 @@ func TestSubscribe(t *testing.T) { assert.Len(t, registryDirectory.cacheInvokers, 3) } -func TestSubscribe_Delete(t *testing.T) { - registryDirectory, mockRegistry := normalRegistryDir() - time.Sleep(1e9) - assert.Len(t, registryDirectory.cacheInvokers, 3) - mockRegistry.MockEvent(®istry.ServiceEvent{Action: remoting.EventTypeDel, Service: *common.NewURLWithOptions(common.WithPath("TEST0"), common.WithProtocol("dubbo"))}) - time.Sleep(1e9) - assert.Len(t, registryDirectory.cacheInvokers, 2) -} +////Deprecated! not support delete +//func TestSubscribe_Delete(t *testing.T) { +// registryDirectory, mockRegistry := normalRegistryDir() +// time.Sleep(1e9) +// assert.Len(t, registryDirectory.cacheInvokers, 3) +// mockRegistry.MockEvent(®istry.ServiceEvent{Action: remoting.EventTypeDel, Service: *common.NewURLWithOptions(common.WithPath("TEST0"), common.WithProtocol("dubbo"))}) +// time.Sleep(1e9) +// assert.Len(t, registryDirectory.cacheInvokers, 2) +//} func TestSubscribe_InvalidUrl(t *testing.T) { url, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111") @@ -116,20 +121,56 @@ func Test_List(t *testing.T) { assert.Len(t, registryDirectory.List(&invocation.RPCInvocation{}), 3) assert.Equal(t, true, registryDirectory.IsAvailable()) +} +func Test_MergeProviderUrl(t *testing.T) { + registryDirectory, mockRegistry := normalRegistryDir(true) + providerUrl, _ := common.NewURL(context.TODO(), "dubbo://0.0.0.0:20000/org.apache.dubbo-go.mockService", + common.WithParamsValue(constant.CLUSTER_KEY, "mock1"), + common.WithParamsValue(constant.GROUP_KEY, "group"), + common.WithParamsValue(constant.VERSION_KEY, "1.0.0")) + mockRegistry.MockEvent(®istry.ServiceEvent{Action: remoting.EventTypeAdd, Service: providerUrl}) + time.Sleep(1e9) + assert.Len(t, registryDirectory.cacheInvokers, 1) + if len(registryDirectory.cacheInvokers) > 0 { + assert.Equal(t, "mock", registryDirectory.cacheInvokers[0].GetUrl().GetParam(constant.CLUSTER_KEY, "")) + } + } -func normalRegistryDir() (*registryDirectory, *registry.MockRegistry) { +func Test_MergeOverrideUrl(t *testing.T) { + registryDirectory, mockRegistry := normalRegistryDir(true) + providerUrl, _ := common.NewURL(context.TODO(), "dubbo://0.0.0.0:20000/org.apache.dubbo-go.mockService", + common.WithParamsValue(constant.CLUSTER_KEY, "mock"), + common.WithParamsValue(constant.GROUP_KEY, "group"), + common.WithParamsValue(constant.VERSION_KEY, "1.0.0")) + mockRegistry.MockEvent(®istry.ServiceEvent{Action: remoting.EventTypeAdd, Service: providerUrl}) + overrideUrl, _ := common.NewURL(context.TODO(), "override://0.0.0.0:20000/org.apache.dubbo-go.mockService", + common.WithParamsValue(constant.CLUSTER_KEY, "mock1"), + common.WithParamsValue(constant.GROUP_KEY, "group"), + common.WithParamsValue(constant.VERSION_KEY, "1.0.0")) + mockRegistry.MockEvent(®istry.ServiceEvent{Action: remoting.EventTypeAdd, Service: overrideUrl}) + time.Sleep(1e9) + assert.Len(t, registryDirectory.cacheInvokers, 1) + if len(registryDirectory.cacheInvokers) > 0 { + assert.Equal(t, "mock1", registryDirectory.cacheInvokers[0].GetUrl().GetParam(constant.CLUSTER_KEY, "")) + } + +} + +func normalRegistryDir(noMockEvent ...bool) (*registryDirectory, *registry.MockRegistry) { extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter) url, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111") - suburl, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000") + suburl, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/org.apache.dubbo-go.mockService", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithParamsValue(constant.GROUP_KEY, "group"), common.WithParamsValue(constant.VERSION_KEY, "1.0.0")) url.SubURL = &suburl mockRegistry, _ := registry.NewMockRegistry(&common.URL{}) registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry) - go registryDirectory.Subscribe(common.NewURLWithOptions(common.WithPath("testservice"))) - for i := 0; i < 3; i++ { - mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: remoting.EventTypeAdd, Service: *common.NewURLWithOptions(common.WithPath("TEST"+strconv.FormatInt(int64(i), 10)), common.WithProtocol("dubbo"))}) + go registryDirectory.Subscribe(&suburl) + if len(noMockEvent) == 0 { + for i := 0; i < 3; i++ { + mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: remoting.EventTypeAdd, Service: *common.NewURLWithOptions(common.WithPath("TEST"+strconv.FormatInt(int64(i), 10)), common.WithProtocol("dubbo"))}) + } } return registryDirectory, mockRegistry.(*registry.MockRegistry) } diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index 4b140f893f1595a0fcf3a522766fee39e9d26a78..845be96fe8428c6da89f020e7a4e272b68ec032d 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -322,7 +322,9 @@ func (r *etcdV3Registry) subscribe(svc *common.URL) (registry.Listener, error) { //register the svc to dataListener r.dataListener.AddInterestedURL(svc) - go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/providers", svc.Service()), r.dataListener) + for _, v := range strings.Split(svc.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), ",") { + go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+v, svc.Service()), r.dataListener) + } return configListener, nil } diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 0289ddac56f36b3b039232c6d1c1b6090dff9ff1..75563bc2f703a65bab4b256d290f312587766e1c 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -18,9 +18,6 @@ package protocol import ( - "github.com/apache/dubbo-go/config" - "github.com/apache/dubbo-go/config_center" - "github.com/apache/dubbo-go/remoting" "strings" "sync" ) @@ -30,10 +27,14 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/config_center" + _ "github.com/apache/dubbo-go/config_center/configurator" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/protocolwrapper" "github.com/apache/dubbo-go/registry" directory2 "github.com/apache/dubbo-go/registry/directory" + "github.com/apache/dubbo-go/remoting" ) var ( @@ -43,13 +44,14 @@ var ( type registryProtocol struct { invokers []protocol.Invoker // Registry Map<RegistryAddress, Registry> - registries sync.Map + registries *sync.Map //To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed. //providerurl <--> exporter - bounds sync.Map - overrideListeners sync.Map - serviceConfigurationListeners sync.Map + bounds *sync.Map + overrideListeners *sync.Map + serviceConfigurationListeners *sync.Map providerConfigurationListener *providerConfigurationListener + once sync.Once } func init() { @@ -63,13 +65,9 @@ func getCacheKey(url *common.URL) string { return newUrl.String() } func newRegistryProtocol() *registryProtocol { - overrideListeners := sync.Map{} return ®istryProtocol{ - overrideListeners: overrideListeners, - registries: sync.Map{}, - bounds: sync.Map{}, - serviceConfigurationListeners: sync.Map{}, - providerConfigurationListener: newProviderConfigurationListener(&overrideListeners), + registries: &sync.Map{}, + bounds: &sync.Map{}, } } func getRegistry(regUrl *common.URL) registry.Registry { @@ -80,6 +78,11 @@ func getRegistry(regUrl *common.URL) registry.Registry { } return reg } +func (proto *registryProtocol) initConfigurationListeners() { + proto.overrideListeners = &sync.Map{} + proto.serviceConfigurationListeners = &sync.Map{} + proto.providerConfigurationListener = newProviderConfigurationListener(proto.overrideListeners) +} func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { var registryUrl = url @@ -119,6 +122,10 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { } func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporter { + + proto.once.Do(func() { + proto.initConfigurationListeners() + }) registryUrl := getRegistryUrl(invoker) providerUrl := getProviderUrl(invoker) @@ -197,14 +204,17 @@ func (nl *overrideSubscribeListener) doOverrideIfNecessary() { if exporter, ok := nl.protocol.bounds.Load(key); ok { currentUrl := exporter.(protocol.Exporter).GetInvoker().GetUrl() // Compatible with the 2.6.x - nl.configurator.Configure(providerUrl) + if nl.configurator != nil { + nl.configurator.Configure(providerUrl) + } // provider application level management in 2.7.x for _, v := range nl.protocol.providerConfigurationListener.Configurators() { v.Configure(providerUrl) } // provider service level management in 2.7.x if serviceListener, ok := nl.protocol.serviceConfigurationListeners.Load(providerUrl.ServiceKey()); ok { - for _, v := range serviceListener.(*serviceConfigurationListener).Configurators() { + listener := serviceListener.(*serviceConfigurationListener) + for _, v := range listener.Configurators() { v.Configure(providerUrl) } } @@ -247,7 +257,10 @@ func isMatched(providerUrl *common.URL, consumerUrl *common.URL) bool { //todo: public static boolean isContains(String values, String value) { // return isNotEmpty(values) && isContains(COMMA_SPLIT_PATTERN.split(values), value); // } - return (consumerGroup == constant.ANY_VALUE || consumerGroup == providerGroup || strings.Contains(consumerGroup, providerGroup)) && (consumerVersion == constant.ANY_VALUE || consumerVersion == providerVersion) && (consumerClassifier == "" || consumerClassifier == constant.ANY_VALUE || consumerClassifier == providerClassifier) + return (consumerGroup == constant.ANY_VALUE || consumerGroup == providerGroup || + strings.Contains(consumerGroup, providerGroup)) && (consumerVersion == constant.ANY_VALUE || + consumerVersion == providerVersion) && (len(consumerClassifier) == 0 || consumerClassifier == constant.ANY_VALUE || + consumerClassifier == providerClassifier) } func isMatchCategory(category string, categories string) bool { if categories == "" { @@ -346,8 +359,7 @@ type providerConfigurationListener struct { func newProviderConfigurationListener(overrideListeners *sync.Map) *providerConfigurationListener { listener := &providerConfigurationListener{} listener.overrideListeners = overrideListeners - //TODO:error handler - listener.BaseConfigurationListener.InitWith(config.GetProviderConfig().ApplicationConfig.Name+constant.CONFIGURATORS_SUFFIX, listener, extension.GetDefaultConfiguratorFunc()) + listener.InitWith(config.GetProviderConfig().ApplicationConfig.Name+constant.CONFIGURATORS_SUFFIX, listener, extension.GetDefaultConfiguratorFunc()) return listener } @@ -367,9 +379,8 @@ type serviceConfigurationListener struct { func newServiceConfigurationListener(overrideListener *overrideSubscribeListener, providerUrl *common.URL) *serviceConfigurationListener { listener := &serviceConfigurationListener{overrideListener: overrideListener, providerUrl: providerUrl} - //TODO:error handler - listener.BaseConfigurationListener.InitWith(providerUrl.EncodedServiceKey()+constant.CONFIGURATORS_SUFFIX, listener, extension.GetDefaultConfiguratorFunc()) - return &serviceConfigurationListener{overrideListener: overrideListener, providerUrl: providerUrl} + listener.InitWith(providerUrl.EncodedServiceKey()+constant.CONFIGURATORS_SUFFIX, listener, extension.GetDefaultConfiguratorFunc()) + return listener } func (listener *serviceConfigurationListener) Process(event *config_center.ConfigChangeEvent) { diff --git a/registry/protocol/protocol_test.go b/registry/protocol/protocol_test.go index 55b031bfdb1bea7bfb2aeff22ee6137dc6f4845d..45f9069a7327be15c15fe5028e85700645bf4ab8 100644 --- a/registry/protocol/protocol_test.go +++ b/registry/protocol/protocol_test.go @@ -43,8 +43,7 @@ import ( ) func init() { - extension.SetDefaultConfigurator(configurator.NewMockConfigurator) - config.SetProviderConfig(config.ProviderConfig{ApplicationConfig: &config.ApplicationConfig{Name: "mock"}}) + config.SetProviderConfig(config.ProviderConfig{ApplicationConfig: &config.ApplicationConfig{Name: "test-application"}}) } func referNormal(t *testing.T, regProtocol *registryProtocol) { extension.SetProtocol("registry", GetProtocol) @@ -63,6 +62,7 @@ func referNormal(t *testing.T, regProtocol *registryProtocol) { } func TestRefer(t *testing.T) { + config.SetConsumerConfig(config.ConsumerConfig{ApplicationConfig: &config.ApplicationConfig{Name: "test-application"}}) regProtocol := newRegistryProtocol() referNormal(t, regProtocol) } @@ -107,7 +107,7 @@ func exporterNormal(t *testing.T, regProtocol *registryProtocol) *common.URL { extension.SetRegistry("mock", registry.NewMockRegistry) extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter) url, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111") - suburl, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/org.apache.dubbo-go.mockService", common.WithParamsValue(constant.CLUSTER_KEY, "mock")) + suburl, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/org.apache.dubbo-go.mockService", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithParamsValue(constant.GROUP_KEY, "group"), common.WithParamsValue(constant.VERSION_KEY, "1.0.0")) url.SubURL = &suburl invoker := protocol.NewBaseInvoker(url) @@ -155,7 +155,7 @@ func TestOneRegAndProtoExporter(t *testing.T) { exporterNormal(t, regProtocol) url2, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111") - suburl2, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", common.WithParamsValue(constant.CLUSTER_KEY, "mock")) + suburl2, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/org.apache.dubbo-go.mockService", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithParamsValue(constant.GROUP_KEY, "group"), common.WithParamsValue(constant.VERSION_KEY, "1.0.0")) url2.SubURL = &suburl2 invoker2 := protocol.NewBaseInvoker(url2) @@ -201,9 +201,7 @@ func TestDestry(t *testing.T) { func TestExportWithOverrideListener(t *testing.T) { extension.SetDefaultConfigurator(configurator.NewMockConfigurator) - ccUrl, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111") - dc, _ := (&config_center.MockDynamicConfigurationFactory{}).GetDynamicConfiguration(&ccUrl) - common_cfg.GetEnvInstance().SetDynamicConfiguration(dc) + regProtocol := newRegistryProtocol() url := exporterNormal(t, regProtocol) var reg *registry.MockRegistry @@ -213,7 +211,7 @@ func TestExportWithOverrideListener(t *testing.T) { assert.Fail(t, "regProtocol.registries.Load can not be loaded") return } - overrideUrl, _ := common.NewURL(context.Background(), "override://0:0:0:0/org.apache.dubbo-go.mockService?cluster=mock1") + overrideUrl, _ := common.NewURL(context.Background(), "override://0:0:0:0/org.apache.dubbo-go.mockService?cluster=mock1&&group=group&&version=1.0.0") event := ®istry.ServiceEvent{Action: remoting.EventTypeAdd, Service: overrideUrl} reg.MockEvent(event) time.Sleep(1e9) @@ -222,3 +220,41 @@ func TestExportWithOverrideListener(t *testing.T) { v2, _ := regProtocol.bounds.Load(getCacheKey(newUrl)) assert.NotNil(t, v2) } + +func TestExportWithServiceConfig(t *testing.T) { + extension.SetDefaultConfigurator(configurator.NewMockConfigurator) + ccUrl, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111") + dc, _ := (&config_center.MockDynamicConfigurationFactory{}).GetDynamicConfiguration(&ccUrl) + common_cfg.GetEnvInstance().SetDynamicConfiguration(dc) + regProtocol := newRegistryProtocol() + url := exporterNormal(t, regProtocol) + if _, loaded := regProtocol.registries.Load(url.Key()); !loaded { + assert.Fail(t, "regProtocol.registries.Load can not be loaded") + return + } + dc.(*config_center.MockDynamicConfiguration).MockServiceConfigEvent() + + newUrl := url.SubURL.Clone() + newUrl.Params.Set(constant.CLUSTER_KEY, "mock1") + v2, _ := regProtocol.bounds.Load(getCacheKey(newUrl)) + assert.NotNil(t, v2) +} + +func TestExportWithApplicationConfig(t *testing.T) { + extension.SetDefaultConfigurator(configurator.NewMockConfigurator) + ccUrl, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111") + dc, _ := (&config_center.MockDynamicConfigurationFactory{}).GetDynamicConfiguration(&ccUrl) + common_cfg.GetEnvInstance().SetDynamicConfiguration(dc) + regProtocol := newRegistryProtocol() + url := exporterNormal(t, regProtocol) + if _, loaded := regProtocol.registries.Load(url.Key()); !loaded { + assert.Fail(t, "regProtocol.registries.Load can not be loaded") + return + } + dc.(*config_center.MockDynamicConfiguration).MockApplicationConfigEvent() + + newUrl := url.SubURL.Clone() + newUrl.Params.Set(constant.CLUSTER_KEY, "mock1") + v2, _ := regProtocol.bounds.Load(getCacheKey(newUrl)) + assert.NotNil(t, v2) +} diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index e86b7cfccad5fc03dfce56d2501af021a3cfe624..e066729d6a96f7ccd5053d32c6067bafa9c7165b 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -81,7 +81,6 @@ type zkRegistry struct { configListener *RegistryConfigurationListener //for provider zkPath map[string]int // key = protocol://ip:port/interface - } func newZkRegistry(url *common.URL) (registry.Registry, error) { @@ -459,8 +458,9 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen //Interested register to dataconfig. r.dataListener.AddInterestedURL(conf) - - go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+conf.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), conf.Service()), r.dataListener) + for _, v := range strings.Split(conf.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), ",") { + go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+v, conf.Service()), r.dataListener) + } return zkListener, nil }