From b08b8fc1ed12e1371bdfff7f73d8c536ef3a3c57 Mon Sep 17 00:00:00 2001
From: Patrick <dreamlike.sky@foxmail.com>
Date: Fri, 26 Jun 2020 15:19:54 +0800
Subject: [PATCH] zookeeper's service_discovery

---
 common/constant/key.go                        |   4 +
 registry/zookeeper/service_discovery.go       | 284 ++++++++++++++++++
 remoting/zookeeper/client.go                  |   7 +-
 .../curator_discovery/service_discovery.go    | 149 +++++++++
 .../curator_discovery/service_instance.go     |  27 ++
 remoting/zookeeper/facade.go                  |   8 +-
 remoting/zookeeper/facade_test.go             |   2 +-
 7 files changed, 475 insertions(+), 6 deletions(-)
 create mode 100644 registry/zookeeper/service_discovery.go
 create mode 100644 remoting/zookeeper/curator_discovery/service_discovery.go
 create mode 100644 remoting/zookeeper/curator_discovery/service_instance.go

diff --git a/common/constant/key.go b/common/constant/key.go
index 414d24e6e..18d7f3de3 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -159,6 +159,10 @@ const (
 	NACOS_PATH_KEY               = "path"
 )
 
+const (
+	ZOOKEEPER_KEY = "zookeeper"
+)
+
 const (
 	TRACING_REMOTE_SPAN_CTX = "tracing.remote.span.ctx"
 )
diff --git a/registry/zookeeper/service_discovery.go b/registry/zookeeper/service_discovery.go
new file mode 100644
index 000000000..5daf3e1a2
--- /dev/null
+++ b/registry/zookeeper/service_discovery.go
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zookeeper
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+	"sync"
+)
+
+import (
+	"github.com/dubbogo/gost/container/set"
+	"github.com/dubbogo/gost/page"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/config"
+	"github.com/apache/dubbo-go/registry"
+	"github.com/apache/dubbo-go/remoting"
+	"github.com/apache/dubbo-go/remoting/zookeeper"
+	"github.com/apache/dubbo-go/remoting/zookeeper/curator_discovery"
+)
+
+const (
+	// RegistryZkClient zk client name
+	ServiceDiscoveryZkClient = "zk service discovery"
+)
+
+var (
+	// 16 would be enough. We won't use concurrentMap because in most cases, there are not race condition
+	instanceMap = make(map[string]registry.ServiceDiscovery, 16)
+	initLock    sync.Mutex
+)
+
+// init will put the service discovery into extension
+func init() {
+	extension.SetServiceDiscovery(constant.ZOOKEEPER_KEY, newZookeeperServiceDiscovery)
+}
+
+type zookeeperServiceDiscovery struct {
+	client   *zookeeper.ZookeeperClient
+	csd      *curator_discovery.ServiceDiscovery
+	listener *zookeeper.ZkEventListener
+	url      *common.URL
+	wg       sync.WaitGroup
+	cltLock  sync.Mutex
+	done     chan struct{}
+	rootPath string
+}
+
+func newZookeeperServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
+	instance, ok := instanceMap[name]
+	if ok {
+		return instance, nil
+	}
+
+	initLock.Lock()
+	defer initLock.Unlock()
+
+	// double check
+	instance, ok = instanceMap[name]
+	if ok {
+		return instance, nil
+	}
+
+	sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name)
+	if !ok || len(sdc.RemoteRef) == 0 {
+		return nil, perrors.New("could not init the instance because the config is invalid")
+	}
+	remoteConfig, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef)
+	if !ok {
+		return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef)
+	}
+	basePath := remoteConfig.GetParam("rootPath", "/services")
+	url := common.NewURLWithOptions(
+		common.WithLocation(remoteConfig.Address),
+		common.WithPassword(remoteConfig.Password),
+		common.WithUsername(remoteConfig.Username),
+		common.WithParamsValue(constant.REGISTRY_TIMEOUT_KEY, remoteConfig.TimeoutStr))
+	zksd := &zookeeperServiceDiscovery{
+		url:      url,
+		rootPath: basePath,
+	}
+	err := zookeeper.ValidateZookeeperClient(zksd, zookeeper.WithZkName(ServiceDiscoveryZkClient))
+	if err != nil {
+		return nil, err
+	}
+	go zookeeper.HandleClientRestart(zksd)
+	zksd.listener = zookeeper.NewZkEventListener(zksd.client)
+	zksd.listener.ListenServiceEvent(nil, basePath, zksd)
+	zksd.csd = curator_discovery.NewServiceDiscovery(zksd.client, basePath)
+	return zksd, nil
+}
+
+func (zksd zookeeperServiceDiscovery) ZkClient() *zookeeper.ZookeeperClient {
+	return zksd.client
+}
+
+func (zksd zookeeperServiceDiscovery) SetZkClient(client *zookeeper.ZookeeperClient) {
+	zksd.client = client
+}
+
+func (zksd zookeeperServiceDiscovery) ZkClientLock() *sync.Mutex {
+	return &zksd.cltLock
+}
+
+func (zksd zookeeperServiceDiscovery) WaitGroup() *sync.WaitGroup {
+	return &zksd.wg
+}
+
+func (zksd zookeeperServiceDiscovery) Done() chan struct{} {
+	return zksd.done
+}
+
+func (zksd zookeeperServiceDiscovery) RestartCallBack() bool {
+	zksd.csd.ReRegisterService()
+	zksd.listener.ListenServiceEvent(nil, zksd.rootPath, zksd)
+	return true
+}
+
+func (zksd zookeeperServiceDiscovery) GetUrl() common.URL {
+	return *zksd.url
+}
+
+func (zksd zookeeperServiceDiscovery) String() string {
+	return fmt.Sprintf("zookeeper-service-discovery[%s]", zksd.url)
+}
+
+func (zksd zookeeperServiceDiscovery) Destroy() error {
+	zksd.client.Close()
+	return nil
+}
+
+func (zksd zookeeperServiceDiscovery) Register(instance registry.ServiceInstance) error {
+	cris := zksd.toCuratorInstance(instance)
+	return zksd.csd.RegisterService(cris)
+}
+
+func (zksd zookeeperServiceDiscovery) Update(instance registry.ServiceInstance) error {
+	cris := zksd.toCuratorInstance(instance)
+	return zksd.csd.UpdateService(cris)
+}
+
+func (zksd zookeeperServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
+	cris := zksd.toCuratorInstance(instance)
+	return zksd.csd.UnregisterService(cris)
+}
+
+func (zksd zookeeperServiceDiscovery) GetDefaultPageSize() int {
+	return registry.DefaultPageSize
+}
+
+func (zksd zookeeperServiceDiscovery) GetServices() *gxset.HashSet {
+	services, err := zksd.csd.QueryForNames()
+	res := gxset.NewSet()
+	if err != nil {
+		logger.Errorf("[zkServiceDiscovery] Could not query the services: %v", err)
+		return res
+	}
+	for _, service := range services {
+		res.Add(service)
+	}
+	return res
+}
+
+func (zksd zookeeperServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
+	criss, err := zksd.csd.QueryForInstances(serviceName)
+	if err != nil {
+		logger.Errorf("[zkServiceDiscovery] Could not query the instances for service{%s}, error = err{%v} ",
+			serviceName, err)
+		return make([]registry.ServiceInstance, 0, 0)
+	}
+	iss := make([]registry.ServiceInstance, 0, len(criss))
+	for _, cris := range criss {
+		iss = append(iss, zksd.toZookeeperInstance(cris))
+	}
+	return iss
+}
+
+func (zksd zookeeperServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
+	all := zksd.GetInstances(serviceName)
+	res := make([]interface{}, 0, pageSize)
+	// could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance
+	for i := offset; i < len(all) && i < offset+pageSize; i++ {
+		res = append(res, all[i])
+	}
+	return gxpage.New(offset, pageSize, res, len(all))
+}
+
+func (zksd zookeeperServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, _ bool) gxpage.Pager {
+	return zksd.GetInstancesByPage(serviceName, offset, pageSize)
+}
+
+func (zksd zookeeperServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
+	res := make(map[string]gxpage.Pager, len(serviceNames))
+	for _, name := range serviceNames {
+		res[name] = zksd.GetInstancesByPage(name, offset, requestedSize)
+	}
+	return res
+}
+
+func (zksd zookeeperServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error {
+	return nil
+}
+
+func (zksd zookeeperServiceDiscovery) DispatchEventByServiceName(serviceName string) error {
+	return zksd.DispatchEventForInstances(serviceName, zksd.GetInstances(serviceName))
+}
+
+func (zksd zookeeperServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
+	return zksd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
+}
+
+func (zksd zookeeperServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
+	extension.GetGlobalDispatcher().Dispatch(event)
+	return nil
+}
+
+func (zksd zookeeperServiceDiscovery) DataChange(eventType remoting.Event) bool {
+	path := eventType.Path
+	name := strings.Split(path, "/")[1]
+	id := strings.Split(path, "/")[2]
+	zksd.csd.UpdateInternalService(name, id)
+	err := zksd.DispatchEventByServiceName(name)
+	if err != nil {
+		logger.Errorf("[zkServiceDiscovery] DispatchEventByServiceName{%s} error = err{%v}", name, err)
+	}
+	return true
+}
+
+func (zksd zookeeperServiceDiscovery) toCuratorInstance(instance registry.ServiceInstance) *curator_discovery.ServiceInstance {
+	id := instance.GetHost() + ":" + strconv.Itoa(instance.GetPort())
+	pl := make(map[string]interface{})
+	pl["id"] = id
+	pl["name"] = instance.GetServiceName()
+	pl["metadata"] = instance.GetMetadata()
+	cuis := &curator_discovery.ServiceInstance{
+		Name:                instance.GetServiceName(),
+		Id:                  id,
+		Address:             instance.GetHost(),
+		Port:                instance.GetPort(),
+		Payload:             pl,
+		RegistrationTimeUTC: 0,
+	}
+	return cuis
+}
+
+func (zksd zookeeperServiceDiscovery) toZookeeperInstance(cris *curator_discovery.ServiceInstance) registry.ServiceInstance {
+	pl, ok := cris.Payload.(map[string]interface{})
+	if !ok {
+		logger.Errorf("[zkServiceDiscovery] toZookeeperInstance{%s} payload is not map", cris.Id)
+	}
+	md, ok := pl["metadata"].(map[string]string)
+	return &registry.DefaultServiceInstance{
+		Id:          cris.Id,
+		ServiceName: cris.Name,
+		Host:        cris.Address,
+		Port:        cris.Port,
+		Enable:      true,
+		Healthy:     true,
+		Metadata:    md,
+	}
+}
diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go
index f4aea5903..918345f25 100644
--- a/remoting/zookeeper/client.go
+++ b/remoting/zookeeper/client.go
@@ -108,7 +108,7 @@ func WithZkName(name string) Option {
 }
 
 // ValidateZookeeperClient ...
-func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
+func ValidateZookeeperClient(container ZkClientFacade, opts ...Option) error {
 	var err error
 	options := &Options{}
 	for _, opt := range opts {
@@ -604,6 +604,11 @@ func (z *ZookeeperClient) GetContent(zkPath string) ([]byte, *zk.Stat, error) {
 	return z.Conn.Get(zkPath)
 }
 
+// GetContent ...
+func (z *ZookeeperClient) SetContent(zkPath string, content []byte, version int32) (*zk.Stat, error) {
+	return z.Conn.Set(zkPath, content, version)
+}
+
 // getConn gets zookeeper connection safely
 func (z *ZookeeperClient) getConn() *zk.Conn {
 	z.RLock()
diff --git a/remoting/zookeeper/curator_discovery/service_discovery.go b/remoting/zookeeper/curator_discovery/service_discovery.go
new file mode 100644
index 000000000..e300af122
--- /dev/null
+++ b/remoting/zookeeper/curator_discovery/service_discovery.go
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package curator_discovery
+
+import (
+	"encoding/json"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/remoting/zookeeper"
+	perrors "github.com/pkg/errors"
+	"sync"
+)
+
+type ServiceDiscovery struct {
+	client   *zookeeper.ZookeeperClient
+	mutex    *sync.Mutex
+	basePath string
+	services *sync.Map
+}
+
+func NewServiceDiscovery(client *zookeeper.ZookeeperClient, basePath string) *ServiceDiscovery {
+	return &ServiceDiscovery{
+		client:   client,
+		mutex:    &sync.Mutex{},
+		basePath: basePath,
+		services: &sync.Map{},
+	}
+}
+
+func (sd *ServiceDiscovery) registerService(instance *ServiceInstance) error {
+	path := sd.basePath + "/" + instance.Name + "/" + instance.Id
+	data, err := json.Marshal(instance)
+	if err != nil {
+		return err
+	}
+	err = sd.client.CreateWithValue(path, data)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (sd *ServiceDiscovery) RegisterService(instance *ServiceInstance) error {
+	sd.services.Store(instance.Id, instance)
+	return sd.registerService(instance)
+}
+
+func (sd *ServiceDiscovery) UpdateService(instance *ServiceInstance) error {
+	sd.services.Store(instance.Id, instance)
+	path := sd.basePath + "/" + instance.Name + "/" + instance.Id
+	data, err := json.Marshal(instance)
+	if err != nil {
+		return err
+	}
+	_, err = sd.client.SetContent(path, data, -1)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (sd *ServiceDiscovery) UpdateInternalService(name, id string) {
+	_, ok := sd.services.Load(id)
+	if !ok {
+		return
+	}
+	instance, err := sd.QueryForInstance(name, id)
+	if err != nil {
+		logger.Infof("[zkServiceDiscovery] UpdateInternalService{%s} error = err{%v}", id, err)
+		return
+	}
+	sd.services.Store(instance.Id, instance)
+	return
+}
+
+func (sd *ServiceDiscovery) UnregisterService(instance *ServiceInstance) error {
+	sd.services.Delete(instance.Id)
+	return sd.unregisterService(instance)
+}
+
+func (sd *ServiceDiscovery) unregisterService(instance *ServiceInstance) error {
+	path := sd.basePath + "/" + instance.Name + "/" + instance.Id
+	return sd.client.Delete(path)
+}
+
+func (sd *ServiceDiscovery) ReRegisterService() {
+	sd.services.Range(func(key, value interface{}) bool {
+		instance, ok := value.(*ServiceInstance)
+		if !ok {
+
+		}
+		err := sd.registerService(instance)
+		if err != nil {
+			logger.Errorf("[zkServiceDiscovery] registerService{%s} error = err{%v}", instance.Id, perrors.WithStack(err))
+		}
+		return true
+	})
+}
+
+func (sd *ServiceDiscovery) QueryForInstances(name string) ([]*ServiceInstance, error) {
+	ids, err := sd.client.GetChildren(sd.basePath + "/" + name)
+	if err != nil {
+		return nil, err
+	}
+	var (
+		instance  *ServiceInstance
+		instances []*ServiceInstance
+	)
+	for _, id := range ids {
+		instance, err = sd.QueryForInstance(name, id)
+		if err != nil {
+			return nil, err
+		}
+		instances = append(instances, instance)
+	}
+	return instances, nil
+}
+
+func (sd *ServiceDiscovery) QueryForInstance(name string, id string) (*ServiceInstance, error) {
+	path := sd.basePath + "/" + name + "/" + id
+	data, _, err := sd.client.GetContent(path)
+	if err != nil {
+		return nil, err
+	}
+	instance := &ServiceInstance{}
+	err = json.Unmarshal(data, instance)
+	if err != nil {
+		return nil, err
+	}
+	return instance, nil
+}
+
+func (sd *ServiceDiscovery) QueryForNames() ([]string, error) {
+	return sd.client.GetChildren(sd.basePath)
+}
diff --git a/remoting/zookeeper/curator_discovery/service_instance.go b/remoting/zookeeper/curator_discovery/service_instance.go
new file mode 100644
index 000000000..1ba7a16f9
--- /dev/null
+++ b/remoting/zookeeper/curator_discovery/service_instance.go
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package curator_discovery
+
+type ServiceInstance struct {
+	Name                string
+	Id                  string
+	Address             string
+	Port                int
+	Payload             interface{}
+	RegistrationTimeUTC int64
+}
diff --git a/remoting/zookeeper/facade.go b/remoting/zookeeper/facade.go
index 4e3945388..504395f76 100644
--- a/remoting/zookeeper/facade.go
+++ b/remoting/zookeeper/facade.go
@@ -18,6 +18,7 @@
 package zookeeper
 
 import (
+	"github.com/apache/dubbo-go/common"
 	"sync"
 )
 import (
@@ -26,22 +27,21 @@ import (
 )
 
 import (
-	"github.com/apache/dubbo-go/common"
 	"github.com/apache/dubbo-go/common/logger"
 )
 
-type zkClientFacade interface {
+type ZkClientFacade interface {
 	ZkClient() *ZookeeperClient
 	SetZkClient(*ZookeeperClient)
 	ZkClientLock() *sync.Mutex
 	WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container
 	Done() chan struct{}        //for zk client control
 	RestartCallBack() bool
-	common.Node
+	GetUrl() common.URL
 }
 
 // HandleClientRestart ...
-func HandleClientRestart(r zkClientFacade) {
+func HandleClientRestart(r ZkClientFacade) {
 	var (
 		err error
 
diff --git a/remoting/zookeeper/facade_test.go b/remoting/zookeeper/facade_test.go
index 01d46da6c..1cd8f064b 100644
--- a/remoting/zookeeper/facade_test.go
+++ b/remoting/zookeeper/facade_test.go
@@ -38,7 +38,7 @@ type mockFacade struct {
 	done    chan struct{}
 }
 
-func newMockFacade(client *ZookeeperClient, url *common.URL) zkClientFacade {
+func newMockFacade(client *ZookeeperClient, url *common.URL) ZkClientFacade {
 	mock := &mockFacade{
 		client: client,
 		URL:    url,
-- 
GitLab