Skip to content
Snippets Groups Projects
registry.go 10.7 KiB
Newer Older
vito.he's avatar
vito.he committed
package zookeeper

import (
vito.he's avatar
vito.he committed
vito.he's avatar
vito.he committed
vito.he's avatar
vito.he committed
vito.he's avatar
vito.he committed

vito.he's avatar
vito.he committed
	log ""
	jerrors ""
vito.he's avatar
vito.he committed
vito.he's avatar
vito.he committed
vito.he's avatar
vito.he committed

vito.he's avatar
vito.he committed
// Settings for the zookeeper registry client.
const (
	// defaultTimeout is 10e9; presumably nanoseconds (10s) — it is not
	// referenced in the visible part of this file, TODO confirm.
	defaultTimeout    = int64(10e9)
	// RegistryZkClient is the client name passed to newZookeeperClient and
	// printed in the related log messages.
	RegistryZkClient  = "zk registry"
	// RegistryConnDelay is multiplied by the failure count and by 1e9
	// nanoseconds to space out reconnect attempts in handleZkRestart,
	// i.e. 3 extra seconds of delay per failed attempt.
	RegistryConnDelay = 3
vito.he's avatar
vito.he committed

vito.he's avatar
vito.he committed
// Process-wide identity values, filled in by init.
var (
	// processID is the current process id formatted as a decimal string; it is
	// added to registration URLs as the "pid" parameter.
	processID = ""
	// localIP is the address returned by gxnet.GetLocalIP; it is added to
	// registration URLs as the "ip" parameter and used as the host when a
	// service URL carries no Ip of its own.
	localIP   = ""
vito.he's avatar
vito.he committed

func init() {
	processID = fmt.Sprintf("%d", os.Getpid())
	localIP, _ = gxnet.GetLocalIP()
vito.he's avatar
vito.he committed
	//plugins.PluggableRegistries["zookeeper"] = NewZkRegistry
	extension.SetRegistry("zookeeper", NewZkRegistry)
vito.he's avatar
vito.he committed

AlexStocks's avatar
AlexStocks committed
// zookeeper registry

vito.he's avatar
vito.he committed
type ZkRegistry struct {
vito.he's avatar
vito.he committed
	context context.Context
vito.he's avatar
vito.he committed
	birth int64          // time of file birth, seconds since Epoch; 0 if unknown
	wg    sync.WaitGroup // wg+done for zk restart
	done  chan struct{}

AlexStocks's avatar
AlexStocks committed
	cltLock  sync.Mutex
	client   *zookeeperClient
	services map[string]config.URL // service name + protocol -> service config
AlexStocks's avatar
AlexStocks committed

vito.he's avatar
vito.he committed
	listenerLock sync.Mutex
	listener     *zkEventListener
AlexStocks's avatar
AlexStocks committed

vito.he's avatar
vito.he committed
	//for provider
	zkPath map[string]int // key = protocol://ip:port/interface
vito.he's avatar
vito.he committed

func NewZkRegistry(url *config.URL) (registry.Registry, error) {
vito.he's avatar
vito.he committed
	var (
		err error
		r   *ZkRegistry

	r = &ZkRegistry{
		URL:      url,
		birth:    time.Now().UnixNano(),
		done:     make(chan struct{}),
		services: make(map[string]config.URL),
		zkPath:   make(map[string]int),
vito.he's avatar
vito.he committed

	//if r.SubURL.Name == "" {
	//	r.SubURL.Name = RegistryZkClient
	//if r.Version == "" {
	//	r.Version = version.Version
vito.he's avatar
vito.he committed

	err = r.validateZookeeperClient()
	if err != nil {
		return nil, jerrors.Trace(err)

	go r.handleZkRestart()
vito.he's avatar
vito.he committed

	//if r.RoleType == registry.CONSUMER {
	//	r.wg.Add(1)
	//	go r.listen()
vito.he's avatar
vito.he committed

vito.he's avatar
vito.he committed
	return r, nil
AlexStocks's avatar
AlexStocks committed

vito.he's avatar
vito.he committed
func (r *ZkRegistry) Close() {

func (r *ZkRegistry) validateZookeeperClient() error {
	var (
		err error

	err = nil
AlexStocks's avatar
AlexStocks committed
	defer r.cltLock.Unlock()
vito.he's avatar
vito.he committed
	if r.client == nil {
vito.he's avatar
vito.he committed
		//in dubbp ,every registry only connect one node ,so this is []string{r.Address}
		timeout, err := time.ParseDuration(r.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
		if err != nil {
			log.Error("timeout config %v is invalid ,err is %v",
				r.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
			return jerrors.Annotatef(err, "newZookeeperClient(address:%+v)", r.Location)
		r.client, err = newZookeeperClient(RegistryZkClient, []string{r.Location}, timeout)
vito.he's avatar
vito.he committed
		if err != nil {
			log.Warn("newZookeeperClient(name{%s}, zk addresss{%v}, timeout{%d}) = error{%v}",
				RegistryZkClient, r.Location, timeout.String(), err)
			return jerrors.Annotatef(err, "newZookeeperClient(address:%+v)", r.Location)
vito.he's avatar
vito.he committed
vito.he's avatar
vito.he committed
	if r.client.conn == nil {
		var event <-chan zk.Event
		r.client.conn, event, err = zk.Connect(r.client.zkAddrs, r.client.timeout)
		if err != nil {
			go r.client.handleZkEvent(event)
vito.he's avatar
vito.he committed

	return jerrors.Annotatef(err, "newZookeeperClient(address:%+v)", r.PrimitiveURL)
vito.he's avatar
vito.he committed

func (r *ZkRegistry) handleZkRestart() {
	var (
		err       error
		flag      bool
		failTimes int
		confIf    config.URL
		services  []config.URL
vito.he's avatar
vito.he committed

	defer r.wg.Done()
	for {
		select {
		case <-r.done:
			log.Warn("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
			break LOOP
			// re-register all services
		case <-r.client.done():
AlexStocks's avatar
AlexStocks committed
vito.he's avatar
vito.he committed
			r.client = nil
AlexStocks's avatar
AlexStocks committed
vito.he's avatar
vito.he committed

			// 接zk,直至成功
			failTimes = 0
			for {
				select {
				case <-r.done:
					log.Warn("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
					break LOOP
				case <-time.After(time.Duration(1e9 * failTimes * RegistryConnDelay)): // 防止疯狂重连zk
vito.he's avatar
vito.he committed
				err = r.validateZookeeperClient()
				log.Info("ZkProviderRegistry.validateZookeeperClient(zkAddr{%s}) = error{%#v}",
					r.client.zkAddrs, jerrors.ErrorStack(err))
				if err == nil {
					// copy
AlexStocks's avatar
AlexStocks committed
vito.he's avatar
vito.he committed
					for _, confIf = range {
						services = append(services, confIf)
AlexStocks's avatar
AlexStocks committed
vito.he's avatar
vito.he committed

					flag = true
					for _, confIf = range services {
vito.he's avatar
vito.he committed
						err = r.register(confIf)
vito.he's avatar
vito.he committed
						if err != nil {
							log.Error("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
								confIf, jerrors.ErrorStack(err))
							flag = false
					if flag {
				if MaxFailTimes <= failTimes {
					failTimes = MaxFailTimes
func (r *ZkRegistry) Register(conf config.URL) error {
	role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
	switch role {
	case config.CONSUMER:
		ok = false
		_, ok =[conf.Key()]
		if ok {
			return jerrors.Errorf("Path{%s} has been registered", conf.Path)

		err = r.register(conf)
		if err != nil {
			return jerrors.Trace(err)

		r.cltLock.Lock()[conf.Key()] = conf
		log.Debug("(consumerZkConsumerRegistry)Register(conf{%#v})", conf)

		listener = r.listener
		if listener != nil {
			go listener.listenServiceEvent(conf)
	case config.PROVIDER:

		// 检验服务是否已经注册过
		ok = false
		// 注意此处与consumerZookeeperRegistry的差异,consumer用的是conf.Path,
		// 因为consumer要提供watch功能给selector使用, provider允许注册同一个service的多个group or version
vito.he's avatar
vito.he committed
		_, ok =[conf.Key()]
			return jerrors.Errorf("Path{%s} has been registered", conf.Key())

		err = r.register(conf)
		if err != nil {
			return jerrors.Annotatef(err, "register(conf:%+v)", conf)

vito.he's avatar
vito.he committed[conf.Key()] = conf

		log.Debug("(ZkProviderRegistry)Register(conf{%#v})", conf)

	return nil

func (r *ZkRegistry) register(c config.URL) error {
vito.he's avatar
vito.he committed
	var (
vito.he's avatar
vito.he committed
		params     url.Values
		urlPath    string
		rawURL     string
		encodedURL string
		dubboPath  string
		//conf       config.URL
vito.he's avatar
vito.he committed
AlexStocks's avatar
AlexStocks committed

vito.he's avatar
vito.he committed
	err = r.validateZookeeperClient()
	if err != nil {
		return jerrors.Trace(err)
	params = url.Values{}
vito.he's avatar
vito.he committed

	params.Add("pid", processID)
	params.Add("ip", localIP)
	//params.Add("timeout", fmt.Sprintf("%d", int64(r.Timeout)/1e6))
	role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
	switch role {
	case config.PROVIDER:
vito.he's avatar
vito.he committed

		if c.Path == "" || len(c.Methods) == 0 {
			return jerrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods)
vito.he's avatar
vito.he committed
		// 先创建服务下面的provider node
		dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, config.DubboNodes[config.PROVIDER])
AlexStocks's avatar
AlexStocks committed
vito.he's avatar
vito.he committed
		err = r.client.Create(dubboPath)
AlexStocks's avatar
AlexStocks committed
vito.he's avatar
vito.he committed
		if err != nil {
			log.Error("zkClient.create(path{%s}) = error{%#v}", dubboPath, jerrors.ErrorStack(err))
			return jerrors.Annotatef(err, "zkclient.Create(path:%s)", dubboPath)
		params.Add("anyhost", "true")

		// dubbo java consumer来启动找provider url时,因为category不匹配,会找不到provider,导致consumer启动不了,所以使用consumers&providers
		// DubboRole               = [...]string{"consumer", "", "", "provider"}
		// params.Add("category", (RoleType(PROVIDER)).Role())
		params.Add("category", (config.RoleType(config.PROVIDER)).String())
vito.he's avatar
vito.he committed
		params.Add("dubbo", "dubbo-provider-golang-"+version.Version)

		params.Add("side", (config.RoleType(config.PROVIDER)).Role())
vito.he's avatar
vito.he committed

		if len(c.Methods) == 0 {
			params.Add("methods", strings.Join(c.Methods, ","))
vito.he's avatar
vito.he committed
		log.Debug("provider zk url params:%#v", params)
vito.he's avatar
vito.he committed
		var host string
		if c.Ip == "" {
			host = localIP + ":" + c.Port
		} else {
			host = c.Ip + ":" + c.Port
vito.he's avatar
vito.he committed

		urlPath = c.Path
vito.he's avatar
vito.he committed
		if r.zkPath[urlPath] != 0 {
			urlPath += strconv.Itoa(r.zkPath[urlPath])
vito.he's avatar
vito.he committed
		rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, urlPath, params.Encode())
vito.he's avatar
vito.he committed
		encodedURL = url.QueryEscape(rawURL)

		// 把自己注册service providers
		dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, (config.RoleType(config.PROVIDER)).String())
vito.he's avatar
vito.he committed
		log.Debug("provider path:%s, url:%s", dubboPath, rawURL)

	case config.CONSUMER:
		dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, config.DubboNodes[config.CONSUMER])
AlexStocks's avatar
AlexStocks committed
vito.he's avatar
vito.he committed
		err = r.client.Create(dubboPath)
AlexStocks's avatar
AlexStocks committed
vito.he's avatar
vito.he committed
		if err != nil {
			log.Error("zkClient.create(path{%s}) = error{%v}", dubboPath, jerrors.ErrorStack(err))
			return jerrors.Trace(err)
		dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, config.DubboNodes[config.PROVIDER])
AlexStocks's avatar
AlexStocks committed
vito.he's avatar
vito.he committed
		err = r.client.Create(dubboPath)
AlexStocks's avatar
AlexStocks committed
vito.he's avatar
vito.he committed
		if err != nil {
			log.Error("zkClient.create(path{%s}) = error{%v}", dubboPath, jerrors.ErrorStack(err))
			return jerrors.Trace(err)

vito.he's avatar
vito.he committed
		params.Add("protocol", c.Protocol)

		params.Add("category", (config.RoleType(config.CONSUMER)).String())
vito.he's avatar
vito.he committed
		params.Add("dubbo", "dubbogo-consumer-"+version.Version)

		rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, params.Encode())
vito.he's avatar
vito.he committed
		encodedURL = url.QueryEscape(rawURL)

		dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, (config.RoleType(config.CONSUMER)).String())
vito.he's avatar
vito.he committed
		log.Debug("consumer path:%s, url:%s", dubboPath, rawURL)
vito.he's avatar
vito.he committed
		return jerrors.Errorf("@c{%v} type is not referencer or provider", c)
vito.he's avatar
vito.he committed

	err = r.registerTempZookeeperNode(dubboPath, encodedURL)

	if err != nil {
		return jerrors.Annotatef(err, "registerTempZookeeperNode(path:%s, url:%s)", dubboPath, rawURL)
	return nil

func (r *ZkRegistry) registerTempZookeeperNode(root string, node string) error {
	var (
		err    error
		zkPath string

AlexStocks's avatar
AlexStocks committed
	defer r.cltLock.Unlock()
vito.he's avatar
vito.he committed
	err = r.client.Create(root)
	if err != nil {
		log.Error("zk.Create(root{%s}) = err{%v}", root, jerrors.ErrorStack(err))
		return jerrors.Trace(err)
	zkPath, err = r.client.RegisterTemp(root, node)
	if err != nil {
		log.Error("RegisterTempNode(root{%s}, node{%s}) = error{%v}", root, node, jerrors.ErrorStack(err))
		return jerrors.Annotatef(err, "RegisterTempNode(root{%s}, node{%s})", root, node)
	log.Debug("create a zookeeper node:%s", zkPath)

	return nil

func (r *ZkRegistry) closeRegisters() {
AlexStocks's avatar
AlexStocks committed
	defer r.cltLock.Unlock()
vito.he's avatar
vito.he committed
	log.Info("begin to close provider zk client")
	// 先关闭旧client,以关闭tmp node
	r.client = nil = nil

func (r *ZkRegistry) IsClosed() bool {
vito.he's avatar
vito.he committed
	select {
	case <-r.done:
		return true
		return false