diff --git a/registry/zookeeper/service_discovery.go b/registry/zookeeper/service_discovery.go index 1e827340dd9a31d0abae688c1761ec088f7011fb..a126e39fb3be914b0840918594ab1cda65ba3bdf 100644 --- a/registry/zookeeper/service_discovery.go +++ b/registry/zookeeper/service_discovery.go @@ -72,6 +72,7 @@ type zookeeperServiceDiscovery struct { listenNames []string } +// newZookeeperServiceDiscovery the constructor of newZookeeperServiceDiscovery func newZookeeperServiceDiscovery(name string) (registry.ServiceDiscovery, error) { instance, ok := instanceMap[name] if ok { @@ -115,66 +116,85 @@ func newZookeeperServiceDiscovery(name string) (registry.ServiceDiscovery, error return zksd, nil } +// nolint func (zksd *zookeeperServiceDiscovery) ZkClient() *zookeeper.ZookeeperClient { return zksd.client } +// nolint func (zksd *zookeeperServiceDiscovery) SetZkClient(client *zookeeper.ZookeeperClient) { zksd.client = client } +// nolint func (zksd *zookeeperServiceDiscovery) ZkClientLock() *sync.Mutex { return &zksd.cltLock } +// nolint func (zksd *zookeeperServiceDiscovery) WaitGroup() *sync.WaitGroup { return &zksd.wg } +// nolint func (zksd *zookeeperServiceDiscovery) Done() chan struct{} { return zksd.done } +// RestartCallBack when zookeeper connection reconnect this function will be invoked. +// try to re-register service, and listen services func (zksd *zookeeperServiceDiscovery) RestartCallBack() bool { - zksd.csd.ReRegisterService() + zksd.csd.ReRegisterServices() + zksd.listenLock.Lock() + defer zksd.listenLock.Unlock() for _, name := range zksd.listenNames { zksd.csd.ListenServiceEvent(name, zksd) } return true } +// nolint func (zksd *zookeeperServiceDiscovery) GetUrl() common.URL { return *zksd.url } +// nolint func (zksd *zookeeperServiceDiscovery) String() string { return fmt.Sprintf("zookeeper-service-discovery[%s]", zksd.url) } +// Close client be closed func (zksd *zookeeperServiceDiscovery) Destroy() error { zksd.client.Close() return nil } +// Register will register service in zookeeper, instance convert to curator's service instance +// which define in curator-x-discovery. func (zksd *zookeeperServiceDiscovery) Register(instance registry.ServiceInstance) error { cris := zksd.toCuratorInstance(instance) return zksd.csd.RegisterService(cris) } +// Register will update service in zookeeper, instance convert to curator's service instance +// which define in curator-x-discovery, please refer to https://github.com/apache/curator. func (zksd *zookeeperServiceDiscovery) Update(instance registry.ServiceInstance) error { cris := zksd.toCuratorInstance(instance) return zksd.csd.UpdateService(cris) } +// Unregister will unregister the instance in zookeeper func (zksd *zookeeperServiceDiscovery) Unregister(instance registry.ServiceInstance) error { cris := zksd.toCuratorInstance(instance) return zksd.csd.UnregisterService(cris) } +// GetDefaultPageSize will return the constant registry.DefaultPageSize func (zksd *zookeeperServiceDiscovery) GetDefaultPageSize() int { return registry.DefaultPageSize } +// GetServices will return the all services in zookeeper func (zksd *zookeeperServiceDiscovery) GetServices() *gxset.HashSet { services, err := zksd.csd.QueryForNames() res := gxset.NewSet() @@ -188,6 +208,7 @@ func (zksd *zookeeperServiceDiscovery) GetServices() *gxset.HashSet { return res } +// GetInstances will return the instances in a service func (zksd *zookeeperServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { criss, err := zksd.csd.QueryForInstances(serviceName) if err != nil { @@ -202,6 +223,7 @@ func (zksd *zookeeperServiceDiscovery) GetInstances(serviceName string) []regist return iss } +// GetInstancesByPage will return the instances func (zksd *zookeeperServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { all := zksd.GetInstances(serviceName) res := make([]interface{}, 0, pageSize) @@ -212,6 +234,10 @@ func (zksd *zookeeperServiceDiscovery) GetInstancesByPage(serviceName string, of return gxpage.New(offset, pageSize, res, len(all)) } +// GetHealthyInstancesByPage will return the instance +// In zookeeper, all service instance's is healthy. +// However, the healthy parameter in this method maybe false. So we can not use that API. +// Thus, we must query all instances and then do filter func (zksd *zookeeperServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { all := zksd.GetInstances(serviceName) res := make([]interface{}, 0, pageSize) @@ -231,6 +257,7 @@ func (zksd *zookeeperServiceDiscovery) GetHealthyInstancesByPage(serviceName str return gxpage.New(offset, pageSize, res, len(all)) } +// GetRequestInstances will return the instances func (zksd *zookeeperServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { res := make(map[string]gxpage.Pager, len(serviceNames)) for _, name := range serviceNames { @@ -239,6 +266,7 @@ func (zksd *zookeeperServiceDiscovery) GetRequestInstances(serviceNames []string return res } +// AddListener ListenServiceEvent will add a data listener in service func (zksd *zookeeperServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { zksd.listenLock.Lock() defer zksd.listenLock.Unlock() @@ -251,26 +279,32 @@ func (zksd *zookeeperServiceDiscovery) DispatchEventByServiceName(serviceName st return zksd.DispatchEventForInstances(serviceName, zksd.GetInstances(serviceName)) } +// DispatchEventForInstances dispatch ServiceInstancesChangedEvent func (zksd *zookeeperServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { return zksd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances)) } +// nolint func (zksd *zookeeperServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { extension.GetGlobalDispatcher().Dispatch(event) return nil } +// DataChange implement DataListener's DataChange function +// to resolve event to do DispatchEventByServiceName func (zksd *zookeeperServiceDiscovery) DataChange(eventType remoting.Event) bool { path := strings.TrimPrefix(eventType.Path, zksd.rootPath) path = strings.TrimPrefix(eventType.Path, constant.PATH_SEPARATOR) - name := strings.Split(path, constant.PATH_SEPARATOR)[0] - err := zksd.DispatchEventByServiceName(name) + // get service name in zk path + serviceName := strings.Split(path, constant.PATH_SEPARATOR)[0] + err := zksd.DispatchEventByServiceName(serviceName) if err != nil { - logger.Errorf("[zkServiceDiscovery] DispatchEventByServiceName{%s} error = err{%v}", name, err) + logger.Errorf("[zkServiceDiscovery] DispatchEventByServiceName{%s} error = err{%v}", serviceName, err) } return true } +// toCuratorInstance convert to curator's service instance func (zksd *zookeeperServiceDiscovery) toCuratorInstance(instance registry.ServiceInstance) *curator_discovery.ServiceInstance { id := instance.GetHost() + ":" + strconv.Itoa(instance.GetPort()) pl := make(map[string]interface{}) @@ -288,6 +322,7 @@ func (zksd *zookeeperServiceDiscovery) toCuratorInstance(instance registry.Servi return cuis } +// toZookeeperInstance convert to registry's service instance func (zksd *zookeeperServiceDiscovery) toZookeeperInstance(cris *curator_discovery.ServiceInstance) registry.ServiceInstance { pl, ok := cris.Payload.(map[string]interface{}) if !ok { diff --git a/remoting/zookeeper/curator_discovery/service_discovery.go b/remoting/zookeeper/curator_discovery/service_discovery.go index 19df0711254a5fef30de329cc057913b5956729f..3ece95b91729d4843a47e113f6e40fd5571fd729 100644 --- a/remoting/zookeeper/curator_discovery/service_discovery.go +++ b/remoting/zookeeper/curator_discovery/service_discovery.go @@ -35,11 +35,14 @@ import ( "github.com/apache/dubbo-go/remoting/zookeeper" ) +// Entry contain a service instance type Entry struct { sync.Mutex instance *ServiceInstance } +// ServiceInstance which define in curator-x-discovery, please refer to +// https://github.com/apache/curator/blob/master/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceDiscovery.java type ServiceDiscovery struct { client *zookeeper.ZookeeperClient mutex *sync.Mutex @@ -48,6 +51,7 @@ type ServiceDiscovery struct { listener *zookeeper.ZkEventListener } +// NewServiceDiscovery the constructor of service discovery func NewServiceDiscovery(client *zookeeper.ZookeeperClient, basePath string) *ServiceDiscovery { return &ServiceDiscovery{ client: client, @@ -58,6 +62,7 @@ func NewServiceDiscovery(client *zookeeper.ZookeeperClient, basePath string) *Se } } +// registerService register service to zookeeper func (sd *ServiceDiscovery) registerService(instance *ServiceInstance) error { path := sd.pathForInstance(instance.Name, instance.Id) data, err := json.Marshal(instance) @@ -71,6 +76,7 @@ func (sd *ServiceDiscovery) registerService(instance *ServiceInstance) error { return nil } +// RegisterService register service to zookeeper, and ensure cache is consistent with zookeeper func (sd *ServiceDiscovery) RegisterService(instance *ServiceInstance) error { value, loaded := sd.services.LoadOrStore(instance.Id, &Entry{}) entry, ok := value.(*Entry) @@ -90,6 +96,7 @@ func (sd *ServiceDiscovery) RegisterService(instance *ServiceInstance) error { return nil } +// UpdateService update service in zookeeper, and ensure cache is consistent with zookeeper func (sd *ServiceDiscovery) UpdateService(instance *ServiceInstance) error { value, ok := sd.services.Load(instance.Id) if !ok { @@ -114,6 +121,7 @@ func (sd *ServiceDiscovery) UpdateService(instance *ServiceInstance) error { return nil } +// updateInternalService update service in cache func (sd *ServiceDiscovery) updateInternalService(name, id string) { value, ok := sd.services.Load(id) if !ok { @@ -123,46 +131,43 @@ func (sd *ServiceDiscovery) updateInternalService(name, id string) { if !ok { return } + entry.Lock() + defer entry.Unlock() instance, err := sd.QueryForInstance(name, id) if err != nil { logger.Infof("[zkServiceDiscovery] UpdateInternalService{%s} error = err{%v}", id, err) return } - entry.Lock() entry.instance = instance - entry.Unlock() return } +// UnregisterService un-register service in zookeeper and delete service in cache func (sd *ServiceDiscovery) UnregisterService(instance *ServiceInstance) error { - value, ok := sd.services.Load(instance.Id) + _, ok := sd.services.Load(instance.Id) if !ok { return nil } - entry, ok := value.(*Entry) - if !ok { - return perrors.New("[ServiceDiscovery] services value not entry") - } - entry.Lock() - entry.Unlock() sd.services.Delete(instance.Id) return sd.unregisterService(instance) } +// unregisterService un-register service in zookeeper func (sd *ServiceDiscovery) unregisterService(instance *ServiceInstance) error { path := sd.pathForInstance(instance.Name, instance.Id) return sd.client.Delete(path) } -func (sd *ServiceDiscovery) ReRegisterService() { +// ReRegisterServices re-register all cache services to zookeeper +func (sd *ServiceDiscovery) ReRegisterServices() { sd.services.Range(func(key, value interface{}) bool { entry, ok := value.(*Entry) if !ok { return true } entry.Lock() + defer entry.Unlock() instance := entry.instance - entry.Unlock() err := sd.registerService(instance) if err != nil { logger.Errorf("[zkServiceDiscovery] registerService{%s} error = err{%v}", instance.Id, perrors.WithStack(err)) @@ -173,6 +178,7 @@ func (sd *ServiceDiscovery) ReRegisterService() { }) } +// QueryForInstances query instances in zookeeper by name func (sd *ServiceDiscovery) QueryForInstances(name string) ([]*ServiceInstance, error) { ids, err := sd.client.GetChildren(sd.pathForName(name)) if err != nil { @@ -192,6 +198,7 @@ func (sd *ServiceDiscovery) QueryForInstances(name string) ([]*ServiceInstance, return instances, nil } +// QueryForInstance query instances in zookeeper by name and id func (sd *ServiceDiscovery) QueryForInstance(name string, id string) (*ServiceInstance, error) { path := sd.pathForInstance(name, id) data, _, err := sd.client.GetContent(path) @@ -206,18 +213,22 @@ func (sd *ServiceDiscovery) QueryForInstance(name string, id string) (*ServiceIn return instance, nil } +// QueryForInstance query all service name in zookeeper func (sd *ServiceDiscovery) QueryForNames() ([]string, error) { return sd.client.GetChildren(sd.basePath) } +// ListenServiceEvent add a listener in a service func (sd *ServiceDiscovery) ListenServiceEvent(name string, listener remoting.DataListener) { sd.listener.ListenServiceEvent(nil, sd.pathForName(name), listener) } +// ListenServiceEvent add a listener in a instance func (sd *ServiceDiscovery) ListenServiceInstanceEvent(name, id string, listener remoting.DataListener) { sd.listener.ListenServiceNodeEvent(sd.pathForInstance(name, id), listener) } +// DataChange implement DataListener's DataChange function func (sd *ServiceDiscovery) DataChange(eventType remoting.Event) bool { path := eventType.Path name, id, err := sd.getNameAndId(path) @@ -229,6 +240,7 @@ func (sd *ServiceDiscovery) DataChange(eventType remoting.Event) bool { return true } +// getNameAndId get service name and instance id by path func (sd *ServiceDiscovery) getNameAndId(path string) (string, string, error) { path = strings.TrimPrefix(path, sd.basePath) path = strings.TrimPrefix(path, constant.PATH_SEPARATOR) @@ -241,10 +253,12 @@ func (sd *ServiceDiscovery) getNameAndId(path string) (string, string, error) { return name, id, nil } +// nolint func (sd *ServiceDiscovery) pathForInstance(name, id string) string { return path.Join(sd.basePath, name, id) } +// nolint func (sd *ServiceDiscovery) pathForName(name string) string { return path.Join(sd.basePath, name) } diff --git a/remoting/zookeeper/curator_discovery/service_instance.go b/remoting/zookeeper/curator_discovery/service_instance.go index 1ba7a16f9a805c243be94462adfb33169dd01e1b..f8d2bc723e0e0dd90ffdaa6ccd7c9908d65ac9a0 100644 --- a/remoting/zookeeper/curator_discovery/service_instance.go +++ b/remoting/zookeeper/curator_discovery/service_instance.go @@ -17,6 +17,8 @@ package curator_discovery +// ServiceInstance which define in curator-x-discovery, please refer to +// https://github.com/apache/curator/blob/master/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceInstance.java type ServiceInstance struct { Name string Id string