diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go
index 2ab2d63f34b11859f094250c4612601aac9d78b4..9e19cfb4a7b466d7dc5012bb1831493e9b77948f 100644
--- a/registry/consul/service_discovery.go
+++ b/registry/consul/service_discovery.go
@@ -54,7 +54,7 @@ const (
CHECK_PASS_INTERVAL = "consul-check-pass-interval"
// default time-to-live in millisecond
DEFAULT_CHECK_PASS_INTERVAL = 16000
- UERY_TAG = "consul_query_tag"
+ QUERY_TAG = "consul_query_tag"
ACL_TOKEN = "acl-token"
// default deregister critical server after
DEFAULT_DEREGISTER_TIME = "20s"
@@ -117,6 +117,7 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
address: remoteConfig.Address,
descriptor: descriptor,
PageSize: pageSize,
+ ttl: make(map[string]chan struct{}),
}, nil
}
@@ -135,12 +136,13 @@ type consulServiceDiscovery struct {
tag string
tags []string
address string
+ ttl map[string]chan struct{}
}
-func (csd consulServiceDiscovery) Initialize(registryURL common.URL) error {
+func (csd *consulServiceDiscovery) Initialize(registryURL common.URL) error {
csd.serviceUrl = registryURL
csd.checkPassInterval = registryURL.GetParamInt(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL)
- csd.tag = registryURL.GetParam(UERY_TAG, "")
+ csd.tag = registryURL.GetParam(QUERY_TAG, "")
csd.tags = strings.Split(registryURL.GetParam("tags", ""), ",")
aclToken := registryURL.GetParam(ACL_TOKEN, "")
config := &consul.Config{Address: csd.address, Token: aclToken}
@@ -160,15 +162,42 @@ func (csd consulServiceDiscovery) Destroy() error {
return nil
}
-func (csd consulServiceDiscovery) Register(instance registry.ServiceInstance) error {
+func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) error {
ins, _ := csd.buildRegisterInstance(instance)
err := csd.consulClient.Agent().ServiceRegister(ins)
if err != nil {
return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName())
}
+
+ csd.registerTtl(instance)
+
return nil
}
+func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance) error {
+
+ checkID := buildID(instance)
+
+ stopChan := make(chan struct{})
+ csd.ttl[buildID(instance)] = stopChan
+
+ period := time.Duration(csd.checkPassInterval/8) * time.Millisecond
+ timer := time.NewTimer(period)
+ go func() {
+ for {
+ select {
+ case <-timer.C:
+ timer.Reset(period)
+ csd.consulClient.Agent().PassTTL(checkID, "")
+ break
+ case <-stopChan:
+ logger.Info("ttl %s for service %s is stopped", checkID, instance.GetServiceName())
+ return
+ }
+ }
+ }()
+ return nil
+}
func (csd consulServiceDiscovery) Update(instance registry.ServiceInstance) error {
ins, err := csd.buildRegisterInstance(instance)
@@ -205,7 +234,6 @@ func (csd consulServiceDiscovery) GetInstances(serviceName string) []registry.Se
waitTime := csd.serviceUrl.GetParamInt(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000
instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{
WaitTime: time.Duration(waitTime),
- WaitIndex: -1,
})
if err != nil {
return nil
@@ -388,7 +416,7 @@ func (csd consulServiceDiscovery) buildCheck(instance registry.ServiceInstance)
func buildID(instance registry.ServiceInstance) string {
metaBytes, _ := json.Marshal(instance.GetMetadata())
- id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d,enable:%b,healthy:%b,meta:%s", instance.GetId(), instance.GetServiceName(),
+ id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d,enable:%t,healthy:%t,meta:%s", instance.GetId(), instance.GetServiceName(),
instance.GetHost(), instance.GetPort(), instance.IsEnable(), instance.IsHealthy(), metaBytes)
Md5Inst := md5.New()
Md5Inst.Write([]byte(id))
diff --git a/registry/consul/service_discovery_test.go b/registry/consul/service_discovery_test.go
index 9f7bc54cce731a531807610597eb8f5ae00e2f27..3ee71e17a1e21a90299dceefdce83b4c5ab9ca4a 100644
--- a/registry/consul/service_discovery_test.go
+++ b/registry/consul/service_discovery_test.go
@@ -18,7 +18,9 @@
package consul
import (
+ "fmt"
"github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/remoting/consul"
"math/rand"
"strconv"
"testing"
@@ -39,8 +41,11 @@ import (
)
var (
- testName = "test"
- registryURL = common.URL{
+ testName = "test"
+ consulCheckPassInterval = 17000
+ consulDeregisterCriticalServiceAfter = "20s"
+ consulWatchTimeout = 60000
+ registryURL = common.URL{
Path: "",
Username: "",
Password: "",
@@ -65,7 +70,7 @@ func TestConsulServiceDiscovery_newConsulServiceDiscovery(t *testing.T) {
assert.NotNil(t, err)
config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{
- Address: "", // TODO
+ Address: "localhost:8081",
}
res, err := newConsulServiceDiscovery(name)
@@ -86,6 +91,10 @@ func TestConsulServiceDiscovery_Destroy(t *testing.T) {
}
func TestConsulServiceDiscovery_CRUD(t *testing.T) {
+ // start consul agent
+ consulAgent := consul.NewConsulAgent(t, registryPort)
+ defer consulAgent.Shutdown()
+
prepareData()
extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
return &dispatcher.MockEventDispatcher{}
@@ -102,9 +111,9 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) {
err = serviceDiscovery.Initialize(registryUrl)
assert.Nil(t, err)
- // clean data for local test
+
err = serviceDiscovery.Unregister(instance)
- assert.Nil(t, err)
+ assert.NotNil(t, err)
err = serviceDiscovery.Register(instance)
assert.Nil(t, err)
@@ -158,28 +167,24 @@ func prepareData() {
}
config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{
- Address: "", // TODO
- TimeoutStr: "10s",
+ Address: fmt.Sprintf("%s:%d", registryHost, registryPort),
}
}
func prepareService() (registry.ServiceInstance, common.URL) {
- serviceName := "service-name" + strconv.Itoa(rand.Intn(10000))
id := "id"
- host := "host"
- port := 123
- registryUrl, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&" +
+ registryUrl, _ := common.NewURL(protocol + "://" + providerHost + ":" + strconv.Itoa(providerPort) + "/" + service + "?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
- "side=provider&timeout=3000×tamp=1556509797245&consul-check-pass-interval=17000&consul-deregister-critical-service-after=20s&" +
- "consul-watch-timeout=60000")
+ "side=provider&timeout=3000×tamp=1556509797245&consul-check-pass-interval=" + strconv.Itoa(consulCheckPassInterval) + "&consul-deregister-critical-service-after=" + consulDeregisterCriticalServiceAfter + "&" +
+ "consul-watch-timeout=" + strconv.Itoa(consulWatchTimeout))
return ®istry.DefaultServiceInstance{
Id: id,
- ServiceName: serviceName,
- Host: host,
- Port: port,
+ ServiceName: service,
+ Host: registryHost,
+ Port: registryPort,
Enable: true,
Healthy: true,
Metadata: nil,