Skip to content
Snippets Groups Projects
Commit d5dfaf97 authored by shen's avatar shen
Browse files

suit consul

parent e2384782
No related branches found
No related tags found
No related merge requests found
......@@ -18,6 +18,7 @@
package consul
import (
"encoding/base64"
"fmt"
"strconv"
"strings"
......@@ -143,6 +144,7 @@ func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) e
ins, _ := csd.buildRegisterInstance(instance)
err := csd.consulClient.Agent().ServiceRegister(ins)
if err != nil {
logger.Errorf("consul register the instance %s fail:%v", instance.GetServiceName(), err)
return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName())
}
......@@ -162,7 +164,7 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance
select {
case <-timer.C:
timer.Reset(period)
err := csd.consulClient.Agent().PassTTL(checkID, "")
err := csd.consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "")
if err != nil {
logger.Warnf("pass ttl heartbeat fail:%v", err)
break
......@@ -223,6 +225,37 @@ func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet {
}
// encodeConsulMetadata because consul validate key strictly.
func encodeConsulMetadata(metadata map[string]string) map[string]string {
if metadata == nil {
metadata = make(map[string]string, 1)
}
encoder := base64.RawStdEncoding
for k, v := range metadata {
delete(metadata, k)
metadata[encoder.EncodeToString([]byte(k))] = v
}
return metadata
}
// nolint
func decodeConsulMetadata(metadata map[string]string) map[string]string {
if metadata == nil {
metadata = make(map[string]string, 1)
}
encoder := base64.RawStdEncoding
for k, v := range metadata {
delete(metadata, k)
kBytes, err := encoder.DecodeString(k)
if err != nil {
logger.Warnf("can not decoded consul metadata key %s", k)
continue
}
metadata[string(kBytes)] = v
}
return metadata
}
func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
waitTime := csd.serviceUrl.GetParamInt(constant.WATCH_TIMEOUT, constant.DEFAULT_WATCH_TIMEOUT) / 1000
instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{
......@@ -241,6 +274,7 @@ func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.S
enableStr := metadata[enable]
delete(metadata, enable)
enable, _ := strconv.ParseBool(enableStr)
metadata = decodeConsulMetadata(metadata)
// health status
status := ins.Checks.AggregatedStatus()
......@@ -370,11 +404,8 @@ func (csd *consulServiceDiscovery) DispatchEvent(event *registry.ServiceInstance
func (csd *consulServiceDiscovery) buildRegisterInstance(instance registry.ServiceInstance) (*consul.AgentServiceRegistration, error) {
metadata := instance.GetMetadata()
if metadata == nil {
metadata = make(map[string]string, 1)
}
metadata = encodeConsulMetadata(metadata)
metadata[enable] = strconv.FormatBool(instance.IsEnable())
// check
check := csd.buildCheck(instance)
......@@ -395,7 +426,7 @@ func (csd *consulServiceDiscovery) buildCheck(instance registry.ServiceInstance)
deregister = constant.DEFAULT_DEREGISTER_TIME
}
return consul.AgentServiceCheck{
CheckID: buildID(instance),
//CheckID: buildID(instance),
TTL: strconv.FormatInt(csd.checkPassInterval/1000, 10) + "s",
DeregisterCriticalServiceAfter: deregister,
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment