Newer
Older
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kubernetes
import (
"context"
"strconv"
"strings"
"sync"
perrors "github.com/pkg/errors"
)
var (
ErrWatcherSetAlreadyStopped = perrors.New("the watcher-set already be stopped")
ErrKVPairNotFound = perrors.New("k/v pair not found")
)
const (
defaultWatcherChanSize = 100
)
type eventType int
const (
Create eventType = iota
Update
Delete
)
func (e eventType) String() string {
switch e {
case Create:
return "CREATE"
case Update:
return "UPDATE"
case Delete:
return "DELETE"
default:
return "UNKNOWN"
}
}
// event-type
EventType eventType `json:"-"`
// the dubbo-go should consume the key
Key string `json:"k"`
// the dubbo-go should consume the value
Value string `json:"v"`
}
// thread-safe
// the len([]*WatcherEvent) == 1
Get(key string, prefix bool) ([]*WatcherEvent, error)
// watch the spec key or key prefix
Watch(key string, prefix bool) (Watcher, error)
type Watcher interface {
// the watcher's id
ID() string
// result stream
// Stop the watcher
stop()
// check the watcher status
done() <-chan struct{}
}
lock sync.RWMutex
// the key is dubbo-go interest meta
currentWatcherId uint64
watchers map[uint64]*watcher
}
select {
case <-s.ctx.Done():
watchers := s.watchers
s.lock.Unlock()
for _, w := range watchers {
// stop watcher
w.stop()
}
}
}
// Watch
// watch on spec key, with or without prefix
func (s *watcherSetImpl) Watch(key string, prefix bool) (Watcher, error) {
return s.addWatcher(key, prefix)
}
// Done
return s.ctx.Done()
}
// Put
// put the watch event to watcher-set
func (s *watcherSetImpl) Put(watcherEvent *WatcherEvent) error {
select {
case <-w.done():
// the watcher already stop
case w.ch <- object:
// block send the msg
}
}
s.lock.Lock()
defer s.lock.Unlock()
if err := s.valid(); err != nil {
return err
}
switch watcherEvent.EventType {
case Delete:
o, ok := s.cache[watcherEvent.Key]
if !ok {
// pod update, but create new k/v pair
watcherEvent.EventType = Create
s.cache[watcherEvent.Key] = watcherEvent
break
// k/v pair already latest
if o.Value == watcherEvent.Value {
return nil
}
// update to latest status
}
// notify watcher
for _, w := range s.watchers {
// this watcher no interest in this element
continue
}
if !w.interested.prefix {
}
// not interest
continue
}
}
return nil
}
// valid
select {
case <-s.ctx.Done():
default:
return nil
}
}
// addWatcher
func (s *watcherSetImpl) addWatcher(key string, prefix bool) (Watcher, error) {
if err := s.valid(); err != nil {
return nil, err
}
s.lock.Lock()
defer s.lock.Unlock()
// increase the watcher-id
s.currentWatcherId++
interested: struct {
key string
prefix bool
}{key: key, prefix: prefix},
ch: make(chan *WatcherEvent, defaultWatcherChanSize),
exit: make(chan struct{}),
}
s.watchers[s.currentWatcherId] = w
return w, nil
}
// Get
func (s *watcherSetImpl) Get(key string, prefix bool) ([]*WatcherEvent, error) {
s.lock.RLock()
defer s.lock.RUnlock()
if err := s.valid(); err != nil {
return nil, err
}
if !prefix {
for k, v := range s.cache {
if k == key {
}
}
// object
return nil, ErrKVPairNotFound
}
for k, v := range s.cache {
if strings.Contains(k, key) {
out = append(out, v)
}
}
if len(out) == 0 {
return nil, ErrKVPairNotFound
}
return out, nil
}
type watcher struct {
id uint64
// the interest topic
interested struct {
key string
prefix bool
}
closeOnce sync.Once
exit chan struct{}
}
// ResultChan
func (w *watcher) ResultChan() <-chan *WatcherEvent {
return w.ch
}
// ID
// the watcher's id
func (w *watcher) ID() string {
return strconv.FormatUint(w.id, 10)
}
// stop
// stop the watcher
func (w *watcher) stop() {
// double close will panic
w.closeOnce.Do(func() {
close(w.exit)
})
}
// done
// check watcher status
func (w *watcher) done() <-chan struct{} {
return w.exit
}
// newWatcherSet
// new watcher set from parent context
func newWatcherSet(ctx context.Context) WatcherSet {
s := &watcherSetImpl{
watchers: map[uint64]*watcher{},
}