From e0e19afbc46a6a171bd114313b85e908634e9b0b Mon Sep 17 00:00:00 2001
From: Joe Zou <yixian.zou@gmail.com>
Date: Tue, 22 Sep 2020 11:45:20 +0800
Subject: [PATCH] Merge pull request #701 from zhangshen023/develop

Ftr: consul service discovery
---
 common/constant/key.go                    |  12 +
 registry/consul/service_discovery.go      | 498 ++++++++++++++++++++++
 registry/consul/service_discovery_test.go | 231 ++++++++++
 3 files changed, 741 insertions(+)
 create mode 100644 registry/consul/service_discovery.go
 create mode 100644 registry/consul/service_discovery_test.go

diff --git a/common/constant/key.go b/common/constant/key.go
index 7a9eb683a..fe8c5baaf 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -181,6 +181,18 @@ const (
 	ETCDV3_KEY = "etcdv3"
 )
 
+const (
+	CONSUL_KEY          = "consul"
+	CHECK_PASS_INTERVAL = "consul-check-pass-interval"
+	// default time-to-live in millisecond
+	DEFAULT_CHECK_PASS_INTERVAL = 16000
+	QUERY_TAG                   = "consul_query_tag"
+	ACL_TOKEN                   = "acl-token"
+	// default deregister critical server after
+	DEFAULT_DEREGISTER_TIME = "20s"
+	DEREGISTER_AFTER        = "consul-deregister-critical-service-after"
+)
+
 const (
 	TRACING_REMOTE_SPAN_CTX = "tracing.remote.span.ctx"
 )
diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go
new file mode 100644
index 000000000..d8ab93f31
--- /dev/null
+++ b/registry/consul/service_discovery.go
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package consul
+
+import (
+	"encoding/base64"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+)
+
+import (
+	"github.com/dubbogo/gost/container/set"
+	"github.com/dubbogo/gost/page"
+	consul "github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/api/watch"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/config"
+	"github.com/apache/dubbo-go/registry"
+)
+
+const (
+	enable                 = "enable"
+	watch_type             = "type"
+	watch_type_service     = "service"
+	watch_service          = "service"
+	watch_passingonly      = "passingonly"
+	watch_passingonly_true = true
+)
+
+var (
+	errConsulClientClosed = perrors.New("consul client is closed")
+)
+
+// init will put the service discovery into extension
+func init() {
+	extension.SetServiceDiscovery(constant.CONSUL_KEY, newConsulServiceDiscovery)
+}
+
+// consulServiceDiscovery is the implementation of service discovery based on consul.
+type consulServiceDiscovery struct {
+	// descriptor is a short string about the basic information of this instance
+	descriptor string
+	clientLock sync.RWMutex
+	// Consul client.
+	consulClient                   *consul.Client
+	checkPassInterval              int64
+	tag                            string
+	address                        string
+	deregisterCriticalServiceAfter string
+	ttl                            sync.Map
+	*consul.Config
+}
+
+// newConsulServiceDiscovery will create new service discovery instance
+// use double-check pattern to reduce race condition
+func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
+	sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name)
+	if !ok || len(sdc.RemoteRef) == 0 {
+		return nil, perrors.New("could not init the instance because the config is invalid")
+	}
+
+	remoteConfig, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef)
+	if !ok {
+		return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef)
+	}
+
+	descriptor := fmt.Sprintf("consul-service-discovery[%s]", remoteConfig.Address)
+
+	config := &consul.Config{Address: remoteConfig.Address, Token: remoteConfig.Params[constant.ACL_TOKEN]}
+	client, err := consul.NewClient(config)
+	if err != nil {
+		return nil, perrors.WithMessage(err, "create consul client failed.")
+	}
+
+	return &consulServiceDiscovery{
+		address:                        remoteConfig.Address,
+		descriptor:                     descriptor,
+		checkPassInterval:              getCheckPassInterval(remoteConfig.Params),
+		Config:                         config,
+		tag:                            remoteConfig.Params[constant.QUERY_TAG],
+		consulClient:                   client,
+		deregisterCriticalServiceAfter: getDeregisterAfter(remoteConfig.Params),
+		clientLock:                     sync.RWMutex{},
+	}, nil
+}
+
+func (csd *consulServiceDiscovery) String() string {
+	return csd.descriptor
+}
+
+// nolint
+func (csd *consulServiceDiscovery) getConsulClient() *consul.Client {
+	csd.clientLock.RLock()
+	defer csd.clientLock.RUnlock()
+	return csd.consulClient
+}
+
+// nolint
+func (csd *consulServiceDiscovery) setConsulClient(consulClient *consul.Client) {
+	csd.clientLock.Lock()
+	defer csd.clientLock.Unlock()
+	csd.consulClient = consulClient
+}
+
+func (csd *consulServiceDiscovery) Destroy() error {
+	csd.setConsulClient(nil)
+	csd.ttl.Range(func(key, t interface{}) bool {
+		close(t.(chan struct{}))
+		csd.ttl.Delete(key)
+		return true
+	})
+	return nil
+}
+
+func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) error {
+	var (
+		err          error
+		consulClient *consul.Client
+	)
+	ins, _ := csd.buildRegisterInstance(instance)
+	if consulClient = csd.getConsulClient(); consulClient == nil {
+		return errConsulClientClosed
+	}
+	err = consulClient.Agent().ServiceRegister(ins)
+	if err != nil {
+		logger.Errorf("consul register the instance %s fail:%v", instance.GetServiceName(), err)
+		return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName())
+	}
+
+	return csd.registerTtl(instance)
+}
+
+func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance) error {
+	var (
+		err          error
+		consulClient *consul.Client
+	)
+
+	checkID := buildID(instance)
+
+	stopChan := make(chan struct{})
+	csd.ttl.LoadOrStore(buildID(instance), stopChan)
+
+	period := time.Duration(csd.checkPassInterval/8) * time.Millisecond
+	timer := time.NewTicker(period)
+	go func() {
+		defer timer.Stop()
+		for {
+			select {
+			case <-timer.C:
+				if consulClient = csd.getConsulClient(); consulClient == nil {
+					logger.Debugf("consul client is closed!")
+					return
+				}
+				err = consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "")
+				if err != nil {
+					logger.Warnf("pass ttl heartbeat fail:%v", err)
+					break
+				}
+				logger.Debugf("passed ttl heartbeat for %s", checkID)
+				break
+			case <-stopChan:
+				logger.Info("ttl %s for service %s is stopped", checkID, instance.GetServiceName())
+				return
+			}
+		}
+	}()
+	return nil
+}
+
+func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) error {
+	var (
+		err          error
+		consulClient *consul.Client
+	)
+	ins, _ := csd.buildRegisterInstance(instance)
+	consulClient = csd.getConsulClient()
+	if consulClient == nil {
+		return errConsulClientClosed
+	}
+	err = consulClient.Agent().ServiceDeregister(buildID(instance))
+	if err != nil {
+		logger.Warnf("unregister instance %s fail:%v", instance.GetServiceName(), err)
+	}
+	return consulClient.Agent().ServiceRegister(ins)
+}
+
+func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
+	var (
+		err          error
+		consulClient *consul.Client
+	)
+	if consulClient = csd.getConsulClient(); consulClient == nil {
+		return errConsulClientClosed
+	}
+	err = consulClient.Agent().ServiceDeregister(buildID(instance))
+	if err != nil {
+		logger.Errorf("unregister service instance %s,error: %v", instance.GetId(), err)
+		return err
+	}
+	stopChanel, ok := csd.ttl.Load(buildID(instance))
+	if !ok {
+		logger.Warnf("ttl for service instance %s didn't exist", instance.GetId())
+		return nil
+	}
+	close(stopChanel.(chan struct{}))
+	csd.ttl.Delete(buildID(instance))
+	return nil
+}
+
+func (csd *consulServiceDiscovery) GetDefaultPageSize() int {
+	return registry.DefaultPageSize
+}
+
+func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet {
+	var (
+		err          error
+		consulClient *consul.Client
+		services     map[string][]string
+	)
+	var res = gxset.NewSet()
+	if consulClient = csd.getConsulClient(); consulClient == nil {
+		logger.Warnf("consul client is closed!")
+		return res
+	}
+	services, _, err = consulClient.Catalog().Services(nil)
+	if err != nil {
+		logger.Errorf("get services,error: %v", err)
+		return res
+	}
+
+	for service, _ := range services {
+		res.Add(service)
+	}
+	return res
+
+}
+
+// encodeConsulMetadata because consul validate key strictly.
+func encodeConsulMetadata(metadata map[string]string) map[string]string {
+	consulMetadata := make(map[string]string, len(metadata))
+	encoder := base64.RawStdEncoding
+	for k, v := range metadata {
+		consulMetadata[encoder.EncodeToString([]byte(k))] = v
+	}
+	return consulMetadata
+}
+
+// nolint
+func decodeConsulMetadata(metadata map[string]string) map[string]string {
+	dubboMetadata := make(map[string]string, len(metadata))
+	encoder := base64.RawStdEncoding
+	for k, v := range metadata {
+		kBytes, err := encoder.DecodeString(k)
+		if err != nil {
+			logger.Warnf("can not decoded consul metadata key %s", k)
+			continue
+		}
+		dubboMetadata[string(kBytes)] = v
+	}
+	return dubboMetadata
+}
+
+func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
+	var (
+		err          error
+		consulClient *consul.Client
+		instances    []*consul.ServiceEntry
+	)
+	if consulClient = csd.getConsulClient(); consulClient == nil {
+		logger.Warn("consul client is closed!")
+		return nil
+	}
+	instances, _, err = consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{
+		WaitTime: time.Duration(csd.checkPassInterval),
+	})
+
+	if err != nil {
+		logger.Errorf("get instances for service %s,error: %v", serviceName, err)
+		return nil
+	}
+
+	res := make([]registry.ServiceInstance, 0, len(instances))
+	for _, ins := range instances {
+		metadata := ins.Service.Meta
+
+		// enable status
+		enableStr := metadata[enable]
+		delete(metadata, enable)
+		enable, _ := strconv.ParseBool(enableStr)
+		metadata = decodeConsulMetadata(metadata)
+
+		// health status
+		status := ins.Checks.AggregatedStatus()
+		healthy := false
+		if status == consul.HealthPassing {
+			healthy = true
+		}
+		res = append(res, &registry.DefaultServiceInstance{
+			Id:          ins.Service.ID,
+			ServiceName: ins.Service.Service,
+			Host:        ins.Service.Address,
+			Port:        ins.Service.Port,
+			Enable:      enable,
+			Healthy:     healthy,
+			Metadata:    metadata,
+		})
+	}
+
+	return res
+}
+
+func (csd *consulServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
+	all := csd.GetInstances(serviceName)
+	res := make([]interface{}, 0, pageSize)
+	for i := offset; i < len(all) && i < offset+pageSize; i++ {
+		res = append(res, all[i])
+	}
+	return gxpage.New(offset, pageSize, res, len(all))
+}
+
+func (csd *consulServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager {
+	all := csd.GetInstances(serviceName)
+	res := make([]interface{}, 0, pageSize)
+	// could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance
+	var (
+		i     = offset
+		count = 0
+	)
+	for i < len(all) && count < pageSize {
+		ins := all[i]
+		if ins.IsHealthy() == healthy {
+			res = append(res, all[i])
+			count++
+		}
+		i++
+	}
+	return gxpage.New(offset, pageSize, res, len(all))
+}
+
+func (csd *consulServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
+	res := make(map[string]gxpage.Pager, len(serviceNames))
+	for _, name := range serviceNames {
+		res[name] = csd.GetInstancesByPage(name, offset, requestedSize)
+	}
+	return res
+}
+
+func (csd *consulServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error {
+
+	params := make(map[string]interface{}, 8)
+	params[watch_type] = watch_type_service
+	params[watch_service] = listener.ServiceName
+	params[watch_passingonly] = watch_passingonly_true
+	plan, err := watch.Parse(params)
+	if err != nil {
+		logger.Errorf("add listener for service %s,error:%v", listener.ServiceName, err)
+		return err
+	}
+
+	plan.Handler = func(idx uint64, raw interface{}) {
+		services, ok := raw.([]*consul.ServiceEntry)
+		if !ok {
+			err = perrors.New("handler get non ServiceEntry type parameter")
+			return
+		}
+		instances := make([]registry.ServiceInstance, 0, len(services))
+		for _, ins := range services {
+			metadata := ins.Service.Meta
+
+			// enable status
+			enableStr := metadata[enable]
+			delete(metadata, enable)
+			enable, _ := strconv.ParseBool(enableStr)
+
+			// health status
+			status := ins.Checks.AggregatedStatus()
+			healthy := false
+			if status == consul.HealthPassing {
+				healthy = true
+			}
+			instances = append(instances, &registry.DefaultServiceInstance{
+				Id:          ins.Service.ID,
+				ServiceName: ins.Service.Service,
+				Host:        ins.Service.Address,
+				Port:        ins.Service.Port,
+				Enable:      enable,
+				Healthy:     healthy,
+				Metadata:    metadata,
+			})
+		}
+		e := csd.DispatchEventForInstances(listener.ServiceName, instances)
+		if e != nil {
+			logger.Errorf("Dispatching event got exception, service name: %s, err: %v", listener.ServiceName, err)
+		}
+	}
+	go func() {
+		err = plan.RunWithConfig(csd.Config.Address, csd.Config)
+		if err != nil {
+			logger.Error("consul plan run failure!error:%v", err)
+		}
+	}()
+	return nil
+}
+
+func (csd *consulServiceDiscovery) DispatchEventByServiceName(serviceName string) error {
+	return csd.DispatchEventForInstances(serviceName, csd.GetInstances(serviceName))
+}
+
+func (csd *consulServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
+	return csd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
+}
+
+func (csd *consulServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
+	extension.GetGlobalDispatcher().Dispatch(event)
+	return nil
+}
+
+func (csd *consulServiceDiscovery) buildRegisterInstance(instance registry.ServiceInstance) (*consul.AgentServiceRegistration, error) {
+	metadata := instance.GetMetadata()
+	metadata = encodeConsulMetadata(metadata)
+	metadata[enable] = strconv.FormatBool(instance.IsEnable())
+	// check
+	check := csd.buildCheck(instance)
+
+	return &consul.AgentServiceRegistration{
+		ID:      buildID(instance),
+		Name:    instance.GetServiceName(),
+		Port:    instance.GetPort(),
+		Address: instance.GetHost(),
+		Meta:    metadata,
+		Check:   &check,
+	}, nil
+}
+
+func (csd *consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) consul.AgentServiceCheck {
+
+	deregister, ok := instance.GetMetadata()[constant.DEREGISTER_AFTER]
+	if !ok || len(deregister) == 0 {
+		deregister = constant.DEFAULT_DEREGISTER_TIME
+	}
+	return consul.AgentServiceCheck{
+		TTL:                            strconv.FormatInt(csd.checkPassInterval/1000, 10) + "s",
+		DeregisterCriticalServiceAfter: csd.deregisterCriticalServiceAfter,
+	}
+}
+
+// nolint
+func getCheckPassInterval(params map[string]string) int64 {
+	checkPassIntervalStr, ok := params[constant.CHECK_PASS_INTERVAL]
+	if !ok {
+		return constant.DEFAULT_CHECK_PASS_INTERVAL
+	}
+	checkPassInterval, err := strconv.ParseInt(checkPassIntervalStr, 10, 64)
+	if err != nil {
+		logger.Warnf("consul service discovery remote config error:%s", checkPassIntervalStr)
+		return constant.DEFAULT_CHECK_PASS_INTERVAL
+	}
+	return checkPassInterval
+}
+
+// nolint
+func getDeregisterAfter(metadata map[string]string) string {
+	deregister, ok := metadata[constant.DEREGISTER_AFTER]
+	if !ok || len(deregister) == 0 {
+		deregister = constant.DEFAULT_DEREGISTER_TIME
+	}
+	return deregister
+}
+
+// nolint
+func buildID(instance registry.ServiceInstance) string {
+	id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d", instance.GetId(), instance.GetServiceName(), instance.GetHost(), instance.GetPort())
+	return id
+}
diff --git a/registry/consul/service_discovery_test.go b/registry/consul/service_discovery_test.go
new file mode 100644
index 000000000..ed7220f2d
--- /dev/null
+++ b/registry/consul/service_discovery_test.go
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package consul
+
+import (
+	"fmt"
+	"math/rand"
+	"strconv"
+	"testing"
+	"time"
+)
+
+import (
+	"github.com/stretchr/testify/assert"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/common/observer"
+	"github.com/apache/dubbo-go/config"
+	"github.com/apache/dubbo-go/registry"
+	"github.com/apache/dubbo-go/remoting/consul"
+)
+
+var (
+	testName                             = "test"
+	consulCheckPassInterval              = 17000
+	consulDeregisterCriticalServiceAfter = "20s"
+	consulWatchTimeout                   = 60000
+	registryURL                          = common.URL{}
+)
+
+func TestConsulServiceDiscovery_newConsulServiceDiscovery(t *testing.T) {
+	name := "consul1"
+	_, err := newConsulServiceDiscovery(name)
+	assert.NotNil(t, err)
+
+	sdc := &config.ServiceDiscoveryConfig{
+		Protocol:  "consul",
+		RemoteRef: "mock",
+	}
+
+	config.GetBaseConfig().ServiceDiscoveries[name] = sdc
+
+	_, err = newConsulServiceDiscovery(name)
+	assert.NotNil(t, err)
+
+	config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{
+		Address: "localhost:8081",
+	}
+
+	res, err := newConsulServiceDiscovery(name)
+	assert.Nil(t, err)
+	assert.NotNil(t, res)
+}
+
+func TestConsulServiceDiscovery_Destroy(t *testing.T) {
+	prepareData()
+	serviceDiscovery, err := extension.GetServiceDiscovery(constant.CONSUL_KEY, testName)
+	prepareService()
+	assert.Nil(t, err)
+	assert.NotNil(t, serviceDiscovery)
+	err = serviceDiscovery.Destroy()
+	assert.Nil(t, err)
+	assert.Nil(t, serviceDiscovery.(*consulServiceDiscovery).consulClient)
+}
+
+func TestConsulServiceDiscovery_CRUD(t *testing.T) {
+	// start consul agent
+	consulAgent := consul.NewConsulAgent(t, registryPort)
+	defer consulAgent.Shutdown()
+
+	prepareData()
+	var eventDispatcher = MockEventDispatcher{Notify: make(chan struct{}, 1)}
+	extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
+		return &eventDispatcher
+	})
+
+	extension.SetAndInitGlobalDispatcher("mock")
+	rand.Seed(time.Now().Unix())
+
+	instance, _ := prepareService()
+
+	// clean data
+	serviceDiscovery, err := extension.GetServiceDiscovery(constant.CONSUL_KEY, testName)
+	assert.Nil(t, err)
+
+	err = serviceDiscovery.Unregister(instance)
+	assert.Nil(t, err)
+
+	err = serviceDiscovery.Register(instance)
+	assert.Nil(t, err)
+
+	//sometimes nacos may be failed to push update of instance,
+	//so it need 10s to pull, we sleep 10 second to make sure instance has been update
+	time.Sleep(3 * time.Second)
+	page := serviceDiscovery.GetHealthyInstancesByPage(instance.GetServiceName(), 0, 10, true)
+	assert.NotNil(t, page)
+	assert.Equal(t, 0, page.GetOffset())
+	assert.Equal(t, 10, page.GetPageSize())
+	assert.Equal(t, 1, page.GetDataSize())
+
+	instanceResult := page.GetData()[0].(*registry.DefaultServiceInstance)
+	assert.NotNil(t, instanceResult)
+	assert.Equal(t, buildID(instance), instanceResult.GetId())
+	assert.Equal(t, instance.GetHost(), instanceResult.GetHost())
+	assert.Equal(t, instance.GetPort(), instanceResult.GetPort())
+	assert.Equal(t, instance.GetServiceName(), instanceResult.GetServiceName())
+	metadata := instanceResult.GetMetadata()
+	assert.Equal(t, 0, len(metadata))
+
+	instance.GetMetadata()["aaa"] = "bbb"
+	err = serviceDiscovery.Update(instance)
+	assert.Nil(t, err)
+
+	time.Sleep(3 * time.Second)
+	pageMap := serviceDiscovery.GetRequestInstances([]string{instance.GetServiceName()}, 0, 1)
+	assert.Equal(t, 1, len(pageMap))
+
+	page = pageMap[instance.GetServiceName()]
+	assert.NotNil(t, page)
+	assert.Equal(t, 1, len(page.GetData()))
+
+	instanceResult = page.GetData()[0].(*registry.DefaultServiceInstance)
+	v, _ := instanceResult.Metadata["aaa"]
+	assert.Equal(t, "bbb", v)
+
+	// test dispatcher event
+	//err = serviceDiscovery.DispatchEventByServiceName(instanceResult.GetServiceName())
+	//assert.Nil(t, err)
+
+	// test AddListener
+	err = serviceDiscovery.AddListener(&registry.ServiceInstancesChangedListener{ServiceName: instance.GetServiceName()})
+	assert.Nil(t, err)
+	err = serviceDiscovery.Unregister(instance)
+	assert.Nil(t, err)
+	timer := time.NewTimer(time.Second * 10)
+	select {
+	case <-eventDispatcher.Notify:
+		assert.NotNil(t, eventDispatcher.Event)
+		break
+	case <-timer.C:
+		assert.Fail(t, "")
+		break
+	}
+}
+
+func prepareData() {
+	config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{
+		Protocol:  "consul",
+		RemoteRef: testName,
+	}
+
+	config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{
+		Address: fmt.Sprintf("%s:%d", registryHost, registryPort),
+	}
+}
+
+func prepareService() (registry.ServiceInstance, common.URL) {
+	id := "id"
+
+	registryUrl, _ := common.NewURL(protocol + "://" + providerHost + ":" + strconv.Itoa(providerPort) + "/" + service + "?anyhost=true&" +
+		"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
+		"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
+		"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
+		"side=provider&timeout=3000&timestamp=1556509797245&consul-check-pass-interval=" + strconv.Itoa(consulCheckPassInterval) + "&consul-deregister-critical-service-after=" + consulDeregisterCriticalServiceAfter + "&" +
+		"consul-watch-timeout=" + strconv.Itoa(consulWatchTimeout))
+
+	return &registry.DefaultServiceInstance{
+		Id:          id,
+		ServiceName: service,
+		Host:        registryHost,
+		Port:        registryPort,
+		Enable:      true,
+		Healthy:     true,
+		Metadata:    nil,
+	}, registryUrl
+}
+
+type MockEventDispatcher struct {
+	Notify chan struct{}
+	Event  observer.Event
+}
+
+// AddEventListener do nothing
+func (m *MockEventDispatcher) AddEventListener(listener observer.EventListener) {
+}
+
+// AddEventListeners do nothing
+func (m *MockEventDispatcher) AddEventListeners(listenersSlice []observer.EventListener) {
+}
+
+// RemoveEventListener do nothing
+func (m *MockEventDispatcher) RemoveEventListener(listener observer.EventListener) {
+}
+
+// RemoveEventListeners do nothing
+func (m *MockEventDispatcher) RemoveEventListeners(listenersSlice []observer.EventListener) {
+}
+
+// GetAllEventListeners return empty list
+func (m *MockEventDispatcher) GetAllEventListeners() []observer.EventListener {
+	return make([]observer.EventListener, 0)
+}
+
+// RemoveAllEventListeners do nothing
+func (m *MockEventDispatcher) RemoveAllEventListeners() {
+}
+
+// Dispatch do nothing
+func (m *MockEventDispatcher) Dispatch(event observer.Event) {
+	m.Event = event
+	m.Notify <- struct{}{}
+}
-- 
GitLab