Skip to content
Snippets Groups Projects
registry.go 8.88 KiB
Newer Older
AlexStocks's avatar
AlexStocks committed
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
fangyincheng's avatar
fangyincheng committed

vito.he's avatar
vito.he committed
package zookeeper

import (
	"fmt"
	"net/url"
vito.he's avatar
vito.he committed
	"sync"
	"time"
)

pantianying's avatar
pantianying committed
	"github.com/dubbogo/go-zookeeper/zk"
fangyincheng's avatar
fangyincheng committed
	perrors "github.com/pkg/errors"
vito.he's avatar
vito.he committed
)
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/common/extension"
	"github.com/apache/dubbo-go/common/logger"
	"github.com/apache/dubbo-go/remoting/zookeeper"
vito.he's avatar
vito.he committed
)

vito.he's avatar
vito.he committed
const (
	// RegistryZkClient zk client name
AlexStocks's avatar
AlexStocks committed
	RegistryZkClient = "zk registry"
vito.he's avatar
vito.he committed
)
vito.he's avatar
vito.he committed

func init() {
	extension.SetRegistry("zookeeper", newZkRegistry)
vito.he's avatar
vito.he committed
}

AlexStocks's avatar
AlexStocks committed
/////////////////////////////////////
// zookeeper registry
/////////////////////////////////////

type zkRegistry struct {
	registry.BaseRegistry
	client       *zookeeper.ZookeeperClient
	listenerLock sync.Mutex
	listener     *zookeeper.ZkEventListener
	dataListener *RegistryDataListener
	cltLock      sync.Mutex
vito.he's avatar
vito.he committed
	//for provider
	zkPath map[string]int // key = protocol://ip:port/interface
vito.he's avatar
vito.he committed
}

func newZkRegistry(url *common.URL) (registry.Registry, error) {
vito.he's avatar
vito.he committed
	var (
		err error
		r   *zkRegistry
vito.he's avatar
vito.he committed
	)
	r = &zkRegistry{
		zkPath: make(map[string]int),
vito.he's avatar
vito.he committed
	}
	r.InitBaseRegistry(url, r)
vito.he's avatar
vito.he committed

	err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient))
vito.he's avatar
vito.he committed
	if err != nil {
		return nil, err
vito.he's avatar
vito.he committed
	}

	go zookeeper.HandleClientRestart(r)
vito.he's avatar
vito.he committed

	r.listener = zookeeper.NewZkEventListener(r.client)

	r.dataListener = NewRegistryDataListener()
vito.he's avatar
vito.he committed

vito.he's avatar
vito.he committed
	return r, nil
}
type Options struct {
	client *zookeeper.ZookeeperClient
}

type Option func(*Options)

func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestCluster, *zkRegistry, error) {
	var (
		err error
		r   *zkRegistry
		c   *zk.TestCluster
		//event <-chan zk.Event
	)

	r = &zkRegistry{
		zkPath: make(map[string]int),
	r.InitBaseRegistry(url, r)
	c, r.client, _, err = zookeeper.NewMockZookeeperClient("test", 15*time.Second, opts...)
	if err != nil {
		return nil, nil, err
	}
	r.WaitGroup().Add(1) //zk client start successful, then wg +1
	r.InitListeners()
	return c, r, nil
}
邹毅贤's avatar
邹毅贤 committed
// InitListeners initializes listeners of zookeeper registry center
func (r *zkRegistry) InitListeners() {
	r.listener = zookeeper.NewZkEventListener(r.client)
	newDataListener := NewRegistryDataListener()
CodingSinger's avatar
CodingSinger committed
	// should recover if dataListener isn't nil before
	if r.dataListener != nil {
		// close all old listener
		oldDataListener := r.dataListener
		oldDataListener.mutex.Lock()
		defer oldDataListener.mutex.Unlock()
wangwx's avatar
wangwx committed
		r.dataListener.closed = true
		recovered := r.dataListener.subscribed
		if recovered != nil && len(recovered) > 0 {
CodingSinger's avatar
CodingSinger committed
			// recover all subscribed url
wangwx's avatar
wangwx committed
			for _, oldListener := range recovered {
邹毅贤's avatar
邹毅贤 committed
				var (
					regConfigListener *RegistryConfigurationListener
					ok                bool
				)
				if regConfigListener, ok = oldListener.(*RegistryConfigurationListener); ok {
邹毅贤's avatar
邹毅贤 committed
				newDataListener.SubscribeURL(regConfigListener.subscribeURL, NewRegistryConfigurationListener(r.client, r, regConfigListener.subscribeURL))
				go r.listener.ListenServiceEvent(regConfigListener.subscribeURL, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(regConfigListener.subscribeURL.Service())), newDataListener)
	}
	r.dataListener = newDataListener
邹毅贤's avatar
邹毅贤 committed
// CreatePath creates the path in the registry center of zookeeper
func (r *zkRegistry) CreatePath(path string) error {
	return r.ZkClient().Create(path)
}

// DoRegister actually do the register job in the registry center of zookeeper
func (r *zkRegistry) DoRegister(root string, node string) error {
	return r.registerTempZookeeperNode(root, node)
}

func (r *zkRegistry) DoUnregister(root string, node string) error {
邹毅贤's avatar
邹毅贤 committed
	r.cltLock.Lock()
	defer r.cltLock.Unlock()
	if !r.ZkClient().ZkConnValid() {
		return perrors.Errorf("zk client is not valid.")
	}
	return r.ZkClient().Delete(path.Join(root, node))
}

邹毅贤's avatar
邹毅贤 committed
// DoSubscribe actually subscribes the provider URL
func (r *zkRegistry) DoSubscribe(conf *common.URL) (registry.Listener, error) {
	return r.getListener(conf)
高辛格's avatar
高辛格 committed

邹毅贤's avatar
邹毅贤 committed
func (r *zkRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) {
	return r.getCloseListener(conf)
// CloseAndNilClient closes listeners and clear client
func (r *zkRegistry) CloseAndNilClient() {
	r.client.Close()
	r.client = nil
}
邹毅贤's avatar
邹毅贤 committed
// nolint
func (r *zkRegistry) ZkClient() *zookeeper.ZookeeperClient {
	return r.client
}

邹毅贤's avatar
邹毅贤 committed
// nolint
func (r *zkRegistry) SetZkClient(client *zookeeper.ZookeeperClient) {
	r.client = client
}

邹毅贤's avatar
邹毅贤 committed
// nolint
func (r *zkRegistry) ZkClientLock() *sync.Mutex {
	return &r.cltLock
}

// CloseListener closes listeners
func (r *zkRegistry) CloseListener() {
	if r.dataListener != nil {
		r.dataListener.Close()
vito.he's avatar
vito.he committed
}

func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
vito.he's avatar
vito.he committed
	var (
		err    error
		zkPath string
	)

AlexStocks's avatar
AlexStocks committed
	r.cltLock.Lock()
	defer r.cltLock.Unlock()
	if r.client == nil {
wangwx's avatar
wangwx committed
		return perrors.WithStack(perrors.New("zk client already been closed"))
	}
vito.he's avatar
vito.he committed
	err = r.client.Create(root)
	if err != nil {
fangyincheng's avatar
fangyincheng committed
		logger.Errorf("zk.Create(root{%s}) = err{%v}", root, perrors.WithStack(err))
fangyincheng's avatar
fangyincheng committed
		return perrors.WithStack(err)
vito.he's avatar
vito.he committed
	}
vito.he's avatar
vito.he committed
	zkPath, err = r.client.RegisterTemp(root, node)
	if err != nil {
		logger.Errorf("Register temp node(root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
		if perrors.Cause(err) == zk.ErrNodeExists {
			// should delete the old node
			logger.Info("Register temp node failed, try to delete the old and recreate  (root{%s}, node{%s}) , ignore!", root, node)
			if err = r.client.Delete(zkPath); err == nil {
				_, err = r.client.RegisterTemp(root, node)
			}
			if err != nil {
				logger.Errorf("Recreate the temp node failed, (root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
			}
fangyincheng's avatar
fangyincheng committed
		return perrors.WithMessagef(err, "RegisterTempNode(root{%s}, node{%s})", root, node)
vito.he's avatar
vito.he committed
	}
	logger.Debugf("Create a zookeeper node:%s", zkPath)
vito.he's avatar
vito.he committed

	return nil
}

func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListener, error) {
CodingSinger's avatar
CodingSinger committed

	var zkListener *RegistryConfigurationListener
wangwx's avatar
wangwx committed
	ttl := r.GetParam(constant.REGISTRY_TTL_KEY, constant.DEFAULT_REG_TTL)
	conf.SetParam(constant.REGISTRY_TTL_KEY, ttl)
	dataListener.mutex.Lock()
	defer dataListener.mutex.Unlock()
邹毅贤's avatar
邹毅贤 committed
	if r.dataListener.subscribed[conf.ServiceKey()] != nil {
CodingSinger's avatar
CodingSinger committed

邹毅贤's avatar
邹毅贤 committed
		zkListener, _ := r.dataListener.subscribed[conf.ServiceKey()].(*RegistryConfigurationListener)
		if zkListener != nil {
			r.listenerLock.Lock()
			defer r.listenerLock.Unlock()
			if zkListener.isClosed {
				return nil, perrors.New("configListener already been closed")
			} else {
				return zkListener, nil
			}
邹毅贤's avatar
邹毅贤 committed
	zkListener = NewRegistryConfigurationListener(r.client, r, conf)
	if r.listener == nil {
		r.cltLock.Lock()
		client := r.client
		r.cltLock.Unlock()
		if client == nil {
			return nil, perrors.New("zk connection broken")
		}
		// new client & listener
		listener := zookeeper.NewZkEventListener(r.client)
		r.listenerLock.Lock()
		r.listener = listener
		r.listenerLock.Unlock()
	}
	//Interested register to dataconfig.
	r.dataListener.SubscribeURL(conf, zkListener)

	go r.listener.ListenServiceEvent(conf, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(conf.Service())), r.dataListener)

	return zkListener, nil
}
邹毅贤's avatar
邹毅贤 committed

func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationListener, error) {

	var zkListener *RegistryConfigurationListener
邹毅贤's avatar
邹毅贤 committed
	r.dataListener.mutex.Lock()
邹毅贤's avatar
邹毅贤 committed
	configurationListener := r.dataListener.subscribed[conf.ServiceKey()]
邹毅贤's avatar
邹毅贤 committed
	if configurationListener != nil {
		zkListener, _ := configurationListener.(*RegistryConfigurationListener)
邹毅贤's avatar
邹毅贤 committed
		if zkListener != nil {
			if zkListener.isClosed {
wangwx's avatar
wangwx committed
				r.dataListener.mutex.Unlock()
邹毅贤's avatar
邹毅贤 committed
				return nil, perrors.New("configListener already been closed")
			}
		}
	}

邹毅贤's avatar
邹毅贤 committed
	zkListener = r.dataListener.UnSubscribeURL(conf).(*RegistryConfigurationListener)
	r.dataListener.mutex.Unlock()

邹毅贤's avatar
邹毅贤 committed
	if r.listener == nil {
		return nil, perrors.New("listener is null can not close.")
	}

	//Interested register to dataconfig.
	r.listenerLock.Lock()
邹毅贤's avatar
邹毅贤 committed
	listener := r.listener
	r.listener = nil
邹毅贤's avatar
邹毅贤 committed
	r.listenerLock.Unlock()
邹毅贤's avatar
邹毅贤 committed
	r.dataListener.Close()
邹毅贤's avatar
邹毅贤 committed
	listener.Close()

邹毅贤's avatar
邹毅贤 committed
	return zkListener, nil
}