/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kubernetes

import (
	"context"
	"strconv"
	"strings"
	"sync"
)

import (
	perrors "github.com/pkg/errors"
)

// ErrStoreAlreadyStopped is returned by store operations once the store's
// context is done.
var ErrStoreAlreadyStopped = perrors.New("the store already be stopped")

// ErrKVPairNotFound is returned by Get when no cached entry matches the
// requested key.
var ErrKVPairNotFound = perrors.New("k/v pair not found")

// defaultWatcherChanSize is the buffer capacity given to each watcher's
// result channel.
const defaultWatcherChanSize = 100

// eventType identifies the kind of change carried by a WatcherEvent.
type eventType int

// The supported event kinds, numbered from zero in declaration order.
const (
	Create eventType = iota
	Update
	Delete
)

// String returns the upper-case name of the event type, or "UNKNOWN" for any
// value outside the declared constants.
func (e eventType) String() string {
	names := [...]string{
		Create: "CREATE",
		Update: "UPDATE",
		Delete: "DELETE",
	}
	if e < Create || int(e) >= len(names) {
		return "UNKNOWN"
	}
	return names[e]
}

// WatcherEvent
// is the element kept in the store and delivered to watchers.
type WatcherEvent struct {
	// EventType is the kind of change (Create, Update or Delete); the "-" tag
	// keeps it out of the JSON encoding.
	EventType eventType `json:"-"`
	// Key is the key dubbo-go should consume; encoded as "k" in JSON.
	Key string `json:"k"`
	// Value is the value dubbo-go should consume; encoded as "v" in JSON.
	Value string `json:"v"`
}

scott's avatar
scott committed
// Watchable WatcherSet
type WatcherSet interface {

	// put the object to the store
scott's avatar
scott committed
	Put(object *WatcherEvent) error
	// if prefix is false,
scott's avatar
scott committed
	// the len([]*WatcherEvent) == 1
	Get(key string, prefix bool) ([]*WatcherEvent, error)
	// watch the spec key or key prefix
	Watch(key string, prefix bool) (Watcher, error)
	// check the store status
	Done() <-chan struct{}
}

scott's avatar
scott committed
// Watcher
type Watcher interface {
	// the watcher's id
	ID() string
	// result stream
scott's avatar
scott committed
	ResultChan() <-chan *WatcherEvent
	// Stop the watcher
	stop()
	// check the watcher status
	done() <-chan struct{}
}

// the store
scott's avatar
scott committed
type watcherSetImpl struct {

	// Client's ctx, client die, the store will die too
	ctx context.Context

	// protect store and watchers
	lock sync.RWMutex

	// the key is dubbo-go interest meta
scott's avatar
scott committed
	cache map[string]*WatcherEvent

	currentWatcherId uint64
	watchers         map[uint64]*watcher
}

scott's avatar
scott committed
// closeWatchers
// when the store was closed
scott's avatar
scott committed
func (s *watcherSetImpl) closeWatchers() {

	select {
	case <-s.ctx.Done():
		// parent ctx be canceled, close the store
		s.lock.Lock()
		watchers := s.watchers
		s.lock.Unlock()

		for _, w := range watchers {
			// stop data stream
			// close(w.ch)
			// stop watcher
			w.stop()
		}
	}
}

// Watch
// watch on spec key, with or without prefix
scott's avatar
scott committed
func (s *watcherSetImpl) Watch(key string, prefix bool) (Watcher, error) {
	return s.addWatcher(key, prefix)
}

// Done
// get the store status
scott's avatar
scott committed
func (s *watcherSetImpl) Done() <-chan struct{} {
	return s.ctx.Done()
}

// Put
// put the object to store
scott's avatar
scott committed
func (s *watcherSetImpl) Put(object *WatcherEvent) error {
scott's avatar
scott committed
	sendMsg := func(object *WatcherEvent, w *watcher) {
		select {
		case <-w.done():
			// the watcher already stop
		case w.ch <- object:
			// block send the msg
		}
	}

	s.lock.Lock()
	defer s.lock.Unlock()

	if err := s.valid(); err != nil {
		return err
	}

	// put to store
	if object.EventType == Delete {
		delete(s.cache, object.Key)
	} else {

		old, ok := s.cache[object.Key]
		if ok {
			if old.Value == object.Value {
				// already have this k/v pair
				return nil
			}
		}

		// refresh the object
		s.cache[object.Key] = object
	}

	// notify watcher
	for _, w := range s.watchers {

		if !strings.Contains(object.Key, w.interested.key) {
			//  this watcher no interest in this element
			continue
		}

		if !w.interested.prefix {
			if object.Key == w.interested.key {
				go sendMsg(object, w)
			}
			// not interest
			continue
		}
		go sendMsg(object, w)
	}
	return nil
}

// valid
scott's avatar
scott committed
func (s *watcherSetImpl) valid() error {
	select {
	case <-s.ctx.Done():
		return ErrStoreAlreadyStopped
	default:
		return nil
	}
}

// addWatcher
scott's avatar
scott committed
func (s *watcherSetImpl) addWatcher(key string, prefix bool) (Watcher, error) {

	if err := s.valid(); err != nil {
		return nil, err
	}

	s.lock.Lock()
	defer s.lock.Unlock()

	// increase the watcher-id
	s.currentWatcherId++
scott's avatar
scott committed
		id:    s.currentWatcherId,
		store: s,
		interested: struct {
			key    string
			prefix bool
		}{key: key, prefix: prefix},
scott's avatar
scott committed
		ch:   make(chan *WatcherEvent, defaultWatcherChanSize),
		exit: make(chan struct{}),
	}
	s.watchers[s.currentWatcherId] = w
	return w, nil
}

// Get
// get elements from cache
scott's avatar
scott committed
func (s *watcherSetImpl) Get(key string, prefix bool) ([]*WatcherEvent, error) {

	s.lock.RLock()
	defer s.lock.RUnlock()

	if err := s.valid(); err != nil {
		return nil, err
	}

	if !prefix {
		for k, v := range s.cache {
			if k == key {
scott's avatar
scott committed
				return []*WatcherEvent{v}, nil
			}
		}
		// object
		return nil, ErrKVPairNotFound
	}

scott's avatar
scott committed
	var out []*WatcherEvent

	for k, v := range s.cache {
		if strings.Contains(k, key) {
			out = append(out, v)
		}
	}

	if len(out) == 0 {
		return nil, ErrKVPairNotFound
	}

	return out, nil
}

// the store watcher
type watcher struct {
	id uint64

	// the underlay store
scott's avatar
scott committed
	store *watcherSetImpl

	// the interest topic
	interested struct {
		key    string
		prefix bool
	}
scott's avatar
scott committed
	ch chan *WatcherEvent

	closeOnce sync.Once
	exit      chan struct{}
}

// ResultChan
scott's avatar
scott committed
func (w *watcher) ResultChan() <-chan *WatcherEvent {
	return w.ch
}

// ID
// renders the watcher's numeric id as a base-10 string.
func (w *watcher) ID() string {
	const base = 10
	id := strconv.FormatUint(w.id, base)
	return id
}

// stop
// marks the watcher as stopped by closing its exit channel. Repeated calls are
// harmless: the close runs at most once.
func (w *watcher) stop() {
	// closing a channel twice panics, hence the sync.Once
	shutdown := func() { close(w.exit) }
	w.closeOnce.Do(shutdown)
}

// done
// exposes the watcher status: the returned channel is the watcher's exit
// channel, which stop closes.
func (w *watcher) done() <-chan struct{} {
	stopped := w.exit
	return stopped
}

scott's avatar
scott committed
// newWatcherSet
// new watcher set from parent context
func newWatcherSet(ctx context.Context) WatcherSet {
	s := &watcherSetImpl{
scott's avatar
scott committed
		cache:    map[string]*WatcherEvent{},
		watchers: map[uint64]*watcher{},
	}
scott's avatar
scott committed
	go s.closeWatchers()