Skip to content
Snippets Groups Projects
Commit ec67719b authored by vito.he's avatar vito.he
Browse files

Ref: refactor code in registry.zookeeper, create remoting/zookeeper to code...

Ref: refactor code in registry.zookeeper, create remoting/zookeeper to code reuse in config_center for zk
parent 8b3fdae1
No related branches found
No related tags found
No related merge requests found
Showing
with 757 additions and 388 deletions
package common
import "fmt"
type ConfigurationListener interface {
Process(*ConfigChangeEvent)
}
type ConfigChangeEvent struct {
Key string
Value interface{}
ConfigType EventType
}
func (c ConfigChangeEvent) String() string {
return fmt.Sprintf("ConfigChangeEvent{key = %v , value = %v , changeType = %v}", c.Key, c.Value, c.ConfigType)
}
//////////////////////////////////////////
// event type
//////////////////////////////////////////
type EventType int
const (
Add = iota
Del
)
var serviceEventTypeStrings = [...]string{
"add",
"delete",
}
func (t EventType) String() string {
return serviceEventTypeStrings[t]
}
//////////////////////////////////////////
// service event
//////////////////////////////////////////
type Event struct {
Path string
Action EventType
Content string
}
func (e Event) String() string {
return fmt.Sprintf("Event{Action{%s}, Content{%s}}", e.Action, e.Content)
}
......@@ -37,3 +37,4 @@ const (
DEFAULT_REFERENCE_FILTERS = ""
ECHO = "$echo"
)
......@@ -67,3 +67,8 @@ const (
OWNER_KEY = "owner"
ENVIRONMENT_KEY = "environment"
)
const(
CONFIG_NAMESPACE_KEY = "config.namespace"
CONFIG_TIMEOUT_KET = "config.timeout"
)
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/config_center"
)
var (
configCenters = make(map[string]func(config *common.URL) (config_center.DynamicConfiguration, error))
)
func SetConfigCenter(name string, v func(config *common.URL) (config_center.DynamicConfiguration, error)) {
configCenters[name] = v
}
func GetConfigCenter(name string, config *common.URL) (config_center.DynamicConfiguration, error) {
if configCenters[name] == nil {
panic("config center for " + name + " is not existing, make sure you have import the package.")
}
return configCenters[name](config)
}
......@@ -246,6 +246,7 @@ func (c URL) Key() string {
return buildString
}
func (c URL) Context() context.Context {
return c.ctx
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config_center
import (
"github.com/apache/dubbo-go/common"
"time"
)
//////////////////////////////////////////
// DynamicConfiguration
//////////////////////////////////////////
const DEFAULT_GROUP = "dubbo"
const DEFAULT_CONFIG_TIMEOUT = "10s"
type DynamicConfiguration interface {
AddListener(string, common.ConfigurationListener, ...Option)
RemoveListener(string, common.ConfigurationListener, ...Option)
GetConfig(string, ...Option) string
GetConfigs(string, ...Option) string
}
type Options struct {
Group string
Timeout time.Duration
}
type Option func(*Options)
func WithGroup(group string) Option {
return func(opt *Options) {
opt.Group = group
}
}
func WithTimeout(time time.Duration) Option {
return func(opt *Options) {
opt.Timeout = time
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zookeeper
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/remoting/zookeeper"
"sync"
)
const ZkClient = "zk config_center"
type ZookeeperDynamicConfiguration struct {
url common.URL
rootPath string
wg sync.WaitGroup
cltLock sync.Mutex
done chan struct{}
client *zookeeper.ZookeeperClient
listenerLock sync.Mutex
listener *zookeeper.ZkEventListener
}
func NewZookeeperDynamicConfiguration(url common.URL) (config_center.DynamicConfiguration, error) {
c := &ZookeeperDynamicConfiguration{
url: url,
rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config",
}
err := zookeeper.ValidateZookeeperClient(c, zookeeper.WithZkName(ZkClient))
if err != nil {
return nil, err
}
c.wg.Add(1)
go zookeeper.HandleClientRestart(c)
c.listener = zookeeper.NewZkEventListener(c.client)
//c.configListener = NewRegistryConfigurationListener(c.client, c)
//c.dataListener = NewRegistryDataListener(c.configListener)
return c, nil
}
func (*ZookeeperDynamicConfiguration) AddListener(key string, listener common.ConfigurationListener, opions ...config_center.Option) {
}
func (*ZookeeperDynamicConfiguration) RemoveListener(key string, listener common.ConfigurationListener, opions ...config_center.Option) {
}
func (*ZookeeperDynamicConfiguration) GetConfig(key string, opions ...config_center.Option) string {
return ""
}
func (*ZookeeperDynamicConfiguration) GetConfigs(key string, opions ...config_center.Option) string {
return ""
}
func (r *ZookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient {
return r.client
}
func (r *ZookeeperDynamicConfiguration) SetZkClient(client *zookeeper.ZookeeperClient) {
r.client = client
}
func (r *ZookeeperDynamicConfiguration) ZkClientLock() *sync.Mutex {
return &r.cltLock
}
func (r *ZookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup {
return &r.wg
}
func (r *ZookeeperDynamicConfiguration) GetDone() chan struct{} {
return r.done
}
func (r *ZookeeperDynamicConfiguration) GetUrl() common.URL {
return r.url
}
func (r *ZookeeperDynamicConfiguration) Destroy() {
if r.listener != nil {
r.listener.Close()
}
close(r.done)
r.wg.Wait()
r.closeConfigs()
}
func (r *ZookeeperDynamicConfiguration) IsAvailable() bool {
select {
case <-r.done:
return false
default:
return true
}
}
func (r *ZookeeperDynamicConfiguration) closeConfigs() {
r.cltLock.Lock()
defer r.cltLock.Unlock()
logger.Infof("begin to close provider zk client")
// 先关闭旧client,以关闭tmp node
r.client.Close()
r.client = nil
}
func (r *ZookeeperDynamicConfiguration) RestartCallBack() bool {
return true
}
......@@ -130,10 +130,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) {
func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
switch res.Action {
case registry.ServiceAdd:
case common.Add:
//dir.cacheService.Add(res.Path, dir.serviceTTL)
dir.cacheInvoker(res.Service)
case registry.ServiceDel:
case common.Del:
//dir.cacheService.Del(res.Path, dir.serviceTTL)
dir.uncacheInvoker(res.Service)
logger.Infof("selector delete service url{%s}", res.Service)
......
......@@ -50,7 +50,7 @@ func TestSubscribe_Delete(t *testing.T) {
registryDirectory, mockRegistry := normalRegistryDir()
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 3)
mockRegistry.MockEvent(&registry.ServiceEvent{Action: registry.ServiceDel, Service: *common.NewURLWithOptions("TEST0", common.WithProtocol("dubbo"))})
mockRegistry.MockEvent(&registry.ServiceEvent{Action: common.Del, Service: *common.NewURLWithOptions("TEST0", common.WithProtocol("dubbo"))})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 2)
}
......@@ -80,7 +80,7 @@ func TestSubscribe_Group(t *testing.T) {
urlmap.Set(constant.GROUP_KEY, "group1")
urlmap.Set(constant.CLUSTER_KEY, "failover") //to test merge url
for i := 0; i < 3; i++ {
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: common.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
common.WithParams(urlmap))})
}
//for group2
......@@ -88,7 +88,7 @@ func TestSubscribe_Group(t *testing.T) {
urlmap2.Set(constant.GROUP_KEY, "group2")
urlmap2.Set(constant.CLUSTER_KEY, "failover") //to test merge url
for i := 0; i < 3; i++ {
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: common.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
common.WithParams(urlmap2))})
}
......@@ -128,7 +128,7 @@ func normalRegistryDir() (*registryDirectory, *registry.MockRegistry) {
go registryDirectory.Subscribe(*common.NewURLWithOptions("testservice"))
for i := 0; i < 3; i++ {
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"))})
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: common.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"))})
}
return registryDirectory, mockRegistry.(*registry.MockRegistry)
}
......@@ -31,32 +31,12 @@ func init() {
rand.Seed(time.Now().UnixNano())
}
//////////////////////////////////////////
// service url event type
//////////////////////////////////////////
type ServiceEventType int
const (
ServiceAdd = iota
ServiceDel
)
var serviceEventTypeStrings = [...]string{
"add service",
"delete service",
}
func (t ServiceEventType) String() string {
return serviceEventTypeStrings[t]
}
//////////////////////////////////////////
// service event
//////////////////////////////////////////
type ServiceEvent struct {
Action ServiceEventType
Action common.EventType
Service common.URL
}
......
package zookeeper
import (
"context"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/registry"
zk "github.com/apache/dubbo-go/remoting/zookeeper"
perrors "github.com/pkg/errors"
)
type RegistryDataListener struct {
interestedURL []*common.URL
listener *RegistryConfigurationListener
}
func NewRegistryDataListener(listener *RegistryConfigurationListener) *RegistryDataListener {
return &RegistryDataListener{listener: listener, interestedURL: []*common.URL{}}
}
func (l *RegistryDataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
}
func (l *RegistryDataListener) DataChange(eventType zk.ZkEvent) bool {
serviceURL, err := common.NewURL(context.TODO(), eventType.Res.Content)
if err != nil {
logger.Errorf("Listen NewURL(r{%s}) = error{%v}", eventType.Res.Content, err)
return false
}
for _, v := range l.interestedURL {
if serviceURL.URLEqual(*v) {
l.listener.Process(&common.ConfigChangeEvent{Value: serviceURL, ConfigType: eventType.Res.Action})
return true
}
}
return false
}
type RegistryConfigurationListener struct {
client *zk.ZookeeperClient
registry *zkRegistry
events chan *common.ConfigChangeEvent
}
func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener {
reg.wg.Add(1)
return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *common.ConfigChangeEvent, 32)}
}
func (l *RegistryConfigurationListener) Process(configType *common.ConfigChangeEvent) {
l.events <- configType
}
func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-l.client.Done():
logger.Warnf("listener's zk client connection is broken, so zk event listener exit now.")
return nil, perrors.New("listener stopped")
case <-l.registry.done:
logger.Warnf("zk consumer register has quit, so zk event listener exit asap now.")
return nil, perrors.New("listener stopped")
case e := <-l.events:
logger.Debugf("got zk event %s", e)
if e.ConfigType == common.Del && !l.valid() {
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
continue
}
//r.update(e.res)
//write to invoker
//r.outerEventCh <- e.res
return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil
}
}
}
func (l *RegistryConfigurationListener) Close() {
l.registry.wg.Done()
}
func (l *RegistryConfigurationListener) valid() bool {
return l.client.ZkConnValid()
}
......@@ -40,13 +40,13 @@ import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting/zookeeper"
"github.com/apache/dubbo-go/version"
)
const (
defaultTimeout = int64(10e9)
RegistryZkClient = "zk registry"
RegistryConnDelay = 3
defaultTimeout = int64(10e9)
RegistryZkClient = "zk registry"
)
var (
......@@ -73,14 +73,16 @@ type zkRegistry struct {
done chan struct{}
cltLock sync.Mutex
client *zookeeperClient
client *zookeeper.ZookeeperClient
services map[string]common.URL // service name + protocol -> service config
listenerLock sync.Mutex
listener *zkEventListener
listenerLock sync.Mutex
listener *zookeeper.ZkEventListener
dataListener *RegistryDataListener
configListener *RegistryConfigurationListener
//for provider
zkPath map[string]int // key = protocol://ip:port/interface
}
func newZkRegistry(url *common.URL) (registry.Registry, error) {
......@@ -97,30 +99,28 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) {
zkPath: make(map[string]int),
}
//if r.SubURL.Name == "" {
// r.SubURL.Name = RegistryZkClient
//}
//if r.Version == "" {
// r.Version = version.Version
//}
err = r.validateZookeeperClient()
err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient))
if err != nil {
return nil, err
}
r.wg.Add(1)
go r.handleZkRestart()
go zookeeper.HandleClientRestart(r)
//if r.RoleType == registry.CONSUMER {
// r.wg.Add(1)
// go r.listen()
//}
r.listener = zookeeper.NewZkEventListener(r.client)
r.configListener = NewRegistryConfigurationListener(r.client, r)
r.dataListener = NewRegistryDataListener(r.configListener)
return r, nil
}
func newMockZkRegistry(url *common.URL) (*zk.TestCluster, *zkRegistry, error) {
type Options struct {
client *zookeeper.ZookeeperClient
}
type Option func(*Options)
func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestCluster, *zkRegistry, error) {
var (
err error
r *zkRegistry
......@@ -136,139 +136,78 @@ func newMockZkRegistry(url *common.URL) (*zk.TestCluster, *zkRegistry, error) {
zkPath: make(map[string]int),
}
c, r.client, _, err = newMockZookeeperClient("test", 15*time.Second)
c, r.client, _, err = zookeeper.NewMockZookeeperClient("test", 15*time.Second, opts...)
if err != nil {
return nil, nil, err
}
r.wg.Add(1)
go r.handleZkRestart()
go zookeeper.HandleClientRestart(r)
//if r.RoleType == registry.CONSUMER {
// r.wg.Add(1)
// go r.listen()
//}
r.listener = zookeeper.NewZkEventListener(r.client)
r.configListener = NewRegistryConfigurationListener(r.client, r)
r.dataListener = NewRegistryDataListener(r.configListener)
return c, r, nil
}
func (r *zkRegistry) ZkClient() *zookeeper.ZookeeperClient {
return r.client
}
func (r *zkRegistry) SetZkClient(client *zookeeper.ZookeeperClient) {
r.client = client
}
func (r *zkRegistry) ZkClientLock() *sync.Mutex {
return &r.cltLock
}
func (r *zkRegistry) WaitGroup() *sync.WaitGroup {
return &r.wg
}
func (r *zkRegistry) GetDone() chan struct{} {
return r.done
}
func (r *zkRegistry) GetUrl() common.URL {
return *r.URL
}
func (r *zkRegistry) Destroy() {
if r.listener != nil {
r.listener.Close()
if r.configListener != nil {
r.configListener.Close()
}
close(r.done)
r.wg.Wait()
r.closeRegisters()
}
func (r *zkRegistry) validateZookeeperClient() error {
var (
err error
)
func (r *zkRegistry) RestartCallBack() bool {
err = nil
r.cltLock.Lock()
defer r.cltLock.Unlock()
if r.client == nil {
//in dubbp ,every registry only connect one node ,so this is []string{r.Address}
timeout, err := time.ParseDuration(r.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
logger.Errorf("timeout config %v is invalid ,err is %v",
r.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", r.Location)
}
r.client, err = newZookeeperClient(RegistryZkClient, []string{r.Location}, timeout)
if err != nil {
logger.Warnf("newZookeeperClient(name{%s}, zk addresss{%v}, timeout{%d}) = error{%v}",
RegistryZkClient, r.Location, timeout.String(), err)
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", r.Location)
}
}
if r.client.conn == nil {
var event <-chan zk.Event
r.client.conn, event, err = zk.Connect(r.client.zkAddrs, r.client.timeout)
if err == nil {
r.client.wait.Add(1)
go r.client.handleZkEvent(event)
}
// copy r.services
services := []common.URL{}
for _, confIf := range r.services {
services = append(services, confIf)
}
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", r.PrimitiveURL)
}
func (r *zkRegistry) handleZkRestart() {
var (
err error
flag bool
failTimes int
confIf common.URL
)
defer r.wg.Done()
LOOP:
for {
select {
case <-r.done:
logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
// re-register all services
case <-r.client.done():
r.cltLock.Lock()
r.client.Close()
r.client = nil
r.cltLock.Unlock()
// 接zk,直至成功
failTimes = 0
for {
select {
case <-r.done:
logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
case <-time.After(time.Duration(1e9 * failTimes * RegistryConnDelay)): // 防止疯狂重连zk
}
err = r.validateZookeeperClient()
logger.Infof("ZkProviderRegistry.validateZookeeperClient(zkAddr{%s}) = error{%#v}",
r.client.zkAddrs, perrors.WithStack(err))
if err == nil {
// copy r.services
services := []common.URL{}
for _, confIf = range r.services {
services = append(services, confIf)
}
flag = true
for _, confIf = range services {
err = r.register(confIf)
if err != nil {
logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
confIf, perrors.WithStack(err))
flag = false
break
}
logger.Infof("success to re-register service :%v", confIf.Key())
}
if flag {
break
}
}
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
}
flag := true
for _, confIf := range services {
err := r.register(confIf)
if err != nil {
logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
confIf, perrors.WithStack(err))
flag = false
break
}
logger.Infof("success to re-register service :%v", confIf.Key())
}
return flag
}
func (r *zkRegistry) Register(conf common.URL) error {
var (
ok bool
err error
listener *zkEventListener
ok bool
err error
)
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
switch role {
......@@ -291,12 +230,6 @@ func (r *zkRegistry) Register(conf common.URL) error {
r.cltLock.Unlock()
logger.Debugf("(consumerZkConsumerRegistry)Register(conf{%#v})", conf)
r.listenerLock.Lock()
listener = r.listener
r.listenerLock.Unlock()
if listener != nil {
go listener.listenServiceEvent(conf)
}
case common.PROVIDER:
// 检验服务是否已经注册过
......@@ -337,7 +270,7 @@ func (r *zkRegistry) register(c common.URL) error {
//conf config.URL
)
err = r.validateZookeeperClient()
err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient))
if err != nil {
return perrors.WithStack(err)
}
......@@ -428,6 +361,7 @@ func (r *zkRegistry) register(c common.URL) error {
dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, (common.RoleType(common.CONSUMER)).String())
logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL)
default:
return perrors.Errorf("@c{%v} type is not referencer or provider", c)
}
......@@ -464,44 +398,37 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
}
func (r *zkRegistry) Subscribe(conf common.URL) (registry.Listener, error) {
r.wg.Add(1)
return r.getListener(conf)
}
func (r *zkRegistry) getListener(conf common.URL) (*zkEventListener, error) {
func (r *zkRegistry) getListener(conf common.URL) (*RegistryConfigurationListener, error) {
var (
zkListener *zkEventListener
zkListener *RegistryConfigurationListener
)
r.listenerLock.Lock()
zkListener = r.listener
zkListener = r.configListener
r.listenerLock.Unlock()
if zkListener != nil {
return zkListener, nil
}
if r.listener == nil {
r.cltLock.Lock()
client := r.client
r.cltLock.Unlock()
if client == nil {
return nil, perrors.New("zk connection broken")
}
r.cltLock.Lock()
client := r.client
r.cltLock.Unlock()
if client == nil {
return nil, perrors.New("zk connection broken")
}
// new client & listener
listener := zookeeper.NewZkEventListener(r.client)
// new client & listener
zkListener = newZkEventListener(r, client)
r.listenerLock.Lock()
r.listener = listener
r.listenerLock.Unlock()
}
r.listenerLock.Lock()
r.listener = zkListener
r.listenerLock.Unlock()
//注册到dataconfig的interested
r.dataListener.AddInterestedURL(&conf)
// listen
r.cltLock.Lock()
for _, svs := range r.services {
if svs.URLEqual(conf) {
go zkListener.listenServiceEvent(svs)
}
}
r.cltLock.Unlock()
go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo%s/providers", conf.Path), r.dataListener)
return zkListener, nil
}
......
......@@ -19,6 +19,7 @@ package zookeeper
import (
"context"
"github.com/apache/dubbo-go/remoting/zookeeper"
"strconv"
"testing"
"time"
......@@ -40,7 +41,7 @@ func Test_Register(t *testing.T) {
ts, reg, err := newMockZkRegistry(&regurl)
defer ts.Stop()
err = reg.Register(url)
children, _ := reg.client.getChildren("/dubbo/com.ikurento.user.UserProvider/providers")
children, _ := reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers")
assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*provider", children)
assert.NoError(t, err)
}
......@@ -49,7 +50,6 @@ func Test_Subscribe(t *testing.T) {
regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
ts, reg, err := newMockZkRegistry(&regurl)
defer ts.Stop()
//provider register
err = reg.Register(url)
......@@ -61,8 +61,8 @@ func Test_Subscribe(t *testing.T) {
//consumer register
regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
_, reg2, err := newMockZkRegistry(&regurl)
reg2.client = reg.client
_, reg2, err := newMockZkRegistry(&regurl, zookeeper.WithTestCluster(ts))
err = reg2.Register(url)
listener, err := reg2.Subscribe(url)
......@@ -71,8 +71,8 @@ func Test_Subscribe(t *testing.T) {
if err != nil {
return
}
assert.Regexp(t, ".*ServiceEvent{Action{add service}.*", serviceEvent.String())
assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String())
defer ts.Stop()
}
func Test_ConsumerDestory(t *testing.T) {
......
......@@ -18,6 +18,7 @@
package zookeeper
import (
"github.com/apache/dubbo-go/common/constant"
"path"
"strings"
"sync"
......@@ -30,22 +31,27 @@ import (
"github.com/samuel/go-zookeeper/zk"
)
const (
ConnDelay = 3
MaxFailTimes = 15
)
var (
errNilZkClientConn = perrors.New("zookeeperclient{conn} is nil")
)
type zookeeperClient struct {
type ZookeeperClient struct {
name string
zkAddrs []string
ZkAddrs []string
sync.Mutex // for conn
conn *zk.Conn
timeout time.Duration
Conn *zk.Conn
Timeout time.Duration
exit chan struct{}
wait sync.WaitGroup
Wait sync.WaitGroup
eventRegistry map[string][]*chan struct{}
}
func stateToString(state zk.State) string {
func StateToString(state zk.State) string {
switch state {
case zk.StateDisconnected:
return "zookeeper disconnected"
......@@ -76,55 +82,128 @@ func stateToString(state zk.State) string {
return "zookeeper unknown state"
}
func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
type Options struct {
zkName string
client *ZookeeperClient
ts *zk.TestCluster
}
type Option func(*Options)
func WithZkName(name string) Option {
return func(opt *Options) {
opt.zkName = name
}
}
func ValidateZookeeperClient(container ZkClientContainer, opts ...Option) error {
var (
err error
)
opions := &Options{}
for _, opt := range opts {
opt(opions)
}
err = nil
lock := container.ZkClientLock()
url := container.GetUrl()
lock.Lock()
defer lock.Unlock()
if container.ZkClient() == nil {
//in dubbp ,every registry only connect one node ,so this is []string{r.Address}
timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
logger.Errorf("timeout config %v is invalid ,err is %v",
url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location)
}
newClient, err := newZookeeperClient(opions.zkName, []string{url.Location}, timeout)
if err != nil {
logger.Warnf("newZookeeperClient(name{%s}, zk addresss{%v}, timeout{%d}) = error{%v}",
opions.zkName, url.Location, timeout.String(), err)
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location)
}
container.SetZkClient(newClient)
}
if container.ZkClient().Conn == nil {
var event <-chan zk.Event
container.ZkClient().Conn, event, err = zk.Connect(container.ZkClient().ZkAddrs, container.ZkClient().Timeout)
if err == nil {
container.ZkClient().Wait.Add(1)
go container.ZkClient().HandleZkEvent(event)
}
}
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.PrimitiveURL)
}
func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*zookeeperClient, error) {
func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*ZookeeperClient, error) {
var (
err error
event <-chan zk.Event
z *zookeeperClient
z *ZookeeperClient
)
z = &zookeeperClient{
z = &ZookeeperClient{
name: name,
zkAddrs: zkAddrs,
timeout: timeout,
ZkAddrs: zkAddrs,
Timeout: timeout,
exit: make(chan struct{}),
eventRegistry: make(map[string][]*chan struct{}),
}
// connect to zookeeper
z.conn, event, err = zk.Connect(zkAddrs, timeout)
z.Conn, event, err = zk.Connect(zkAddrs, timeout)
if err != nil {
return nil, perrors.WithMessagef(err, "zk.Connect(zkAddrs:%+v)", zkAddrs)
}
z.wait.Add(1)
go z.handleZkEvent(event)
z.Wait.Add(1)
go z.HandleZkEvent(event)
return z, nil
}
func newMockZookeeperClient(name string, timeout time.Duration) (*zk.TestCluster, *zookeeperClient, <-chan zk.Event, error) {
func WithTestCluster(ts *zk.TestCluster) Option {
return func(opt *Options) {
opt.ts = ts
}
}
func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) (*zk.TestCluster, *ZookeeperClient, <-chan zk.Event, error) {
var (
err error
event <-chan zk.Event
z *zookeeperClient
z *ZookeeperClient
ts *zk.TestCluster
)
z = &zookeeperClient{
z = &ZookeeperClient{
name: name,
zkAddrs: []string{},
timeout: timeout,
ZkAddrs: []string{},
Timeout: timeout,
exit: make(chan struct{}),
eventRegistry: make(map[string][]*chan struct{}),
}
// connect to zookeeper
ts, err := zk.StartTestCluster(1, nil, nil)
if err != nil {
return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect")
opions := &Options{}
for _, opt := range opts {
opt(opions)
}
// connect to zookeeper
if opions.ts != nil {
ts = opions.ts
} else {
ts, err = zk.StartTestCluster(1, nil, nil)
if err != nil {
return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect")
}
}
//callbackChan := make(chan zk.Event)
......@@ -132,7 +211,7 @@ func newMockZookeeperClient(name string, timeout time.Duration) (*zk.TestCluster
// callbackChan <- event
//}
z.conn, event, err = ts.ConnectWithOptions(timeout)
z.Conn, event, err = ts.ConnectWithOptions(timeout)
if err != nil {
return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect")
}
......@@ -141,15 +220,16 @@ func newMockZookeeperClient(name string, timeout time.Duration) (*zk.TestCluster
return ts, z, event, nil
}
func (z *zookeeperClient) handleZkEvent(session <-chan zk.Event) {
func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
var (
state int
event zk.Event
)
defer func() {
z.wait.Done()
logger.Infof("zk{path:%v, name:%s} connection goroutine game over.", z.zkAddrs, z.name)
z.Wait.Done()
logger.Infof("zk{path:%v, name:%s} connection goroutine game over.", z.ZkAddrs, z.name)
}()
LOOP:
......@@ -159,15 +239,15 @@ LOOP:
break LOOP
case event = <-session:
logger.Warnf("client{%s} get a zookeeper event{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
z.name, event.Type, event.Server, event.Path, event.State, stateToString(event.State), event.Err)
z.name, event.Type, event.Server, event.Path, event.State, StateToString(event.State), event.Err)
switch (int)(event.State) {
case (int)(zk.StateDisconnected):
logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.zkAddrs, z.name)
logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.ZkAddrs, z.name)
z.stop()
z.Lock()
if z.conn != nil {
z.conn.Close()
z.conn = nil
if z.Conn != nil {
z.Conn.Close()
z.Conn = nil
}
z.Unlock()
break LOOP
......@@ -199,7 +279,7 @@ LOOP:
}
}
func (z *zookeeperClient) registerEvent(zkPath string, event *chan struct{}) {
func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
if zkPath == "" || event == nil {
return
}
......@@ -212,7 +292,7 @@ func (z *zookeeperClient) registerEvent(zkPath string, event *chan struct{}) {
z.Unlock()
}
func (z *zookeeperClient) unregisterEvent(zkPath string, event *chan struct{}) {
func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
if zkPath == "" {
return
}
......@@ -241,11 +321,11 @@ func (z *zookeeperClient) unregisterEvent(zkPath string, event *chan struct{}) {
z.Unlock()
}
func (z *zookeeperClient) done() <-chan struct{} {
func (z *ZookeeperClient) Done() <-chan struct{} {
return z.exit
}
func (z *zookeeperClient) stop() bool {
func (z *ZookeeperClient) stop() bool {
select {
case <-z.exit:
return true
......@@ -256,7 +336,7 @@ func (z *zookeeperClient) stop() bool {
return false
}
func (z *zookeeperClient) zkConnValid() bool {
func (z *ZookeeperClient) ZkConnValid() bool {
select {
case <-z.exit:
return false
......@@ -265,7 +345,7 @@ func (z *zookeeperClient) zkConnValid() bool {
valid := true
z.Lock()
if z.conn == nil {
if z.Conn == nil {
valid = false
}
z.Unlock()
......@@ -273,23 +353,23 @@ func (z *zookeeperClient) zkConnValid() bool {
return valid
}
func (z *zookeeperClient) Close() {
func (z *ZookeeperClient) Close() {
if z == nil {
return
}
z.stop()
z.wait.Wait()
z.Wait.Wait()
z.Lock()
if z.conn != nil {
z.conn.Close()
z.conn = nil
if z.Conn != nil {
z.Conn.Close()
z.Conn = nil
}
z.Unlock()
logger.Warnf("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.zkAddrs)
logger.Warnf("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs)
}
func (z *zookeeperClient) Create(basePath string) error {
func (z *ZookeeperClient) Create(basePath string) error {
var (
err error
tmpPath string
......@@ -300,8 +380,8 @@ func (z *zookeeperClient) Create(basePath string) error {
tmpPath = path.Join(tmpPath, "/", str)
err = errNilZkClientConn
z.Lock()
if z.conn != nil {
_, err = z.conn.Create(tmpPath, []byte(""), 0, zk.WorldACL(zk.PermAll))
if z.Conn != nil {
_, err = z.Conn.Create(tmpPath, []byte(""), 0, zk.WorldACL(zk.PermAll))
}
z.Unlock()
if err != nil {
......@@ -317,22 +397,22 @@ func (z *zookeeperClient) Create(basePath string) error {
return nil
}
func (z *zookeeperClient) Delete(basePath string) error {
func (z *ZookeeperClient) Delete(basePath string) error {
var (
err error
)
err = errNilZkClientConn
z.Lock()
if z.conn != nil {
err = z.conn.Delete(basePath, -1)
if z.Conn != nil {
err = z.Conn.Delete(basePath, -1)
}
z.Unlock()
return perrors.WithMessagef(err, "Delete(basePath:%s)", basePath)
}
func (z *zookeeperClient) RegisterTemp(basePath string, node string) (string, error) {
func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, error) {
var (
err error
data []byte
......@@ -344,8 +424,8 @@ func (z *zookeeperClient) RegisterTemp(basePath string, node string) (string, er
data = []byte("")
zkPath = path.Join(basePath) + "/" + node
z.Lock()
if z.conn != nil {
tmpPath, err = z.conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if z.Conn != nil {
tmpPath, err = z.Conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
}
z.Unlock()
//if err != nil && err != zk.ErrNodeExists {
......@@ -358,7 +438,7 @@ func (z *zookeeperClient) RegisterTemp(basePath string, node string) (string, er
return tmpPath, nil
}
func (z *zookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, error) {
func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, error) {
var (
err error
tmpPath string
......@@ -366,8 +446,8 @@ func (z *zookeeperClient) RegisterTempSeq(basePath string, data []byte) (string,
err = errNilZkClientConn
z.Lock()
if z.conn != nil {
tmpPath, err = z.conn.Create(
if z.Conn != nil {
tmpPath, err = z.Conn.Create(
path.Join(basePath)+"/",
data,
zk.FlagEphemeral|zk.FlagSequence,
......@@ -386,7 +466,7 @@ func (z *zookeeperClient) RegisterTempSeq(basePath string, data []byte) (string,
return tmpPath, nil
}
func (z *zookeeperClient) getChildrenW(path string) ([]string, <-chan zk.Event, error) {
func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, error) {
var (
err error
children []string
......@@ -396,8 +476,8 @@ func (z *zookeeperClient) getChildrenW(path string) ([]string, <-chan zk.Event,
err = errNilZkClientConn
z.Lock()
if z.conn != nil {
children, stat, event, err = z.conn.ChildrenW(path)
if z.Conn != nil {
children, stat, event, err = z.Conn.ChildrenW(path)
}
z.Unlock()
if err != nil {
......@@ -417,7 +497,7 @@ func (z *zookeeperClient) getChildrenW(path string) ([]string, <-chan zk.Event,
return children, event, nil
}
func (z *zookeeperClient) getChildren(path string) ([]string, error) {
func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
var (
err error
children []string
......@@ -426,8 +506,8 @@ func (z *zookeeperClient) getChildren(path string) ([]string, error) {
err = errNilZkClientConn
z.Lock()
if z.conn != nil {
children, stat, err = z.conn.Children(path)
if z.Conn != nil {
children, stat, err = z.Conn.Children(path)
}
z.Unlock()
if err != nil {
......@@ -447,7 +527,7 @@ func (z *zookeeperClient) getChildren(path string) ([]string, error) {
return children, nil
}
func (z *zookeeperClient) existW(zkPath string) (<-chan zk.Event, error) {
func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) {
var (
exist bool
err error
......@@ -456,8 +536,8 @@ func (z *zookeeperClient) existW(zkPath string) (<-chan zk.Event, error) {
err = errNilZkClientConn
z.Lock()
if z.conn != nil {
exist, _, event, err = z.conn.ExistsW(zkPath)
if z.Conn != nil {
exist, _, event, err = z.Conn.ExistsW(zkPath)
}
z.Unlock()
if err != nil {
......
......@@ -93,7 +93,8 @@ func verifyEventOrder(t *testing.T, c <-chan zk.Event, expectedEvent []zk.EventT
//}
func Test_newMockZookeeperClient(t *testing.T) {
ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second)
ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
defer ts.Stop()
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
......@@ -103,7 +104,7 @@ func Test_newMockZookeeperClient(t *testing.T) {
}
func TestCreate(t *testing.T) {
ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second)
ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
defer ts.Stop()
err := z.Create("test1/test2/test3/test4")
assert.NoError(t, err)
......@@ -113,7 +114,7 @@ func TestCreate(t *testing.T) {
}
func TestCreateDelete(t *testing.T) {
ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second)
ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
defer ts.Stop()
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
......@@ -126,7 +127,7 @@ func TestCreateDelete(t *testing.T) {
}
func TestRegisterTemp(t *testing.T) {
ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second)
ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
defer ts.Stop()
err := z.Create("/test1/test2/test3")
assert.NoError(t, err)
......@@ -139,7 +140,7 @@ func TestRegisterTemp(t *testing.T) {
}
func TestRegisterTempSeq(t *testing.T) {
ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second)
ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
defer ts.Stop()
err := z.Create("/test1/test2/test3")
assert.NoError(t, err)
......
package zookeeper
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
perrors "github.com/pkg/errors"
"sync"
"time"
)
type ZkClientContainer interface {
ZkClient() *ZookeeperClient
SetZkClient(*ZookeeperClient)
ZkClientLock() *sync.Mutex
WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container
GetDone() chan struct{} //for zk client control
RestartCallBack() bool
common.Node
}
func HandleClientRestart(r ZkClientContainer) {
var (
err error
failTimes int
)
defer r.WaitGroup().Done()
LOOP:
for {
select {
case <-r.GetDone():
logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
// re-register all services
case <-r.ZkClient().Done():
r.ZkClientLock().Lock()
r.ZkClient().Close()
zkName := r.ZkClient().name
zkAddress := r.ZkClient().ZkAddrs
r.SetZkClient(nil)
r.ZkClientLock().Unlock()
// 接zk,直至成功
failTimes = 0
for {
select {
case <-r.GetDone():
logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
case <-time.After(time.Duration(1e9 * failTimes * ConnDelay)): // 防止疯狂重连zk
}
err = ValidateZookeeperClient(r, WithZkName(zkName))
logger.Infof("ZkProviderRegistry.validateZookeeperClient(zkAddr{%s}) = error{%#v}",
zkAddress, perrors.WithStack(err))
if err == nil {
if r.RestartCallBack() {
break
}
}
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
}
}
}
}
package zookeeper
type DataListener interface {
DataChange(eventType ZkEvent) bool //bool is return for interface implement is interesting
}
......@@ -18,7 +18,6 @@
package zookeeper
import (
"context"
"fmt"
"path"
"sync"
......@@ -33,46 +32,39 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/registry"
)
const (
MaxFailTimes = 15
)
type zkEvent struct {
res *registry.ServiceEvent
type ZkEvent struct {
Res *common.Event
err error
}
func (e zkEvent) String() string {
return fmt.Sprintf("err:%s, res:%s", e.err, e.res)
func (e ZkEvent) String() string {
return fmt.Sprintf("err:%s, res:%s", e.err, e.Res)
}
type zkEventListener struct {
client *zookeeperClient
events chan zkEvent
serviceMapLock sync.Mutex
serviceMap map[string]struct{}
wg sync.WaitGroup
registry *zkRegistry
type ZkEventListener struct {
client *ZookeeperClient
pathMapLock sync.Mutex
pathMap map[string]struct{}
wg sync.WaitGroup
}
func newZkEventListener(registry *zkRegistry, client *zookeeperClient) *zkEventListener {
return &zkEventListener{
client: client,
registry: registry,
events: make(chan zkEvent, 32),
serviceMap: make(map[string]struct{}),
func NewZkEventListener(client *ZookeeperClient) *ZkEventListener {
return &ZkEventListener{
client: client,
pathMap: make(map[string]struct{}),
}
}
func (l *zkEventListener) listenServiceNodeEvent(zkPath string) bool {
func (l *ZkEventListener) SetClient(client *ZookeeperClient) {
l.client = client
}
func (l *ZkEventListener) listenServiceNodeEvent(zkPath string) bool {
l.wg.Add(1)
defer l.wg.Done()
var zkEvent zk.Event
for {
keyEventCh, err := l.client.existW(zkPath)
keyEventCh, err := l.client.ExistW(zkPath)
if err != nil {
logger.Errorf("existW{key:%s} = error{%v}", zkPath, err)
return false
......@@ -81,7 +73,7 @@ func (l *zkEventListener) listenServiceNodeEvent(zkPath string) bool {
select {
case zkEvent = <-keyEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, stateToString(zkEvent.State), zkEvent.Err)
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
switch zkEvent.Type {
case zk.EventNodeDataChanged:
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDataChanged}", zkPath)
......@@ -93,7 +85,7 @@ func (l *zkEventListener) listenServiceNodeEvent(zkPath string) bool {
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDeleted}", zkPath)
return true
}
case <-l.client.done():
case <-l.client.Done():
return false
}
}
......@@ -101,7 +93,7 @@ func (l *zkEventListener) listenServiceNodeEvent(zkPath string) bool {
return false
}
func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, conf common.URL) {
func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, listener DataListener) {
contains := func(s []string, e string) bool {
for _, a := range s {
if a == e {
......@@ -112,7 +104,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co
return false
}
newChildren, err := l.client.getChildren(zkPath)
newChildren, err := l.client.GetChildren(zkPath)
if err != nil {
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
return
......@@ -120,8 +112,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co
// a node was added -- listen the new node
var (
newNode string
serviceURL common.URL
newNode string
)
for _, n := range newChildren {
if contains(children, n) {
......@@ -130,27 +121,18 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co
newNode = path.Join(zkPath, n)
logger.Infof("add zkNode{%s}", newNode)
//context.TODO
serviceURL, err = common.NewURL(context.TODO(), n)
if err != nil {
logger.Errorf("NewURL(%s) = error{%v}", n, perrors.WithStack(err))
if !listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Add, Content: n}, nil}) {
continue
}
if !conf.URLEqual(serviceURL) {
logger.Warnf("serviceURL{%s} is not compatible with SubURL{%#v}", serviceURL.Key(), conf.Key())
continue
}
logger.Infof("add serviceURL{%s}", serviceURL)
l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceAdd, Service: serviceURL}, nil}
// listen l service node
go func(node string, serviceURL common.URL) {
go func(node string) {
logger.Infof("delete zkNode{%s}", node)
if l.listenServiceNodeEvent(node) {
logger.Infof("delete serviceURL{%s}", serviceURL)
l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil}
logger.Infof("delete content{%s}", n)
listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Del, Content: n}, nil})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(newNode, serviceURL)
}(newNode)
}
// old node was deleted
......@@ -162,21 +144,19 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co
oldNode = path.Join(zkPath, n)
logger.Warnf("delete zkPath{%s}", oldNode)
serviceURL, err = common.NewURL(context.TODO(), n)
if !conf.URLEqual(serviceURL) {
logger.Warnf("serviceURL{%s} has been deleted is not compatible with SubURL{%#v}", serviceURL.Key(), conf.Key())
if !listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Add, Content: n}, nil}) {
continue
}
logger.Warnf("delete serviceURL{%s}", serviceURL)
logger.Warnf("delete content{%s}", n)
if err != nil {
logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
continue
}
l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil}
listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Del, Content: n}, nil})
}
}
func (l *zkEventListener) listenDirEvent(zkPath string, conf common.URL) {
func (l *ZkEventListener) listenDirEvent(zkPath string, listener DataListener) {
l.wg.Add(1)
defer l.wg.Done()
......@@ -189,7 +169,7 @@ func (l *zkEventListener) listenDirEvent(zkPath string, conf common.URL) {
defer close(event)
for {
// get current children for a zkPath
children, childEventCh, err := l.client.getChildrenW(zkPath)
children, childEventCh, err := l.client.GetChildrenW(zkPath)
if err != nil {
failTimes++
if MaxFailTimes <= failTimes {
......@@ -205,19 +185,19 @@ func (l *zkEventListener) listenDirEvent(zkPath string, conf common.URL) {
break CLEAR
}
}
l.client.registerEvent(zkPath, &event)
l.client.RegisterEvent(zkPath, &event)
select {
case <-time.After(timeSecondDuration(failTimes * RegistryConnDelay)):
l.client.unregisterEvent(zkPath, &event)
case <-time.After(timeSecondDuration(failTimes * ConnDelay)):
l.client.UnregisterEvent(zkPath, &event)
continue
case <-l.client.done():
l.client.unregisterEvent(zkPath, &event)
logger.Warnf("client.done(), listen(path{%s}, ReferenceConfig{%#v}) goroutine exit now...", zkPath, conf)
case <-l.client.Done():
l.client.UnregisterEvent(zkPath, &event)
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
return
case <-event:
logger.Infof("get zk.EventNodeDataChange notify event")
l.client.unregisterEvent(zkPath, &event)
l.handleZkNodeEvent(zkPath, nil, conf)
l.client.UnregisterEvent(zkPath, &event)
l.handleZkNodeEvent(zkPath, nil, listener)
continue
}
}
......@@ -226,64 +206,57 @@ func (l *zkEventListener) listenDirEvent(zkPath string, conf common.URL) {
select {
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, stateToString(zkEvent.State), zkEvent.Err)
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
if zkEvent.Type != zk.EventNodeChildrenChanged {
continue
}
l.handleZkNodeEvent(zkEvent.Path, children, conf)
case <-l.client.done():
logger.Warnf("client.done(), listen(path{%s}, ReferenceConfig{%#v}) goroutine exit now...", zkPath, conf)
l.handleZkNodeEvent(zkEvent.Path, children, listener)
case <-l.client.Done():
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
return
}
}
}
func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
// this func is invoked by ZkConsumerRegistry::Registe/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent
// |
// --------> listenServiceNodeEvent
func (l *zkEventListener) listenServiceEvent(conf common.URL) {
func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener DataListener) {
var (
err error
zkPath string
dubboPath string
children []string
serviceURL common.URL
)
zkPath = fmt.Sprintf("/dubbo%s/providers", conf.Path)
l.serviceMapLock.Lock()
_, ok := l.serviceMap[zkPath]
l.serviceMapLock.Unlock()
l.pathMapLock.Lock()
_, ok := l.pathMap[zkPath]
l.pathMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", zkPath)
return
}
l.serviceMapLock.Lock()
l.serviceMap[zkPath] = struct{}{}
l.serviceMapLock.Unlock()
l.pathMapLock.Lock()
l.pathMap[zkPath] = struct{}{}
l.pathMapLock.Unlock()
logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath)
children, err = l.client.getChildren(zkPath)
children, err = l.client.GetChildren(zkPath)
if err != nil {
children = nil
logger.Errorf("fail to get children of zk path{%s}", zkPath)
}
for _, c := range children {
serviceURL, err = common.NewURL(context.TODO(), c)
if err != nil {
logger.Errorf("NewURL(r{%s}) = error{%v}", c, err)
if !listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Add, Content: c}, nil}) {
continue
}
if !conf.URLEqual(serviceURL) {
logger.Warnf("serviceURL %v is not compatible with SubURL %v", serviceURL.Key(), conf.Key())
continue
}
logger.Debugf("add serviceUrl{%s}", serviceURL)
l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceAdd, Service: serviceURL}, nil}
// listen l service node
dubboPath = path.Join(zkPath, c)
......@@ -291,55 +264,23 @@ func (l *zkEventListener) listenServiceEvent(conf common.URL) {
go func(zkPath string, serviceURL common.URL) {
if l.listenServiceNodeEvent(dubboPath) {
logger.Debugf("delete serviceUrl{%s}", serviceURL)
l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil}
listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Del, Content: c}, nil})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath, serviceURL)
}
logger.Infof("listen dubbo path{%s}", zkPath)
go func(zkPath string, conf common.URL) {
l.listenDirEvent(zkPath, conf)
go func(zkPath string, listener DataListener) {
l.listenDirEvent(zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, conf)
}
func (l *zkEventListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-l.client.done():
logger.Warnf("listener's zk client connection is broken, so zk event listener exit now.")
return nil, perrors.New("listener stopped")
case <-l.registry.done:
logger.Warnf("zk consumer register has quit, so zk event listener exit asap now.")
return nil, perrors.New("listener stopped")
case e := <-l.events:
logger.Debugf("got zk event %s", e)
if e.err != nil {
return nil, perrors.WithStack(e.err)
}
if e.res.Action == registry.ServiceDel && !l.valid() {
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.res)
continue
}
//r.update(e.res)
//write to invoker
//r.outerEventCh <- e.res
return e.res, nil
}
}
}(zkPath, listener)
}
func (l *zkEventListener) valid() bool {
return l.client.zkConnValid()
func (l *ZkEventListener) valid() bool {
return l.client.ZkConnValid()
}
func (l *zkEventListener) Close() {
l.registry.listenerLock.Lock()
l.client.Close()
l.registry.listenerLock.Unlock()
l.registry.wg.Done()
func (l *ZkEventListener) Close() {
l.wg.Wait()
}
File added
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment