Skip to content
Snippets Groups Projects
Commit bbc34823 authored by vito.he's avatar vito.he
Browse files

Ftr:config_center

parent d8763ab2
No related branches found
No related tags found
No related merge requests found
Showing
with 534 additions and 32 deletions
package config
import "sync"
type Environment struct {
}
var instance *Environment
var once sync.Once
func GetEnvInstance() *Environment {
once.Do(func() {
instance = &Environment{}
})
return instance
}
......@@ -37,4 +37,3 @@ const (
DEFAULT_REFERENCE_FILTERS = ""
ECHO = "$echo"
)
......@@ -68,7 +68,7 @@ const (
ENVIRONMENT_KEY = "environment"
)
const(
const (
CONFIG_NAMESPACE_KEY = "config.namespace"
CONFIG_TIMEOUT_KET = "config.timeout"
)
\ No newline at end of file
CONFIG_TIMEOUT_KET = "config.timeout"
)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
"github.com/apache/dubbo-go/config_center"
)
var (
configCenterFactories = make(map[string]func() config_center.DynamicConfigurationFactory)
)
func SetConfigCenterFactory(name string, v func() config_center.DynamicConfigurationFactory) {
configCenterFactories[name] = v
}
func GetConfigCenterFactory(name string) config_center.DynamicConfigurationFactory {
if configCenterFactories[name] == nil {
panic("config center for " + name + " is not existing, make sure you have import the package.")
}
return configCenterFactories[name]()
}
......@@ -246,7 +246,6 @@ func (c URL) Key() string {
return buildString
}
func (c URL) Context() context.Context {
return c.ctx
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import (
"context"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
)
type baseConfig struct {
// application
ApplicationConfig ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"`
ConfigCenterConfig ConfigCenterConfig
configCenterUrl *common.URL
}
func (c *baseConfig) startConfigCenter(ctx context.Context) {
var err error
*c.configCenterUrl, err = common.NewURL(ctx, c.ConfigCenterConfig.Address)
c.prepareEnvironment()
}
func (c *baseConfig) prepareEnvironment() error {
factory := extension.GetConfigCenterFactory(c.ConfigCenterConfig.Protocol)
dynamicConfig, err := factory.GetDynamicConfiguration(c.configCenterUrl)
if err != nil {
logger.Errorf("get dynamic configuration error , error message is %v", err)
return err
}
content, err := dynamicConfig.GetConfig(c.ConfigCenterConfig.ConfigFile, config_center.WithGroup(c.ConfigCenterConfig.Group))
if err != nil {
logger.Errorf("Get config content in dynamic configuration error , error message is %v", err)
return err
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import (
"context"
"time"
)
type ConfigCenterConfig struct {
context context.Context
Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"`
Address string `yaml:"address" json:"address,omitempty"`
Cluster string `yaml:"cluster" json:"cluster,omitempty"`
Group string `default:"dubbo" yaml:"group" json:"group,omitempty"`
Username string `yaml:"username" json:"username,omitempty"`
Password string `yaml:"password" json:"password,omitempty"`
Check *bool `yaml:"check" json:"check,omitempty"`
ConfigFile string `default:"dubbo.properties" yaml:"config_file" json:"config_file,omitempty"`
TimeoutStr string `yaml:"timeout" json:"timeout,omitempty"`
timeout time.Duration
}
......@@ -123,6 +123,7 @@ func providerInit(confProFile string) error {
/////////////////////////
type ConsumerConfig struct {
baseConfig
Filter string `yaml:"filter" json:"filter,omitempty"`
// client
......@@ -133,11 +134,11 @@ type ConsumerConfig struct {
RequestTimeout time.Duration
ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty"`
Check *bool `yaml:"check" json:"check,omitempty"`
// application
ApplicationConfig ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"`
Registries []RegistryConfig `yaml:"registries" json:"registries,omitempty"`
References []ReferenceConfig `yaml:"references" json:"references,omitempty"`
ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty"`
Registries []RegistryConfig `yaml:"registries" json:"registries,omitempty"`
References []ReferenceConfig `yaml:"references" json:"references,omitempty"`
ConfigCenter ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"`
ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty"`
}
type ReferenceConfigTmp struct {
......
......@@ -33,8 +33,8 @@ const DEFAULT_CONFIG_TIMEOUT = "10s"
type DynamicConfiguration interface {
AddListener(string, remoting.ConfigurationListener, ...Option)
RemoveListener(string, remoting.ConfigurationListener, ...Option)
GetConfig(string, ...Option) string
GetConfigs(string, ...Option) string
GetConfig(string, ...Option) (string, error)
GetConfigs(string, ...Option) (string, error)
}
type Options struct {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config_center
import "github.com/apache/dubbo-go/common"
type DynamicConfigurationFactory interface {
GetDynamicConfiguration(*common.URL) (DynamicConfiguration, error)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zookeeper
import (
"sync"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
)
func init() {
extension.SetConfigCenterFactory("zookeeper", func() config_center.DynamicConfigurationFactory { return &zookeeperDynamicConfigurationFactory{} })
}
type zookeeperDynamicConfigurationFactory struct {
}
var once sync.Once
var dynamicConfiguration *zookeeperDynamicConfiguration
func (f *zookeeperDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration, error) {
var err error
once.Do(func() {
dynamicConfiguration, err = newZookeeperDynamicConfiguration(url)
})
return dynamicConfiguration, err
}
......@@ -18,8 +18,12 @@
package zookeeper
import (
"strings"
"sync"
)
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
......@@ -29,80 +33,106 @@ import (
"github.com/apache/dubbo-go/remoting/zookeeper"
)
const ZK_CLIENT = "zk config_center"
const ZkClient = "zk config_center"
type ZookeeperDynamicConfiguration struct {
url common.URL
type zookeeperDynamicConfiguration struct {
url *common.URL
rootPath string
wg sync.WaitGroup
cltLock sync.Mutex
done chan struct{}
client *zookeeper.ZookeeperClient
listenerLock sync.Mutex
listener *zookeeper.ZkEventListener
listenerLock sync.Mutex
listener *zookeeper.ZkEventListener
cacheListener *CacheListener
}
func NewZookeeperDynamicConfiguration(url common.URL) (config_center.DynamicConfiguration, error) {
c := &ZookeeperDynamicConfiguration{
func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfiguration, error) {
c := &zookeeperDynamicConfiguration{
url: url,
rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config",
}
err := zookeeper.ValidateZookeeperClient(c, zookeeper.WithZkName(ZK_CLIENT))
err := zookeeper.ValidateZookeeperClient(c, zookeeper.WithZkName(ZkClient))
if err != nil {
logger.Errorf("zookeeper client start error ,error message is %v", err)
return nil, err
}
c.wg.Add(1)
go zookeeper.HandleClientRestart(c)
c.listener = zookeeper.NewZkEventListener(c.client)
//c.configListener = NewRegistryConfigurationListener(c.client, c)
//c.dataListener = NewRegistryDataListener(c.configListener)
c.cacheListener = NewCacheListener(c.rootPath)
return c, nil
}
func (*ZookeeperDynamicConfiguration) AddListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) {
func (c *zookeeperDynamicConfiguration) AddListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) {
c.cacheListener.AddListener(key, listener)
}
func (*ZookeeperDynamicConfiguration) RemoveListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) {
func (c *zookeeperDynamicConfiguration) RemoveListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) {
c.cacheListener.RemoveListener(key, listener)
}
func (*ZookeeperDynamicConfiguration) GetConfig(key string, opions ...config_center.Option) string {
return ""
func (c *zookeeperDynamicConfiguration) GetConfig(key string, opts ...config_center.Option) (string, error) {
/**
* when group is not null, we are getting startup configs from Config Center, for example:
* group=dubbo, key=dubbo.properties
*/
opions := &config_center.Options{}
for _, opt := range opts {
opt(opions)
}
if opions.Group != "" {
key = opions.Group + "/" + key
}
/**
* when group is null, we are fetching governance rules, for example:
* 1. key=org.apache.dubbo.DemoService.configurators
* 2. key = org.apache.dubbo.DemoService.condition-router
*/
i := strings.LastIndex(key, ".")
key = key[0:i] + "/" + key[i+1:]
content, _, err := c.client.GetContent(c.rootPath + "/" + key)
if err != nil {
return "", perrors.WithStack(err)
} else {
return string(content), nil
}
}
func (*ZookeeperDynamicConfiguration) GetConfigs(key string, opions ...config_center.Option) string {
return ""
//For zookeeper, getConfig and getConfigs have the same meaning.
func (c *zookeeperDynamicConfiguration) GetConfigs(key string, opts ...config_center.Option) (string, error) {
return c.GetConfig(key, opts...)
}
func (r *ZookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient {
func (r *zookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient {
return r.client
}
func (r *ZookeeperDynamicConfiguration) SetZkClient(client *zookeeper.ZookeeperClient) {
func (r *zookeeperDynamicConfiguration) SetZkClient(client *zookeeper.ZookeeperClient) {
r.client = client
}
func (r *ZookeeperDynamicConfiguration) ZkClientLock() *sync.Mutex {
func (r *zookeeperDynamicConfiguration) ZkClientLock() *sync.Mutex {
return &r.cltLock
}
func (r *ZookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup {
func (r *zookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup {
return &r.wg
}
func (r *ZookeeperDynamicConfiguration) GetDone() chan struct{} {
func (r *zookeeperDynamicConfiguration) GetDone() chan struct{} {
return r.done
}
func (r *ZookeeperDynamicConfiguration) GetUrl() common.URL {
return r.url
func (r *zookeeperDynamicConfiguration) GetUrl() common.URL {
return *r.url
}
func (r *ZookeeperDynamicConfiguration) Destroy() {
func (r *zookeeperDynamicConfiguration) Destroy() {
if r.listener != nil {
r.listener.Close()
}
......@@ -111,7 +141,7 @@ func (r *ZookeeperDynamicConfiguration) Destroy() {
r.closeConfigs()
}
func (r *ZookeeperDynamicConfiguration) IsAvailable() bool {
func (r *zookeeperDynamicConfiguration) IsAvailable() bool {
select {
case <-r.done:
return false
......@@ -120,7 +150,7 @@ func (r *ZookeeperDynamicConfiguration) IsAvailable() bool {
}
}
func (r *ZookeeperDynamicConfiguration) closeConfigs() {
func (r *zookeeperDynamicConfiguration) closeConfigs() {
r.cltLock.Lock()
defer r.cltLock.Unlock()
logger.Infof("begin to close provider zk client")
......@@ -129,6 +159,6 @@ func (r *ZookeeperDynamicConfiguration) closeConfigs() {
r.client = nil
}
func (r *ZookeeperDynamicConfiguration) RestartCallBack() bool {
func (r *zookeeperDynamicConfiguration) RestartCallBack() bool {
return true
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zookeeper
import (
"fmt"
"strings"
"sync"
)
import (
"github.com/apache/dubbo-go/remoting"
)
type CacheListener struct {
keyListeners sync.Map
rootPath string
}
func NewCacheListener(rootPath string) *CacheListener {
return &CacheListener{rootPath: rootPath}
}
func (l *CacheListener) AddListener(key string, listener remoting.ConfigurationListener) {
// reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure
// make a map[your type]struct{} like set in java
listeners, loaded := l.keyListeners.LoadOrStore(key, map[remoting.ConfigurationListener]struct{}{listener: struct{}{}})
if loaded {
listeners.(map[remoting.ConfigurationListener]struct{})[listener] = struct{}{}
l.keyListeners.Store(key, listeners)
}
}
func (l *CacheListener) RemoveListener(key string, listener remoting.ConfigurationListener) {
listeners, loaded := l.keyListeners.Load(key)
if loaded {
delete(listeners.(map[remoting.ConfigurationListener]struct{}), listener)
}
}
func (l *CacheListener) DataChange(event remoting.Event) bool {
fmt.Println(event)
key := l.pathToKey(event.Path)
if key != "" {
if listeners, ok := l.keyListeners.Load(key); ok {
for listener := range listeners.(map[remoting.ConfigurationListener]struct{}) {
listener.Process(&remoting.ConfigChangeEvent{Key: key, Value: event.Content, ConfigType: event.Action})
}
return true
}
}
return false
}
func (l *CacheListener) pathToKey(path string) string {
return strings.ReplaceAll(strings.Replace(path, l.rootPath+"/", "", -1), "/", ".")
}
......@@ -27,7 +27,9 @@ go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 h1:kcXqo9vE6fsZY5X5Rd7R1l7fTgnWaDCVmln65REefiE=
golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
......
......@@ -19,6 +19,7 @@ package zookeeper
import (
"context"
"strings"
)
import (
perrors "github.com/pkg/errors"
......@@ -33,10 +34,10 @@ import (
type RegistryDataListener struct {
interestedURL []*common.URL
listener *RegistryConfigurationListener
listener remoting.ConfigurationListener
}
func NewRegistryDataListener(listener *RegistryConfigurationListener) *RegistryDataListener {
func NewRegistryDataListener(listener remoting.ConfigurationListener) *RegistryDataListener {
return &RegistryDataListener{listener: listener, interestedURL: []*common.URL{}}
}
func (l *RegistryDataListener) AddInterestedURL(url *common.URL) {
......@@ -44,9 +45,11 @@ func (l *RegistryDataListener) AddInterestedURL(url *common.URL) {
}
func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
serviceURL, err := common.NewURL(context.TODO(), eventType.Content)
//截取最后一位
url := eventType.Path[strings.Index(eventType.Path, "/providers/")+len("/providers/"):]
serviceURL, err := common.NewURL(context.TODO(), url)
if err != nil {
logger.Errorf("Listen NewURL(r{%s}) = error{%v}", eventType.Content, err)
logger.Errorf("Listen NewURL(r{%s}) = error{%v}", url, err)
return false
}
for _, v := range l.interestedURL {
......
package zookeeper
import (
"context"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/remoting"
"github.com/stretchr/testify/assert"
"testing"
)
func Test_DataChange(t *testing.T) {
listener := NewRegistryDataListener(&MockDataListener{})
url, _ := common.NewURL(context.TODO(), "jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100")
listener.AddInterestedURL(&url)
int := listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100"})
assert.Equal(t, true, int)
}
type MockDataListener struct {
}
func (*MockDataListener) Process(configType *remoting.ConfigChangeEvent) {
}
......@@ -46,6 +46,7 @@ type EventType int
const (
Add = iota
Del
Mod
)
var serviceEventTypeStrings = [...]string{
......
......@@ -100,7 +100,7 @@ func WithZkName(name string) Option {
}
}
func ValidateZookeeperClient(container ZkClientContainer, opts ...Option) error {
func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
var (
err error
)
......@@ -553,3 +553,7 @@ func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) {
return event, nil
}
func (z *ZookeeperClient) GetContent(zkPath string) ([]byte, *zk.Stat, error) {
return z.Conn.Get(zkPath)
}
......@@ -30,7 +30,7 @@ import (
"github.com/apache/dubbo-go/common/logger"
)
type ZkClientContainer interface {
type zkClientFacade interface {
ZkClient() *ZookeeperClient
SetZkClient(*ZookeeperClient)
ZkClientLock() *sync.Mutex
......@@ -40,7 +40,7 @@ type ZkClientContainer interface {
common.Node
}
func HandleClientRestart(r ZkClientContainer) {
func HandleClientRestart(r zkClientFacade) {
var (
err error
......
......@@ -50,7 +50,7 @@ func NewZkEventListener(client *ZookeeperClient) *ZkEventListener {
func (l *ZkEventListener) SetClient(client *ZookeeperClient) {
l.client = client
}
func (l *ZkEventListener) listenServiceNodeEvent(zkPath string) bool {
func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
l.wg.Add(1)
defer l.wg.Done()
var zkEvent zk.Event
......@@ -68,8 +68,17 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string) bool {
switch zkEvent.Type {
case zk.EventNodeDataChanged:
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDataChanged}", zkPath)
if len(listener) > 0 {
content, _, _ := l.client.Conn.Get(zkEvent.Path)
listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.Mod, Content: string(content)})
}
case zk.EventNodeCreated:
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeCreated}", zkPath)
if len(listener) > 0 {
content, _, _ := l.client.Conn.Get(zkEvent.Path)
listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.Add, Content: string(content)})
}
case zk.EventNotWatching:
logger.Warnf("zk.ExistW(key{%s}) = event{EventNotWatching}", zkPath)
case zk.EventNodeDeleted:
......@@ -112,15 +121,19 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
newNode = path.Join(zkPath, n)
logger.Infof("add zkNode{%s}", newNode)
if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: n}) {
content, _, err := l.client.Conn.Get(newNode)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", newNode, perrors.WithStack(err))
}
if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: string(content)}) {
continue
}
// listen l service node
go func(node string) {
logger.Infof("delete zkNode{%s}", node)
if l.listenServiceNodeEvent(node) {
if l.listenServiceNodeEvent(node, listener) {
logger.Infof("delete content{%s}", n)
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: n})
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(newNode)
......@@ -135,15 +148,12 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
oldNode = path.Join(zkPath, n)
logger.Warnf("delete zkPath{%s}", oldNode)
if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: n}) {
continue
}
logger.Warnf("delete content{%s}", n)
if err != nil {
logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
continue
}
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: n})
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.Del})
}
}
......@@ -245,17 +255,21 @@ func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.Da
}
for _, c := range children {
if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: c}) {
continue
}
// listen l service node
dubboPath = path.Join(zkPath, c)
content, _, err := l.client.Conn.Get(dubboPath)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err))
}
if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.Add, Content: string(content)}) {
continue
}
logger.Infof("listen dubbo service key{%s}", dubboPath)
go func(zkPath string, serviceURL common.URL) {
if l.listenServiceNodeEvent(dubboPath) {
logger.Debugf("delete serviceUrl{%s}", serviceURL)
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: c})
listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.Del})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath, serviceURL)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment