Skip to content
Snippets Groups Projects
Commit 938de29c authored by zhangshen023's avatar zhangshen023
Browse files

refactor repeated code

parent 2959bc7b
No related branches found
No related tags found
No related merge requests found
......@@ -92,6 +92,7 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
type consulServiceDiscovery struct {
// descriptor is a short string about the basic information of this instance
descriptor string
clientLock sync.RWMutex
// Consul client.
consulClient *consul.Client
checkPassInterval int64
......@@ -99,7 +100,6 @@ type consulServiceDiscovery struct {
address string
deregisterCriticalServiceAfter string
ttl sync.Map
clientLock sync.RWMutex
*consul.Config
}
......@@ -108,16 +108,25 @@ func (csd *consulServiceDiscovery) String() string {
}
// nolint
func (csd *consulServiceDiscovery) getConsulClient() (*consul.Client, error) {
func (csd *consulServiceDiscovery) getConsulClient() (consulClient consul.Client, err error) {
csd.clientLock.RLock()
defer csd.clientLock.RUnlock()
if csd.consulClient == nil {
return nil, perrors.New("consul client is destroyed or not ready!")
err = perrors.New("consul client is destroyed or not ready!")
return
}
return csd.consulClient, nil
return *csd.consulClient, nil
}
func (csd *consulServiceDiscovery) Destroy() error {
// nolint
func (csd *consulServiceDiscovery) setConsulClient(consulClient *consul.Client) {
csd.clientLock.Lock()
csd.consulClient = nil
csd.clientLock.Unlock()
defer csd.clientLock.Unlock()
csd.consulClient = consulClient
}
func (csd *consulServiceDiscovery) Destroy() error {
csd.setConsulClient(nil)
csd.ttl.Range(func(key, t interface{}) bool {
close(t.(chan struct{}))
csd.ttl.Delete(key)
......@@ -129,14 +138,12 @@ func (csd *consulServiceDiscovery) Destroy() error {
func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) error {
var (
err error
consulClient *consul.Client
consulClient consul.Client
)
ins, _ := csd.buildRegisterInstance(instance)
csd.clientLock.RLock()
if consulClient, err = csd.getConsulClient(); err == nil {
err = consulClient.Agent().ServiceRegister(ins)
}
csd.clientLock.RUnlock()
if err != nil {
logger.Errorf("consul register the instance %s fail:%v", instance.GetServiceName(), err)
return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName())
......@@ -148,7 +155,7 @@ func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) e
func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance) error {
var (
err error
consulClient *consul.Client
consulClient consul.Client
)
checkID := buildID(instance)
......@@ -163,11 +170,9 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance
for {
select {
case <-timer.C:
csd.clientLock.RLock()
if consulClient, err = csd.getConsulClient(); err == nil {
err = consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "")
}
csd.clientLock.RUnlock()
if err != nil {
logger.Warnf("pass ttl heartbeat fail:%v", err)
break
......@@ -186,30 +191,27 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance
func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) error {
var (
err error
consulClient *consul.Client
consulClient consul.Client
)
ins, _ := csd.buildRegisterInstance(instance)
csd.clientLock.RLock()
defer csd.clientLock.RUnlock()
if consulClient, err = csd.getConsulClient(); err == nil {
consulClient, err = csd.getConsulClient()
if err == nil {
err = consulClient.Agent().ServiceDeregister(buildID(instance))
}
if err != nil {
logger.Warnf("unregister instance %s fail:%v", instance.GetServiceName(), err)
}
return csd.consulClient.Agent().ServiceRegister(ins)
return consulClient.Agent().ServiceRegister(ins)
}
func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
var (
err error
consulClient *consul.Client
consulClient consul.Client
)
csd.clientLock.RLock()
if consulClient, err = csd.getConsulClient(); err == nil {
err = consulClient.Agent().ServiceDeregister(buildID(instance))
}
csd.clientLock.RUnlock()
if err != nil {
logger.Errorf("unregister service instance %s,error: %v", instance.GetId(), err)
return err
......@@ -231,7 +233,7 @@ func (csd *consulServiceDiscovery) GetDefaultPageSize() int {
func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet {
var (
err error
consulClient *consul.Client
consulClient consul.Client
services map[string][]string
)
var res = gxset.NewSet()
......@@ -280,7 +282,7 @@ func decodeConsulMetadata(metadata map[string]string) map[string]string {
func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
var (
err error
consulClient *consul.Client
consulClient consul.Client
instances []*consul.ServiceEntry
)
csd.clientLock.RLock()
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment