From 667f4a088c6c55c37ea88ba83b03b40cc7b0fce1 Mon Sep 17 00:00:00 2001 From: "scott.wang" <scottwangsxll@gmail.com> Date: Wed, 3 Jun 2020 18:38:36 +0800 Subject: [PATCH] provider and consumer pod use different label --- remoting/kubernetes/registry_controller.go | 75 +++++++++++++++------- 1 file changed, 52 insertions(+), 23 deletions(-) diff --git a/remoting/kubernetes/registry_controller.go b/remoting/kubernetes/registry_controller.go index 0fd023d1e..2b2aa51da 100644 --- a/remoting/kubernetes/registry_controller.go +++ b/remoting/kubernetes/registry_controller.go @@ -4,8 +4,9 @@ import ( "context" "encoding/base64" "encoding/json" + "fmt" "github.com/apache/dubbo-go/common" - "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" "os" @@ -42,8 +43,9 @@ const ( // all pod annotation key DubboIOAnnotationKey = "dubbo.io/annotation" - DubboIOLabelKey = "dubbo.io/label" - DubboIOLabelValue = "dubbo.io-value" + DubboIOLabelKey = "dubbo.io/label" + DubboIOConsumerLabelValue = "dubbo.io.consumer" + DubboIOProviderLabelValue = "dubbo.io.provider" ) var ( @@ -58,6 +60,8 @@ type dubboRegistryController struct { // manage lifecycle ctx context.Context + role common.RoleType + // protect patch current pod operation lock sync.Mutex @@ -85,6 +89,7 @@ func newDubboRegistryController(ctx context.Context, roleType common.RoleType, k c := &dubboRegistryController{ ctx: ctx, + role: roleType, watcherSet: newWatcherSet(ctx), needWatchedNamespace: make(map[string]struct{}), namespacedInformerFactory: make(map[string]informers.SharedInformerFactory), @@ -104,13 +109,12 @@ func newDubboRegistryController(ctx context.Context, roleType common.RoleType, k return nil, perrors.WithMessage(err, "init watch set") } - if roleType == common.CONSUMER { - // only consumer need list && watch - if err := c.initPodInformer(); err != nil { - return nil, perrors.WithMessage(err, "init pod informer") - } - go c.run() + if err := c.initPodInformer(); err != nil { + return nil, perrors.WithMessage(err, "init pod informer") } + + go c.run() + return c, nil } @@ -132,9 +136,15 @@ func GetInClusterKubernetesClient() (kubernetes.Interface, error) { // 2. put every element to watcherSet // 3. refresh watch book-mark func (c *dubboRegistryController) initWatchSet() error { + + req, err := labels.NewRequirement(DubboIOLabelKey, selection.In, []string{DubboIOConsumerLabelValue, DubboIOProviderLabelValue}) + if err != nil{ + return perrors.WithMessage(err, "new requirement") + } + for ns := range c.needWatchedNamespace { pods, err := c.kc.CoreV1().Pods(ns).List(metav1.ListOptions{ - LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(), + LabelSelector: req.String(), }) if err != nil { return perrors.WithMessagef(err, "list pods in namespace (%s)", ns) @@ -171,22 +181,19 @@ func (c *dubboRegistryController) readConfig() error { return nil } -func (c *dubboRegistryController) initNamespacedPodInformer(ns string) { +func (c *dubboRegistryController) initNamespacedPodInformer(ns string) error { + + req, err := labels.NewRequirement(DubboIOLabelKey, selection.In, []string{DubboIOConsumerLabelValue, DubboIOProviderLabelValue}) + if err != nil{ + return perrors.WithMessage(err, "new requirement") + } informersFactory := informers.NewSharedInformerFactoryWithOptions( c.kc, defaultResync, informers.WithNamespace(ns), informers.WithTweakListOptions(func(options *metav1.ListOptions) { - labelSelector := &metav1.LabelSelector{ - MatchLabels: map[string]string{DubboIOLabelKey: DubboIOLabelValue}, - } - labelMap, err := metav1.LabelSelectorAsMap(labelSelector) - if err != nil { - logger.Errorf("label selector ad map: %v", err) - return - } - options.LabelSelector = labels.SelectorFromSet(labelMap).String() + options.LabelSelector = req.String() options.ResourceVersion = strconv.FormatUint(c.listAndWatchStartResourceVersion, 10) }), ) @@ -200,10 +207,16 @@ func (c *dubboRegistryController) initNamespacedPodInformer(ns string) { c.namespacedInformerFactory[ns] = informersFactory c.namespacedPodInformers[ns] = podInformer + + return nil } func (c *dubboRegistryController) initPodInformer() error { + if c.role == common.PROVIDER{ + return nil + } + // read need watched namespaces list needWatchedNameSpaceList := os.Getenv(needWatchedNameSpaceKey) if len(needWatchedNameSpaceList) == 0 { @@ -219,7 +232,9 @@ func (c *dubboRegistryController) initPodInformer() error { // init all watch needed pod-informer for watchedNS := range c.needWatchedNamespace { - c.initNamespacedPodInformer(watchedNS) + if err := c.initNamespacedPodInformer(watchedNS); err != nil { + return err + } } return nil } @@ -282,8 +297,14 @@ func (c *dubboRegistryController) Run() { logger.Debugf("finish start namespaced informer-factory") } +// run +// controller process every event in work-queue func (c *dubboRegistryController) run() { + if c.role == common.PROVIDER{ + return + } + defer c.queue.ShutDown() for ns, podInformer := range c.namespacedPodInformers { @@ -438,7 +459,7 @@ func (c *dubboRegistryController) assembleDUBBOLabel(p *v1.Pod) (*v1.Pod, *v1.Po newPod.Labels = make(map[string]string, 8) if p.GetLabels() != nil { - if p.GetLabels()[DubboIOLabelKey] == DubboIOLabelValue { + if _, ok := p.GetLabels()[DubboIOLabelKey]; ok { // already have label return nil, nil, ErrDubboLabelAlreadyExist } @@ -449,8 +470,16 @@ func (c *dubboRegistryController) assembleDUBBOLabel(p *v1.Pod) (*v1.Pod, *v1.Po oldPod.Labels[k] = v newPod.Labels[k] = v } + // assign new label for current pod - newPod.Labels[DubboIOLabelKey] = DubboIOLabelValue + switch c.role { + case common.CONSUMER: + newPod.Labels[DubboIOLabelKey] = DubboIOConsumerLabelValue + case common.PROVIDER: + newPod.Labels[DubboIOLabelKey] = DubboIOProviderLabelValue + default: + return nil, nil, perrors.New(fmt.Sprintf("unknown role %s", c.role)) + } return oldPod, newPod, nil } -- GitLab