diff --git a/remoting/kubernetes/registry_controller.go b/remoting/kubernetes/registry_controller.go index 7929b5628f345fb1088a919d52c070e0fae3c796..0602d6001fe6c6f190539018f6dbe219ac47b18d 100644 --- a/remoting/kubernetes/registry_controller.go +++ b/remoting/kubernetes/registry_controller.go @@ -4,9 +4,11 @@ import ( "context" "encoding/base64" "encoding/json" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" "os" + "strconv" "sync" "time" @@ -51,6 +53,7 @@ type dubboRegistryController struct { // kubernetes kc kubernetes.Interface + listAndWatchStartResourceVersion uint64 namespacedInformerFactory map[string]informers.SharedInformerFactory namespacedPodInformers map[string]informerscorev1.PodInformer queue workqueue.Interface //shared by namespaced informers @@ -77,15 +80,51 @@ func newDubboRegistryController(ctx context.Context) (*dubboRegistryController, return nil,perrors.WithMessage(err, "init current pod") } + if err := c.initWatchSet(); err != nil{ + return nil, perrors.WithMessage(err, "init watch set") + } + go c.run() return c, nil } +// initWatchSet +// 1. get all with dubbo label pods +// 2. put every element to watcherSet +func (c *dubboRegistryController) initWatchSet() error { + + + for _, ns := range c.needWatchedNamespaceList{ + + pods, err := c.kc.CoreV1().Pods(ns).List(metav1.ListOptions{ + LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(), + }) + if err != nil { + return perrors.WithMessagef(err, "list pods in namespace (%s)", ns) + } + + // set resource version + rv, err := strconv.ParseUint(pods.GetResourceVersion(), 10, 0) + if err != nil{ + return perrors.WithMessagef(err, "parse resource version %s", pods.GetResourceVersion()) + } + if c.listAndWatchStartResourceVersion < rv{ + c.listAndWatchStartResourceVersion = rv + } + + for _, pod := range pods.Items { + logger.Debugf("got the pod (name: %s), (label: %v), (annotations: %v)", pod.Name, pod.GetLabels(), pod.GetAnnotations()) + c.handleWatchedPodEvent(&pod, watch.Added) + } + + } + + return nil +} + // TODO // Wait informer sync latest status -func (c *dubboRegistryController) waitSync() error { -} // read dubbo-registry controller config // 1. read kubernetes InCluster config // 1. current pod name @@ -125,6 +164,7 @@ func (c *dubboRegistryController) initNamespacedPodInformer(ns string) { panic(err) } options.LabelSelector = labels.SelectorFromSet(labelMap).String() + options.ResourceVersion = strconv.FormatUint(c.listAndWatchStartResourceVersion, 10) }), ) podInformer := informersFactory.Core().V1().Pods() @@ -227,7 +267,7 @@ func (c *dubboRegistryController) run() { } } - logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", c.namespace, c.name) + logger.Infof("kubernetes registry-controller running @namespace = %q @Podname = %q", c.namespace, c.name) // start work go c.work()