Skip to content
Snippets Groups Projects
Commit 2959bc7b authored by zhangshen023's avatar zhangshen023
Browse files

use rwlock

parent ea2f679a
No related branches found
No related tags found
No related merge requests found
...@@ -84,7 +84,7 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { ...@@ -84,7 +84,7 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
tag: remoteConfig.Params[constant.QUERY_TAG], tag: remoteConfig.Params[constant.QUERY_TAG],
consulClient: client, consulClient: client,
deregisterCriticalServiceAfter: getDeregisterAfter(remoteConfig.Params), deregisterCriticalServiceAfter: getDeregisterAfter(remoteConfig.Params),
clientLock: sync.Mutex{}, clientLock: sync.RWMutex{},
}, nil }, nil
} }
...@@ -99,7 +99,7 @@ type consulServiceDiscovery struct { ...@@ -99,7 +99,7 @@ type consulServiceDiscovery struct {
address string address string
deregisterCriticalServiceAfter string deregisterCriticalServiceAfter string
ttl sync.Map ttl sync.Map
clientLock sync.Mutex clientLock sync.RWMutex
*consul.Config *consul.Config
} }
...@@ -132,11 +132,11 @@ func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) e ...@@ -132,11 +132,11 @@ func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) e
consulClient *consul.Client consulClient *consul.Client
) )
ins, _ := csd.buildRegisterInstance(instance) ins, _ := csd.buildRegisterInstance(instance)
csd.clientLock.Lock() csd.clientLock.RLock()
if consulClient, err = csd.getConsulClient(); err == nil { if consulClient, err = csd.getConsulClient(); err == nil {
err = consulClient.Agent().ServiceRegister(ins) err = consulClient.Agent().ServiceRegister(ins)
} }
csd.clientLock.Unlock() csd.clientLock.RUnlock()
if err != nil { if err != nil {
logger.Errorf("consul register the instance %s fail:%v", instance.GetServiceName(), err) logger.Errorf("consul register the instance %s fail:%v", instance.GetServiceName(), err)
return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName()) return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName())
...@@ -163,11 +163,11 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance ...@@ -163,11 +163,11 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance
for { for {
select { select {
case <-timer.C: case <-timer.C:
csd.clientLock.Lock() csd.clientLock.RLock()
if consulClient, err = csd.getConsulClient(); err == nil { if consulClient, err = csd.getConsulClient(); err == nil {
err = consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "") err = consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "")
} }
csd.clientLock.Unlock() csd.clientLock.RUnlock()
if err != nil { if err != nil {
logger.Warnf("pass ttl heartbeat fail:%v", err) logger.Warnf("pass ttl heartbeat fail:%v", err)
break break
...@@ -189,8 +189,8 @@ func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) err ...@@ -189,8 +189,8 @@ func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) err
consulClient *consul.Client consulClient *consul.Client
) )
ins, _ := csd.buildRegisterInstance(instance) ins, _ := csd.buildRegisterInstance(instance)
csd.clientLock.Lock() csd.clientLock.RLock()
defer csd.clientLock.Unlock() defer csd.clientLock.RUnlock()
if consulClient, err = csd.getConsulClient(); err == nil { if consulClient, err = csd.getConsulClient(); err == nil {
err = consulClient.Agent().ServiceDeregister(buildID(instance)) err = consulClient.Agent().ServiceDeregister(buildID(instance))
} }
...@@ -205,11 +205,11 @@ func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) ...@@ -205,11 +205,11 @@ func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance)
err error err error
consulClient *consul.Client consulClient *consul.Client
) )
csd.clientLock.Lock() csd.clientLock.RLock()
if consulClient, err = csd.getConsulClient(); err == nil { if consulClient, err = csd.getConsulClient(); err == nil {
err = consulClient.Agent().ServiceDeregister(buildID(instance)) err = consulClient.Agent().ServiceDeregister(buildID(instance))
} }
csd.clientLock.Unlock() csd.clientLock.RUnlock()
if err != nil { if err != nil {
logger.Errorf("unregister service instance %s,error: %v", instance.GetId(), err) logger.Errorf("unregister service instance %s,error: %v", instance.GetId(), err)
return err return err
...@@ -235,11 +235,11 @@ func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet { ...@@ -235,11 +235,11 @@ func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet {
services map[string][]string services map[string][]string
) )
var res = gxset.NewSet() var res = gxset.NewSet()
csd.clientLock.Lock() csd.clientLock.RLock()
if consulClient, err = csd.getConsulClient(); err == nil { if consulClient, err = csd.getConsulClient(); err == nil {
services, _, err = consulClient.Catalog().Services(nil) services, _, err = consulClient.Catalog().Services(nil)
} }
csd.clientLock.Unlock() csd.clientLock.RUnlock()
if err != nil { if err != nil {
logger.Errorf("get services,error: %v", err) logger.Errorf("get services,error: %v", err)
return res return res
...@@ -283,13 +283,13 @@ func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.S ...@@ -283,13 +283,13 @@ func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.S
consulClient *consul.Client consulClient *consul.Client
instances []*consul.ServiceEntry instances []*consul.ServiceEntry
) )
csd.clientLock.Lock() csd.clientLock.RLock()
if consulClient, err = csd.getConsulClient(); err == nil { if consulClient, err = csd.getConsulClient(); err == nil {
instances, _, err = consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ instances, _, err = consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{
WaitTime: time.Duration(csd.checkPassInterval), WaitTime: time.Duration(csd.checkPassInterval),
}) })
} }
csd.clientLock.Unlock() csd.clientLock.RUnlock()
if err != nil { if err != nil {
logger.Errorf("get instances for service %s,error: %v", serviceName, err) logger.Errorf("get instances for service %s,error: %v", serviceName, err)
......
...@@ -44,13 +44,7 @@ var ( ...@@ -44,13 +44,7 @@ var (
consulCheckPassInterval = 17000 consulCheckPassInterval = 17000
consulDeregisterCriticalServiceAfter = "20s" consulDeregisterCriticalServiceAfter = "20s"
consulWatchTimeout = 60000 consulWatchTimeout = 60000
registryURL = common.URL{ registryURL = common.URL{}
Path: "",
Username: "",
Password: "",
Methods: nil,
SubURL: nil,
}
) )
func TestConsulServiceDiscovery_newConsulServiceDiscovery(t *testing.T) { func TestConsulServiceDiscovery_newConsulServiceDiscovery(t *testing.T) {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment