diff --git a/common/configuration_listener.go b/common/configuration_listener.go new file mode 100644 index 0000000000000000000000000000000000000000..25f02da6f540c64862c8b333e85b9a445b4161c6 --- /dev/null +++ b/common/configuration_listener.go @@ -0,0 +1,51 @@ +package common + +import "fmt" + +type ConfigurationListener interface { + Process(*ConfigChangeEvent) +} + +type ConfigChangeEvent struct { + Key string + Value interface{} + ConfigType EventType +} + +func (c ConfigChangeEvent) String() string { + return fmt.Sprintf("ConfigChangeEvent{key = %v , value = %v , changeType = %v}", c.Key, c.Value, c.ConfigType) +} + +////////////////////////////////////////// +// event type +////////////////////////////////////////// + +type EventType int + +const ( + Add = iota + Del +) + +var serviceEventTypeStrings = [...]string{ + "add", + "delete", +} + +func (t EventType) String() string { + return serviceEventTypeStrings[t] +} + +////////////////////////////////////////// +// service event +////////////////////////////////////////// + +type Event struct { + Path string + Action EventType + Content string +} + +func (e Event) String() string { + return fmt.Sprintf("Event{Action{%s}, Content{%s}}", e.Action, e.Content) +} diff --git a/common/constant/default.go b/common/constant/default.go index 85f61f30f115493d3d520f2a68f36921735055d7..f02ee7c56d5b2b83cd105f1a83e482b7b4679695 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -37,3 +37,4 @@ const ( DEFAULT_REFERENCE_FILTERS = "" ECHO = "$echo" ) + diff --git a/common/constant/key.go b/common/constant/key.go index 985dd4f80f531c107ef11cee1282da8742d7d62a..4ab735dd01e37e5d23f00fd6d16340c14169b174 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -67,3 +67,8 @@ const ( OWNER_KEY = "owner" ENVIRONMENT_KEY = "environment" ) + +const( + CONFIG_NAMESPACE_KEY = "config.namespace" + CONFIG_TIMEOUT_KET = "config.timeout" +) \ No newline at end of file diff --git a/common/extension/config_center.go b/common/extension/config_center.go new file mode 100644 index 0000000000000000000000000000000000000000..be4b62ccdd9c36500c306c7f16abd054f91ae86b --- /dev/null +++ b/common/extension/config_center.go @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/config_center" +) + +var ( + configCenters = make(map[string]func(config *common.URL) (config_center.DynamicConfiguration, error)) +) + +func SetConfigCenter(name string, v func(config *common.URL) (config_center.DynamicConfiguration, error)) { + configCenters[name] = v +} + +func GetConfigCenter(name string, config *common.URL) (config_center.DynamicConfiguration, error) { + if configCenters[name] == nil { + panic("config center for " + name + " is not existing, make sure you have import the package.") + } + return configCenters[name](config) + +} diff --git a/common/url.go b/common/url.go index 115167ee3ebab6718dfd8a229f2a370e5b79a982..925c378310a007159479b046df7f68013364bc4a 100644 --- a/common/url.go +++ b/common/url.go @@ -246,6 +246,7 @@ func (c URL) Key() string { return buildString } + func (c URL) Context() context.Context { return c.ctx } diff --git a/config_center/dynamic_configuration.go b/config_center/dynamic_configuration.go new file mode 100644 index 0000000000000000000000000000000000000000..4f388a7dd89c121d0e8c1b83829ff5fa35fea24c --- /dev/null +++ b/config_center/dynamic_configuration.go @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config_center + +import ( + "github.com/apache/dubbo-go/common" + "time" +) + +////////////////////////////////////////// +// DynamicConfiguration +////////////////////////////////////////// +const DEFAULT_GROUP = "dubbo" +const DEFAULT_CONFIG_TIMEOUT = "10s" + +type DynamicConfiguration interface { + AddListener(string, common.ConfigurationListener, ...Option) + RemoveListener(string, common.ConfigurationListener, ...Option) + GetConfig(string, ...Option) string + GetConfigs(string, ...Option) string +} + +type Options struct { + Group string + Timeout time.Duration +} + +type Option func(*Options) + +func WithGroup(group string) Option { + return func(opt *Options) { + opt.Group = group + } +} + +func WithTimeout(time time.Duration) Option { + return func(opt *Options) { + opt.Timeout = time + } +} diff --git a/config_center/zookeeper/dynamic_configuration.go b/config_center/zookeeper/dynamic_configuration.go new file mode 100644 index 0000000000000000000000000000000000000000..7c5ca4487fcdcaa88d403bb5587b47470e7666cb --- /dev/null +++ b/config_center/zookeeper/dynamic_configuration.go @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zookeeper + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/remoting/zookeeper" + "sync" +) + +const ZkClient = "zk config_center" + +type ZookeeperDynamicConfiguration struct { + url common.URL + rootPath string + wg sync.WaitGroup + cltLock sync.Mutex + done chan struct{} + client *zookeeper.ZookeeperClient + + listenerLock sync.Mutex + listener *zookeeper.ZkEventListener +} + +func NewZookeeperDynamicConfiguration(url common.URL) (config_center.DynamicConfiguration, error) { + c := &ZookeeperDynamicConfiguration{ + url: url, + rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config", + } + err := zookeeper.ValidateZookeeperClient(c, zookeeper.WithZkName(ZkClient)) + if err != nil { + return nil, err + } + c.wg.Add(1) + go zookeeper.HandleClientRestart(c) + + c.listener = zookeeper.NewZkEventListener(c.client) + //c.configListener = NewRegistryConfigurationListener(c.client, c) + //c.dataListener = NewRegistryDataListener(c.configListener) + return c, nil + +} + +func (*ZookeeperDynamicConfiguration) AddListener(key string, listener common.ConfigurationListener, opions ...config_center.Option) { + +} + +func (*ZookeeperDynamicConfiguration) RemoveListener(key string, listener common.ConfigurationListener, opions ...config_center.Option) { + +} + +func (*ZookeeperDynamicConfiguration) GetConfig(key string, opions ...config_center.Option) string { + return "" +} + +func (*ZookeeperDynamicConfiguration) GetConfigs(key string, opions ...config_center.Option) string { + return "" +} + +func (r *ZookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient { + return r.client +} + +func (r *ZookeeperDynamicConfiguration) SetZkClient(client *zookeeper.ZookeeperClient) { + r.client = client +} + +func (r *ZookeeperDynamicConfiguration) ZkClientLock() *sync.Mutex { + return &r.cltLock +} + +func (r *ZookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup { + return &r.wg +} + +func (r *ZookeeperDynamicConfiguration) GetDone() chan struct{} { + return r.done +} + +func (r *ZookeeperDynamicConfiguration) GetUrl() common.URL { + return r.url +} + +func (r *ZookeeperDynamicConfiguration) Destroy() { + if r.listener != nil { + r.listener.Close() + } + close(r.done) + r.wg.Wait() + r.closeConfigs() +} + +func (r *ZookeeperDynamicConfiguration) IsAvailable() bool { + select { + case <-r.done: + return false + default: + return true + } +} + +func (r *ZookeeperDynamicConfiguration) closeConfigs() { + r.cltLock.Lock() + defer r.cltLock.Unlock() + logger.Infof("begin to close provider zk client") + // 鍏堝叧闂棫client锛屼互鍏抽棴tmp node + r.client.Close() + r.client = nil +} + +func (r *ZookeeperDynamicConfiguration) RestartCallBack() bool { + return true +} diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 1003baf5df7bb659200bb12e118e9113577051af..8b740f449349eb51a8119e3abffb55ce06b35345 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -130,10 +130,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) { func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { switch res.Action { - case registry.ServiceAdd: + case common.Add: //dir.cacheService.Add(res.Path, dir.serviceTTL) dir.cacheInvoker(res.Service) - case registry.ServiceDel: + case common.Del: //dir.cacheService.Del(res.Path, dir.serviceTTL) dir.uncacheInvoker(res.Service) logger.Infof("selector delete service url{%s}", res.Service) diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index eafd3f7c17a47520bd91e99176814753f5d7ddef..dc0cb71c9203041f7ad053fcd2b56cae88fde2c3 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -50,7 +50,7 @@ func TestSubscribe_Delete(t *testing.T) { registryDirectory, mockRegistry := normalRegistryDir() time.Sleep(1e9) assert.Len(t, registryDirectory.cacheInvokers, 3) - mockRegistry.MockEvent(®istry.ServiceEvent{Action: registry.ServiceDel, Service: *common.NewURLWithOptions("TEST0", common.WithProtocol("dubbo"))}) + mockRegistry.MockEvent(®istry.ServiceEvent{Action: common.Del, Service: *common.NewURLWithOptions("TEST0", common.WithProtocol("dubbo"))}) time.Sleep(1e9) assert.Len(t, registryDirectory.cacheInvokers, 2) } @@ -80,7 +80,7 @@ func TestSubscribe_Group(t *testing.T) { urlmap.Set(constant.GROUP_KEY, "group1") urlmap.Set(constant.CLUSTER_KEY, "failover") //to test merge url for i := 0; i < 3; i++ { - mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"), + mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: common.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"), common.WithParams(urlmap))}) } //for group2 @@ -88,7 +88,7 @@ func TestSubscribe_Group(t *testing.T) { urlmap2.Set(constant.GROUP_KEY, "group2") urlmap2.Set(constant.CLUSTER_KEY, "failover") //to test merge url for i := 0; i < 3; i++ { - mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"), + mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: common.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"), common.WithParams(urlmap2))}) } @@ -128,7 +128,7 @@ func normalRegistryDir() (*registryDirectory, *registry.MockRegistry) { go registryDirectory.Subscribe(*common.NewURLWithOptions("testservice")) for i := 0; i < 3; i++ { - mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"))}) + mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: common.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"))}) } return registryDirectory, mockRegistry.(*registry.MockRegistry) } diff --git a/registry/event.go b/registry/event.go index ef51bbbe1710b1fdb166d34af58c630a5203b53a..83607087512de71988f15e38177f38ad807c423a 100644 --- a/registry/event.go +++ b/registry/event.go @@ -31,32 +31,12 @@ func init() { rand.Seed(time.Now().UnixNano()) } -////////////////////////////////////////// -// service url event type -////////////////////////////////////////// - -type ServiceEventType int - -const ( - ServiceAdd = iota - ServiceDel -) - -var serviceEventTypeStrings = [...]string{ - "add service", - "delete service", -} - -func (t ServiceEventType) String() string { - return serviceEventTypeStrings[t] -} - ////////////////////////////////////////// // service event ////////////////////////////////////////// type ServiceEvent struct { - Action ServiceEventType + Action common.EventType Service common.URL } diff --git a/registry/zookeeper/dataListener.go b/registry/zookeeper/dataListener.go new file mode 100644 index 0000000000000000000000000000000000000000..792db4c7cc409be833912a038664d00cc0ad42f7 --- /dev/null +++ b/registry/zookeeper/dataListener.go @@ -0,0 +1,84 @@ +package zookeeper + +import ( + "context" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/registry" + zk "github.com/apache/dubbo-go/remoting/zookeeper" + perrors "github.com/pkg/errors" +) + +type RegistryDataListener struct { + interestedURL []*common.URL + listener *RegistryConfigurationListener +} + +func NewRegistryDataListener(listener *RegistryConfigurationListener) *RegistryDataListener { + return &RegistryDataListener{listener: listener, interestedURL: []*common.URL{}} +} +func (l *RegistryDataListener) AddInterestedURL(url *common.URL) { + l.interestedURL = append(l.interestedURL, url) +} + +func (l *RegistryDataListener) DataChange(eventType zk.ZkEvent) bool { + serviceURL, err := common.NewURL(context.TODO(), eventType.Res.Content) + if err != nil { + logger.Errorf("Listen NewURL(r{%s}) = error{%v}", eventType.Res.Content, err) + return false + } + for _, v := range l.interestedURL { + if serviceURL.URLEqual(*v) { + l.listener.Process(&common.ConfigChangeEvent{Value: serviceURL, ConfigType: eventType.Res.Action}) + return true + } + } + + return false +} + +type RegistryConfigurationListener struct { + client *zk.ZookeeperClient + registry *zkRegistry + events chan *common.ConfigChangeEvent +} + +func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener { + reg.wg.Add(1) + return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *common.ConfigChangeEvent, 32)} +} +func (l *RegistryConfigurationListener) Process(configType *common.ConfigChangeEvent) { + l.events <- configType +} + +func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) { + for { + select { + case <-l.client.Done(): + logger.Warnf("listener's zk client connection is broken, so zk event listener exit now.") + return nil, perrors.New("listener stopped") + + case <-l.registry.done: + logger.Warnf("zk consumer register has quit, so zk event listener exit asap now.") + return nil, perrors.New("listener stopped") + + case e := <-l.events: + logger.Debugf("got zk event %s", e) + if e.ConfigType == common.Del && !l.valid() { + logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value) + continue + } + //r.update(e.res) + //write to invoker + //r.outerEventCh <- e.res + return ®istry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil + } + } +} +func (l *RegistryConfigurationListener) Close() { + l.registry.wg.Done() +} + +func (l *RegistryConfigurationListener) valid() bool { + return l.client.ZkConnValid() +} diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index fbcc9e4546b908f5d11d3b2f8915003fa9512be2..01841867051c84d536f38f23833430a433938bdb 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -40,13 +40,13 @@ import ( "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/utils" "github.com/apache/dubbo-go/registry" + "github.com/apache/dubbo-go/remoting/zookeeper" "github.com/apache/dubbo-go/version" ) const ( - defaultTimeout = int64(10e9) - RegistryZkClient = "zk registry" - RegistryConnDelay = 3 + defaultTimeout = int64(10e9) + RegistryZkClient = "zk registry" ) var ( @@ -73,14 +73,16 @@ type zkRegistry struct { done chan struct{} cltLock sync.Mutex - client *zookeeperClient + client *zookeeper.ZookeeperClient services map[string]common.URL // service name + protocol -> service config - listenerLock sync.Mutex - listener *zkEventListener - + listenerLock sync.Mutex + listener *zookeeper.ZkEventListener + dataListener *RegistryDataListener + configListener *RegistryConfigurationListener //for provider zkPath map[string]int // key = protocol://ip:port/interface + } func newZkRegistry(url *common.URL) (registry.Registry, error) { @@ -97,30 +99,28 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) { zkPath: make(map[string]int), } - //if r.SubURL.Name == "" { - // r.SubURL.Name = RegistryZkClient - //} - //if r.Version == "" { - // r.Version = version.Version - //} - - err = r.validateZookeeperClient() + err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient)) if err != nil { return nil, err } r.wg.Add(1) - go r.handleZkRestart() + go zookeeper.HandleClientRestart(r) - //if r.RoleType == registry.CONSUMER { - // r.wg.Add(1) - // go r.listen() - //} + r.listener = zookeeper.NewZkEventListener(r.client) + r.configListener = NewRegistryConfigurationListener(r.client, r) + r.dataListener = NewRegistryDataListener(r.configListener) return r, nil } -func newMockZkRegistry(url *common.URL) (*zk.TestCluster, *zkRegistry, error) { +type Options struct { + client *zookeeper.ZookeeperClient +} + +type Option func(*Options) + +func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestCluster, *zkRegistry, error) { var ( err error r *zkRegistry @@ -136,139 +136,78 @@ func newMockZkRegistry(url *common.URL) (*zk.TestCluster, *zkRegistry, error) { zkPath: make(map[string]int), } - c, r.client, _, err = newMockZookeeperClient("test", 15*time.Second) + c, r.client, _, err = zookeeper.NewMockZookeeperClient("test", 15*time.Second, opts...) if err != nil { return nil, nil, err } - r.wg.Add(1) - go r.handleZkRestart() + go zookeeper.HandleClientRestart(r) - //if r.RoleType == registry.CONSUMER { - // r.wg.Add(1) - // go r.listen() - //} + r.listener = zookeeper.NewZkEventListener(r.client) + r.configListener = NewRegistryConfigurationListener(r.client, r) + r.dataListener = NewRegistryDataListener(r.configListener) return c, r, nil } +func (r *zkRegistry) ZkClient() *zookeeper.ZookeeperClient { + return r.client +} + +func (r *zkRegistry) SetZkClient(client *zookeeper.ZookeeperClient) { + r.client = client +} + +func (r *zkRegistry) ZkClientLock() *sync.Mutex { + return &r.cltLock +} + +func (r *zkRegistry) WaitGroup() *sync.WaitGroup { + return &r.wg +} + +func (r *zkRegistry) GetDone() chan struct{} { + return r.done +} + func (r *zkRegistry) GetUrl() common.URL { return *r.URL } func (r *zkRegistry) Destroy() { - if r.listener != nil { - r.listener.Close() + if r.configListener != nil { + r.configListener.Close() } close(r.done) r.wg.Wait() r.closeRegisters() } -func (r *zkRegistry) validateZookeeperClient() error { - var ( - err error - ) +func (r *zkRegistry) RestartCallBack() bool { - err = nil - r.cltLock.Lock() - defer r.cltLock.Unlock() - if r.client == nil { - //in dubbp ,every registry only connect one node ,so this is []string{r.Address} - timeout, err := time.ParseDuration(r.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) - if err != nil { - logger.Errorf("timeout config %v is invalid ,err is %v", - r.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error()) - return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", r.Location) - } - r.client, err = newZookeeperClient(RegistryZkClient, []string{r.Location}, timeout) - if err != nil { - logger.Warnf("newZookeeperClient(name{%s}, zk addresss{%v}, timeout{%d}) = error{%v}", - RegistryZkClient, r.Location, timeout.String(), err) - return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", r.Location) - } - } - if r.client.conn == nil { - var event <-chan zk.Event - r.client.conn, event, err = zk.Connect(r.client.zkAddrs, r.client.timeout) - if err == nil { - r.client.wait.Add(1) - go r.client.handleZkEvent(event) - } + // copy r.services + services := []common.URL{} + for _, confIf := range r.services { + services = append(services, confIf) } - return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", r.PrimitiveURL) -} - -func (r *zkRegistry) handleZkRestart() { - var ( - err error - flag bool - failTimes int - confIf common.URL - ) - - defer r.wg.Done() -LOOP: - for { - select { - case <-r.done: - logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...") - break LOOP - // re-register all services - case <-r.client.done(): - r.cltLock.Lock() - r.client.Close() - r.client = nil - r.cltLock.Unlock() - - // 鎺k锛岀洿鑷虫垚鍔� - failTimes = 0 - for { - select { - case <-r.done: - logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...") - break LOOP - case <-time.After(time.Duration(1e9 * failTimes * RegistryConnDelay)): // 闃叉鐤媯閲嶈繛zk - } - err = r.validateZookeeperClient() - logger.Infof("ZkProviderRegistry.validateZookeeperClient(zkAddr{%s}) = error{%#v}", - r.client.zkAddrs, perrors.WithStack(err)) - if err == nil { - // copy r.services - services := []common.URL{} - for _, confIf = range r.services { - services = append(services, confIf) - } - - flag = true - for _, confIf = range services { - err = r.register(confIf) - if err != nil { - logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}", - confIf, perrors.WithStack(err)) - flag = false - break - } - logger.Infof("success to re-register service :%v", confIf.Key()) - } - if flag { - break - } - } - failTimes++ - if MaxFailTimes <= failTimes { - failTimes = MaxFailTimes - } - } + flag := true + for _, confIf := range services { + err := r.register(confIf) + if err != nil { + logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}", + confIf, perrors.WithStack(err)) + flag = false + break } + logger.Infof("success to re-register service :%v", confIf.Key()) } + return flag } func (r *zkRegistry) Register(conf common.URL) error { var ( - ok bool - err error - listener *zkEventListener + ok bool + err error ) role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) switch role { @@ -291,12 +230,6 @@ func (r *zkRegistry) Register(conf common.URL) error { r.cltLock.Unlock() logger.Debugf("(consumerZkConsumerRegistry)Register(conf{%#v})", conf) - r.listenerLock.Lock() - listener = r.listener - r.listenerLock.Unlock() - if listener != nil { - go listener.listenServiceEvent(conf) - } case common.PROVIDER: // 妫€楠屾湇鍔℃槸鍚﹀凡缁忔敞鍐岃繃 @@ -337,7 +270,7 @@ func (r *zkRegistry) register(c common.URL) error { //conf config.URL ) - err = r.validateZookeeperClient() + err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient)) if err != nil { return perrors.WithStack(err) } @@ -428,6 +361,7 @@ func (r *zkRegistry) register(c common.URL) error { dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, (common.RoleType(common.CONSUMER)).String()) logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL) + default: return perrors.Errorf("@c{%v} type is not referencer or provider", c) } @@ -464,44 +398,37 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error { } func (r *zkRegistry) Subscribe(conf common.URL) (registry.Listener, error) { - r.wg.Add(1) return r.getListener(conf) } -func (r *zkRegistry) getListener(conf common.URL) (*zkEventListener, error) { +func (r *zkRegistry) getListener(conf common.URL) (*RegistryConfigurationListener, error) { var ( - zkListener *zkEventListener + zkListener *RegistryConfigurationListener ) r.listenerLock.Lock() - zkListener = r.listener + zkListener = r.configListener r.listenerLock.Unlock() - if zkListener != nil { - return zkListener, nil - } + if r.listener == nil { + r.cltLock.Lock() + client := r.client + r.cltLock.Unlock() + if client == nil { + return nil, perrors.New("zk connection broken") + } - r.cltLock.Lock() - client := r.client - r.cltLock.Unlock() - if client == nil { - return nil, perrors.New("zk connection broken") - } + // new client & listener + listener := zookeeper.NewZkEventListener(r.client) - // new client & listener - zkListener = newZkEventListener(r, client) + r.listenerLock.Lock() + r.listener = listener + r.listenerLock.Unlock() + } - r.listenerLock.Lock() - r.listener = zkListener - r.listenerLock.Unlock() + //娉ㄥ唽鍒癲ataconfig鐨刬nterested + r.dataListener.AddInterestedURL(&conf) - // listen - r.cltLock.Lock() - for _, svs := range r.services { - if svs.URLEqual(conf) { - go zkListener.listenServiceEvent(svs) - } - } - r.cltLock.Unlock() + go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo%s/providers", conf.Path), r.dataListener) return zkListener, nil } diff --git a/registry/zookeeper/registry_test.go b/registry/zookeeper/registry_test.go index 2e85e12f3a87df880b78dc70760efaa7e6dd8203..aac6b883d89cc52d71712a66c718eac7bb823f2c 100644 --- a/registry/zookeeper/registry_test.go +++ b/registry/zookeeper/registry_test.go @@ -19,6 +19,7 @@ package zookeeper import ( "context" + "github.com/apache/dubbo-go/remoting/zookeeper" "strconv" "testing" "time" @@ -40,7 +41,7 @@ func Test_Register(t *testing.T) { ts, reg, err := newMockZkRegistry(®url) defer ts.Stop() err = reg.Register(url) - children, _ := reg.client.getChildren("/dubbo/com.ikurento.user.UserProvider/providers") + children, _ := reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers") assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*provider", children) assert.NoError(t, err) } @@ -49,7 +50,6 @@ func Test_Subscribe(t *testing.T) { regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) ts, reg, err := newMockZkRegistry(®url) - defer ts.Stop() //provider register err = reg.Register(url) @@ -61,8 +61,8 @@ func Test_Subscribe(t *testing.T) { //consumer register regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) - _, reg2, err := newMockZkRegistry(®url) - reg2.client = reg.client + _, reg2, err := newMockZkRegistry(®url, zookeeper.WithTestCluster(ts)) + err = reg2.Register(url) listener, err := reg2.Subscribe(url) @@ -71,8 +71,8 @@ func Test_Subscribe(t *testing.T) { if err != nil { return } - assert.Regexp(t, ".*ServiceEvent{Action{add service}.*", serviceEvent.String()) - + assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String()) + defer ts.Stop() } func Test_ConsumerDestory(t *testing.T) { diff --git a/registry/zookeeper/zk_client.go b/remoting/zookeeper/client.go similarity index 66% rename from registry/zookeeper/zk_client.go rename to remoting/zookeeper/client.go index a8b14bb2bd04938e84cf3c7f482f6ae650c3c98b..d431822ffbeb912c19dd1ae79e7792c06c80aef6 100644 --- a/registry/zookeeper/zk_client.go +++ b/remoting/zookeeper/client.go @@ -18,6 +18,7 @@ package zookeeper import ( + "github.com/apache/dubbo-go/common/constant" "path" "strings" "sync" @@ -30,22 +31,27 @@ import ( "github.com/samuel/go-zookeeper/zk" ) +const ( + ConnDelay = 3 + MaxFailTimes = 15 +) + var ( errNilZkClientConn = perrors.New("zookeeperclient{conn} is nil") ) -type zookeeperClient struct { +type ZookeeperClient struct { name string - zkAddrs []string + ZkAddrs []string sync.Mutex // for conn - conn *zk.Conn - timeout time.Duration + Conn *zk.Conn + Timeout time.Duration exit chan struct{} - wait sync.WaitGroup + Wait sync.WaitGroup eventRegistry map[string][]*chan struct{} } -func stateToString(state zk.State) string { +func StateToString(state zk.State) string { switch state { case zk.StateDisconnected: return "zookeeper disconnected" @@ -76,55 +82,128 @@ func stateToString(state zk.State) string { return "zookeeper unknown state" } -func timeSecondDuration(sec int) time.Duration { - return time.Duration(sec) * time.Second +type Options struct { + zkName string + client *ZookeeperClient + + ts *zk.TestCluster +} + +type Option func(*Options) + +func WithZkName(name string) Option { + return func(opt *Options) { + opt.zkName = name + } +} + +func ValidateZookeeperClient(container ZkClientContainer, opts ...Option) error { + var ( + err error + ) + opions := &Options{} + for _, opt := range opts { + opt(opions) + } + + err = nil + + lock := container.ZkClientLock() + url := container.GetUrl() + + lock.Lock() + defer lock.Unlock() + + if container.ZkClient() == nil { + //in dubbp ,every registry only connect one node ,so this is []string{r.Address} + timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) + if err != nil { + logger.Errorf("timeout config %v is invalid ,err is %v", + url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error()) + return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location) + } + newClient, err := newZookeeperClient(opions.zkName, []string{url.Location}, timeout) + if err != nil { + logger.Warnf("newZookeeperClient(name{%s}, zk addresss{%v}, timeout{%d}) = error{%v}", + opions.zkName, url.Location, timeout.String(), err) + return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location) + } + container.SetZkClient(newClient) + } + + if container.ZkClient().Conn == nil { + var event <-chan zk.Event + container.ZkClient().Conn, event, err = zk.Connect(container.ZkClient().ZkAddrs, container.ZkClient().Timeout) + if err == nil { + container.ZkClient().Wait.Add(1) + go container.ZkClient().HandleZkEvent(event) + } + } + + return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.PrimitiveURL) } -func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*zookeeperClient, error) { +func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*ZookeeperClient, error) { var ( err error event <-chan zk.Event - z *zookeeperClient + z *ZookeeperClient ) - z = &zookeeperClient{ + z = &ZookeeperClient{ name: name, - zkAddrs: zkAddrs, - timeout: timeout, + ZkAddrs: zkAddrs, + Timeout: timeout, exit: make(chan struct{}), eventRegistry: make(map[string][]*chan struct{}), } // connect to zookeeper - z.conn, event, err = zk.Connect(zkAddrs, timeout) + z.Conn, event, err = zk.Connect(zkAddrs, timeout) if err != nil { return nil, perrors.WithMessagef(err, "zk.Connect(zkAddrs:%+v)", zkAddrs) } - z.wait.Add(1) - go z.handleZkEvent(event) + z.Wait.Add(1) + go z.HandleZkEvent(event) return z, nil } -func newMockZookeeperClient(name string, timeout time.Duration) (*zk.TestCluster, *zookeeperClient, <-chan zk.Event, error) { +func WithTestCluster(ts *zk.TestCluster) Option { + return func(opt *Options) { + opt.ts = ts + } +} + +func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) (*zk.TestCluster, *ZookeeperClient, <-chan zk.Event, error) { var ( err error event <-chan zk.Event - z *zookeeperClient + z *ZookeeperClient + ts *zk.TestCluster ) - z = &zookeeperClient{ + z = &ZookeeperClient{ name: name, - zkAddrs: []string{}, - timeout: timeout, + ZkAddrs: []string{}, + Timeout: timeout, exit: make(chan struct{}), eventRegistry: make(map[string][]*chan struct{}), } - // connect to zookeeper - ts, err := zk.StartTestCluster(1, nil, nil) - if err != nil { - return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect") + opions := &Options{} + for _, opt := range opts { + opt(opions) + } + + // connect to zookeeper + if opions.ts != nil { + ts = opions.ts + } else { + ts, err = zk.StartTestCluster(1, nil, nil) + if err != nil { + return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect") + } } //callbackChan := make(chan zk.Event) @@ -132,7 +211,7 @@ func newMockZookeeperClient(name string, timeout time.Duration) (*zk.TestCluster // callbackChan <- event //} - z.conn, event, err = ts.ConnectWithOptions(timeout) + z.Conn, event, err = ts.ConnectWithOptions(timeout) if err != nil { return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect") } @@ -141,15 +220,16 @@ func newMockZookeeperClient(name string, timeout time.Duration) (*zk.TestCluster return ts, z, event, nil } -func (z *zookeeperClient) handleZkEvent(session <-chan zk.Event) { + +func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) { var ( state int event zk.Event ) defer func() { - z.wait.Done() - logger.Infof("zk{path:%v, name:%s} connection goroutine game over.", z.zkAddrs, z.name) + z.Wait.Done() + logger.Infof("zk{path:%v, name:%s} connection goroutine game over.", z.ZkAddrs, z.name) }() LOOP: @@ -159,15 +239,15 @@ LOOP: break LOOP case event = <-session: logger.Warnf("client{%s} get a zookeeper event{type:%s, server:%s, path:%s, state:%d-%s, err:%v}", - z.name, event.Type, event.Server, event.Path, event.State, stateToString(event.State), event.Err) + z.name, event.Type, event.Server, event.Path, event.State, StateToString(event.State), event.Err) switch (int)(event.State) { case (int)(zk.StateDisconnected): - logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.zkAddrs, z.name) + logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.ZkAddrs, z.name) z.stop() z.Lock() - if z.conn != nil { - z.conn.Close() - z.conn = nil + if z.Conn != nil { + z.Conn.Close() + z.Conn = nil } z.Unlock() break LOOP @@ -199,7 +279,7 @@ LOOP: } } -func (z *zookeeperClient) registerEvent(zkPath string, event *chan struct{}) { +func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) { if zkPath == "" || event == nil { return } @@ -212,7 +292,7 @@ func (z *zookeeperClient) registerEvent(zkPath string, event *chan struct{}) { z.Unlock() } -func (z *zookeeperClient) unregisterEvent(zkPath string, event *chan struct{}) { +func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) { if zkPath == "" { return } @@ -241,11 +321,11 @@ func (z *zookeeperClient) unregisterEvent(zkPath string, event *chan struct{}) { z.Unlock() } -func (z *zookeeperClient) done() <-chan struct{} { +func (z *ZookeeperClient) Done() <-chan struct{} { return z.exit } -func (z *zookeeperClient) stop() bool { +func (z *ZookeeperClient) stop() bool { select { case <-z.exit: return true @@ -256,7 +336,7 @@ func (z *zookeeperClient) stop() bool { return false } -func (z *zookeeperClient) zkConnValid() bool { +func (z *ZookeeperClient) ZkConnValid() bool { select { case <-z.exit: return false @@ -265,7 +345,7 @@ func (z *zookeeperClient) zkConnValid() bool { valid := true z.Lock() - if z.conn == nil { + if z.Conn == nil { valid = false } z.Unlock() @@ -273,23 +353,23 @@ func (z *zookeeperClient) zkConnValid() bool { return valid } -func (z *zookeeperClient) Close() { +func (z *ZookeeperClient) Close() { if z == nil { return } z.stop() - z.wait.Wait() + z.Wait.Wait() z.Lock() - if z.conn != nil { - z.conn.Close() - z.conn = nil + if z.Conn != nil { + z.Conn.Close() + z.Conn = nil } z.Unlock() - logger.Warnf("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.zkAddrs) + logger.Warnf("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs) } -func (z *zookeeperClient) Create(basePath string) error { +func (z *ZookeeperClient) Create(basePath string) error { var ( err error tmpPath string @@ -300,8 +380,8 @@ func (z *zookeeperClient) Create(basePath string) error { tmpPath = path.Join(tmpPath, "/", str) err = errNilZkClientConn z.Lock() - if z.conn != nil { - _, err = z.conn.Create(tmpPath, []byte(""), 0, zk.WorldACL(zk.PermAll)) + if z.Conn != nil { + _, err = z.Conn.Create(tmpPath, []byte(""), 0, zk.WorldACL(zk.PermAll)) } z.Unlock() if err != nil { @@ -317,22 +397,22 @@ func (z *zookeeperClient) Create(basePath string) error { return nil } -func (z *zookeeperClient) Delete(basePath string) error { +func (z *ZookeeperClient) Delete(basePath string) error { var ( err error ) err = errNilZkClientConn z.Lock() - if z.conn != nil { - err = z.conn.Delete(basePath, -1) + if z.Conn != nil { + err = z.Conn.Delete(basePath, -1) } z.Unlock() return perrors.WithMessagef(err, "Delete(basePath:%s)", basePath) } -func (z *zookeeperClient) RegisterTemp(basePath string, node string) (string, error) { +func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, error) { var ( err error data []byte @@ -344,8 +424,8 @@ func (z *zookeeperClient) RegisterTemp(basePath string, node string) (string, er data = []byte("") zkPath = path.Join(basePath) + "/" + node z.Lock() - if z.conn != nil { - tmpPath, err = z.conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + if z.Conn != nil { + tmpPath, err = z.Conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) } z.Unlock() //if err != nil && err != zk.ErrNodeExists { @@ -358,7 +438,7 @@ func (z *zookeeperClient) RegisterTemp(basePath string, node string) (string, er return tmpPath, nil } -func (z *zookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, error) { +func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, error) { var ( err error tmpPath string @@ -366,8 +446,8 @@ func (z *zookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, err = errNilZkClientConn z.Lock() - if z.conn != nil { - tmpPath, err = z.conn.Create( + if z.Conn != nil { + tmpPath, err = z.Conn.Create( path.Join(basePath)+"/", data, zk.FlagEphemeral|zk.FlagSequence, @@ -386,7 +466,7 @@ func (z *zookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, return tmpPath, nil } -func (z *zookeeperClient) getChildrenW(path string) ([]string, <-chan zk.Event, error) { +func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, error) { var ( err error children []string @@ -396,8 +476,8 @@ func (z *zookeeperClient) getChildrenW(path string) ([]string, <-chan zk.Event, err = errNilZkClientConn z.Lock() - if z.conn != nil { - children, stat, event, err = z.conn.ChildrenW(path) + if z.Conn != nil { + children, stat, event, err = z.Conn.ChildrenW(path) } z.Unlock() if err != nil { @@ -417,7 +497,7 @@ func (z *zookeeperClient) getChildrenW(path string) ([]string, <-chan zk.Event, return children, event, nil } -func (z *zookeeperClient) getChildren(path string) ([]string, error) { +func (z *ZookeeperClient) GetChildren(path string) ([]string, error) { var ( err error children []string @@ -426,8 +506,8 @@ func (z *zookeeperClient) getChildren(path string) ([]string, error) { err = errNilZkClientConn z.Lock() - if z.conn != nil { - children, stat, err = z.conn.Children(path) + if z.Conn != nil { + children, stat, err = z.Conn.Children(path) } z.Unlock() if err != nil { @@ -447,7 +527,7 @@ func (z *zookeeperClient) getChildren(path string) ([]string, error) { return children, nil } -func (z *zookeeperClient) existW(zkPath string) (<-chan zk.Event, error) { +func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) { var ( exist bool err error @@ -456,8 +536,8 @@ func (z *zookeeperClient) existW(zkPath string) (<-chan zk.Event, error) { err = errNilZkClientConn z.Lock() - if z.conn != nil { - exist, _, event, err = z.conn.ExistsW(zkPath) + if z.Conn != nil { + exist, _, event, err = z.Conn.ExistsW(zkPath) } z.Unlock() if err != nil { diff --git a/registry/zookeeper/zk_client_test.go b/remoting/zookeeper/client_test.go similarity index 92% rename from registry/zookeeper/zk_client_test.go rename to remoting/zookeeper/client_test.go index ff98bb5bc783da0908c5f3cbe28c94385b0bd1db..4a71ebd6107c499bafe7baa7112e31dd53dfdfd4 100644 --- a/registry/zookeeper/zk_client_test.go +++ b/remoting/zookeeper/client_test.go @@ -93,7 +93,8 @@ func verifyEventOrder(t *testing.T, c <-chan zk.Event, expectedEvent []zk.EventT //} func Test_newMockZookeeperClient(t *testing.T) { - ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second) + ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second) + assert.NoError(t, err) defer ts.Stop() states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession} verifyEventStateOrder(t, event, states, "event channel") @@ -103,7 +104,7 @@ func Test_newMockZookeeperClient(t *testing.T) { } func TestCreate(t *testing.T) { - ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second) + ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second) defer ts.Stop() err := z.Create("test1/test2/test3/test4") assert.NoError(t, err) @@ -113,7 +114,7 @@ func TestCreate(t *testing.T) { } func TestCreateDelete(t *testing.T) { - ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second) + ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second) defer ts.Stop() states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession} @@ -126,7 +127,7 @@ func TestCreateDelete(t *testing.T) { } func TestRegisterTemp(t *testing.T) { - ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second) + ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second) defer ts.Stop() err := z.Create("/test1/test2/test3") assert.NoError(t, err) @@ -139,7 +140,7 @@ func TestRegisterTemp(t *testing.T) { } func TestRegisterTempSeq(t *testing.T) { - ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second) + ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second) defer ts.Stop() err := z.Create("/test1/test2/test3") assert.NoError(t, err) diff --git a/remoting/zookeeper/container.go b/remoting/zookeeper/container.go new file mode 100644 index 0000000000000000000000000000000000000000..fe262adb20e483f8044189857689ae3808e0e32a --- /dev/null +++ b/remoting/zookeeper/container.go @@ -0,0 +1,68 @@ +package zookeeper + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/logger" + perrors "github.com/pkg/errors" + "sync" + "time" +) + +type ZkClientContainer interface { + ZkClient() *ZookeeperClient + SetZkClient(*ZookeeperClient) + ZkClientLock() *sync.Mutex + WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container + GetDone() chan struct{} //for zk client control + RestartCallBack() bool + common.Node +} + +func HandleClientRestart(r ZkClientContainer) { + var ( + err error + + failTimes int + ) + + defer r.WaitGroup().Done() +LOOP: + for { + select { + case <-r.GetDone(): + logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...") + break LOOP + // re-register all services + case <-r.ZkClient().Done(): + r.ZkClientLock().Lock() + r.ZkClient().Close() + zkName := r.ZkClient().name + zkAddress := r.ZkClient().ZkAddrs + r.SetZkClient(nil) + r.ZkClientLock().Unlock() + + // 鎺k锛岀洿鑷虫垚鍔� + failTimes = 0 + for { + select { + case <-r.GetDone(): + logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...") + break LOOP + case <-time.After(time.Duration(1e9 * failTimes * ConnDelay)): // 闃叉鐤媯閲嶈繛zk + } + err = ValidateZookeeperClient(r, WithZkName(zkName)) + logger.Infof("ZkProviderRegistry.validateZookeeperClient(zkAddr{%s}) = error{%#v}", + zkAddress, perrors.WithStack(err)) + if err == nil { + if r.RestartCallBack() { + break + } + } + failTimes++ + if MaxFailTimes <= failTimes { + failTimes = MaxFailTimes + } + } + } + } +} diff --git a/remoting/zookeeper/dataListener.go b/remoting/zookeeper/dataListener.go new file mode 100644 index 0000000000000000000000000000000000000000..4d3cb9efb5b81214bc4a3a6bf0ec862014e94627 --- /dev/null +++ b/remoting/zookeeper/dataListener.go @@ -0,0 +1,5 @@ +package zookeeper + +type DataListener interface { + DataChange(eventType ZkEvent) bool //bool is return for interface implement is interesting +} diff --git a/registry/zookeeper/listener.go b/remoting/zookeeper/listener.go similarity index 51% rename from registry/zookeeper/listener.go rename to remoting/zookeeper/listener.go index d2cebde2ad7eabd9554069a969a390f193552d6c..c7ac4593523604e9a84330d2a88d8bfb2c735b47 100644 --- a/registry/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -18,7 +18,6 @@ package zookeeper import ( - "context" "fmt" "path" "sync" @@ -33,46 +32,39 @@ import ( import ( "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/registry" ) -const ( - MaxFailTimes = 15 -) - -type zkEvent struct { - res *registry.ServiceEvent +type ZkEvent struct { + Res *common.Event err error } -func (e zkEvent) String() string { - return fmt.Sprintf("err:%s, res:%s", e.err, e.res) +func (e ZkEvent) String() string { + return fmt.Sprintf("err:%s, res:%s", e.err, e.Res) } -type zkEventListener struct { - client *zookeeperClient - events chan zkEvent - serviceMapLock sync.Mutex - serviceMap map[string]struct{} - wg sync.WaitGroup - registry *zkRegistry +type ZkEventListener struct { + client *ZookeeperClient + pathMapLock sync.Mutex + pathMap map[string]struct{} + wg sync.WaitGroup } -func newZkEventListener(registry *zkRegistry, client *zookeeperClient) *zkEventListener { - return &zkEventListener{ - client: client, - registry: registry, - events: make(chan zkEvent, 32), - serviceMap: make(map[string]struct{}), +func NewZkEventListener(client *ZookeeperClient) *ZkEventListener { + return &ZkEventListener{ + client: client, + pathMap: make(map[string]struct{}), } } - -func (l *zkEventListener) listenServiceNodeEvent(zkPath string) bool { +func (l *ZkEventListener) SetClient(client *ZookeeperClient) { + l.client = client +} +func (l *ZkEventListener) listenServiceNodeEvent(zkPath string) bool { l.wg.Add(1) defer l.wg.Done() var zkEvent zk.Event for { - keyEventCh, err := l.client.existW(zkPath) + keyEventCh, err := l.client.ExistW(zkPath) if err != nil { logger.Errorf("existW{key:%s} = error{%v}", zkPath, err) return false @@ -81,7 +73,7 @@ func (l *zkEventListener) listenServiceNodeEvent(zkPath string) bool { select { case zkEvent = <-keyEventCh: logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", - zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, stateToString(zkEvent.State), zkEvent.Err) + zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err) switch zkEvent.Type { case zk.EventNodeDataChanged: logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDataChanged}", zkPath) @@ -93,7 +85,7 @@ func (l *zkEventListener) listenServiceNodeEvent(zkPath string) bool { logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDeleted}", zkPath) return true } - case <-l.client.done(): + case <-l.client.Done(): return false } } @@ -101,7 +93,7 @@ func (l *zkEventListener) listenServiceNodeEvent(zkPath string) bool { return false } -func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, conf common.URL) { +func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, listener DataListener) { contains := func(s []string, e string) bool { for _, a := range s { if a == e { @@ -112,7 +104,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co return false } - newChildren, err := l.client.getChildren(zkPath) + newChildren, err := l.client.GetChildren(zkPath) if err != nil { logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err)) return @@ -120,8 +112,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co // a node was added -- listen the new node var ( - newNode string - serviceURL common.URL + newNode string ) for _, n := range newChildren { if contains(children, n) { @@ -130,27 +121,18 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co newNode = path.Join(zkPath, n) logger.Infof("add zkNode{%s}", newNode) - //context.TODO - serviceURL, err = common.NewURL(context.TODO(), n) - if err != nil { - logger.Errorf("NewURL(%s) = error{%v}", n, perrors.WithStack(err)) + if !listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Add, Content: n}, nil}) { continue } - if !conf.URLEqual(serviceURL) { - logger.Warnf("serviceURL{%s} is not compatible with SubURL{%#v}", serviceURL.Key(), conf.Key()) - continue - } - logger.Infof("add serviceURL{%s}", serviceURL) - l.events <- zkEvent{®istry.ServiceEvent{Action: registry.ServiceAdd, Service: serviceURL}, nil} // listen l service node - go func(node string, serviceURL common.URL) { + go func(node string) { logger.Infof("delete zkNode{%s}", node) if l.listenServiceNodeEvent(node) { - logger.Infof("delete serviceURL{%s}", serviceURL) - l.events <- zkEvent{®istry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil} + logger.Infof("delete content{%s}", n) + listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Del, Content: n}, nil}) } logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) - }(newNode, serviceURL) + }(newNode) } // old node was deleted @@ -162,21 +144,19 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co oldNode = path.Join(zkPath, n) logger.Warnf("delete zkPath{%s}", oldNode) - serviceURL, err = common.NewURL(context.TODO(), n) - if !conf.URLEqual(serviceURL) { - logger.Warnf("serviceURL{%s} has been deleted is not compatible with SubURL{%#v}", serviceURL.Key(), conf.Key()) + if !listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Add, Content: n}, nil}) { continue } - logger.Warnf("delete serviceURL{%s}", serviceURL) + logger.Warnf("delete content{%s}", n) if err != nil { logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err)) continue } - l.events <- zkEvent{®istry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil} + listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Del, Content: n}, nil}) } } -func (l *zkEventListener) listenDirEvent(zkPath string, conf common.URL) { +func (l *ZkEventListener) listenDirEvent(zkPath string, listener DataListener) { l.wg.Add(1) defer l.wg.Done() @@ -189,7 +169,7 @@ func (l *zkEventListener) listenDirEvent(zkPath string, conf common.URL) { defer close(event) for { // get current children for a zkPath - children, childEventCh, err := l.client.getChildrenW(zkPath) + children, childEventCh, err := l.client.GetChildrenW(zkPath) if err != nil { failTimes++ if MaxFailTimes <= failTimes { @@ -205,19 +185,19 @@ func (l *zkEventListener) listenDirEvent(zkPath string, conf common.URL) { break CLEAR } } - l.client.registerEvent(zkPath, &event) + l.client.RegisterEvent(zkPath, &event) select { - case <-time.After(timeSecondDuration(failTimes * RegistryConnDelay)): - l.client.unregisterEvent(zkPath, &event) + case <-time.After(timeSecondDuration(failTimes * ConnDelay)): + l.client.UnregisterEvent(zkPath, &event) continue - case <-l.client.done(): - l.client.unregisterEvent(zkPath, &event) - logger.Warnf("client.done(), listen(path{%s}, ReferenceConfig{%#v}) goroutine exit now...", zkPath, conf) + case <-l.client.Done(): + l.client.UnregisterEvent(zkPath, &event) + logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) return case <-event: logger.Infof("get zk.EventNodeDataChange notify event") - l.client.unregisterEvent(zkPath, &event) - l.handleZkNodeEvent(zkPath, nil, conf) + l.client.UnregisterEvent(zkPath, &event) + l.handleZkNodeEvent(zkPath, nil, listener) continue } } @@ -226,64 +206,57 @@ func (l *zkEventListener) listenDirEvent(zkPath string, conf common.URL) { select { case zkEvent = <-childEventCh: logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", - zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, stateToString(zkEvent.State), zkEvent.Err) + zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err) if zkEvent.Type != zk.EventNodeChildrenChanged { continue } - l.handleZkNodeEvent(zkEvent.Path, children, conf) - case <-l.client.done(): - logger.Warnf("client.done(), listen(path{%s}, ReferenceConfig{%#v}) goroutine exit now...", zkPath, conf) + l.handleZkNodeEvent(zkEvent.Path, children, listener) + case <-l.client.Done(): + logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) return } } } +func timeSecondDuration(sec int) time.Duration { + return time.Duration(sec) * time.Second +} + // this func is invoked by ZkConsumerRegistry::Registe/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener // registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent // | // --------> listenServiceNodeEvent -func (l *zkEventListener) listenServiceEvent(conf common.URL) { +func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener DataListener) { var ( err error - zkPath string dubboPath string children []string serviceURL common.URL ) - zkPath = fmt.Sprintf("/dubbo%s/providers", conf.Path) - - l.serviceMapLock.Lock() - _, ok := l.serviceMap[zkPath] - l.serviceMapLock.Unlock() + l.pathMapLock.Lock() + _, ok := l.pathMap[zkPath] + l.pathMapLock.Unlock() if ok { logger.Warnf("@zkPath %s has already been listened.", zkPath) return } - l.serviceMapLock.Lock() - l.serviceMap[zkPath] = struct{}{} - l.serviceMapLock.Unlock() + l.pathMapLock.Lock() + l.pathMap[zkPath] = struct{}{} + l.pathMapLock.Unlock() logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath) - children, err = l.client.getChildren(zkPath) + children, err = l.client.GetChildren(zkPath) if err != nil { children = nil logger.Errorf("fail to get children of zk path{%s}", zkPath) } for _, c := range children { - serviceURL, err = common.NewURL(context.TODO(), c) - if err != nil { - logger.Errorf("NewURL(r{%s}) = error{%v}", c, err) + if !listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Add, Content: c}, nil}) { continue } - if !conf.URLEqual(serviceURL) { - logger.Warnf("serviceURL %v is not compatible with SubURL %v", serviceURL.Key(), conf.Key()) - continue - } - logger.Debugf("add serviceUrl{%s}", serviceURL) - l.events <- zkEvent{®istry.ServiceEvent{Action: registry.ServiceAdd, Service: serviceURL}, nil} // listen l service node dubboPath = path.Join(zkPath, c) @@ -291,55 +264,23 @@ func (l *zkEventListener) listenServiceEvent(conf common.URL) { go func(zkPath string, serviceURL common.URL) { if l.listenServiceNodeEvent(dubboPath) { logger.Debugf("delete serviceUrl{%s}", serviceURL) - l.events <- zkEvent{®istry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil} + listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Del, Content: c}, nil}) } logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) }(dubboPath, serviceURL) } logger.Infof("listen dubbo path{%s}", zkPath) - go func(zkPath string, conf common.URL) { - l.listenDirEvent(zkPath, conf) + go func(zkPath string, listener DataListener) { + l.listenDirEvent(zkPath, listener) logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) - }(zkPath, conf) -} - -func (l *zkEventListener) Next() (*registry.ServiceEvent, error) { - for { - select { - case <-l.client.done(): - logger.Warnf("listener's zk client connection is broken, so zk event listener exit now.") - return nil, perrors.New("listener stopped") - - case <-l.registry.done: - logger.Warnf("zk consumer register has quit, so zk event listener exit asap now.") - return nil, perrors.New("listener stopped") - - case e := <-l.events: - logger.Debugf("got zk event %s", e) - if e.err != nil { - return nil, perrors.WithStack(e.err) - } - if e.res.Action == registry.ServiceDel && !l.valid() { - logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.res) - continue - } - //r.update(e.res) - //write to invoker - //r.outerEventCh <- e.res - return e.res, nil - } - } + }(zkPath, listener) } -func (l *zkEventListener) valid() bool { - return l.client.zkConnValid() +func (l *ZkEventListener) valid() bool { + return l.client.ZkConnValid() } -func (l *zkEventListener) Close() { - l.registry.listenerLock.Lock() - l.client.Close() - l.registry.listenerLock.Unlock() - l.registry.wg.Done() +func (l *ZkEventListener) Close() { l.wg.Wait() } diff --git a/remoting/zookeeper/zookeeper-4unitest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar b/remoting/zookeeper/zookeeper-4unitest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar new file mode 100644 index 0000000000000000000000000000000000000000..839531b8b8762a9c19e334a5cbf79314cb16f945 Binary files /dev/null and b/remoting/zookeeper/zookeeper-4unitest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar differ