diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index ea4ea99ed3322a77669afb399a599c06626d06c6..698758735abfacaceb68440a760975cf7b751e0d 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -84,6 +84,7 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { tag: remoteConfig.Params[constant.QUERY_TAG], consulClient: client, deregisterCriticalServiceAfter: getDeregisterAfter(remoteConfig.Params), + clientLock: sync.Mutex{}, }, nil } @@ -98,6 +99,7 @@ type consulServiceDiscovery struct { address string deregisterCriticalServiceAfter string ttl sync.Map + clientLock sync.Mutex *consul.Config } @@ -105,8 +107,17 @@ func (csd *consulServiceDiscovery) String() string { return csd.descriptor } +// nolint +func (csd *consulServiceDiscovery) getConsulClient() (*consul.Client, error) { + if csd.consulClient == nil { + return nil, perrors.New("consul client is destroyed or not ready!") + } + return csd.consulClient, nil +} func (csd *consulServiceDiscovery) Destroy() error { + csd.clientLock.Lock() csd.consulClient = nil + csd.clientLock.Unlock() csd.ttl.Range(func(key, t interface{}) bool { close(t.(chan struct{})) csd.ttl.Delete(key) @@ -116,8 +127,16 @@ func (csd *consulServiceDiscovery) Destroy() error { } func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) error { + var ( + err error + consulClient *consul.Client + ) ins, _ := csd.buildRegisterInstance(instance) - err := csd.consulClient.Agent().ServiceRegister(ins) + csd.clientLock.Lock() + if consulClient, err = csd.getConsulClient(); err == nil { + err = consulClient.Agent().ServiceRegister(ins) + } + csd.clientLock.Unlock() if err != nil { logger.Errorf("consul register the instance %s fail:%v", instance.GetServiceName(), err) return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName()) @@ -127,19 +146,28 @@ func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) e } func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance) error { + var ( + err error + consulClient *consul.Client + ) + checkID := buildID(instance) stopChan := make(chan struct{}) csd.ttl.LoadOrStore(buildID(instance), stopChan) period := time.Duration(csd.checkPassInterval/8) * time.Millisecond - timer := time.NewTimer(period) + timer := time.NewTicker(period) go func() { + defer timer.Stop() for { select { case <-timer.C: - timer.Reset(period) - err := csd.consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "") + csd.clientLock.Lock() + if consulClient, err = csd.getConsulClient(); err == nil { + err = consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "") + } + csd.clientLock.Unlock() if err != nil { logger.Warnf("pass ttl heartbeat fail:%v", err) break @@ -156,8 +184,16 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance } func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) error { + var ( + err error + consulClient *consul.Client + ) ins, _ := csd.buildRegisterInstance(instance) - err := csd.consulClient.Agent().ServiceDeregister(buildID(instance)) + csd.clientLock.Lock() + defer csd.clientLock.Unlock() + if consulClient, err = csd.getConsulClient(); err == nil { + err = consulClient.Agent().ServiceDeregister(buildID(instance)) + } if err != nil { logger.Warnf("unregister instance %s fail:%v", instance.GetServiceName(), err) } @@ -165,12 +201,19 @@ func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) err } func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error { - err := csd.consulClient.Agent().ServiceDeregister(buildID(instance)) + var ( + err error + consulClient *consul.Client + ) + csd.clientLock.Lock() + if consulClient, err = csd.getConsulClient(); err == nil { + err = consulClient.Agent().ServiceDeregister(buildID(instance)) + } + csd.clientLock.Unlock() if err != nil { logger.Errorf("unregister service instance %s,error: %v", instance.GetId(), err) return err } - stopChanel, ok := csd.ttl.Load(buildID(instance)) if !ok { logger.Warnf("ttl for service instance %s didn't exist", instance.GetId()) @@ -186,9 +229,17 @@ func (csd *consulServiceDiscovery) GetDefaultPageSize() int { } func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet { - + var ( + err error + consulClient *consul.Client + services map[string][]string + ) var res = gxset.NewSet() - services, _, err := csd.consulClient.Catalog().Services(nil) + csd.clientLock.Lock() + if consulClient, err = csd.getConsulClient(); err == nil { + services, _, err = consulClient.Catalog().Services(nil) + } + csd.clientLock.Unlock() if err != nil { logger.Errorf("get services,error: %v", err) return res @@ -227,9 +278,19 @@ func decodeConsulMetadata(metadata map[string]string) map[string]string { } func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { - instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ - WaitTime: time.Duration(csd.checkPassInterval), - }) + var ( + err error + consulClient *consul.Client + instances []*consul.ServiceEntry + ) + csd.clientLock.Lock() + if consulClient, err = csd.getConsulClient(); err == nil { + instances, _, err = consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ + WaitTime: time.Duration(csd.checkPassInterval), + }) + } + csd.clientLock.Unlock() + if err != nil { logger.Errorf("get instances for service %s,error: %v", serviceName, err) return nil