From 0d91dfc2529dde6d2af9c4228e963602cfe36bc8 Mon Sep 17 00:00:00 2001 From: "vito.he" <hxmhlt@163.com> Date: Tue, 30 Apr 2019 14:45:13 +0800 Subject: [PATCH] Add:multi registry support && URL modify --- cluster/directory/.gitkeep | 0 cluster/support/failover_cluster.go | 7 +--- cluster/support/failover_cluster_invoker.go | 2 +- cluster/support/registry_aware_cluster.go | 22 ++++++++++++ .../support/registry_aware_cluster_invoker.go | 35 +++++++++++++++++++ common/constant/key.go | 7 ++-- config/registry_url.go | 1 - config/support/reference_config.go | 16 ++++++--- config/support/service_config.go | 2 +- config/{config_url.go => url.go} | 4 +-- registry/directory/directory.go | 10 +++--- registry/protocol/protocol.go | 6 ++-- registry/registry.go | 2 +- registry/zookeeper/consumer.go | 10 +++--- registry/zookeeper/listener.go | 6 ++-- registry/zookeeper/registry.go | 4 +-- 16 files changed, 96 insertions(+), 38 deletions(-) delete mode 100644 cluster/directory/.gitkeep create mode 100644 cluster/support/registry_aware_cluster.go create mode 100644 cluster/support/registry_aware_cluster_invoker.go delete mode 100644 config/registry_url.go rename config/{config_url.go => url.go} (99%) diff --git a/cluster/directory/.gitkeep b/cluster/directory/.gitkeep deleted file mode 100644 index e69de29bb..000000000 diff --git a/cluster/support/failover_cluster.go b/cluster/support/failover_cluster.go index 3961e0551..de2df5ece 100644 --- a/cluster/support/failover_cluster.go +++ b/cluster/support/failover_cluster.go @@ -1,9 +1,5 @@ package cluster -import ( - "context" -) - import ( "github.com/dubbo/dubbo-go/cluster" "github.com/dubbo/dubbo-go/common/extension" @@ -11,7 +7,6 @@ import ( ) type FailoverCluster struct { - context context.Context } const name = "failover" @@ -25,5 +20,5 @@ func NewFailoverCluster() cluster.Cluster { } func (cluster *FailoverCluster) Join(directory cluster.Directory) protocol.Invoker { - return NewFailoverClusterInvoker(directory) + return newFailoverClusterInvoker(directory) } diff --git a/cluster/support/failover_cluster_invoker.go b/cluster/support/failover_cluster_invoker.go index bee3b0817..d7803a029 100644 --- a/cluster/support/failover_cluster_invoker.go +++ b/cluster/support/failover_cluster_invoker.go @@ -16,7 +16,7 @@ type failoverClusterInvoker struct { baseClusterInvoker } -func NewFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker { +func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker { return &failoverClusterInvoker{ baseClusterInvoker: newBaseClusterInvoker(directory), } diff --git a/cluster/support/registry_aware_cluster.go b/cluster/support/registry_aware_cluster.go new file mode 100644 index 000000000..37692f11d --- /dev/null +++ b/cluster/support/registry_aware_cluster.go @@ -0,0 +1,22 @@ +package cluster + +import ( + "github.com/dubbo/dubbo-go/cluster" + "github.com/dubbo/dubbo-go/common/extension" + "github.com/dubbo/dubbo-go/protocol" +) + +type RegistryAwareCluster struct { +} + +func init() { + extension.SetCluster("registryAware", NewRegistryAwareCluster) +} + +func NewRegistryAwareCluster() cluster.Cluster { + return &RegistryAwareCluster{} +} + +func (cluster *RegistryAwareCluster) Join(directory cluster.Directory) protocol.Invoker { + return newFailoverClusterInvoker(directory) +} diff --git a/cluster/support/registry_aware_cluster_invoker.go b/cluster/support/registry_aware_cluster_invoker.go new file mode 100644 index 000000000..4f5aa0523 --- /dev/null +++ b/cluster/support/registry_aware_cluster_invoker.go @@ -0,0 +1,35 @@ +package cluster + +import ( + "github.com/dubbo/dubbo-go/cluster" + "github.com/dubbo/dubbo-go/common/constant" + "github.com/dubbo/dubbo-go/protocol" +) + +type registryAwareClusterInvoker struct { + baseClusterInvoker +} + +func newRegistryAwareClusterInvoker(directory cluster.Directory) protocol.Invoker { + return ®istryAwareClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } +} + +func (invoker *registryAwareClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + invokers := invoker.directory.List(invocation) + //First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'default' key. + for _, invoker := range invokers { + if invoker.IsAvailable() && invoker.GetUrl().GetParam(constant.REGISTRY_DEFAULT_KEY, "false") == "true" { + return invoker.Invoke(invocation) + } + } + + //If none of the invokers has a local signal, pick the first one available. + for _, invoker := range invokers { + if invoker.IsAvailable() { + return invoker.Invoke(invocation) + } + } + return nil +} diff --git a/common/constant/key.go b/common/constant/key.go index 6ecfc1994..889f694e9 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -33,9 +33,10 @@ const ( ) const ( - REGISTRY_KEY = "registry" - REGISTRY_PROTOCOL = "registry" - ROLE_KEY = "registry.role" + REGISTRY_KEY = "registry" + REGISTRY_PROTOCOL = "registry" + ROLE_KEY = "registry.role" + REGISTRY_DEFAULT_KEY = "registry.default" ) const ( diff --git a/config/registry_url.go b/config/registry_url.go deleted file mode 100644 index d912156be..000000000 --- a/config/registry_url.go +++ /dev/null @@ -1 +0,0 @@ -package config diff --git a/config/support/reference_config.go b/config/support/reference_config.go index 9c278af91..4dc97e0a4 100644 --- a/config/support/reference_config.go +++ b/config/support/reference_config.go @@ -2,6 +2,7 @@ package support import ( "context" + "github.com/dubbo/dubbo-go/cluster/directory" "net/url" "strconv" "time" @@ -42,22 +43,27 @@ func NewReferenceConfig(ctx context.Context) *ReferenceConfig { } func (refconfig *ReferenceConfig) Refer() { - //首先是user specified URL, could be peer-to-peer address, or register center's address. + //首先是user specified SubURL, could be peer-to-peer address, or register center's address. - //其次是assemble URL from register center's configuration模式 - regUrls := loadRegistries(refconfig.Registries, consumerConfig.Registries,config.CONSUMER) + //其次是assemble SubURL from register center's configuration模式 + regUrls := loadRegistries(refconfig.Registries, consumerConfig.Registries, config.CONSUMER) url := config.NewURLWithOptions(refconfig.Interface, config.WithParams(refconfig.getUrlMap())) //set url to regUrls for _, regUrl := range regUrls { - regUrl.URL = *url + regUrl.SubURL = url } if len(regUrls) == 1 { refconfig.invoker = extension.GetProtocolExtension("registry").Refer(*regUrls[0]) } else { - //TODO:multi registries ,just wrap multi registry as registry cluster invoker including cluster invoker + invokers := []protocol.Invoker{} + for _, regUrl := range regUrls { + invokers = append(invokers, extension.GetProtocolExtension("registry").Refer(*regUrl)) + } + cluster := extension.GetCluster("registryAware") + refconfig.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) } //create proxy attachments := map[string]string{} diff --git a/config/support/service_config.go b/config/support/service_config.go index be7074d8e..b6c99c92d 100644 --- a/config/support/service_config.go +++ b/config/support/service_config.go @@ -86,7 +86,7 @@ func (srvconfig *ServiceConfig) Export() error { config.WithParams(urlMap)) for _, regUrl := range regUrls { - regUrl.URL = *url + regUrl.SubURL = url invoker := protocol.NewBaseInvoker(regUrl) exporter := extension.GetProtocolExtension("registry").Export(invoker) srvconfig.exporters = append(srvconfig.exporters, exporter) diff --git a/config/config_url.go b/config/url.go similarity index 99% rename from config/config_url.go rename to config/url.go index e122a2e23..bc2b4e08b 100644 --- a/config/config_url.go +++ b/config/url.go @@ -60,7 +60,7 @@ type URL struct { Password string Methods []string //special for registry - URL URL + SubURL *URL } type option func(*URL) @@ -187,7 +187,7 @@ func (c *URL) URLEqual(url URL) bool { return true } -//func (c URL) String() string { +//func (c SubURL) String() string { // return fmt.Sprintf( // "DefaultServiceURL{Protocol:%s, Location:%s, Path:%s, Ip:%s, Port:%s, "+ // "Timeout:%s, Version:%s, Group:%s, Params:%+v}", diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 521c8587d..32587c263 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -56,7 +56,7 @@ func NewRegistryDirectory(url *config.URL, registry registry.Registry, opts ...O BaseDirectory: directory.NewBaseDirectory(url), cacheInvokers: []protocol.Invoker{}, cacheInvokersMap: &sync.Map{}, - serviceType: url.URL.Service, + serviceType: url.SubURL.Service, registry: registry, Options: options, } @@ -157,7 +157,7 @@ func (dir *RegistryDirectory) toGroupInvokers(newInvokersMap *sync.Map) []protoc } else { for _, invokers := range groupInvokersMap { staticDir := directory.NewStaticDirectory(invokers) - cluster := extension.GetCluster(dir.GetUrl().URL.Params.Get(constant.CLUSTER_KEY)) + cluster := extension.GetCluster(dir.GetUrl().SubURL.Params.Get(constant.CLUSTER_KEY)) groupInvokersList = append(groupInvokersList, cluster.Join(staticDir)) } } @@ -174,7 +174,7 @@ func (dir *RegistryDirectory) uncacheInvoker(url config.URL) *sync.Map { func (dir *RegistryDirectory) cacheInvoker(url config.URL) *sync.Map { //check the url's protocol is equal to the protocol which is configured in reference config - referenceUrl := dir.GetUrl().URL + referenceUrl := dir.GetUrl().SubURL newCacheInvokers := dir.cacheInvokersMap if url.Protocol == referenceUrl.Protocol { url = mergeUrl(url, referenceUrl) @@ -206,7 +206,7 @@ func (dir *RegistryDirectory) Destroy() { // configuration > reference config >service config // in this function we should merge the reference local url config into the service url from registry. //TODO configuration merge, in the future , the configuration center's config should merge too. -func mergeUrl(serviceUrl config.URL, referenceUrl config.URL) config.URL { +func mergeUrl(serviceUrl config.URL, referenceUrl *config.URL) config.URL { mergedUrl := serviceUrl var methodConfigMergeFcn = []func(method string){} @@ -243,5 +243,5 @@ func mergeUrl(serviceUrl config.URL, referenceUrl config.URL) config.URL { } } - return serviceUrl + return mergedUrl } diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 6132f08a6..6281156f4 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -46,7 +46,7 @@ func getRegistry(regUrl *config.URL) registry.Registry { } func (protocol *RegistryProtocol) Refer(url config.URL) protocol.Invoker { var regUrl = url - var serviceUrl = regUrl.URL + var serviceUrl = regUrl.SubURL var reg registry.Registry @@ -56,7 +56,7 @@ func (protocol *RegistryProtocol) Refer(url config.URL) protocol.Invoker { //new registry directory for store service url from registry directory := directory2.NewRegistryDirectory(®Url, reg) - go directory.Subscribe(serviceUrl) + go directory.Subscribe(*serviceUrl) //new cluster invoker cluster := extension.GetCluster(serviceUrl.Params.Get(constant.CLUSTER_KEY)) @@ -99,7 +99,7 @@ func (*RegistryProtocol) getRegistryUrl(invoker protocol.Invoker) config.URL { func (*RegistryProtocol) getProviderUrl(invoker protocol.Invoker) config.URL { url := invoker.GetUrl() - return url.URL + return *url.SubURL } func GetProtocol() protocol.Protocol { diff --git a/registry/registry.go b/registry/registry.go index 0bdb4a862..2674287df 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -15,7 +15,7 @@ type Registry interface { Subscribe(config.URL) (Listener, error) //input the serviceConfig , registry should return serviceUrlArray with multi location(provider nodes) available - //GetService(URL) ([]URL, error) + //GetService(SubURL) ([]SubURL, error) //close the registry for Elegant closing Close() //return if the registry is closed for consumer subscribing diff --git a/registry/zookeeper/consumer.go b/registry/zookeeper/consumer.go index 41bcef98f..16eddc627 100644 --- a/registry/zookeeper/consumer.go +++ b/registry/zookeeper/consumer.go @@ -10,14 +10,14 @@ import ( ) // name: service@protocol -//func (r *ZkRegistry) GetService(conf registry.ReferenceConfig) ([]config.URL, error) { +//func (r *ZkRegistry) GetService(conf registry.ReferenceConfig) ([]config.SubURL, error) { // // var ( // err error // dubboPath string // nodes []string // listener *zkEventListener -// serviceURL config.URL +// serviceURL config.SubURL // serviceConf registry.ReferenceConfig // ok bool // ) @@ -52,7 +52,7 @@ import ( // return nil, jerrors.Trace(err) // } // -// var listenerServiceMap = make(map[string]config.URL) +// var listenerServiceMap = make(map[string]config.SubURL) // for _, n := range nodes { // // serviceURL, err = plugins.DefaultServiceURL()(n) @@ -72,7 +72,7 @@ import ( // } // } // -// var services []config.URL +// var services []config.SubURL // for _, service := range listenerServiceMap { // services = append(services, service) // } @@ -114,7 +114,7 @@ func (r *ZkRegistry) getListener(conf config.URL) (*zkEventListener, error) { // listen r.cltLock.Lock() for _, svs := range r.services { - if svs.URLEqual(&conf) { + if svs.URLEqual(conf) { go zkListener.listenServiceEvent(svs) } } diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index b15151200..d8b7d1fae 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -120,7 +120,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co continue } if !conf.URLEqual(serviceURL) { - log.Warn("serviceURL{%s} is not compatible with URL{%#v}", serviceURL, conf) + log.Warn("serviceURL{%s} is not compatible with SubURL{%#v}", serviceURL, conf) continue } log.Info("add serviceURL{%s}", serviceURL) @@ -147,7 +147,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co log.Warn("delete zkPath{%s}", oldNode) &serviceURL, err = config.NewURL(context.TODO(), n) if !conf.URLEqual(serviceURL) { - log.Warn("serviceURL{%s} has been deleted is not compatible with URL{%#v}", serviceURL, conf) + log.Warn("serviceURL{%s} has been deleted is not compatible with SubURL{%#v}", serviceURL, conf) continue } log.Warn("delete serviceURL{%s}", serviceURL) @@ -263,7 +263,7 @@ func (l *zkEventListener) listenServiceEvent(conf config.URL) { continue } if !conf.URLEqual(serviceURL) { - log.Warn("serviceURL{%s} is not compatible with URL{%#v}", serviceURL, conf) + log.Warn("serviceURL{%s} is not compatible with SubURL{%#v}", serviceURL, conf) continue } log.Debug("add serviceUrl{%s}", serviceURL) diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index cf6688cff..99387c051 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -80,8 +80,8 @@ func NewZkRegistry(url *config.URL) (registry.Registry, error) { zkPath: make(map[string]int), } - //if r.URL.Name == "" { - // r.URL.Name = RegistryZkClient + //if r.SubURL.Name == "" { + // r.SubURL.Name = RegistryZkClient //} //if r.Version == "" { // r.Version = version.Version -- GitLab