Skip to content
Snippets Groups Projects
client.go 5.98 KiB
Newer Older
scott.wang's avatar
scott.wang committed
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kubernetes

import (
	"context"
	"sync"
)

import (
	perrors "github.com/pkg/errors"
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

import (
	"github.com/apache/dubbo-go/common/logger"
)

type Client struct {

	// manage the  client lifecycle
	ctx    context.Context
	cancel context.CancelFunc

scott.wang's avatar
scott.wang committed
	controller *dubboRegistryController
// newClient
// new a client for registry
scott.wang's avatar
scott.wang committed
func newClient() (*Client, error) {

	ctx, cancel := context.WithCancel(context.Background())
scott.wang's avatar
scott.wang committed
	controller, err := newDubboRegistryController(ctx, GetInClusterKubernetesClient)
	if err != nil {
scott.wang's avatar
scott.wang committed
		return nil, perrors.WithMessage(err, "new dubbo-registry controller")
scott.wang's avatar
scott.wang committed
	c := &Client{
		ctx:        ctx,
		cancel:     cancel,
		controller: controller,
scott.wang's avatar
scott.wang committed
	c.controller.Run()

	return c, nil
}

// Create
// create k/v pair in watcher-set
func (c *Client) Create(k, v string) error {

	// the read current pod must be lock, protect every
	// create operation can be atomic
	c.lock.Lock()
	defer c.lock.Unlock()

scott.wang's avatar
scott.wang committed
	if err := c.controller.addAnnotationForCurrentPod(k, v); err != nil {
		return perrors.WithMessagef(err, "add annotation @key = %s @value = %s", k, v)
	logger.Debugf("put the @key = %s @value = %s success", k, v)
	return nil
}

// GetChildren
scott's avatar
scott committed
// get k children list from kubernetes-watcherSet
func (c *Client) GetChildren(k string) ([]string, []string, error) {

scott.wang's avatar
scott.wang committed
	objectList, err := c.controller.watcherSet.Get(k, true)
	if err != nil {
scott's avatar
scott committed
		return nil, nil, perrors.WithMessagef(err, "get children from watcherSet on (%s)", k)
	}

	var kList []string
	var vList []string

	for _, o := range objectList {
		kList = append(kList, o.Key)
		vList = append(vList, o.Value)
	}

	return kList, vList, nil
}

// Watch
// watch on spec key
scott's avatar
scott committed
func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) {
scott.wang's avatar
scott.wang committed
	w, err := c.controller.watcherSet.Watch(k, false)
	if err != nil {
		return nil, nil, perrors.WithMessagef(err, "watch on (%s)", k)
	return w.ResultChan(), w.done(), nil
}

// Watch
// watch on spec prefix
scott's avatar
scott committed
func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan struct{}, error) {
scott.wang's avatar
scott.wang committed
	w, err := c.controller.watcherSet.Watch(prefix, true)
	if err != nil {
		return nil, nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix)
	return w.ResultChan(), w.done(), nil
}

// Valid
// Valid the client
// if return false, the client is die
func (c *Client) Valid() bool {

	select {
	case <-c.Done():
		return false
	default:
	}
	defer c.lock.RUnlock()
scott.wang's avatar
scott.wang committed
	return c.controller != nil
}

// Done
// expose the done channel of the client context; the channel is closed
// once the client context has been canceled
func (c *Client) Done() <-chan struct{} {
	done := c.ctx.Done()
	return done
}

// Stop
// read the client status
func (c *Client) Close() {

	select {
	case <-c.ctx.Done():
		//already stopped
		return
	default:
	}
	c.cancel()

	// the client ctx be canceled
scott's avatar
scott committed
	// will trigger the watcherSet watchers all stopped
	// so, just wait
}

// ValidateClient
// validate the kubernetes client
func ValidateClient(container clientFacade) error {

	client := container.Client()
	if client == nil || client.Valid() {
scott.wang's avatar
scott.wang committed
		newClient, err := newClient()
		if err != nil {
scott.wang's avatar
scott.wang committed
			logger.Warnf("new kubernetes client (namespace{%s}: %v)", "", err)
			return perrors.WithMessagef(err, "new kubernetes client (:%+v)", "")
		}
		container.SetClient(newClient)
	}

	return nil
}
scott.wang's avatar
scott.wang committed

// NewMockClient
// export for registry package test
scott.wang's avatar
scott.wang committed
func NewMockClient(podList *v1.PodList) (*Client, error) {

	ctx, cancel := context.WithCancel(context.Background())
	controller, err := newDubboRegistryController(ctx, func() (kubernetes.Interface, error) {
		return fake.NewSimpleClientset(podList), nil
	})
	if err != nil {
		return nil, perrors.WithMessage(err, "new dubbo-registry controller")
	}

	c := &Client{
		ctx:        ctx,
		cancel:     cancel,
		controller: controller,
	}

	c.controller.Run()
	return c, nil
scott.wang's avatar
scott.wang committed
}

// newMockClient
// new a client for  test
//func newMockClient(namespace string, mockClientGenerator func() (kubernetes.Interface, error)) (*Client, error) {
//
//	rawClient, err := mockClientGenerator()
//	if err != nil {
//		return nil, perrors.WithMessage(err, "call mock generator")
//	}
//
//	currentPodName, err := getCurrentPodName()
//	if err != nil {
//		return nil, perrors.WithMessage(err, "get pod name")
//	}
//
//	ctx, cancel := context.WithCancel(context.Background())
//
//	c := &Client{
//		currentPodName: currentPodName,
//		ns:             namespace,
//		rawClient:      rawClient,
//		ctx:            ctx,
//		watcherSet:     newWatcherSet(ctx),
//		cancel:         cancel,
//	}
//
//	currentPod, err := c.initCurrentPod()
//	if err != nil {
//		return nil, perrors.WithMessage(err, "init current pod")
//	}
//
//	// record current status
//	c.currentPod = currentPod
//
//	// init the watcherSet by current pods
//	if err := c.initWatchSet(); err != nil {
//		return nil, perrors.WithMessage(err, "init watcherSet")
//	}
//
//	c.lastResourceVersion = c.currentPod.GetResourceVersion()
//
//	// start kubernetes watch loop
//	if err := c.watchPods(); err != nil {
//		return nil, perrors.WithMessage(err, "watch pods")
//	}
//
//	logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", namespace, c.currentPod.Name)
//	return c, nil
//}