diff --git a/common/config/environment.go b/common/config/environment.go new file mode 100644 index 0000000000000000000000000000000000000000..ce9c333635fc533e39bb32ae3fd00e0ddbb35f70 --- /dev/null +++ b/common/config/environment.go @@ -0,0 +1,17 @@ +package config + +import "sync" + +type Environment struct { +} + +var instance *Environment + +var once sync.Once + +func GetEnvInstance() *Environment { + once.Do(func() { + instance = &Environment{} + }) + return instance +} diff --git a/common/constant/default.go b/common/constant/default.go index f02ee7c56d5b2b83cd105f1a83e482b7b4679695..85f61f30f115493d3d520f2a68f36921735055d7 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -37,4 +37,3 @@ const ( DEFAULT_REFERENCE_FILTERS = "" ECHO = "$echo" ) - diff --git a/common/constant/key.go b/common/constant/key.go index 4ab735dd01e37e5d23f00fd6d16340c14169b174..e49ddae7259bc8fd07351e38964594f2b28d3795 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -68,7 +68,7 @@ const ( ENVIRONMENT_KEY = "environment" ) -const( +const ( CONFIG_NAMESPACE_KEY = "config.namespace" - CONFIG_TIMEOUT_KET = "config.timeout" -) \ No newline at end of file + CONFIG_TIMEOUT_KET = "config.timeout" +) diff --git a/common/extension/config_center_factory.go b/common/extension/config_center_factory.go new file mode 100644 index 0000000000000000000000000000000000000000..b66f40b1188fda9afcbdda5eeea5989640eacdb6 --- /dev/null +++ b/common/extension/config_center_factory.go @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "github.com/apache/dubbo-go/config_center" +) + +var ( + configCenterFactories = make(map[string]func() config_center.DynamicConfigurationFactory) +) + +func SetConfigCenterFactory(name string, v func() config_center.DynamicConfigurationFactory) { + configCenterFactories[name] = v +} + +func GetConfigCenterFactory(name string) config_center.DynamicConfigurationFactory { + if configCenterFactories[name] == nil { + panic("config center for " + name + " is not existing, make sure you have import the package.") + } + return configCenterFactories[name]() + +} diff --git a/common/url.go b/common/url.go index 925c378310a007159479b046df7f68013364bc4a..115167ee3ebab6718dfd8a229f2a370e5b79a982 100644 --- a/common/url.go +++ b/common/url.go @@ -246,7 +246,6 @@ func (c URL) Key() string { return buildString } - func (c URL) Context() context.Context { return c.ctx } diff --git a/config/base_config.go b/config/base_config.go new file mode 100644 index 0000000000000000000000000000000000000000..edecb85b058382feb3aa8a3397a455790c5bfc54 --- /dev/null +++ b/config/base_config.go @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package config + +import ( + "context" +) +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/logger" + perrors "github.com/pkg/errors" +) +import ( + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config_center" +) + +type baseConfig struct { + // application + ApplicationConfig ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"` + ConfigCenterConfig ConfigCenterConfig + configCenterUrl *common.URL +} + +func (c *baseConfig) startConfigCenter(ctx context.Context) { + var err error + *c.configCenterUrl, err = common.NewURL(ctx, c.ConfigCenterConfig.Address) + c.prepareEnvironment() + +} + +func (c *baseConfig) prepareEnvironment() error { + + factory := extension.GetConfigCenterFactory(c.ConfigCenterConfig.Protocol) + dynamicConfig, err := factory.GetDynamicConfiguration(c.configCenterUrl) + if err != nil { + logger.Errorf("get dynamic configuration error , error message is %v", err) + return err + } + content, err := dynamicConfig.GetConfig(c.ConfigCenterConfig.ConfigFile, config_center.WithGroup(c.ConfigCenterConfig.Group)) + if err != nil { + logger.Errorf("Get config content in dynamic configuration error , error message is %v", err) + return err + } +} diff --git a/config/config_center_config.go b/config/config_center_config.go new file mode 100644 index 0000000000000000000000000000000000000000..54a0347709b311a730079a4882c26895045c5281 --- /dev/null +++ b/config/config_center_config.go @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + "context" + "time" +) + +type ConfigCenterConfig struct { + context context.Context + Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"` + Address string `yaml:"address" json:"address,omitempty"` + Cluster string `yaml:"cluster" json:"cluster,omitempty"` + Group string `default:"dubbo" yaml:"group" json:"group,omitempty"` + Username string `yaml:"username" json:"username,omitempty"` + Password string `yaml:"password" json:"password,omitempty"` + Check *bool `yaml:"check" json:"check,omitempty"` + ConfigFile string `default:"dubbo.properties" yaml:"config_file" json:"config_file,omitempty"` + TimeoutStr string `yaml:"timeout" json:"timeout,omitempty"` + timeout time.Duration +} diff --git a/config/config_loader.go b/config/config_loader.go index 2560b78bd15686d221cdb7f2e0e613954688e20e..af586af943607e70fad5d3f942efc1cb7105e010 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -123,6 +123,7 @@ func providerInit(confProFile string) error { ///////////////////////// type ConsumerConfig struct { + baseConfig Filter string `yaml:"filter" json:"filter,omitempty"` // client @@ -133,11 +134,11 @@ type ConsumerConfig struct { RequestTimeout time.Duration ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty"` Check *bool `yaml:"check" json:"check,omitempty"` - // application - ApplicationConfig ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"` - Registries []RegistryConfig `yaml:"registries" json:"registries,omitempty"` - References []ReferenceConfig `yaml:"references" json:"references,omitempty"` - ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty"` + + Registries []RegistryConfig `yaml:"registries" json:"registries,omitempty"` + References []ReferenceConfig `yaml:"references" json:"references,omitempty"` + ConfigCenter ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` + ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty"` } type ReferenceConfigTmp struct { diff --git a/config_center/dynamic_configuration.go b/config_center/dynamic_configuration.go index 8115fed8203438c9371a1b7fa20815843a2e81fd..e497ce728441aef4ea90eddcf7d335d92e6f0f9e 100644 --- a/config_center/dynamic_configuration.go +++ b/config_center/dynamic_configuration.go @@ -33,8 +33,8 @@ const DEFAULT_CONFIG_TIMEOUT = "10s" type DynamicConfiguration interface { AddListener(string, remoting.ConfigurationListener, ...Option) RemoveListener(string, remoting.ConfigurationListener, ...Option) - GetConfig(string, ...Option) string - GetConfigs(string, ...Option) string + GetConfig(string, ...Option) (string, error) + GetConfigs(string, ...Option) (string, error) } type Options struct { diff --git a/config_center/dynamic_configuration_factory.go b/config_center/dynamic_configuration_factory.go new file mode 100644 index 0000000000000000000000000000000000000000..95f074e283f3103bb224075dcf18c929cc82e765 --- /dev/null +++ b/config_center/dynamic_configuration_factory.go @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config_center + +import "github.com/apache/dubbo-go/common" + +type DynamicConfigurationFactory interface { + GetDynamicConfiguration(*common.URL) (DynamicConfiguration, error) +} diff --git a/config_center/zookeeper/dynamic_configuration.go b/config_center/zookeeper/dynamic_configuration.go deleted file mode 100644 index c998c586fb0a22223acaee634cb5f187e04223fc..0000000000000000000000000000000000000000 --- a/config_center/zookeeper/dynamic_configuration.go +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package zookeeper - -import ( - "sync" -) -import ( - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/common/logger" - "github.com/apache/dubbo-go/config_center" - "github.com/apache/dubbo-go/remoting" - "github.com/apache/dubbo-go/remoting/zookeeper" -) - -const ZK_CLIENT = "zk config_center" - -type ZookeeperDynamicConfiguration struct { - url common.URL - rootPath string - wg sync.WaitGroup - cltLock sync.Mutex - done chan struct{} - client *zookeeper.ZookeeperClient - - listenerLock sync.Mutex - listener *zookeeper.ZkEventListener -} - -func NewZookeeperDynamicConfiguration(url common.URL) (config_center.DynamicConfiguration, error) { - c := &ZookeeperDynamicConfiguration{ - url: url, - rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config", - } - err := zookeeper.ValidateZookeeperClient(c, zookeeper.WithZkName(ZK_CLIENT)) - if err != nil { - return nil, err - } - c.wg.Add(1) - go zookeeper.HandleClientRestart(c) - - c.listener = zookeeper.NewZkEventListener(c.client) - //c.configListener = NewRegistryConfigurationListener(c.client, c) - //c.dataListener = NewRegistryDataListener(c.configListener) - return c, nil - -} - -func (*ZookeeperDynamicConfiguration) AddListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) { - -} - -func (*ZookeeperDynamicConfiguration) RemoveListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) { - -} - -func (*ZookeeperDynamicConfiguration) GetConfig(key string, opions ...config_center.Option) string { - return "" -} - -func (*ZookeeperDynamicConfiguration) GetConfigs(key string, opions ...config_center.Option) string { - return "" -} - -func (r *ZookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient { - return r.client -} - -func (r *ZookeeperDynamicConfiguration) SetZkClient(client *zookeeper.ZookeeperClient) { - r.client = client -} - -func (r *ZookeeperDynamicConfiguration) ZkClientLock() *sync.Mutex { - return &r.cltLock -} - -func (r *ZookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup { - return &r.wg -} - -func (r *ZookeeperDynamicConfiguration) GetDone() chan struct{} { - return r.done -} - -func (r *ZookeeperDynamicConfiguration) GetUrl() common.URL { - return r.url -} - -func (r *ZookeeperDynamicConfiguration) Destroy() { - if r.listener != nil { - r.listener.Close() - } - close(r.done) - r.wg.Wait() - r.closeConfigs() -} - -func (r *ZookeeperDynamicConfiguration) IsAvailable() bool { - select { - case <-r.done: - return false - default: - return true - } -} - -func (r *ZookeeperDynamicConfiguration) closeConfigs() { - r.cltLock.Lock() - defer r.cltLock.Unlock() - logger.Infof("begin to close provider zk client") - // 鍏堝叧闂棫client锛屼互鍏抽棴tmp node - r.client.Close() - r.client = nil -} - -func (r *ZookeeperDynamicConfiguration) RestartCallBack() bool { - return true -} diff --git a/config_center/zookeeper/factory.go b/config_center/zookeeper/factory.go new file mode 100644 index 0000000000000000000000000000000000000000..3862db55bd001d64e66c7352a011ac0c32deefae --- /dev/null +++ b/config_center/zookeeper/factory.go @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zookeeper + +import ( + "sync" +) +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config_center" +) + +func init() { + extension.SetConfigCenterFactory("zookeeper", func() config_center.DynamicConfigurationFactory { return &zookeeperDynamicConfigurationFactory{} }) +} + +type zookeeperDynamicConfigurationFactory struct { +} + +var once sync.Once +var dynamicConfiguration *zookeeperDynamicConfiguration + +func (f *zookeeperDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration, error) { + var err error + once.Do(func() { + dynamicConfiguration, err = newZookeeperDynamicConfiguration(url) + }) + return dynamicConfiguration, err + +} diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go new file mode 100644 index 0000000000000000000000000000000000000000..dd46a7606fa17ec540da83c9cb778c58add45b9e --- /dev/null +++ b/config_center/zookeeper/impl.go @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zookeeper + +import ( + "strings" + "sync" +) +import ( + perrors "github.com/pkg/errors" +) +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/remoting" + "github.com/apache/dubbo-go/remoting/zookeeper" +) + +const ZkClient = "zk config_center" + +type zookeeperDynamicConfiguration struct { + url *common.URL + rootPath string + wg sync.WaitGroup + cltLock sync.Mutex + done chan struct{} + client *zookeeper.ZookeeperClient + + listenerLock sync.Mutex + listener *zookeeper.ZkEventListener + cacheListener *CacheListener +} + +func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfiguration, error) { + c := &zookeeperDynamicConfiguration{ + url: url, + rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config", + } + err := zookeeper.ValidateZookeeperClient(c, zookeeper.WithZkName(ZkClient)) + if err != nil { + logger.Errorf("zookeeper client start error ,error message is %v", err) + return nil, err + } + c.wg.Add(1) + go zookeeper.HandleClientRestart(c) + + c.listener = zookeeper.NewZkEventListener(c.client) + c.cacheListener = NewCacheListener(c.rootPath) + return c, nil + +} + +func (c *zookeeperDynamicConfiguration) AddListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) { + c.cacheListener.AddListener(key, listener) +} + +func (c *zookeeperDynamicConfiguration) RemoveListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) { + c.cacheListener.RemoveListener(key, listener) +} + +func (c *zookeeperDynamicConfiguration) GetConfig(key string, opts ...config_center.Option) (string, error) { + /** + * when group is not null, we are getting startup configs from Config Center, for example: + * group=dubbo, key=dubbo.properties + */ + opions := &config_center.Options{} + for _, opt := range opts { + opt(opions) + } + + if opions.Group != "" { + key = opions.Group + "/" + key + } + /** + * when group is null, we are fetching governance rules, for example: + * 1. key=org.apache.dubbo.DemoService.configurators + * 2. key = org.apache.dubbo.DemoService.condition-router + */ + i := strings.LastIndex(key, ".") + key = key[0:i] + "/" + key[i+1:] + content, _, err := c.client.GetContent(c.rootPath + "/" + key) + if err != nil { + return "", perrors.WithStack(err) + } else { + return string(content), nil + } +} + +//For zookeeper, getConfig and getConfigs have the same meaning. +func (c *zookeeperDynamicConfiguration) GetConfigs(key string, opts ...config_center.Option) (string, error) { + return c.GetConfig(key, opts...) +} + +func (r *zookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient { + return r.client +} + +func (r *zookeeperDynamicConfiguration) SetZkClient(client *zookeeper.ZookeeperClient) { + r.client = client +} + +func (r *zookeeperDynamicConfiguration) ZkClientLock() *sync.Mutex { + return &r.cltLock +} + +func (r *zookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup { + return &r.wg +} + +func (r *zookeeperDynamicConfiguration) GetDone() chan struct{} { + return r.done +} + +func (r *zookeeperDynamicConfiguration) GetUrl() common.URL { + return *r.url +} + +func (r *zookeeperDynamicConfiguration) Destroy() { + if r.listener != nil { + r.listener.Close() + } + close(r.done) + r.wg.Wait() + r.closeConfigs() +} + +func (r *zookeeperDynamicConfiguration) IsAvailable() bool { + select { + case <-r.done: + return false + default: + return true + } +} + +func (r *zookeeperDynamicConfiguration) closeConfigs() { + r.cltLock.Lock() + defer r.cltLock.Unlock() + logger.Infof("begin to close provider zk client") + // 鍏堝叧闂棫client锛屼互鍏抽棴tmp node + r.client.Close() + r.client = nil +} + +func (r *zookeeperDynamicConfiguration) RestartCallBack() bool { + return true +} diff --git a/config_center/zookeeper/listener.go b/config_center/zookeeper/listener.go new file mode 100644 index 0000000000000000000000000000000000000000..0188e8af259e1c243b133968d21e9b5a62e8598c --- /dev/null +++ b/config_center/zookeeper/listener.go @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zookeeper + +import ( + "fmt" + "strings" + "sync" +) +import ( + "github.com/apache/dubbo-go/remoting" +) + +type CacheListener struct { + keyListeners sync.Map + rootPath string +} + +func NewCacheListener(rootPath string) *CacheListener { + return &CacheListener{rootPath: rootPath} +} +func (l *CacheListener) AddListener(key string, listener remoting.ConfigurationListener) { + + // reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure + // make a map[your type]struct{} like set in java + listeners, loaded := l.keyListeners.LoadOrStore(key, map[remoting.ConfigurationListener]struct{}{listener: struct{}{}}) + if loaded { + listeners.(map[remoting.ConfigurationListener]struct{})[listener] = struct{}{} + l.keyListeners.Store(key, listeners) + } +} + +func (l *CacheListener) RemoveListener(key string, listener remoting.ConfigurationListener) { + listeners, loaded := l.keyListeners.Load(key) + if loaded { + delete(listeners.(map[remoting.ConfigurationListener]struct{}), listener) + } +} + +func (l *CacheListener) DataChange(event remoting.Event) bool { + fmt.Println(event) + key := l.pathToKey(event.Path) + if key != "" { + if listeners, ok := l.keyListeners.Load(key); ok { + for listener := range listeners.(map[remoting.ConfigurationListener]struct{}) { + listener.Process(&remoting.ConfigChangeEvent{Key: key, Value: event.Content, ConfigType: event.Action}) + } + return true + } + } + return false +} + +func (l *CacheListener) pathToKey(path string) string { + return strings.ReplaceAll(strings.Replace(path, l.rootPath+"/", "", -1), "/", ".") +} diff --git a/go.sum b/go.sum index a4fd7f5099b5521b03e03cf5958c641991433d39..973f80fc375fee47d51b6680fbebc5a3566dbe88 100644 --- a/go.sum +++ b/go.sum @@ -27,7 +27,9 @@ go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 h1:kcXqo9vE6fsZY5X5Rd7R1l7fTgnWaDCVmln65REefiE= golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index 4f3cf1e19ad28cb10be8adf85351c1f2eb4d4622..99e4559b736194b00541be210b09bd0d68e6f882 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -19,6 +19,7 @@ package zookeeper import ( "context" + "strings" ) import ( perrors "github.com/pkg/errors" @@ -33,10 +34,10 @@ import ( type RegistryDataListener struct { interestedURL []*common.URL - listener *RegistryConfigurationListener + listener remoting.ConfigurationListener } -func NewRegistryDataListener(listener *RegistryConfigurationListener) *RegistryDataListener { +func NewRegistryDataListener(listener remoting.ConfigurationListener) *RegistryDataListener { return &RegistryDataListener{listener: listener, interestedURL: []*common.URL{}} } func (l *RegistryDataListener) AddInterestedURL(url *common.URL) { @@ -44,9 +45,11 @@ func (l *RegistryDataListener) AddInterestedURL(url *common.URL) { } func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { - serviceURL, err := common.NewURL(context.TODO(), eventType.Content) + //鎴彇鏈€鍚庝竴浣� + url := eventType.Path[strings.Index(eventType.Path, "/providers/")+len("/providers/"):] + serviceURL, err := common.NewURL(context.TODO(), url) if err != nil { - logger.Errorf("Listen NewURL(r{%s}) = error{%v}", eventType.Content, err) + logger.Errorf("Listen NewURL(r{%s}) = error{%v}", url, err) return false } for _, v := range l.interestedURL { diff --git a/registry/zookeeper/listener_test.go b/registry/zookeeper/listener_test.go new file mode 100644 index 0000000000000000000000000000000000000000..7ebc32b8a50bcb38d5ac859e4216065559fbd4e5 --- /dev/null +++ b/registry/zookeeper/listener_test.go @@ -0,0 +1,23 @@ +package zookeeper + +import ( + "context" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/remoting" + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_DataChange(t *testing.T) { + listener := NewRegistryDataListener(&MockDataListener{}) + url, _ := common.NewURL(context.TODO(), "jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100") + listener.AddInterestedURL(&url) + int := listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100"}) + assert.Equal(t, true, int) +} + +type MockDataListener struct { +} + +func (*MockDataListener) Process(configType *remoting.ConfigChangeEvent) { +} diff --git a/remoting/listener.go b/remoting/listener.go index 37f75d46522c8b2405029725ba2178167ee19668..1bf5b5284ce10a1b8ed2238f41d3894888e41ea4 100644 --- a/remoting/listener.go +++ b/remoting/listener.go @@ -46,6 +46,7 @@ type EventType int const ( Add = iota Del + Mod ) var serviceEventTypeStrings = [...]string{ diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index 93382ed265ef2189cd0ad41bbe72adb8de606844..2ef5f7aeb78d7546cdb149f6af4d5a698eec6540 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -100,7 +100,7 @@ func WithZkName(name string) Option { } } -func ValidateZookeeperClient(container ZkClientContainer, opts ...Option) error { +func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error { var ( err error ) @@ -553,3 +553,7 @@ func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) { return event, nil } + +func (z *ZookeeperClient) GetContent(zkPath string) ([]byte, *zk.Stat, error) { + return z.Conn.Get(zkPath) +} diff --git a/remoting/zookeeper/container.go b/remoting/zookeeper/facade.go similarity index 96% rename from remoting/zookeeper/container.go rename to remoting/zookeeper/facade.go index f869b32444a58c3396ec5520b7b8f859f4accd6f..4fd800f87732288527d9387580fe70d0a9cae9d2 100644 --- a/remoting/zookeeper/container.go +++ b/remoting/zookeeper/facade.go @@ -30,7 +30,7 @@ import ( "github.com/apache/dubbo-go/common/logger" ) -type ZkClientContainer interface { +type zkClientFacade interface { ZkClient() *ZookeeperClient SetZkClient(*ZookeeperClient) ZkClientLock() *sync.Mutex @@ -40,7 +40,7 @@ type ZkClientContainer interface { common.Node } -func HandleClientRestart(r ZkClientContainer) { +func HandleClientRestart(r zkClientFacade) { var ( err error diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 703d06f84eacef8f830a75846c6130bb36239a1e..dcb0c308f1d8a7553323e151b4f333cb88557b2f 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -50,7 +50,7 @@ func NewZkEventListener(client *ZookeeperClient) *ZkEventListener { func (l *ZkEventListener) SetClient(client *ZookeeperClient) { l.client = client } -func (l *ZkEventListener) listenServiceNodeEvent(zkPath string) bool { +func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool { l.wg.Add(1) defer l.wg.Done() var zkEvent zk.Event @@ -68,8 +68,17 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string) bool { switch zkEvent.Type { case zk.EventNodeDataChanged: logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDataChanged}", zkPath) + if len(listener) > 0 { + content, _, _ := l.client.Conn.Get(zkEvent.Path) + listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.Mod, Content: string(content)}) + } + case zk.EventNodeCreated: logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeCreated}", zkPath) + if len(listener) > 0 { + content, _, _ := l.client.Conn.Get(zkEvent.Path) + listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.Add, Content: string(content)}) + } case zk.EventNotWatching: logger.Warnf("zk.ExistW(key{%s}) = event{EventNotWatching}", zkPath) case zk.EventNodeDeleted: @@ -112,15 +121,19 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li newNode = path.Join(zkPath, n) logger.Infof("add zkNode{%s}", newNode) - if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: n}) { + content, _, err := l.client.Conn.Get(newNode) + if err != nil { + logger.Errorf("Get new node path {%v} 's content error,message is {%v}", newNode, perrors.WithStack(err)) + } + if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: string(content)}) { continue } // listen l service node go func(node string) { logger.Infof("delete zkNode{%s}", node) - if l.listenServiceNodeEvent(node) { + if l.listenServiceNodeEvent(node, listener) { logger.Infof("delete content{%s}", n) - listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: n}) + listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del}) } logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) }(newNode) @@ -135,15 +148,12 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li oldNode = path.Join(zkPath, n) logger.Warnf("delete zkPath{%s}", oldNode) - if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: n}) { - continue - } - logger.Warnf("delete content{%s}", n) + if err != nil { logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err)) continue } - listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: n}) + listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.Del}) } } @@ -245,17 +255,21 @@ func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.Da } for _, c := range children { - if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: c}) { - continue - } // listen l service node dubboPath = path.Join(zkPath, c) + content, _, err := l.client.Conn.Get(dubboPath) + if err != nil { + logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err)) + } + if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.Add, Content: string(content)}) { + continue + } logger.Infof("listen dubbo service key{%s}", dubboPath) go func(zkPath string, serviceURL common.URL) { if l.listenServiceNodeEvent(dubboPath) { logger.Debugf("delete serviceUrl{%s}", serviceURL) - listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: c}) + listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.Del}) } logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) }(dubboPath, serviceURL)