diff --git a/remoting/kubernetes/client.go b/remoting/kubernetes/client.go
index 3e4b26c0efee483fa69435f953714a1292036630..26bf627d4be296ffa61044c8f621b4ee19652583 100644
--- a/remoting/kubernetes/client.go
+++ b/remoting/kubernetes/client.go
@@ -70,14 +70,14 @@ type Client struct {
 	ns string
-	// the memory store
-	store WatcherSet
+	// the memory watcherSet
+	watcherSet WatcherSet
 
 	// protect the wg && currentPod
 	lock sync.RWMutex
 
 	// current pod status
 	currentPod *v1.Pod
 
-	// protect the maintenanceStatus loop && watcher
+	// protect the watchPods loop && watcher
 	wg sync.WaitGroup
 
 	// manage the client lifecycle
@@ -132,7 +132,7 @@ func newMockClient(namespace string, mockClientGenerator func() (kubernetes.Inte
 		ns: namespace,
 		rawClient: rawClient,
 		ctx: ctx,
-		store: newWatcherSet(ctx),
+		watcherSet: newWatcherSet(ctx),
 		cancel: cancel,
 	}
 
@@ -144,13 +144,13 @@ func newMockClient(namespace string, mockClientGenerator func() (kubernetes.Inte
 	// record current status
 	c.currentPod = currentPod
 
-	// init the store by current pods
-	if err := c.initStore(); err != nil {
-		return nil, perrors.WithMessage(err, "init store")
+	// init the watcherSet by current pods
+	if err := c.initWatchSet(); err != nil {
+		return nil, perrors.WithMessage(err, "init watcherSet")
 	}
 
 	// start kubernetes watch loop
-	if err := c.maintenanceStatus(); err != nil {
+	if err := c.watchPods(); err != nil {
 		return nil, perrors.WithMessage(err, "maintenance the kubernetes status")
 	}
 
@@ -185,7 +185,7 @@ func newClient(namespace string) (*Client, error) {
 		cfg: cfg,
 		rawClient: rawClient,
 		ctx: ctx,
-		store: newWatcherSet(ctx),
+		watcherSet: newWatcherSet(ctx),
 		cancel: cancel,
 	}
 
@@ -197,13 +197,13 @@ func newClient(namespace string) (*Client, error) {
 	// record current status
 	c.currentPod = currentPod
 
-	// init the store by current pods
-	if err := c.initStore(); err != nil {
-		return nil, perrors.WithMessage(err, "init store")
+	// init the watcherSet by current pods
+	if err := c.initWatchSet(); err != nil {
+		return nil, perrors.WithMessage(err, "init watcherSet")
 	}
 
 	// start kubernetes watch loop
-	if err := c.maintenanceStatus(); err != nil {
+	if err := c.watchPods(); err != nil {
 		return nil, perrors.WithMessage(err, "maintenance the kubernetes status")
 	}
 
@@ -243,10 +243,10 @@ func (c *Client) initCurrentPod() (*v1.Pod, error) {
 	return currentPod, nil
 }
 
-// initStore
+// initWatchSet
 // 1. get all with dubbo label pods
-// 2. put every element to store
-func (c *Client) initStore() error {
+// 2. put every element to watcherSet
+func (c *Client) initWatchSet() error {
 
 	pods, err := c.rawClient.CoreV1().Pods(c.ns).List(metav1.ListOptions{
 		LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
@@ -263,9 +263,9 @@
 	return nil
 }
 
-// maintenanceStatus
+// watchPods
 // try to watch kubernetes pods
-func (c *Client) maintenanceStatus() error {
+func (c *Client) watchPods() error {
 
 	// try once
 	watcher, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{
@@ -280,18 +280,18 @@
 	c.wg.Add(1)
 
 	// add wg, grace close the client
-	go c.maintenanceStatusLoop()
+	go c.watchPodsLoop()
 	return nil
 }
 
-// maintenanceStatus
+// watchPodsLoop
 // try to notify
-func (c *Client) maintenanceStatusLoop() {
+func (c *Client) watchPodsLoop() {
 
 	defer func() {
 		// notify other goroutine, this loop over
 		c.wg.Done()
-		logger.Info("maintenanceStatusLoop goroutine game over")
+		logger.Info("watchPodsLoop goroutine game over")
 	}()
 
 	var lastResourceVersion string
@@ -401,10 +401,10 @@ func (c *Client) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) {
 		return
 	}
 
-	logger.Debugf("prepare to put object (%#v) to kuberentes-store", o)
+	logger.Debugf("prepare to put object (%#v) to kubernetes-watcherSet", o)
 
-	if err := c.store.Put(o); err != nil {
-		logger.Errorf("put (%#v) to cache store: %v ", o, err)
+	if err := c.watcherSet.Put(o); err != nil {
+		logger.Errorf("put (%#v) to cache watcherSet: %v ", o, err)
 		return
 	}
 
@@ -487,7 +487,7 @@ func (c *Client) Create(k, v string) error {
 	}
 	c.currentPod = updatedPod
 
-	// not update the store, the store should be write by the maintenanceStatusLoop
+	// do not update the watcherSet, the watcherSet should only be written by the watchPodsLoop
 
 	return nil
 }
@@ -584,12 +584,12 @@ func (c *Client) getPatch(oldPod, newPod *v1.Pod) ([]byte, error) {
 }
 
 // GetChildren
-// get k children list from kubernetes-store
+// get k children list from kubernetes-watcherSet
 func (c *Client) GetChildren(k string) ([]string, []string, error) {
 
-	objectList, err := c.store.Get(k, true)
+	objectList, err := c.watcherSet.Get(k, true)
 	if err != nil {
-		return nil, nil, perrors.WithMessagef(err, "get children from store on (%s)", k)
+		return nil, nil, perrors.WithMessagef(err, "get children from watcherSet on (%s)", k)
 	}
 
 	var kList []string
@@ -607,7 +607,7 @@ func (c *Client) GetChildren(k string) ([]string, []string, error) {
 // watch on spec key
 func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) {
 
-	w, err := c.store.Watch(k, false)
+	w, err := c.watcherSet.Watch(k, false)
 	if err != nil {
 		return nil, nil, perrors.WithMessagef(err, "watch on (%s)", k)
 	}
@@ -619,7 +619,7 @@ func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error)
 // watch on spec prefix
 func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan struct{}, error) {
 
-	w, err := c.store.Watch(prefix, true)
+	w, err := c.watcherSet.Watch(prefix, true)
 	if err != nil {
 		return nil, nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix)
 	}
@@ -665,7 +665,7 @@ func (c *Client) Close() {
 	c.cancel()
 
 	// the client ctx be canceled
-	// will trigger the store watchers all stopped
+	// will trigger all the watcherSet watchers to stop
 	// so, just wait
 	c.wg.Wait()
 }
diff --git a/remoting/kubernetes/client_test.go b/remoting/kubernetes/client_test.go
index 0ffdda42a5a2c5fcda4e1bf831c91302a4e70bbf..d8ed78291974c5e348d5edf72c077ab2a1f4021e 100644
--- a/remoting/kubernetes/client_test.go
+++ b/remoting/kubernetes/client_test.go
@@ -347,7 +347,7 @@ func (s *KubernetesClientTestSuite) TestClientGetChildrenKVList() {
 				return
 			}
 		case <-done:
-			t.Log("the store watcher was stopped")
+			t.Log("the watcherSet watcher was stopped")
 			return
 		}
 	}
@@ -416,7 +416,7 @@ func (s *KubernetesClientTestSuite) TestClientWatchPrefix() {
 		case e := <-wc:
 			t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
 		case <-done:
-			t.Log("the store watcher was stopped")
+			t.Log("the watcherSet watcher was stopped")
 			return
 		}
 	}
@@ -471,7 +471,7 @@ func (s *KubernetesClientTestSuite) TestClientWatch() {
 		case e := <-wc:
 			t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
 		case <-done:
-			t.Log("the store watcher was stopped")
+			t.Log("the watcherSet watcher was stopped")
 			return
 		}
 	}
diff --git a/remoting/kubernetes/listener.go b/remoting/kubernetes/listener.go
index 83b84785e87d7d44a1f2d649b2a0a25d45b82aaa..575734ab30f839d0fa1264581d629dd5e27ac68d 100644
--- a/remoting/kubernetes/listener.go
+++ b/remoting/kubernetes/listener.go
@@ -65,15 +65,15 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.
 			logger.Warnf("kubernetes client stopped")
 			return false
 
-		// store watcher stopped
+		// watcherSet watcher stopped
 		case <-done:
-			logger.Warnf("kubernetes store watcher stopped")
+			logger.Warnf("kubernetes watcherSet watcher stopped")
 			return false
 
-		// handle kubernetes-store events
+		// handle kubernetes-watcherSet events
 		case e, ok := <-wc:
 			if !ok {
-				logger.Warnf("kubernetes-store watch-chan closed")
+				logger.Warnf("kubernetes-watcherSet watch-chan closed")
 				return false
 			}
 
@@ -91,12 +91,12 @@ func (l *EventListener) handleEvents(event *WatcherEvent, listeners ...remoting.
 // return false mean the event type is CREATE || UPDATE
 func (l *EventListener) handleEvents(event *WatcherEvent, listeners ...remoting.DataListener) bool {
 
-	logger.Infof("got a kubernetes-store event {type: %d, key: %s}", event.EventType, event.Key)
+	logger.Infof("got a kubernetes-watcherSet event {type: %d, key: %s}", event.EventType, event.Key)
 
 	switch event.EventType {
 	case Create:
 		for _, listener := range listeners {
-			logger.Infof("kubernetes-store get event (key{%s}) = event{EventNodeDataCreated}", event.Key)
+			logger.Infof("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDataCreated}", event.Key)
 			listener.DataChange(remoting.Event{
 				Path:   string(event.Key),
 				Action: remoting.EventTypeAdd,
@@ -106,7 +106,7 @@ func (l *EventListener) handleEvents(event *WatcherEvent, listeners ...remoting.
 		}
 		return false
 	case Update:
 		for _, listener := range listeners {
-			logger.Infof("kubernetes-store get event (key{%s}) = event{EventNodeDataChanged}", event.Key)
+			logger.Infof("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDataChanged}", event.Key)
 			listener.DataChange(remoting.Event{
 				Path:   string(event.Key),
 				Action: remoting.EventTypeUpdate,
 		}
 		return false
 	case Delete:
-		logger.Warnf("kubernetes-store get event (key{%s}) = event{EventNodeDeleted}", event.Key)
+		logger.Warnf("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDeleted}", event.Key)
 		return true
 	default:
 		return false
@@ -141,14 +141,14 @@ func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener
 
 		// watcher stopped
 		case <-done:
-			logger.Warnf("kubernetes store watcher stopped")
+			logger.Warnf("kubernetes watcherSet watcher stopped")
 			return
 
-		// kuberentes-store event stream
+		// kubernetes-watcherSet event stream
 		case e, ok := <-wc:
 			if !ok {
-				logger.Warnf("kubernetes-store watch-chan closed")
+				logger.Warnf("kubernetes-watcherSet watch-chan closed")
 				return
 			}
 
@@ -171,7 +171,7 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis
 	_, ok := l.keyMap[key]
 	l.keyMapLock.RUnlock()
 	if ok {
-		logger.Warnf("kubernetes-store key %s has already been listened.", key)
+		logger.Warnf("kubernetes-watcherSet key %s has already been listened.", key)
 		return
 	}
 
@@ -201,7 +201,7 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis
 		})
 	}
 
-	logger.Infof("listen dubbo provider key{%s} event and wait to get all provider from kubernetes-store", key)
+	logger.Infof("listen dubbo provider key{%s} event and wait to get all provider from kubernetes-watcherSet", key)
 	go func(key string, listener remoting.DataListener) {
 		l.ListenServiceNodeEventWithPrefix(key, listener)
 		logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key)
diff --git a/remoting/kubernetes/watch.go b/remoting/kubernetes/watch.go
index 4f36b3522c04365d7edf58b4934a0e0f4f87bdb6..835bcdb9b7c1df99c56d140a382974fffe10594d 100644
--- a/remoting/kubernetes/watch.go
+++ b/remoting/kubernetes/watch.go
@@ -29,8 +29,8 @@ import (
 )
 
 var (
-	ErrStoreAlreadyStopped = perrors.New("the store already be stopped")
-	ErrKVPairNotFound      = perrors.New("k/v pair not found")
+	ErrWatcherSetAlreadyStopped = perrors.New("the watcherSet has already been stopped")
+	ErrKVPairNotFound           = perrors.New("k/v pair not found")
 )
 
 const (
@@ -60,7 +60,7 @@ func (e eventType) String() string {
 }
 
 // WatcherEvent
-// object is element in store
+// a watch event is an element in the watcherSet
 type WatcherEvent struct {
 	// event-type
 	EventType eventType `json:"-"`
@@ -73,14 +73,14 @@ type WatcherEvent struct {
 
 // Watchable WatcherSet
 type WatcherSet interface {
-	// put the object to the store
+	// put the watch event to the watcherSet
 	Put(object *WatcherEvent) error
 	// if prefix is false,
 	// the len([]*WatcherEvent) == 1
 	Get(key string, prefix bool) ([]*WatcherEvent, error)
 	// watch the spec key or key prefix
 	Watch(key string, prefix bool) (Watcher, error)
-	// check the store status
+	// check the watcherSet status
 	Done() <-chan struct{}
 }
 
@@ -96,13 +96,13 @@ type Watcher interface {
 	done() <-chan struct{}
 }
 
-// the store
+// the watcherSet implementation
 type watcherSetImpl struct {
 
-	// Client's ctx, client die, the store will die too
+	// Client's ctx, when the client dies, the watcherSet dies too
 	ctx context.Context
 
-	// protect store and watchers
+	// protect the watcherSet and watchers
 	lock sync.RWMutex
 
 	// the key is dubbo-go interest meta
@@ -113,13 +113,13 @@
 }
 
 // closeWatchers
-// when the store was closed
+// when the watcherSet is closed
 func (s *watcherSetImpl) closeWatchers() {
 
 	select {
 	case <-s.ctx.Done():
-		// parent ctx be canceled, close the store
+		// the parent ctx has been canceled, close the watcherSet's watchers
 		s.lock.Lock()
 		watchers := s.watchers
 		s.lock.Unlock()
@@ -140,14 +140,14 @@ func (s *watcherSetImpl) Watch(key string, prefix bool) (Watcher, error) {
 }
 
 // Done
-// get the store status
+// get the watcherSet status
 func (s *watcherSetImpl) Done() <-chan struct{} {
 	return s.ctx.Done()
 }
 
 // Put
-// put the object to store
-func (s *watcherSetImpl) Put(object *WatcherEvent) error {
+// put the watch event to the watcherSet
+func (s *watcherSetImpl) Put(watcherEvent *WatcherEvent) error {
 
 	sendMsg := func(object *WatcherEvent, w *watcher) {
 
@@ -166,39 +166,39 @@ func (s *watcherSetImpl) Put(object *WatcherEvent) error {
 		return err
 	}
 
-	// put to store
-	if object.EventType == Delete {
-		delete(s.cache, object.Key)
+	// put to the watcherSet
+	if watcherEvent.EventType == Delete {
+		delete(s.cache, watcherEvent.Key)
 	} else {
 
-		old, ok := s.cache[object.Key]
+		old, ok := s.cache[watcherEvent.Key]
 		if ok {
-			if old.Value == object.Value {
+			if old.Value == watcherEvent.Value {
 				// already have this k/v pair
 				return nil
 			}
 		}
 
-		// refresh the object
-		s.cache[object.Key] = object
+		// refresh the watcherEvent
+		s.cache[watcherEvent.Key] = watcherEvent
 	}
 
 	// notify watcher
 	for _, w := range s.watchers {
 
-		if !strings.Contains(object.Key, w.interested.key) {
+		if !strings.Contains(watcherEvent.Key, w.interested.key) {
 			// this watcher no interest in this element
 			continue
 		}
 
 		if !w.interested.prefix {
-			if object.Key == w.interested.key {
-				go sendMsg(object, w)
+			if watcherEvent.Key == w.interested.key {
+				go sendMsg(watcherEvent, w)
 			}
 			// not interest
 			continue
 		}
 
-		go sendMsg(object, w)
+		go sendMsg(watcherEvent, w)
 	}
 	return nil
 }
@@ -207,7 +207,7 @@ func (s *watcherSetImpl) Put(object *WatcherEvent) error {
 func (s *watcherSetImpl) valid() error {
 	select {
 	case <-s.ctx.Done():
-		return ErrStoreAlreadyStopped
+		return ErrWatcherSetAlreadyStopped
 	default:
 		return nil
 	}
@@ -227,8 +227,8 @@ func (s *watcherSetImpl) addWatcher(key string, prefix bool) (Watcher, error) {
 	s.currentWatcherId++
 
 	w := &watcher{
-		id:    s.currentWatcherId,
-		store: s,
+		id:         s.currentWatcherId,
+		watcherSet: s,
 		interested: struct {
 			key    string
 			prefix bool
@@ -241,7 +241,7 @@ func (s *watcherSetImpl) addWatcher(key string, prefix bool) (Watcher, error) {
 }
 
 // Get
-// get elements from cache
+// get elements from the watcherSet
 func (s *watcherSetImpl) Get(key string, prefix bool) ([]*WatcherEvent, error) {
 
 	s.lock.RLock()
@@ -276,12 +276,12 @@ func (s *watcherSetImpl) Get(key string, prefix bool) ([]*WatcherEvent, error) {
 	return out, nil
 }
 
-// the store watcher
+// the watcherSet watcher
 type watcher struct {
 	id uint64
 
-	// the underlay store
-	store *watcherSetImpl
+	// the underlying watcherSet
+	watcherSet *watcherSetImpl
 
 	// the interest topic
 	interested struct {