Skip to content
Snippets Groups Projects
registry_controller.go 16.1 KiB
Newer Older
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kubernetes

import (
	"context"
	"encoding/base64"
	"encoding/json"
scott.wang's avatar
scott.wang committed
	"strconv"
scott.wang's avatar
scott.wang committed
	"strings"
scott.wang's avatar
scott.wang committed
	"sync"
import (
	perrors "github.com/pkg/errors"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/selection"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/informers"
	informerscorev1 "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

import (
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/logger"
)

	// kubernetes inject env var
scott.wang's avatar
scott.wang committed
	podNameKey              = "HOSTNAME"
	nameSpaceKey            = "NAMESPACE"
	needWatchedNameSpaceKey = "DUBBO_NAMESPACE"
scott.wang's avatar
scott.wang committed
	// all pod annotation key
	DubboIOAnnotationKey = "dubbo.io/annotation"
	// all pod label key and value pair
	DubboIOLabelKey           = "dubbo.io/label"
	DubboIOConsumerLabelValue = "dubbo.io.consumer"
	DubboIOProviderLabelValue = "dubbo.io.provider"

	// kubernetes suggest resync
	defaultResync = 5 * time.Minute
scott.wang's avatar
scott.wang committed
)

var (
	ErrDubboLabelAlreadyExist = perrors.New("dubbo label already exist")
)

// dubboRegistryController
// work like a kubernetes controller
type dubboRegistryController struct {
scott.wang's avatar
scott.wang committed

	// clone from client
	// manage lifecycle
	ctx context.Context

	role common.RoleType

scott.wang's avatar
scott.wang committed
	// protect patch current pod operation
	lock sync.Mutex
	needWatchedNamespace map[string]struct{}
	namespace            string
	name                 string
scott.wang's avatar
scott.wang committed
	kc                               kubernetes.Interface
scott.wang's avatar
scott.wang committed
	listAndWatchStartResourceVersion uint64
scott.wang's avatar
scott.wang committed
	namespacedInformerFactory        map[string]informers.SharedInformerFactory
	namespacedPodInformers           map[string]informerscorev1.PodInformer
	queue                            workqueue.Interface //shared by namespaced informers
scott.wang's avatar
scott.wang committed
func newDubboRegistryController(
	ctx context.Context,
	// different provider and consumer have behavior
	roleType common.RoleType,
	// used to inject mock kubernetes client
	kcGetter func() (kubernetes.Interface, error),
) (*dubboRegistryController, error) {
scott.wang's avatar
scott.wang committed

	kc, err := kcGetter()
	if err != nil {
		return nil, perrors.WithMessage(err, "get kubernetes client")
	}
scott.wang's avatar
scott.wang committed

	c := &dubboRegistryController{
scott.wang's avatar
scott.wang committed
		ctx:                       ctx,
scott.wang's avatar
scott.wang committed
		watcherSet:                newWatcherSet(ctx),
		needWatchedNamespace:      make(map[string]struct{}),
scott.wang's avatar
scott.wang committed
		namespacedInformerFactory: make(map[string]informers.SharedInformerFactory),
scott.wang's avatar
scott.wang committed
		namespacedPodInformers:    make(map[string]informerscorev1.PodInformer),
scott.wang's avatar
scott.wang committed
		kc:                        kc,
scott.wang's avatar
scott.wang committed
	}

	if err := c.readConfig(); err != nil {
scott.wang's avatar
scott.wang committed
		return nil, perrors.WithMessage(err, "read config")
scott.wang's avatar
scott.wang committed
	}

scott.wang's avatar
scott.wang committed
	if err := c.initCurrentPod(); err != nil {
scott.wang's avatar
scott.wang committed
		return nil, perrors.WithMessage(err, "init current pod")
scott.wang's avatar
scott.wang committed
	}

scott.wang's avatar
scott.wang committed
	if err := c.initWatchSet(); err != nil {
scott.wang's avatar
scott.wang committed
		return nil, perrors.WithMessage(err, "init watch set")
	}

	if err := c.initPodInformer(); err != nil {
		return nil, perrors.WithMessage(err, "init pod informer")
scott.wang's avatar
scott.wang committed
	return c, nil
}

scott.wang's avatar
scott.wang committed
// GetInClusterKubernetesClient
// current pod running in kubernetes-cluster
func GetInClusterKubernetesClient() (kubernetes.Interface, error) {

	// read in-cluster config
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, perrors.WithMessage(err, "get in-cluster config")
	}

	return kubernetes.NewForConfig(cfg)
}

scott.wang's avatar
scott.wang committed
// initWatchSet
// 1. get all with dubbo label pods
// 2. put every element to watcherSet
scott.wang's avatar
scott.wang committed
// 3. refresh watch book-mark
scott.wang's avatar
scott.wang committed
func (c *dubboRegistryController) initWatchSet() error {
scott.wang's avatar
scott.wang committed
	req, err := labels.NewRequirement(DubboIOLabelKey, selection.In, []string{DubboIOConsumerLabelValue, DubboIOProviderLabelValue})
	if err != nil {
		return perrors.WithMessage(err, "new requirement")
	}

	for ns := range c.needWatchedNamespace {
scott.wang's avatar
scott.wang committed
		pods, err := c.kc.CoreV1().Pods(ns).List(metav1.ListOptions{
			LabelSelector: req.String(),
scott.wang's avatar
scott.wang committed
		})
		if err != nil {
			return perrors.WithMessagef(err, "list pods in namespace (%s)", ns)
		}
scott.wang's avatar
scott.wang committed
		for _, p := range pods.Items {
			// set resource version
			rv, err := strconv.ParseUint(p.GetResourceVersion(), 10, 0)
			if err != nil {
				return perrors.WithMessagef(err, "parse resource version %s", p.GetResourceVersion())
			}
			if c.listAndWatchStartResourceVersion < rv {
				c.listAndWatchStartResourceVersion = rv
			}
			c.handleWatchedPodEvent(&p, watch.Added)
// read dubbo-registry controller config
scott.wang's avatar
scott.wang committed
// 1. current pod name
// 2. current pod working namespace
func (c *dubboRegistryController) readConfig() error {

	// read current pod name && namespace
	c.name = os.Getenv(podNameKey)
	if len(c.name) == 0 {
		return perrors.New("read value from env by key (HOSTNAME)")
	}
	c.namespace = os.Getenv(nameSpaceKey)
	if len(c.namespace) == 0 {
		return perrors.New("read value from env by key (NAMESPACE)")
	}
	return nil
}

func (c *dubboRegistryController) initNamespacedPodInformer(ns string) error {

scott.wang's avatar
scott.wang committed
	req, err := labels.NewRequirement(DubboIOLabelKey, selection.In, []string{DubboIOConsumerLabelValue, DubboIOProviderLabelValue})
	if err != nil {
		return perrors.WithMessage(err, "new requirement")
	}

	informersFactory := informers.NewSharedInformerFactoryWithOptions(
		c.kc,
		defaultResync,
		informers.WithNamespace(ns),
		informers.WithTweakListOptions(func(options *metav1.ListOptions) {
			options.LabelSelector = req.String()
scott.wang's avatar
scott.wang committed
			options.ResourceVersion = strconv.FormatUint(c.listAndWatchStartResourceVersion, 10)
		}),
	)
	podInformer := informersFactory.Core().V1().Pods()

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.addPod,
		UpdateFunc: c.updatePod,
		DeleteFunc: c.deletePod,
	})

	c.namespacedInformerFactory[ns] = informersFactory
	c.namespacedPodInformers[ns] = podInformer
func (c *dubboRegistryController) initPodInformer() error {
scott.wang's avatar
scott.wang committed
	if c.role == common.PROVIDER {
	// read need watched namespaces list
	needWatchedNameSpaceList := os.Getenv(needWatchedNameSpaceKey)
	if len(needWatchedNameSpaceList) == 0 {
		return perrors.New("read value from env by key (DUBBO_NAMESPACE)")
	}
	for _, ns := range strings.Split(needWatchedNameSpaceList, ",") {
		c.needWatchedNamespace[ns] = struct{}{}
	}
	// current work namespace should be watched
	c.needWatchedNamespace[c.namespace] = struct{}{}
	c.queue = workqueue.New()

	// init all watch needed pod-informer
	for watchedNS := range c.needWatchedNamespace {
		if err := c.initNamespacedPodInformer(watchedNS); err != nil {
			return err
		}
	}
	return nil
}

type kubernetesEvent struct {
	p *v1.Pod
	t watch.EventType
}

func (c *dubboRegistryController) addPod(obj interface{}) {
	p, ok := obj.(*v1.Pod)
	if !ok {
		logger.Warnf("pod-informer got object %T not *v1.Pod", obj)
		return
	}
	c.queue.Add(&kubernetesEvent{
		t: watch.Added,
		p: p,
	})
}

func (c *dubboRegistryController) updatePod(oldObj, newObj interface{}) {
	op, ok := oldObj.(*v1.Pod)
	if !ok {
		logger.Warnf("pod-informer got object %T not *v1.Pod", oldObj)
		return
	}
	np, ok := newObj.(*v1.Pod)
	if !ok {
		logger.Warnf("pod-informer got object %T not *v1.Pod", newObj)
		return
	}
	if op.GetResourceVersion() == np.GetResourceVersion() {
		return
	}
	c.queue.Add(&kubernetesEvent{
		p: np,
		t: watch.Modified,
	})
}

func (c *dubboRegistryController) deletePod(obj interface{}) {
	p, ok := obj.(*v1.Pod)
	if !ok {
		logger.Warnf("pod-informer got object %T not *v1.Pod", obj)
		return
	}
	c.queue.Add(&kubernetesEvent{
		p: p,
		t: watch.Deleted,
	})
}

func (c *dubboRegistryController) startALLInformers() {
scott.wang's avatar
scott.wang committed
	logger.Debugf("starting namespaced informer-factory")
	for _, factory := range c.namespacedInformerFactory {
		go factory.Start(c.ctx.Done())
	}
}

// run
// controller process every event in work-queue
func (c *dubboRegistryController) run() {

scott.wang's avatar
scott.wang committed
	if c.role == common.PROVIDER {
	defer logger.Warn("dubbo registry controller work stopped")
	defer c.queue.ShutDown()

	for ns, podInformer := range c.namespacedPodInformers {
		if !cache.WaitForCacheSync(c.ctx.Done(), podInformer.Informer().HasSynced) {
			logger.Errorf("wait for cache sync finish @namespace %s fail", ns)
			return
		}
	}

scott.wang's avatar
scott.wang committed
	logger.Infof("kubernetes registry-controller running @Namespace = %q @PodName = %q", c.namespace, c.name)
scott.wang's avatar
scott.wang committed

scott.wang's avatar
scott.wang committed
	// block wait context cancel
	<-c.ctx.Done()
}

func (c *dubboRegistryController) work() {
	for c.processNextWorkItem() {
	}
}

// processNextWorkItem process work-queue elements
func (c *dubboRegistryController) processNextWorkItem() bool {
	item, shutdown := c.queue.Get()
	if shutdown {
		return false
	}
	defer c.queue.Done(item)
	o := item.(*kubernetesEvent)
	c.handleWatchedPodEvent(o.p, o.t)
	return true
}

// handleWatchedPodEvent
// handle watched pod event
func (c *dubboRegistryController) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) {
scott.wang's avatar
scott.wang committed
	logger.Debugf("get @type = %s event from @pod = %s", eventType, p.GetName())
	for ak, av := range p.GetAnnotations() {
		// not dubbo interest annotation
		if ak != DubboIOAnnotationKey {
			continue
		}
		ol, err := c.unmarshalRecord(av)
		if err != nil {
			logger.Errorf("there a pod with dubbo annotation, but unmarshal dubbo value %v", err)
			return
		}
		for _, o := range ol {
			switch eventType {
			case watch.Added:
				// if pod is added, the record always be create
				o.EventType = Create
			case watch.Modified:
				o.EventType = Update
			case watch.Deleted:
				o.EventType = Delete
			default:
				logger.Errorf("no valid kubernetes event-type (%s) ", eventType)
				return
			}

			logger.Debugf("putting @key=%s @value=%s to watcherSet", o.Key, o.Value)
			if err := c.watcherSet.Put(o); err != nil {
				logger.Errorf("put (%#v) to cache watcherSet: %v ", o, err)
				return
			}
		}
	}
}

// unmarshalRecord
// unmarshal the kubernetes dubbo annotation value
func (c *dubboRegistryController) unmarshalRecord(record string) ([]*WatcherEvent, error) {

	if len(record) == 0 {
		// []*WatcherEvent is nil.
		return nil, nil
	}

	rawMsg, err := base64.URLEncoding.DecodeString(record)
	if err != nil {
		return nil, perrors.WithMessagef(err, "decode record (%s)", record)
	}

	var out []*WatcherEvent
	if err := json.Unmarshal(rawMsg, &out); err != nil {
		return nil, perrors.WithMessage(err, "decode json")
	}
	return out, nil
}
scott.wang's avatar
scott.wang committed

// initCurrentPod
// 1. get current pod
// 2. give the dubbo-label for this pod
scott.wang's avatar
scott.wang committed
func (c *dubboRegistryController) initCurrentPod() error {
	currentPod, err := c.readCurrentPod()
scott.wang's avatar
scott.wang committed
	if err != nil {
scott.wang's avatar
scott.wang committed
		return perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.name, c.namespace)
scott.wang's avatar
scott.wang committed
	}

	oldPod, newPod, err := c.assembleDUBBOLabel(currentPod)
	if err != nil {
scott.wang's avatar
scott.wang committed
		if err == ErrDubboLabelAlreadyExist {
			return nil
scott.wang's avatar
scott.wang committed
		}
scott.wang's avatar
scott.wang committed
		return perrors.WithMessage(err, "assemble dubbo label")
scott.wang's avatar
scott.wang committed
	}
scott.wang's avatar
scott.wang committed
	// current pod don't have label
scott.wang's avatar
scott.wang committed
	p, err := c.getPatch(oldPod, newPod)
	if err != nil {
scott.wang's avatar
scott.wang committed
		return perrors.WithMessage(err, "get patch")
scott.wang's avatar
scott.wang committed
	}

	currentPod, err = c.patchCurrentPod(p)
	if err != nil {
scott.wang's avatar
scott.wang committed
		return perrors.WithMessage(err, "patch to current pod")
scott.wang's avatar
scott.wang committed
	}

scott.wang's avatar
scott.wang committed
	return nil
scott.wang's avatar
scott.wang committed
}

// patch current pod
// write new meta for current pod
func (c *dubboRegistryController) patchCurrentPod(patch []byte) (*v1.Pod, error) {
	updatedPod, err := c.kc.CoreV1().Pods(c.namespace).Patch(c.name, types.StrategicMergePatchType, patch)
	if err != nil {
		return nil, perrors.WithMessage(err, "patch in kubernetes pod ")
	}
	return updatedPod, nil
}

// assemble the dubbo kubernetes label
// every dubbo instance should be labeled spec {"dubbo.io/label":"dubbo.io/label-value"} label
scott.wang's avatar
scott.wang committed
func (c *dubboRegistryController) assembleDUBBOLabel(p *v1.Pod) (*v1.Pod, *v1.Pod, error) {
scott.wang's avatar
scott.wang committed
	var (
		oldPod = &v1.Pod{}
		newPod = &v1.Pod{}
	)
	oldPod.Labels = make(map[string]string, 8)
	newPod.Labels = make(map[string]string, 8)

scott.wang's avatar
scott.wang committed
	if p.GetLabels() != nil {
		if _, ok := p.GetLabels()[DubboIOLabelKey]; ok {
scott.wang's avatar
scott.wang committed
			// already have label
			return nil, nil, ErrDubboLabelAlreadyExist
		}
	}

	// copy current pod labels to oldPod && newPod
scott.wang's avatar
scott.wang committed
	for k, v := range p.GetLabels() {
scott.wang's avatar
scott.wang committed
		oldPod.Labels[k] = v
		newPod.Labels[k] = v
	}
scott.wang's avatar
scott.wang committed
	// assign new label for current pod
	switch c.role {
	case common.CONSUMER:
		newPod.Labels[DubboIOLabelKey] = DubboIOConsumerLabelValue
	case common.PROVIDER:
		newPod.Labels[DubboIOLabelKey] = DubboIOProviderLabelValue
	default:
		return nil, nil, perrors.New(fmt.Sprintf("unknown role %s", c.role))
	}
scott.wang's avatar
scott.wang committed
	return oldPod, newPod, nil
}

// assemble the dubbo kubernetes annotations
// accord the current pod && (k,v) assemble the old-pod, new-pod
func (c *dubboRegistryController) assembleDUBBOAnnotations(k, v string, currentPod *v1.Pod) (oldPod *v1.Pod, newPod *v1.Pod, err error) {

	oldPod = &v1.Pod{}
	newPod = &v1.Pod{}
	oldPod.Annotations = make(map[string]string, 8)
	newPod.Annotations = make(map[string]string, 8)

	for k, v := range currentPod.GetAnnotations() {
		oldPod.Annotations[k] = v
		newPod.Annotations[k] = v
	}

	al, err := c.unmarshalRecord(oldPod.GetAnnotations()[DubboIOAnnotationKey])
	if err != nil {
		err = perrors.WithMessage(err, "unmarshal record")
		return
	}

	newAnnotations, err := c.marshalRecord(append(al, &WatcherEvent{Key: k, Value: v}))
	if err != nil {
		err = perrors.WithMessage(err, "marshal record")
		return
	}

	newPod.Annotations[DubboIOAnnotationKey] = newAnnotations
	return
}

// getPatch
// get the kubernetes pod patch bytes
func (c *dubboRegistryController) getPatch(oldPod, newPod *v1.Pod) ([]byte, error) {
	oldData, err := json.Marshal(oldPod)
	if err != nil {
		return nil, perrors.WithMessage(err, "marshal old pod")
	}

	newData, err := json.Marshal(newPod)
	if err != nil {
		return nil, perrors.WithMessage(err, "marshal newPod pod")
	}

	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{})
	if err != nil {
		return nil, perrors.WithMessage(err, "create two-way-merge-patch")
	}
	return patchBytes, nil
}

// marshalRecord
// marshal the kubernetes dubbo annotation value
func (c *dubboRegistryController) marshalRecord(ol []*WatcherEvent) (string, error) {
	msg, err := json.Marshal(ol)
	if err != nil {
		return "", perrors.WithMessage(err, "json encode object list")
	}
	return base64.URLEncoding.EncodeToString(msg), nil
}

scott.wang's avatar
scott.wang committed
// read from kubernetes-env current pod status
scott.wang's avatar
scott.wang committed
func (c *dubboRegistryController) readCurrentPod() (*v1.Pod, error) {
	currentPod, err := c.kc.CoreV1().Pods(c.namespace).Get(c.name, metav1.GetOptions{})
	if err != nil {
		return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.name, c.namespace)
	}
	return currentPod, nil
}

scott.wang's avatar
scott.wang committed
// add annotation for current pod
scott.wang's avatar
scott.wang committed
func (c *dubboRegistryController) addAnnotationForCurrentPod(k string, v string) error {

	c.lock.Lock()
	defer c.lock.Unlock()

	// 1. accord old pod && (k, v) assemble new pod dubbo annotion v
	// 2. get patch data
	// 3. PATCH the pod
	currentPod, err := c.readCurrentPod()
	if err != nil {
		return perrors.WithMessage(err, "read current pod")
	}

	oldPod, newPod, err := c.assembleDUBBOAnnotations(k, v, currentPod)
	if err != nil {
		return perrors.WithMessage(err, "assemble")
	}

	patchBytes, err := c.getPatch(oldPod, newPod)
	if err != nil {
		return perrors.WithMessage(err, "get patch")
	}

	_, err = c.patchCurrentPod(patchBytes)
	if err != nil {
		return perrors.WithMessage(err, "patch current pod")
	}
scott.wang's avatar
scott.wang committed

	return c.watcherSet.Put(&WatcherEvent{
		Key:       k,
		Value:     v,
		EventType: Create,
	})
scott.wang's avatar
scott.wang committed
}