diff --git a/cluster/directory/.gitkeep b/cluster/directory/.gitkeep deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/cluster/support/failover_cluster.go b/cluster/support/failover_cluster.go index 3961e0551dcd5cb99e48d6e4e614b8ed85cdf3e5..de2df5ecece0a38a2acd659181ef37f96090ff41 100644 --- a/cluster/support/failover_cluster.go +++ b/cluster/support/failover_cluster.go @@ -1,9 +1,5 @@ package cluster -import ( - "context" -) - import ( "github.com/dubbo/dubbo-go/cluster" "github.com/dubbo/dubbo-go/common/extension" @@ -11,7 +7,6 @@ import ( ) type FailoverCluster struct { - context context.Context } const name = "failover" @@ -25,5 +20,5 @@ func NewFailoverCluster() cluster.Cluster { } func (cluster *FailoverCluster) Join(directory cluster.Directory) protocol.Invoker { - return NewFailoverClusterInvoker(directory) + return newFailoverClusterInvoker(directory) } diff --git a/cluster/support/failover_cluster_invoker.go b/cluster/support/failover_cluster_invoker.go index bee3b081751154473303c31ac9827fc6d7243533..d7803a0295eef2d9bf86126a810b0031dde37140 100644 --- a/cluster/support/failover_cluster_invoker.go +++ b/cluster/support/failover_cluster_invoker.go @@ -16,7 +16,7 @@ type failoverClusterInvoker struct { baseClusterInvoker } -func NewFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker { +func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker { return &failoverClusterInvoker{ baseClusterInvoker: newBaseClusterInvoker(directory), } diff --git a/cluster/support/registry_aware_cluster.go b/cluster/support/registry_aware_cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..37692f11dd54e16b5da1c5d3e628b38dc4eebd3d --- /dev/null +++ b/cluster/support/registry_aware_cluster.go @@ -0,0 +1,22 @@ +package cluster + +import ( + "github.com/dubbo/dubbo-go/cluster" + "github.com/dubbo/dubbo-go/common/extension" + "github.com/dubbo/dubbo-go/protocol" +) + +type RegistryAwareCluster struct { +} + +func init() { + extension.SetCluster("registryAware", NewRegistryAwareCluster) +} + +func NewRegistryAwareCluster() cluster.Cluster { + return &RegistryAwareCluster{} +} + +func (cluster *RegistryAwareCluster) Join(directory cluster.Directory) protocol.Invoker { + return newFailoverClusterInvoker(directory) +} diff --git a/cluster/support/registry_aware_cluster_invoker.go b/cluster/support/registry_aware_cluster_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..4f5aa0523d4357b28748841bfc9581b05a972c94 --- /dev/null +++ b/cluster/support/registry_aware_cluster_invoker.go @@ -0,0 +1,35 @@ +package cluster + +import ( + "github.com/dubbo/dubbo-go/cluster" + "github.com/dubbo/dubbo-go/common/constant" + "github.com/dubbo/dubbo-go/protocol" +) + +type registryAwareClusterInvoker struct { + baseClusterInvoker +} + +func newRegistryAwareClusterInvoker(directory cluster.Directory) protocol.Invoker { + return ®istryAwareClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } +} + +func (invoker *registryAwareClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + invokers := invoker.directory.List(invocation) + //First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'default' key. + for _, invoker := range invokers { + if invoker.IsAvailable() && invoker.GetUrl().GetParam(constant.REGISTRY_DEFAULT_KEY, "false") == "true" { + return invoker.Invoke(invocation) + } + } + + //If none of the invokers has a local signal, pick the first one available. + for _, invoker := range invokers { + if invoker.IsAvailable() { + return invoker.Invoke(invocation) + } + } + return nil +} diff --git a/common/constant/key.go b/common/constant/key.go index 6ecfc1994529578bb21427d56782a6919526706e..889f694e958b352af800090066c7b02e80a0cadb 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -33,9 +33,10 @@ const ( ) const ( - REGISTRY_KEY = "registry" - REGISTRY_PROTOCOL = "registry" - ROLE_KEY = "registry.role" + REGISTRY_KEY = "registry" + REGISTRY_PROTOCOL = "registry" + ROLE_KEY = "registry.role" + REGISTRY_DEFAULT_KEY = "registry.default" ) const ( diff --git a/config/registry_url.go b/config/registry_url.go deleted file mode 100644 index d912156bec00a9f00850ab2ec3a3baf1016c2141..0000000000000000000000000000000000000000 --- a/config/registry_url.go +++ /dev/null @@ -1 +0,0 @@ -package config diff --git a/config/support/reference_config.go b/config/support/reference_config.go index 9c278af91875f1e81b522f6ca3db99ad2c1042cf..4dc97e0a475c4c525c470089b39bd8c4da50c953 100644 --- a/config/support/reference_config.go +++ b/config/support/reference_config.go @@ -2,6 +2,7 @@ package support import ( "context" + "github.com/dubbo/dubbo-go/cluster/directory" "net/url" "strconv" "time" @@ -42,22 +43,27 @@ func NewReferenceConfig(ctx context.Context) *ReferenceConfig { } func (refconfig *ReferenceConfig) Refer() { - //首先是user specified URL, could be peer-to-peer address, or register center's address. + //首先是user specified SubURL, could be peer-to-peer address, or register center's address. - //其次是assemble URL from register center's configuration模式 - regUrls := loadRegistries(refconfig.Registries, consumerConfig.Registries,config.CONSUMER) + //其次是assemble SubURL from register center's configuration模式 + regUrls := loadRegistries(refconfig.Registries, consumerConfig.Registries, config.CONSUMER) url := config.NewURLWithOptions(refconfig.Interface, config.WithParams(refconfig.getUrlMap())) //set url to regUrls for _, regUrl := range regUrls { - regUrl.URL = *url + regUrl.SubURL = url } if len(regUrls) == 1 { refconfig.invoker = extension.GetProtocolExtension("registry").Refer(*regUrls[0]) } else { - //TODO:multi registries ,just wrap multi registry as registry cluster invoker including cluster invoker + invokers := []protocol.Invoker{} + for _, regUrl := range regUrls { + invokers = append(invokers, extension.GetProtocolExtension("registry").Refer(*regUrl)) + } + cluster := extension.GetCluster("registryAware") + refconfig.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) } //create proxy attachments := map[string]string{} diff --git a/config/support/service_config.go b/config/support/service_config.go index be7074d8e18281392ce75609a3c0f3f10e14c009..b6c99c92d0dbe3ef563942687c49dc0128b84e0e 100644 --- a/config/support/service_config.go +++ b/config/support/service_config.go @@ -86,7 +86,7 @@ func (srvconfig *ServiceConfig) Export() error { config.WithParams(urlMap)) for _, regUrl := range regUrls { - regUrl.URL = *url + regUrl.SubURL = url invoker := protocol.NewBaseInvoker(regUrl) exporter := extension.GetProtocolExtension("registry").Export(invoker) srvconfig.exporters = append(srvconfig.exporters, exporter) diff --git a/config/config_url.go b/config/url.go similarity index 99% rename from config/config_url.go rename to config/url.go index e122a2e23864d894860cbbc3b335a087c3497e37..bc2b4e08bf5e346b33a7cb859f88081ff750421d 100644 --- a/config/config_url.go +++ b/config/url.go @@ -60,7 +60,7 @@ type URL struct { Password string Methods []string //special for registry - URL URL + SubURL *URL } type option func(*URL) @@ -187,7 +187,7 @@ func (c *URL) URLEqual(url URL) bool { return true } -//func (c URL) String() string { +//func (c SubURL) String() string { // return fmt.Sprintf( // "DefaultServiceURL{Protocol:%s, Location:%s, Path:%s, Ip:%s, Port:%s, "+ // "Timeout:%s, Version:%s, Group:%s, Params:%+v}", diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 521c8587d5e0447975ea5100323a62b7ec364c9f..32587c263672b016663bea51a92a434b2bc885ca 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -56,7 +56,7 @@ func NewRegistryDirectory(url *config.URL, registry registry.Registry, opts ...O BaseDirectory: directory.NewBaseDirectory(url), cacheInvokers: []protocol.Invoker{}, cacheInvokersMap: &sync.Map{}, - serviceType: url.URL.Service, + serviceType: url.SubURL.Service, registry: registry, Options: options, } @@ -157,7 +157,7 @@ func (dir *RegistryDirectory) toGroupInvokers(newInvokersMap *sync.Map) []protoc } else { for _, invokers := range groupInvokersMap { staticDir := directory.NewStaticDirectory(invokers) - cluster := extension.GetCluster(dir.GetUrl().URL.Params.Get(constant.CLUSTER_KEY)) + cluster := extension.GetCluster(dir.GetUrl().SubURL.Params.Get(constant.CLUSTER_KEY)) groupInvokersList = append(groupInvokersList, cluster.Join(staticDir)) } } @@ -174,7 +174,7 @@ func (dir *RegistryDirectory) uncacheInvoker(url config.URL) *sync.Map { func (dir *RegistryDirectory) cacheInvoker(url config.URL) *sync.Map { //check the url's protocol is equal to the protocol which is configured in reference config - referenceUrl := dir.GetUrl().URL + referenceUrl := dir.GetUrl().SubURL newCacheInvokers := dir.cacheInvokersMap if url.Protocol == referenceUrl.Protocol { url = mergeUrl(url, referenceUrl) @@ -206,7 +206,7 @@ func (dir *RegistryDirectory) Destroy() { // configuration > reference config >service config // in this function we should merge the reference local url config into the service url from registry. //TODO configuration merge, in the future , the configuration center's config should merge too. -func mergeUrl(serviceUrl config.URL, referenceUrl config.URL) config.URL { +func mergeUrl(serviceUrl config.URL, referenceUrl *config.URL) config.URL { mergedUrl := serviceUrl var methodConfigMergeFcn = []func(method string){} @@ -243,5 +243,5 @@ func mergeUrl(serviceUrl config.URL, referenceUrl config.URL) config.URL { } } - return serviceUrl + return mergedUrl } diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 6132f08a6a87c717ddf17440a6df75d76940b519..6281156f4c24f526af60eb7ec663ea3e8d88757b 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -46,7 +46,7 @@ func getRegistry(regUrl *config.URL) registry.Registry { } func (protocol *RegistryProtocol) Refer(url config.URL) protocol.Invoker { var regUrl = url - var serviceUrl = regUrl.URL + var serviceUrl = regUrl.SubURL var reg registry.Registry @@ -56,7 +56,7 @@ func (protocol *RegistryProtocol) Refer(url config.URL) protocol.Invoker { //new registry directory for store service url from registry directory := directory2.NewRegistryDirectory(®Url, reg) - go directory.Subscribe(serviceUrl) + go directory.Subscribe(*serviceUrl) //new cluster invoker cluster := extension.GetCluster(serviceUrl.Params.Get(constant.CLUSTER_KEY)) @@ -99,7 +99,7 @@ func (*RegistryProtocol) getRegistryUrl(invoker protocol.Invoker) config.URL { func (*RegistryProtocol) getProviderUrl(invoker protocol.Invoker) config.URL { url := invoker.GetUrl() - return url.URL + return *url.SubURL } func GetProtocol() protocol.Protocol { diff --git a/registry/registry.go b/registry/registry.go index 0bdb4a8625319ab92debadc23fbe702918051986..2674287dfcccf6e33040eca7ca52bb9d721a77b5 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -15,7 +15,7 @@ type Registry interface { Subscribe(config.URL) (Listener, error) //input the serviceConfig , registry should return serviceUrlArray with multi location(provider nodes) available - //GetService(URL) ([]URL, error) + //GetService(SubURL) ([]SubURL, error) //close the registry for Elegant closing Close() //return if the registry is closed for consumer subscribing diff --git a/registry/zookeeper/consumer.go b/registry/zookeeper/consumer.go index 41bcef98fbc622c8eea1893d8956f62dfc79f345..16eddc627c7d2a7cdfcee96270ba493245641868 100644 --- a/registry/zookeeper/consumer.go +++ b/registry/zookeeper/consumer.go @@ -10,14 +10,14 @@ import ( ) // name: service@protocol -//func (r *ZkRegistry) GetService(conf registry.ReferenceConfig) ([]config.URL, error) { +//func (r *ZkRegistry) GetService(conf registry.ReferenceConfig) ([]config.SubURL, error) { // // var ( // err error // dubboPath string // nodes []string // listener *zkEventListener -// serviceURL config.URL +// serviceURL config.SubURL // serviceConf registry.ReferenceConfig // ok bool // ) @@ -52,7 +52,7 @@ import ( // return nil, jerrors.Trace(err) // } // -// var listenerServiceMap = make(map[string]config.URL) +// var listenerServiceMap = make(map[string]config.SubURL) // for _, n := range nodes { // // serviceURL, err = plugins.DefaultServiceURL()(n) @@ -72,7 +72,7 @@ import ( // } // } // -// var services []config.URL +// var services []config.SubURL // for _, service := range listenerServiceMap { // services = append(services, service) // } @@ -114,7 +114,7 @@ func (r *ZkRegistry) getListener(conf config.URL) (*zkEventListener, error) { // listen r.cltLock.Lock() for _, svs := range r.services { - if svs.URLEqual(&conf) { + if svs.URLEqual(conf) { go zkListener.listenServiceEvent(svs) } } diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index b15151200b4d2eae462483483269cab9f5484fe3..d8b7d1fae62ddc7925288b1fe8141d38aa63e00b 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -120,7 +120,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co continue } if !conf.URLEqual(serviceURL) { - log.Warn("serviceURL{%s} is not compatible with URL{%#v}", serviceURL, conf) + log.Warn("serviceURL{%s} is not compatible with SubURL{%#v}", serviceURL, conf) continue } log.Info("add serviceURL{%s}", serviceURL) @@ -147,7 +147,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co log.Warn("delete zkPath{%s}", oldNode) &serviceURL, err = config.NewURL(context.TODO(), n) if !conf.URLEqual(serviceURL) { - log.Warn("serviceURL{%s} has been deleted is not compatible with URL{%#v}", serviceURL, conf) + log.Warn("serviceURL{%s} has been deleted is not compatible with SubURL{%#v}", serviceURL, conf) continue } log.Warn("delete serviceURL{%s}", serviceURL) @@ -263,7 +263,7 @@ func (l *zkEventListener) listenServiceEvent(conf config.URL) { continue } if !conf.URLEqual(serviceURL) { - log.Warn("serviceURL{%s} is not compatible with URL{%#v}", serviceURL, conf) + log.Warn("serviceURL{%s} is not compatible with SubURL{%#v}", serviceURL, conf) continue } log.Debug("add serviceUrl{%s}", serviceURL) diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index cf6688cff56f294bf6572d6cad8e5e9a5722d823..99387c05133eb8893ebc3613894730445ba4292d 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -80,8 +80,8 @@ func NewZkRegistry(url *config.URL) (registry.Registry, error) { zkPath: make(map[string]int), } - //if r.URL.Name == "" { - // r.URL.Name = RegistryZkClient + //if r.SubURL.Name == "" { + // r.SubURL.Name = RegistryZkClient //} //if r.Version == "" { // r.Version = version.Version