Skip to content
Snippets Groups Projects
Unverified Commit dcbd9679 authored by Xin.Zh's avatar Xin.Zh Committed by GitHub
Browse files

Merge pull request #210 from dubbo-x/url

Imp: skip subscribe on provider side when using consul
parents 41405b24 71d53f12
No related branches found
No related tags found
No related merge requests found
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
package constant package constant
const ( const (
CONF_CONSUMER_FILE_PATH = "CONF_CONSUMER_FILE_PATH" CONF_CONSUMER_FILE_PATH = "CONF_CONSUMER_FILE_PATH"
CONF_PROVIDER_FILE_PATH = "CONF_PROVIDER_FILE_PATH" CONF_PROVIDER_FILE_PATH = "CONF_PROVIDER_FILE_PATH"
APP_LOG_CONF_FILE string = "APP_LOG_CONF_FILE" APP_LOG_CONF_FILE = "APP_LOG_CONF_FILE"
) )
...@@ -54,6 +54,7 @@ func init() { ...@@ -54,6 +54,7 @@ func init() {
providerConfig = nil providerConfig = nil
} }
} }
func checkRegistries(registries map[string]*RegistryConfig, singleRegistry *RegistryConfig) { func checkRegistries(registries map[string]*RegistryConfig, singleRegistry *RegistryConfig) {
if len(registries) == 0 && singleRegistry != nil { if len(registries) == 0 && singleRegistry != nil {
registries[constant.DEFAULT_KEY] = singleRegistry registries[constant.DEFAULT_KEY] = singleRegistry
......
...@@ -111,31 +111,21 @@ func (r *consulRegistry) unregister(url common.URL) error { ...@@ -111,31 +111,21 @@ func (r *consulRegistry) unregister(url common.URL) error {
return r.client.Agent().ServiceDeregister(buildId(url)) return r.client.Agent().ServiceDeregister(buildId(url))
} }
func (r *consulRegistry) subscribe(url *common.URL) (registry.Listener, error) { func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
var (
listener registry.Listener
err error
)
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
if role == common.CONSUMER { if role == common.CONSUMER {
listener, err = r.getListener(*url) r.subscribe(url, notifyListener)
if err != nil {
return nil, err
}
} }
return listener, nil
} }
//subscibe from registry func (r *consulRegistry) subscribe(url *common.URL, notifyListener registry.NotifyListener) {
func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
for { for {
if !r.IsAvailable() { if !r.IsAvailable() {
logger.Warnf("event listener game over.") logger.Warnf("event listener game over.")
return return
} }
listener, err := r.subscribe(url) listener, err := r.getListener(*url)
if err != nil { if err != nil {
if !r.IsAvailable() { if !r.IsAvailable() {
logger.Warnf("event listener game over.") logger.Warnf("event listener game over.")
...@@ -155,9 +145,7 @@ func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.Noti ...@@ -155,9 +145,7 @@ func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.Noti
logger.Infof("update begin, service event: %v", serviceEvent.String()) logger.Infof("update begin, service event: %v", serviceEvent.String())
notifyListener.Notify(serviceEvent) notifyListener.Notify(serviceEvent)
} }
} }
} }
} }
......
...@@ -51,7 +51,7 @@ func (suite *consulRegistryTestSuite) testUnregister() { ...@@ -51,7 +51,7 @@ func (suite *consulRegistryTestSuite) testUnregister() {
func (suite *consulRegistryTestSuite) testSubscribe() { func (suite *consulRegistryTestSuite) testSubscribe() {
consumerUrl := newConsumerUrl(consumerHost, consumerPort, service, protocol) consumerUrl := newConsumerUrl(consumerHost, consumerPort, service, protocol)
suite.consumerUrl = consumerUrl suite.consumerUrl = consumerUrl
listener, err := suite.consumerRegistry.subscribe(&consumerUrl) listener, err := suite.consumerRegistry.getListener(consumerUrl)
assert.NoError(suite.t, err) assert.NoError(suite.t, err)
suite.listener = listener suite.listener = listener
} }
...@@ -40,10 +40,6 @@ import ( ...@@ -40,10 +40,6 @@ import (
"github.com/apache/dubbo-go/registry" "github.com/apache/dubbo-go/registry"
) )
const (
RegistryConnDelay = 3
)
type Options struct { type Options struct {
serviceTTL time.Duration serviceTTL time.Duration
} }
...@@ -87,7 +83,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...O ...@@ -87,7 +83,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...O
return dir, nil return dir, nil
} }
//subscibe from registry //subscribe from registry
func (dir *registryDirectory) Subscribe(url *common.URL) { func (dir *registryDirectory) Subscribe(url *common.URL) {
dir.consumerConfigurationListener.addNotifyListener(dir) dir.consumerConfigurationListener.addNotifyListener(dir)
dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url) dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
...@@ -245,6 +241,7 @@ func (dir *registryDirectory) Destroy() { ...@@ -245,6 +241,7 @@ func (dir *registryDirectory) Destroy() {
dir.cacheInvokers = []protocol.Invoker{} dir.cacheInvokers = []protocol.Invoker{}
}) })
} }
func (dir *registryDirectory) overrideUrl(targetUrl *common.URL) { func (dir *registryDirectory) overrideUrl(targetUrl *common.URL) {
doOverrideUrl(dir.configurators, targetUrl) doOverrideUrl(dir.configurators, targetUrl)
doOverrideUrl(dir.consumerConfigurationListener.Configurators(), targetUrl) doOverrideUrl(dir.consumerConfigurationListener.Configurators(), targetUrl)
...@@ -293,9 +290,11 @@ func newConsumerConfigurationListener(dir *registryDirectory) *consumerConfigura ...@@ -293,9 +290,11 @@ func newConsumerConfigurationListener(dir *registryDirectory) *consumerConfigura
) )
return listener return listener
} }
func (l *consumerConfigurationListener) addNotifyListener(listener registry.NotifyListener) { func (l *consumerConfigurationListener) addNotifyListener(listener registry.NotifyListener) {
l.listeners = append(l.listeners, listener) l.listeners = append(l.listeners, listener)
} }
func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) { func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event) l.BaseConfigurationListener.Process(event)
l.directory.refreshInvokers(nil) l.directory.refreshInvokers(nil)
......
...@@ -330,7 +330,7 @@ func (r *etcdV3Registry) subscribe(svc *common.URL) (registry.Listener, error) { ...@@ -330,7 +330,7 @@ func (r *etcdV3Registry) subscribe(svc *common.URL) (registry.Listener, error) {
return configListener, nil return configListener, nil
} }
//subscibe from registry //subscribe from registry
func (r *etcdV3Registry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { func (r *etcdV3Registry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
for { for {
if !r.IsAvailable() { if !r.IsAvailable() {
......
...@@ -172,17 +172,17 @@ func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error) ...@@ -172,17 +172,17 @@ func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error)
return NewNacosListener(*conf, nr.namingClient) return NewNacosListener(*conf, nr.namingClient)
} }
//subscibe from registry //subscribe from registry
func (r *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
for { for {
if !r.IsAvailable() { if !nr.IsAvailable() {
logger.Warnf("event listener game over.") logger.Warnf("event listener game over.")
return return
} }
listener, err := r.subscribe(url) listener, err := nr.subscribe(url)
if err != nil { if err != nil {
if !r.IsAvailable() { if !nr.IsAvailable() {
logger.Warnf("event listener game over.") logger.Warnf("event listener game over.")
return return
} }
...@@ -205,6 +205,7 @@ func (r *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Notif ...@@ -205,6 +205,7 @@ func (r *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Notif
} }
} }
func (nr *nacosRegistry) GetUrl() common.URL { func (nr *nacosRegistry) GetUrl() common.URL {
return *nr.URL return *nr.URL
} }
......
...@@ -147,6 +147,7 @@ func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestClust ...@@ -147,6 +147,7 @@ func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestClust
return c, r, nil return c, r, nil
} }
func (r *zkRegistry) ZkClient() *zookeeper.ZookeeperClient { func (r *zkRegistry) ZkClient() *zookeeper.ZookeeperClient {
return r.client return r.client
} }
...@@ -399,7 +400,7 @@ func (r *zkRegistry) subscribe(conf *common.URL) (registry.Listener, error) { ...@@ -399,7 +400,7 @@ func (r *zkRegistry) subscribe(conf *common.URL) (registry.Listener, error) {
return r.getListener(conf) return r.getListener(conf)
} }
//subscibe from registry //subscribe from registry
func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
for { for {
if !r.IsAvailable() { if !r.IsAvailable() {
...@@ -432,6 +433,7 @@ func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyLi ...@@ -432,6 +433,7 @@ func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyLi
} }
} }
func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListener, error) { func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListener, error) {
var ( var (
zkListener *RegistryConfigurationListener zkListener *RegistryConfigurationListener
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment