Skip to content
Snippets Groups Projects
registry.go 9.67 KiB
Newer Older
scott.wang's avatar
scott.wang committed
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kubernetes

import (
	"fmt"
	"net/url"
	"os"
	"path"
	"strconv"
	"strings"
	"sync"
	"time"
)

import (
	gxnet "github.com/dubbogo/gost/net"
	perrors "github.com/pkg/errors"
)

import (
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/common/extension"
	"github.com/apache/dubbo-go/common/logger"
	"github.com/apache/dubbo-go/registry"
	"github.com/apache/dubbo-go/remoting/kubernetes"
)

// Process-wide identity values. Both start empty and are filled in by init.
var (
	// processID holds the current process id as a decimal string.
	processID string
	// localIP holds the address returned by gxnet.GetLocalIP.
	localIP string
)

const (
	// Name is the key under which init installs newKubernetesRegistry via
	// extension.SetRegistry.
	Name              = "kubernetes"
	// RegistryConnDelay is the number of seconds Subscribe sleeps before
	// retrying after subscribe returns an error.
	RegistryConnDelay = 3
)

func init() {
	processID = fmt.Sprintf("%d", os.Getpid())
	localIP, _ = gxnet.GetLocalIP()
	extension.SetRegistry(Name, newKubernetesRegistry)
}

type kubernetesRegistry struct {
	*common.URL
	birth int64 // time of file birth, seconds since Epoch; 0 if unknown
scott.wang's avatar
scott.wang committed
	registry.BaseRegistry

	cltLock  sync.Mutex
	client   *kubernetes.Client
	services map[string]common.URL // service name + protocol -> service config

	listenerLock   sync.Mutex
	listener       *kubernetes.EventListener
	dataListener   *dataListener
	configListener *configurationListener

	wg        sync.WaitGroup // wg+done for kubernetes client restart
	closeOnce sync.Once      // protect the done
	done      chan struct{}
}

// Client returns the kubernetes client currently held by the registry.
// The field is read directly; this method does not acquire cltLock.
// NOTE(review): callers presumably coordinate through ClientLock — confirm.
func (r *kubernetesRegistry) Client() *kubernetes.Client {
	return r.client
}
// SetClient replaces the kubernetes client held by the registry.
// The field is written directly; this method does not acquire cltLock.
// NOTE(review): callers presumably hold ClientLock while calling — confirm.
func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) {
	r.client = client
}
// ClientLock returns a pointer to the mutex (cltLock) that the registry
// itself locks around accesses to client and services.
func (r *kubernetesRegistry) ClientLock() *sync.Mutex {
	return &r.cltLock
}
// WaitGroup returns a pointer to the registry's wait group. The constructor
// adds one to it before starting kubernetes.HandleClientRestart.
func (r *kubernetesRegistry) WaitGroup() *sync.WaitGroup {
	return &r.wg
}
// GetDone returns the registry's done channel, which stop closes to signal
// that the registry has been shut down.
func (r *kubernetesRegistry) GetDone() chan struct{} {
	return r.done
}
func (r *kubernetesRegistry) RestartCallBack() bool {

	services := []common.URL{}
	for _, confIf := range r.services {
		services = append(services, confIf)
	}

	for _, confIf := range services {
		err := r.Register(confIf)
		if err != nil {
			logger.Errorf("(kubernetesProviderRegistry)register(conf{%#v}) = error{%#v}",
				confIf, perrors.WithStack(err))
scott.wang's avatar
scott.wang committed
			return false
		}
		logger.Infof("success to re-register service :%v", confIf.Key())
	}
scott.wang's avatar
scott.wang committed
	return true
}

// newKubernetesRegistry builds a kubernetesRegistry for url.
//
// It validates the kubernetes client through kubernetes.ValidateClient and
// returns that error unchanged on failure. Otherwise it starts
// kubernetes.HandleClientRestart in a goroutine (tracked by the registry's
// wait group) and creates the event, configuration and data listeners.
func newKubernetesRegistry(url *common.URL) (registry.Registry, error) {
	reg := new(kubernetesRegistry)
	reg.URL = url
	reg.birth = time.Now().UnixNano()
	reg.done = make(chan struct{})
	reg.services = make(map[string]common.URL)

	// NOTE(review): ValidateClient presumably installs the client on reg
	// (e.g. via SetClient); reg.client is not assigned anywhere else here.
	err := kubernetes.ValidateClient(reg)
	if err != nil {
		return nil, err
	}

	reg.wg.Add(1)
	go kubernetes.HandleClientRestart(reg)

	reg.listener = kubernetes.NewEventListener(reg.client)
	reg.configListener = NewConfigurationListener(reg)
	reg.dataListener = NewRegistryDataListener(reg.configListener)

	return reg, nil
}

// GetUrl returns a copy of the registry URL, obtained by dereferencing the
// embedded *common.URL.
func (r *kubernetesRegistry) GetUrl() common.URL {
	return *r.URL
}

// IsAvailable reports whether the registry is still running, i.e. whether
// the done channel has not been closed yet. It never blocks.
func (r *kubernetesRegistry) IsAvailable() bool {
	select {
	case <-r.done:
		// done is closed: the registry has been stopped.
		return false
	default:
	}
	return true
}

// Destroy closes the configuration listener, if one is set, and then stops
// the registry.
func (r *kubernetesRegistry) Destroy() {
	if cl := r.configListener; cl != nil {
		cl.Close()
	}
	r.stop()
}

// stop shuts the registry down: it closes done (once), detaches the client
// and the service table, and closes the detached client.
//
// stop may be called more than once or concurrently. The client is taken
// out of the registry under cltLock, so only the caller that obtained a
// non-nil client closes it and later calls do not call Close on a nil client.
func (r *kubernetesRegistry) stop() {
	// close will be called concurrently; the Once keeps done from being
	// closed twice.
	r.closeOnce.Do(func() {
		close(r.done)
	})

	// Detach the shared state under the lock, then close outside of it.
	r.cltLock.Lock()
	client := r.client
	r.client = nil
	r.services = nil
	r.cltLock.Unlock()

	if client != nil {
		client.Close()
	}
}

// Register publishes svc to kubernetes on behalf of this registry.
//
// The role is parsed from the registry URL's ROLE_KEY parameter and selects
// provider or consumer registration; any other role is an error. A URL whose
// Key() is already recorded in r.services is rejected. On success the URL is
// stored in r.services under its key.
func (r *kubernetesRegistry) Register(svc common.URL) error {
	role, err := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
	if err != nil {
		return perrors.WithMessage(err, "get registry role")
	}

	// Duplicate check. The lock is released again before the remote calls
	// below, so it only covers the map lookup.
	r.cltLock.Lock()
	_, registered := r.services[svc.Key()]
	r.cltLock.Unlock()
	if registered {
		return perrors.Errorf("Path{%s} has been registered", svc.Path)
	}

	switch role {
	case common.PROVIDER:
		logger.Debugf("(provider register )Register(conf{%#v})", svc)
		err = r.registerProvider(svc)
		if err != nil {
			return perrors.WithMessage(err, "register provider")
		}
	case common.CONSUMER:
		logger.Debugf("(consumer register )Register(conf{%#v})", svc)
		err = r.registerConsumer(svc)
		if err != nil {
			return perrors.WithMessage(err, "register consumer")
		}
	default:
		return perrors.Errorf("unknown role %d", role)
	}

	// Remember the URL so a later Register of the same key is rejected.
	r.cltLock.Lock()
	r.services[svc.Key()] = svc
	r.cltLock.Unlock()

	return nil
}

// createDirIfNotExist walks the "/"-separated path k from the root and calls
// client.Create with an empty value for every prefix ("/a", "/a/b", ...).
// The first element of the split (whatever precedes the first "/") is
// skipped. It returns the first Create error, annotated with the path.
func (r *kubernetesRegistry) createDirIfNotExist(k string) error {
	segments := strings.Split(k, "/")[1:]

	current := ""
	for i := range segments {
		current = path.Join(current, "/", segments[i])
		err := r.client.Create(current, "")
		if err != nil {
			return perrors.WithMessagef(err, "create path %s in kubernetes", current)
		}
	}

	return nil
}

func (r *kubernetesRegistry) registerConsumer(svc common.URL) error {

	consumersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.CONSUMER])
	if err := r.createDirIfNotExist(consumersNode); err != nil {
		logger.Errorf("kubernetes client create path %s: %v", consumersNode, err)
		return perrors.WithMessage(err, "kubernetes create consumer nodes")
	}
sxllwx's avatar
sxllwx committed

	// NOTICE kubernetes && etcdv3 not need create provider metadata dir in consumer logic
	//providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER])
	//if err := r.createDirIfNotExist(providersNode); err != nil {
	//	return perrors.WithMessage(err, "create provider node")
	//}

	params := url.Values{}

	params.Add("protocol", svc.Protocol)

	params.Add("category", (common.RoleType(common.CONSUMER)).String())
	params.Add("dubbo", "dubbogo-consumer-"+constant.Version)

	encodedURL := url.QueryEscape(fmt.Sprintf("consumer://%s%s?%s", localIP, svc.Path, params.Encode()))
	dubboPath := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.CONSUMER)).String())
	if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil {
		return perrors.WithMessagef(err, "create k/v in kubernetes (path:%s, url:%s)", dubboPath, encodedURL)
	}

	return nil
}

// registerProvider records svc as a provider in kubernetes.
//
// svc must have a non-empty Path and at least one method. The provider node
// path for the service is created first; then a query-escaped
// "<protocol>://<host><path>?<params>" URL is stored as a key under the
// service's provider path. The host uses svc.Ip when set and localIP
// otherwise. Errors are returned wrapped with context.
func (r *kubernetesRegistry) registerProvider(svc common.URL) error {
	if len(svc.Path) == 0 || len(svc.Methods) == 0 {
		return perrors.Errorf("service path %s or service method %s", svc.Path, svc.Methods)
	}

	providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER])
	if err := r.createDirIfNotExist(providersNode); err != nil {
		return perrors.WithMessage(err, "create provider node")
	}

	providerRole := common.RoleType(common.PROVIDER)

	// Start from the service's own parameters, then add the provider metadata.
	params := url.Values{}
	svc.RangeParams(func(key, value string) bool {
		params[key] = []string{value}
		return true
	})
	params.Add("pid", processID)
	params.Add("ip", localIP)
	params.Add("anyhost", "true")
	params.Add("category", providerRole.String())
	params.Add("dubbo", "dubbo-provider-golang-"+constant.Version)
	params.Add("side", providerRole.Role())
	logger.Debugf("provider url params:%#v", params)

	// Fall back to the local IP when the service URL carries no address.
	ip := svc.Ip
	if len(ip) == 0 {
		ip = localIP
	}
	host := ip + ":" + svc.Port

	// The whole provider URL is escaped so it can be used as one path element.
	rawURL := fmt.Sprintf("%s://%s%s?%s", svc.Protocol, host, svc.Path, params.Encode())
	encodedURL := url.QueryEscape(rawURL)
	dubboPath := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), providerRole.String())

	err := r.client.Create(path.Join(dubboPath, encodedURL), "")
	if err != nil {
		return perrors.WithMessagef(err, "create k/v in kubernetes (path:%s, url:%s)", dubboPath, encodedURL)
	}

	return nil
}

func (r *kubernetesRegistry) subscribe(svc *common.URL) (registry.Listener, error) {

	var (
		configListener *configurationListener
	)

	r.listenerLock.Lock()
	configListener = r.configListener
	r.listenerLock.Unlock()
	if r.listener == nil {
		r.cltLock.Lock()
		client := r.client
		r.cltLock.Unlock()
		if client == nil {
			return nil, perrors.New("kubernetes client broken")
		}

		// new client & listener
		listener := kubernetes.NewEventListener(r.client)

		r.listenerLock.Lock()
		// NOTICE:
		// double-check the listener
		// if r.listener already be assigned, discard the new value
scott.wang's avatar
scott.wang committed
		if r.listener == nil {
			r.listener = listener
		}
		r.listenerLock.Unlock()
	}

	//register the svc to dataListener
	r.dataListener.AddInterestedURL(svc)
	for _, v := range strings.Split(svc.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), ",") {
		go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+v, svc.Service()), r.dataListener)
	}

	return configListener, nil
}

// Subscribe subscribes url on the registry and forwards every service event
// to notifyListener. It blocks until the registry becomes unavailable or the
// listener returns an error.
//
// While obtaining the listener fails and the registry is still available,
// it retries after RegistryConnDelay seconds. When the listener's Next
// returns an error, the listener is closed and Subscribe returns.
func (r *kubernetesRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
	for r.IsAvailable() {
		listener, err := r.subscribe(url)
		if err != nil {
			if !r.IsAvailable() {
				break
			}
			logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
			time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
			continue
		}

		// Pump events until the listener reports an error.
		for {
			serviceEvent, nextErr := listener.Next()
			if nextErr != nil {
				logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(nextErr))
				listener.Close()
				return
			}

			logger.Infof("update begin, service event: %v", serviceEvent.String())
			notifyListener.Notify(serviceEvent)
		}
	}

	// Reached only when the registry is no longer available.
	logger.Warnf("event listener game over.")
}