* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package zookeeper
import (
gxset ""
gxpage ""
var (
// 16 would be enough. We won't use concurrentMap because in most cases, there are not race condition
instanceMap = make(map[string]registry.ServiceDiscovery, 16)
initLock sync.Mutex
// init will put the service discovery into extension
func init() {
extension.SetServiceDiscovery(constant.ZOOKEEPER_KEY, newZookeeperServiceDiscovery)
type zookeeperServiceDiscovery struct {
csd *curator_discovery.ServiceDiscovery
//listener *zookeeper.ZkEventListener
url *common.URL
wg sync.WaitGroup
cltLock sync.Mutex
done chan struct{}
rootPath string
listenNames []string
// newZookeeperServiceDiscovery the constructor of newZookeeperServiceDiscovery
func newZookeeperServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
instance, ok := instanceMap[name]
if ok {
return instance, nil
defer initLock.Unlock()
// double check
instance, ok = instanceMap[name]
if ok {
return instance, nil
sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name)
if !ok || len(sdc.RemoteRef) == 0 {
return nil, perrors.New("could not init the instance because the config is invalid")
remoteConfig, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef)
if !ok {
return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef)
rootPath := remoteConfig.GetParam("rootPath", "/services")
common.WithParamsValue(constant.REGISTRY_TIMEOUT_KEY, remoteConfig.TimeoutStr))
zksd := &zookeeperServiceDiscovery{
url: url,
zksd.WaitGroup().Add(1) //zk client start successful, then wg +1
zksd.csd = curator_discovery.NewServiceDiscovery(zksd.client, rootPath)
func (zksd *zookeeperServiceDiscovery) ZkClient() *gxzookeeper.ZookeeperClient {
func (zksd *zookeeperServiceDiscovery) SetZkClient(client *gxzookeeper.ZookeeperClient) {
func (zksd *zookeeperServiceDiscovery) ZkClientLock() *sync.Mutex {
func (zksd *zookeeperServiceDiscovery) WaitGroup() *sync.WaitGroup {
func (zksd *zookeeperServiceDiscovery) Done() chan struct{} {
// RestartCallBack when zookeeper connection reconnect this function will be invoked.
// try to re-register service, and listen services
func (zksd *zookeeperServiceDiscovery) RestartCallBack() bool {
defer zksd.listenLock.Unlock()
for _, name := range zksd.listenNames {
zksd.csd.ListenServiceEvent(name, zksd)
func (zksd *zookeeperServiceDiscovery) GetURL() *common.URL {
func (zksd *zookeeperServiceDiscovery) String() string {
return fmt.Sprintf("zookeeper-service-discovery[%s]", zksd.url)
// Close client be closed
func (zksd *zookeeperServiceDiscovery) Destroy() error {
// Register will register service in zookeeper, instance convert to curator's service instance
// which define in curator-x-discovery.
func (zksd *zookeeperServiceDiscovery) Register(instance registry.ServiceInstance) error {
cris := zksd.toCuratorInstance(instance)
return zksd.csd.RegisterService(cris)
// Register will update service in zookeeper, instance convert to curator's service instance
// which define in curator-x-discovery, please refer to
func (zksd *zookeeperServiceDiscovery) Update(instance registry.ServiceInstance) error {
cris := zksd.toCuratorInstance(instance)
return zksd.csd.UpdateService(cris)
// Unregister will unregister the instance in zookeeper
func (zksd *zookeeperServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
cris := zksd.toCuratorInstance(instance)
return zksd.csd.UnregisterService(cris)
// GetDefaultPageSize will return the constant registry.DefaultPageSize
func (zksd *zookeeperServiceDiscovery) GetDefaultPageSize() int {
// GetServices will return the all services in zookeeper
func (zksd *zookeeperServiceDiscovery) GetServices() *gxset.HashSet {
services, err := zksd.csd.QueryForNames()
res := gxset.NewSet()
if err != nil {
logger.Errorf("[zkServiceDiscovery] Could not query the services: %v", err)
return res
for _, service := range services {
return res
// GetInstances will return the instances in a service
func (zksd *zookeeperServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
criss, err := zksd.csd.QueryForInstances(serviceName)
if err != nil {
logger.Errorf("[zkServiceDiscovery] Could not query the instances for service{%s}, error = err{%v} ",
serviceName, err)
iss := make([]registry.ServiceInstance, 0, len(criss))
for _, cris := range criss {
iss = append(iss, zksd.toZookeeperInstance(cris))
return iss
// GetInstancesByPage will return the instances
func (zksd *zookeeperServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
all := zksd.GetInstances(serviceName)
res := make([]interface{}, 0, pageSize)
// could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance
for i := offset; i < len(all) && i < offset+pageSize; i++ {
res = append(res, all[i])
return gxpage.NewPage(offset, pageSize, res, len(all))
// GetHealthyInstancesByPage will return the instance
// In zookeeper, all service instance's is healthy.
// However, the healthy parameter in this method maybe false. So we can not use that API.
// Thus, we must query all instances and then do filter
func (zksd *zookeeperServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager {
all := zksd.GetInstances(serviceName)
res := make([]interface{}, 0, pageSize)
// could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance
var (
i = offset
count = 0
for i < len(all) && count < pageSize {
ins := all[i]
if ins.IsHealthy() == healthy {
res = append(res, all[i])
return gxpage.NewPage(offset, pageSize, res, len(all))
// GetRequestInstances will return the instances
func (zksd *zookeeperServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
res := make(map[string]gxpage.Pager, len(serviceNames))
for _, name := range serviceNames {
res[name] = zksd.GetInstancesByPage(name, offset, requestedSize)
return res
// AddListener ListenServiceEvent will add a data listener in service
func (zksd *zookeeperServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error {
defer zksd.listenLock.Unlock()
zksd.listenNames = append(zksd.listenNames, listener.ServiceName)
zksd.csd.ListenServiceEvent(listener.ServiceName, zksd)
func (zksd *zookeeperServiceDiscovery) DispatchEventByServiceName(serviceName string) error {
return zksd.DispatchEventForInstances(serviceName, zksd.GetInstances(serviceName))
// DispatchEventForInstances dispatch ServiceInstancesChangedEvent
func (zksd *zookeeperServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
return zksd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
func (zksd *zookeeperServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
return nil
// DataChange implement DataListener's DataChange function
// to resolve event to do DispatchEventByServiceName
func (zksd *zookeeperServiceDiscovery) DataChange(eventType remoting.Event) bool {
path := strings.TrimPrefix(eventType.Path, zksd.rootPath)
// get service name in zk path
serviceName := strings.Split(path, constant.PATH_SEPARATOR)[0]
err := zksd.DispatchEventByServiceName(serviceName)
logger.Errorf("[zkServiceDiscovery] DispatchEventByServiceName{%s} error = err{%v}", serviceName, err)
// toCuratorInstance convert to curator's service instance
func (zksd *zookeeperServiceDiscovery) toCuratorInstance(instance registry.ServiceInstance) *curator_discovery.ServiceInstance {
id := instance.GetHost() + ":" + strconv.Itoa(instance.GetPort())
pl["id"] = id
pl["name"] = instance.GetServiceName()
pl["metadata"] = instance.GetMetadata()
cuis := &curator_discovery.ServiceInstance{
Name: instance.GetServiceName(),
Id: id,
Address: instance.GetHost(),
Port: instance.GetPort(),
Payload: pl,
RegistrationTimeUTC: 0,
return cuis
// toZookeeperInstance convert to registry's service instance
func (zksd *zookeeperServiceDiscovery) toZookeeperInstance(cris *curator_discovery.ServiceInstance) registry.ServiceInstance {
pl, ok := cris.Payload.(map[string]interface{})
if !ok {
logger.Errorf("[zkServiceDiscovery] toZookeeperInstance{%s} payload is not map[string]interface{}", cris.Id)
return nil
mdi, ok := pl["metadata"].(map[string]interface{})
if !ok {
logger.Errorf("[zkServiceDiscovery] toZookeeperInstance{%s} metadata is not map[string]interface{}", cris.Id)
return nil
md := make(map[string]string, len(mdi))
for k, v := range mdi {
md[k] = fmt.Sprint(v)