Skip to content
Snippets Groups Projects
registry.go 5.63 KiB
Newer Older
scott.wang's avatar
scott.wang committed
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kubernetes

import (
	"fmt"
	"os"
	"path"
	"strings"
	"sync"
	"github.com/dubbogo/getty"
scott.wang's avatar
scott.wang committed
	"github.com/dubbogo/gost/net"
	perrors "github.com/pkg/errors"
	k8s "k8s.io/client-go/kubernetes"
)

import (
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/common/extension"
	"github.com/apache/dubbo-go/common/logger"
	"github.com/apache/dubbo-go/registry"
	"github.com/apache/dubbo-go/remoting/kubernetes"
)

var (
	processID = ""
	localIP   = ""
)

const (
	Name         = "kubernetes"
	ConnDelay    = 3
	MaxFailTimes = 15
)

func init() {
	processID = fmt.Sprintf("%d", os.Getpid())
	localIP, _ = gxnet.GetLocalIP()
	extension.SetRegistry(Name, newKubernetesRegistry)
}

type kubernetesRegistry struct {
scott.wang's avatar
scott.wang committed
	registry.BaseRegistry
	cltLock        sync.RWMutex
scott.wang's avatar
scott.wang committed
	client         *kubernetes.Client
	listenerLock   sync.Mutex
	listener       *kubernetes.EventListener
	dataListener   *dataListener
	configListener *configurationListener
}

func (r *kubernetesRegistry) Client() *kubernetes.Client {
	r.cltLock.RLock()
	client := r.client
	r.cltLock.RUnlock()
	return client
}
func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) {
	r.cltLock.Lock()
	r.client = client
	r.cltLock.Unlock()
scott.wang's avatar
scott.wang committed
func (r *kubernetesRegistry) CloseAndNilClient() {
	r.client.Close()
	r.client = nil
}

scott.wang's avatar
scott.wang committed
func (r *kubernetesRegistry) CloseListener() {
scott.wang's avatar
scott.wang committed
	if r.configListener != nil {
		r.configListener.Close()
	r.configListener = nil
	r.cltLock.Unlock()
scott.wang's avatar
scott.wang committed
func (r *kubernetesRegistry) CreatePath(k string) error {
scott's avatar
scott committed
	if err := r.client.Create(k, ""); err != nil {
		return perrors.WithMessagef(err, "create path %s in kubernetes", k)
scott.wang's avatar
scott.wang committed
func (r *kubernetesRegistry) DoRegister(root string, node string) error {
	return r.client.Create(path.Join(root, node), "")
scott.wang's avatar
scott.wang committed
func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) {

	var (
		configListener *configurationListener
	)

	r.listenerLock.Lock()
	configListener = r.configListener
	r.listenerLock.Unlock()
	if r.listener == nil {
		r.cltLock.Lock()
		client := r.client
		r.cltLock.Unlock()
		if client == nil {
			return nil, perrors.New("kubernetes client broken")
		}

		// new client & listener
		listener := kubernetes.NewEventListener(r.client)

		r.listenerLock.Lock()
scott.wang's avatar
scott.wang committed
		r.listener = listener
		r.listenerLock.Unlock()
	}

	//register the svc to dataListener
	r.dataListener.AddInterestedURL(svc)
	for _, v := range strings.Split(svc.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), ",") {
		go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+v, svc.Service()), r.dataListener)
	}

	return configListener, nil
}

scott.wang's avatar
scott.wang committed
func (r *kubernetesRegistry) InitListeners() {
	r.listener = kubernetes.NewEventListener(r.client)
	r.configListener = NewConfigurationListener(r)
	r.dataListener = NewRegistryDataListener(r.configListener)
}
scott.wang's avatar
scott.wang committed
func newKubernetesRegistry(url *common.URL) (registry.Registry, error) {
scott.wang's avatar
scott.wang committed
	// actually, kubernetes use in-cluster config,
	r := &kubernetesRegistry{}
scott.wang's avatar
scott.wang committed
	r.InitBaseRegistry(url, r)
scott.wang's avatar
scott.wang committed
	if err := kubernetes.ValidateClient(r); err != nil {
scott's avatar
scott committed
		return nil, perrors.WithStack(err)
scott.wang's avatar
scott.wang committed

	r.WaitGroup().Add(1)
	go r.HandleClientRestart()
scott.wang's avatar
scott.wang committed
	r.InitListeners()

	logger.Debugf("the kubernetes registry started")

	return r, nil
scott's avatar
scott committed
func newMockKubernetesRegistry(
	url *common.URL,
	namespace string,
	clientGeneratorFunc func() (k8s.Interface, error),
) (registry.Registry, error) {

	var err error

	r := &kubernetesRegistry{}

	r.InitBaseRegistry(url, r)
	r.client, err = kubernetes.NewMockClient(namespace, clientGeneratorFunc)
	if err != nil {
		return nil, perrors.WithMessage(err, "new mock client")
	}
	r.WaitGroup().Add(1) //zk client start successful, then wg +1
	go r.HandleClientRestart()
	r.InitListeners()
	return r, nil
}

func (r *kubernetesRegistry) HandleClientRestart() {

	var (
		err       error
		failTimes int
	)

	defer r.WaitGroup()
LOOP:
	for {
		select {
		case <-r.Done():
			logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes goroutine exit now...")
			break LOOP
			// re-register all services
		case <-r.Client().Done():
			r.Client().Close()
			r.SetClient(nil)

			// try to connect to kubernetes,
			failTimes = 0
			for {
				select {
				case <-r.Done():
					logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes Registry goroutine exit now...")
					break LOOP
				case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent
				}
				err = kubernetes.ValidateClient(r)
				logger.Infof("Kubernetes ProviderRegistry.validateKubernetesClient = error{%#v}", perrors.WithStack(err))

				if err == nil {
					if r.RestartCallBack() {
						break
					}
				}
				failTimes++
				if MaxFailTimes <= failTimes {
					failTimes = MaxFailTimes
				}
			}
		}
	}
}

func timeSecondDuration(sec int) time.Duration {
	return time.Duration(sec) * time.Second
}