Skip to content
Snippets Groups Projects
Commit b00f4a4e authored by shen's avatar shen
Browse files

suit consul

parent 2035a22e
No related branches found
No related tags found
No related merge requests found
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package consul package consul
import ( import (
"encoding/base64"
"fmt" "fmt"
"strconv" "strconv"
"strings" "strings"
...@@ -143,6 +144,7 @@ func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) e ...@@ -143,6 +144,7 @@ func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) e
ins, _ := csd.buildRegisterInstance(instance) ins, _ := csd.buildRegisterInstance(instance)
err := csd.consulClient.Agent().ServiceRegister(ins) err := csd.consulClient.Agent().ServiceRegister(ins)
if err != nil { if err != nil {
logger.Errorf("consul register the instance %s fail:%v", instance.GetServiceName(), err)
return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName()) return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName())
} }
...@@ -162,7 +164,7 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance ...@@ -162,7 +164,7 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance
select { select {
case <-timer.C: case <-timer.C:
timer.Reset(period) timer.Reset(period)
err := csd.consulClient.Agent().PassTTL(checkID, "") err := csd.consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "")
if err != nil { if err != nil {
logger.Warnf("pass ttl heartbeat fail:%v", err) logger.Warnf("pass ttl heartbeat fail:%v", err)
break break
...@@ -223,6 +225,37 @@ func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet { ...@@ -223,6 +225,37 @@ func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet {
} }
// encodeConsulMetadata because consul validate key strictly.
func encodeConsulMetadata(metadata map[string]string) map[string]string {
if metadata == nil {
metadata = make(map[string]string, 1)
}
encoder := base64.RawStdEncoding
for k, v := range metadata {
delete(metadata, k)
metadata[encoder.EncodeToString([]byte(k))] = v
}
return metadata
}
// nolint
func decodeConsulMetadata(metadata map[string]string) map[string]string {
if metadata == nil {
metadata = make(map[string]string, 1)
}
encoder := base64.RawStdEncoding
for k, v := range metadata {
delete(metadata, k)
kBytes, err := encoder.DecodeString(k)
if err != nil {
logger.Warnf("can not decoded consul metadata key %s", k)
continue
}
metadata[string(kBytes)] = v
}
return metadata
}
func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
waitTime := csd.serviceUrl.GetParamInt(constant.WATCH_TIMEOUT, constant.DEFAULT_WATCH_TIMEOUT) / 1000 waitTime := csd.serviceUrl.GetParamInt(constant.WATCH_TIMEOUT, constant.DEFAULT_WATCH_TIMEOUT) / 1000
instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{
...@@ -241,6 +274,7 @@ func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.S ...@@ -241,6 +274,7 @@ func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.S
enableStr := metadata[enable] enableStr := metadata[enable]
delete(metadata, enable) delete(metadata, enable)
enable, _ := strconv.ParseBool(enableStr) enable, _ := strconv.ParseBool(enableStr)
metadata = decodeConsulMetadata(metadata)
// health status // health status
status := ins.Checks.AggregatedStatus() status := ins.Checks.AggregatedStatus()
...@@ -370,11 +404,8 @@ func (csd *consulServiceDiscovery) DispatchEvent(event *registry.ServiceInstance ...@@ -370,11 +404,8 @@ func (csd *consulServiceDiscovery) DispatchEvent(event *registry.ServiceInstance
func (csd *consulServiceDiscovery) buildRegisterInstance(instance registry.ServiceInstance) (*consul.AgentServiceRegistration, error) { func (csd *consulServiceDiscovery) buildRegisterInstance(instance registry.ServiceInstance) (*consul.AgentServiceRegistration, error) {
metadata := instance.GetMetadata() metadata := instance.GetMetadata()
if metadata == nil { metadata = encodeConsulMetadata(metadata)
metadata = make(map[string]string, 1)
}
metadata[enable] = strconv.FormatBool(instance.IsEnable()) metadata[enable] = strconv.FormatBool(instance.IsEnable())
// check // check
check := csd.buildCheck(instance) check := csd.buildCheck(instance)
...@@ -395,7 +426,7 @@ func (csd *consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) ...@@ -395,7 +426,7 @@ func (csd *consulServiceDiscovery) buildCheck(instance registry.ServiceInstance)
deregister = constant.DEFAULT_DEREGISTER_TIME deregister = constant.DEFAULT_DEREGISTER_TIME
} }
return consul.AgentServiceCheck{ return consul.AgentServiceCheck{
CheckID: buildID(instance), //CheckID: buildID(instance),
TTL: strconv.FormatInt(csd.checkPassInterval/1000, 10) + "s", TTL: strconv.FormatInt(csd.checkPassInterval/1000, 10) + "s",
DeregisterCriticalServiceAfter: deregister, DeregisterCriticalServiceAfter: deregister,
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment