Skip to content
Snippets Groups Projects
registry_controller.go 14.5 KiB
Newer Older
package kubernetes

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/apache/dubbo-go/common/logger"
	perrors "github.com/pkg/errors"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/informers"
	informerscorev1 "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// defaultResync is the resync period passed to every namespaced shared
// informer factory created by this controller.
const defaultResync = 5 * time.Minute

scott.wang's avatar
scott.wang committed
const (
	// podNameKey is the environment variable holding the current pod name.
	podNameKey = "HOSTNAME"

	// nameSpaceKey is the environment variable holding the namespace the
	// current pod works in.
	nameSpaceKey = "NAMESPACE"

	// needWatchedNameSpaceKey is the environment variable holding the
	// comma-separated list of namespaces that must be watched.
	needWatchedNameSpaceKey = "DUBBO_NAMESPACE"

	// DubboIOAnnotationKey is the pod annotation under which the encoded
	// dubbo records are stored.
	DubboIOAnnotationKey = "dubbo.io/annotation"

	// DubboIOLabelKey is the label key that marks a pod as a dubbo pod.
	DubboIOLabelKey = "dubbo.io/label"

	// DubboIOLabelValue is the value expected under DubboIOLabelKey.
	DubboIOLabelValue = "dubbo.io-value"
)

// ErrDubboLabelAlreadyExist is returned by assembleDUBBOLabel when the pod
// already carries the dubbo label, meaning no patch is needed.
var ErrDubboLabelAlreadyExist = perrors.New("dubbo label already exist")

// dubboRegistryController
// work like a kubernetes controller
type dubboRegistryController struct {
scott.wang's avatar
scott.wang committed

	// clone from client
	// manage lifecycle
	ctx context.Context

	// protect patch current pod operation
	lock sync.Mutex
	needWatchedNamespace map[string]struct{}
	namespace            string
	name                 string
	cfg                  *rest.Config
scott.wang's avatar
scott.wang committed
	kc                               kubernetes.Interface
scott.wang's avatar
scott.wang committed
	listAndWatchStartResourceVersion uint64
scott.wang's avatar
scott.wang committed
	namespacedInformerFactory        map[string]informers.SharedInformerFactory
	namespacedPodInformers           map[string]informerscorev1.PodInformer
	queue                            workqueue.Interface //shared by namespaced informers
scott.wang's avatar
scott.wang committed
func newDubboRegistryController(ctx context.Context) (*dubboRegistryController, error) {

	c := &dubboRegistryController{
scott.wang's avatar
scott.wang committed
		ctx:                       ctx,
		watcherSet:                newWatcherSet(ctx),
		needWatchedNamespace:      make(map[string]struct{}),
scott.wang's avatar
scott.wang committed
		namespacedInformerFactory: make(map[string]informers.SharedInformerFactory),
scott.wang's avatar
scott.wang committed
		namespacedPodInformers:    make(map[string]informerscorev1.PodInformer),
scott.wang's avatar
scott.wang committed
	}

	if err := c.readConfig(); err != nil {
		return nil, perrors.WithMessage(err, "dubbo registry controller read config")
	}

	if err := c.init(); err != nil {
scott.wang's avatar
scott.wang committed
		return nil, perrors.WithMessage(err, "dubbo registry controller init")
scott.wang's avatar
scott.wang committed
	}

scott.wang's avatar
scott.wang committed
	if err := c.initCurrentPod(); err != nil {
scott.wang's avatar
scott.wang committed
		return nil, perrors.WithMessage(err, "init current pod")
scott.wang's avatar
scott.wang committed
	}

scott.wang's avatar
scott.wang committed
	if err := c.initWatchSet(); err != nil {
scott.wang's avatar
scott.wang committed
		return nil, perrors.WithMessage(err, "init watch set")
	}

scott.wang's avatar
scott.wang committed
	go c.run()
	return c, nil
}

scott.wang's avatar
scott.wang committed
// initWatchSet
// 1. get all with dubbo label pods
// 2. put every element to watcherSet
scott.wang's avatar
scott.wang committed
// 3. refresh watch book-mark
scott.wang's avatar
scott.wang committed
func (c *dubboRegistryController) initWatchSet() error {
	for ns := range c.needWatchedNamespace {
scott.wang's avatar
scott.wang committed
		pods, err := c.kc.CoreV1().Pods(ns).List(metav1.ListOptions{
			LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
		})
		if err != nil {
			return perrors.WithMessagef(err, "list pods in namespace (%s)", ns)
		}

		// set resource version
		rv, err := strconv.ParseUint(pods.GetResourceVersion(), 10, 0)
scott.wang's avatar
scott.wang committed
		if err != nil {
scott.wang's avatar
scott.wang committed
			return perrors.WithMessagef(err, "parse resource version %s", pods.GetResourceVersion())
		}
scott.wang's avatar
scott.wang committed
		if c.listAndWatchStartResourceVersion < rv {
scott.wang's avatar
scott.wang committed
			c.listAndWatchStartResourceVersion = rv
		}

		for _, pod := range pods.Items {
			logger.Debugf("got the pod (name: %s), (label: %v), (annotations: %v)", pod.Name, pod.GetLabels(), pod.GetAnnotations())
			c.handleWatchedPodEvent(&pod, watch.Added)
		}
	}
	return nil
}

// read dubbo-registry controller config
scott.wang's avatar
scott.wang committed
// 1. read kubernetes InCluster config
// 1. current pod name
// 2. current pod working namespace
func (c *dubboRegistryController) readConfig() error {
	// read in-cluster config
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return perrors.WithMessage(err, "get in-cluster config")
	}
	c.cfg = cfg

	// read current pod name && namespace
	c.name = os.Getenv(podNameKey)
	if len(c.name) == 0 {
		return perrors.New("read value from env by key (HOSTNAME)")
	}
	c.namespace = os.Getenv(nameSpaceKey)
	if len(c.namespace) == 0 {
		return perrors.New("read value from env by key (NAMESPACE)")
	}
scott.wang's avatar
scott.wang committed
	// read need watched namespaces list
	needWatchedNameSpaceList := os.Getenv(needWatchedNameSpaceKey)
	if len(needWatchedNameSpaceList) == 0 {
		return perrors.New("read value from env by key (DUBBO_NAMESPACE)")
	}
	for _, ns := range strings.Split(needWatchedNameSpaceList, ",") {
		c.needWatchedNamespace[ns] = struct{}{}
scott.wang's avatar
scott.wang committed
	}

	// current work namespace should be watched
	c.needWatchedNamespace[c.namespace] = struct{}{}
	return nil
}

func (c *dubboRegistryController) initNamespacedPodInformer(ns string) {

	informersFactory := informers.NewSharedInformerFactoryWithOptions(
		c.kc,
		defaultResync,
		informers.WithNamespace(ns),
		informers.WithTweakListOptions(func(options *metav1.ListOptions) {
			labelSelector := &metav1.LabelSelector{
				MatchLabels: map[string]string{DubboIOLabelKey: DubboIOLabelValue},
			}
			labelMap, err := metav1.LabelSelectorAsMap(labelSelector)
			if err != nil {
scott.wang's avatar
scott.wang committed
				logger.Errorf("label selector ad map: %v", err)
				return
			}
			options.LabelSelector = labels.SelectorFromSet(labelMap).String()
scott.wang's avatar
scott.wang committed
			options.ResourceVersion = strconv.FormatUint(c.listAndWatchStartResourceVersion, 10)
		}),
	)
	podInformer := informersFactory.Core().V1().Pods()

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.addPod,
		UpdateFunc: c.updatePod,
		DeleteFunc: c.deletePod,
	})

	c.namespacedInformerFactory[ns] = informersFactory
	c.namespacedPodInformers[ns] = podInformer
}

// init creates the kubernetes client from the rest config loaded by
// readConfig, creates the work queue shared by all namespaced informers, and
// prepares one pod informer per watched namespace. It returns an error when
// the client cannot be built.
func (c *dubboRegistryController) init() error {
	client, err := kubernetes.NewForConfig(c.cfg)
	if err != nil {
		// Leave c.kc untouched: storing the failed result would put a typed
		// nil pointer into the interface field, making it compare non-nil.
		return perrors.WithMessage(err, "new kubernetes client from config")
	}
	c.kc = client
	c.queue = workqueue.New()

	// one pod informer for every namespace that has to be watched
	for watchedNS := range c.needWatchedNamespace {
		c.initNamespacedPodInformer(watchedNS)
	}

	return nil
}

// kubernetesEvent is the item placed on the controller work queue by the
// informer handlers (addPod, updatePod, deletePod).
type kubernetesEvent struct {
	// p is the pod the event refers to
	p *v1.Pod
	// t is the kind of change: watch.Added, watch.Modified or watch.Deleted
	t watch.EventType
}

// addPod is the informer add handler. It enqueues a watch.Added event for the
// pod; objects that are not *v1.Pod are logged and ignored.
func (c *dubboRegistryController) addPod(obj interface{}) {
	switch pod := obj.(type) {
	case *v1.Pod:
		c.queue.Add(&kubernetesEvent{p: pod, t: watch.Added})
	default:
		logger.Warnf("pod-informer got object %T not *v1.Pod", obj)
	}
}

// updatePod is the informer update handler. It enqueues a watch.Modified
// event carrying the new pod. Objects that are not *v1.Pod are logged and
// ignored, and an update whose old and new resource versions are equal is
// dropped without being enqueued.
func (c *dubboRegistryController) updatePod(oldObj, newObj interface{}) {
	previous, isPod := oldObj.(*v1.Pod)
	if !isPod {
		logger.Warnf("pod-informer got object %T not *v1.Pod", oldObj)
		return
	}
	current, isPod := newObj.(*v1.Pod)
	if !isPod {
		logger.Warnf("pod-informer got object %T not *v1.Pod", newObj)
		return
	}

	if previous.GetResourceVersion() != current.GetResourceVersion() {
		c.queue.Add(&kubernetesEvent{p: current, t: watch.Modified})
	}
}

// deletePod is the informer delete handler. It enqueues a watch.Deleted event
// for the pod; objects that are not a pod are logged and ignored.
func (c *dubboRegistryController) deletePod(obj interface{}) {
	// When the informer missed the actual delete event it delivers a
	// cache.DeletedFinalStateUnknown tombstone wrapping the last known
	// object. Unwrap it so the deletion is still propagated.
	if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		obj = tombstone.Obj
	}

	p, ok := obj.(*v1.Pod)
	if !ok {
		logger.Warnf("pod-informer got object %T not *v1.Pod", obj)
		return
	}
	c.queue.Add(&kubernetesEvent{
		p: p,
		t: watch.Deleted,
	})
}

func (c *dubboRegistryController) Run() {
scott.wang's avatar
scott.wang committed

	logger.Debugf("starting namespaced informer-factory")
	for _, factory := range c.namespacedInformerFactory {
		go factory.Start(c.ctx.Done())
	}
scott.wang's avatar
scott.wang committed
	logger.Debugf("finish start namespaced informer-factory")
}

func (c *dubboRegistryController) run() {

	defer c.queue.ShutDown()

	for ns, podInformer := range c.namespacedPodInformers {
		if !cache.WaitForCacheSync(c.ctx.Done(), podInformer.Informer().HasSynced) {
			logger.Errorf("wait for cache sync finish @namespace %s fail", ns)
			return
		}
	}

scott.wang's avatar
scott.wang committed
	logger.Infof("kubernetes registry-controller running @Namespace = %q @PodName = %q", c.namespace, c.name)
scott.wang's avatar
scott.wang committed

scott.wang's avatar
scott.wang committed
	// block wait context cancel
	<-c.ctx.Done()
}

// work consumes the work queue until processNextWorkItem reports that the
// queue has been shut down, then logs that the worker stopped.
func (c *dubboRegistryController) work() {
	defer logger.Debugf("dubbo registry controller work stopped")
	for {
		if more := c.processNextWorkItem(); !more {
			return
		}
	}
}

// processNextWorkItem takes one item from the work queue and passes the pod
// event it carries to handleWatchedPodEvent. It returns false once the queue
// has been shut down and true otherwise, so callers can loop on the result.
func (c *dubboRegistryController) processNextWorkItem() bool {
	item, shutdown := c.queue.Get()
	if shutdown {
		return false
	}
	// Always mark the item done, whether or not it could be handled.
	defer c.queue.Done(item)

	// Use a checked assertion so an unexpected item is skipped instead of
	// panicking the worker goroutine.
	o, ok := item.(*kubernetesEvent)
	if !ok {
		logger.Warnf("work-queue got item %T not *kubernetesEvent", item)
		return true
	}
	c.handleWatchedPodEvent(o.p, o.t)
	return true
}

// handleWatchedPodEvent
// handle watched pod event
func (c *dubboRegistryController) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) {

scott.wang's avatar
scott.wang committed
	logger.Warnf("get @type = %s event from @pod = %s", eventType, p.GetName())
	for ak, av := range p.GetAnnotations() {

		// not dubbo interest annotation
		if ak != DubboIOAnnotationKey {
			continue
		}

		ol, err := c.unmarshalRecord(av)
		if err != nil {
			logger.Errorf("there a pod with dubbo annotation, but unmarshal dubbo value %v", err)
			return
		}

		for _, o := range ol {

			switch eventType {
			case watch.Added:
				// if pod is added, the record always be create
				o.EventType = Create
			case watch.Modified:
				o.EventType = Update
			case watch.Deleted:
				o.EventType = Delete
			default:
				logger.Errorf("no valid kubernetes event-type (%s) ", eventType)
				return
			}

			logger.Debugf("prepare to put object (%#v) to kubernetes-watcherSet", o)

			if err := c.watcherSet.Put(o); err != nil {
				logger.Errorf("put (%#v) to cache watcherSet: %v ", o, err)
				return
			}

		}

	}
}

// unmarshalRecord decodes the value of the dubbo annotation: URL-safe base64
// wrapping a JSON array of WatcherEvent. An empty record yields a nil slice
// and no error. A base64 or JSON failure is returned wrapped.
func (c *dubboRegistryController) unmarshalRecord(record string) ([]*WatcherEvent, error) {
	if record == "" {
		// nothing stored yet: the nil slice is a valid empty list
		return nil, nil
	}

	decoded, decodeErr := base64.URLEncoding.DecodeString(record)
	if decodeErr != nil {
		return nil, perrors.WithMessagef(decodeErr, "decode record (%s)", record)
	}

	var events []*WatcherEvent
	if jsonErr := json.Unmarshal(decoded, &events); jsonErr != nil {
		return nil, perrors.WithMessage(jsonErr, "decode json")
	}
	return events, nil
}
scott.wang's avatar
scott.wang committed

// initCurrentPod
// 1. get current pod
// 2. give the dubbo-label for this pod
scott.wang's avatar
scott.wang committed
func (c *dubboRegistryController) initCurrentPod() error {
	currentPod, err := c.readCurrentPod()
scott.wang's avatar
scott.wang committed
	if err != nil {
scott.wang's avatar
scott.wang committed
		return perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.name, c.namespace)
scott.wang's avatar
scott.wang committed
	}

	oldPod, newPod, err := c.assembleDUBBOLabel(currentPod)
	if err != nil {
scott.wang's avatar
scott.wang committed
		if err == ErrDubboLabelAlreadyExist {
			return nil
scott.wang's avatar
scott.wang committed
		}
scott.wang's avatar
scott.wang committed
		return perrors.WithMessage(err, "assemble dubbo label")
scott.wang's avatar
scott.wang committed
	}
scott.wang's avatar
scott.wang committed
	// current pod don't have label
scott.wang's avatar
scott.wang committed
	p, err := c.getPatch(oldPod, newPod)
	if err != nil {
scott.wang's avatar
scott.wang committed
		return perrors.WithMessage(err, "get patch")
scott.wang's avatar
scott.wang committed
	}

	currentPod, err = c.patchCurrentPod(p)
	if err != nil {
scott.wang's avatar
scott.wang committed
		return perrors.WithMessage(err, "patch to current pod")
scott.wang's avatar
scott.wang committed
	}

scott.wang's avatar
scott.wang committed
	return nil
scott.wang's avatar
scott.wang committed
}

// patchCurrentPod sends patch as a strategic merge patch for the current pod
// (c.name in c.namespace) and returns the pod as reported back by the API
// server, or a wrapped error when the request fails.
func (c *dubboRegistryController) patchCurrentPod(patch []byte) (*v1.Pod, error) {
	pods := c.kc.CoreV1().Pods(c.namespace)
	patched, err := pods.Patch(c.name, types.StrategicMergePatchType, patch)
	if err == nil {
		return patched, nil
	}
	return nil, perrors.WithMessage(err, "patch in kubernetes pod ")
}

// assemble the dubbo kubernetes label
// every dubbo instance should be labeled spec {"dubbo.io/label":"dubbo.io/label-value"} label
scott.wang's avatar
scott.wang committed
func (c *dubboRegistryController) assembleDUBBOLabel(p *v1.Pod) (*v1.Pod, *v1.Pod, error) {
scott.wang's avatar
scott.wang committed
	var (
		oldPod = &v1.Pod{}
		newPod = &v1.Pod{}
	)
	oldPod.Labels = make(map[string]string, 8)
	newPod.Labels = make(map[string]string, 8)

scott.wang's avatar
scott.wang committed
	if p.GetLabels() != nil {
		if p.GetLabels()[DubboIOLabelKey] == DubboIOLabelValue {
scott.wang's avatar
scott.wang committed
			// already have label
			return nil, nil, ErrDubboLabelAlreadyExist
		}
	}

	// copy current pod labels to oldPod && newPod
scott.wang's avatar
scott.wang committed
	for k, v := range p.GetLabels() {
scott.wang's avatar
scott.wang committed
		oldPod.Labels[k] = v
		newPod.Labels[k] = v
	}
	// assign new label for current pod
	newPod.Labels[DubboIOLabelKey] = DubboIOLabelValue
	return oldPod, newPod, nil
}

// assembleDUBBOAnnotations prepares the two pod skeletons needed to diff a
// new (k, v) record into the dubbo annotation of currentPod.
//
// oldPod holds a copy of currentPod's annotations. newPod holds the same copy
// with DubboIOAnnotationKey replaced by the re-encoded record list: the
// records already stored in the annotation followed by {Key: k, Value: v}.
//
// When the stored annotation cannot be decoded or the new list cannot be
// encoded, both pods are nil and the wrapped error is returned.
func (c *dubboRegistryController) assembleDUBBOAnnotations(k, v string, currentPod *v1.Pod) (oldPod *v1.Pod, newPod *v1.Pod, err error) {
	current := currentPod.GetAnnotations()

	// Decode and re-encode first, so nothing is built when the record is bad.
	records, err := c.unmarshalRecord(current[DubboIOAnnotationKey])
	if err != nil {
		return nil, nil, perrors.WithMessage(err, "unmarshal record")
	}

	encoded, err := c.marshalRecord(append(records, &WatcherEvent{Key: k, Value: v}))
	if err != nil {
		return nil, nil, perrors.WithMessage(err, "marshal record")
	}

	oldPod, newPod = &v1.Pod{}, &v1.Pod{}
	oldPod.Annotations = make(map[string]string, len(current))
	// one extra slot in case the dubbo annotation is not present yet
	newPod.Annotations = make(map[string]string, len(current)+1)
	for name, value := range current {
		oldPod.Annotations[name] = value
		newPod.Annotations[name] = value
	}

	newPod.Annotations[DubboIOAnnotationKey] = encoded
	return oldPod, newPod, nil
}

// getPatch serialises both pods to JSON and returns the two-way strategic
// merge patch that turns oldPod into newPod. Marshalling and diff failures
// are returned wrapped.
func (c *dubboRegistryController) getPatch(oldPod, newPod *v1.Pod) ([]byte, error) {
	before, err := json.Marshal(oldPod)
	if err != nil {
		return nil, perrors.WithMessage(err, "marshal old pod")
	}

	after, err := json.Marshal(newPod)
	if err != nil {
		return nil, perrors.WithMessage(err, "marshal newPod pod")
	}

	// v1.Pod{} provides the schema used to compute the patch
	diff, err := strategicpatch.CreateTwoWayMergePatch(before, after, v1.Pod{})
	if err != nil {
		return nil, perrors.WithMessage(err, "create two-way-merge-patch")
	}
	return diff, nil
}

// marshalRecord encodes a record list into the dubbo annotation value: the
// JSON form of ol, wrapped in URL-safe base64. It is the inverse of
// unmarshalRecord.
func (c *dubboRegistryController) marshalRecord(ol []*WatcherEvent) (string, error) {
	encoded, err := json.Marshal(ol)
	if err == nil {
		return base64.URLEncoding.EncodeToString(encoded), nil
	}
	return "", perrors.WithMessage(err, "json encode object list")
}

scott.wang's avatar
scott.wang committed
// read from kubernetes-env current pod status
scott.wang's avatar
scott.wang committed
func (c *dubboRegistryController) readCurrentPod() (*v1.Pod, error) {
	currentPod, err := c.kc.CoreV1().Pods(c.namespace).Get(c.name, metav1.GetOptions{})
	if err != nil {
		return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.name, c.namespace)
	}
	return currentPod, nil
}

scott.wang's avatar
scott.wang committed
// add annotation for current pod
scott.wang's avatar
scott.wang committed
func (c *dubboRegistryController) addAnnotationForCurrentPod(k string, v string) error {

	c.lock.Lock()
	defer c.lock.Unlock()

	// 1. accord old pod && (k, v) assemble new pod dubbo annotion v
	// 2. get patch data
	// 3. PATCH the pod
	currentPod, err := c.readCurrentPod()
	if err != nil {
		return perrors.WithMessage(err, "read current pod")
	}

	oldPod, newPod, err := c.assembleDUBBOAnnotations(k, v, currentPod)
	if err != nil {
		return perrors.WithMessage(err, "assemble")
	}

	patchBytes, err := c.getPatch(oldPod, newPod)
	if err != nil {
		return perrors.WithMessage(err, "get patch")
	}

	_, err = c.patchCurrentPod(patchBytes)
	if err != nil {
		return perrors.WithMessage(err, "patch current pod")
	}
	return nil
}