diff --git a/remoting/kubernetes/client.go b/remoting/kubernetes/client.go index 6bddb0ec09a6f6157cec235b5caa1e65140a776c..1df8ac86df04f3954a1f7721aae3aaf405f3c35b 100644 --- a/remoting/kubernetes/client.go +++ b/remoting/kubernetes/client.go @@ -19,23 +19,12 @@ package kubernetes import ( "context" - "encoding/base64" - "encoding/json" - "os" "sync" - "time" ) import ( perrors "github.com/pkg/errors" - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/strategicpatch" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" ) import ( @@ -58,405 +47,34 @@ var ( ) type Client struct { - - // kubernetes connection config - cfg *rest.Config - - // the kubernetes interface - rawClient kubernetes.Interface - - // WIP tag - - // current pod config - currentPodName string - - ns string - - // current resource version - lastResourceVersion string - - // the memory watcherSet - watcherSet WatcherSet - - // protect the wg && currentPod lock sync.RWMutex - // current pod status - currentPod *v1.Pod - // protect the watchPods loop && watcher - wg sync.WaitGroup // manage the client lifecycle ctx context.Context cancel context.CancelFunc -} - -// load CurrentPodName -func getCurrentPodName() (string, error) { - - v := os.Getenv(podNameKey) - if len(v) == 0 { - return "", perrors.New("read value from env by key (HOSTNAME)") - } - return v, nil -} -// load CurrentNameSpace -func getCurrentNameSpace() (string, error) { - - v := os.Getenv(nameSpaceKey) - if len(v) == 0 { - return "", perrors.New("read value from env by key (NAMESPACE)") - } - return v, nil -} - -// NewMockClient -// export for registry package test -func NewMockClient(namespace string, mockClientGenerator func() (kubernetes.Interface, error)) (*Client, error) { - return newMockClient(namespace, mockClientGenerator) -} - -// newMockClient -// new a client for test -func newMockClient(namespace string, mockClientGenerator func() (kubernetes.Interface, error)) (*Client, error) { - - rawClient, err := mockClientGenerator() - if err != nil { - return nil, perrors.WithMessage(err, "call mock generator") - } - - currentPodName, err := getCurrentPodName() - if err != nil { - return nil, perrors.WithMessage(err, "get pod name") - } - - ctx, cancel := context.WithCancel(context.Background()) - - c := &Client{ - currentPodName: currentPodName, - ns: namespace, - rawClient: rawClient, - ctx: ctx, - watcherSet: newWatcherSet(ctx), - cancel: cancel, - } - - currentPod, err := c.initCurrentPod() - if err != nil { - return nil, perrors.WithMessage(err, "init current pod") - } - - // record current status - c.currentPod = currentPod - - // init the watcherSet by current pods - if err := c.initWatchSet(); err != nil { - return nil, perrors.WithMessage(err, "init watcherSet") - } - - c.lastResourceVersion = c.currentPod.GetResourceVersion() - - // start kubernetes watch loop - if err := c.watchPods(); err != nil { - return nil, perrors.WithMessage(err, "watch pods") - } - - logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", namespace, c.currentPod.Name) - return c, nil + controller *dubboRegistryController } // newClient // new a client for registry func newClient(namespace string) (*Client, error) { - cfg, err := rest.InClusterConfig() - if err != nil { - return nil, perrors.WithMessage(err, "get in-cluster config") - } - - rawClient, err := kubernetes.NewForConfig(cfg) - if err != nil { - return nil, perrors.WithMessage(err, "new kubernetes client by in cluster config") - } - - currentPodName, err := getCurrentPodName() - if err != nil { - return nil, perrors.WithMessage(err, "get pod name") - } - ctx, cancel := context.WithCancel(context.Background()) - - c := &Client{ - currentPodName: currentPodName, - ns: namespace, - cfg: cfg, - rawClient: rawClient, - ctx: ctx, - watcherSet: newWatcherSet(ctx), - cancel: cancel, - } - - currentPod, err := c.initCurrentPod() + controller, err := newDubboRegistryController(ctx) if err != nil { - return nil, perrors.WithMessage(err, "init current pod") - } - - // record current status - c.currentPod = currentPod - - // init the watcherSet by current pods - if err := c.initWatchSet(); err != nil { - return nil, perrors.WithMessage(err, "init watcherSet") + return nil, perrors.WithMessage(err, "new dubbo-registry controller") } - // start kubernetes watch loop - if err := c.watchPods(); err != nil { - return nil, perrors.WithMessage(err, "watch pods") + c := &Client{ + ctx: ctx, + cancel: cancel, + controller: controller, } - logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", namespace, c.currentPod.Name) return c, nil } -// initCurrentPod -// 1. get current pod -// 2. give the dubbo-label for this pod -func (c *Client) initCurrentPod() (*v1.Pod, error) { - - // read the current pod status - currentPod, err := c.rawClient.CoreV1().Pods(c.ns).Get(c.currentPodName, metav1.GetOptions{}) - if err != nil { - return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.currentPodName, c.ns) - } - - oldPod, newPod, err := c.assembleDUBBOLabel(currentPod) - if err != nil { - if err != ErrDubboLabelAlreadyExist { - return nil, perrors.WithMessage(err, "assemble dubbo label") - } - // current pod don't have label - } - - p, err := c.getPatch(oldPod, newPod) - if err != nil { - return nil, perrors.WithMessage(err, "get patch") - } - - currentPod, err = c.patchCurrentPod(p) - if err != nil { - return nil, perrors.WithMessage(err, "patch to current pod") - } - - return currentPod, nil -} - -// initWatchSet -// 1. get all with dubbo label pods -// 2. put every element to watcherSet -func (c *Client) initWatchSet() error { - - pods, err := c.rawClient.CoreV1().Pods(c.ns).List(metav1.ListOptions{ - LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(), - }) - if err != nil { - return perrors.WithMessagef(err, "list pods in namespace (%s)", c.ns) - } - - // set resource version - c.lastResourceVersion = pods.GetResourceVersion() - - for _, pod := range pods.Items { - logger.Debugf("got the pod (name: %s), (label: %v), (annotations: %v)", pod.Name, pod.GetLabels(), pod.GetAnnotations()) - c.handleWatchedPodEvent(&pod, watch.Added) - } - - return nil -} - -// watchPods -// try to watch kubernetes pods -func (c *Client) watchPods() error { - - // try once - watcher, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{ - LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(), - Watch: true, - ResourceVersion: c.lastResourceVersion, - }) - if err != nil { - return perrors.WithMessagef(err, "try to watch the namespace (%s) pods", c.ns) - } - - watcher.Stop() - - c.wg.Add(1) - // add wg, grace close the client - go c.watchPodsLoop() - return nil -} - -type resourceVersionGetter interface { - GetResourceVersion() string -} - -// watchPods -// try to notify -func (c *Client) watchPodsLoop() { - - defer func() { - // notify other goroutine, this loop over - c.wg.Done() - logger.Info("watchPodsLoop goroutine game over") - }() - - for { - onceWatch: - wc, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{ - LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(), - Watch: true, - ResourceVersion: c.lastResourceVersion, - }) - if err != nil { - logger.Warnf("watch the namespace (%s) pods: %v, retry after 2 seconds", c.ns, err) - time.Sleep(2 * time.Second) - continue - } - - logger.Infof("the old kubernetes client broken, collect the resource status from resource version (%s)", c.lastResourceVersion) - - for { - select { - // double check ctx - case <-c.ctx.Done(): - logger.Infof("the kubernetes client stopped, resultChan len %d", len(wc.ResultChan())) - return - - // get one element from result-chan - case event, ok := <-wc.ResultChan(): - if !ok { - wc.Stop() - logger.Info("kubernetes watch chan die, create new") - goto onceWatch - } - - if event.Type == watch.Error { - // watched a error event - logger.Warnf("kubernetes watch api report err (%#v)", event) - continue - } - - o, ok := event.Object.(resourceVersionGetter) - if !ok { - logger.Warnf("kubernetes response object not a versioned object, its real type %T", event.Object) - continue - } - - // record the last resource version avoid to sync all pod - c.lastResourceVersion = o.GetResourceVersion() - logger.Infof("kubernetes get the current resource version %v", c.lastResourceVersion) - - // check event object type - p, ok := event.Object.(*v1.Pod) - if !ok { - logger.Warnf("kubernetes response object not a Pod, its real type %T", event.Object) - continue - } - - logger.Debugf("kubernetes got pod %#v", p) - // handle the watched pod - go c.handleWatchedPodEvent(p, event.Type) - } - } - } -} - -// handleWatchedPodEvent -// handle watched pod event -func (c *Client) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) { - - for ak, av := range p.GetAnnotations() { - - // not dubbo interest annotation - if ak != DubboIOAnnotationKey { - continue - } - - ol, err := c.unmarshalRecord(av) - if err != nil { - logger.Errorf("there a pod with dubbo annotation, but unmarshal dubbo value %v", err) - return - } - - for _, o := range ol { - - switch eventType { - case watch.Added: - // if pod is added, the record always be create - o.SourceObjectEventType = Create - case watch.Modified: - o.SourceObjectEventType = Update - case watch.Deleted: - o.SourceObjectEventType = Delete - default: - logger.Errorf("no valid kubernetes event-type (%s) ", eventType) - return - } - - logger.Debugf("prepare to put object (%#v) to kubernetes-watcherSet", o) - - if err := c.watcherSet.Put(o); err != nil { - logger.Errorf("put (%#v) to cache watcherSet: %v ", o, err) - return - } - - } - - } -} - -// unmarshalRecord -// unmarshal the kubernetes dubbo annotation value -func (c *Client) unmarshalRecord(record string) ([]*WatcherEvent, error) { - - if len(record) == 0 { - // []*WatcherEvent is nil. - return nil, nil - } - - rawMsg, err := base64.URLEncoding.DecodeString(record) - if err != nil { - return nil, perrors.WithMessagef(err, "decode record (%s)", record) - } - - var out []*WatcherEvent - if err := json.Unmarshal(rawMsg, &out); err != nil { - return nil, perrors.WithMessage(err, "decode json") - } - return out, nil -} - -// marshalRecord -// marshal the kubernetes dubbo annotation value -func (c *Client) marshalRecord(ol []*WatcherEvent) (string, error) { - - msg, err := json.Marshal(ol) - if err != nil { - return "", perrors.WithMessage(err, "json encode object list") - } - return base64.URLEncoding.EncodeToString(msg), nil -} - -// readCurrentPod -// read the current pod status from kubernetes api -func (c *Client) readCurrentPod() (*v1.Pod, error) { - - currentPod, err := c.rawClient.CoreV1().Pods(c.ns).Get(c.currentPodName, metav1.GetOptions{}) - if err != nil { - return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.currentPodName, c.ns) - } - return currentPod, nil -} - // Create // create k/v pair in watcher-set func (c *Client) Create(k, v string) error { @@ -466,132 +84,19 @@ func (c *Client) Create(k, v string) error { c.lock.Lock() defer c.lock.Unlock() - // 1. accord old pod && (k, v) assemble new pod dubbo annotion v - // 2. get patch data - // 3. PATCH the pod - currentPod, err := c.readCurrentPod() - if err != nil { - return perrors.WithMessage(err, "read current pod") - } - - oldPod, newPod, err := c.assembleDUBBOAnnotations(k, v, currentPod) - if err != nil { - return perrors.WithMessage(err, "assemble") - } - - patchBytes, err := c.getPatch(oldPod, newPod) - if err != nil { - return perrors.WithMessage(err, "get patch") - } - - updatedPod, err := c.patchCurrentPod(patchBytes) - if err != nil { - return perrors.WithMessage(err, "patch current pod") + if err := c.controller.addAnnotationForCurrentPod(k, v); err != nil { + return perrors.WithMessagef(err, "add annotation @key = %s @value = %s", k, v) } - c.currentPod = updatedPod logger.Debugf("put the @key = %s @value = %s success", k, v) - // not update the watcherSet, the watcherSet should be write by the watchPodsLoop return nil } -// patch current pod -// write new meta for current pod -func (c *Client) patchCurrentPod(patch []byte) (*v1.Pod, error) { - - updatedPod, err := c.rawClient.CoreV1().Pods(c.ns).Patch(c.currentPodName, types.StrategicMergePatchType, patch) - if err != nil { - return nil, perrors.WithMessage(err, "patch in kubernetes pod ") - } - return updatedPod, nil -} - -// assemble the dubbo kubernetes label -// every dubbo instance should be labeled spec {"dubbo.io/label":"dubbo.io/label-value"} label -func (c *Client) assembleDUBBOLabel(currentPod *v1.Pod) (*v1.Pod, *v1.Pod, error) { - - var ( - oldPod = &v1.Pod{} - newPod = &v1.Pod{} - ) - - oldPod.Labels = make(map[string]string, 8) - newPod.Labels = make(map[string]string, 8) - - if currentPod.GetLabels() != nil { - - if currentPod.GetLabels()[DubboIOLabelKey] == DubboIOLabelValue { - // already have label - return nil, nil, ErrDubboLabelAlreadyExist - } - } - - // copy current pod labels to oldPod && newPod - for k, v := range currentPod.GetLabels() { - oldPod.Labels[k] = v - newPod.Labels[k] = v - } - // assign new label for current pod - newPod.Labels[DubboIOLabelKey] = DubboIOLabelValue - return oldPod, newPod, nil -} - -// assemble the dubbo kubernetes annotations -// accord the current pod && (k,v) assemble the old-pod, new-pod -func (c *Client) assembleDUBBOAnnotations(k, v string, currentPod *v1.Pod) (oldPod *v1.Pod, newPod *v1.Pod, err error) { - - oldPod = &v1.Pod{} - newPod = &v1.Pod{} - oldPod.Annotations = make(map[string]string, 8) - newPod.Annotations = make(map[string]string, 8) - - for k, v := range currentPod.GetAnnotations() { - oldPod.Annotations[k] = v - newPod.Annotations[k] = v - } - - al, err := c.unmarshalRecord(oldPod.GetAnnotations()[DubboIOAnnotationKey]) - if err != nil { - err = perrors.WithMessage(err, "unmarshal record") - return - } - - newAnnotations, err := c.marshalRecord(append(al, &WatcherEvent{Key: k, Value: v})) - if err != nil { - err = perrors.WithMessage(err, "marshal record") - return - } - - newPod.Annotations[DubboIOAnnotationKey] = newAnnotations - return -} - -// getPatch -// get the kubernetes pod patch bytes -func (c *Client) getPatch(oldPod, newPod *v1.Pod) ([]byte, error) { - - oldData, err := json.Marshal(oldPod) - if err != nil { - return nil, perrors.WithMessage(err, "marshal old pod") - } - - newData, err := json.Marshal(newPod) - if err != nil { - return nil, perrors.WithMessage(err, "marshal newPod pod") - } - - patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{}) - if err != nil { - return nil, perrors.WithMessage(err, "create two-way-merge-patch") - } - return patchBytes, nil -} - // GetChildren // get k children list from kubernetes-watcherSet func (c *Client) GetChildren(k string) ([]string, []string, error) { - objectList, err := c.watcherSet.Get(k, true) + objectList, err := c.controller.watcherSet.Get(k, true) if err != nil { return nil, nil, perrors.WithMessagef(err, "get children from watcherSet on (%s)", k) } @@ -611,7 +116,7 @@ func (c *Client) GetChildren(k string) ([]string, []string, error) { // watch on spec key func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) { - w, err := c.watcherSet.Watch(k, false) + w, err := c.controller.watcherSet.Watch(k, false) if err != nil { return nil, nil, perrors.WithMessagef(err, "watch on (%s)", k) } @@ -623,7 +128,7 @@ func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) // watch on spec prefix func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan struct{}, error) { - w, err := c.watcherSet.Watch(prefix, true) + w, err := c.controller.watcherSet.Watch(prefix, true) if err != nil { return nil, nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix) } @@ -643,7 +148,7 @@ func (c *Client) Valid() bool { } c.lock.RLock() defer c.lock.RUnlock() - return c.rawClient != nil + return c.controller != nil } // Done @@ -667,7 +172,6 @@ func (c *Client) Close() { // the client ctx be canceled // will trigger the watcherSet watchers all stopped // so, just wait - c.wg.Wait() } // ValidateClient @@ -678,17 +182,74 @@ func ValidateClient(container clientFacade) error { // new Client if client == nil || client.Valid() { - ns, err := getCurrentNameSpace() - if err != nil { - return perrors.WithMessage(err, "get current namespace") - } - newClient, err := newClient(ns) + //ns, err := getCurrentNameSpace() + //if err != nil { + // return perrors.WithMessage(err, "get current namespace") + //} + //newClient, err := newClient(ns) + newClient, err := newClient("") if err != nil { - logger.Warnf("new kubernetes client (namespace{%s}: %v)", ns, err) - return perrors.WithMessagef(err, "new kubernetes client (:%+v)", ns) + logger.Warnf("new kubernetes client (namespace{%s}: %v)", "", err) + return perrors.WithMessagef(err, "new kubernetes client (:%+v)", "") } container.SetClient(newClient) } return nil } + +// NewMockClient +// export for registry package test +func NewMockClient(namespace string, mockClientGenerator func() (kubernetes.Interface, error)) (*Client, error) { + return nil, nil + //return newMockClient(namespace, mockClientGenerator) +} + +// newMockClient +// new a client for test +//func newMockClient(namespace string, mockClientGenerator func() (kubernetes.Interface, error)) (*Client, error) { +// +// rawClient, err := mockClientGenerator() +// if err != nil { +// return nil, perrors.WithMessage(err, "call mock generator") +// } +// +// currentPodName, err := getCurrentPodName() +// if err != nil { +// return nil, perrors.WithMessage(err, "get pod name") +// } +// +// ctx, cancel := context.WithCancel(context.Background()) +// +// c := &Client{ +// currentPodName: currentPodName, +// ns: namespace, +// rawClient: rawClient, +// ctx: ctx, +// watcherSet: newWatcherSet(ctx), +// cancel: cancel, +// } +// +// currentPod, err := c.initCurrentPod() +// if err != nil { +// return nil, perrors.WithMessage(err, "init current pod") +// } +// +// // record current status +// c.currentPod = currentPod +// +// // init the watcherSet by current pods +// if err := c.initWatchSet(); err != nil { +// return nil, perrors.WithMessage(err, "init watcherSet") +// } +// +// c.lastResourceVersion = c.currentPod.GetResourceVersion() +// +// // start kubernetes watch loop +// if err := c.watchPods(); err != nil { +// return nil, perrors.WithMessage(err, "watch pods") +// } +// +// logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", namespace, c.currentPod.Name) +// return c, nil +//} diff --git a/remoting/kubernetes/dubbo_controller.go b/remoting/kubernetes/registry_controller.go similarity index 53% rename from remoting/kubernetes/dubbo_controller.go rename to remoting/kubernetes/registry_controller.go index 5bb3c159ea770b79cfced2a27e5ad145307dbed9..150740655fb938ec4984b48942016f7e35a6fcc6 100644 --- a/remoting/kubernetes/dubbo_controller.go +++ b/remoting/kubernetes/registry_controller.go @@ -4,8 +4,10 @@ import ( "context" "encoding/base64" "encoding/json" - "fmt" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" "os" + "sync" "time" "github.com/apache/dubbo-go/common/logger" @@ -31,8 +33,13 @@ const ( // dubboRegistryController // work like a kubernetes controller type dubboRegistryController struct { - ctx context.Context - cancel context.CancelFunc + + // clone from client + // manage lifecycle + ctx context.Context + + // protect patch current pod operation + lock sync.Mutex // current pod config needWatchedNamespaceList []string @@ -49,7 +56,33 @@ type dubboRegistryController struct { queue workqueue.Interface //shared by namespaced informers } +func newDubboRegistryController(ctx context.Context) (*dubboRegistryController, error) { + + c := &dubboRegistryController{ + ctx: ctx, + watcherSet: newWatcherSet(ctx), + } + + if err := c.readConfig(); err != nil { + return nil, perrors.WithMessage(err, "dubbo registry controller read config") + } + + if err := c.init(); err != nil { + return nil,perrors.WithMessage(err, "dubbo registry controller init") + } + + if _, err := c.initCurrentPod(); err != nil { + return nil,perrors.WithMessage(err, "init current pod") + } + + go c.run() + return c, nil +} + // read dubbo-registry controller config +// 1. read kubernetes InCluster config +// 1. current pod name +// 2. current pod working namespace func (c *dubboRegistryController) readConfig() error { // read in-cluster config cfg, err := rest.InClusterConfig() @@ -116,21 +149,6 @@ func (c *dubboRegistryController) init() error { return nil } -func newDubboRegistryController() error { - - c := &dubboRegistryController{} - if err := c.readConfig(); err != nil { - return perrors.WithMessage(err, "dubbo registry controller read config") - } - - if err := c.init(); err != nil { - return perrors.WithMessage(err, "dubbo registry controller init") - } - - go c.run() - return nil -} - type kubernetesEvent struct { p *v1.Pod t watch.EventType @@ -186,9 +204,6 @@ func (c *dubboRegistryController) Run() { } } -func (c *dubboRegistryController) stop() { - c.cancel() -} func (c *dubboRegistryController) run() { defer c.queue.ShutDown() @@ -200,11 +215,12 @@ func (c *dubboRegistryController) run() { } } + logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", c.namespace, c.name) + // start work go c.work() - // block wait + // block wait context cancel <-c.ctx.Done() - } func (c *dubboRegistryController) work() { @@ -290,3 +306,176 @@ func (c *dubboRegistryController) unmarshalRecord(record string) ([]*WatcherEven } return out, nil } + +// initCurrentPod +// 1. get current pod +// 2. give the dubbo-label for this pod +func (c *dubboRegistryController) initCurrentPod() (*v1.Pod, error) { + + currentPod, err := c.kc.CoreV1().Pods(c.namespace).Get(c.name, metav1.GetOptions{}) + if err != nil { + return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.name, c.namespace) + } + + oldPod, newPod, err := c.assembleDUBBOLabel(currentPod) + if err != nil { + if err != ErrDubboLabelAlreadyExist { + return nil, perrors.WithMessage(err, "assemble dubbo label") + } + // current pod don't have label + } + + p, err := c.getPatch(oldPod, newPod) + if err != nil { + return nil, perrors.WithMessage(err, "get patch") + } + + currentPod, err = c.patchCurrentPod(p) + if err != nil { + return nil, perrors.WithMessage(err, "patch to current pod") + } + + return currentPod, nil +} + +// patch current pod +// write new meta for current pod +func (c *dubboRegistryController) patchCurrentPod(patch []byte) (*v1.Pod, error) { + + updatedPod, err := c.kc.CoreV1().Pods(c.namespace).Patch(c.name, types.StrategicMergePatchType, patch) + if err != nil { + return nil, perrors.WithMessage(err, "patch in kubernetes pod ") + } + return updatedPod, nil +} + +// assemble the dubbo kubernetes label +// every dubbo instance should be labeled spec {"dubbo.io/label":"dubbo.io/label-value"} label +func (c *dubboRegistryController) assembleDUBBOLabel(currentPod *v1.Pod) (*v1.Pod, *v1.Pod, error) { + + var ( + oldPod = &v1.Pod{} + newPod = &v1.Pod{} + ) + + oldPod.Labels = make(map[string]string, 8) + newPod.Labels = make(map[string]string, 8) + + if currentPod.GetLabels() != nil { + + if currentPod.GetLabels()[DubboIOLabelKey] == DubboIOLabelValue { + // already have label + return nil, nil, ErrDubboLabelAlreadyExist + } + } + + // copy current pod labels to oldPod && newPod + for k, v := range currentPod.GetLabels() { + oldPod.Labels[k] = v + newPod.Labels[k] = v + } + // assign new label for current pod + newPod.Labels[DubboIOLabelKey] = DubboIOLabelValue + return oldPod, newPod, nil +} + +// assemble the dubbo kubernetes annotations +// accord the current pod && (k,v) assemble the old-pod, new-pod +func (c *dubboRegistryController) assembleDUBBOAnnotations(k, v string, currentPod *v1.Pod) (oldPod *v1.Pod, newPod *v1.Pod, err error) { + + oldPod = &v1.Pod{} + newPod = &v1.Pod{} + oldPod.Annotations = make(map[string]string, 8) + newPod.Annotations = make(map[string]string, 8) + + for k, v := range currentPod.GetAnnotations() { + oldPod.Annotations[k] = v + newPod.Annotations[k] = v + } + + al, err := c.unmarshalRecord(oldPod.GetAnnotations()[DubboIOAnnotationKey]) + if err != nil { + err = perrors.WithMessage(err, "unmarshal record") + return + } + + newAnnotations, err := c.marshalRecord(append(al, &WatcherEvent{Key: k, Value: v})) + if err != nil { + err = perrors.WithMessage(err, "marshal record") + return + } + + newPod.Annotations[DubboIOAnnotationKey] = newAnnotations + return +} + +// getPatch +// get the kubernetes pod patch bytes +func (c *dubboRegistryController) getPatch(oldPod, newPod *v1.Pod) ([]byte, error) { + + oldData, err := json.Marshal(oldPod) + if err != nil { + return nil, perrors.WithMessage(err, "marshal old pod") + } + + newData, err := json.Marshal(newPod) + if err != nil { + return nil, perrors.WithMessage(err, "marshal newPod pod") + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{}) + if err != nil { + return nil, perrors.WithMessage(err, "create two-way-merge-patch") + } + return patchBytes, nil +} + +// marshalRecord +// marshal the kubernetes dubbo annotation value +func (c *dubboRegistryController) marshalRecord(ol []*WatcherEvent) (string, error) { + + msg, err := json.Marshal(ol) + if err != nil { + return "", perrors.WithMessage(err, "json encode object list") + } + return base64.URLEncoding.EncodeToString(msg), nil +} + +func (c *dubboRegistryController) readCurrentPod() (*v1.Pod, error) { + + currentPod, err := c.kc.CoreV1().Pods(c.namespace).Get(c.name, metav1.GetOptions{}) + if err != nil { + return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.name, c.namespace) + } + return currentPod, nil +} + +func (c *dubboRegistryController) addAnnotationForCurrentPod(k string, v string) error { + + c.lock.Lock() + defer c.lock.Unlock() + + // 1. accord old pod && (k, v) assemble new pod dubbo annotion v + // 2. get patch data + // 3. PATCH the pod + currentPod, err := c.readCurrentPod() + if err != nil { + return perrors.WithMessage(err, "read current pod") + } + + oldPod, newPod, err := c.assembleDUBBOAnnotations(k, v, currentPod) + if err != nil { + return perrors.WithMessage(err, "assemble") + } + + patchBytes, err := c.getPatch(oldPod, newPod) + if err != nil { + return perrors.WithMessage(err, "get patch") + } + + _, err = c.patchCurrentPod(patchBytes) + if err != nil { + return perrors.WithMessage(err, "patch current pod") + } + return nil +} diff --git a/remoting/kubernetes/watch.go b/remoting/kubernetes/watch.go index ec3ae5cba3c89b738381ccdaa53cdb9631226a8a..a9e4c13ce1a9bd615b72c34d48d50f95f695f720 100644 --- a/remoting/kubernetes/watch.go +++ b/remoting/kubernetes/watch.go @@ -150,7 +150,7 @@ func (s *watcherSetImpl) Done() <-chan struct{} { // put the watch event to watcher-set func (s *watcherSetImpl) Put(watcherEvent *WatcherEvent) error { - sendMsg := func(object *WatcherEvent, w *watcher) { + blockSendMsg := func(object *WatcherEvent, w *watcher) { select { case <-w.done(): @@ -198,12 +198,12 @@ func (s *watcherSetImpl) Put(watcherEvent *WatcherEvent) error { } if !w.interested.prefix { if watcherEvent.Key == w.interested.key { - go sendMsg(watcherEvent, w) + blockSendMsg(watcherEvent, w) } // not interest continue } - go sendMsg(watcherEvent, w) + blockSendMsg(watcherEvent, w) } return nil }