Skip to content
Snippets Groups Projects
dubbo_controller.go 6.72 KiB
Newer Older
package kubernetes

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"os"
	"time"

	"github.com/apache/dubbo-go/common/logger"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"

	perrors "github.com/pkg/errors"
	v1 "k8s.io/api/core/v1"
	informerscorev1 "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/util/workqueue"
)

const (
	// kubernetes suggest resync
	defaultResync = 5 * time.Minute
)

// dubboRegistryController
// work like a kubernetes controller
type dubboRegistryController struct {
	ctx    context.Context
	cancel context.CancelFunc

	// current pod config
	needWatchedNamespaceList []string
	namespace                string
	name                     string
	cfg                      *rest.Config

	watcherSet WatcherSet

	// kubernetes
	kc                        kubernetes.Interface
	namespacedInformerFactory map[string]informers.SharedInformerFactory
	namespacedPodInformers    map[string]informerscorev1.PodInformer
	queue                     workqueue.Interface //shared by namespaced informers
}

// read dubbo-registry controller config
func (c *dubboRegistryController) readConfig() error {
	// read in-cluster config
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return perrors.WithMessage(err, "get in-cluster config")
	}
	c.cfg = cfg

	// read current pod name && namespace
	c.name = os.Getenv(podNameKey)
	if len(c.name) == 0 {
		return perrors.New("read value from env by key (HOSTNAME)")
	}
	c.namespace = os.Getenv(nameSpaceKey)
	if len(c.namespace) == 0 {
		return perrors.New("read value from env by key (NAMESPACE)")
	}
	return nil
}

func (c *dubboRegistryController) initNamespacedPodInformer(ns string) {

	informersFactory := informers.NewSharedInformerFactoryWithOptions(
		c.kc,
		defaultResync,
		informers.WithNamespace(ns),
		informers.WithTweakListOptions(func(options *metav1.ListOptions) {
			labelSelector := &metav1.LabelSelector{
				MatchLabels: map[string]string{DubboIOLabelKey: DubboIOLabelValue},
			}
			labelMap, err := metav1.LabelSelectorAsMap(labelSelector)
			if err != nil {
				panic(err)
			}
			options.LabelSelector = labels.SelectorFromSet(labelMap).String()
		}),
	)
	podInformer := informersFactory.Core().V1().Pods()

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.addPod,
		UpdateFunc: c.updatePod,
		DeleteFunc: c.deletePod,
	})

	c.namespacedInformerFactory[ns] = informersFactory
	c.namespacedPodInformers[ns] = podInformer
}

func (c *dubboRegistryController) init() error {
	// init kubernetes client
	var err error
	c.kc, err = kubernetes.NewForConfig(c.cfg)
	if err != nil {
		return perrors.WithMessage(err, "new kubernetes client from config")
	}
	c.queue = workqueue.New()

	// init all watch needed pod-informer
	for _, watchedNS := range c.needWatchedNamespaceList {
		c.initNamespacedPodInformer(watchedNS)
	}

	return nil
}

func newDubboRegistryController() error {

	c := &dubboRegistryController{}
	if err := c.readConfig(); err != nil {
		return perrors.WithMessage(err, "dubbo registry controller read config")
	}

	if err := c.init(); err != nil {
		return perrors.WithMessage(err, "dubbo registry controller init")
	}

	go c.run()
	return nil
}

type kubernetesEvent struct {
	p *v1.Pod
	t watch.EventType
}

func (c *dubboRegistryController) addPod(obj interface{}) {
	p, ok := obj.(*v1.Pod)
	if !ok {
		logger.Warnf("pod-informer got object %T not *v1.Pod", obj)
		return
	}
	c.queue.Add(&kubernetesEvent{
		t: watch.Added,
		p: p,
	})
}

func (c *dubboRegistryController) updatePod(oldObj, newObj interface{}) {
	op, ok := oldObj.(*v1.Pod)
	if !ok {
		logger.Warnf("pod-informer got object %T not *v1.Pod", oldObj)
		return
	}
	np, ok := newObj.(*v1.Pod)
	if !ok {
		logger.Warnf("pod-informer got object %T not *v1.Pod", newObj)
		return
	}
	if op.GetResourceVersion() == np.GetResourceVersion() {
		return
	}
	c.queue.Add(&kubernetesEvent{
		p: np,
		t: watch.Modified,
	})
}

func (c *dubboRegistryController) deletePod(obj interface{}) {
	p, ok := obj.(*v1.Pod)
	if !ok {
		logger.Warnf("pod-informer got object %T not *v1.Pod", obj)
		return
	}
	c.queue.Add(&kubernetesEvent{
		p: p,
		t: watch.Deleted,
	})
}

func (c *dubboRegistryController) Run() {
	for _, factory := range c.namespacedInformerFactory {
		go factory.Start(c.ctx.Done())
	}
}

func (c *dubboRegistryController) stop() {
	c.cancel()
}
func (c *dubboRegistryController) run() {

	defer c.queue.ShutDown()

	for ns, podInformer := range c.namespacedPodInformers {
		if !cache.WaitForCacheSync(c.ctx.Done(), podInformer.Informer().HasSynced) {
			logger.Errorf("wait for cache sync finish @namespace %s fail", ns)
			return
		}
	}

	// start work
	go c.work()
	// block wait
	<-c.ctx.Done()

}

func (c *dubboRegistryController) work() {
	defer logger.Debugf("dubbo registry controller work stopped")
	for c.processNextWorkItem() {
	}
}

// processNextWorkItem process work-queue elements
func (c *dubboRegistryController) processNextWorkItem() bool {

	item, shutdown := c.queue.Get()
	if shutdown {
		return false
	}
	defer c.queue.Done(item)
	o := item.(*kubernetesEvent)
	c.handleWatchedPodEvent(o.p, o.t)
	return true
}

// handleWatchedPodEvent
// handle watched pod event
func (c *dubboRegistryController) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) {

	for ak, av := range p.GetAnnotations() {

		// not dubbo interest annotation
		if ak != DubboIOAnnotationKey {
			continue
		}

		ol, err := c.unmarshalRecord(av)
		if err != nil {
			logger.Errorf("there a pod with dubbo annotation, but unmarshal dubbo value %v", err)
			return
		}

		for _, o := range ol {

			switch eventType {
			case watch.Added:
				// if pod is added, the record always be create
				o.EventType = Create
			case watch.Modified:
				o.EventType = Update
			case watch.Deleted:
				o.EventType = Delete
			default:
				logger.Errorf("no valid kubernetes event-type (%s) ", eventType)
				return
			}

			logger.Debugf("prepare to put object (%#v) to kubernetes-watcherSet", o)

			if err := c.watcherSet.Put(o); err != nil {
				logger.Errorf("put (%#v) to cache watcherSet: %v ", o, err)
				return
			}

		}

	}
}

// unmarshalRecord
// unmarshal the kubernetes dubbo annotation value
func (c *dubboRegistryController) unmarshalRecord(record string) ([]*WatcherEvent, error) {

	if len(record) == 0 {
		// []*WatcherEvent is nil.
		return nil, nil
	}

	rawMsg, err := base64.URLEncoding.DecodeString(record)
	if err != nil {
		return nil, perrors.WithMessagef(err, "decode record (%s)", record)
	}

	var out []*WatcherEvent
	if err := json.Unmarshal(rawMsg, &out); err != nil {
		return nil, perrors.WithMessage(err, "decode json")
	}
	return out, nil
}