diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index 590efbcd548751f441e54f926d77ac68bffc9e2e..b57f7719111e428804a5e56b38c3ec6246668a6e 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -18,6 +18,7 @@ package consul import ( + "encoding/base64" "fmt" "strconv" "strings" @@ -143,6 +144,7 @@ func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) e ins, _ := csd.buildRegisterInstance(instance) err := csd.consulClient.Agent().ServiceRegister(ins) if err != nil { + logger.Errorf("consul register the instance %s fail:%v", instance.GetServiceName(), err) return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName()) } @@ -162,7 +164,7 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance select { case <-timer.C: timer.Reset(period) - err := csd.consulClient.Agent().PassTTL(checkID, "") + err := csd.consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "") if err != nil { logger.Warnf("pass ttl heartbeat fail:%v", err) break @@ -223,6 +225,37 @@ func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet { } +// encodeConsulMetadata because consul validate key strictly. +func encodeConsulMetadata(metadata map[string]string) map[string]string { + if metadata == nil { + metadata = make(map[string]string, 1) + } + encoder := base64.RawStdEncoding + for k, v := range metadata { + delete(metadata, k) + metadata[encoder.EncodeToString([]byte(k))] = v + } + return metadata +} + +// nolint +func decodeConsulMetadata(metadata map[string]string) map[string]string { + if metadata == nil { + metadata = make(map[string]string, 1) + } + encoder := base64.RawStdEncoding + for k, v := range metadata { + delete(metadata, k) + kBytes, err := encoder.DecodeString(k) + if err != nil { + logger.Warnf("can not decoded consul metadata key %s", k) + continue + } + metadata[string(kBytes)] = v + } + return metadata +} + func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { waitTime := csd.serviceUrl.GetParamInt(constant.WATCH_TIMEOUT, constant.DEFAULT_WATCH_TIMEOUT) / 1000 instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ @@ -241,6 +274,7 @@ func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.S enableStr := metadata[enable] delete(metadata, enable) enable, _ := strconv.ParseBool(enableStr) + metadata = decodeConsulMetadata(metadata) // health status status := ins.Checks.AggregatedStatus() @@ -370,11 +404,8 @@ func (csd *consulServiceDiscovery) DispatchEvent(event *registry.ServiceInstance func (csd *consulServiceDiscovery) buildRegisterInstance(instance registry.ServiceInstance) (*consul.AgentServiceRegistration, error) { metadata := instance.GetMetadata() - if metadata == nil { - metadata = make(map[string]string, 1) - } + metadata = encodeConsulMetadata(metadata) metadata[enable] = strconv.FormatBool(instance.IsEnable()) - // check check := csd.buildCheck(instance) @@ -395,7 +426,7 @@ func (csd *consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) deregister = constant.DEFAULT_DEREGISTER_TIME } return consul.AgentServiceCheck{ - CheckID: buildID(instance), + //CheckID: buildID(instance), TTL: strconv.FormatInt(csd.checkPassInterval/1000, 10) + "s", DeregisterCriticalServiceAfter: deregister, }