Newer
Older
"github.com/AlexStocks/goext/net"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
"github.com/samuel/go-zookeeper/zk"
"github.com/dubbo/go-for-apache-dubbo/common/constant"
"github.com/dubbo/go-for-apache-dubbo/common/extension"
"github.com/dubbo/go-for-apache-dubbo/config"
"github.com/dubbo/go-for-apache-dubbo/registry"
"github.com/dubbo/go-for-apache-dubbo/version"
defaultTimeout = int64(10e9)
RegistryZkClient = "zk registry"
RegistryConnDelay = 3
func init() {
processID = fmt.Sprintf("%d", os.Getpid())
localIP, _ = gxnet.GetLocalIP()
//plugins.PluggableRegistries["zookeeper"] = NewZkRegistry
extension.SetRegistry("zookeeper", NewZkRegistry)
/////////////////////////////////////
// zookeeper registry
/////////////////////////////////////
*config.URL
birth int64 // time of file birth, seconds since Epoch; 0 if unknown
wg sync.WaitGroup // wg+done for zk restart
done chan struct{}
services map[string]config.URL // service name + protocol -> service config
zkPath map[string]int // key = protocol://ip:port/interface
func NewZkRegistry(url *config.URL) (registry.Registry, error) {
var (
err error
r *ZkRegistry
)
r = &ZkRegistry{
URL: url,
birth: time.Now().UnixNano(),
done: make(chan struct{}),
services: make(map[string]config.URL),
zkPath: make(map[string]int),
//if r.SubURL.Name == "" {
// r.SubURL.Name = RegistryZkClient
//}
//if r.Version == "" {
// r.Version = version.Version
//}
//if r.RoleType == registry.CONSUMER {
// r.wg.Add(1)
// go r.listen()
//}
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
func NewMockZkRegistry(url *config.URL) (*zk.TestCluster, *ZkRegistry, error) {
var (
err error
r *ZkRegistry
c *zk.TestCluster
//event <-chan zk.Event
)
r = &ZkRegistry{
URL: url,
birth: time.Now().UnixNano(),
done: make(chan struct{}),
services: make(map[string]config.URL),
zkPath: make(map[string]int),
}
c, r.client, _, err = newMockZookeeperClient("test", 15*time.Second)
if err != nil {
return nil, nil, err
}
r.wg.Add(1)
go r.handleZkRestart()
//if r.RoleType == registry.CONSUMER {
// r.wg.Add(1)
// go r.listen()
//}
return c, r, nil
}
func (r *ZkRegistry) GetUrl() config.URL {
return *r.URL
}
func (r *ZkRegistry) Destroy() {
if r.listener != nil {
r.listener.Close()
}
close(r.done)
r.wg.Wait()
r.closeRegisters()
}
func (r *ZkRegistry) validateZookeeperClient() error {
var (
err error
)
err = nil
//in dubbp ,every registry only connect one node ,so this is []string{r.Address}
timeout, err := time.ParseDuration(r.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
log.Error("timeout config %v is invalid ,err is %v",
r.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
return jerrors.Annotatef(err, "newZookeeperClient(address:%+v)", r.Location)
r.client, err = newZookeeperClient(RegistryZkClient, []string{r.Location}, timeout)
if err != nil {
log.Warn("newZookeeperClient(name{%s}, zk addresss{%v}, timeout{%d}) = error{%v}",
RegistryZkClient, r.Location, timeout.String(), err)
return jerrors.Annotatef(err, "newZookeeperClient(address:%+v)", r.Location)
if r.client.conn == nil {
var event <-chan zk.Event
r.client.conn, event, err = zk.Connect(r.client.zkAddrs, r.client.timeout)
if err == nil {
r.client.wait.Add(1)
go r.client.handleZkEvent(event)
}
}
return jerrors.Annotatef(err, "newZookeeperClient(address:%+v)", r.PrimitiveURL)
}
func (r *ZkRegistry) handleZkRestart() {
var (
err error
flag bool
failTimes int
)
defer r.wg.Done()
LOOP:
for {
select {
case <-r.done:
log.Warn("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
// re-register all services
case <-r.client.done():
// 接zk,直至成功
failTimes = 0
for {
select {
case <-r.done:
log.Warn("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
case <-time.After(time.Duration(1e9 * failTimes * RegistryConnDelay)): // 防止疯狂重连zk
}
err = r.validateZookeeperClient()
log.Info("ZkProviderRegistry.validateZookeeperClient(zkAddr{%s}) = error{%#v}",
r.client.zkAddrs, jerrors.ErrorStack(err))
if err == nil {
// copy r.services
for _, confIf = range r.services {
services = append(services, confIf)
}
flag = true
for _, confIf = range services {
if err != nil {
log.Error("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
confIf, jerrors.ErrorStack(err))
flag = false
break
}
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
func (r *ZkRegistry) Register(conf config.URL) error {
var (
ok bool
err error
listener *zkEventListener
)
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
switch role {
ok = false
r.cltLock.Lock()
_, ok = r.services[conf.Key()]
r.cltLock.Unlock()
if ok {
return jerrors.Errorf("Path{%s} has been registered", conf.Path)
}
err = r.register(conf)
if err != nil {
return jerrors.Trace(err)
}
r.cltLock.Lock()
r.services[conf.Key()] = conf
r.cltLock.Unlock()
log.Debug("(consumerZkConsumerRegistry)Register(conf{%#v})", conf)
r.listenerLock.Lock()
listener = r.listener
r.listenerLock.Unlock()
if listener != nil {
go listener.listenServiceEvent(conf)
}
// 检验服务是否已经注册过
ok = false
r.cltLock.Lock()
// 注意此处与consumerZookeeperRegistry的差异,consumer用的是conf.Path,
// 因为consumer要提供watch功能给selector使用, provider允许注册同一个service的多个group or version
r.cltLock.Unlock()
if ok {
return jerrors.Errorf("Path{%s} has been registered", conf.Key())
}
err = r.register(conf)
if err != nil {
return jerrors.Annotatef(err, "register(conf:%+v)", conf)
}
r.cltLock.Lock()
r.cltLock.Unlock()
log.Debug("(ZkProviderRegistry)Register(conf{%#v})", conf)
}
return nil
}
func (r *ZkRegistry) register(c config.URL) error {
err error
//revision string
params url.Values
urlPath string
rawURL string
encodedURL string
dubboPath string
err = r.validateZookeeperClient()
if err != nil {
return jerrors.Trace(err)
}
params = url.Values{}
for k, v := range c.Params {
params[k] = v
}
params.Add("pid", processID)
params.Add("ip", localIP)
//params.Add("timeout", fmt.Sprintf("%d", int64(r.Timeout)/1e6))
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
switch role {
if c.Path == "" || len(c.Methods) == 0 {
return jerrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods)
dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, config.DubboNodes[config.PROVIDER])
if err != nil {
log.Error("zkClient.create(path{%s}) = error{%#v}", dubboPath, jerrors.ErrorStack(err))
return jerrors.Annotatef(err, "zkclient.Create(path:%s)", dubboPath)
}
params.Add("anyhost", "true")
// dubbo java consumer来启动找provider url时,因为category不匹配,会找不到provider,导致consumer启动不了,所以使用consumers&providers
// DubboRole = [...]string{"consumer", "", "", "provider"}
// params.Add("category", (RoleType(PROVIDER)).Role())
params.Add("category", (config.RoleType(config.PROVIDER)).String())
params.Add("dubbo", "dubbo-provider-golang-"+version.Version)
params.Add("side", (config.RoleType(config.PROVIDER)).Role())
if len(c.Methods) == 0 {
params.Add("methods", strings.Join(c.Methods, ","))
var host string
if c.Ip == "" {
host = localIP + ":" + c.Port
} else {
host = c.Ip + ":" + c.Port
}
if r.zkPath[urlPath] != 0 {
urlPath += strconv.Itoa(r.zkPath[urlPath])
}
r.zkPath[urlPath]++
rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, urlPath, params.Encode())
encodedURL = url.QueryEscape(rawURL)
// 把自己注册service providers
dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, (config.RoleType(config.PROVIDER)).String())
log.Debug("provider path:%s, url:%s", dubboPath, rawURL)
dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, config.DubboNodes[config.CONSUMER])
if err != nil {
log.Error("zkClient.create(path{%s}) = error{%v}", dubboPath, jerrors.ErrorStack(err))
return jerrors.Trace(err)
}
dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, config.DubboNodes[config.PROVIDER])
if err != nil {
log.Error("zkClient.create(path{%s}) = error{%v}", dubboPath, jerrors.ErrorStack(err))
return jerrors.Trace(err)
}
params.Add("category", (config.RoleType(config.CONSUMER)).String())
params.Add("dubbo", "dubbogo-consumer-"+version.Version)
rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, params.Encode())
dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, (config.RoleType(config.CONSUMER)).String())
}
err = r.registerTempZookeeperNode(dubboPath, encodedURL)
if err != nil {
return jerrors.Annotatef(err, "registerTempZookeeperNode(path:%s, url:%s)", dubboPath, rawURL)
}
return nil
}
func (r *ZkRegistry) registerTempZookeeperNode(root string, node string) error {
var (
err error
zkPath string
)
err = r.client.Create(root)
if err != nil {
log.Error("zk.Create(root{%s}) = err{%v}", root, jerrors.ErrorStack(err))
return jerrors.Trace(err)
}
zkPath, err = r.client.RegisterTemp(root, node)
if err != nil {
log.Error("RegisterTempNode(root{%s}, node{%s}) = error{%v}", root, node, jerrors.ErrorStack(err))
return jerrors.Annotatef(err, "RegisterTempNode(root{%s}, node{%s})", root, node)
}
log.Debug("create a zookeeper node:%s", zkPath)
return nil
}
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
func (r *ZkRegistry) Subscribe(conf config.URL) (registry.Listener, error) {
r.wg.Add(1)
return r.getListener(conf)
}
func (r *ZkRegistry) getListener(conf config.URL) (*zkEventListener, error) {
var (
zkListener *zkEventListener
)
r.listenerLock.Lock()
zkListener = r.listener
r.listenerLock.Unlock()
if zkListener != nil {
return zkListener, nil
}
r.cltLock.Lock()
client := r.client
r.cltLock.Unlock()
if client == nil {
return nil, jerrors.New("zk connection broken")
}
// new client & listener
zkListener = newZkEventListener(r, client)
r.listenerLock.Lock()
r.listener = zkListener
r.listenerLock.Unlock()
// listen
r.cltLock.Lock()
for _, svs := range r.services {
if svs.URLEqual(conf) {
go zkListener.listenServiceEvent(svs)
}
}
r.cltLock.Unlock()
return zkListener, nil
}
log.Info("begin to close provider zk client")
// 先关闭旧client,以关闭tmp node
r.client.Close()
r.client = nil
r.services = nil
}
func (r *ZkRegistry) IsAvailable() bool {
default:
return true