Skip to content
Snippets Groups Projects
Commit 89425d55 authored by Huxing Zhang's avatar Huxing Zhang Committed by GitHub
Browse files

Merge pull request #87 from hxmhlt/master

refactor registry.zookeeper & define config center module interface
parents 9fb3056e 988e0a0d
No related branches found
No related tags found
No related merge requests found
Showing with 963 additions and 539 deletions
......@@ -67,3 +67,8 @@ const (
OWNER_KEY = "owner"
ENVIRONMENT_KEY = "environment"
)
const (
CONFIG_NAMESPACE_KEY = "config.namespace"
CONFIG_TIMEOUT_KET = "config.timeout"
)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/config_center"
)
var (
configCenters = make(map[string]func(config *common.URL) (config_center.DynamicConfiguration, error))
)
func SetConfigCenter(name string, v func(config *common.URL) (config_center.DynamicConfiguration, error)) {
configCenters[name] = v
}
func GetConfigCenter(name string, config *common.URL) (config_center.DynamicConfiguration, error) {
if configCenters[name] == nil {
panic("config center for " + name + " is not existing, make sure you have import the package.")
}
return configCenters[name](config)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config_center
import (
"time"
)
import (
"github.com/apache/dubbo-go/remoting"
)
//////////////////////////////////////////
// DynamicConfiguration
//////////////////////////////////////////
const DEFAULT_GROUP = "dubbo"
const DEFAULT_CONFIG_TIMEOUT = "10s"
type DynamicConfiguration interface {
AddListener(string, remoting.ConfigurationListener, ...Option)
RemoveListener(string, remoting.ConfigurationListener, ...Option)
GetConfig(string, ...Option) string
GetConfigs(string, ...Option) string
}
type Options struct {
Group string
Timeout time.Duration
}
type Option func(*Options)
func WithGroup(group string) Option {
return func(opt *Options) {
opt.Group = group
}
}
func WithTimeout(time time.Duration) Option {
return func(opt *Options) {
opt.Timeout = time
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zookeeper
import (
"sync"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/remoting"
"github.com/apache/dubbo-go/remoting/zookeeper"
)
const ZK_CLIENT = "zk config_center"
type ZookeeperDynamicConfiguration struct {
url common.URL
rootPath string
wg sync.WaitGroup
cltLock sync.Mutex
done chan struct{}
client *zookeeper.ZookeeperClient
listenerLock sync.Mutex
listener *zookeeper.ZkEventListener
}
func NewZookeeperDynamicConfiguration(url common.URL) (config_center.DynamicConfiguration, error) {
c := &ZookeeperDynamicConfiguration{
url: url,
rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config",
}
err := zookeeper.ValidateZookeeperClient(c, zookeeper.WithZkName(ZK_CLIENT))
if err != nil {
return nil, err
}
c.wg.Add(1)
go zookeeper.HandleClientRestart(c)
c.listener = zookeeper.NewZkEventListener(c.client)
//c.configListener = NewRegistryConfigurationListener(c.client, c)
//c.dataListener = NewRegistryDataListener(c.configListener)
return c, nil
}
func (*ZookeeperDynamicConfiguration) AddListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) {
}
func (*ZookeeperDynamicConfiguration) RemoveListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) {
}
func (*ZookeeperDynamicConfiguration) GetConfig(key string, opions ...config_center.Option) string {
return ""
}
func (*ZookeeperDynamicConfiguration) GetConfigs(key string, opions ...config_center.Option) string {
return ""
}
func (r *ZookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient {
return r.client
}
func (r *ZookeeperDynamicConfiguration) SetZkClient(client *zookeeper.ZookeeperClient) {
r.client = client
}
func (r *ZookeeperDynamicConfiguration) ZkClientLock() *sync.Mutex {
return &r.cltLock
}
func (r *ZookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup {
return &r.wg
}
func (r *ZookeeperDynamicConfiguration) GetDone() chan struct{} {
return r.done
}
func (r *ZookeeperDynamicConfiguration) GetUrl() common.URL {
return r.url
}
func (r *ZookeeperDynamicConfiguration) Destroy() {
if r.listener != nil {
r.listener.Close()
}
close(r.done)
r.wg.Wait()
r.closeConfigs()
}
func (r *ZookeeperDynamicConfiguration) IsAvailable() bool {
select {
case <-r.done:
return false
default:
return true
}
}
func (r *ZookeeperDynamicConfiguration) closeConfigs() {
r.cltLock.Lock()
defer r.cltLock.Unlock()
logger.Infof("begin to close provider zk client")
// 先关闭旧client,以关闭tmp node
r.client.Close()
r.client = nil
}
func (r *ZookeeperDynamicConfiguration) RestartCallBack() bool {
return true
}
......@@ -18,6 +18,7 @@
package directory
import (
"github.com/apache/dubbo-go/remoting"
"sync"
"time"
)
......@@ -130,10 +131,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) {
func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
switch res.Action {
case registry.ServiceAdd:
case remoting.Add:
//dir.cacheService.Add(res.Path, dir.serviceTTL)
dir.cacheInvoker(res.Service)
case registry.ServiceDel:
case remoting.Del:
//dir.cacheService.Del(res.Path, dir.serviceTTL)
dir.uncacheInvoker(res.Service)
logger.Infof("selector delete service url{%s}", res.Service)
......
......@@ -19,6 +19,7 @@ package directory
import (
"context"
"github.com/apache/dubbo-go/remoting"
"net/url"
"strconv"
"testing"
......@@ -50,7 +51,7 @@ func TestSubscribe_Delete(t *testing.T) {
registryDirectory, mockRegistry := normalRegistryDir()
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 3)
mockRegistry.MockEvent(&registry.ServiceEvent{Action: registry.ServiceDel, Service: *common.NewURLWithOptions("TEST0", common.WithProtocol("dubbo"))})
mockRegistry.MockEvent(&registry.ServiceEvent{Action: remoting.Del, Service: *common.NewURLWithOptions("TEST0", common.WithProtocol("dubbo"))})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 2)
}
......@@ -80,7 +81,7 @@ func TestSubscribe_Group(t *testing.T) {
urlmap.Set(constant.GROUP_KEY, "group1")
urlmap.Set(constant.CLUSTER_KEY, "failover") //to test merge url
for i := 0; i < 3; i++ {
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
common.WithParams(urlmap))})
}
//for group2
......@@ -88,7 +89,7 @@ func TestSubscribe_Group(t *testing.T) {
urlmap2.Set(constant.GROUP_KEY, "group2")
urlmap2.Set(constant.CLUSTER_KEY, "failover") //to test merge url
for i := 0; i < 3; i++ {
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
common.WithParams(urlmap2))})
}
......@@ -128,7 +129,7 @@ func normalRegistryDir() (*registryDirectory, *registry.MockRegistry) {
go registryDirectory.Subscribe(*common.NewURLWithOptions("testservice"))
for i := 0; i < 3; i++ {
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"))})
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"))})
}
return registryDirectory, mockRegistry.(*registry.MockRegistry)
}
......@@ -25,38 +25,19 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/remoting"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
//////////////////////////////////////////
// service url event type
//////////////////////////////////////////
type ServiceEventType int
const (
ServiceAdd = iota
ServiceDel
)
var serviceEventTypeStrings = [...]string{
"add service",
"delete service",
}
func (t ServiceEventType) String() string {
return serviceEventTypeStrings[t]
}
//////////////////////////////////////////
// service event
//////////////////////////////////////////
type ServiceEvent struct {
Action ServiceEventType
Action remoting.EventType
Service common.URL
}
......
......@@ -19,295 +19,64 @@ package zookeeper
import (
"context"
"fmt"
"path"
"sync"
"time"
)
import (
perrors "github.com/pkg/errors"
"github.com/samuel/go-zookeeper/zk"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
zk "github.com/apache/dubbo-go/remoting/zookeeper"
)
const (
MaxFailTimes = 15
)
type zkEvent struct {
res *registry.ServiceEvent
err error
type RegistryDataListener struct {
interestedURL []*common.URL
listener *RegistryConfigurationListener
}
func (e zkEvent) String() string {
return fmt.Sprintf("err:%s, res:%s", e.err, e.res)
func NewRegistryDataListener(listener *RegistryConfigurationListener) *RegistryDataListener {
return &RegistryDataListener{listener: listener, interestedURL: []*common.URL{}}
}
type zkEventListener struct {
client *zookeeperClient
events chan zkEvent
serviceMapLock sync.Mutex
serviceMap map[string]struct{}
wg sync.WaitGroup
registry *zkRegistry
func (l *RegistryDataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
}
func newZkEventListener(registry *zkRegistry, client *zookeeperClient) *zkEventListener {
return &zkEventListener{
client: client,
registry: registry,
events: make(chan zkEvent, 32),
serviceMap: make(map[string]struct{}),
func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
serviceURL, err := common.NewURL(context.TODO(), eventType.Content)
if err != nil {
logger.Errorf("Listen NewURL(r{%s}) = error{%v}", eventType.Content, err)
return false
}
}
func (l *zkEventListener) listenServiceNodeEvent(zkPath string) bool {
l.wg.Add(1)
defer l.wg.Done()
var zkEvent zk.Event
for {
keyEventCh, err := l.client.existW(zkPath)
if err != nil {
logger.Errorf("existW{key:%s} = error{%v}", zkPath, err)
return false
}
select {
case zkEvent = <-keyEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, stateToString(zkEvent.State), zkEvent.Err)
switch zkEvent.Type {
case zk.EventNodeDataChanged:
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDataChanged}", zkPath)
case zk.EventNodeCreated:
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeCreated}", zkPath)
case zk.EventNotWatching:
logger.Warnf("zk.ExistW(key{%s}) = event{EventNotWatching}", zkPath)
case zk.EventNodeDeleted:
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDeleted}", zkPath)
return true
}
case <-l.client.done():
return false
for _, v := range l.interestedURL {
if serviceURL.URLEqual(*v) {
l.listener.Process(&remoting.ConfigChangeEvent{Value: serviceURL, ConfigType: eventType.Action})
return true
}
}
return false
}
func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, conf common.URL) {
contains := func(s []string, e string) bool {
for _, a := range s {
if a == e {
return true
}
}
return false
}
newChildren, err := l.client.getChildren(zkPath)
if err != nil {
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
return
}
// a node was added -- listen the new node
var (
newNode string
serviceURL common.URL
)
for _, n := range newChildren {
if contains(children, n) {
continue
}
newNode = path.Join(zkPath, n)
logger.Infof("add zkNode{%s}", newNode)
//context.TODO
serviceURL, err = common.NewURL(context.TODO(), n)
if err != nil {
logger.Errorf("NewURL(%s) = error{%v}", n, perrors.WithStack(err))
continue
}
if !conf.URLEqual(serviceURL) {
logger.Warnf("serviceURL{%s} is not compatible with SubURL{%#v}", serviceURL.Key(), conf.Key())
continue
}
logger.Infof("add serviceURL{%s}", serviceURL)
l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceAdd, Service: serviceURL}, nil}
// listen l service node
go func(node string, serviceURL common.URL) {
logger.Infof("delete zkNode{%s}", node)
if l.listenServiceNodeEvent(node) {
logger.Infof("delete serviceURL{%s}", serviceURL)
l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil}
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(newNode, serviceURL)
}
// old node was deleted
var oldNode string
for _, n := range children {
if contains(newChildren, n) {
continue
}
oldNode = path.Join(zkPath, n)
logger.Warnf("delete zkPath{%s}", oldNode)
serviceURL, err = common.NewURL(context.TODO(), n)
if !conf.URLEqual(serviceURL) {
logger.Warnf("serviceURL{%s} has been deleted is not compatible with SubURL{%#v}", serviceURL.Key(), conf.Key())
continue
}
logger.Warnf("delete serviceURL{%s}", serviceURL)
if err != nil {
logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
continue
}
l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil}
}
type RegistryConfigurationListener struct {
client *zk.ZookeeperClient
registry *zkRegistry
events chan *remoting.ConfigChangeEvent
}
func (l *zkEventListener) listenDirEvent(zkPath string, conf common.URL) {
l.wg.Add(1)
defer l.wg.Done()
var (
failTimes int
event chan struct{}
zkEvent zk.Event
)
event = make(chan struct{}, 4)
defer close(event)
for {
// get current children for a zkPath
children, childEventCh, err := l.client.getChildrenW(zkPath)
if err != nil {
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
logger.Errorf("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
// clear the event channel
CLEAR:
for {
select {
case <-event:
default:
break CLEAR
}
}
l.client.registerEvent(zkPath, &event)
select {
case <-time.After(timeSecondDuration(failTimes * RegistryConnDelay)):
l.client.unregisterEvent(zkPath, &event)
continue
case <-l.client.done():
l.client.unregisterEvent(zkPath, &event)
logger.Warnf("client.done(), listen(path{%s}, ReferenceConfig{%#v}) goroutine exit now...", zkPath, conf)
return
case <-event:
logger.Infof("get zk.EventNodeDataChange notify event")
l.client.unregisterEvent(zkPath, &event)
l.handleZkNodeEvent(zkPath, nil, conf)
continue
}
}
failTimes = 0
select {
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, stateToString(zkEvent.State), zkEvent.Err)
if zkEvent.Type != zk.EventNodeChildrenChanged {
continue
}
l.handleZkNodeEvent(zkEvent.Path, children, conf)
case <-l.client.done():
logger.Warnf("client.done(), listen(path{%s}, ReferenceConfig{%#v}) goroutine exit now...", zkPath, conf)
return
}
}
func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener {
reg.wg.Add(1)
return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *remoting.ConfigChangeEvent, 32)}
}
// this func is invoked by ZkConsumerRegistry::Registe/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent
// |
// --------> listenServiceNodeEvent
func (l *zkEventListener) listenServiceEvent(conf common.URL) {
var (
err error
zkPath string
dubboPath string
children []string
serviceURL common.URL
)
zkPath = fmt.Sprintf("/dubbo%s/providers", conf.Path)
l.serviceMapLock.Lock()
_, ok := l.serviceMap[zkPath]
l.serviceMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", zkPath)
return
}
l.serviceMapLock.Lock()
l.serviceMap[zkPath] = struct{}{}
l.serviceMapLock.Unlock()
logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath)
children, err = l.client.getChildren(zkPath)
if err != nil {
children = nil
logger.Errorf("fail to get children of zk path{%s}", zkPath)
}
for _, c := range children {
serviceURL, err = common.NewURL(context.TODO(), c)
if err != nil {
logger.Errorf("NewURL(r{%s}) = error{%v}", c, err)
continue
}
if !conf.URLEqual(serviceURL) {
logger.Warnf("serviceURL %v is not compatible with SubURL %v", serviceURL.Key(), conf.Key())
continue
}
logger.Debugf("add serviceUrl{%s}", serviceURL)
l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceAdd, Service: serviceURL}, nil}
// listen l service node
dubboPath = path.Join(zkPath, c)
logger.Infof("listen dubbo service key{%s}", dubboPath)
go func(zkPath string, serviceURL common.URL) {
if l.listenServiceNodeEvent(dubboPath) {
logger.Debugf("delete serviceUrl{%s}", serviceURL)
l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil}
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath, serviceURL)
}
logger.Infof("listen dubbo path{%s}", zkPath)
go func(zkPath string, conf common.URL) {
l.listenDirEvent(zkPath, conf)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, conf)
func (l *RegistryConfigurationListener) Process(configType *remoting.ConfigChangeEvent) {
l.events <- configType
}
func (l *zkEventListener) Next() (*registry.ServiceEvent, error) {
func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-l.client.done():
case <-l.client.Done():
logger.Warnf("listener's zk client connection is broken, so zk event listener exit now.")
return nil, perrors.New("listener stopped")
......@@ -317,29 +86,21 @@ func (l *zkEventListener) Next() (*registry.ServiceEvent, error) {
case e := <-l.events:
logger.Debugf("got zk event %s", e)
if e.err != nil {
return nil, perrors.WithStack(e.err)
}
if e.res.Action == registry.ServiceDel && !l.valid() {
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.res)
if e.ConfigType == remoting.Del && !l.valid() {
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
continue
}
//r.update(e.res)
//write to invoker
//r.outerEventCh <- e.res
return e.res, nil
return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil
}
}
}
func (l *zkEventListener) valid() bool {
return l.client.zkConnValid()
func (l *RegistryConfigurationListener) Close() {
l.registry.wg.Done()
}
func (l *zkEventListener) Close() {
l.registry.listenerLock.Lock()
l.client.Close()
l.registry.listenerLock.Unlock()
l.registry.wg.Done()
l.wg.Wait()
func (l *RegistryConfigurationListener) valid() bool {
return l.client.ZkConnValid()
}
......@@ -40,13 +40,12 @@ import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting/zookeeper"
"github.com/apache/dubbo-go/version"
)
const (
defaultTimeout = int64(10e9)
RegistryZkClient = "zk registry"
RegistryConnDelay = 3
RegistryZkClient = "zk registry"
)
var (
......@@ -73,14 +72,16 @@ type zkRegistry struct {
done chan struct{}
cltLock sync.Mutex
client *zookeeperClient
client *zookeeper.ZookeeperClient
services map[string]common.URL // service name + protocol -> service config
listenerLock sync.Mutex
listener *zkEventListener
listenerLock sync.Mutex
listener *zookeeper.ZkEventListener
dataListener *RegistryDataListener
configListener *RegistryConfigurationListener
//for provider
zkPath map[string]int // key = protocol://ip:port/interface
}
func newZkRegistry(url *common.URL) (registry.Registry, error) {
......@@ -97,30 +98,28 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) {
zkPath: make(map[string]int),
}
//if r.SubURL.Name == "" {
// r.SubURL.Name = RegistryZkClient
//}
//if r.Version == "" {
// r.Version = version.Version
//}
err = r.validateZookeeperClient()
err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient))
if err != nil {
return nil, err
}
r.wg.Add(1)
go r.handleZkRestart()
go zookeeper.HandleClientRestart(r)
//if r.RoleType == registry.CONSUMER {
// r.wg.Add(1)
// go r.listen()
//}
r.listener = zookeeper.NewZkEventListener(r.client)
r.configListener = NewRegistryConfigurationListener(r.client, r)
r.dataListener = NewRegistryDataListener(r.configListener)
return r, nil
}
func newMockZkRegistry(url *common.URL) (*zk.TestCluster, *zkRegistry, error) {
type Options struct {
client *zookeeper.ZookeeperClient
}
type Option func(*Options)
func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestCluster, *zkRegistry, error) {
var (
err error
r *zkRegistry
......@@ -136,139 +135,78 @@ func newMockZkRegistry(url *common.URL) (*zk.TestCluster, *zkRegistry, error) {
zkPath: make(map[string]int),
}
c, r.client, _, err = newMockZookeeperClient("test", 15*time.Second)
c, r.client, _, err = zookeeper.NewMockZookeeperClient("test", 15*time.Second, opts...)
if err != nil {
return nil, nil, err
}
r.wg.Add(1)
go r.handleZkRestart()
go zookeeper.HandleClientRestart(r)
//if r.RoleType == registry.CONSUMER {
// r.wg.Add(1)
// go r.listen()
//}
r.listener = zookeeper.NewZkEventListener(r.client)
r.configListener = NewRegistryConfigurationListener(r.client, r)
r.dataListener = NewRegistryDataListener(r.configListener)
return c, r, nil
}
func (r *zkRegistry) ZkClient() *zookeeper.ZookeeperClient {
return r.client
}
func (r *zkRegistry) SetZkClient(client *zookeeper.ZookeeperClient) {
r.client = client
}
func (r *zkRegistry) ZkClientLock() *sync.Mutex {
return &r.cltLock
}
func (r *zkRegistry) WaitGroup() *sync.WaitGroup {
return &r.wg
}
func (r *zkRegistry) GetDone() chan struct{} {
return r.done
}
func (r *zkRegistry) GetUrl() common.URL {
return *r.URL
}
func (r *zkRegistry) Destroy() {
if r.listener != nil {
r.listener.Close()
if r.configListener != nil {
r.configListener.Close()
}
close(r.done)
r.wg.Wait()
r.closeRegisters()
}
func (r *zkRegistry) validateZookeeperClient() error {
var (
err error
)
func (r *zkRegistry) RestartCallBack() bool {
err = nil
r.cltLock.Lock()
defer r.cltLock.Unlock()
if r.client == nil {
//in dubbp ,every registry only connect one node ,so this is []string{r.Address}
timeout, err := time.ParseDuration(r.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
logger.Errorf("timeout config %v is invalid ,err is %v",
r.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", r.Location)
}
r.client, err = newZookeeperClient(RegistryZkClient, []string{r.Location}, timeout)
if err != nil {
logger.Warnf("newZookeeperClient(name{%s}, zk addresss{%v}, timeout{%d}) = error{%v}",
RegistryZkClient, r.Location, timeout.String(), err)
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", r.Location)
}
}
if r.client.conn == nil {
var event <-chan zk.Event
r.client.conn, event, err = zk.Connect(r.client.zkAddrs, r.client.timeout)
if err == nil {
r.client.wait.Add(1)
go r.client.handleZkEvent(event)
}
// copy r.services
services := []common.URL{}
for _, confIf := range r.services {
services = append(services, confIf)
}
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", r.PrimitiveURL)
}
func (r *zkRegistry) handleZkRestart() {
var (
err error
flag bool
failTimes int
confIf common.URL
)
defer r.wg.Done()
LOOP:
for {
select {
case <-r.done:
logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
// re-register all services
case <-r.client.done():
r.cltLock.Lock()
r.client.Close()
r.client = nil
r.cltLock.Unlock()
// 接zk,直至成功
failTimes = 0
for {
select {
case <-r.done:
logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
case <-time.After(time.Duration(1e9 * failTimes * RegistryConnDelay)): // 防止疯狂重连zk
}
err = r.validateZookeeperClient()
logger.Infof("ZkProviderRegistry.validateZookeeperClient(zkAddr{%s}) = error{%#v}",
r.client.zkAddrs, perrors.WithStack(err))
if err == nil {
// copy r.services
services := []common.URL{}
for _, confIf = range r.services {
services = append(services, confIf)
}
flag = true
for _, confIf = range services {
err = r.register(confIf)
if err != nil {
logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
confIf, perrors.WithStack(err))
flag = false
break
}
logger.Infof("success to re-register service :%v", confIf.Key())
}
if flag {
break
}
}
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
}
flag := true
for _, confIf := range services {
err := r.register(confIf)
if err != nil {
logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
confIf, perrors.WithStack(err))
flag = false
break
}
logger.Infof("success to re-register service :%v", confIf.Key())
}
return flag
}
func (r *zkRegistry) Register(conf common.URL) error {
var (
ok bool
err error
listener *zkEventListener
ok bool
err error
)
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
switch role {
......@@ -291,12 +229,6 @@ func (r *zkRegistry) Register(conf common.URL) error {
r.cltLock.Unlock()
logger.Debugf("(consumerZkConsumerRegistry)Register(conf{%#v})", conf)
r.listenerLock.Lock()
listener = r.listener
r.listenerLock.Unlock()
if listener != nil {
go listener.listenServiceEvent(conf)
}
case common.PROVIDER:
// 检验服务是否已经注册过
......@@ -337,7 +269,7 @@ func (r *zkRegistry) register(c common.URL) error {
//conf config.URL
)
err = r.validateZookeeperClient()
err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient))
if err != nil {
return perrors.WithStack(err)
}
......@@ -428,6 +360,7 @@ func (r *zkRegistry) register(c common.URL) error {
dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, (common.RoleType(common.CONSUMER)).String())
logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL)
default:
return perrors.Errorf("@c{%v} type is not referencer or provider", c)
}
......@@ -464,44 +397,37 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
}
func (r *zkRegistry) Subscribe(conf common.URL) (registry.Listener, error) {
r.wg.Add(1)
return r.getListener(conf)
}
func (r *zkRegistry) getListener(conf common.URL) (*zkEventListener, error) {
func (r *zkRegistry) getListener(conf common.URL) (*RegistryConfigurationListener, error) {
var (
zkListener *zkEventListener
zkListener *RegistryConfigurationListener
)
r.listenerLock.Lock()
zkListener = r.listener
zkListener = r.configListener
r.listenerLock.Unlock()
if zkListener != nil {
return zkListener, nil
}
if r.listener == nil {
r.cltLock.Lock()
client := r.client
r.cltLock.Unlock()
if client == nil {
return nil, perrors.New("zk connection broken")
}
r.cltLock.Lock()
client := r.client
r.cltLock.Unlock()
if client == nil {
return nil, perrors.New("zk connection broken")
}
// new client & listener
listener := zookeeper.NewZkEventListener(r.client)
// new client & listener
zkListener = newZkEventListener(r, client)
r.listenerLock.Lock()
r.listener = listener
r.listenerLock.Unlock()
}
r.listenerLock.Lock()
r.listener = zkListener
r.listenerLock.Unlock()
//注册到dataconfig的interested
r.dataListener.AddInterestedURL(&conf)
// listen
r.cltLock.Lock()
for _, svs := range r.services {
if svs.URLEqual(conf) {
go zkListener.listenServiceEvent(svs)
}
}
r.cltLock.Unlock()
go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo%s/providers", conf.Path), r.dataListener)
return zkListener, nil
}
......
......@@ -31,6 +31,7 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/remoting/zookeeper"
)
func Test_Register(t *testing.T) {
......@@ -40,7 +41,7 @@ func Test_Register(t *testing.T) {
ts, reg, err := newMockZkRegistry(&regurl)
defer ts.Stop()
err = reg.Register(url)
children, _ := reg.client.getChildren("/dubbo/com.ikurento.user.UserProvider/providers")
children, _ := reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers")
assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*provider", children)
assert.NoError(t, err)
}
......@@ -49,7 +50,6 @@ func Test_Subscribe(t *testing.T) {
regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
ts, reg, err := newMockZkRegistry(&regurl)
defer ts.Stop()
//provider register
err = reg.Register(url)
......@@ -61,8 +61,8 @@ func Test_Subscribe(t *testing.T) {
//consumer register
regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
_, reg2, err := newMockZkRegistry(&regurl)
reg2.client = reg.client
_, reg2, err := newMockZkRegistry(&regurl, zookeeper.WithTestCluster(ts))
err = reg2.Register(url)
listener, err := reg2.Subscribe(url)
......@@ -71,8 +71,8 @@ func Test_Subscribe(t *testing.T) {
if err != nil {
return
}
assert.Regexp(t, ".*ServiceEvent{Action{add service}.*", serviceEvent.String())
assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String())
defer ts.Stop()
}
func Test_ConsumerDestory(t *testing.T) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package remoting
import "fmt"
type ConfigurationListener interface {
Process(*ConfigChangeEvent)
}
type DataListener interface {
DataChange(eventType Event) bool //bool is return for interface implement is interesting
}
type ConfigChangeEvent struct {
Key string
Value interface{}
ConfigType EventType
}
func (c ConfigChangeEvent) String() string {
return fmt.Sprintf("ConfigChangeEvent{key = %v , value = %v , changeType = %v}", c.Key, c.Value, c.ConfigType)
}
//////////////////////////////////////////
// event type
//////////////////////////////////////////
type EventType int
const (
Add = iota
Del
)
var serviceEventTypeStrings = [...]string{
"add",
"delete",
}
func (t EventType) String() string {
return serviceEventTypeStrings[t]
}
//////////////////////////////////////////
// service event
//////////////////////////////////////////
type Event struct {
Path string
Action EventType
Content string
}
func (e Event) String() string {
return fmt.Sprintf("Event{Action{%s}, Content{%s}}", e.Action, e.Content)
}
......@@ -30,25 +30,31 @@ import (
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
)
const (
ConnDelay = 3
MaxFailTimes = 15
)
var (
errNilZkClientConn = perrors.New("zookeeperclient{conn} is nil")
)
type zookeeperClient struct {
type ZookeeperClient struct {
name string
zkAddrs []string
ZkAddrs []string
sync.Mutex // for conn
conn *zk.Conn
timeout time.Duration
Conn *zk.Conn
Timeout time.Duration
exit chan struct{}
wait sync.WaitGroup
Wait sync.WaitGroup
eventRegistry map[string][]*chan struct{}
}
func stateToString(state zk.State) string {
func StateToString(state zk.State) string {
switch state {
case zk.StateDisconnected:
return "zookeeper disconnected"
......@@ -79,55 +85,128 @@ func stateToString(state zk.State) string {
return "zookeeper unknown state"
}
func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
type Options struct {
zkName string
client *ZookeeperClient
ts *zk.TestCluster
}
type Option func(*Options)
func WithZkName(name string) Option {
return func(opt *Options) {
opt.zkName = name
}
}
func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*zookeeperClient, error) {
func ValidateZookeeperClient(container ZkClientContainer, opts ...Option) error {
var (
err error
)
opions := &Options{}
for _, opt := range opts {
opt(opions)
}
err = nil
lock := container.ZkClientLock()
url := container.GetUrl()
lock.Lock()
defer lock.Unlock()
if container.ZkClient() == nil {
//in dubbp ,every registry only connect one node ,so this is []string{r.Address}
timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
logger.Errorf("timeout config %v is invalid ,err is %v",
url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location)
}
newClient, err := newZookeeperClient(opions.zkName, []string{url.Location}, timeout)
if err != nil {
logger.Warnf("newZookeeperClient(name{%s}, zk addresss{%v}, timeout{%d}) = error{%v}",
opions.zkName, url.Location, timeout.String(), err)
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location)
}
container.SetZkClient(newClient)
}
if container.ZkClient().Conn == nil {
var event <-chan zk.Event
container.ZkClient().Conn, event, err = zk.Connect(container.ZkClient().ZkAddrs, container.ZkClient().Timeout)
if err == nil {
container.ZkClient().Wait.Add(1)
go container.ZkClient().HandleZkEvent(event)
}
}
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.PrimitiveURL)
}
func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*ZookeeperClient, error) {
var (
err error
event <-chan zk.Event
z *zookeeperClient
z *ZookeeperClient
)
z = &zookeeperClient{
z = &ZookeeperClient{
name: name,
zkAddrs: zkAddrs,
timeout: timeout,
ZkAddrs: zkAddrs,
Timeout: timeout,
exit: make(chan struct{}),
eventRegistry: make(map[string][]*chan struct{}),
}
// connect to zookeeper
z.conn, event, err = zk.Connect(zkAddrs, timeout)
z.Conn, event, err = zk.Connect(zkAddrs, timeout)
if err != nil {
return nil, perrors.WithMessagef(err, "zk.Connect(zkAddrs:%+v)", zkAddrs)
}
z.wait.Add(1)
go z.handleZkEvent(event)
z.Wait.Add(1)
go z.HandleZkEvent(event)
return z, nil
}
func newMockZookeeperClient(name string, timeout time.Duration) (*zk.TestCluster, *zookeeperClient, <-chan zk.Event, error) {
func WithTestCluster(ts *zk.TestCluster) Option {
return func(opt *Options) {
opt.ts = ts
}
}
func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) (*zk.TestCluster, *ZookeeperClient, <-chan zk.Event, error) {
var (
err error
event <-chan zk.Event
z *zookeeperClient
z *ZookeeperClient
ts *zk.TestCluster
)
z = &zookeeperClient{
z = &ZookeeperClient{
name: name,
zkAddrs: []string{},
timeout: timeout,
ZkAddrs: []string{},
Timeout: timeout,
exit: make(chan struct{}),
eventRegistry: make(map[string][]*chan struct{}),
}
// connect to zookeeper
ts, err := zk.StartTestCluster(1, nil, nil)
if err != nil {
return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect")
opions := &Options{}
for _, opt := range opts {
opt(opions)
}
// connect to zookeeper
if opions.ts != nil {
ts = opions.ts
} else {
ts, err = zk.StartTestCluster(1, nil, nil)
if err != nil {
return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect")
}
}
//callbackChan := make(chan zk.Event)
......@@ -135,7 +214,7 @@ func newMockZookeeperClient(name string, timeout time.Duration) (*zk.TestCluster
// callbackChan <- event
//}
z.conn, event, err = ts.ConnectWithOptions(timeout)
z.Conn, event, err = ts.ConnectWithOptions(timeout)
if err != nil {
return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect")
}
......@@ -144,15 +223,15 @@ func newMockZookeeperClient(name string, timeout time.Duration) (*zk.TestCluster
return ts, z, event, nil
}
func (z *zookeeperClient) handleZkEvent(session <-chan zk.Event) {
func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
var (
state int
event zk.Event
)
defer func() {
z.wait.Done()
logger.Infof("zk{path:%v, name:%s} connection goroutine game over.", z.zkAddrs, z.name)
z.Wait.Done()
logger.Infof("zk{path:%v, name:%s} connection goroutine game over.", z.ZkAddrs, z.name)
}()
LOOP:
......@@ -162,15 +241,15 @@ LOOP:
break LOOP
case event = <-session:
logger.Warnf("client{%s} get a zookeeper event{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
z.name, event.Type, event.Server, event.Path, event.State, stateToString(event.State), event.Err)
z.name, event.Type, event.Server, event.Path, event.State, StateToString(event.State), event.Err)
switch (int)(event.State) {
case (int)(zk.StateDisconnected):
logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.zkAddrs, z.name)
logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.ZkAddrs, z.name)
z.stop()
z.Lock()
if z.conn != nil {
z.conn.Close()
z.conn = nil
if z.Conn != nil {
z.Conn.Close()
z.Conn = nil
}
z.Unlock()
break LOOP
......@@ -202,7 +281,7 @@ LOOP:
}
}
func (z *zookeeperClient) registerEvent(zkPath string, event *chan struct{}) {
func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
if zkPath == "" || event == nil {
return
}
......@@ -215,7 +294,7 @@ func (z *zookeeperClient) registerEvent(zkPath string, event *chan struct{}) {
z.Unlock()
}
func (z *zookeeperClient) unregisterEvent(zkPath string, event *chan struct{}) {
func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
if zkPath == "" {
return
}
......@@ -244,11 +323,11 @@ func (z *zookeeperClient) unregisterEvent(zkPath string, event *chan struct{}) {
z.Unlock()
}
func (z *zookeeperClient) done() <-chan struct{} {
func (z *ZookeeperClient) Done() <-chan struct{} {
return z.exit
}
func (z *zookeeperClient) stop() bool {
func (z *ZookeeperClient) stop() bool {
select {
case <-z.exit:
return true
......@@ -259,7 +338,7 @@ func (z *zookeeperClient) stop() bool {
return false
}
func (z *zookeeperClient) zkConnValid() bool {
func (z *ZookeeperClient) ZkConnValid() bool {
select {
case <-z.exit:
return false
......@@ -268,7 +347,7 @@ func (z *zookeeperClient) zkConnValid() bool {
valid := true
z.Lock()
if z.conn == nil {
if z.Conn == nil {
valid = false
}
z.Unlock()
......@@ -276,23 +355,23 @@ func (z *zookeeperClient) zkConnValid() bool {
return valid
}
func (z *zookeeperClient) Close() {
func (z *ZookeeperClient) Close() {
if z == nil {
return
}
z.stop()
z.wait.Wait()
z.Wait.Wait()
z.Lock()
if z.conn != nil {
z.conn.Close()
z.conn = nil
if z.Conn != nil {
z.Conn.Close()
z.Conn = nil
}
z.Unlock()
logger.Warnf("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.zkAddrs)
logger.Warnf("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs)
}
func (z *zookeeperClient) Create(basePath string) error {
func (z *ZookeeperClient) Create(basePath string) error {
var (
err error
tmpPath string
......@@ -303,8 +382,8 @@ func (z *zookeeperClient) Create(basePath string) error {
tmpPath = path.Join(tmpPath, "/", str)
err = errNilZkClientConn
z.Lock()
if z.conn != nil {
_, err = z.conn.Create(tmpPath, []byte(""), 0, zk.WorldACL(zk.PermAll))
if z.Conn != nil {
_, err = z.Conn.Create(tmpPath, []byte(""), 0, zk.WorldACL(zk.PermAll))
}
z.Unlock()
if err != nil {
......@@ -320,22 +399,22 @@ func (z *zookeeperClient) Create(basePath string) error {
return nil
}
func (z *zookeeperClient) Delete(basePath string) error {
func (z *ZookeeperClient) Delete(basePath string) error {
var (
err error
)
err = errNilZkClientConn
z.Lock()
if z.conn != nil {
err = z.conn.Delete(basePath, -1)
if z.Conn != nil {
err = z.Conn.Delete(basePath, -1)
}
z.Unlock()
return perrors.WithMessagef(err, "Delete(basePath:%s)", basePath)
}
func (z *zookeeperClient) RegisterTemp(basePath string, node string) (string, error) {
func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, error) {
var (
err error
data []byte
......@@ -347,8 +426,8 @@ func (z *zookeeperClient) RegisterTemp(basePath string, node string) (string, er
data = []byte("")
zkPath = path.Join(basePath) + "/" + node
z.Lock()
if z.conn != nil {
tmpPath, err = z.conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if z.Conn != nil {
tmpPath, err = z.Conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
}
z.Unlock()
//if err != nil && err != zk.ErrNodeExists {
......@@ -361,7 +440,7 @@ func (z *zookeeperClient) RegisterTemp(basePath string, node string) (string, er
return tmpPath, nil
}
func (z *zookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, error) {
func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, error) {
var (
err error
tmpPath string
......@@ -369,8 +448,8 @@ func (z *zookeeperClient) RegisterTempSeq(basePath string, data []byte) (string,
err = errNilZkClientConn
z.Lock()
if z.conn != nil {
tmpPath, err = z.conn.Create(
if z.Conn != nil {
tmpPath, err = z.Conn.Create(
path.Join(basePath)+"/",
data,
zk.FlagEphemeral|zk.FlagSequence,
......@@ -389,7 +468,7 @@ func (z *zookeeperClient) RegisterTempSeq(basePath string, data []byte) (string,
return tmpPath, nil
}
func (z *zookeeperClient) getChildrenW(path string) ([]string, <-chan zk.Event, error) {
func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, error) {
var (
err error
children []string
......@@ -399,8 +478,8 @@ func (z *zookeeperClient) getChildrenW(path string) ([]string, <-chan zk.Event,
err = errNilZkClientConn
z.Lock()
if z.conn != nil {
children, stat, event, err = z.conn.ChildrenW(path)
if z.Conn != nil {
children, stat, event, err = z.Conn.ChildrenW(path)
}
z.Unlock()
if err != nil {
......@@ -420,7 +499,7 @@ func (z *zookeeperClient) getChildrenW(path string) ([]string, <-chan zk.Event,
return children, event, nil
}
func (z *zookeeperClient) getChildren(path string) ([]string, error) {
func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
var (
err error
children []string
......@@ -429,8 +508,8 @@ func (z *zookeeperClient) getChildren(path string) ([]string, error) {
err = errNilZkClientConn
z.Lock()
if z.conn != nil {
children, stat, err = z.conn.Children(path)
if z.Conn != nil {
children, stat, err = z.Conn.Children(path)
}
z.Unlock()
if err != nil {
......@@ -450,7 +529,7 @@ func (z *zookeeperClient) getChildren(path string) ([]string, error) {
return children, nil
}
func (z *zookeeperClient) existW(zkPath string) (<-chan zk.Event, error) {
func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) {
var (
exist bool
err error
......@@ -459,8 +538,8 @@ func (z *zookeeperClient) existW(zkPath string) (<-chan zk.Event, error) {
err = errNilZkClientConn
z.Lock()
if z.conn != nil {
exist, _, event, err = z.conn.ExistsW(zkPath)
if z.Conn != nil {
exist, _, event, err = z.Conn.ExistsW(zkPath)
}
z.Unlock()
if err != nil {
......
......@@ -93,7 +93,8 @@ func verifyEventOrder(t *testing.T, c <-chan zk.Event, expectedEvent []zk.EventT
//}
func Test_newMockZookeeperClient(t *testing.T) {
ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second)
ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
defer ts.Stop()
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
......@@ -103,7 +104,7 @@ func Test_newMockZookeeperClient(t *testing.T) {
}
func TestCreate(t *testing.T) {
ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second)
ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
defer ts.Stop()
err := z.Create("test1/test2/test3/test4")
assert.NoError(t, err)
......@@ -113,7 +114,7 @@ func TestCreate(t *testing.T) {
}
func TestCreateDelete(t *testing.T) {
ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second)
ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
defer ts.Stop()
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
......@@ -126,7 +127,7 @@ func TestCreateDelete(t *testing.T) {
}
func TestRegisterTemp(t *testing.T) {
ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second)
ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
defer ts.Stop()
err := z.Create("/test1/test2/test3")
assert.NoError(t, err)
......@@ -139,7 +140,7 @@ func TestRegisterTemp(t *testing.T) {
}
func TestRegisterTempSeq(t *testing.T) {
ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second)
ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
defer ts.Stop()
err := z.Create("/test1/test2/test3")
assert.NoError(t, err)
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zookeeper
import (
"sync"
"time"
)
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
)
type ZkClientContainer interface {
ZkClient() *ZookeeperClient
SetZkClient(*ZookeeperClient)
ZkClientLock() *sync.Mutex
WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container
GetDone() chan struct{} //for zk client control
RestartCallBack() bool
common.Node
}
func HandleClientRestart(r ZkClientContainer) {
var (
err error
failTimes int
)
defer r.WaitGroup().Done()
LOOP:
for {
select {
case <-r.GetDone():
logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
// re-register all services
case <-r.ZkClient().Done():
r.ZkClientLock().Lock()
r.ZkClient().Close()
zkName := r.ZkClient().name
zkAddress := r.ZkClient().ZkAddrs
r.SetZkClient(nil)
r.ZkClientLock().Unlock()
// 接zk,直至成功
failTimes = 0
for {
select {
case <-r.GetDone():
logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
case <-time.After(time.Duration(1e9 * failTimes * ConnDelay)): // 防止疯狂重连zk
}
err = ValidateZookeeperClient(r, WithZkName(zkName))
logger.Infof("ZkProviderRegistry.validateZookeeperClient(zkAddr{%s}) = error{%#v}",
zkAddress, perrors.WithStack(err))
if err == nil {
if r.RestartCallBack() {
break
}
}
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zookeeper
import (
"path"
"sync"
"time"
)
import (
perrors "github.com/pkg/errors"
"github.com/samuel/go-zookeeper/zk"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/remoting"
)
type ZkEventListener struct {
client *ZookeeperClient
pathMapLock sync.Mutex
pathMap map[string]struct{}
wg sync.WaitGroup
}
func NewZkEventListener(client *ZookeeperClient) *ZkEventListener {
return &ZkEventListener{
client: client,
pathMap: make(map[string]struct{}),
}
}
func (l *ZkEventListener) SetClient(client *ZookeeperClient) {
l.client = client
}
func (l *ZkEventListener) listenServiceNodeEvent(zkPath string) bool {
l.wg.Add(1)
defer l.wg.Done()
var zkEvent zk.Event
for {
keyEventCh, err := l.client.ExistW(zkPath)
if err != nil {
logger.Errorf("existW{key:%s} = error{%v}", zkPath, err)
return false
}
select {
case zkEvent = <-keyEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
switch zkEvent.Type {
case zk.EventNodeDataChanged:
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDataChanged}", zkPath)
case zk.EventNodeCreated:
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeCreated}", zkPath)
case zk.EventNotWatching:
logger.Warnf("zk.ExistW(key{%s}) = event{EventNotWatching}", zkPath)
case zk.EventNodeDeleted:
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDeleted}", zkPath)
return true
}
case <-l.client.Done():
return false
}
}
return false
}
func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, listener remoting.DataListener) {
contains := func(s []string, e string) bool {
for _, a := range s {
if a == e {
return true
}
}
return false
}
newChildren, err := l.client.GetChildren(zkPath)
if err != nil {
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
return
}
// a node was added -- listen the new node
var (
newNode string
)
for _, n := range newChildren {
if contains(children, n) {
continue
}
newNode = path.Join(zkPath, n)
logger.Infof("add zkNode{%s}", newNode)
if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: n}) {
continue
}
// listen l service node
go func(node string) {
logger.Infof("delete zkNode{%s}", node)
if l.listenServiceNodeEvent(node) {
logger.Infof("delete content{%s}", n)
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: n})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(newNode)
}
// old node was deleted
var oldNode string
for _, n := range children {
if contains(newChildren, n) {
continue
}
oldNode = path.Join(zkPath, n)
logger.Warnf("delete zkPath{%s}", oldNode)
if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: n}) {
continue
}
logger.Warnf("delete content{%s}", n)
if err != nil {
logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
continue
}
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: n})
}
}
func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataListener) {
l.wg.Add(1)
defer l.wg.Done()
var (
failTimes int
event chan struct{}
zkEvent zk.Event
)
event = make(chan struct{}, 4)
defer close(event)
for {
// get current children for a zkPath
children, childEventCh, err := l.client.GetChildrenW(zkPath)
if err != nil {
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
logger.Errorf("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
// clear the event channel
CLEAR:
for {
select {
case <-event:
default:
break CLEAR
}
}
l.client.RegisterEvent(zkPath, &event)
select {
case <-time.After(timeSecondDuration(failTimes * ConnDelay)):
l.client.UnregisterEvent(zkPath, &event)
continue
case <-l.client.Done():
l.client.UnregisterEvent(zkPath, &event)
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
return
case <-event:
logger.Infof("get zk.EventNodeDataChange notify event")
l.client.UnregisterEvent(zkPath, &event)
l.handleZkNodeEvent(zkPath, nil, listener)
continue
}
}
failTimes = 0
select {
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
if zkEvent.Type != zk.EventNodeChildrenChanged {
continue
}
l.handleZkNodeEvent(zkEvent.Path, children, listener)
case <-l.client.Done():
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
return
}
}
}
func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
// this func is invoked by ZkConsumerRegistry::Registe/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent
// |
// --------> listenServiceNodeEvent
func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) {
var (
err error
dubboPath string
children []string
serviceURL common.URL
)
l.pathMapLock.Lock()
_, ok := l.pathMap[zkPath]
l.pathMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", zkPath)
return
}
l.pathMapLock.Lock()
l.pathMap[zkPath] = struct{}{}
l.pathMapLock.Unlock()
logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath)
children, err = l.client.GetChildren(zkPath)
if err != nil {
children = nil
logger.Errorf("fail to get children of zk path{%s}", zkPath)
}
for _, c := range children {
if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: c}) {
continue
}
// listen l service node
dubboPath = path.Join(zkPath, c)
logger.Infof("listen dubbo service key{%s}", dubboPath)
go func(zkPath string, serviceURL common.URL) {
if l.listenServiceNodeEvent(dubboPath) {
logger.Debugf("delete serviceUrl{%s}", serviceURL)
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: c})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath, serviceURL)
}
logger.Infof("listen dubbo path{%s}", zkPath)
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
}
func (l *ZkEventListener) valid() bool {
return l.client.ZkConnValid()
}
func (l *ZkEventListener) Close() {
l.wg.Wait()
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment