Newer
Older
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
"github.com/samuel/go-zookeeper/zk"

AlexStocks
committed
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"

AlexStocks
committed
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting/zookeeper"
RegistryZkClient = "zk registry"
func init() {
processID = fmt.Sprintf("%d", os.Getpid())
//plugins.PluggableRegistries["zookeeper"] = newZkRegistry
extension.SetRegistry("zookeeper", newZkRegistry)
/////////////////////////////////////
// zookeeper registry
/////////////////////////////////////
birth int64 // time of file birth, seconds since Epoch; 0 if unknown
wg sync.WaitGroup // wg+done for zk restart
done chan struct{}
client *zookeeper.ZookeeperClient
services map[string]common.URL // service name + protocol -> service config
listenerLock sync.Mutex
listener *zookeeper.ZkEventListener
dataListener *RegistryDataListener
configListener *RegistryConfigurationListener
zkPath map[string]int // key = protocol://ip:port/interface
func newZkRegistry(url *common.URL) (registry.Registry, error) {
URL: url,
birth: time.Now().UnixNano(),
done: make(chan struct{}),
zkPath: make(map[string]int),
err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient))
go zookeeper.HandleClientRestart(r)
r.listener = zookeeper.NewZkEventListener(r.client)
r.configListener = NewRegistryConfigurationListener(r.client, r)
r.dataListener = NewRegistryDataListener(r.configListener)
type Options struct {
client *zookeeper.ZookeeperClient
}
type Option func(*Options)
func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestCluster, *zkRegistry, error) {
c *zk.TestCluster
//event <-chan zk.Event
)
URL: url,
birth: time.Now().UnixNano(),
done: make(chan struct{}),
c, r.client, _, err = zookeeper.NewMockZookeeperClient("test", 15*time.Second, opts...)
if err != nil {
return nil, nil, err
}
r.wg.Add(1)
go zookeeper.HandleClientRestart(r)
r.listener = zookeeper.NewZkEventListener(r.client)
r.configListener = NewRegistryConfigurationListener(r.client, r)
r.dataListener = NewRegistryDataListener(r.configListener)
func (r *zkRegistry) ZkClient() *zookeeper.ZookeeperClient {
return r.client
}
func (r *zkRegistry) SetZkClient(client *zookeeper.ZookeeperClient) {
r.client = client
}
func (r *zkRegistry) ZkClientLock() *sync.Mutex {
return &r.cltLock
}
func (r *zkRegistry) WaitGroup() *sync.WaitGroup {
return &r.wg
}
func (r *zkRegistry) GetDone() chan struct{} {
return r.done
}
func (r *zkRegistry) GetUrl() common.URL {
return *r.URL
}
if r.configListener != nil {
r.configListener.Close()
close(r.done)
r.wg.Wait()
r.closeRegisters()
}
func (r *zkRegistry) RestartCallBack() bool {
// copy r.services
services := []common.URL{}
for _, confIf := range r.services {
services = append(services, confIf)
flag := true
for _, confIf := range services {
err := r.register(confIf)
if err != nil {
logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
confIf, perrors.WithStack(err))
flag = false
break
logger.Infof("success to re-register service :%v", confIf.Key())
return flag
func (r *zkRegistry) Register(conf common.URL) error {
var (
ok bool
err error
)
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
switch role {
r.cltLock.Lock()
_, ok = r.services[conf.Key()]
r.cltLock.Unlock()
if ok {
return perrors.Errorf("Path{%s} has been registered", conf.Path)
}
err = r.register(conf)
if err != nil {
}
r.cltLock.Lock()
r.services[conf.Key()] = conf
r.cltLock.Unlock()
logger.Debugf("(consumerZkConsumerRegistry)Register(conf{%#v})", conf)
// Check if the service has been registered
r.cltLock.Lock()
// Note the difference between consumer and consumerZookeeperRegistry (consumer use conf.Path).
// Because the consumer wants to provide monitoring functions for the selector,
// the provider allows multiple groups or versions of the same service to be registered.
r.cltLock.Unlock()
if ok {
return perrors.Errorf("Path{%s} has been registered", conf.Key())
}
err = r.register(conf)
if err != nil {
return perrors.WithMessagef(err, "register(conf:%+v)", conf)
}
r.cltLock.Lock()
r.cltLock.Unlock()
logger.Debugf("(ZkProviderRegistry)Register(conf{%#v})", conf)
}
return nil
}
func (r *zkRegistry) register(c common.URL) error {
err error
//revision string
params url.Values
rawURL string
encodedURL string
dubboPath string
err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient))
for k, v := range c.Params {
params[k] = v
}
params.Add("pid", processID)
params.Add("ip", localIP)
//params.Add("timeout", fmt.Sprintf("%d", int64(r.Timeout)/1e6))
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
switch role {
return perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods)
dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), common.DubboNodes[common.PROVIDER])
logger.Errorf("zkClient.create(path{%s}) = error{%#v}", dubboPath, perrors.WithStack(err))
return perrors.WithMessagef(err, "zkclient.Create(path:%s)", dubboPath)
// Dubbo java consumer to start looking for the provider url,because the category does not match,
// the provider will not find, causing the consumer can not start, so we use consumers.
// params.Add("category", (RoleType(PROVIDER)).Role())
params.Add("category", (common.RoleType(common.PROVIDER)).String())
params.Add("dubbo", "dubbo-provider-golang-"+constant.Version)
params.Add("side", (common.RoleType(common.PROVIDER)).Role())
if len(c.Methods) == 0 {
params.Add("methods", strings.Join(c.Methods, ","))
logger.Debugf("provider zk url params:%#v", params)
var host string
if c.Ip == "" {
host = localIP + ":" + c.Port
} else {
host = c.Ip + ":" + c.Port
}
rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, params.Encode())
// Print your own registration service providers.
dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), (common.RoleType(common.PROVIDER)).String())
logger.Debugf("provider path:%s, url:%s", dubboPath, rawURL)
dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), common.DubboNodes[common.CONSUMER])
logger.Errorf("zkClient.create(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), common.DubboNodes[common.PROVIDER])
logger.Errorf("zkClient.create(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
params.Add("category", (common.RoleType(common.CONSUMER)).String())
rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, params.Encode())
dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), (common.RoleType(common.CONSUMER)).String())
logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL)
return perrors.Errorf("@c{%v} type is not referencer or provider", c)
}
err = r.registerTempZookeeperNode(dubboPath, encodedURL)
if err != nil {
return perrors.WithMessagef(err, "registerTempZookeeperNode(path:%s, url:%s)", dubboPath, rawURL)
func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
logger.Errorf("zk.Create(root{%s}) = err{%v}", root, perrors.WithStack(err))
}
zkPath, err = r.client.RegisterTemp(root, node)
if err != nil {
logger.Errorf("RegisterTempNode(root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
return perrors.WithMessagef(err, "RegisterTempNode(root{%s}, node{%s})", root, node)
logger.Debugf("create a zookeeper node:%s", zkPath)
func (r *zkRegistry) Subscribe(conf common.URL) (registry.Listener, error) {
func (r *zkRegistry) getListener(conf common.URL) (*RegistryConfigurationListener, error) {
zkListener *RegistryConfigurationListener
zkListener = r.configListener
if r.listener == nil {
r.cltLock.Lock()
client := r.client
r.cltLock.Unlock()
if client == nil {
return nil, perrors.New("zk connection broken")
}
// new client & listener
listener := zookeeper.NewZkEventListener(r.client)
r.listenerLock.Lock()
r.listener = listener
r.listenerLock.Unlock()
}
//Interested register to dataconfig.
r.dataListener.AddInterestedURL(&conf)
go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/providers", conf.Service()), r.dataListener)
logger.Infof("begin to close provider zk client")
// Close the old client first to close the tmp node.
r.client.Close()
r.client = nil
r.services = nil
}
func (r *zkRegistry) IsAvailable() bool {
default:
return true