Skip to content
Snippets Groups Projects
Commit bea99263 authored by zhangshen023's avatar zhangshen023
Browse files

consul ttl check

parent 106d51b1
No related branches found
No related tags found
No related merge requests found
...@@ -54,7 +54,7 @@ const ( ...@@ -54,7 +54,7 @@ const (
CHECK_PASS_INTERVAL = "consul-check-pass-interval" CHECK_PASS_INTERVAL = "consul-check-pass-interval"
// default time-to-live in millisecond // default time-to-live in millisecond
DEFAULT_CHECK_PASS_INTERVAL = 16000 DEFAULT_CHECK_PASS_INTERVAL = 16000
UERY_TAG = "consul_query_tag" QUERY_TAG = "consul_query_tag"
ACL_TOKEN = "acl-token" ACL_TOKEN = "acl-token"
// default deregister critical server after // default deregister critical server after
DEFAULT_DEREGISTER_TIME = "20s" DEFAULT_DEREGISTER_TIME = "20s"
...@@ -117,6 +117,7 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { ...@@ -117,6 +117,7 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
address: remoteConfig.Address, address: remoteConfig.Address,
descriptor: descriptor, descriptor: descriptor,
PageSize: pageSize, PageSize: pageSize,
ttl: make(map[string]chan struct{}),
}, nil }, nil
} }
...@@ -135,12 +136,13 @@ type consulServiceDiscovery struct { ...@@ -135,12 +136,13 @@ type consulServiceDiscovery struct {
tag string tag string
tags []string tags []string
address string address string
ttl map[string]chan struct{}
} }
func (csd consulServiceDiscovery) Initialize(registryURL common.URL) error { func (csd *consulServiceDiscovery) Initialize(registryURL common.URL) error {
csd.serviceUrl = registryURL csd.serviceUrl = registryURL
csd.checkPassInterval = registryURL.GetParamInt(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL) csd.checkPassInterval = registryURL.GetParamInt(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL)
csd.tag = registryURL.GetParam(UERY_TAG, "") csd.tag = registryURL.GetParam(QUERY_TAG, "")
csd.tags = strings.Split(registryURL.GetParam("tags", ""), ",") csd.tags = strings.Split(registryURL.GetParam("tags", ""), ",")
aclToken := registryURL.GetParam(ACL_TOKEN, "") aclToken := registryURL.GetParam(ACL_TOKEN, "")
config := &consul.Config{Address: csd.address, Token: aclToken} config := &consul.Config{Address: csd.address, Token: aclToken}
...@@ -160,15 +162,42 @@ func (csd consulServiceDiscovery) Destroy() error { ...@@ -160,15 +162,42 @@ func (csd consulServiceDiscovery) Destroy() error {
return nil return nil
} }
func (csd consulServiceDiscovery) Register(instance registry.ServiceInstance) error { func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) error {
ins, _ := csd.buildRegisterInstance(instance) ins, _ := csd.buildRegisterInstance(instance)
err := csd.consulClient.Agent().ServiceRegister(ins) err := csd.consulClient.Agent().ServiceRegister(ins)
if err != nil { if err != nil {
return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName()) return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName())
} }
csd.registerTtl(instance)
return nil return nil
} }
func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance) error {
checkID := buildID(instance)
stopChan := make(chan struct{})
csd.ttl[buildID(instance)] = stopChan
period := time.Duration(csd.checkPassInterval/8) * time.Millisecond
timer := time.NewTimer(period)
go func() {
for {
select {
case <-timer.C:
timer.Reset(period)
csd.consulClient.Agent().PassTTL(checkID, "")
break
case <-stopChan:
logger.Info("ttl %s for service %s is stopped", checkID, instance.GetServiceName())
return
}
}
}()
return nil
}
func (csd consulServiceDiscovery) Update(instance registry.ServiceInstance) error { func (csd consulServiceDiscovery) Update(instance registry.ServiceInstance) error {
ins, err := csd.buildRegisterInstance(instance) ins, err := csd.buildRegisterInstance(instance)
...@@ -205,7 +234,6 @@ func (csd consulServiceDiscovery) GetInstances(serviceName string) []registry.Se ...@@ -205,7 +234,6 @@ func (csd consulServiceDiscovery) GetInstances(serviceName string) []registry.Se
waitTime := csd.serviceUrl.GetParamInt(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000 waitTime := csd.serviceUrl.GetParamInt(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000
instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{
WaitTime: time.Duration(waitTime), WaitTime: time.Duration(waitTime),
WaitIndex: -1,
}) })
if err != nil { if err != nil {
return nil return nil
...@@ -388,7 +416,7 @@ func (csd consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) ...@@ -388,7 +416,7 @@ func (csd consulServiceDiscovery) buildCheck(instance registry.ServiceInstance)
func buildID(instance registry.ServiceInstance) string { func buildID(instance registry.ServiceInstance) string {
metaBytes, _ := json.Marshal(instance.GetMetadata()) metaBytes, _ := json.Marshal(instance.GetMetadata())
id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d,enable:%b,healthy:%b,meta:%s", instance.GetId(), instance.GetServiceName(), id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d,enable:%t,healthy:%t,meta:%s", instance.GetId(), instance.GetServiceName(),
instance.GetHost(), instance.GetPort(), instance.IsEnable(), instance.IsHealthy(), metaBytes) instance.GetHost(), instance.GetPort(), instance.IsEnable(), instance.IsHealthy(), metaBytes)
Md5Inst := md5.New() Md5Inst := md5.New()
Md5Inst.Write([]byte(id)) Md5Inst.Write([]byte(id))
......
...@@ -18,7 +18,9 @@ ...@@ -18,7 +18,9 @@
package consul package consul
import ( import (
"fmt"
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/remoting/consul"
"math/rand" "math/rand"
"strconv" "strconv"
"testing" "testing"
...@@ -39,8 +41,11 @@ import ( ...@@ -39,8 +41,11 @@ import (
) )
var ( var (
testName = "test" testName = "test"
registryURL = common.URL{ consulCheckPassInterval = 17000
consulDeregisterCriticalServiceAfter = "20s"
consulWatchTimeout = 60000
registryURL = common.URL{
Path: "", Path: "",
Username: "", Username: "",
Password: "", Password: "",
...@@ -65,7 +70,7 @@ func TestConsulServiceDiscovery_newConsulServiceDiscovery(t *testing.T) { ...@@ -65,7 +70,7 @@ func TestConsulServiceDiscovery_newConsulServiceDiscovery(t *testing.T) {
assert.NotNil(t, err) assert.NotNil(t, err)
config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{ config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{
Address: "", // TODO Address: "localhost:8081",
} }
res, err := newConsulServiceDiscovery(name) res, err := newConsulServiceDiscovery(name)
...@@ -86,6 +91,10 @@ func TestConsulServiceDiscovery_Destroy(t *testing.T) { ...@@ -86,6 +91,10 @@ func TestConsulServiceDiscovery_Destroy(t *testing.T) {
} }
func TestConsulServiceDiscovery_CRUD(t *testing.T) { func TestConsulServiceDiscovery_CRUD(t *testing.T) {
// start consul agent
consulAgent := consul.NewConsulAgent(t, registryPort)
defer consulAgent.Shutdown()
prepareData() prepareData()
extension.SetEventDispatcher("mock", func() observer.EventDispatcher { extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
return &dispatcher.MockEventDispatcher{} return &dispatcher.MockEventDispatcher{}
...@@ -102,9 +111,9 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) { ...@@ -102,9 +111,9 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) {
err = serviceDiscovery.Initialize(registryUrl) err = serviceDiscovery.Initialize(registryUrl)
assert.Nil(t, err) assert.Nil(t, err)
// clean data for local test
err = serviceDiscovery.Unregister(instance) err = serviceDiscovery.Unregister(instance)
assert.Nil(t, err) assert.NotNil(t, err)
err = serviceDiscovery.Register(instance) err = serviceDiscovery.Register(instance)
assert.Nil(t, err) assert.Nil(t, err)
...@@ -158,28 +167,24 @@ func prepareData() { ...@@ -158,28 +167,24 @@ func prepareData() {
} }
config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{ config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{
Address: "", // TODO Address: fmt.Sprintf("%s:%d", registryHost, registryPort),
TimeoutStr: "10s",
} }
} }
func prepareService() (registry.ServiceInstance, common.URL) { func prepareService() (registry.ServiceInstance, common.URL) {
serviceName := "service-name" + strconv.Itoa(rand.Intn(10000))
id := "id" id := "id"
host := "host"
port := 123
registryUrl, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&" + registryUrl, _ := common.NewURL(protocol + "://" + providerHost + ":" + strconv.Itoa(providerPort) + "/" + service + "?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245&consul-check-pass-interval=17000&consul-deregister-critical-service-after=20s&" + "side=provider&timeout=3000&timestamp=1556509797245&consul-check-pass-interval=" + strconv.Itoa(consulCheckPassInterval) + "&consul-deregister-critical-service-after=" + consulDeregisterCriticalServiceAfter + "&" +
"consul-watch-timeout=60000") "consul-watch-timeout=" + strconv.Itoa(consulWatchTimeout))
return &registry.DefaultServiceInstance{ return &registry.DefaultServiceInstance{
Id: id, Id: id,
ServiceName: serviceName, ServiceName: service,
Host: host, Host: registryHost,
Port: port, Port: registryPort,
Enable: true, Enable: true,
Healthy: true, Healthy: true,
Metadata: nil, Metadata: nil,
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment