Newer
Older
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kubernetes
import (
"context"
"strconv"
"strings"
"sync"
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
perrors "github.com/pkg/errors"
)
var (
ErrStoreAlreadyStopped = perrors.New("the store already be stopped")
ErrKVPairNotFound = perrors.New("k/v pair not found")
)
const (
defaultWatcherChanSize = 100
)
type eventType int
const (
Create eventType = iota
Update
Delete
)
func (e eventType) String() string {
switch e {
case Create:
return "CREATE"
case Update:
return "UPDATE"
case Delete:
return "DELETE"
default:
return "UNKNOWN"
}
}
// event-type
EventType eventType `json:"-"`
// the dubbo-go should consume the key
Key string `json:"k"`
// the dubbo-go should consume the value
Value string `json:"v"`
}
// Watchable WatcherSet
type WatcherSet interface {
// the len([]*WatcherEvent) == 1
Get(key string, prefix bool) ([]*WatcherEvent, error)
// watch the spec key or key prefix
Watch(key string, prefix bool) (Watcher, error)
// check the store status
Done() <-chan struct{}
}
type Watcher interface {
// the watcher's id
ID() string
// result stream
// Stop the watcher
stop()
// check the watcher status
done() <-chan struct{}
}
// the store
// Client's ctx, client die, the store will die too
ctx context.Context
// protect store and watchers
lock sync.RWMutex
// the key is dubbo-go interest meta
currentWatcherId uint64
watchers map[uint64]*watcher
}
// when the store was closed
select {
case <-s.ctx.Done():
// parent ctx be canceled, close the store
s.lock.Lock()
watchers := s.watchers
s.lock.Unlock()
for _, w := range watchers {
// stop watcher
w.stop()
}
}
}
// Watch
// watch on spec key, with or without prefix
func (s *watcherSetImpl) Watch(key string, prefix bool) (Watcher, error) {
return s.addWatcher(key, prefix)
}
// Done
// get the store status
return s.ctx.Done()
}
// Put
// put the object to store
func (s *watcherSetImpl) Put(object *WatcherEvent) error {
sendMsg := func(object *WatcherEvent, w *watcher) {
select {
case <-w.done():
// the watcher already stop
case w.ch <- object:
// block send the msg
}
}
s.lock.Lock()
defer s.lock.Unlock()
if err := s.valid(); err != nil {
return err
}
// put to store
if object.EventType == Delete {
delete(s.cache, object.Key)
} else {
old, ok := s.cache[object.Key]
if ok {
if old.Value == object.Value {
// already have this k/v pair
return nil
}
}
// refresh the object
s.cache[object.Key] = object
}
// notify watcher
for _, w := range s.watchers {
if !strings.Contains(object.Key, w.interested.key) {
// this watcher no interest in this element
continue
}
if !w.interested.prefix {
if object.Key == w.interested.key {
go sendMsg(object, w)
}
// not interest
continue
}
go sendMsg(object, w)
}
return nil
}
// valid
select {
case <-s.ctx.Done():
return ErrStoreAlreadyStopped
default:
return nil
}
}
// addWatcher
func (s *watcherSetImpl) addWatcher(key string, prefix bool) (Watcher, error) {
if err := s.valid(); err != nil {
return nil, err
}
s.lock.Lock()
defer s.lock.Unlock()
// increase the watcher-id
s.currentWatcherId++
store: s,
interested: struct {
key string
prefix bool
}{key: key, prefix: prefix},
ch: make(chan *WatcherEvent, defaultWatcherChanSize),
exit: make(chan struct{}),
}
s.watchers[s.currentWatcherId] = w
return w, nil
}
// Get
// get elements from cache
func (s *watcherSetImpl) Get(key string, prefix bool) ([]*WatcherEvent, error) {
s.lock.RLock()
defer s.lock.RUnlock()
if err := s.valid(); err != nil {
return nil, err
}
if !prefix {
for k, v := range s.cache {
if k == key {
}
}
// object
return nil, ErrKVPairNotFound
}
for k, v := range s.cache {
if strings.Contains(k, key) {
out = append(out, v)
}
}
if len(out) == 0 {
return nil, ErrKVPairNotFound
}
return out, nil
}
// the store watcher
type watcher struct {
id uint64
// the underlay store
// the interest topic
interested struct {
key string
prefix bool
}
closeOnce sync.Once
exit chan struct{}
}
// ResultChan
func (w *watcher) ResultChan() <-chan *WatcherEvent {
return w.ch
}
// ID
// the watcher's id
func (w *watcher) ID() string {
return strconv.FormatUint(w.id, 10)
}
// stop
// stop the watcher
func (w *watcher) stop() {
// double close will panic
w.closeOnce.Do(func() {
close(w.exit)
})
}
// done
// check watcher status
func (w *watcher) done() <-chan struct{} {
return w.exit
}
// newWatcherSet
// new watcher set from parent context
func newWatcherSet(ctx context.Context) WatcherSet {
s := &watcherSetImpl{
watchers: map[uint64]*watcher{},
}