From bec9f3ced54d60dcdc5c29b89145ac09bf6a61fa Mon Sep 17 00:00:00 2001 From: scott <scottwangsxll@gmail.com> Date: Sun, 15 Mar 2020 17:13:36 +0800 Subject: [PATCH] Rename watcher and store name --- remoting/kubernetes/client.go | 20 ++--- remoting/kubernetes/listener.go | 2 +- remoting/kubernetes/{store.go => watch.go} | 86 ++++++++++--------- .../{store_test.go => watch_test.go} | 4 +- 4 files changed, 57 insertions(+), 55 deletions(-) rename remoting/kubernetes/{store.go => watch.go} (79%) rename remoting/kubernetes/{store_test.go => watch_test.go} (97%) diff --git a/remoting/kubernetes/client.go b/remoting/kubernetes/client.go index 34601e560..3e4b26c0e 100644 --- a/remoting/kubernetes/client.go +++ b/remoting/kubernetes/client.go @@ -71,7 +71,7 @@ type Client struct { ns string // the memory store - store Store + store WatcherSet // protect the wg && currentPod lock sync.RWMutex @@ -132,7 +132,7 @@ func newMockClient(namespace string, mockClientGenerator func() (kubernetes.Inte ns: namespace, rawClient: rawClient, ctx: ctx, - store: newStore(ctx), + store: newWatcherSet(ctx), cancel: cancel, } @@ -185,7 +185,7 @@ func newClient(namespace string) (*Client, error) { cfg: cfg, rawClient: rawClient, ctx: ctx, - store: newStore(ctx), + store: newWatcherSet(ctx), cancel: cancel, } @@ -415,10 +415,10 @@ func (c *Client) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) { // unmarshalRecord // unmarshal the kubernetes dubbo annotation value -func (c *Client) unmarshalRecord(record string) ([]*Object, error) { +func (c *Client) unmarshalRecord(record string) ([]*WatcherEvent, error) { if len(record) == 0 { - // []*Object is nil. + // []*WatcherEvent is nil. return nil, nil } @@ -427,7 +427,7 @@ func (c *Client) unmarshalRecord(record string) ([]*Object, error) { return nil, perrors.WithMessagef(err, "decode record (%s)", record) } - var out []*Object + var out []*WatcherEvent if err := json.Unmarshal(rawMsg, &out); err != nil { return nil, perrors.WithMessage(err, "decode json") } @@ -436,7 +436,7 @@ func (c *Client) unmarshalRecord(record string) ([]*Object, error) { // marshalRecord // marshal the kubernetes dubbo annotation value -func (c *Client) marshalRecord(ol []*Object) (string, error) { +func (c *Client) marshalRecord(ol []*WatcherEvent) (string, error) { msg, err := json.Marshal(ol) if err != nil { @@ -552,7 +552,7 @@ func (c *Client) assembleDUBBOAnnotations(k, v string, currentPod *v1.Pod) (oldP return } - newAnnotations, err := c.marshalRecord(append(al, &Object{Key: k, Value: v})) + newAnnotations, err := c.marshalRecord(append(al, &WatcherEvent{Key: k, Value: v})) if err != nil { err = perrors.WithMessage(err, "marshal record") return @@ -605,7 +605,7 @@ func (c *Client) GetChildren(k string) ([]string, []string, error) { // Watch // watch on spec key -func (c *Client) Watch(k string) (<-chan *Object, <-chan struct{}, error) { +func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) { w, err := c.store.Watch(k, false) if err != nil { @@ -617,7 +617,7 @@ func (c *Client) Watch(k string) (<-chan *Object, <-chan struct{}, error) { // Watch // watch on spec prefix -func (c *Client) WatchWithPrefix(prefix string) (<-chan *Object, <-chan struct{}, error) { +func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan struct{}, error) { w, err := c.store.Watch(prefix, true) if err != nil { diff --git a/remoting/kubernetes/listener.go b/remoting/kubernetes/listener.go index 0425aaf92..83b84785e 100644 --- a/remoting/kubernetes/listener.go +++ b/remoting/kubernetes/listener.go @@ -89,7 +89,7 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting. // return true mean the event type is DELETE // return false mean the event type is CREATE || UPDATE -func (l *EventListener) handleEvents(event *Object, listeners ...remoting.DataListener) bool { +func (l *EventListener) handleEvents(event *WatcherEvent, listeners ...remoting.DataListener) bool { logger.Infof("got a kubernetes-store event {type: %d, key: %s}", event.EventType, event.Key) diff --git a/remoting/kubernetes/store.go b/remoting/kubernetes/watch.go similarity index 79% rename from remoting/kubernetes/store.go rename to remoting/kubernetes/watch.go index a2cc99419..4f36b3522 100644 --- a/remoting/kubernetes/store.go +++ b/remoting/kubernetes/watch.go @@ -22,7 +22,9 @@ import ( "strconv" "strings" "sync" +) +import ( perrors "github.com/pkg/errors" ) @@ -57,9 +59,9 @@ func (e eventType) String() string { } } -// Object +// WatcherEvent // object is element in store -type Object struct { +type WatcherEvent struct { // event-type EventType eventType `json:"-"` // the dubbo-go should consume the key @@ -68,14 +70,14 @@ type Object struct { Value string `json:"v"` } -// Watchable Store -type Store interface { +// Watchable WatcherSet +type WatcherSet interface { // put the object to the store - Put(object *Object) error + Put(object *WatcherEvent) error // if prefix is false, - // the len([]*Object) == 1 - Get(key string, prefix bool) ([]*Object, error) + // the len([]*WatcherEvent) == 1 + Get(key string, prefix bool) ([]*WatcherEvent, error) // watch the spec key or key prefix Watch(key string, prefix bool) (Watcher, error) // check the store status @@ -87,7 +89,7 @@ type Watcher interface { // the watcher's id ID() string // result stream - ResultChan() <-chan *Object + ResultChan() <-chan *WatcherEvent // Stop the watcher stop() // check the watcher status @@ -95,7 +97,7 @@ type Watcher interface { } // the store -type storeImpl struct { +type watcherSetImpl struct { // Client's ctx, client die, the store will die too ctx context.Context @@ -104,15 +106,15 @@ type storeImpl struct { lock sync.RWMutex // the key is dubbo-go interest meta - cache map[string]*Object + cache map[string]*WatcherEvent currentWatcherId uint64 watchers map[uint64]*watcher } -// wait exit +// closeWatchers // when the store was closed -func (s *storeImpl) waitExit() { +func (s *watcherSetImpl) closeWatchers() { select { case <-s.ctx.Done(): @@ -133,21 +135,21 @@ func (s *storeImpl) waitExit() { // Watch // watch on spec key, with or without prefix -func (s *storeImpl) Watch(key string, prefix bool) (Watcher, error) { +func (s *watcherSetImpl) Watch(key string, prefix bool) (Watcher, error) { return s.addWatcher(key, prefix) } // Done // get the store status -func (s *storeImpl) Done() <-chan struct{} { +func (s *watcherSetImpl) Done() <-chan struct{} { return s.ctx.Done() } // Put // put the object to store -func (s *storeImpl) Put(object *Object) error { +func (s *watcherSetImpl) Put(object *WatcherEvent) error { - sendMsg := func(object *Object, w *watcher) { + sendMsg := func(object *WatcherEvent, w *watcher) { select { case <-w.done(): @@ -202,8 +204,7 @@ func (s *storeImpl) Put(object *Object) error { } // valid -// valid the client status should protected by lock -func (s *storeImpl) valid() error { +func (s *watcherSetImpl) valid() error { select { case <-s.ctx.Done(): return ErrStoreAlreadyStopped @@ -213,34 +214,35 @@ func (s *storeImpl) valid() error { } // addWatcher -func (s *storeImpl) addWatcher(key string, prefix bool) (Watcher, error) { +func (s *watcherSetImpl) addWatcher(key string, prefix bool) (Watcher, error) { + + if err := s.valid(); err != nil { + return nil, err + } + + s.lock.Lock() + defer s.lock.Unlock() + + // increase the watcher-id + s.currentWatcherId++ w := &watcher{ + id: s.currentWatcherId, store: s, interested: struct { key string prefix bool }{key: key, prefix: prefix}, - ch: make(chan *Object, defaultWatcherChanSize), + ch: make(chan *WatcherEvent, defaultWatcherChanSize), exit: make(chan struct{}), } - - s.lock.Lock() - defer s.lock.Unlock() - - if err := s.valid(); err != nil { - return nil, err - } - s.watchers[s.currentWatcherId] = w - w.id = s.currentWatcherId - s.currentWatcherId = s.currentWatcherId + 1 return w, nil } // Get // get elements from cache -func (s *storeImpl) Get(key string, prefix bool) ([]*Object, error) { +func (s *watcherSetImpl) Get(key string, prefix bool) ([]*WatcherEvent, error) { s.lock.RLock() defer s.lock.RUnlock() @@ -252,14 +254,14 @@ func (s *storeImpl) Get(key string, prefix bool) ([]*Object, error) { if !prefix { for k, v := range s.cache { if k == key { - return []*Object{v}, nil + return []*WatcherEvent{v}, nil } } // object return nil, ErrKVPairNotFound } - var out []*Object + var out []*WatcherEvent for k, v := range s.cache { if strings.Contains(k, key) { @@ -279,21 +281,21 @@ type watcher struct { id uint64 // the underlay store - store *storeImpl + store *watcherSetImpl // the interest topic interested struct { key string prefix bool } - ch chan *Object + ch chan *WatcherEvent closeOnce sync.Once exit chan struct{} } // ResultChan -func (w *watcher) ResultChan() <-chan *Object { +func (w *watcher) ResultChan() <-chan *WatcherEvent { return w.ch } @@ -319,14 +321,14 @@ func (w *watcher) done() <-chan struct{} { return w.exit } -// newStore -// new store from parent context -func newStore(ctx context.Context) Store { - s := &storeImpl{ +// newWatcherSet +// new watcher set from parent context +func newWatcherSet(ctx context.Context) WatcherSet { + s := &watcherSetImpl{ ctx: ctx, - cache: map[string]*Object{}, + cache: map[string]*WatcherEvent{}, watchers: map[uint64]*watcher{}, } - go s.waitExit() + go s.closeWatchers() return s } diff --git a/remoting/kubernetes/store_test.go b/remoting/kubernetes/watch_test.go similarity index 97% rename from remoting/kubernetes/store_test.go rename to remoting/kubernetes/watch_test.go index fc6df0602..10fbc7eb5 100644 --- a/remoting/kubernetes/store_test.go +++ b/remoting/kubernetes/watch_test.go @@ -30,7 +30,7 @@ func TestStore(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - s := newStore(ctx) + s := newWatcherSet(ctx) wg := sync.WaitGroup{} @@ -82,7 +82,7 @@ func TestStore(t *testing.T) { for i := 0; i < 5; i++ { go func(i int) { - if err := s.Put(&Object{ + if err := s.Put(&WatcherEvent{ Key: "key-" + strconv.Itoa(i), Value: strconv.Itoa(i), }); err != nil { -- GitLab