diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index c8b019bd4cb8d88cb3247660b2cb38c587f09083..b47b9f372da8948d1add97c7b354503e18e23511 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -347,7 +347,7 @@ func setProviderUrl(regURL *common.URL, providerURL *common.URL) { regURL.SubURL = providerURL } -// GetProtocol ... +// GetProtocol return the singleton RegistryProtocol func GetProtocol() protocol.Protocol { once.Do(func() { regProtocol = newRegistryProtocol() diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index 45ffa31722fa5232b6b6e3c00250942e277f9abc..355e7c18f571d5a61c5233f10c2f9f07c8dc6d53 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -35,14 +35,14 @@ import ( zk "github.com/apache/dubbo-go/remoting/zookeeper" ) -// RegistryDataListener ... +// RegistryDataListener contains all URL information subscribed by zookeeper registry type RegistryDataListener struct { subscribed map[*common.URL]config_center.ConfigurationListener mutex sync.Mutex closed bool } -// NewRegistryDataListener ... +// NewRegistryDataListener constructs a new RegistryDataListener func NewRegistryDataListener() *RegistryDataListener { return &RegistryDataListener{ subscribed: make(map[*common.URL]config_center.ConfigurationListener)} @@ -58,7 +58,7 @@ func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_cen l.subscribed[url] = listener } -// DataChange ... +// DataChange accepts all events sent from the zookeeper server and trigger the corresponding listener for processing func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { // Intercept the last bit index := strings.Index(eventType.Path, "/providers/") @@ -92,6 +92,7 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { return false } +// Close all RegistryConfigurationListener in subscribed func (l *RegistryDataListener) Close() { l.mutex.Lock() defer l.mutex.Unlock() @@ -100,7 +101,7 @@ func (l *RegistryDataListener) Close() { } } -// RegistryConfigurationListener ... +// RegistryConfigurationListener represent the processor of zookeeper watcher type RegistryConfigurationListener struct { client *zk.ZookeeperClient registry *zkRegistry @@ -115,12 +116,12 @@ func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistr return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false} } -// Process ... +// Process submit the ConfigChangeEvent to the event chan to notify all observer func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) { l.events <- configType } -// Next ... +// Next will observe the registry state and events chan func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) { for { select { @@ -146,7 +147,7 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) { } } -// Close ... +// Close RegistryConfigurationListener only once func (l *RegistryConfigurationListener) Close() { // ensure that the listener will be closed at most once. l.closeOnce.Do(func() { @@ -154,7 +155,7 @@ func (l *RegistryConfigurationListener) Close() { l.registry.WaitGroup().Done() }) } - +// valid return the true if the client conn isn't nil func (l *RegistryConfigurationListener) valid() bool { return l.client.ZkConnValid() } diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 37cf20149dd9f5b8c18ffd855d045ae9e24a9770..9d74aa9d6ec477a379eeb64131c53215bbb93214 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -120,10 +120,16 @@ func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestClust func (r *zkRegistry) InitListeners() { r.listener = zookeeper.NewZkEventListener(r.client) - recoverd := r.dataListener.subscribed newDataListener := NewRegistryDataListener() - for url, _ := range recoverd { - newDataListener.SubscribeURL(url, NewRegistryConfigurationListener(r.client, r)) + // should recover if dataListener isn't nil before + if r.dataListener != nil { + recoverd := r.dataListener.subscribed + if recoverd != nil && len(recoverd) > 0 { + // recover all subscribed url + for url, _ := range recoverd { + newDataListener.SubscribeURL(url, NewRegistryConfigurationListener(r.client, r)) + } + } } r.dataListener = newDataListener }