Skip to content
Snippets Groups Projects
Commit 14d668d6 authored by watermelon's avatar watermelon
Browse files

Fix: issue #951 etcd exit panic

parent 8e2aa4a8
No related branches found
No related tags found
No related merge requests found
......@@ -19,6 +19,7 @@ package etcdv3
import (
"strings"
"sync"
)
import (
......@@ -79,8 +80,9 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool {
}
type configurationListener struct {
registry *etcdV3Registry
events chan *config_center.ConfigChangeEvent
registry *etcdV3Registry
events chan *config_center.ConfigChangeEvent
closeOnce sync.Once
}
// NewConfigurationListener for listening the event of etcdv3.
......@@ -119,6 +121,9 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
}
// Close etcd registry center
// BugFix why no real close
func (l *configurationListener) Close() {
l.registry.WaitGroup().Done()
l.closeOnce.Do(func() {
l.registry.WaitGroup().Done()
})
}
......@@ -51,7 +51,7 @@ type etcdV3Registry struct {
registry.BaseRegistry
cltLock sync.Mutex
client *etcdv3.Client
listenerLock sync.Mutex
listenerLock sync.RWMutex
listener *etcdv3.EventListener
dataListener *dataListener
configListener *configurationListener
......@@ -150,14 +150,9 @@ func (r *etcdV3Registry) CreatePath(k string) error {
// DoSubscribe actually subscribe the provider URL
func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) {
var (
configListener *configurationListener
)
r.listenerLock.Lock()
configListener = r.configListener
r.listenerLock.Unlock()
r.listenerLock.RLock()
configListener := r.configListener
r.listenerLock.RUnlock()
if r.listener == nil {
r.cltLock.Lock()
client := r.client
......@@ -165,12 +160,8 @@ func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error)
if client == nil {
return nil, perrors.New("etcd client broken")
}
// new client & listener
listener := etcdv3.NewEventListener(r.client)
r.listenerLock.Lock()
r.listener = listener
r.listener = etcdv3.NewEventListener(r.client) // new client & listener
r.listenerLock.Unlock()
}
......
......@@ -36,7 +36,7 @@ import (
// nolint
type EventListener struct {
client *Client
keyMapLock sync.Mutex
keyMapLock sync.RWMutex
keyMap map[string]struct{}
wg sync.WaitGroup
}
......@@ -181,9 +181,9 @@ func timeSecondDuration(sec int) time.Duration {
// --------> listenServiceNodeEvent
func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) {
l.keyMapLock.Lock()
l.keyMapLock.RLock()
_, ok := l.keyMap[key]
l.keyMapLock.Unlock()
l.keyMapLock.RUnlock()
if ok {
logger.Warnf("etcdv3 key %s has already been listened.", key)
return
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment