Skip to content
Snippets Groups Projects
Commit 77e59d10 authored by zhangshen023's avatar zhangshen023
Browse files

fix concurrency problems

parent 2fa6522c
No related branches found
No related tags found
No related merge requests found
......@@ -84,6 +84,7 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
tag: remoteConfig.Params[constant.QUERY_TAG],
consulClient: client,
deregisterCriticalServiceAfter: getDeregisterAfter(remoteConfig.Params),
clientLock: sync.Mutex{},
}, nil
}
......@@ -98,6 +99,7 @@ type consulServiceDiscovery struct {
address string
deregisterCriticalServiceAfter string
ttl sync.Map
clientLock sync.Mutex
*consul.Config
}
......@@ -105,8 +107,17 @@ func (csd *consulServiceDiscovery) String() string {
return csd.descriptor
}
// nolint
func (csd *consulServiceDiscovery) getConsulClient() (*consul.Client, error) {
if csd.consulClient == nil {
return nil, perrors.New("consul client is destroyed or not ready!")
}
return csd.consulClient, nil
}
func (csd *consulServiceDiscovery) Destroy() error {
csd.clientLock.Lock()
csd.consulClient = nil
csd.clientLock.Unlock()
csd.ttl.Range(func(key, t interface{}) bool {
close(t.(chan struct{}))
csd.ttl.Delete(key)
......@@ -116,8 +127,16 @@ func (csd *consulServiceDiscovery) Destroy() error {
}
func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) error {
var (
err error
consulClient *consul.Client
)
ins, _ := csd.buildRegisterInstance(instance)
err := csd.consulClient.Agent().ServiceRegister(ins)
csd.clientLock.Lock()
if consulClient, err = csd.getConsulClient(); err == nil {
err = consulClient.Agent().ServiceRegister(ins)
}
csd.clientLock.Unlock()
if err != nil {
logger.Errorf("consul register the instance %s fail:%v", instance.GetServiceName(), err)
return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName())
......@@ -127,19 +146,28 @@ func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) e
}
func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance) error {
var (
err error
consulClient *consul.Client
)
checkID := buildID(instance)
stopChan := make(chan struct{})
csd.ttl.LoadOrStore(buildID(instance), stopChan)
period := time.Duration(csd.checkPassInterval/8) * time.Millisecond
timer := time.NewTimer(period)
timer := time.NewTicker(period)
go func() {
defer timer.Stop()
for {
select {
case <-timer.C:
timer.Reset(period)
err := csd.consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "")
csd.clientLock.Lock()
if consulClient, err = csd.getConsulClient(); err == nil {
err = consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "")
}
csd.clientLock.Unlock()
if err != nil {
logger.Warnf("pass ttl heartbeat fail:%v", err)
break
......@@ -156,8 +184,16 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance
}
func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) error {
var (
err error
consulClient *consul.Client
)
ins, _ := csd.buildRegisterInstance(instance)
err := csd.consulClient.Agent().ServiceDeregister(buildID(instance))
csd.clientLock.Lock()
defer csd.clientLock.Unlock()
if consulClient, err = csd.getConsulClient(); err == nil {
err = consulClient.Agent().ServiceDeregister(buildID(instance))
}
if err != nil {
logger.Warnf("unregister instance %s fail:%v", instance.GetServiceName(), err)
}
......@@ -165,12 +201,19 @@ func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) err
}
func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
err := csd.consulClient.Agent().ServiceDeregister(buildID(instance))
var (
err error
consulClient *consul.Client
)
csd.clientLock.Lock()
if consulClient, err = csd.getConsulClient(); err == nil {
err = consulClient.Agent().ServiceDeregister(buildID(instance))
}
csd.clientLock.Unlock()
if err != nil {
logger.Errorf("unregister service instance %s,error: %v", instance.GetId(), err)
return err
}
stopChanel, ok := csd.ttl.Load(buildID(instance))
if !ok {
logger.Warnf("ttl for service instance %s didn't exist", instance.GetId())
......@@ -186,9 +229,17 @@ func (csd *consulServiceDiscovery) GetDefaultPageSize() int {
}
func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet {
var (
err error
consulClient *consul.Client
services map[string][]string
)
var res = gxset.NewSet()
services, _, err := csd.consulClient.Catalog().Services(nil)
csd.clientLock.Lock()
if consulClient, err = csd.getConsulClient(); err == nil {
services, _, err = consulClient.Catalog().Services(nil)
}
csd.clientLock.Unlock()
if err != nil {
logger.Errorf("get services,error: %v", err)
return res
......@@ -227,9 +278,19 @@ func decodeConsulMetadata(metadata map[string]string) map[string]string {
}
func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{
WaitTime: time.Duration(csd.checkPassInterval),
})
var (
err error
consulClient *consul.Client
instances []*consul.ServiceEntry
)
csd.clientLock.Lock()
if consulClient, err = csd.getConsulClient(); err == nil {
instances, _, err = consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{
WaitTime: time.Duration(csd.checkPassInterval),
})
}
csd.clientLock.Unlock()
if err != nil {
logger.Errorf("get instances for service %s,error: %v", serviceName, err)
return nil
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment