Skip to content
Snippets Groups Projects
client.go 5 KiB
Newer Older
scott.wang's avatar
scott.wang committed
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kubernetes

import (
	"context"
	"sync"
)

import (
	perrors "github.com/pkg/errors"
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/common/logger"
)

type Client struct {

	// manage the  client lifecycle
	ctx    context.Context
	cancel context.CancelFunc

scott.wang's avatar
scott.wang committed
	controller *dubboRegistryController
// newClient
// new a client for registry
func newClient(url common.URL) (*Client, error) {

	ctx, cancel := context.WithCancel(context.Background())

	// read type
	r, err := strconv.Atoi(url.GetParams().Get(constant.ROLE_KEY))
	if err != nil {
		return nil, perrors.WithMessage(err, "atoi role")
	}
	controller, err := newDubboRegistryController(ctx, common.RoleType(r), GetInClusterKubernetesClient)
	if err != nil {
scott.wang's avatar
scott.wang committed
		return nil, perrors.WithMessage(err, "new dubbo-registry controller")
scott.wang's avatar
scott.wang committed
	c := &Client{
		ctx:        ctx,
		cancel:     cancel,
		controller: controller,
	if r == common.CONSUMER {
scott.wang's avatar
scott.wang committed
		// only consumer have to start informer factory
		c.controller.startALLInformers()
	return c, nil
}

// Create
// create k/v pair in watcher-set
func (c *Client) Create(k, v string) error {

	// the read current pod must be lock, protect every
	// create operation can be atomic
	c.lock.Lock()
	defer c.lock.Unlock()

scott.wang's avatar
scott.wang committed
	if err := c.controller.addAnnotationForCurrentPod(k, v); err != nil {
		return perrors.WithMessagef(err, "add annotation @key = %s @value = %s", k, v)
	logger.Debugf("put the @key = %s @value = %s success", k, v)
	return nil
}

// GetChildren
scott's avatar
scott committed
// get k children list from kubernetes-watcherSet
func (c *Client) GetChildren(k string) ([]string, []string, error) {

scott.wang's avatar
scott.wang committed
	objectList, err := c.controller.watcherSet.Get(k, true)
	if err != nil {
scott's avatar
scott committed
		return nil, nil, perrors.WithMessagef(err, "get children from watcherSet on (%s)", k)
	}

	var kList []string
	var vList []string

	for _, o := range objectList {
		kList = append(kList, o.Key)
		vList = append(vList, o.Value)
	}

	return kList, vList, nil
}

// Watch
// watch on spec key
scott's avatar
scott committed
func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) {
scott.wang's avatar
scott.wang committed
	w, err := c.controller.watcherSet.Watch(k, false)
	if err != nil {
		return nil, nil, perrors.WithMessagef(err, "watch on (%s)", k)
	return w.ResultChan(), w.done(), nil
}

// Watch
// watch on spec prefix
scott's avatar
scott committed
func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan struct{}, error) {
scott.wang's avatar
scott.wang committed
	w, err := c.controller.watcherSet.Watch(prefix, true)
	if err != nil {
		return nil, nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix)
	return w.ResultChan(), w.done(), nil
}

// Valid
// Valid the client
// if return false, the client is die
func (c *Client) Valid() bool {

	select {
	case <-c.Done():
		return false
	default:
	}
	defer c.lock.RUnlock()
scott.wang's avatar
scott.wang committed
	return c.controller != nil
}

// Done
// read the client status
func (c *Client) Done() <-chan struct{} {
	return c.ctx.Done()
}

// Stop
// read the client status
func (c *Client) Close() {

	select {
	case <-c.ctx.Done():
		//already stopped
		return
	default:
	}
	c.cancel()

	// the client ctx be canceled
scott's avatar
scott committed
	// will trigger the watcherSet watchers all stopped
	// so, just wait
}

// ValidateClient
// validate the kubernetes client
func ValidateClient(container clientFacade) error {

	client := container.Client()
	if client == nil || client.Valid() {

		newClient, err := newClient(container.GetUrl())
		if err != nil {
			logger.Warnf("new kubernetes client: %v)", err)
			return perrors.WithMessage(err, "new kubernetes client")
		}
		container.SetClient(newClient)
	}

	return nil
}
scott.wang's avatar
scott.wang committed

// NewMockClient
// export for registry package test
scott.wang's avatar
scott.wang committed
func NewMockClient(podList *v1.PodList) (*Client, error) {

	ctx, cancel := context.WithCancel(context.Background())
	controller, err := newDubboRegistryController(ctx, common.CONSUMER, func() (kubernetes.Interface, error) {
scott.wang's avatar
scott.wang committed
		return fake.NewSimpleClientset(podList), nil
	})
	if err != nil {
		return nil, perrors.WithMessage(err, "new dubbo-registry controller")
	}

	c := &Client{
		ctx:        ctx,
		cancel:     cancel,
		controller: controller,
	}

	c.controller.startALLInformers()
scott.wang's avatar
scott.wang committed
	return c, nil
scott.wang's avatar
scott.wang committed
}