diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go
index 9e19cfb4a7b466d7dc5012bb1831493e9b77948f..a41b03c8d69d7b38feccebf1c2afd461582e2921 100644
--- a/registry/consul/service_discovery.go
+++ b/registry/consul/service_discovery.go
@@ -18,8 +18,6 @@
package consul
import (
- "crypto/md5"
- "encoding/json"
"fmt"
"strconv"
"strings"
@@ -159,6 +157,10 @@ func (csd consulServiceDiscovery) String() string {
func (csd consulServiceDiscovery) Destroy() error {
csd.consulClient = nil
+ for _, t := range csd.ttl {
+ close(t)
+ }
+ csd.ttl = nil
return nil
}
@@ -188,7 +190,12 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance
select {
case <-timer.C:
timer.Reset(period)
- csd.consulClient.Agent().PassTTL(checkID, "")
+ err := csd.consulClient.Agent().PassTTL(checkID, "")
+ if err != nil {
+ logger.Warnf("pass ttl heartbeat fail:%v", err)
+ break
+ }
+ logger.Debugf("passed ttl heartbeat for %s", checkID)
break
case <-stopChan:
logger.Info("ttl %s for service %s is stopped", checkID, instance.GetServiceName())
@@ -200,15 +207,26 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance
}
func (csd consulServiceDiscovery) Update(instance registry.ServiceInstance) error {
- ins, err := csd.buildRegisterInstance(instance)
+ ins, _ := csd.buildRegisterInstance(instance)
+ err := csd.consulClient.Agent().ServiceDeregister(buildID(instance))
if err != nil {
- panic(err)
+ logger.Warnf("unregister instance %s fail:%v", instance.GetServiceName(), err)
}
- return csd.consulClient.Agent().ServiceRegisterOpts(ins, consul.ServiceRegisterOpts{ReplaceExistingChecks: true})
+ err = csd.consulClient.Agent().ServiceRegister(ins)
+ return err
}
func (csd consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
- return csd.consulClient.Agent().ServiceDeregister(buildID(instance))
+ err := csd.consulClient.Agent().ServiceDeregister(buildID(instance))
+ if err != nil {
+ return err
+ }
+ stopChanel, ok := csd.ttl[buildID(instance)]
+ if ok {
+ close(stopChanel)
+ delete(csd.ttl, buildID(instance))
+ }
+ return nil
}
func (csd consulServiceDiscovery) GetDefaultPageSize() int {
@@ -233,7 +251,7 @@ func (csd consulServiceDiscovery) GetServices() *gxset.HashSet {
func (csd consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
waitTime := csd.serviceUrl.GetParamInt(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000
instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{
- WaitTime: time.Duration(waitTime),
+ WaitTime: time.Duration(waitTime),
})
if err != nil {
return nil
@@ -352,19 +370,17 @@ func (csd consulServiceDiscovery) AddListener(listener *registry.ServiceInstance
Metadata: metadata,
})
}
- if len(instances) < 1 {
- return
- }
e := csd.DispatchEventForInstances(listener.ServiceName, instances)
if e != nil {
logger.Errorf("Dispatching event got exception, service name: %s, err: %v", listener.ServiceName, err)
}
}
- err = plan.RunWithClientAndHclog(csd.consulClient, hcLogger)
- if err != nil {
- logger.Error("consul plan run failure!error:%v", err)
- return err
- }
+ go func() {
+ err = plan.RunWithClientAndHclog(csd.consulClient, hcLogger)
+ if err != nil {
+ logger.Error("consul plan run failure!error:%v", err)
+ }
+ }()
return nil
}
@@ -408,6 +424,7 @@ func (csd consulServiceDiscovery) buildCheck(instance registry.ServiceInstance)
deregister = DEFAULT_DEREGISTER_TIME
}
return consul.AgentServiceCheck{
+ CheckID: buildID(instance),
TTL: strconv.FormatInt(csd.checkPassInterval/1000, 10) + "s",
DeregisterCriticalServiceAfter: deregister,
}
@@ -415,10 +432,8 @@ func (csd consulServiceDiscovery) buildCheck(instance registry.ServiceInstance)
func buildID(instance registry.ServiceInstance) string {
- metaBytes, _ := json.Marshal(instance.GetMetadata())
- id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d,enable:%t,healthy:%t,meta:%s", instance.GetId(), instance.GetServiceName(),
- instance.GetHost(), instance.GetPort(), instance.IsEnable(), instance.IsHealthy(), metaBytes)
- Md5Inst := md5.New()
- Md5Inst.Write([]byte(id))
- return string(Md5Inst.Sum([]byte("")))
+ id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d", instance.GetId(), instance.GetServiceName(), instance.GetHost(), instance.GetPort())
+ //Md5Inst := md5.New()
+ //Md5Inst.Write([]byte(id))
+ return id
}
diff --git a/registry/consul/service_discovery_test.go b/registry/consul/service_discovery_test.go
index 3ee71e17a1e21a90299dceefdce83b4c5ab9ca4a..1410326d51301f02cb519c805ba602fe50049069 100644
--- a/registry/consul/service_discovery_test.go
+++ b/registry/consul/service_discovery_test.go
@@ -35,7 +35,6 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/observer"
- "github.com/apache/dubbo-go/common/observer/dispatcher"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/registry"
)
@@ -96,8 +95,9 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) {
defer consulAgent.Shutdown()
prepareData()
+ var eventDispatcher = MockEventDispatcher{Notify: make(chan struct{}, 1)}
extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
- return &dispatcher.MockEventDispatcher{}
+ return &eventDispatcher
})
extension.SetAndInitGlobalDispatcher("mock")
@@ -113,33 +113,33 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) {
assert.Nil(t, err)
err = serviceDiscovery.Unregister(instance)
- assert.NotNil(t, err)
+ assert.Nil(t, err)
err = serviceDiscovery.Register(instance)
assert.Nil(t, err)
//sometimes nacos may be failed to push update of instance,
//so it need 10s to pull, we sleep 10 second to make sure instance has been update
- time.Sleep(11 * time.Second)
+ time.Sleep(3 * time.Second)
page := serviceDiscovery.GetHealthyInstancesByPage(instance.GetServiceName(), 0, 10, true)
assert.NotNil(t, page)
assert.Equal(t, 0, page.GetOffset())
assert.Equal(t, 10, page.GetPageSize())
assert.Equal(t, 1, page.GetDataSize())
- instance = page.GetData()[0].(*registry.DefaultServiceInstance)
- assert.NotNil(t, instance)
- assert.Equal(t, buildID(instance), instance.GetId())
- assert.Equal(t, instance.GetHost(), instance.GetHost())
- assert.Equal(t, instance.GetPort(), instance.GetPort())
- assert.Equal(t, instance.GetServiceName(), instance.GetServiceName())
- assert.Equal(t, 0, len(instance.GetMetadata()))
+ instanceResult := page.GetData()[0].(*registry.DefaultServiceInstance)
+ assert.NotNil(t, instanceResult)
+ assert.Equal(t, buildID(instance), instanceResult.GetId())
+ assert.Equal(t, instance.GetHost(), instanceResult.GetHost())
+ assert.Equal(t, instance.GetPort(), instanceResult.GetPort())
+ assert.Equal(t, instance.GetServiceName(), instanceResult.GetServiceName())
+ assert.Equal(t, 0, len(instanceResult.GetMetadata()))
- instance.GetMetadata()["a"] = "b"
+ instance.GetMetadata()["aaa"] = "bbb"
err = serviceDiscovery.Update(instance)
assert.Nil(t, err)
- time.Sleep(11 * time.Second)
+ time.Sleep(3 * time.Second)
pageMap := serviceDiscovery.GetRequestInstances([]string{instance.GetServiceName()}, 0, 1)
assert.Equal(t, 1, len(pageMap))
@@ -147,17 +147,28 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) {
assert.NotNil(t, page)
assert.Equal(t, 1, len(page.GetData()))
- instance = page.GetData()[0].(*registry.DefaultServiceInstance)
- v, _ := instance.GetMetadata()["a"]
- assert.Equal(t, "b", v)
+ instanceResult = page.GetData()[0].(*registry.DefaultServiceInstance)
+ v, _ := instanceResult.GetMetadata()["aaa"]
+ assert.Equal(t, "bbb", v)
// test dispatcher event
- err = serviceDiscovery.DispatchEventByServiceName(instance.GetServiceName())
- assert.Nil(t, err)
+ //err = serviceDiscovery.DispatchEventByServiceName(instanceResult.GetServiceName())
+ //assert.Nil(t, err)
// test AddListener
- err = serviceDiscovery.AddListener(®istry.ServiceInstancesChangedListener{})
+ err = serviceDiscovery.AddListener(®istry.ServiceInstancesChangedListener{ServiceName: instance.GetServiceName()})
assert.Nil(t, err)
+ err = serviceDiscovery.Unregister(instance)
+ assert.Nil(t, err)
+ timer := time.NewTimer(time.Second * 10)
+ select {
+ case <-eventDispatcher.Notify:
+ assert.NotNil(t, eventDispatcher.Event)
+ break
+ case <-timer.C:
+ assert.Fail(t, "")
+ break
+ }
}
func prepareData() {
@@ -190,3 +201,39 @@ func prepareService() (registry.ServiceInstance, common.URL) {
Metadata: nil,
}, registryUrl
}
+
+type MockEventDispatcher struct {
+ Notify chan struct{}
+ Event observer.Event
+}
+
+// AddEventListener do nothing
+func (m MockEventDispatcher) AddEventListener(listener observer.EventListener) {
+}
+
+// AddEventListeners do nothing
+func (m MockEventDispatcher) AddEventListeners(listenersSlice []observer.EventListener) {
+}
+
+// RemoveEventListener do nothing
+func (m MockEventDispatcher) RemoveEventListener(listener observer.EventListener) {
+}
+
+// RemoveEventListeners do nothing
+func (m MockEventDispatcher) RemoveEventListeners(listenersSlice []observer.EventListener) {
+}
+
+// GetAllEventListeners return empty list
+func (m MockEventDispatcher) GetAllEventListeners() []observer.EventListener {
+ return make([]observer.EventListener, 0)
+}
+
+// RemoveAllEventListeners do nothing
+func (m MockEventDispatcher) RemoveAllEventListeners() {
+}
+
+// Dispatch do nothing
+func (m *MockEventDispatcher) Dispatch(event observer.Event) {
+ m.Event = event
+ m.Notify <- struct{}{}
+}