From ae1df23caaf97a0a70b718ca56bf19da94c2e3b9 Mon Sep 17 00:00:00 2001
From: Patrick <dreamlike.sky@foxmail.com>
Date: Sun, 28 Jun 2020 19:03:52 +0800
Subject: [PATCH] add comment and adjust service_discovery's critical area

---
 registry/zookeeper/service_discovery.go       | 43 +++++++++++++++++--
 .../curator_discovery/service_discovery.go    | 36 +++++++++++-----
 .../curator_discovery/service_instance.go     |  2 +
 3 files changed, 66 insertions(+), 15 deletions(-)

diff --git a/registry/zookeeper/service_discovery.go b/registry/zookeeper/service_discovery.go
index 1e827340d..a126e39fb 100644
--- a/registry/zookeeper/service_discovery.go
+++ b/registry/zookeeper/service_discovery.go
@@ -72,6 +72,7 @@ type zookeeperServiceDiscovery struct {
 	listenNames []string
 }
 
+// newZookeeperServiceDiscovery the constructor of newZookeeperServiceDiscovery
 func newZookeeperServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
 	instance, ok := instanceMap[name]
 	if ok {
@@ -115,66 +116,85 @@ func newZookeeperServiceDiscovery(name string) (registry.ServiceDiscovery, error
 	return zksd, nil
 }
 
+// nolint
 func (zksd *zookeeperServiceDiscovery) ZkClient() *zookeeper.ZookeeperClient {
 	return zksd.client
 }
 
+// nolint
 func (zksd *zookeeperServiceDiscovery) SetZkClient(client *zookeeper.ZookeeperClient) {
 	zksd.client = client
 }
 
+// nolint
 func (zksd *zookeeperServiceDiscovery) ZkClientLock() *sync.Mutex {
 	return &zksd.cltLock
 }
 
+// nolint
 func (zksd *zookeeperServiceDiscovery) WaitGroup() *sync.WaitGroup {
 	return &zksd.wg
 }
 
+// nolint
 func (zksd *zookeeperServiceDiscovery) Done() chan struct{} {
 	return zksd.done
 }
 
+// RestartCallBack when zookeeper connection reconnect this function will be invoked.
+// try to re-register service, and listen services
 func (zksd *zookeeperServiceDiscovery) RestartCallBack() bool {
-	zksd.csd.ReRegisterService()
+	zksd.csd.ReRegisterServices()
+	zksd.listenLock.Lock()
+	defer zksd.listenLock.Unlock()
 	for _, name := range zksd.listenNames {
 		zksd.csd.ListenServiceEvent(name, zksd)
 	}
 	return true
 }
 
+// nolint
 func (zksd *zookeeperServiceDiscovery) GetUrl() common.URL {
 	return *zksd.url
 }
 
+// nolint
 func (zksd *zookeeperServiceDiscovery) String() string {
 	return fmt.Sprintf("zookeeper-service-discovery[%s]", zksd.url)
 }
 
+// Close client be closed
 func (zksd *zookeeperServiceDiscovery) Destroy() error {
 	zksd.client.Close()
 	return nil
 }
 
+// Register will register service in zookeeper, instance convert to curator's service instance
+// which define in curator-x-discovery.
 func (zksd *zookeeperServiceDiscovery) Register(instance registry.ServiceInstance) error {
 	cris := zksd.toCuratorInstance(instance)
 	return zksd.csd.RegisterService(cris)
 }
 
+// Register will update service in zookeeper, instance convert to curator's service instance
+// which define in curator-x-discovery, please refer to https://github.com/apache/curator.
 func (zksd *zookeeperServiceDiscovery) Update(instance registry.ServiceInstance) error {
 	cris := zksd.toCuratorInstance(instance)
 	return zksd.csd.UpdateService(cris)
 }
 
+// Unregister will unregister the instance in zookeeper
 func (zksd *zookeeperServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
 	cris := zksd.toCuratorInstance(instance)
 	return zksd.csd.UnregisterService(cris)
 }
 
+// GetDefaultPageSize will return the constant registry.DefaultPageSize
 func (zksd *zookeeperServiceDiscovery) GetDefaultPageSize() int {
 	return registry.DefaultPageSize
 }
 
+// GetServices will return the all services in zookeeper
 func (zksd *zookeeperServiceDiscovery) GetServices() *gxset.HashSet {
 	services, err := zksd.csd.QueryForNames()
 	res := gxset.NewSet()
@@ -188,6 +208,7 @@ func (zksd *zookeeperServiceDiscovery) GetServices() *gxset.HashSet {
 	return res
 }
 
+// GetInstances will return the instances in a service
 func (zksd *zookeeperServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
 	criss, err := zksd.csd.QueryForInstances(serviceName)
 	if err != nil {
@@ -202,6 +223,7 @@ func (zksd *zookeeperServiceDiscovery) GetInstances(serviceName string) []regist
 	return iss
 }
 
+// GetInstancesByPage will return the instances
 func (zksd *zookeeperServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
 	all := zksd.GetInstances(serviceName)
 	res := make([]interface{}, 0, pageSize)
@@ -212,6 +234,10 @@ func (zksd *zookeeperServiceDiscovery) GetInstancesByPage(serviceName string, of
 	return gxpage.New(offset, pageSize, res, len(all))
 }
 
+// GetHealthyInstancesByPage will return the instance
+// In zookeeper, all service instance's is healthy.
+// However, the healthy parameter in this method maybe false. So we can not use that API.
+// Thus, we must query all instances and then do filter
 func (zksd *zookeeperServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager {
 	all := zksd.GetInstances(serviceName)
 	res := make([]interface{}, 0, pageSize)
@@ -231,6 +257,7 @@ func (zksd *zookeeperServiceDiscovery) GetHealthyInstancesByPage(serviceName str
 	return gxpage.New(offset, pageSize, res, len(all))
 }
 
+// GetRequestInstances will return the instances
 func (zksd *zookeeperServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
 	res := make(map[string]gxpage.Pager, len(serviceNames))
 	for _, name := range serviceNames {
@@ -239,6 +266,7 @@ func (zksd *zookeeperServiceDiscovery) GetRequestInstances(serviceNames []string
 	return res
 }
 
+// AddListener ListenServiceEvent will add a data listener in service
 func (zksd *zookeeperServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error {
 	zksd.listenLock.Lock()
 	defer zksd.listenLock.Unlock()
@@ -251,26 +279,32 @@ func (zksd *zookeeperServiceDiscovery) DispatchEventByServiceName(serviceName st
 	return zksd.DispatchEventForInstances(serviceName, zksd.GetInstances(serviceName))
 }
 
+// DispatchEventForInstances dispatch ServiceInstancesChangedEvent
 func (zksd *zookeeperServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
 	return zksd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
 }
 
+// nolint
 func (zksd *zookeeperServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
 	extension.GetGlobalDispatcher().Dispatch(event)
 	return nil
 }
 
+// DataChange implement DataListener's DataChange function
+// to resolve event to do DispatchEventByServiceName
 func (zksd *zookeeperServiceDiscovery) DataChange(eventType remoting.Event) bool {
 	path := strings.TrimPrefix(eventType.Path, zksd.rootPath)
 	path = strings.TrimPrefix(eventType.Path, constant.PATH_SEPARATOR)
-	name := strings.Split(path, constant.PATH_SEPARATOR)[0]
-	err := zksd.DispatchEventByServiceName(name)
+	// get service name in zk path
+	serviceName := strings.Split(path, constant.PATH_SEPARATOR)[0]
+	err := zksd.DispatchEventByServiceName(serviceName)
 	if err != nil {
-		logger.Errorf("[zkServiceDiscovery] DispatchEventByServiceName{%s} error = err{%v}", name, err)
+		logger.Errorf("[zkServiceDiscovery] DispatchEventByServiceName{%s} error = err{%v}", serviceName, err)
 	}
 	return true
 }
 
+// toCuratorInstance convert to curator's service instance
 func (zksd *zookeeperServiceDiscovery) toCuratorInstance(instance registry.ServiceInstance) *curator_discovery.ServiceInstance {
 	id := instance.GetHost() + ":" + strconv.Itoa(instance.GetPort())
 	pl := make(map[string]interface{})
@@ -288,6 +322,7 @@ func (zksd *zookeeperServiceDiscovery) toCuratorInstance(instance registry.Servi
 	return cuis
 }
 
+// toZookeeperInstance convert to registry's service instance
 func (zksd *zookeeperServiceDiscovery) toZookeeperInstance(cris *curator_discovery.ServiceInstance) registry.ServiceInstance {
 	pl, ok := cris.Payload.(map[string]interface{})
 	if !ok {
diff --git a/remoting/zookeeper/curator_discovery/service_discovery.go b/remoting/zookeeper/curator_discovery/service_discovery.go
index 19df07112..3ece95b91 100644
--- a/remoting/zookeeper/curator_discovery/service_discovery.go
+++ b/remoting/zookeeper/curator_discovery/service_discovery.go
@@ -35,11 +35,14 @@ import (
 	"github.com/apache/dubbo-go/remoting/zookeeper"
 )
 
+// Entry contain a service instance
 type Entry struct {
 	sync.Mutex
 	instance *ServiceInstance
 }
 
+// ServiceInstance which define in curator-x-discovery, please refer to
+// https://github.com/apache/curator/blob/master/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceDiscovery.java
 type ServiceDiscovery struct {
 	client   *zookeeper.ZookeeperClient
 	mutex    *sync.Mutex
@@ -48,6 +51,7 @@ type ServiceDiscovery struct {
 	listener *zookeeper.ZkEventListener
 }
 
+// NewServiceDiscovery the constructor of service discovery
 func NewServiceDiscovery(client *zookeeper.ZookeeperClient, basePath string) *ServiceDiscovery {
 	return &ServiceDiscovery{
 		client:   client,
@@ -58,6 +62,7 @@ func NewServiceDiscovery(client *zookeeper.ZookeeperClient, basePath string) *Se
 	}
 }
 
+// registerService register service to zookeeper
 func (sd *ServiceDiscovery) registerService(instance *ServiceInstance) error {
 	path := sd.pathForInstance(instance.Name, instance.Id)
 	data, err := json.Marshal(instance)
@@ -71,6 +76,7 @@ func (sd *ServiceDiscovery) registerService(instance *ServiceInstance) error {
 	return nil
 }
 
+// RegisterService register service to zookeeper, and ensure cache is consistent with zookeeper
 func (sd *ServiceDiscovery) RegisterService(instance *ServiceInstance) error {
 	value, loaded := sd.services.LoadOrStore(instance.Id, &Entry{})
 	entry, ok := value.(*Entry)
@@ -90,6 +96,7 @@ func (sd *ServiceDiscovery) RegisterService(instance *ServiceInstance) error {
 	return nil
 }
 
+// UpdateService update service in zookeeper, and ensure cache is consistent with zookeeper
 func (sd *ServiceDiscovery) UpdateService(instance *ServiceInstance) error {
 	value, ok := sd.services.Load(instance.Id)
 	if !ok {
@@ -114,6 +121,7 @@ func (sd *ServiceDiscovery) UpdateService(instance *ServiceInstance) error {
 	return nil
 }
 
+// updateInternalService update service in cache
 func (sd *ServiceDiscovery) updateInternalService(name, id string) {
 	value, ok := sd.services.Load(id)
 	if !ok {
@@ -123,46 +131,43 @@ func (sd *ServiceDiscovery) updateInternalService(name, id string) {
 	if !ok {
 		return
 	}
+	entry.Lock()
+	defer entry.Unlock()
 	instance, err := sd.QueryForInstance(name, id)
 	if err != nil {
 		logger.Infof("[zkServiceDiscovery] UpdateInternalService{%s} error = err{%v}", id, err)
 		return
 	}
-	entry.Lock()
 	entry.instance = instance
-	entry.Unlock()
 	return
 }
 
+// UnregisterService un-register service in zookeeper and delete service in cache
 func (sd *ServiceDiscovery) UnregisterService(instance *ServiceInstance) error {
-	value, ok := sd.services.Load(instance.Id)
+	_, ok := sd.services.Load(instance.Id)
 	if !ok {
 		return nil
 	}
-	entry, ok := value.(*Entry)
-	if !ok {
-		return perrors.New("[ServiceDiscovery] services value not entry")
-	}
-	entry.Lock()
-	entry.Unlock()
 	sd.services.Delete(instance.Id)
 	return sd.unregisterService(instance)
 }
 
+// unregisterService un-register service in zookeeper
 func (sd *ServiceDiscovery) unregisterService(instance *ServiceInstance) error {
 	path := sd.pathForInstance(instance.Name, instance.Id)
 	return sd.client.Delete(path)
 }
 
-func (sd *ServiceDiscovery) ReRegisterService() {
+// ReRegisterServices re-register all cache services to zookeeper
+func (sd *ServiceDiscovery) ReRegisterServices() {
 	sd.services.Range(func(key, value interface{}) bool {
 		entry, ok := value.(*Entry)
 		if !ok {
 			return true
 		}
 		entry.Lock()
+		defer entry.Unlock()
 		instance := entry.instance
-		entry.Unlock()
 		err := sd.registerService(instance)
 		if err != nil {
 			logger.Errorf("[zkServiceDiscovery] registerService{%s} error = err{%v}", instance.Id, perrors.WithStack(err))
@@ -173,6 +178,7 @@ func (sd *ServiceDiscovery) ReRegisterService() {
 	})
 }
 
+// QueryForInstances query instances in zookeeper by name
 func (sd *ServiceDiscovery) QueryForInstances(name string) ([]*ServiceInstance, error) {
 	ids, err := sd.client.GetChildren(sd.pathForName(name))
 	if err != nil {
@@ -192,6 +198,7 @@ func (sd *ServiceDiscovery) QueryForInstances(name string) ([]*ServiceInstance,
 	return instances, nil
 }
 
+// QueryForInstance query instances in zookeeper by name and id
 func (sd *ServiceDiscovery) QueryForInstance(name string, id string) (*ServiceInstance, error) {
 	path := sd.pathForInstance(name, id)
 	data, _, err := sd.client.GetContent(path)
@@ -206,18 +213,22 @@ func (sd *ServiceDiscovery) QueryForInstance(name string, id string) (*ServiceIn
 	return instance, nil
 }
 
+// QueryForInstance query all service name in zookeeper
 func (sd *ServiceDiscovery) QueryForNames() ([]string, error) {
 	return sd.client.GetChildren(sd.basePath)
 }
 
+// ListenServiceEvent add a listener in a service
 func (sd *ServiceDiscovery) ListenServiceEvent(name string, listener remoting.DataListener) {
 	sd.listener.ListenServiceEvent(nil, sd.pathForName(name), listener)
 }
 
+// ListenServiceEvent add a listener in a instance
 func (sd *ServiceDiscovery) ListenServiceInstanceEvent(name, id string, listener remoting.DataListener) {
 	sd.listener.ListenServiceNodeEvent(sd.pathForInstance(name, id), listener)
 }
 
+// DataChange implement DataListener's DataChange function
 func (sd *ServiceDiscovery) DataChange(eventType remoting.Event) bool {
 	path := eventType.Path
 	name, id, err := sd.getNameAndId(path)
@@ -229,6 +240,7 @@ func (sd *ServiceDiscovery) DataChange(eventType remoting.Event) bool {
 	return true
 }
 
+// getNameAndId get service name and instance id by path
 func (sd *ServiceDiscovery) getNameAndId(path string) (string, string, error) {
 	path = strings.TrimPrefix(path, sd.basePath)
 	path = strings.TrimPrefix(path, constant.PATH_SEPARATOR)
@@ -241,10 +253,12 @@ func (sd *ServiceDiscovery) getNameAndId(path string) (string, string, error) {
 	return name, id, nil
 }
 
+// nolint
 func (sd *ServiceDiscovery) pathForInstance(name, id string) string {
 	return path.Join(sd.basePath, name, id)
 }
 
+// nolint
 func (sd *ServiceDiscovery) pathForName(name string) string {
 	return path.Join(sd.basePath, name)
 }
diff --git a/remoting/zookeeper/curator_discovery/service_instance.go b/remoting/zookeeper/curator_discovery/service_instance.go
index 1ba7a16f9..f8d2bc723 100644
--- a/remoting/zookeeper/curator_discovery/service_instance.go
+++ b/remoting/zookeeper/curator_discovery/service_instance.go
@@ -17,6 +17,8 @@
 
 package curator_discovery
 
+// ServiceInstance which define in curator-x-discovery, please refer to
+// https://github.com/apache/curator/blob/master/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceInstance.java
 type ServiceInstance struct {
 	Name                string
 	Id                  string
-- 
GitLab