Skip to content
Snippets Groups Projects
Unverified Commit 21d82629 authored by fangyincheng's avatar fangyincheng Committed by GitHub
Browse files

Merge pull request #258 from pantianying/develop_fix_zkbug

Fix: After disconnection with ZK registry, cosumer can't listen to provider changes
parents 4014c59d ec0209da
No related branches found
No related tags found
No related merge requests found
......@@ -74,11 +74,12 @@ type RegistryConfigurationListener struct {
client *zk.ZookeeperClient
registry *zkRegistry
events chan *config_center.ConfigChangeEvent
isClosed bool
}
func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener {
reg.wg.Add(1)
return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false}
}
func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
......@@ -114,6 +115,7 @@ func (l *RegistryConfigurationListener) Close() {
* if the registry is not available, it means that the registry has been destroy
* so we don't need to call Done(), or it will cause the negative count panic for registry.wg
*/
l.isClosed = true
l.registry.wg.Done()
}
}
......
......@@ -46,6 +46,7 @@ import (
const (
RegistryZkClient = "zk registry"
RegistryConnDelay = 3
MaxWaitInterval = time.Duration(3e9)
)
var (
......@@ -200,6 +201,10 @@ func (r *zkRegistry) RestartCallBack() bool {
}
logger.Infof("success to re-register service :%v", confIf.Key())
}
r.listener = zookeeper.NewZkEventListener(r.client)
r.configListener = NewRegistryConfigurationListener(r.client, r)
r.dataListener = NewRegistryDataListener(r.configListener)
return flag
}
......@@ -399,10 +404,19 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
func (r *zkRegistry) subscribe(conf *common.URL) (registry.Listener, error) {
return r.getListener(conf)
}
func sleepWait(n int) {
wait := time.Duration((n + 1) * 2e8)
if wait > MaxWaitInterval {
wait = MaxWaitInterval
}
time.Sleep(wait)
}
//subscribe from registry
func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
n := 0
for {
n++
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
return
......@@ -423,14 +437,14 @@ func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyLi
if serviceEvent, err := listener.Next(); err != nil {
logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
listener.Close()
return
break
} else {
logger.Infof("update begin, service event: %v", serviceEvent.String())
notifyListener.Notify(serviceEvent)
}
}
sleepWait(n)
}
}
......@@ -440,6 +454,10 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen
)
r.listenerLock.Lock()
if r.configListener.isClosed {
r.listenerLock.Unlock()
return nil, perrors.New("configListener already been closed")
}
zkListener = r.configListener
r.listenerLock.Unlock()
if r.listener == nil {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment