Skip to content
Snippets Groups Projects
registry.go 11.7 KiB
Newer Older
AlexStocks's avatar
AlexStocks committed
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
fangyincheng's avatar
fangyincheng committed

vito.he's avatar
vito.he committed
package zookeeper

import (
vito.he's avatar
vito.he committed
	"context"
vito.he's avatar
vito.he committed
	"fmt"
	"net/url"
	"os"
	"strconv"
	"strings"
vito.he's avatar
vito.he committed
	"sync"
	"time"
)

fangyincheng's avatar
fangyincheng committed
	perrors "github.com/pkg/errors"
	"github.com/samuel/go-zookeeper/zk"
vito.he's avatar
vito.he committed
)
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/common/extension"
	"github.com/apache/dubbo-go/common/logger"
	"github.com/apache/dubbo-go/common/utils"
	"github.com/apache/dubbo-go/registry"
	"github.com/apache/dubbo-go/remoting/zookeeper"
vito.he's avatar
vito.he committed
)

vito.he's avatar
vito.he committed
const (
vito.he's avatar
vito.he committed
)

vito.he's avatar
vito.he committed
// Process-wide identity values; register adds them to every published URL as
// the "pid" and "ip" parameters. Both start empty and are filled in by init.
var (
	processID string
	localIP   string
)
vito.he's avatar
vito.he committed

func init() {
	processID = fmt.Sprintf("%d", os.Getpid())
	localIP, _ = utils.GetLocalIP()
	//plugins.PluggableRegistries["zookeeper"] = newZkRegistry
	extension.SetRegistry("zookeeper", newZkRegistry)
vito.he's avatar
vito.he committed
}

AlexStocks's avatar
AlexStocks committed
/////////////////////////////////////
// zookeeper registry
/////////////////////////////////////

type zkRegistry struct {
vito.he's avatar
vito.he committed
	context context.Context
vito.he's avatar
vito.he committed
	*common.URL
vito.he's avatar
vito.he committed
	birth int64          // time of file birth, seconds since Epoch; 0 if unknown
	wg    sync.WaitGroup // wg+done for zk restart
	done  chan struct{}

AlexStocks's avatar
AlexStocks committed
	cltLock  sync.Mutex
vito.he's avatar
vito.he committed
	services map[string]common.URL // service name + protocol -> service config
AlexStocks's avatar
AlexStocks committed

	listenerLock   sync.Mutex
	listener       *zookeeper.ZkEventListener
	dataListener   *RegistryDataListener
	configListener *RegistryConfigurationListener
vito.he's avatar
vito.he committed
	//for provider
	zkPath map[string]int // key = protocol://ip:port/interface
vito.he's avatar
vito.he committed
}

func newZkRegistry(url *common.URL) (registry.Registry, error) {
vito.he's avatar
vito.he committed
	var (
		err error
		r   *zkRegistry
vito.he's avatar
vito.he committed
	)

	r = &zkRegistry{
		URL:      url,
		birth:    time.Now().UnixNano(),
		done:     make(chan struct{}),
vito.he's avatar
vito.he committed
		services: make(map[string]common.URL),
vito.he's avatar
vito.he committed
	}

	err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient))
vito.he's avatar
vito.he committed
	if err != nil {
		return nil, err
vito.he's avatar
vito.he committed
	}

	r.wg.Add(1)
vito.he's avatar
vito.he committed

	r.listener = zookeeper.NewZkEventListener(r.client)
	r.configListener = NewRegistryConfigurationListener(r.client, r)
	r.dataListener = NewRegistryDataListener(r.configListener)
vito.he's avatar
vito.he committed

vito.he's avatar
vito.he committed
	return r, nil
}
// Options carries optional settings; its only field is a ZooKeeper client.
// NOTE(review): nothing in the visible part of this file reads it - confirm it is still needed.
type Options struct {
	client *zookeeper.ZookeeperClient
}

// Option is a function that receives an *Options value to modify.
type Option func(*Options)

func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestCluster, *zkRegistry, error) {
	var (
		err error
		r   *zkRegistry
		c   *zk.TestCluster
		//event <-chan zk.Event
	)

	r = &zkRegistry{
		URL:      url,
		birth:    time.Now().UnixNano(),
		done:     make(chan struct{}),
vito.he's avatar
vito.he committed
		services: make(map[string]common.URL),
		zkPath:   make(map[string]int),
	}

	c, r.client, _, err = zookeeper.NewMockZookeeperClient("test", 15*time.Second, opts...)
	if err != nil {
		return nil, nil, err
	}
	r.wg.Add(1)
	r.listener = zookeeper.NewZkEventListener(r.client)
	r.configListener = NewRegistryConfigurationListener(r.client, r)
	r.dataListener = NewRegistryDataListener(r.configListener)

	return c, r, nil
}
// ZkClient returns the registry's current ZooKeeper client. It does not take
// cltLock itself, and the result is nil after closeRegisters has run.
func (r *zkRegistry) ZkClient() *zookeeper.ZookeeperClient {
	return r.client
}

// SetZkClient replaces the registry's ZooKeeper client with client. It does
// not take cltLock itself.
func (r *zkRegistry) SetZkClient(client *zookeeper.ZookeeperClient) {
	r.client = client
}

// ZkClientLock returns a pointer to cltLock, the mutex this file takes around
// accesses to the client.
func (r *zkRegistry) ZkClientLock() *sync.Mutex {
	return &r.cltLock
}

// WaitGroup returns a pointer to the wait group that Destroy waits on.
func (r *zkRegistry) WaitGroup() *sync.WaitGroup {
	return &r.wg
}

// GetDone returns the done channel, which Destroy closes.
func (r *zkRegistry) GetDone() chan struct{} {
	return r.done
}

func (r *zkRegistry) GetUrl() common.URL {
AlexStocks's avatar
AlexStocks committed

func (r *zkRegistry) Destroy() {
	if r.configListener != nil {
		r.configListener.Close()
vito.he's avatar
vito.he committed
	close(r.done)
	r.wg.Wait()
	r.closeRegisters()
}

func (r *zkRegistry) RestartCallBack() bool {
vito.he's avatar
vito.he committed

	// copy r.services
	services := []common.URL{}
	for _, confIf := range r.services {
		services = append(services, confIf)
vito.he's avatar
vito.he committed
	}
vito.he's avatar
vito.he committed

	flag := true
	for _, confIf := range services {
		err := r.register(confIf)
		if err != nil {
			logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
				confIf, perrors.WithStack(err))
			flag = false
			break
vito.he's avatar
vito.he committed
		}
		logger.Infof("success to re-register service :%v", confIf.Key())
vito.he's avatar
vito.he committed
	}
vito.he's avatar
vito.he committed
}

func (r *zkRegistry) Register(conf common.URL) error {
	role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
	switch role {
vito.he's avatar
vito.he committed
	case common.CONSUMER:
		r.cltLock.Lock()
		_, ok = r.services[conf.Key()]
		r.cltLock.Unlock()
		if ok {
fangyincheng's avatar
fangyincheng committed
			return perrors.Errorf("Path{%s} has been registered", conf.Path)
fangyincheng's avatar
fangyincheng committed
			return perrors.WithStack(err)
		}

		r.cltLock.Lock()
		r.services[conf.Key()] = conf
		r.cltLock.Unlock()
fangyincheng's avatar
fangyincheng committed
		logger.Debugf("(consumerZkConsumerRegistry)Register(conf{%#v})", conf)
vito.he's avatar
vito.he committed
	case common.PROVIDER:
		// Check if the service has been registered
AlexStocks's avatar
AlexStocks committed
		// Note the difference between consumer and consumerZookeeperRegistry (consumer use conf.Path).
		// Because the consumer wants to provide monitoring functions for the selector,
		// the provider allows multiple groups or versions of the same service to be registered.
vito.he's avatar
vito.he committed
		_, ok = r.services[conf.Key()]
fangyincheng's avatar
fangyincheng committed
			return perrors.Errorf("Path{%s} has been registered", conf.Key())
fangyincheng's avatar
fangyincheng committed
			return perrors.WithMessagef(err, "register(conf:%+v)", conf)
vito.he's avatar
vito.he committed
		r.services[conf.Key()] = conf
fangyincheng's avatar
fangyincheng committed
		logger.Debugf("(ZkProviderRegistry)Register(conf{%#v})", conf)
func (r *zkRegistry) register(c common.URL) error {
vito.he's avatar
vito.he committed
	var (
vito.he's avatar
vito.he committed
		params     url.Values
		rawURL     string
		encodedURL string
		dubboPath  string
		//conf       config.URL
vito.he's avatar
vito.he committed
	)
AlexStocks's avatar
AlexStocks committed

	err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient))
vito.he's avatar
vito.he committed
	if err != nil {
fangyincheng's avatar
fangyincheng committed
		return perrors.WithStack(err)
vito.he's avatar
vito.he committed
	}
	params = url.Values{}
	for k, v := range c.Params {
		params[k] = v
	}
vito.he's avatar
vito.he committed

	params.Add("pid", processID)
	params.Add("ip", localIP)
	//params.Add("timeout", fmt.Sprintf("%d", int64(r.Timeout)/1e6))
	role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
	switch role {
vito.he's avatar
vito.he committed
	case common.PROVIDER:
vito.he's avatar
vito.he committed

		if c.Path == "" || len(c.Methods) == 0 {
fangyincheng's avatar
fangyincheng committed
			return perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods)
vito.he's avatar
vito.he committed
		}
		// 先创建服务下面的provider node
		dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), common.DubboNodes[common.PROVIDER])
AlexStocks's avatar
AlexStocks committed
		r.cltLock.Lock()
vito.he's avatar
vito.he committed
		err = r.client.Create(dubboPath)
AlexStocks's avatar
AlexStocks committed
		r.cltLock.Unlock()
vito.he's avatar
vito.he committed
		if err != nil {
fangyincheng's avatar
fangyincheng committed
			logger.Errorf("zkClient.create(path{%s}) = error{%#v}", dubboPath, perrors.WithStack(err))
fangyincheng's avatar
fangyincheng committed
			return perrors.WithMessagef(err, "zkclient.Create(path:%s)", dubboPath)
vito.he's avatar
vito.he committed
		}
		params.Add("anyhost", "true")

AlexStocks's avatar
AlexStocks committed
		// Dubbo java consumer to start looking for the provider url,because the category does not match,
		// the provider will not find, causing the consumer can not start, so we use consumers.
vito.he's avatar
vito.he committed
		// DubboRole               = [...]string{"consumer", "", "", "provider"}
		// params.Add("category", (RoleType(PROVIDER)).Role())
vito.he's avatar
vito.he committed
		params.Add("category", (common.RoleType(common.PROVIDER)).String())
AlexStocks's avatar
AlexStocks committed
		params.Add("dubbo", "dubbo-provider-golang-"+constant.Version)
vito.he's avatar
vito.he committed

vito.he's avatar
vito.he committed
		params.Add("side", (common.RoleType(common.PROVIDER)).Role())
vito.he's avatar
vito.he committed

		if len(c.Methods) == 0 {
			params.Add("methods", strings.Join(c.Methods, ","))
vito.he's avatar
vito.he committed
		}
fangyincheng's avatar
fangyincheng committed
		logger.Debugf("provider zk url params:%#v", params)
vito.he's avatar
vito.he committed
		var host string
		if c.Ip == "" {
			host = localIP + ":" + c.Port
		} else {
			host = c.Ip + ":" + c.Port
		}
vito.he's avatar
vito.he committed

vito.he's avatar
vito.he committed
		rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, params.Encode())
vito.he's avatar
vito.he committed
		encodedURL = url.QueryEscape(rawURL)

		// Print your own registration service providers.
		dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), (common.RoleType(common.PROVIDER)).String())
fangyincheng's avatar
fangyincheng committed
		logger.Debugf("provider path:%s, url:%s", dubboPath, rawURL)
vito.he's avatar
vito.he committed

vito.he's avatar
vito.he committed
	case common.CONSUMER:
		dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), common.DubboNodes[common.CONSUMER])
AlexStocks's avatar
AlexStocks committed
		r.cltLock.Lock()
vito.he's avatar
vito.he committed
		err = r.client.Create(dubboPath)
AlexStocks's avatar
AlexStocks committed
		r.cltLock.Unlock()
vito.he's avatar
vito.he committed
		if err != nil {
fangyincheng's avatar
fangyincheng committed
			logger.Errorf("zkClient.create(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
fangyincheng's avatar
fangyincheng committed
			return perrors.WithStack(err)
vito.he's avatar
vito.he committed
		}
		dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), common.DubboNodes[common.PROVIDER])
AlexStocks's avatar
AlexStocks committed
		r.cltLock.Lock()
vito.he's avatar
vito.he committed
		err = r.client.Create(dubboPath)
AlexStocks's avatar
AlexStocks committed
		r.cltLock.Unlock()
vito.he's avatar
vito.he committed
		if err != nil {
fangyincheng's avatar
fangyincheng committed
			logger.Errorf("zkClient.create(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
fangyincheng's avatar
fangyincheng committed
			return perrors.WithStack(err)
vito.he's avatar
vito.he committed
		}

vito.he's avatar
vito.he committed
		params.Add("protocol", c.Protocol)
vito.he's avatar
vito.he committed
		params.Add("category", (common.RoleType(common.CONSUMER)).String())
AlexStocks's avatar
AlexStocks committed
		params.Add("dubbo", "dubbogo-consumer-"+constant.Version)
vito.he's avatar
vito.he committed

		rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, params.Encode())
vito.he's avatar
vito.he committed
		encodedURL = url.QueryEscape(rawURL)

		dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), (common.RoleType(common.CONSUMER)).String())
fangyincheng's avatar
fangyincheng committed
		logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL)
fangyincheng's avatar
fangyincheng committed
		return perrors.Errorf("@c{%v} type is not referencer or provider", c)
vito.he's avatar
vito.he committed
	}

	err = r.registerTempZookeeperNode(dubboPath, encodedURL)

	if err != nil {
fangyincheng's avatar
fangyincheng committed
		return perrors.WithMessagef(err, "registerTempZookeeperNode(path:%s, url:%s)", dubboPath, rawURL)
vito.he's avatar
vito.he committed
	}
	return nil
}

func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
vito.he's avatar
vito.he committed
	var (
		err    error
		zkPath string
	)

AlexStocks's avatar
AlexStocks committed
	r.cltLock.Lock()
	defer r.cltLock.Unlock()
vito.he's avatar
vito.he committed
	err = r.client.Create(root)
	if err != nil {
fangyincheng's avatar
fangyincheng committed
		logger.Errorf("zk.Create(root{%s}) = err{%v}", root, perrors.WithStack(err))
fangyincheng's avatar
fangyincheng committed
		return perrors.WithStack(err)
vito.he's avatar
vito.he committed
	}
	zkPath, err = r.client.RegisterTemp(root, node)
	if err != nil {
pantianying's avatar
pantianying committed
		if strings.Contains(err.Error(), "node already exists") {
			logger.Warnf("RegisterTempNode(root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
		} else {
			logger.Errorf("RegisterTempNode(root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
		}
fangyincheng's avatar
fangyincheng committed
		return perrors.WithMessagef(err, "RegisterTempNode(root{%s}, node{%s})", root, node)
vito.he's avatar
vito.he committed
	}
fangyincheng's avatar
fangyincheng committed
	logger.Debugf("create a zookeeper node:%s", zkPath)
vito.he's avatar
vito.he committed

	return nil
}

// Subscribe returns the listener obtained from getListener for conf.
//
// When getListener fails, the returned Listener is a true nil interface value.
func (r *zkRegistry) Subscribe(conf common.URL) (registry.Listener, error) {
	listener, err := r.getListener(conf)
	if err != nil {
		// Passing the nil *RegistryConfigurationListener through would wrap it
		// in a non-nil registry.Listener, defeating "listener == nil" checks.
		return nil, err
	}
	return listener, nil
}

func (r *zkRegistry) getListener(conf common.URL) (*RegistryConfigurationListener, error) {
		zkListener *RegistryConfigurationListener
	)

	r.listenerLock.Lock()
	r.listenerLock.Unlock()
	if r.listener == nil {
		r.cltLock.Lock()
		client := r.client
		r.cltLock.Unlock()
		if client == nil {
			return nil, perrors.New("zk connection broken")
		}
		// new client & listener
		listener := zookeeper.NewZkEventListener(r.client)
		r.listenerLock.Lock()
		r.listener = listener
		r.listenerLock.Unlock()
	}
	//Interested register to dataconfig.
	r.dataListener.AddInterestedURL(&conf)
	go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/providers", conf.Service()), r.dataListener)

	return zkListener, nil
}

func (r *zkRegistry) closeRegisters() {
AlexStocks's avatar
AlexStocks committed
	r.cltLock.Lock()
	defer r.cltLock.Unlock()
fangyincheng's avatar
fangyincheng committed
	logger.Infof("begin to close provider zk client")
	// Close the old client first to close the tmp node.
vito.he's avatar
vito.he committed
	r.client.Close()
	r.client = nil
	r.services = nil
}

func (r *zkRegistry) IsAvailable() bool {
vito.he's avatar
vito.he committed
	select {
	case <-r.done:
		return false
vito.he's avatar
vito.he committed
	}
}