Skip to content
Snippets Groups Projects
registry.go 10.7 KiB
Newer Older
vito.he's avatar
vito.he committed
package zookeeper

import (
vito.he's avatar
vito.he committed
	"context"
vito.he's avatar
vito.he committed
	"fmt"
	"github.com/dubbo/dubbo-go/common/constant"
vito.he's avatar
vito.he committed
	"net/url"
	"os"
	"strconv"
	"strings"
vito.he's avatar
vito.he committed
	"sync"
	"time"
)

vito.he's avatar
vito.he committed
	"github.com/AlexStocks/goext/net"
	log "github.com/AlexStocks/log4go"
	jerrors "github.com/juju/errors"
	"github.com/samuel/go-zookeeper/zk"
vito.he's avatar
vito.he committed
)
vito.he's avatar
vito.he committed
	"github.com/dubbo/dubbo-go/common/extension"
	"github.com/dubbo/dubbo-go/config"
vito.he's avatar
vito.he committed
	"github.com/dubbo/dubbo-go/registry"
	"github.com/dubbo/dubbo-go/version"
)

vito.he's avatar
vito.he committed
const (
	// defaultTimeout is 10e9; presumably a 10-second timeout expressed in
	// nanoseconds (it is not referenced in this part of the file).
	defaultTimeout = int64(10e9)
	// RegistryZkClient is the name passed to newZookeeperClient for the
	// client owned by this registry.
	RegistryZkClient = "zk registry"
	// RegistryConnDelay is the back-off step, in seconds, multiplied by the
	// number of failed attempts before each reconnect try.
	RegistryConnDelay = 3
)

vito.he's avatar
vito.he committed
// Process-wide identity values; both are filled in by init and added as the
// "pid" and "ip" parameters of every URL built by register.
var (
	// processID is the decimal process ID of the running program.
	processID string
	// localIP is the local IP address of this host.
	localIP string
)
vito.he's avatar
vito.he committed

func init() {
	processID = fmt.Sprintf("%d", os.Getpid())
	localIP, _ = gxnet.GetLocalIP()
vito.he's avatar
vito.he committed
	//plugins.PluggableRegistries["zookeeper"] = NewZkRegistry
	extension.SetRegistry("zookeeper", NewZkRegistry)
vito.he's avatar
vito.he committed
}

AlexStocks's avatar
AlexStocks committed
/////////////////////////////////////
// zookeeper registry
/////////////////////////////////////

vito.he's avatar
vito.he committed
type ZkRegistry struct {
vito.he's avatar
vito.he committed
	context context.Context
vito.he's avatar
vito.he committed
	birth int64          // time of file birth, seconds since Epoch; 0 if unknown
	wg    sync.WaitGroup // wg+done for zk restart
	done  chan struct{}

AlexStocks's avatar
AlexStocks committed
	cltLock  sync.Mutex
	client   *zookeeperClient
	services map[string]config.URL // service name + protocol -> service config
AlexStocks's avatar
AlexStocks committed

vito.he's avatar
vito.he committed
	listenerLock sync.Mutex
	listener     *zkEventListener
AlexStocks's avatar
AlexStocks committed

vito.he's avatar
vito.he committed
	//for provider
	zkPath map[string]int // key = protocol://ip:port/interface
vito.he's avatar
vito.he committed
}

func NewZkRegistry(url *config.URL) (registry.Registry, error) {
vito.he's avatar
vito.he committed
	var (
		err error
		r   *ZkRegistry
	)

	r = &ZkRegistry{
		URL:      url,
		birth:    time.Now().UnixNano(),
		done:     make(chan struct{}),
		services: make(map[string]config.URL),
		zkPath:   make(map[string]int),
vito.he's avatar
vito.he committed
	}

	//if r.SubURL.Name == "" {
	//	r.SubURL.Name = RegistryZkClient
	//}
	//if r.Version == "" {
	//	r.Version = version.Version
	//}
vito.he's avatar
vito.he committed

	err = r.validateZookeeperClient()
	if err != nil {
		return nil, jerrors.Trace(err)
	}

	r.wg.Add(1)
	go r.handleZkRestart()
vito.he's avatar
vito.he committed

	//if r.RoleType == registry.CONSUMER {
	//	r.wg.Add(1)
	//	go r.listen()
	//}
vito.he's avatar
vito.he committed

vito.he's avatar
vito.he committed
	return r, nil
}
AlexStocks's avatar
AlexStocks committed

vito.he's avatar
vito.he committed
// Close shuts the registry down. The order matters: done is closed first so
// that handleZkRestart leaves its loop, then Close waits for that goroutine,
// and only afterwards is the zookeeper client closed and dropped.
//
// Close must be called at most once: closing the already-closed done channel
// panics.
func (r *ZkRegistry) Close() {
	close(r.done)
	r.wg.Wait()
	r.closeRegisters()
}

func (r *ZkRegistry) validateZookeeperClient() error {
	var (
		err error
	)

	err = nil
AlexStocks's avatar
AlexStocks committed
	r.cltLock.Lock()
	defer r.cltLock.Unlock()
vito.he's avatar
vito.he committed
	if r.client == nil {
vito.he's avatar
vito.he committed
		//in dubbp ,every registry only connect one node ,so this is []string{r.Address}
		timeout, err := time.ParseDuration(r.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
		if err != nil {
			log.Error("timeout config %v is invalid ,err is %v",
				r.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
			return jerrors.Annotatef(err, "newZookeeperClient(address:%+v)", r.Location)
		r.client, err = newZookeeperClient(RegistryZkClient, []string{r.Location}, timeout)
vito.he's avatar
vito.he committed
		if err != nil {
			log.Warn("newZookeeperClient(name{%s}, zk addresss{%v}, timeout{%d}) = error{%v}",
				RegistryZkClient, r.Location, timeout.String(), err)
			return jerrors.Annotatef(err, "newZookeeperClient(address:%+v)", r.Location)
vito.he's avatar
vito.he committed
		}
	}
vito.he's avatar
vito.he committed
	if r.client.conn == nil {
		var event <-chan zk.Event
		r.client.conn, event, err = zk.Connect(r.client.zkAddrs, r.client.timeout)
		if err != nil {
			r.client.wait.Add(1)
			go r.client.handleZkEvent(event)
		}
	}
vito.he's avatar
vito.he committed

	return jerrors.Annotatef(err, "newZookeeperClient(address:%+v)", r.PrimitiveURL)
vito.he's avatar
vito.he committed
}

func (r *ZkRegistry) handleZkRestart() {
	var (
		err       error
		flag      bool
		failTimes int
		confIf    config.URL
		services  []config.URL
vito.he's avatar
vito.he committed
	)

	defer r.wg.Done()
LOOP:
	for {
		select {
		case <-r.done:
			log.Warn("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
			break LOOP
			// re-register all services
		case <-r.client.done():
AlexStocks's avatar
AlexStocks committed
			r.cltLock.Lock()
vito.he's avatar
vito.he committed
			r.client.Close()
			r.client = nil
AlexStocks's avatar
AlexStocks committed
			r.cltLock.Unlock()
vito.he's avatar
vito.he committed

			// 接zk,直至成功
			failTimes = 0
			for {
				select {
				case <-r.done:
					log.Warn("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
					break LOOP
				case <-time.After(time.Duration(1e9 * failTimes * RegistryConnDelay)): // 防止疯狂重连zk
vito.he's avatar
vito.he committed
				}
				err = r.validateZookeeperClient()
				log.Info("ZkProviderRegistry.validateZookeeperClient(zkAddr{%s}) = error{%#v}",
					r.client.zkAddrs, jerrors.ErrorStack(err))
				if err == nil {
					// copy r.services
AlexStocks's avatar
AlexStocks committed
					r.cltLock.Lock()
vito.he's avatar
vito.he committed
					for _, confIf = range r.services {
						services = append(services, confIf)
					}
AlexStocks's avatar
AlexStocks committed
					r.cltLock.Unlock()
vito.he's avatar
vito.he committed

					flag = true
					for _, confIf = range services {
vito.he's avatar
vito.he committed
						err = r.register(confIf)
vito.he's avatar
vito.he committed
						if err != nil {
							log.Error("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
								confIf, jerrors.ErrorStack(err))
							flag = false
							break
						}
					}
					if flag {
						break
					}
				}
				failTimes++
				if MaxFailTimes <= failTimes {
					failTimes = MaxFailTimes
func (r *ZkRegistry) Register(conf config.URL) error {
	role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
	switch role {
	case config.CONSUMER:
		ok = false
		r.cltLock.Lock()
		_, ok = r.services[conf.Key()]
		r.cltLock.Unlock()
		if ok {
			return jerrors.Errorf("Path{%s} has been registered", conf.Path)
		}

		err = r.register(conf)
		if err != nil {
			return jerrors.Trace(err)
		}

		r.cltLock.Lock()
		r.services[conf.Key()] = conf
		r.cltLock.Unlock()
		log.Debug("(consumerZkConsumerRegistry)Register(conf{%#v})", conf)

		r.listenerLock.Lock()
		listener = r.listener
		r.listenerLock.Unlock()
		if listener != nil {
			go listener.listenServiceEvent(conf)
		}
	case config.PROVIDER:

		// 检验服务是否已经注册过
		ok = false
		r.cltLock.Lock()
		// 注意此处与consumerZookeeperRegistry的差异,consumer用的是conf.Path,
		// 因为consumer要提供watch功能给selector使用, provider允许注册同一个service的多个group or version
vito.he's avatar
vito.he committed
		_, ok = r.services[conf.Key()]
			return jerrors.Errorf("Path{%s} has been registered", conf.Key())
		}

		err = r.register(conf)
		if err != nil {
			return jerrors.Annotatef(err, "register(conf:%+v)", conf)
		}

		r.cltLock.Lock()
vito.he's avatar
vito.he committed
		r.services[conf.Key()] = conf
		r.cltLock.Unlock()

		log.Debug("(ZkProviderRegistry)Register(conf{%#v})", conf)
	}

	return nil
}

func (r *ZkRegistry) register(c config.URL) error {
vito.he's avatar
vito.he committed
	var (
vito.he's avatar
vito.he committed
		params     url.Values
		urlPath    string
		rawURL     string
		encodedURL string
		dubboPath  string
		//conf       config.URL
vito.he's avatar
vito.he committed
	)
AlexStocks's avatar
AlexStocks committed

vito.he's avatar
vito.he committed
	err = r.validateZookeeperClient()
	if err != nil {
		return jerrors.Trace(err)
	}
	params = url.Values{}
vito.he's avatar
vito.he committed

	params.Add("pid", processID)
	params.Add("ip", localIP)
	//params.Add("timeout", fmt.Sprintf("%d", int64(r.Timeout)/1e6))
	role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
	switch role {
	case config.PROVIDER:
vito.he's avatar
vito.he committed

		if c.Path == "" || len(c.Methods) == 0 {
			return jerrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods)
vito.he's avatar
vito.he committed
		}
		// 先创建服务下面的provider node
		dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, config.DubboNodes[config.PROVIDER])
AlexStocks's avatar
AlexStocks committed
		r.cltLock.Lock()
vito.he's avatar
vito.he committed
		err = r.client.Create(dubboPath)
AlexStocks's avatar
AlexStocks committed
		r.cltLock.Unlock()
vito.he's avatar
vito.he committed
		if err != nil {
			log.Error("zkClient.create(path{%s}) = error{%#v}", dubboPath, jerrors.ErrorStack(err))
			return jerrors.Annotatef(err, "zkclient.Create(path:%s)", dubboPath)
		}
		params.Add("anyhost", "true")

		// dubbo java consumer来启动找provider url时,因为category不匹配,会找不到provider,导致consumer启动不了,所以使用consumers&providers
		// DubboRole               = [...]string{"consumer", "", "", "provider"}
		// params.Add("category", (RoleType(PROVIDER)).Role())
		params.Add("category", (config.RoleType(config.PROVIDER)).String())
vito.he's avatar
vito.he committed
		params.Add("dubbo", "dubbo-provider-golang-"+version.Version)

		params.Add("side", (config.RoleType(config.PROVIDER)).Role())
vito.he's avatar
vito.he committed

		if len(c.Methods) == 0 {
			params.Add("methods", strings.Join(c.Methods, ","))
vito.he's avatar
vito.he committed
		}
		log.Debug("provider zk url params:%#v", params)
vito.he's avatar
vito.he committed
		var host string
		if c.Ip == "" {
			host = localIP + ":" + c.Port
		} else {
			host = c.Ip + ":" + c.Port
		}
vito.he's avatar
vito.he committed

		urlPath = c.Path
vito.he's avatar
vito.he committed
		if r.zkPath[urlPath] != 0 {
			urlPath += strconv.Itoa(r.zkPath[urlPath])
		}
		r.zkPath[urlPath]++
vito.he's avatar
vito.he committed
		rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, urlPath, params.Encode())
vito.he's avatar
vito.he committed
		encodedURL = url.QueryEscape(rawURL)

		// 把自己注册service providers
		dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, (config.RoleType(config.PROVIDER)).String())
vito.he's avatar
vito.he committed
		log.Debug("provider path:%s, url:%s", dubboPath, rawURL)

	case config.CONSUMER:
		dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, config.DubboNodes[config.CONSUMER])
AlexStocks's avatar
AlexStocks committed
		r.cltLock.Lock()
vito.he's avatar
vito.he committed
		err = r.client.Create(dubboPath)
AlexStocks's avatar
AlexStocks committed
		r.cltLock.Unlock()
vito.he's avatar
vito.he committed
		if err != nil {
			log.Error("zkClient.create(path{%s}) = error{%v}", dubboPath, jerrors.ErrorStack(err))
			return jerrors.Trace(err)
		}
		dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, config.DubboNodes[config.PROVIDER])
AlexStocks's avatar
AlexStocks committed
		r.cltLock.Lock()
vito.he's avatar
vito.he committed
		err = r.client.Create(dubboPath)
AlexStocks's avatar
AlexStocks committed
		r.cltLock.Unlock()
vito.he's avatar
vito.he committed
		if err != nil {
			log.Error("zkClient.create(path{%s}) = error{%v}", dubboPath, jerrors.ErrorStack(err))
			return jerrors.Trace(err)
		}

vito.he's avatar
vito.he committed
		params.Add("protocol", c.Protocol)

		params.Add("category", (config.RoleType(config.CONSUMER)).String())
vito.he's avatar
vito.he committed
		params.Add("dubbo", "dubbogo-consumer-"+version.Version)

		rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, params.Encode())
vito.he's avatar
vito.he committed
		encodedURL = url.QueryEscape(rawURL)

		dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, (config.RoleType(config.CONSUMER)).String())
vito.he's avatar
vito.he committed
		log.Debug("consumer path:%s, url:%s", dubboPath, rawURL)
vito.he's avatar
vito.he committed
		return jerrors.Errorf("@c{%v} type is not referencer or provider", c)
vito.he's avatar
vito.he committed
	}

	err = r.registerTempZookeeperNode(dubboPath, encodedURL)

	if err != nil {
		return jerrors.Annotatef(err, "registerTempZookeeperNode(path:%s, url:%s)", dubboPath, rawURL)
	}
	return nil
}

func (r *ZkRegistry) registerTempZookeeperNode(root string, node string) error {
	var (
		err    error
		zkPath string
	)

AlexStocks's avatar
AlexStocks committed
	r.cltLock.Lock()
	defer r.cltLock.Unlock()
vito.he's avatar
vito.he committed
	err = r.client.Create(root)
	if err != nil {
		log.Error("zk.Create(root{%s}) = err{%v}", root, jerrors.ErrorStack(err))
		return jerrors.Trace(err)
	}
	zkPath, err = r.client.RegisterTemp(root, node)
	if err != nil {
		log.Error("RegisterTempNode(root{%s}, node{%s}) = error{%v}", root, node, jerrors.ErrorStack(err))
		return jerrors.Annotatef(err, "RegisterTempNode(root{%s}, node{%s})", root, node)
	}
	log.Debug("create a zookeeper node:%s", zkPath)

	return nil
}

func (r *ZkRegistry) closeRegisters() {
AlexStocks's avatar
AlexStocks committed
	r.cltLock.Lock()
	defer r.cltLock.Unlock()
vito.he's avatar
vito.he committed
	log.Info("begin to close provider zk client")
	// 先关闭旧client,以关闭tmp node
	r.client.Close()
	r.client = nil
	r.services = nil
}

func (r *ZkRegistry) IsClosed() bool {
vito.he's avatar
vito.he committed
	select {
	case <-r.done:
		return true
	default:
		return false
	}
}