Skip to content
Snippets Groups Projects
watch.go 6.49 KiB
Newer Older
scott.wang's avatar
scott.wang committed
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kubernetes

import (
	"context"
	"strconv"
	"strings"
	"sync"
scott's avatar
scott committed
)
scott's avatar
scott committed
import (
	perrors "github.com/pkg/errors"
)

var (
scott's avatar
scott committed
	ErrWatcherSetAlreadyStopped = perrors.New("the watcher-set already be stopped")
	ErrKVPairNotFound           = perrors.New("k/v pair not found")
)

// defaultWatcherChanSize is the buffer capacity of the result channel
// created for each watcher.
const defaultWatcherChanSize = 100

// eventType identifies the kind of change carried by a WatcherEvent.
type eventType int

// The known event kinds, numbered from zero in declaration order.
const (
	Create eventType = iota
	Update
	Delete
)

// String returns the upper-case name of the event kind, or "UNKNOWN" for
// any value that is not one of the declared constants.
func (e eventType) String() string {
	// Indexed by the constant values so the table stays in sync with them.
	labels := [...]string{
		Create: "CREATE",
		Update: "UPDATE",
		Delete: "DELETE",
	}
	if e < 0 || int(e) >= len(labels) {
		return "UNKNOWN"
	}
	return labels[e]
}

scott's avatar
scott committed
// WatcherEvent
scott's avatar
scott committed
// watch event is element in watcherSet
scott's avatar
scott committed
type WatcherEvent struct {
	// event-type
	EventType eventType `json:"-"`
	// the dubbo-go should consume the key
	Key string `json:"k"`
	// the dubbo-go should consume the value
	Value string `json:"v"`
}

scott's avatar
scott committed
// Watchable WatcherSet
scott's avatar
scott committed
type WatcherSet interface {
scott's avatar
scott committed
	// put the watch event to the watch set
scott's avatar
scott committed
	Put(object *WatcherEvent) error
	// if prefix is false,
scott's avatar
scott committed
	// the len([]*WatcherEvent) == 1
	Get(key string, prefix bool) ([]*WatcherEvent, error)
	// watch the spec key or key prefix
	Watch(key string, prefix bool) (Watcher, error)
scott's avatar
scott committed
	// check the watcher set status
	Done() <-chan struct{}
}

scott's avatar
scott committed
// Watcher
type Watcher interface {
	// the watcher's id
	ID() string
	// result stream
scott's avatar
scott committed
	ResultChan() <-chan *WatcherEvent
	// Stop the watcher
	stop()
	// check the watcher status
	done() <-chan struct{}
}

scott's avatar
scott committed
// the watch set implement
scott's avatar
scott committed
type watcherSetImpl struct {
scott's avatar
scott committed
	// Client's ctx, client die, the watch set will die too
	ctx context.Context

scott's avatar
scott committed
	// protect watcher-set and watchers
	lock sync.RWMutex

	// the key is dubbo-go interest meta
scott's avatar
scott committed
	cache map[string]*WatcherEvent

	currentWatcherId uint64
	watchers         map[uint64]*watcher
}

scott's avatar
scott committed
// closeWatchers
scott's avatar
scott committed
// when the watcher-set was closed
scott's avatar
scott committed
func (s *watcherSetImpl) closeWatchers() {

	select {
	case <-s.ctx.Done():
scott's avatar
scott committed
		// parent ctx be canceled, close the watch-set's watchers
		s.lock.Lock()
		watchers := s.watchers
		s.lock.Unlock()

		for _, w := range watchers {
			// stop data stream
			// close(w.ch)
			// stop watcher
			w.stop()
		}
	}
}

// Watch
// watch on spec key, with or without prefix
scott's avatar
scott committed
func (s *watcherSetImpl) Watch(key string, prefix bool) (Watcher, error) {
	return s.addWatcher(key, prefix)
}

// Done
scott's avatar
scott committed
// get the watcher-set status
scott's avatar
scott committed
func (s *watcherSetImpl) Done() <-chan struct{} {
	return s.ctx.Done()
}

// Put
scott's avatar
scott committed
// put the watch event to watcher-set
func (s *watcherSetImpl) Put(watcherEvent *WatcherEvent) error {
scott.wang's avatar
scott.wang committed
	blockSendMsg := func(object *WatcherEvent, w *watcher) {
		select {
		case <-w.done():
			// the watcher already stop
		case w.ch <- object:
			// block send the msg
		}
	}

	s.lock.Lock()
	defer s.lock.Unlock()

	if err := s.valid(); err != nil {
		return err
	}

scott's avatar
scott committed
	// put to watcher-set
	switch watcherEvent.EventType {
	case Delete:
scott.wang's avatar
scott.wang committed
		// delete from store
scott's avatar
scott committed
		delete(s.cache, watcherEvent.Key)
scott.wang's avatar
scott.wang committed
	case Update, Create:
		o, ok := s.cache[watcherEvent.Key]
		if !ok {
			// pod update, but create new k/v pair
			watcherEvent.EventType = Create
			s.cache[watcherEvent.Key] = watcherEvent
			break
		// k/v pair already latest
		if o.Value == watcherEvent.Value {
			return nil
		}
		// update to latest status
scott's avatar
scott committed
		s.cache[watcherEvent.Key] = watcherEvent
	}

	// notify watcher
	for _, w := range s.watchers {
scott's avatar
scott committed
		if !strings.Contains(watcherEvent.Key, w.interested.key) {
			//  this watcher no interest in this element
			continue
		}
		if !w.interested.prefix {
scott's avatar
scott committed
			if watcherEvent.Key == w.interested.key {
scott.wang's avatar
scott.wang committed
				blockSendMsg(watcherEvent, w)
			}
			// not interest
			continue
		}
scott.wang's avatar
scott.wang committed
		blockSendMsg(watcherEvent, w)
scott's avatar
scott committed
func (s *watcherSetImpl) valid() error {
	select {
	case <-s.ctx.Done():
scott's avatar
scott committed
		return ErrWatcherSetAlreadyStopped
	default:
		return nil
	}
}

// addWatcher
scott's avatar
scott committed
func (s *watcherSetImpl) addWatcher(key string, prefix bool) (Watcher, error) {

	if err := s.valid(); err != nil {
		return nil, err
	}

	s.lock.Lock()
	defer s.lock.Unlock()

	// increase the watcher-id
	s.currentWatcherId++
scott's avatar
scott committed
		id:         s.currentWatcherId,
		watcherSet: s,
		interested: struct {
			key    string
			prefix bool
		}{key: key, prefix: prefix},
scott's avatar
scott committed
		ch:   make(chan *WatcherEvent, defaultWatcherChanSize),
		exit: make(chan struct{}),
	}
	s.watchers[s.currentWatcherId] = w
	return w, nil
}

// Get
scott's avatar
scott committed
// get elements from watcher-set
scott's avatar
scott committed
func (s *watcherSetImpl) Get(key string, prefix bool) ([]*WatcherEvent, error) {

	s.lock.RLock()
	defer s.lock.RUnlock()

	if err := s.valid(); err != nil {
		return nil, err
	}

	if !prefix {
		for k, v := range s.cache {
			if k == key {
scott's avatar
scott committed
				return []*WatcherEvent{v}, nil
			}
		}
		// object
		return nil, ErrKVPairNotFound
	}

scott's avatar
scott committed
	var out []*WatcherEvent

	for k, v := range s.cache {
		if strings.Contains(k, key) {
			out = append(out, v)
		}
	}

	if len(out) == 0 {
		return nil, ErrKVPairNotFound
	}

	return out, nil
}

scott's avatar
scott committed
// the watcher-set watcher
type watcher struct {
	id uint64

scott's avatar
scott committed
	// the underlay watcherSet
	watcherSet *watcherSetImpl

	// the interest topic
	interested struct {
		key    string
		prefix bool
	}
scott's avatar
scott committed
	ch chan *WatcherEvent

	closeOnce sync.Once
	exit      chan struct{}
}

// ResultChan
scott's avatar
scott committed
func (w *watcher) ResultChan() <-chan *WatcherEvent {
	return w.ch
}

// ID returns the watcher's id rendered as a base-10 string.
func (w *watcher) ID() string {
	const decimal = 10
	text := strconv.FormatUint(w.id, decimal)
	return text
}

// stop stops the watcher by closing its exit channel. It may be called
// more than once: the close is guarded by closeOnce because closing a
// channel twice panics.
func (w *watcher) stop() {
	closeExit := func() {
		close(w.exit)
	}
	w.closeOnce.Do(closeExit)
}

// done returns the watcher's exit channel as receive-only, for checking
// the watcher's status; stop closes it.
func (w *watcher) done() <-chan struct{} {
	var exited <-chan struct{} = w.exit
	return exited
}

scott's avatar
scott committed
// newWatcherSet
// new watcher set from parent context
func newWatcherSet(ctx context.Context) WatcherSet {
	s := &watcherSetImpl{
scott's avatar
scott committed
		cache:    map[string]*WatcherEvent{},
		watchers: map[uint64]*watcher{},
	}
scott's avatar
scott committed
	go s.closeWatchers()