diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index db039508e7609a7e3ac277c383e774fb2455ed29..23743ee68f49a7a8a056e51789393002123ff853 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -92,6 +92,7 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { type consulServiceDiscovery struct { // descriptor is a short string about the basic information of this instance descriptor string + clientLock sync.RWMutex // Consul client. consulClient *consul.Client checkPassInterval int64 @@ -99,7 +100,6 @@ type consulServiceDiscovery struct { address string deregisterCriticalServiceAfter string ttl sync.Map - clientLock sync.RWMutex *consul.Config } @@ -108,16 +108,25 @@ func (csd *consulServiceDiscovery) String() string { } // nolint -func (csd *consulServiceDiscovery) getConsulClient() (*consul.Client, error) { +func (csd *consulServiceDiscovery) getConsulClient() (consulClient consul.Client, err error) { + csd.clientLock.RLock() + defer csd.clientLock.RUnlock() if csd.consulClient == nil { - return nil, perrors.New("consul client is destroyed or not ready!") + err = perrors.New("consul client is destroyed or not ready!") + return } - return csd.consulClient, nil + return *csd.consulClient, nil } -func (csd *consulServiceDiscovery) Destroy() error { + +// nolint +func (csd *consulServiceDiscovery) setConsulClient(consulClient *consul.Client) { csd.clientLock.Lock() - csd.consulClient = nil - csd.clientLock.Unlock() + defer csd.clientLock.Unlock() + csd.consulClient = consulClient +} + +func (csd *consulServiceDiscovery) Destroy() error { + csd.setConsulClient(nil) csd.ttl.Range(func(key, t interface{}) bool { close(t.(chan struct{})) csd.ttl.Delete(key) @@ -129,14 +138,12 @@ func (csd *consulServiceDiscovery) Destroy() error { func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) error { var ( err error - consulClient *consul.Client + consulClient consul.Client ) ins, _ := csd.buildRegisterInstance(instance) - csd.clientLock.RLock() if consulClient, err = csd.getConsulClient(); err == nil { err = consulClient.Agent().ServiceRegister(ins) } - csd.clientLock.RUnlock() if err != nil { logger.Errorf("consul register the instance %s fail:%v", instance.GetServiceName(), err) return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName()) @@ -148,7 +155,7 @@ func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) e func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance) error { var ( err error - consulClient *consul.Client + consulClient consul.Client ) checkID := buildID(instance) @@ -163,11 +170,9 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance for { select { case <-timer.C: - csd.clientLock.RLock() if consulClient, err = csd.getConsulClient(); err == nil { err = consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "") } - csd.clientLock.RUnlock() if err != nil { logger.Warnf("pass ttl heartbeat fail:%v", err) break @@ -186,30 +191,27 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) error { var ( err error - consulClient *consul.Client + consulClient consul.Client ) ins, _ := csd.buildRegisterInstance(instance) - csd.clientLock.RLock() - defer csd.clientLock.RUnlock() - if consulClient, err = csd.getConsulClient(); err == nil { + consulClient, err = csd.getConsulClient() + if err == nil { err = consulClient.Agent().ServiceDeregister(buildID(instance)) } if err != nil { logger.Warnf("unregister instance %s fail:%v", instance.GetServiceName(), err) } - return csd.consulClient.Agent().ServiceRegister(ins) + return consulClient.Agent().ServiceRegister(ins) } func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error { var ( err error - consulClient *consul.Client + consulClient consul.Client ) - csd.clientLock.RLock() if consulClient, err = csd.getConsulClient(); err == nil { err = consulClient.Agent().ServiceDeregister(buildID(instance)) } - csd.clientLock.RUnlock() if err != nil { logger.Errorf("unregister service instance %s,error: %v", instance.GetId(), err) return err @@ -231,7 +233,7 @@ func (csd *consulServiceDiscovery) GetDefaultPageSize() int { func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet { var ( err error - consulClient *consul.Client + consulClient consul.Client services map[string][]string ) var res = gxset.NewSet() @@ -280,7 +282,7 @@ func decodeConsulMetadata(metadata map[string]string) map[string]string { func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { var ( err error - consulClient *consul.Client + consulClient consul.Client instances []*consul.ServiceEntry ) csd.clientLock.RLock()