Skip to content
Snippets Groups Projects
Commit 8120caa8 authored by scott's avatar scott
Browse files

Fix method name bug,and handle the del event in config-listener.

parent ef950def
No related branches found
No related tags found
No related merge requests found
......@@ -97,7 +97,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
case e := <-l.events:
logger.Infof("got etcd event %#v", e)
if e.ConfigType == remoting.EventTypeDel {
if e.ConfigType == remoting.EventTypeDel && l.registry.client.Valid() {
select {
case <-l.registry.Done():
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
......
......@@ -97,7 +97,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
case e := <-l.events:
logger.Infof("got kubernetes event %#v", e)
if e.ConfigType == remoting.EventTypeDel {
if e.ConfigType == remoting.EventTypeDel && !l.registry.client.Valid() {
select {
case <-l.registry.Done():
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
......
......@@ -74,7 +74,7 @@ type Client struct {
store Store
// protect the wg && currentPod
lock sync.Mutex
lock sync.RWMutex
// current pod status
currentPod *v1.Pod
// protect the maintenanceStatus loop && watcher
......@@ -636,8 +636,14 @@ func (c *Client) Valid() bool {
case <-c.Done():
return false
default:
return true
}
c.lock.RLock()
if c.rawClient == nil {
c.lock.RUnlock()
return false
}
c.lock.RUnlock()
return true
}
// Done
......
......@@ -22,9 +22,7 @@ import (
"strconv"
"strings"
"sync"
)
import (
perrors "github.com/pkg/errors"
)
......@@ -112,14 +110,19 @@ type storeImpl struct {
watchers map[uint64]*watcher
}
func (s *storeImpl) loop() {
// on stop
// when the store was closed
func (s *storeImpl) onStop() {
select {
case <-s.ctx.Done():
// parent ctx be canceled, close the store
s.lock.Lock()
defer s.lock.Unlock()
for _, w := range s.watchers {
watchers := s.watchers
s.lock.Unlock()
for _, w := range watchers {
// stop data stream
close(w.ch)
// stop watcher
......@@ -145,6 +148,7 @@ func (s *storeImpl) Done() <-chan struct{} {
func (s *storeImpl) Put(object *Object) error {
sendMsg := func(object *Object, w *watcher) {
s.lock.Lock()
defer s.lock.Unlock()
select {
......@@ -325,6 +329,6 @@ func newStore(ctx context.Context) Store {
cache: map[string]*Object{},
watchers: map[uint64]*watcher{},
}
go s.loop()
go s.onStop()
return s
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment