Skip to content
Snippets Groups Projects
registry.go 8.4 KiB
Newer Older
vito.he's avatar
vito.he committed
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

lzp0412's avatar
lzp0412 committed
package nacos

import (
	"bytes"
flycash's avatar
flycash committed
	"net"
lzp0412's avatar
lzp0412 committed
	"strconv"
	"strings"
	"time"
import (
flycash's avatar
flycash committed
	"github.com/nacos-group/nacos-sdk-go/clients"
	"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
	nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant"
lzp0412's avatar
lzp0412 committed
	"github.com/nacos-group/nacos-sdk-go/vo"
	perrors "github.com/pkg/errors"
)
lzp0412's avatar
lzp0412 committed

import (
lzp0412's avatar
lzp0412 committed
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/common/extension"
	"github.com/apache/dubbo-go/common/logger"
lzp0412's avatar
lzp0412 committed
	"github.com/apache/dubbo-go/registry"
)

// localIP holds the address returned by common.GetLocalIp. It is assigned in
// init and used as the fallback instance IP when a URL carries no IP.
var localIP string

const (
	// RegistryConnDelay is the delay, in seconds, that Subscribe sleeps
	// before retrying after it fails to create a listener.
	RegistryConnDelay = 3
)

lzp0412's avatar
lzp0412 committed
func init() {
	localIP = common.GetLocalIp()
lizhipeng1's avatar
lizhipeng1 committed
	extension.SetRegistry(constant.NACOS_KEY, newNacosRegistry)
lzp0412's avatar
lzp0412 committed
}

type nacosRegistry struct {
flycash's avatar
flycash committed
	*common.URL
	namingClient naming_client.INamingClient
haohongfan's avatar
haohongfan committed
	registryUrls []*common.URL
lzp0412's avatar
lzp0412 committed
}

haohongfan's avatar
haohongfan committed
func getCategory(url *common.URL) string {
lzp0412's avatar
lzp0412 committed
	role, _ := strconv.Atoi(url.GetParam(constant.ROLE_KEY, strconv.Itoa(constant.NACOS_DEFAULT_ROLETYPE)))
	category := common.DubboNodes[role]
	return category
}

haohongfan's avatar
haohongfan committed
func getServiceName(url *common.URL) string {
lzp0412's avatar
lzp0412 committed
	var buffer bytes.Buffer

	buffer.Write([]byte(getCategory(url)))
	appendParam(&buffer, url, constant.INTERFACE_KEY)
	appendParam(&buffer, url, constant.VERSION_KEY)
	appendParam(&buffer, url, constant.GROUP_KEY)
	return buffer.String()
}

haohongfan's avatar
haohongfan committed
func appendParam(target *bytes.Buffer, url *common.URL, key string) {
lzp0412's avatar
lzp0412 committed
	value := url.GetParam(key, "")
	if strings.TrimSpace(value) != "" {
		target.Write([]byte(constant.NACOS_SERVICE_NAME_SEPARATOR))
		target.Write([]byte(value))
	}
}

haohongfan's avatar
haohongfan committed
func createRegisterParam(url *common.URL, serviceName string) vo.RegisterInstanceParam {
lzp0412's avatar
lzp0412 committed
	category := getCategory(url)
vito.he's avatar
vito.he committed
	params := make(map[string]string)

	url.RangeParams(func(key, value string) bool {
		params[key] = value
		return true
	})

lzp0412's avatar
lzp0412 committed
	params[constant.NACOS_CATEGORY_KEY] = category
	params[constant.NACOS_PROTOCOL_KEY] = url.Protocol
	params[constant.NACOS_PATH_KEY] = url.Path
lizhipeng1's avatar
lizhipeng1 committed
	if len(url.Ip) == 0 {
lzp0412's avatar
lzp0412 committed
		url.Ip = localIP
	}
lizhipeng1's avatar
lizhipeng1 committed
	if len(url.Port) == 0 || url.Port == "0" {
lzp0412's avatar
lzp0412 committed
		url.Port = "80"
	}
	port, _ := strconv.Atoi(url.Port)
	instance := vo.RegisterInstanceParam{
		Ip:          url.Ip,
		Port:        uint64(port),
		Metadata:    params,
		Weight:      1,
		Enable:      true,
		Healthy:     true,
		Ephemeral:   true,
		ServiceName: serviceName,
	}
	return instance
}

邹毅贤's avatar
邹毅贤 committed
// Register will register the service @url to its nacos registry center
haohongfan's avatar
haohongfan committed
func (nr *nacosRegistry) Register(url *common.URL) error {
lzp0412's avatar
lzp0412 committed
	serviceName := getServiceName(url)
	param := createRegisterParam(url, serviceName)
	isRegistry, err := nr.namingClient.RegisterInstance(param)
	if err != nil {
		return err
	}
	if !isRegistry {
lizhipeng1's avatar
lizhipeng1 committed
		return perrors.New("registry [" + serviceName + "] to  nacos failed")
lzp0412's avatar
lzp0412 committed
	}
	nr.registryUrls = append(nr.registryUrls, url)
	return nil
}

haohongfan's avatar
haohongfan committed
// createDeregisterParam builds the nacos deregistration request for url under
// serviceName.
//
// Side effect: when url has no IP it is set to localIP, and an empty or "0"
// port is set to "80"; both defaults are written back into url.
func createDeregisterParam(url *common.URL, serviceName string) vo.DeregisterInstanceParam {
	if url.Ip == "" {
		url.Ip = localIP
	}
	switch url.Port {
	case "", "0":
		url.Port = "80"
	}
	// The parse error is ignored: a non-numeric port is sent as 0.
	portNum, _ := strconv.Atoi(url.Port)
	request := vo.DeregisterInstanceParam{
		ServiceName: serviceName,
		Ephemeral:   true,
		Ip:          url.Ip,
		Port:        uint64(portNum),
	}
	return request
}

haohongfan's avatar
haohongfan committed
func (nr *nacosRegistry) DeRegister(url *common.URL) error {
	serviceName := getServiceName(url)
	param := createDeregisterParam(url, serviceName)
	isDeRegistry, err := nr.namingClient.DeregisterInstance(param)
	if err != nil {
		return err
	}
	if !isDeRegistry {
		return perrors.New("DeRegistry [" + serviceName + "] to nacos failed")
	}
lzp0412's avatar
lzp0412 committed
	return nil
}

haohongfan's avatar
haohongfan committed
func (nr *nacosRegistry) UnRegister(conf *common.URL) error {
邹毅贤's avatar
邹毅贤 committed
	return perrors.New("UnRegister is not support in nacosRegistry")
func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error) {
haohongfan's avatar
haohongfan committed
	return NewNacosListener(conf, nr.namingClient)
flycash's avatar
flycash committed
// subscribe from registry
邹毅贤's avatar
邹毅贤 committed
func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
	role, _ := strconv.Atoi(nr.URL.GetParam(constant.ROLE_KEY, ""))
	if role != common.CONSUMER {
		return nil
	}

	for {
高辛格's avatar
高辛格 committed
		if !nr.IsAvailable() {
			logger.Warnf("event listener game over.")
邹毅贤's avatar
邹毅贤 committed
			return perrors.New("nacosRegistry is not available.")
高辛格's avatar
高辛格 committed
		listener, err := nr.subscribe(url)
		if err != nil {
高辛格's avatar
高辛格 committed
			if !nr.IsAvailable() {
				logger.Warnf("event listener game over.")
邹毅贤's avatar
邹毅贤 committed
				return err
			}
			logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
			time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
			continue
		}

		for {
AlexStocks's avatar
AlexStocks committed
			serviceEvent, err := listener.Next()
			if err != nil {
				logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
				listener.Close()
邹毅贤's avatar
邹毅贤 committed
				return err
AlexStocks's avatar
AlexStocks committed
			logger.Infof("update begin, service event: %v", serviceEvent.String())
			notifyListener.Notify(serviceEvent)
高辛格's avatar
高辛格 committed

// UnSubscribe :
邹毅贤's avatar
邹毅贤 committed
func (nr *nacosRegistry) UnSubscribe(url *common.URL, notifyListener registry.NotifyListener) error {
	return perrors.New("UnSubscribe not support in nacosRegistry")
高辛格's avatar
高辛格 committed

邹毅贤's avatar
邹毅贤 committed
// GetUrl gets its registration URL
haohongfan's avatar
haohongfan committed
func (nr *nacosRegistry) GetUrl() *common.URL {
	return nr.URL
邹毅贤's avatar
邹毅贤 committed
// IsAvailable determines nacos registry center whether it is available
lzp0412's avatar
lzp0412 committed
func (nr *nacosRegistry) IsAvailable() bool {
flycash's avatar
flycash committed
	// TODO
lzp0412's avatar
lzp0412 committed
	return true
}

邹毅贤's avatar
邹毅贤 committed
// nolint
lzp0412's avatar
lzp0412 committed
func (nr *nacosRegistry) Destroy() {
	for _, url := range nr.registryUrls {
		err := nr.DeRegister(url)
lzp0412's avatar
lzp0412 committed
		logger.Infof("DeRegister Nacos URL:%+v", url)
		if err != nil {
lzp0412's avatar
lzp0412 committed
			logger.Errorf("Deregister URL:%+v err:%v", url, err.Error())
lzp0412's avatar
lzp0412 committed
	return
flycash's avatar
flycash committed

// newNacosRegistry will create new instance
func newNacosRegistry(url *common.URL) (registry.Registry, error) {
	nacosConfig, err := getNacosConfig(url)
	if err != nil {
		return &nacosRegistry{}, err
	}
	client, err := clients.CreateNamingClient(nacosConfig)
	if err != nil {
		return &nacosRegistry{}, err
	}
haohongfan's avatar
haohongfan committed
	tmpRegistry := &nacosRegistry{
flycash's avatar
flycash committed
		URL:          url,
		namingClient: client,
haohongfan's avatar
haohongfan committed
		registryUrls: []*common.URL{},
flycash's avatar
flycash committed
	}
haohongfan's avatar
haohongfan committed
	return tmpRegistry, nil
flycash's avatar
flycash committed
}

// getNacosConfig will return the nacos config
// TODO support RemoteRef
func getNacosConfig(url *common.URL) (map[string]interface{}, error) {
	if url == nil {
		return nil, perrors.New("url is empty!")
	}
	if len(url.Location) == 0 {
		return nil, perrors.New("url.location is empty!")
	}
	configMap := make(map[string]interface{}, 2)

	addresses := strings.Split(url.Location, ",")
	serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses))
	for _, addr := range addresses {
		ip, portStr, err := net.SplitHostPort(addr)
		if err != nil {
			return nil, perrors.WithMessagef(err, "split [%s] ", addr)
		}
		port, _ := strconv.Atoi(portStr)
		serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{
			IpAddr: ip,
			Port:   uint64(port),
		})
	}
	configMap[nacosConstant.KEY_SERVER_CONFIGS] = serverConfigs
flycash's avatar
flycash committed

	var clientConfig nacosConstant.ClientConfig
	timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
	if err != nil {
		return nil, err
	}
	clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000)
	clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs
	clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "")
	clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "")
	clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "")
	clientConfig.NamespaceId = url.GetParam(constant.NACOS_NAMESPACE_ID, "")

	//enable local cache when nacos can not connect.
	notLoadCache, err := strconv.ParseBool(url.GetParam(constant.NACOS_NOT_LOAD_LOCAL_CACHE, "false"))
	if err != nil {
		logger.Errorf("ParseBool - error: %v", err)
		notLoadCache = false
	}
	clientConfig.NotLoadCacheAtStart = notLoadCache

	configMap[nacosConstant.KEY_CLIENT_CONFIG] = clientConfig
flycash's avatar
flycash committed

	return configMap, nil
}