/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package kubernetes import ( "context" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" v1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" "strconv" "sync" ) import ( perrors "github.com/pkg/errors" ) import ( "github.com/apache/dubbo-go/common/logger" ) type Client struct { lock sync.RWMutex // manage the client lifecycle ctx context.Context cancel context.CancelFunc controller *dubboRegistryController } // newClient // new a client for registry func newClient(url common.URL) (*Client, error) { ctx, cancel := context.WithCancel(context.Background()) // read type r, err := strconv.Atoi(url.GetParams().Get(constant.ROLE_KEY)) if err != nil { return nil, perrors.WithMessage(err, "atoi role") } controller, err := newDubboRegistryController(ctx, common.RoleType(r), GetInClusterKubernetesClient) if err != nil { return nil, perrors.WithMessage(err, "new dubbo-registry controller") } c := &Client{ ctx: ctx, cancel: cancel, controller: controller, } if r == common.CONSUMER { // only consumer have to start informer factory c.controller.Run() } return c, nil } // Create // create k/v pair in watcher-set func (c *Client) Create(k, v string) error { // the read current pod must be lock, protect every // create operation can be atomic c.lock.Lock() defer c.lock.Unlock() if err := c.controller.addAnnotationForCurrentPod(k, v); err != nil { return perrors.WithMessagef(err, "add annotation @key = %s @value = %s", k, v) } logger.Debugf("put the @key = %s @value = %s success", k, v) return nil } // GetChildren // get k children list from kubernetes-watcherSet func (c *Client) GetChildren(k string) ([]string, []string, error) { objectList, err := c.controller.watcherSet.Get(k, true) if err != nil { return nil, nil, perrors.WithMessagef(err, "get children from watcherSet on (%s)", k) } var kList []string var vList []string for _, o := range objectList { kList = append(kList, o.Key) vList = append(vList, o.Value) } return kList, vList, nil } // Watch // watch on spec key func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) { w, err := c.controller.watcherSet.Watch(k, false) if err != nil { return nil, nil, perrors.WithMessagef(err, "watch on (%s)", k) } return w.ResultChan(), w.done(), nil } // Watch // watch on spec prefix func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan struct{}, error) { w, err := c.controller.watcherSet.Watch(prefix, true) if err != nil { return nil, nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix) } return w.ResultChan(), w.done(), nil } // Valid // Valid the client // if return false, the client is die func (c *Client) Valid() bool { select { case <-c.Done(): return false default: } c.lock.RLock() defer c.lock.RUnlock() return c.controller != nil } // Done // read the client status func (c *Client) Done() <-chan struct{} { return c.ctx.Done() } // Stop // read the client status func (c *Client) Close() { select { case <-c.ctx.Done(): //already stopped return default: } c.cancel() // the client ctx be canceled // will trigger the watcherSet watchers all stopped // so, just wait } // ValidateClient // validate the kubernetes client func ValidateClient(container clientFacade) error { client := container.Client() // new Client if client == nil || client.Valid() { newClient, err := newClient(container.GetUrl()) if err != nil { logger.Warnf("new kubernetes client (namespace{%s}: %v)", "", err) return perrors.WithMessagef(err, "new kubernetes client (:%+v)", "") } container.SetClient(newClient) } return nil } // NewMockClient // export for registry package test func NewMockClient(podList *v1.PodList) (*Client, error) { ctx, cancel := context.WithCancel(context.Background()) controller, err := newDubboRegistryController(ctx, common.CONSUMER, func() (kubernetes.Interface, error) { return fake.NewSimpleClientset(podList), nil }) if err != nil { return nil, perrors.WithMessage(err, "new dubbo-registry controller") } c := &Client{ ctx: ctx, cancel: cancel, controller: controller, } c.controller.Run() return c, nil }