/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kubernetes
import (
"context"
"encoding/base64"
"encoding/json"
"os"
"sync"
"time"
)
import (
	perrors "github.com/pkg/errors"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)
import (
"github.com/apache/dubbo-go/common/logger"
)
const (
// kubernetes inject the var
podNameKey = "HOSTNAME"
nameSpaceKey = "NAMESPACE"
// all pod annotation key
DubboIOAnnotationKey = "dubbo.io/annotation"
DubboIOLabelKey = "dubbo.io/label"
DubboIOLabelValue = "dubbo.io-value"
)
var (
ErrDubboLabelAlreadyExist = perrors.New("dubbo label already exist")
)
// Client is a dubbo registry client backed by the kubernetes API server.
// Registry entries are stored in the current pod's dubbo annotation and
// discovered by watching pods that carry the dubbo label.
//
// Defect fixed: methods in this file read/write c.watcherSet and
// c.lastResourceVersion, but the struct declared neither field.
type Client struct {
	// kubernetes connection config (nil for mock clients)
	cfg *rest.Config
	// the kubernetes API interface
	rawClient kubernetes.Interface

	// identity of the pod this process runs in
	currentPodName string
	ns             string

	// in-memory event cache fed by the pod watch loop.
	// NOTE(review): WatcherSet is declared elsewhere in this package — confirm.
	watcherSet WatcherSet

	// last resource version observed from the API server; watches resume from it
	lastResourceVersion string

	// protect the wg && currentPod
	lock sync.RWMutex
	// cached status of the current pod
	currentPod *v1.Pod
	// tracks the background watch goroutine for graceful shutdown
	wg sync.WaitGroup

	// manage the client lifecycle
	ctx    context.Context
	cancel context.CancelFunc
}
// getCurrentPodName resolves the name of the pod this process runs in from
// the HOSTNAME environment variable injected by kubernetes.
func getCurrentPodName() (string, error) {
	name := os.Getenv(podNameKey)
	if name == "" {
		return "", perrors.New("read value from env by key (HOSTNAME)")
	}
	return name, nil
}
// getCurrentNameSpace resolves the namespace of the current pod from the
// NAMESPACE environment variable.
func getCurrentNameSpace() (string, error) {
	ns := os.Getenv(nameSpaceKey)
	if ns == "" {
		return "", perrors.New("read value from env by key (NAMESPACE)")
	}
	return ns, nil
}
// NewMockClient creates a Client whose kubernetes interface comes from the
// supplied generator instead of an in-cluster connection.
// Exported for tests in the registry package.
func NewMockClient(namespace string, mockClientGenerator func() (kubernetes.Interface, error)) (*Client, error) {
	return newMockClient(namespace, mockClientGenerator)
}
// newMockClient
// new a client for test
func newMockClient(namespace string, mockClientGenerator func() (kubernetes.Interface, error)) (*Client, error) {
rawClient, err := mockClientGenerator()
if err != nil {
return nil, perrors.WithMessage(err, "call mock generator")
}
currentPodName, err := getCurrentPodName()
if err != nil {
return nil, perrors.WithMessage(err, "get pod name")
}
ctx, cancel := context.WithCancel(context.Background())
c := &Client{
currentPodName: currentPodName,
ns: namespace,
rawClient: rawClient,
ctx: ctx,
cancel: cancel,
}
currentPod, err := c.initCurrentPod()
if err != nil {
return nil, perrors.WithMessage(err, "init current pod")
}
// record current status
c.currentPod = currentPod
// init the watcherSet by current pods
if err := c.initWatchSet(); err != nil {
return nil, perrors.WithMessage(err, "init watcherSet")
logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", namespace, c.currentPod.Name)
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
// newClient builds the real registry client from the in-cluster
// configuration; the process must be running inside a kubernetes pod.
//
// Defect fixed: removed the pasted line-number residue between the success
// log and the return statement.
func newClient(namespace string) (*Client, error) {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, perrors.WithMessage(err, "get in-cluster config")
	}

	rawClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return nil, perrors.WithMessage(err, "new kubernetes client by in cluster config")
	}

	currentPodName, err := getCurrentPodName()
	if err != nil {
		return nil, perrors.WithMessage(err, "get pod name")
	}

	ctx, cancel := context.WithCancel(context.Background())
	c := &Client{
		currentPodName: currentPodName,
		ns:             namespace,
		cfg:            cfg,
		rawClient:      rawClient,
		ctx:            ctx,
		cancel:         cancel,
	}

	currentPod, err := c.initCurrentPod()
	if err != nil {
		return nil, perrors.WithMessage(err, "init current pod")
	}

	// record current status
	c.currentPod = currentPod

	// init the watcherSet by current pods
	if err := c.initWatchSet(); err != nil {
		return nil, perrors.WithMessage(err, "init watcherSet")
	}

	// start kubernetes watch loop
	// NOTE(review): the original dropped the code that actually launched the
	// watch loop here — confirm whether the watch goroutine must be started.
	logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", namespace, c.currentPod.Name)
	return c, nil
}
// initCurrentPod reads the current pod from the API server and ensures it
// carries the dubbo label so the watch loop can select it:
//  1. get current pod
//  2. give the dubbo-label for this pod
//
// Defect fixed: when assembleDUBBOLabel reported ErrDubboLabelAlreadyExist,
// the original fell through with nil oldPod/newPod and still computed and
// issued a no-op PATCH; now it returns the already-labeled pod immediately.
func (c *Client) initCurrentPod() (*v1.Pod, error) {
	// read the current pod status
	currentPod, err := c.rawClient.CoreV1().Pods(c.ns).Get(c.currentPodName, metav1.GetOptions{})
	if err != nil {
		return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.currentPodName, c.ns)
	}

	oldPod, newPod, err := c.assembleDUBBOLabel(currentPod)
	if err != nil {
		if err != ErrDubboLabelAlreadyExist {
			return nil, perrors.WithMessage(err, "assemble dubbo label")
		}
		// current pod already has the label: nothing to patch
		return currentPod, nil
	}

	p, err := c.getPatch(oldPod, newPod)
	if err != nil {
		return nil, perrors.WithMessage(err, "get patch")
	}

	currentPod, err = c.patchCurrentPod(p)
	if err != nil {
		return nil, perrors.WithMessage(err, "patch to current pod")
	}

	return currentPod, nil
}
// initWatchSet seeds the watcherSet:
//  1. list every pod carrying the dubbo label
//  2. replay each pod into the watcherSet as an Added event
//
// It also records the list's resource version so the watch loop can resume
// from exactly where the listing stopped.
func (c *Client) initWatchSet() error {
	selector := fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String()
	podList, err := c.rawClient.CoreV1().Pods(c.ns).List(metav1.ListOptions{
		LabelSelector: selector,
	})
	if err != nil {
		return perrors.WithMessagef(err, "list pods in namespace (%s)", c.ns)
	}

	// set resource version
	c.lastResourceVersion = podList.GetResourceVersion()

	for i := range podList.Items {
		p := &podList.Items[i]
		logger.Debugf("got the pod (name: %s), (label: %v), (annotations: %v)", p.Name, p.GetLabels(), p.GetAnnotations())
		c.handleWatchedPodEvent(p, watch.Added)
	}

	return nil
}
// try once
watcher, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
Watch: true,
ResourceVersion: c.lastResourceVersion,
})
if err != nil {
return perrors.WithMessagef(err, "try to watch the namespace (%s) pods", c.ns)
}
watcher.Stop()
// add wg, grace close the client
type resourceVersionGetter interface {
GetResourceVersion() string
}
defer func() {
// notify other goroutine, this loop over
c.wg.Done()
wc, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
Watch: true,
})
if err != nil {
logger.Warnf("watch the namespace (%s) pods: %v, retry after 2 seconds", c.ns, err)
time.Sleep(2 * time.Second)
continue
}
logger.Infof("the old kubernetes client broken, collect the resource status from resource version (%s)", c.lastResourceVersion)
for {
select {
// double check ctx
case <-c.ctx.Done():
logger.Info("the kubernetes client stopped, resultChan len %d", len(wc.ResultChan()))
return
// get one element from result-chan
case event, ok := <-wc.ResultChan():
if !ok {
wc.Stop()
logger.Info("kubernetes watch chan die, create new")
goto onceWatch
}
if event.Type == watch.Error {
// watched a error event
logger.Warnf("kubernetes watch api report err (%#v)", event)
continue
}
o, ok := event.Object.(resourceVersionGetter)
if !ok {
logger.Warnf("kubernetes response object not a versioned object, its real type %T", event.Object)
// record the last resource version avoid to sync all pod
c.lastResourceVersion = o.GetResourceVersion()
logger.Infof("kubernetes get the current resource version %v", c.lastResourceVersion)
// check event object type
p, ok := event.Object.(*v1.Pod)
if !ok {
logger.Warnf("kubernetes response object not a Pod, its real type %T", event.Object)
// handle the watched pod
go c.handleWatchedPodEvent(p, event.Type)
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
}
}
}
}
// handleWatchedPodEvent translates one watched pod event into watcherSet
// records: the pod's dubbo annotation is decoded and every entry is stored
// with an event type derived from the kubernetes event type.
func (c *Client) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) {
	// only the dubbo annotation is of interest; other annotations are ignored
	registryValue, present := p.GetAnnotations()[DubboIOAnnotationKey]
	if !present {
		return
	}

	records, err := c.unmarshalRecord(registryValue)
	if err != nil {
		logger.Errorf("there a pod with dubbo annotation, but unmarshal dubbo value %v", err)
		return
	}

	for _, record := range records {
		switch eventType {
		case watch.Added:
			// if pod is added, the record always be create
			record.EventType = Create
		case watch.Modified:
			record.EventType = Update
		case watch.Deleted:
			record.EventType = Delete
		default:
			logger.Errorf("no valid kubernetes event-type (%s) ", eventType)
			return
		}

		logger.Debugf("prepare to put object (%#v) to kubernetes-watcherSet", record)

		if err := c.watcherSet.Put(record); err != nil {
			logger.Errorf("put (%#v) to cache watcherSet: %v ", record, err)
			return
		}
	}
}
// unmarshalRecord
// unmarshal the kubernetes dubbo annotation value
func (c *Client) unmarshalRecord(record string) ([]*WatcherEvent, error) {
return nil, nil
}
rawMsg, err := base64.URLEncoding.DecodeString(record)
if err != nil {
return nil, perrors.WithMessagef(err, "decode record (%s)", record)
}
if err := json.Unmarshal(rawMsg, &out); err != nil {
return nil, perrors.WithMessage(err, "decode json")
}
return out, nil
}
// marshalRecord encodes a watcher-event list into the base64(json) form kept
// in the dubbo pod annotation; the inverse of unmarshalRecord.
func (c *Client) marshalRecord(ol []*WatcherEvent) (string, error) {
	encoded, err := json.Marshal(ol)
	if err != nil {
		return "", perrors.WithMessage(err, "json encode object list")
	}
	return base64.URLEncoding.EncodeToString(encoded), nil
}
// readCurrentPod
// read the current pod status from kubernetes api.
// Always fetches a fresh copy from the API server; the cached c.currentPod
// is not consulted here.
func (c *Client) readCurrentPod() (*v1.Pod, error) {
	currentPod, err := c.rawClient.CoreV1().Pods(c.ns).Get(c.currentPodName, metav1.GetOptions{})
	if err != nil {
		return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.currentPodName, c.ns)
	}
	return currentPod, nil
}
// Create
// create k/v pair in watcher-set
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
func (c *Client) Create(k, v string) error {
// 1. accord old pod && (k, v) assemble new pod dubbo annotion v
// 2. get patch data
// 3. PATCH the pod
c.lock.Lock()
defer c.lock.Unlock()
currentPod, err := c.readCurrentPod()
if err != nil {
return perrors.WithMessage(err, "read current pod")
}
oldPod, newPod, err := c.assembleDUBBOAnnotations(k, v, currentPod)
if err != nil {
return perrors.WithMessage(err, "assemble")
}
patchBytes, err := c.getPatch(oldPod, newPod)
if err != nil {
return perrors.WithMessage(err, "get patch")
}
updatedPod, err := c.patchCurrentPod(patchBytes)
if err != nil {
return perrors.WithMessage(err, "patch current pod")
}
c.currentPod = updatedPod
// not update the watcherSet, the watcherSet should be write by the watchPodsLoop
return nil
}
// patchCurrentPod applies a strategic-merge patch to the current pod and
// returns the updated pod as reported by the API server.
func (c *Client) patchCurrentPod(patch []byte) (*v1.Pod, error) {
	updatedPod, err := c.rawClient.CoreV1().Pods(c.ns).Patch(c.currentPodName, types.StrategicMergePatchType, patch)
	if err != nil {
		return nil, perrors.WithMessage(err, "patch in kubernetes pod ")
	}
	return updatedPod, nil
}
// assembleDUBBOLabel builds the (oldPod, newPod) pair whose diff adds the
// dubbo label {"dubbo.io/label":"dubbo.io-value"} to the current pod.
// It returns ErrDubboLabelAlreadyExist when the pod is already labeled, so
// callers can skip the patch entirely.
//
// Defects fixed: the original never declared oldPod/newPod, did not return
// the sentinel in the already-labeled branch, and was missing its final
// return statement.
func (c *Client) assembleDUBBOLabel(currentPod *v1.Pod) (*v1.Pod, *v1.Pod, error) {
	oldPod := &v1.Pod{}
	newPod := &v1.Pod{}
	oldPod.Labels = make(map[string]string, 8)
	newPod.Labels = make(map[string]string, 8)

	if currentPod.GetLabels() != nil {
		if currentPod.GetLabels()[DubboIOLabelKey] == DubboIOLabelValue {
			// already have label
			return nil, nil, ErrDubboLabelAlreadyExist
		}
	}

	// copy current pod labels to oldPod && newPod
	for k, v := range currentPod.GetLabels() {
		oldPod.Labels[k] = v
		newPod.Labels[k] = v
	}

	// assign new label for current pod
	newPod.Labels[DubboIOLabelKey] = DubboIOLabelValue
	return oldPod, newPod, nil
}
// assembleDUBBOAnnotations builds the (oldPod, newPod) pair whose diff
// appends the (k, v) record to the current pod's dubbo annotation: the
// existing annotation value is decoded, the new record appended, and the
// result re-encoded into newPod.
//
// Defect fixed: removed the pasted line-number residue that split the
// function between the marshal call and its error check.
func (c *Client) assembleDUBBOAnnotations(k, v string, currentPod *v1.Pod) (oldPod *v1.Pod, newPod *v1.Pod, err error) {
	oldPod = &v1.Pod{}
	newPod = &v1.Pod{}
	oldPod.Annotations = make(map[string]string, 8)
	newPod.Annotations = make(map[string]string, 8)

	// copy the current annotations so the patch reflects only our addition
	for k, v := range currentPod.GetAnnotations() {
		oldPod.Annotations[k] = v
		newPod.Annotations[k] = v
	}

	al, err := c.unmarshalRecord(oldPod.GetAnnotations()[DubboIOAnnotationKey])
	if err != nil {
		err = perrors.WithMessage(err, "unmarshal record")
		return
	}

	newAnnotations, err := c.marshalRecord(append(al, &WatcherEvent{Key: k, Value: v}))
	if err != nil {
		err = perrors.WithMessage(err, "marshal record")
		return
	}

	newPod.Annotations[DubboIOAnnotationKey] = newAnnotations
	return
}
// getPatch
// get the kubernetes pod patch bytes.
// Produces a strategic-merge-patch document describing the delta between
// oldPod and newPod, suitable for Pods().Patch with StrategicMergePatchType.
func (c *Client) getPatch(oldPod, newPod *v1.Pod) ([]byte, error) {
	oldData, err := json.Marshal(oldPod)
	if err != nil {
		return nil, perrors.WithMessage(err, "marshal old pod")
	}

	newData, err := json.Marshal(newPod)
	if err != nil {
		return nil, perrors.WithMessage(err, "marshal newPod pod")
	}

	// v1.Pod{} supplies the struct schema the strategic merge uses to decide
	// how lists/maps are patched
	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{})
	if err != nil {
		return nil, perrors.WithMessage(err, "create two-way-merge-patch")
	}
	return patchBytes, nil
}
// GetChildren
func (c *Client) GetChildren(k string) ([]string, []string, error) {
return nil, nil, perrors.WithMessagef(err, "get children from watcherSet on (%s)", k)
}
var kList []string
var vList []string
for _, o := range objectList {
kList = append(kList, o.Key)
vList = append(vList, o.Value)
}
return kList, vList, nil
}
// Watch registers a watcher on the exact key k and returns its event channel
// plus the client's done channel so callers can observe shutdown.
//
// NOTE(review): reconstructed — the original lost the watcherSet call and
// returned an undefined `err`. Confirm the watcherSet Watch signature and
// the watcher's result-channel accessor against their declarations.
func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) {
	w, err := c.watcherSet.Watch(k, false)
	if err != nil {
		return nil, nil, perrors.WithMessagef(err, "watch on (%s)", k)
	}
	return w.ResultChan(), c.Done(), nil
}
// WatchWithPrefix registers a watcher on every key under prefix and returns
// its event channel plus the client's done channel.
//
// NOTE(review): reconstructed — the original lost the watcherSet call and
// returned an undefined `err`. Confirm the watcherSet Watch signature and
// the watcher's result-channel accessor against their declarations.
func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan struct{}, error) {
	w, err := c.watcherSet.Watch(prefix, true)
	if err != nil {
		return nil, nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix)
	}
	return w.ResultChan(), c.Done(), nil
}
// Valid reports whether the client is still usable: its context has not been
// canceled and it still holds a kubernetes interface. A false result means
// the client is dead and must be replaced.
func (c *Client) Valid() bool {
	select {
	case <-c.Done():
		return false
	default:
	}

	c.lock.RLock()
	defer c.lock.RUnlock()
	return c.rawClient != nil
}
// Done
// read the client status.
// The returned channel is closed once the client context is canceled (via
// Close), letting callers observe shutdown without polling.
func (c *Client) Done() <-chan struct{} {
	return c.ctx.Done()
}
// Close shuts the client down: it cancels the client context and blocks
// until the background watch goroutine has exited. Calling Close on an
// already-stopped client is a no-op.
func (c *Client) Close() {
	// context already canceled → already stopped, nothing to do
	if c.ctx.Err() != nil {
		return
	}

	c.cancel()

	// the client ctx be canceled
	// so, just wait
	c.wg.Wait()
}
// ValidateClient ensures the container holds a usable kubernetes client,
// creating and installing a fresh one when the current client is missing or
// dead.
//
// Defects fixed: the original referenced an undefined `client` (never
// obtained from the container), and the condition rebuilt the client when it
// WAS valid instead of when it was not.
// NOTE(review): assumes clientFacade exposes a Client() getter to pair with
// the SetClient used below — confirm against the interface declaration.
func ValidateClient(container clientFacade) error {
	client := container.Client()

	// rebuild only when there is no client, or the existing one is dead
	if client == nil || !client.Valid() {
		ns, err := getCurrentNameSpace()
		if err != nil {
			return perrors.WithMessage(err, "get current namespace")
		}

		newClient, err := newClient(ns)
		if err != nil {
			logger.Warnf("new kubernetes client (namespace{%s}: %v)", ns, err)
			return perrors.WithMessagef(err, "new kubernetes client (:%+v)", ns)
		}
		container.SetClient(newClient)
	}

	return nil
}