diff --git a/registry/etcdv3/listener.go b/registry/etcdv3/listener.go index f9b046a2c52814cd4e5ea38f9ea4c58c8bdb5bc4..6fa84126f82f134be8a8bbe3129d176a5e17d59d 100644 --- a/registry/etcdv3/listener.go +++ b/registry/etcdv3/listener.go @@ -97,7 +97,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { case e := <-l.events: logger.Infof("got etcd event %#v", e) - if e.ConfigType == remoting.EventTypeDel { + if e.ConfigType == remoting.EventTypeDel && l.registry.client.Valid() { select { case <-l.registry.Done(): logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value) diff --git a/registry/kubernetes/listener.go b/registry/kubernetes/listener.go index 6ddcd4f07d945932e8ebbcb3e9f601fc6615872e..c1d4f70fed307f782e05f15a96ace6fc3f27afa9 100644 --- a/registry/kubernetes/listener.go +++ b/registry/kubernetes/listener.go @@ -97,7 +97,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { case e := <-l.events: logger.Infof("got kubernetes event %#v", e) - if e.ConfigType == remoting.EventTypeDel { + if e.ConfigType == remoting.EventTypeDel && !l.registry.client.Valid() { select { case <-l.registry.Done(): logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value) diff --git a/remoting/kubernetes/client.go b/remoting/kubernetes/client.go index 2091cc2f1ef5a7deb3edbe314896c98bda75cfad..90e74aa8f2fd8911d5eca052ba2abf9204caf1aa 100644 --- a/remoting/kubernetes/client.go +++ b/remoting/kubernetes/client.go @@ -74,7 +74,7 @@ type Client struct { store Store // protect the wg && currentPod - lock sync.Mutex + lock sync.RWMutex // current pod status currentPod *v1.Pod // protect the maintenanceStatus loop && watcher @@ -636,8 +636,14 @@ func (c *Client) Valid() bool { case <-c.Done(): return false default: - return true } + c.lock.RLock() + if c.rawClient == nil { + c.lock.RUnlock() + return false + } + c.lock.RUnlock() + return true } // Done diff --git a/remoting/kubernetes/store.go b/remoting/kubernetes/store.go index 06f32cc0cbc5b56981cf0b9a5c7d2420a985d8de..51365c14c7c4709515f88bd5995c777dbaf25dd9 100644 --- a/remoting/kubernetes/store.go +++ b/remoting/kubernetes/store.go @@ -22,9 +22,7 @@ import ( "strconv" "strings" "sync" -) -import ( perrors "github.com/pkg/errors" ) @@ -112,14 +110,19 @@ type storeImpl struct { watchers map[uint64]*watcher } -func (s *storeImpl) loop() { +// on stop +// when the store was closed +func (s *storeImpl) onStop() { select { case <-s.ctx.Done(): + // parent ctx be canceled, close the store s.lock.Lock() - defer s.lock.Unlock() - for _, w := range s.watchers { + watchers := s.watchers + s.lock.Unlock() + + for _, w := range watchers { // stop data stream close(w.ch) // stop watcher @@ -145,6 +148,7 @@ func (s *storeImpl) Done() <-chan struct{} { func (s *storeImpl) Put(object *Object) error { sendMsg := func(object *Object, w *watcher) { + s.lock.Lock() defer s.lock.Unlock() select { @@ -325,6 +329,6 @@ func newStore(ctx context.Context) Store { cache: map[string]*Object{}, watchers: map[uint64]*watcher{}, } - go s.loop() + go s.onStop() return s }