Skip to content
Snippets Groups Projects
Commit bec9f3ce authored by scott's avatar scott
Browse files

Rename watcher and store name

parent c7d35ad4
No related branches found
No related tags found
No related merge requests found
......@@ -71,7 +71,7 @@ type Client struct {
ns string
// the memory store
store Store
store WatcherSet
// protect the wg && currentPod
lock sync.RWMutex
......@@ -132,7 +132,7 @@ func newMockClient(namespace string, mockClientGenerator func() (kubernetes.Inte
ns: namespace,
rawClient: rawClient,
ctx: ctx,
store: newStore(ctx),
store: newWatcherSet(ctx),
cancel: cancel,
}
......@@ -185,7 +185,7 @@ func newClient(namespace string) (*Client, error) {
cfg: cfg,
rawClient: rawClient,
ctx: ctx,
store: newStore(ctx),
store: newWatcherSet(ctx),
cancel: cancel,
}
......@@ -415,10 +415,10 @@ func (c *Client) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) {
// unmarshalRecord
// unmarshal the kubernetes dubbo annotation value
func (c *Client) unmarshalRecord(record string) ([]*Object, error) {
func (c *Client) unmarshalRecord(record string) ([]*WatcherEvent, error) {
if len(record) == 0 {
// []*Object is nil.
// []*WatcherEvent is nil.
return nil, nil
}
......@@ -427,7 +427,7 @@ func (c *Client) unmarshalRecord(record string) ([]*Object, error) {
return nil, perrors.WithMessagef(err, "decode record (%s)", record)
}
var out []*Object
var out []*WatcherEvent
if err := json.Unmarshal(rawMsg, &out); err != nil {
return nil, perrors.WithMessage(err, "decode json")
}
......@@ -436,7 +436,7 @@ func (c *Client) unmarshalRecord(record string) ([]*Object, error) {
// marshalRecord
// marshal the kubernetes dubbo annotation value
func (c *Client) marshalRecord(ol []*Object) (string, error) {
func (c *Client) marshalRecord(ol []*WatcherEvent) (string, error) {
msg, err := json.Marshal(ol)
if err != nil {
......@@ -552,7 +552,7 @@ func (c *Client) assembleDUBBOAnnotations(k, v string, currentPod *v1.Pod) (oldP
return
}
newAnnotations, err := c.marshalRecord(append(al, &Object{Key: k, Value: v}))
newAnnotations, err := c.marshalRecord(append(al, &WatcherEvent{Key: k, Value: v}))
if err != nil {
err = perrors.WithMessage(err, "marshal record")
return
......@@ -605,7 +605,7 @@ func (c *Client) GetChildren(k string) ([]string, []string, error) {
// Watch
// watch on spec key
func (c *Client) Watch(k string) (<-chan *Object, <-chan struct{}, error) {
func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) {
w, err := c.store.Watch(k, false)
if err != nil {
......@@ -617,7 +617,7 @@ func (c *Client) Watch(k string) (<-chan *Object, <-chan struct{}, error) {
// Watch
// watch on spec prefix
func (c *Client) WatchWithPrefix(prefix string) (<-chan *Object, <-chan struct{}, error) {
func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan struct{}, error) {
w, err := c.store.Watch(prefix, true)
if err != nil {
......
......@@ -89,7 +89,7 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.
// return true mean the event type is DELETE
// return false mean the event type is CREATE || UPDATE
func (l *EventListener) handleEvents(event *Object, listeners ...remoting.DataListener) bool {
func (l *EventListener) handleEvents(event *WatcherEvent, listeners ...remoting.DataListener) bool {
logger.Infof("got a kubernetes-store event {type: %d, key: %s}", event.EventType, event.Key)
......
......@@ -22,7 +22,9 @@ import (
"strconv"
"strings"
"sync"
)
import (
perrors "github.com/pkg/errors"
)
......@@ -57,9 +59,9 @@ func (e eventType) String() string {
}
}
// Object
// WatcherEvent
// object is element in store
type Object struct {
type WatcherEvent struct {
// event-type
EventType eventType `json:"-"`
// the dubbo-go should consume the key
......@@ -68,14 +70,14 @@ type Object struct {
Value string `json:"v"`
}
// Watchable Store
type Store interface {
// Watchable WatcherSet
type WatcherSet interface {
// put the object to the store
Put(object *Object) error
Put(object *WatcherEvent) error
// if prefix is false,
// the len([]*Object) == 1
Get(key string, prefix bool) ([]*Object, error)
// the len([]*WatcherEvent) == 1
Get(key string, prefix bool) ([]*WatcherEvent, error)
// watch the spec key or key prefix
Watch(key string, prefix bool) (Watcher, error)
// check the store status
......@@ -87,7 +89,7 @@ type Watcher interface {
// the watcher's id
ID() string
// result stream
ResultChan() <-chan *Object
ResultChan() <-chan *WatcherEvent
// Stop the watcher
stop()
// check the watcher status
......@@ -95,7 +97,7 @@ type Watcher interface {
}
// the store
type storeImpl struct {
type watcherSetImpl struct {
// Client's ctx, client die, the store will die too
ctx context.Context
......@@ -104,15 +106,15 @@ type storeImpl struct {
lock sync.RWMutex
// the key is dubbo-go interest meta
cache map[string]*Object
cache map[string]*WatcherEvent
currentWatcherId uint64
watchers map[uint64]*watcher
}
// wait exit
// closeWatchers
// when the store was closed
func (s *storeImpl) waitExit() {
func (s *watcherSetImpl) closeWatchers() {
select {
case <-s.ctx.Done():
......@@ -133,21 +135,21 @@ func (s *storeImpl) waitExit() {
// Watch
// watch on spec key, with or without prefix
func (s *storeImpl) Watch(key string, prefix bool) (Watcher, error) {
func (s *watcherSetImpl) Watch(key string, prefix bool) (Watcher, error) {
return s.addWatcher(key, prefix)
}
// Done
// get the store status
func (s *storeImpl) Done() <-chan struct{} {
func (s *watcherSetImpl) Done() <-chan struct{} {
return s.ctx.Done()
}
// Put
// put the object to store
func (s *storeImpl) Put(object *Object) error {
func (s *watcherSetImpl) Put(object *WatcherEvent) error {
sendMsg := func(object *Object, w *watcher) {
sendMsg := func(object *WatcherEvent, w *watcher) {
select {
case <-w.done():
......@@ -202,8 +204,7 @@ func (s *storeImpl) Put(object *Object) error {
}
// valid
// valid the client status should protected by lock
func (s *storeImpl) valid() error {
func (s *watcherSetImpl) valid() error {
select {
case <-s.ctx.Done():
return ErrStoreAlreadyStopped
......@@ -213,34 +214,35 @@ func (s *storeImpl) valid() error {
}
// addWatcher
func (s *storeImpl) addWatcher(key string, prefix bool) (Watcher, error) {
func (s *watcherSetImpl) addWatcher(key string, prefix bool) (Watcher, error) {
if err := s.valid(); err != nil {
return nil, err
}
s.lock.Lock()
defer s.lock.Unlock()
// increase the watcher-id
s.currentWatcherId++
w := &watcher{
id: s.currentWatcherId,
store: s,
interested: struct {
key string
prefix bool
}{key: key, prefix: prefix},
ch: make(chan *Object, defaultWatcherChanSize),
ch: make(chan *WatcherEvent, defaultWatcherChanSize),
exit: make(chan struct{}),
}
s.lock.Lock()
defer s.lock.Unlock()
if err := s.valid(); err != nil {
return nil, err
}
s.watchers[s.currentWatcherId] = w
w.id = s.currentWatcherId
s.currentWatcherId = s.currentWatcherId + 1
return w, nil
}
// Get
// get elements from cache
func (s *storeImpl) Get(key string, prefix bool) ([]*Object, error) {
func (s *watcherSetImpl) Get(key string, prefix bool) ([]*WatcherEvent, error) {
s.lock.RLock()
defer s.lock.RUnlock()
......@@ -252,14 +254,14 @@ func (s *storeImpl) Get(key string, prefix bool) ([]*Object, error) {
if !prefix {
for k, v := range s.cache {
if k == key {
return []*Object{v}, nil
return []*WatcherEvent{v}, nil
}
}
// object
return nil, ErrKVPairNotFound
}
var out []*Object
var out []*WatcherEvent
for k, v := range s.cache {
if strings.Contains(k, key) {
......@@ -279,21 +281,21 @@ type watcher struct {
id uint64
// the underlay store
store *storeImpl
store *watcherSetImpl
// the interest topic
interested struct {
key string
prefix bool
}
ch chan *Object
ch chan *WatcherEvent
closeOnce sync.Once
exit chan struct{}
}
// ResultChan
func (w *watcher) ResultChan() <-chan *Object {
func (w *watcher) ResultChan() <-chan *WatcherEvent {
return w.ch
}
......@@ -319,14 +321,14 @@ func (w *watcher) done() <-chan struct{} {
return w.exit
}
// newStore
// new store from parent context
func newStore(ctx context.Context) Store {
s := &storeImpl{
// newWatcherSet
// new watcher set from parent context
func newWatcherSet(ctx context.Context) WatcherSet {
s := &watcherSetImpl{
ctx: ctx,
cache: map[string]*Object{},
cache: map[string]*WatcherEvent{},
watchers: map[uint64]*watcher{},
}
go s.waitExit()
go s.closeWatchers()
return s
}
......@@ -30,7 +30,7 @@ func TestStore(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
s := newStore(ctx)
s := newWatcherSet(ctx)
wg := sync.WaitGroup{}
......@@ -82,7 +82,7 @@ func TestStore(t *testing.T) {
for i := 0; i < 5; i++ {
go func(i int) {
if err := s.Put(&Object{
if err := s.Put(&WatcherEvent{
Key: "key-" + strconv.Itoa(i),
Value: strconv.Itoa(i),
}); err != nil {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment