Newer
Older
"github.com/AlexStocks/goext/net"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
"github.com/samuel/go-zookeeper/zk"
"github.com/dubbo/go-for-apache-dubbo/common/constant"
"github.com/dubbo/go-for-apache-dubbo/common/extension"
"github.com/dubbo/go-for-apache-dubbo/registry"
"github.com/dubbo/go-for-apache-dubbo/version"
defaultTimeout = int64(10e9)
RegistryZkClient = "zk registry"
RegistryConnDelay = 3
// init stores the current process ID (as a decimal string) in processID and
// the local IP address in localIP, then registers newZkRegistry with the
// extension package under the name "zookeeper".
func init() {
processID = fmt.Sprintf("%d", os.Getpid())
// NOTE(review): the error returned by gxnet.GetLocalIP is discarded here — confirm that is intended.
localIP, _ = gxnet.GetLocalIP()
//plugins.PluggableRegistries["zookeeper"] = newZkRegistry
extension.SetRegistry("zookeeper", newZkRegistry)
/////////////////////////////////////
// zookeeper registry
/////////////////////////////////////
birth int64 // creation time of the registry; the constructors set it from time.Now().UnixNano(), i.e. nanoseconds since the Unix epoch
wg sync.WaitGroup // wg+done for zk restart
done chan struct{} // handleZkRestart exits once it can receive from this channel
services map[string]common.URL // service name + protocol -> service config
zkPath map[string]int // key = protocol://ip:port/interface
// newZkRegistry creates the registry.Registry for url; init registers this
// constructor under the name "zookeeper". The new value records url, the
// creation time in nanoseconds (birth), a fresh done channel and an empty
// zkPath map.
func newZkRegistry(url *common.URL) (registry.Registry, error) {
URL: url,
birth: time.Now().UnixNano(),
done: make(chan struct{}),
zkPath: make(map[string]int),
//if r.SubURL.Name == "" {
// r.SubURL.Name = RegistryZkClient
//}
//if r.Version == "" {
// r.Version = version.Version
//}
//if r.RoleType == registry.CONSUMER {
// r.wg.Add(1)
// go r.listen()
//}
// newMockZkRegistry builds a zkRegistry for url on top of the client returned
// by newMockZookeeperClient("test", 15*time.Second) and starts the
// handleZkRestart goroutine for it.
//
// It returns the zk.TestCluster obtained from newMockZookeeperClient together
// with the registry, or the error reported by newMockZookeeperClient.
func newMockZkRegistry(url *common.URL) (*zk.TestCluster, *zkRegistry, error) {
c *zk.TestCluster
//event <-chan zk.Event
)
URL: url,
birth: time.Now().UnixNano(),
done: make(chan struct{}),
zkPath: make(map[string]int),
}
c, r.client, _, err = newMockZookeeperClient("test", 15*time.Second)
if err != nil {
return nil, nil, err
}
// The matching r.wg.Done() is deferred inside handleZkRestart.
r.wg.Add(1)
go r.handleZkRestart()
//if r.RoleType == registry.CONSUMER {
// r.wg.Add(1)
// go r.listen()
//}
return c, r, nil
}
// GetUrl returns the value r.URL points to. The result is a copy of that
// common.URL, not the pointer held by the registry.
func (r *zkRegistry) GetUrl() common.URL {
	registryURL := r.URL
	return *registryURL
}
if r.listener != nil {
r.listener.Close()
}
close(r.done)
r.wg.Wait()
r.closeRegisters()
}
// validateZookeeperClient prepares r.client for use. It parses the timeout
// obtained from r.GetParam(constant.REGISTRY_TIMEOUT_KEY,
// constant.DEFAULT_REG_TIMEOUT), creates the client with newZookeeperClient
// for the single address r.Location and, when r.client.conn is nil, dials
// zookeeper with zk.Connect and starts handleZkEvent on the returned event
// channel.
//
// Failures are logged and returned annotated via jerrors.Annotatef.
func (r *zkRegistry) validateZookeeperClient() error {
// in dubbo, every registry connects to only one node, so the address list is []string{r.Location}
timeout, err := time.ParseDuration(r.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
log.Error("timeout config %v is invalid ,err is %v",
r.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
return jerrors.Annotatef(err, "newZookeeperClient(address:%+v)", r.Location)
r.client, err = newZookeeperClient(RegistryZkClient, []string{r.Location}, timeout)
if err != nil {
// NOTE(review): the format string uses timeout{%d} but is given timeout.String() — confirm the verb.
log.Warn("newZookeeperClient(name{%s}, zk addresss{%v}, timeout{%d}) = error{%v}",
RegistryZkClient, r.Location, timeout.String(), err)
return jerrors.Annotatef(err, "newZookeeperClient(address:%+v)", r.Location)
if r.client.conn == nil {
var event <-chan zk.Event
r.client.conn, event, err = zk.Connect(r.client.zkAddrs, r.client.timeout)
if err == nil {
// One more waiter on the client's wait group for the event-handling goroutine.
r.client.wait.Add(1)
go r.client.handleZkEvent(event)
}
}
return jerrors.Annotatef(err, "newZookeeperClient(address:%+v)", r.PrimitiveURL)
// handleZkRestart is the reconnect loop of the registry. It runs until it can
// receive from r.done and calls r.wg.Done() on exit. Whenever the client's
// done() channel fires it retries validateZookeeperClient, waiting
// failTimes * RegistryConnDelay seconds before each attempt, and after a
// successful reconnect walks a copy of r.services to re-register them.
// failTimes is capped at MaxFailTimes.
func (r *zkRegistry) handleZkRestart() {
)
defer r.wg.Done()
LOOP:
for {
select {
case <-r.done:
log.Warn("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
// re-register all services
case <-r.client.done():
// reconnect to zk until it succeeds
failTimes = 0
for {
select {
case <-r.done:
log.Warn("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
case <-time.After(time.Duration(1e9 * failTimes * RegistryConnDelay)): // avoid hammering zk with reconnect attempts
}
err = r.validateZookeeperClient()
log.Info("ZkProviderRegistry.validateZookeeperClient(zkAddr{%s}) = error{%#v}",
r.client.zkAddrs, jerrors.ErrorStack(err))
if err == nil {
// copy r.services
for _, confIf = range r.services {
services = append(services, confIf)
}
// flag is cleared below as soon as one service reports an error.
flag = true
for _, confIf = range services {
if err != nil {
log.Error("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
confIf, jerrors.ErrorStack(err))
flag = false
break
}
// Clamp the failure counter so the retry delay stops growing.
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
// Register registers conf in zookeeper through r.register and records it in
// r.services under conf.Key(). Which branch runs depends on the role parsed
// from the registry URL's constant.ROLE_KEY parameter.
//
// A conf that is found to be registered already is rejected with an error,
// and an error from r.register is returned wrapped. After a successful
// registration in the first branch, an existing r.listener is told to watch
// conf in a new goroutine.
func (r *zkRegistry) Register(conf common.URL) error {
var (
ok bool
err error
listener *zkEventListener
)
// The Atoi error is ignored; an unparsable role value yields 0.
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
switch role {
ok = false
r.cltLock.Lock()
_, ok = r.services[conf.Key()]
r.cltLock.Unlock()
if ok {
return jerrors.Errorf("Path{%s} has been registered", conf.Path)
}
err = r.register(conf)
if err != nil {
return jerrors.Trace(err)
}
r.cltLock.Lock()
r.services[conf.Key()] = conf
r.cltLock.Unlock()
log.Debug("(consumerZkConsumerRegistry)Register(conf{%#v})", conf)
// Read the listener under its lock, then start watching outside the lock.
r.listenerLock.Lock()
listener = r.listener
r.listenerLock.Unlock()
if listener != nil {
go listener.listenServiceEvent(conf)
}
// check whether the service has already been registered
ok = false
r.cltLock.Lock()
// Note the difference from consumerZookeeperRegistry here: the consumer uses conf.Path,
// because the consumer has to offer a watch facility to the selector, while the provider may register several groups or versions of the same service
r.cltLock.Unlock()
if ok {
return jerrors.Errorf("Path{%s} has been registered", conf.Key())
}
err = r.register(conf)
if err != nil {
return jerrors.Annotatef(err, "register(conf:%+v)", conf)
}
r.cltLock.Lock()
r.cltLock.Unlock()
log.Debug("(ZkProviderRegistry)Register(conf{%#v})", conf)
}
return nil
}
// register creates the zookeeper node that represents c. It first validates
// the zookeeper client, copies c.Params into a fresh url.Values and adds the
// "pid" and "ip" parameters, then builds the role-specific zookeeper path and
// URL and hands them to registerTempZookeeperNode.
//
// The role is parsed from the registry URL's constant.ROLE_KEY parameter.
// Errors from the client validation, from creating the zookeeper paths and
// from registerTempZookeeperNode are returned wrapped.
func (r *zkRegistry) register(c common.URL) error {
err error
//revision string
params url.Values
urlPath string
rawURL string
encodedURL string
dubboPath string
err = r.validateZookeeperClient()
if err != nil {
return jerrors.Trace(err)
}
params = url.Values{}
for k, v := range c.Params {
params[k] = v
}
params.Add("pid", processID)
params.Add("ip", localIP)
//params.Add("timeout", fmt.Sprintf("%d", int64(r.Timeout)/1e6))
// The Atoi error is ignored; an unparsable role value yields 0.
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
switch role {
if c.Path == "" || len(c.Methods) == 0 {
return jerrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods)
dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, common.DubboNodes[common.PROVIDER])
if err != nil {
log.Error("zkClient.create(path{%s}) = error{%#v}", dubboPath, jerrors.ErrorStack(err))
return jerrors.Annotatef(err, "zkclient.Create(path:%s)", dubboPath)
}
params.Add("anyhost", "true")
// When a dubbo java consumer starts and looks up provider urls, a category mismatch means it cannot find the provider and fails to start, so consumers&providers is used
// DubboRole = [...]string{"consumer", "", "", "provider"}
// params.Add("category", (RoleType(PROVIDER)).Role())
params.Add("category", (common.RoleType(common.PROVIDER)).String())
params.Add("dubbo", "dubbo-provider-golang-"+version.Version)
params.Add("side", (common.RoleType(common.PROVIDER)).Role())
// NOTE(review): this adds the "methods" parameter only when c.Methods is empty, which looks inverted — confirm.
if len(c.Methods) == 0 {
params.Add("methods", strings.Join(c.Methods, ","))
var host string
// Fall back to the local IP when the URL carries no IP of its own.
if c.Ip == "" {
host = localIP + ":" + c.Port
} else {
host = c.Ip + ":" + c.Port
}
// r.zkPath counts how often urlPath has been used; a non-zero count is appended to the path.
if r.zkPath[urlPath] != 0 {
urlPath += strconv.Itoa(r.zkPath[urlPath])
}
r.zkPath[urlPath]++
rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, urlPath, params.Encode())
encodedURL = url.QueryEscape(rawURL)
// register itself under the service providers
dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, (common.RoleType(common.PROVIDER)).String())
log.Debug("provider path:%s, url:%s", dubboPath, rawURL)
case common.CONSUMER:
dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, common.DubboNodes[common.CONSUMER])
if err != nil {
log.Error("zkClient.create(path{%s}) = error{%v}", dubboPath, jerrors.ErrorStack(err))
return jerrors.Trace(err)
}
dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, common.DubboNodes[common.PROVIDER])
if err != nil {
log.Error("zkClient.create(path{%s}) = error{%v}", dubboPath, jerrors.ErrorStack(err))
return jerrors.Trace(err)
}
params.Add("category", (common.RoleType(common.CONSUMER)).String())
params.Add("dubbo", "dubbogo-consumer-"+version.Version)
rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, params.Encode())
dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, (common.RoleType(common.CONSUMER)).String())
}
err = r.registerTempZookeeperNode(dubboPath, encodedURL)
if err != nil {
return jerrors.Annotatef(err, "registerTempZookeeperNode(path:%s, url:%s)", dubboPath, rawURL)
}
return nil
}
// registerTempZookeeperNode creates the path root through r.client.Create and
// then registers node below it with r.client.RegisterTemp.
//
// It returns a traced error when creating root fails and an annotated error
// when RegisterTemp fails; both failures are logged as well. On success the
// path returned by RegisterTemp is logged at debug level and nil is returned.
func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
	// err and zkPath are declared here so that the function does not rely on
	// any declaration outside of its own body.
	var (
		err    error
		zkPath string
	)

	err = r.client.Create(root)
	if err != nil {
		log.Error("zk.Create(root{%s}) = err{%v}", root, jerrors.ErrorStack(err))
		return jerrors.Trace(err)
	}

	zkPath, err = r.client.RegisterTemp(root, node)
	if err != nil {
		log.Error("RegisterTempNode(root{%s}, node{%s}) = error{%v}", root, node, jerrors.ErrorStack(err))
		return jerrors.Annotatef(err, "RegisterTempNode(root{%s}, node{%s})", root, node)
	}
	log.Debug("create a zookeeper node:%s", zkPath)

	return nil
}
// Subscribe returns the registry's event listener for conf.
//
// r.wg is incremented before the listener is looked up or created by
// getListener. On failure a literal nil Listener is returned together with
// the error, so callers never receive a non-nil interface value that merely
// wraps a nil *zkEventListener.
func (r *zkRegistry) Subscribe(conf common.URL) (registry.Listener, error) {
	// NOTE(review): the matching r.wg.Done() is not in this function —
	// presumably it happens when the listener is closed; confirm that the
	// error path below does not leave the counter unbalanced.
	r.wg.Add(1)

	listener, err := r.getListener(conf)
	if err != nil {
		return nil, err
	}
	return listener, nil
}
// getListener returns the registry's zkEventListener, creating it on first use.
//
// If r.listener is already set it is returned as is. Otherwise a listener is
// built on top of the current r.client and stored in r.listener, and a
// listenServiceEvent goroutine is started for every entry of r.services for
// which URLEqual(conf) reports true. An error is returned when r.client is nil.
func (r *zkRegistry) getListener(conf common.URL) (*zkEventListener, error) {
	// Fast path: reuse the listener created by an earlier call.
	r.listenerLock.Lock()
	existing := r.listener
	r.listenerLock.Unlock()
	if existing != nil {
		return existing, nil
	}

	// Snapshot the client under its lock; without a client there is nothing
	// to listen on.
	r.cltLock.Lock()
	client := r.client
	r.cltLock.Unlock()
	if client == nil {
		return nil, jerrors.New("zk connection broken")
	}

	// new client & listener
	created := newZkEventListener(r, client)
	r.listenerLock.Lock()
	r.listener = created
	r.listenerLock.Unlock()

	// Start watching every recorded service that matches conf.
	r.cltLock.Lock()
	for _, svc := range r.services {
		if svc.URLEqual(conf) {
			go created.listenServiceEvent(svc)
		}
	}
	r.cltLock.Unlock()

	return created, nil
}
log.Info("begin to close provider zk client")
// close the old client first, so that its tmp nodes are closed as well
r.client.Close()
r.client = nil
r.services = nil
}
func (r *zkRegistry) IsAvailable() bool {
default:
return true