Skip to content
Snippets Groups Projects
Commit 96448f40 authored by scott's avatar scott
Browse files

Fix named err

parent bec9f3ce
No related branches found
No related tags found
No related merge requests found
......@@ -70,14 +70,14 @@ type Client struct {
ns string
// the memory store
store WatcherSet
// the memory watcherSet
watcherSet WatcherSet
// protect the wg && currentPod
lock sync.RWMutex
// current pod status
currentPod *v1.Pod
// protect the maintenanceStatus loop && watcher
// protect the watchPods loop && watcher
wg sync.WaitGroup
// manage the client lifecycle
......@@ -132,7 +132,7 @@ func newMockClient(namespace string, mockClientGenerator func() (kubernetes.Inte
ns: namespace,
rawClient: rawClient,
ctx: ctx,
store: newWatcherSet(ctx),
watcherSet: newWatcherSet(ctx),
cancel: cancel,
}
......@@ -144,13 +144,13 @@ func newMockClient(namespace string, mockClientGenerator func() (kubernetes.Inte
// record current status
c.currentPod = currentPod
// init the store by current pods
if err := c.initStore(); err != nil {
return nil, perrors.WithMessage(err, "init store")
// init the watcherSet by current pods
if err := c.initWatchSet(); err != nil {
return nil, perrors.WithMessage(err, "init watcherSet")
}
// start kubernetes watch loop
if err := c.maintenanceStatus(); err != nil {
if err := c.watchPods(); err != nil {
return nil, perrors.WithMessage(err, "maintenance the kubernetes status")
}
......@@ -185,7 +185,7 @@ func newClient(namespace string) (*Client, error) {
cfg: cfg,
rawClient: rawClient,
ctx: ctx,
store: newWatcherSet(ctx),
watcherSet: newWatcherSet(ctx),
cancel: cancel,
}
......@@ -197,13 +197,13 @@ func newClient(namespace string) (*Client, error) {
// record current status
c.currentPod = currentPod
// init the store by current pods
if err := c.initStore(); err != nil {
return nil, perrors.WithMessage(err, "init store")
// init the watcherSet by current pods
if err := c.initWatchSet(); err != nil {
return nil, perrors.WithMessage(err, "init watcherSet")
}
// start kubernetes watch loop
if err := c.maintenanceStatus(); err != nil {
if err := c.watchPods(); err != nil {
return nil, perrors.WithMessage(err, "maintenance the kubernetes status")
}
......@@ -243,10 +243,10 @@ func (c *Client) initCurrentPod() (*v1.Pod, error) {
return currentPod, nil
}
// initStore
// initWatchSet
// 1. get all with dubbo label pods
// 2. put every element to store
func (c *Client) initStore() error {
// 2. put every element to watcherSet
func (c *Client) initWatchSet() error {
pods, err := c.rawClient.CoreV1().Pods(c.ns).List(metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
......@@ -263,9 +263,9 @@ func (c *Client) initStore() error {
return nil
}
// maintenanceStatus
// watchPods
// try to watch kubernetes pods
func (c *Client) maintenanceStatus() error {
func (c *Client) watchPods() error {
// try once
watcher, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{
......@@ -280,18 +280,18 @@ func (c *Client) maintenanceStatus() error {
c.wg.Add(1)
// add wg, grace close the client
go c.maintenanceStatusLoop()
go c.watchPodsLoop()
return nil
}
// maintenanceStatus
// watchPods
// try to notify
func (c *Client) maintenanceStatusLoop() {
func (c *Client) watchPodsLoop() {
defer func() {
// notify other goroutine, this loop over
c.wg.Done()
logger.Info("maintenanceStatusLoop goroutine game over")
logger.Info("watchPodsLoop goroutine game over")
}()
var lastResourceVersion string
......@@ -401,10 +401,10 @@ func (c *Client) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) {
return
}
logger.Debugf("prepare to put object (%#v) to kuberentes-store", o)
logger.Debugf("prepare to put object (%#v) to kuberentes-watcherSet", o)
if err := c.store.Put(o); err != nil {
logger.Errorf("put (%#v) to cache store: %v ", o, err)
if err := c.watcherSet.Put(o); err != nil {
logger.Errorf("put (%#v) to cache watcherSet: %v ", o, err)
return
}
......@@ -487,7 +487,7 @@ func (c *Client) Create(k, v string) error {
}
c.currentPod = updatedPod
// not update the store, the store should be write by the maintenanceStatusLoop
// not update the watcherSet, the watcherSet should be write by the watchPodsLoop
return nil
}
......@@ -584,12 +584,12 @@ func (c *Client) getPatch(oldPod, newPod *v1.Pod) ([]byte, error) {
}
// GetChildren
// get k children list from kubernetes-store
// get k children list from kubernetes-watcherSet
func (c *Client) GetChildren(k string) ([]string, []string, error) {
objectList, err := c.store.Get(k, true)
objectList, err := c.watcherSet.Get(k, true)
if err != nil {
return nil, nil, perrors.WithMessagef(err, "get children from store on (%s)", k)
return nil, nil, perrors.WithMessagef(err, "get children from watcherSet on (%s)", k)
}
var kList []string
......@@ -607,7 +607,7 @@ func (c *Client) GetChildren(k string) ([]string, []string, error) {
// watch on spec key
func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) {
w, err := c.store.Watch(k, false)
w, err := c.watcherSet.Watch(k, false)
if err != nil {
return nil, nil, perrors.WithMessagef(err, "watch on (%s)", k)
}
......@@ -619,7 +619,7 @@ func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error)
// watch on spec prefix
func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan struct{}, error) {
w, err := c.store.Watch(prefix, true)
w, err := c.watcherSet.Watch(prefix, true)
if err != nil {
return nil, nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix)
}
......@@ -665,7 +665,7 @@ func (c *Client) Close() {
c.cancel()
// the client ctx be canceled
// will trigger the store watchers all stopped
// will trigger the watcherSet watchers all stopped
// so, just wait
c.wg.Wait()
}
......
......@@ -347,7 +347,7 @@ func (s *KubernetesClientTestSuite) TestClientGetChildrenKVList() {
return
}
case <-done:
t.Log("the store watcher was stopped")
t.Log("the watcherSet watcher was stopped")
return
}
}
......@@ -416,7 +416,7 @@ func (s *KubernetesClientTestSuite) TestClientWatchPrefix() {
case e := <-wc:
t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
case <-done:
t.Log("the store watcher was stopped")
t.Log("the watcherSet watcher was stopped")
return
}
}
......@@ -471,7 +471,7 @@ func (s *KubernetesClientTestSuite) TestClientWatch() {
case e := <-wc:
t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
case <-done:
t.Log("the store watcher was stopped")
t.Log("the watcherSet watcher was stopped")
return
}
}
......
......@@ -65,15 +65,15 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.
logger.Warnf("kubernetes client stopped")
return false
// store watcher stopped
// watcherSet watcher stopped
case <-done:
logger.Warnf("kubernetes store watcher stopped")
logger.Warnf("kubernetes watcherSet watcher stopped")
return false
// handle kubernetes-store events
// handle kubernetes-watcherSet events
case e, ok := <-wc:
if !ok {
logger.Warnf("kubernetes-store watch-chan closed")
logger.Warnf("kubernetes-watcherSet watch-chan closed")
return false
}
......@@ -91,12 +91,12 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.
// return false mean the event type is CREATE || UPDATE
func (l *EventListener) handleEvents(event *WatcherEvent, listeners ...remoting.DataListener) bool {
logger.Infof("got a kubernetes-store event {type: %d, key: %s}", event.EventType, event.Key)
logger.Infof("got a kubernetes-watcherSet event {type: %d, key: %s}", event.EventType, event.Key)
switch event.EventType {
case Create:
for _, listener := range listeners {
logger.Infof("kubernetes-store get event (key{%s}) = event{EventNodeDataCreated}", event.Key)
logger.Infof("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDataCreated}", event.Key)
listener.DataChange(remoting.Event{
Path: string(event.Key),
Action: remoting.EventTypeAdd,
......@@ -106,7 +106,7 @@ func (l *EventListener) handleEvents(event *WatcherEvent, listeners ...remoting.
return false
case Update:
for _, listener := range listeners {
logger.Infof("kubernetes-store get event (key{%s}) = event{EventNodeDataChanged}", event.Key)
logger.Infof("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDataChanged}", event.Key)
listener.DataChange(remoting.Event{
Path: string(event.Key),
Action: remoting.EventTypeUpdate,
......@@ -115,7 +115,7 @@ func (l *EventListener) handleEvents(event *WatcherEvent, listeners ...remoting.
}
return false
case Delete:
logger.Warnf("kubernetes-store get event (key{%s}) = event{EventNodeDeleted}", event.Key)
logger.Warnf("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDeleted}", event.Key)
return true
default:
return false
......@@ -141,14 +141,14 @@ func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener
// watcher stopped
case <-done:
logger.Warnf("kubernetes store watcher stopped")
logger.Warnf("kubernetes watcherSet watcher stopped")
return
// kuberentes-store event stream
// kuberentes-watcherSet event stream
case e, ok := <-wc:
if !ok {
logger.Warnf("kubernetes-store watch-chan closed")
logger.Warnf("kubernetes-watcherSet watch-chan closed")
return
}
......@@ -171,7 +171,7 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis
_, ok := l.keyMap[key]
l.keyMapLock.RUnlock()
if ok {
logger.Warnf("kubernetes-store key %s has already been listened.", key)
logger.Warnf("kubernetes-watcherSet key %s has already been listened.", key)
return
}
......@@ -201,7 +201,7 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis
})
}
logger.Infof("listen dubbo provider key{%s} event and wait to get all provider from kubernetes-store", key)
logger.Infof("listen dubbo provider key{%s} event and wait to get all provider from kubernetes-watcherSet", key)
go func(key string, listener remoting.DataListener) {
l.ListenServiceNodeEventWithPrefix(key, listener)
logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key)
......
......@@ -29,8 +29,8 @@ import (
)
var (
ErrStoreAlreadyStopped = perrors.New("the store already be stopped")
ErrKVPairNotFound = perrors.New("k/v pair not found")
ErrWatcherSetAlreadyStopped = perrors.New("the watcher-set already be stopped")
ErrKVPairNotFound = perrors.New("k/v pair not found")
)
const (
......@@ -60,7 +60,7 @@ func (e eventType) String() string {
}
// WatcherEvent
// object is element in store
// watch event is element in watcherSet
type WatcherEvent struct {
// event-type
EventType eventType `json:"-"`
......@@ -73,14 +73,14 @@ type WatcherEvent struct {
// Watchable WatcherSet
type WatcherSet interface {
// put the object to the store
// put the watch event to the watch set
Put(object *WatcherEvent) error
// if prefix is false,
// the len([]*WatcherEvent) == 1
Get(key string, prefix bool) ([]*WatcherEvent, error)
// watch the spec key or key prefix
Watch(key string, prefix bool) (Watcher, error)
// check the store status
// check the watcher set status
Done() <-chan struct{}
}
......@@ -96,13 +96,13 @@ type Watcher interface {
done() <-chan struct{}
}
// the store
// the watch set implement
type watcherSetImpl struct {
// Client's ctx, client die, the store will die too
// Client's ctx, client die, the watch set will die too
ctx context.Context
// protect store and watchers
// protect watcher-set and watchers
lock sync.RWMutex
// the key is dubbo-go interest meta
......@@ -113,13 +113,13 @@ type watcherSetImpl struct {
}
// closeWatchers
// when the store was closed
// when the watcher-set was closed
func (s *watcherSetImpl) closeWatchers() {
select {
case <-s.ctx.Done():
// parent ctx be canceled, close the store
// parent ctx be canceled, close the watch-set's watchers
s.lock.Lock()
watchers := s.watchers
s.lock.Unlock()
......@@ -140,14 +140,14 @@ func (s *watcherSetImpl) Watch(key string, prefix bool) (Watcher, error) {
}
// Done
// get the store status
// get the watcher-set status
func (s *watcherSetImpl) Done() <-chan struct{} {
return s.ctx.Done()
}
// Put
// put the object to store
func (s *watcherSetImpl) Put(object *WatcherEvent) error {
// put the watch event to watcher-set
func (s *watcherSetImpl) Put(watcherEvent *WatcherEvent) error {
sendMsg := func(object *WatcherEvent, w *watcher) {
......@@ -166,39 +166,39 @@ func (s *watcherSetImpl) Put(object *WatcherEvent) error {
return err
}
// put to store
if object.EventType == Delete {
delete(s.cache, object.Key)
// put to watcher-set
if watcherEvent.EventType == Delete {
delete(s.cache, watcherEvent.Key)
} else {
old, ok := s.cache[object.Key]
old, ok := s.cache[watcherEvent.Key]
if ok {
if old.Value == object.Value {
if old.Value == watcherEvent.Value {
// already have this k/v pair
return nil
}
}
// refresh the object
s.cache[object.Key] = object
// refresh the watcherEvent
s.cache[watcherEvent.Key] = watcherEvent
}
// notify watcher
for _, w := range s.watchers {
if !strings.Contains(object.Key, w.interested.key) {
if !strings.Contains(watcherEvent.Key, w.interested.key) {
// this watcher no interest in this element
continue
}
if !w.interested.prefix {
if object.Key == w.interested.key {
go sendMsg(object, w)
if watcherEvent.Key == w.interested.key {
go sendMsg(watcherEvent, w)
}
// not interest
continue
}
go sendMsg(object, w)
go sendMsg(watcherEvent, w)
}
return nil
}
......@@ -207,7 +207,7 @@ func (s *watcherSetImpl) Put(object *WatcherEvent) error {
func (s *watcherSetImpl) valid() error {
select {
case <-s.ctx.Done():
return ErrStoreAlreadyStopped
return ErrWatcherSetAlreadyStopped
default:
return nil
}
......@@ -227,8 +227,8 @@ func (s *watcherSetImpl) addWatcher(key string, prefix bool) (Watcher, error) {
s.currentWatcherId++
w := &watcher{
id: s.currentWatcherId,
store: s,
id: s.currentWatcherId,
watcherSet: s,
interested: struct {
key string
prefix bool
......@@ -241,7 +241,7 @@ func (s *watcherSetImpl) addWatcher(key string, prefix bool) (Watcher, error) {
}
// Get
// get elements from cache
// get elements from watcher-set
func (s *watcherSetImpl) Get(key string, prefix bool) ([]*WatcherEvent, error) {
s.lock.RLock()
......@@ -276,12 +276,12 @@ func (s *watcherSetImpl) Get(key string, prefix bool) ([]*WatcherEvent, error) {
return out, nil
}
// the store watcher
// the watcher-set watcher
type watcher struct {
id uint64
// the underlay store
store *watcherSetImpl
// the underlay watcherSet
watcherSet *watcherSetImpl
// the interest topic
interested struct {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment