diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index 2ab2d63f34b11859f094250c4612601aac9d78b4..9e19cfb4a7b466d7dc5012bb1831493e9b77948f 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -54,7 +54,7 @@ const ( CHECK_PASS_INTERVAL = "consul-check-pass-interval" // default time-to-live in millisecond DEFAULT_CHECK_PASS_INTERVAL = 16000 - UERY_TAG = "consul_query_tag" + QUERY_TAG = "consul_query_tag" ACL_TOKEN = "acl-token" // default deregister critical server after DEFAULT_DEREGISTER_TIME = "20s" @@ -117,6 +117,7 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { address: remoteConfig.Address, descriptor: descriptor, PageSize: pageSize, + ttl: make(map[string]chan struct{}), }, nil } @@ -135,12 +136,13 @@ type consulServiceDiscovery struct { tag string tags []string address string + ttl map[string]chan struct{} } -func (csd consulServiceDiscovery) Initialize(registryURL common.URL) error { +func (csd *consulServiceDiscovery) Initialize(registryURL common.URL) error { csd.serviceUrl = registryURL csd.checkPassInterval = registryURL.GetParamInt(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL) - csd.tag = registryURL.GetParam(UERY_TAG, "") + csd.tag = registryURL.GetParam(QUERY_TAG, "") csd.tags = strings.Split(registryURL.GetParam("tags", ""), ",") aclToken := registryURL.GetParam(ACL_TOKEN, "") config := &consul.Config{Address: csd.address, Token: aclToken} @@ -160,15 +162,42 @@ func (csd consulServiceDiscovery) Destroy() error { return nil } -func (csd consulServiceDiscovery) Register(instance registry.ServiceInstance) error { +func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) error { ins, _ := csd.buildRegisterInstance(instance) err := csd.consulClient.Agent().ServiceRegister(ins) if err != nil { return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName()) } + + csd.registerTtl(instance) + return nil } +func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance) error { + + checkID := buildID(instance) + + stopChan := make(chan struct{}) + csd.ttl[buildID(instance)] = stopChan + + period := time.Duration(csd.checkPassInterval/8) * time.Millisecond + timer := time.NewTimer(period) + go func() { + for { + select { + case <-timer.C: + timer.Reset(period) + csd.consulClient.Agent().PassTTL(checkID, "") + break + case <-stopChan: + logger.Info("ttl %s for service %s is stopped", checkID, instance.GetServiceName()) + return + } + } + }() + return nil +} func (csd consulServiceDiscovery) Update(instance registry.ServiceInstance) error { ins, err := csd.buildRegisterInstance(instance) @@ -205,7 +234,6 @@ func (csd consulServiceDiscovery) GetInstances(serviceName string) []registry.Se waitTime := csd.serviceUrl.GetParamInt(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000 instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ WaitTime: time.Duration(waitTime), - WaitIndex: -1, }) if err != nil { return nil @@ -388,7 +416,7 @@ func (csd consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) func buildID(instance registry.ServiceInstance) string { metaBytes, _ := json.Marshal(instance.GetMetadata()) - id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d,enable:%b,healthy:%b,meta:%s", instance.GetId(), instance.GetServiceName(), + id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d,enable:%t,healthy:%t,meta:%s", instance.GetId(), instance.GetServiceName(), instance.GetHost(), instance.GetPort(), instance.IsEnable(), instance.IsHealthy(), metaBytes) Md5Inst := md5.New() Md5Inst.Write([]byte(id)) diff --git a/registry/consul/service_discovery_test.go b/registry/consul/service_discovery_test.go index 9f7bc54cce731a531807610597eb8f5ae00e2f27..3ee71e17a1e21a90299dceefdce83b4c5ab9ca4a 100644 --- a/registry/consul/service_discovery_test.go +++ b/registry/consul/service_discovery_test.go @@ -18,7 +18,9 @@ package consul import ( + "fmt" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/remoting/consul" "math/rand" "strconv" "testing" @@ -39,8 +41,11 @@ import ( ) var ( - testName = "test" - registryURL = common.URL{ + testName = "test" + consulCheckPassInterval = 17000 + consulDeregisterCriticalServiceAfter = "20s" + consulWatchTimeout = 60000 + registryURL = common.URL{ Path: "", Username: "", Password: "", @@ -65,7 +70,7 @@ func TestConsulServiceDiscovery_newConsulServiceDiscovery(t *testing.T) { assert.NotNil(t, err) config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{ - Address: "", // TODO + Address: "localhost:8081", } res, err := newConsulServiceDiscovery(name) @@ -86,6 +91,10 @@ func TestConsulServiceDiscovery_Destroy(t *testing.T) { } func TestConsulServiceDiscovery_CRUD(t *testing.T) { + // start consul agent + consulAgent := consul.NewConsulAgent(t, registryPort) + defer consulAgent.Shutdown() + prepareData() extension.SetEventDispatcher("mock", func() observer.EventDispatcher { return &dispatcher.MockEventDispatcher{} @@ -102,9 +111,9 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) { err = serviceDiscovery.Initialize(registryUrl) assert.Nil(t, err) - // clean data for local test + err = serviceDiscovery.Unregister(instance) - assert.Nil(t, err) + assert.NotNil(t, err) err = serviceDiscovery.Register(instance) assert.Nil(t, err) @@ -158,28 +167,24 @@ func prepareData() { } config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{ - Address: "", // TODO - TimeoutStr: "10s", + Address: fmt.Sprintf("%s:%d", registryHost, registryPort), } } func prepareService() (registry.ServiceInstance, common.URL) { - serviceName := "service-name" + strconv.Itoa(rand.Intn(10000)) id := "id" - host := "host" - port := 123 - registryUrl, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&" + + registryUrl, _ := common.NewURL(protocol + "://" + providerHost + ":" + strconv.Itoa(providerPort) + "/" + service + "?anyhost=true&" + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" + - "side=provider&timeout=3000×tamp=1556509797245&consul-check-pass-interval=17000&consul-deregister-critical-service-after=20s&" + - "consul-watch-timeout=60000") + "side=provider&timeout=3000×tamp=1556509797245&consul-check-pass-interval=" + strconv.Itoa(consulCheckPassInterval) + "&consul-deregister-critical-service-after=" + consulDeregisterCriticalServiceAfter + "&" + + "consul-watch-timeout=" + strconv.Itoa(consulWatchTimeout)) return ®istry.DefaultServiceInstance{ Id: id, - ServiceName: serviceName, - Host: host, - Port: port, + ServiceName: service, + Host: registryHost, + Port: registryPort, Enable: true, Healthy: true, Metadata: nil,