From c2f293eb6e6df800c4d0f09fa5948d1a9a918d89 Mon Sep 17 00:00:00 2001 From: pantianying <601666418@qq.com> Date: Mon, 25 Nov 2019 11:24:42 +0800 Subject: [PATCH] update --- registry/zookeeper/listener.go | 1 - registry/zookeeper/registry.go | 26 ++++++++++++-------------- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index c25028d58..777c460f5 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -77,7 +77,6 @@ type RegistryConfigurationListener struct { } func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener { - reg.wg.Add(1) return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)} } func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) { diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 31f1dc3a8..c0e3ca6b6 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -46,6 +46,8 @@ import ( const ( RegistryZkClient = "zk registry" RegistryConnDelay = 3 + + MaxWaitInterval = time.Duration(3e9) ) var ( @@ -403,6 +405,13 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error { func (r *zkRegistry) subscribe(conf *common.URL) (registry.Listener, error) { return r.getListener(conf) } +func sleepWait(n int) { + wait := time.Duration((n + 1) * 2e8) + if wait > MaxWaitInterval { + wait = MaxWaitInterval + } + time.Sleep(wait) +} //subscribe from registry func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { @@ -424,13 +433,10 @@ func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyLi time.Sleep(time.Duration(RegistryConnDelay) * time.Second) continue } - for i := 0; ; i++ { + r.wg.Add(1) + for { if serviceEvent, err := listener.Next(); err != nil { - logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) - if i == 0 { - listener.Close() - break - } + listener.Close() break } else { logger.Infof("update begin, service event: %v", serviceEvent.String()) @@ -438,7 +444,6 @@ func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyLi } } - logger.Infof("wait for get subscribe listener, key{%v}", url.Key()) sleepWait(n) } } @@ -494,10 +499,3 @@ func (r *zkRegistry) IsAvailable() bool { return true } } -func sleepWait(n int) { - wait := time.Duration(200*n) * time.Millisecond - if wait > 3*time.Second { - wait = 3 * time.Second - } - time.Sleep(wait) -} -- GitLab