From 3af6213e1ca5896243562459c1fcc07b048bb896 Mon Sep 17 00:00:00 2001 From: wangwx <wangwx@tuya.com> Date: Tue, 4 Aug 2020 21:07:27 +0800 Subject: [PATCH] try to fix zk problem --- common/constant/default.go | 1 + common/constant/key.go | 1 + config/registry_config.go | 2 ++ registry/zookeeper/registry.go | 2 ++ remoting/zookeeper/listener.go | 43 ++++++++++++++++++++++++++-------- 5 files changed, 39 insertions(+), 10 deletions(-) diff --git a/common/constant/default.go b/common/constant/default.go index c1c404e08..da68d7a5b 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -37,6 +37,7 @@ const ( DEFAULT_RETRIES_INT = 2 DEFAULT_PROTOCOL = "dubbo" DEFAULT_REG_TIMEOUT = "10s" + DEFAULT_REG_TTL = "10m" DEFAULT_CLUSTER = "failover" DEFAULT_FAILBACK_TIMES = "3" DEFAULT_FAILBACK_TIMES_INT = 3 diff --git a/common/constant/key.go b/common/constant/key.go index cd23dd0f1..ea9bad9d5 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -97,6 +97,7 @@ const ( ROLE_KEY = "registry.role" REGISTRY_DEFAULT_KEY = "registry.default" REGISTRY_TIMEOUT_KEY = "registry.timeout" + REGISTRY_TTL_KEY = "registry.ttl" ) const ( diff --git a/config/registry_config.go b/config/registry_config.go index ef527c827..703606b83 100644 --- a/config/registry_config.go +++ b/config/registry_config.go @@ -39,6 +39,7 @@ type RegistryConfig struct { // I changed "type" to "protocol" ,the same as "protocol" field in java class RegistryConfig TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second Group string `yaml:"group" json:"group,omitempty" property:"group"` + TTL string `yaml:"ttl" default:"10m" json:"ttl,omitempty" property:"ttl"` // unit: minute // for registry Address string `yaml:"address" json:"address,omitempty" property:"address"` Username string `yaml:"username" json:"username,omitempty" property:"username"` @@ -118,6 +119,7 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values { urlMap.Set(constant.ROLE_KEY, strconv.Itoa(int(roleType))) urlMap.Set(constant.REGISTRY_KEY, c.Protocol) urlMap.Set(constant.REGISTRY_TIMEOUT_KEY, c.TimeoutStr) + urlMap.Set(constant.REGISTRY_TTL_KEY, c.TTL) for k, v := range c.Params { urlMap.Set(k, v) } diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 8f2ac1023..e8ee51beb 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -243,6 +243,8 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen var zkListener *RegistryConfigurationListener dataListener := r.dataListener + ttl := r.GetParam(constant.REGISTRY_TTL_KEY, constant.DEFAULT_REG_TTL) + conf.SetParam(constant.REGISTRY_TTL_KEY, ttl) dataListener.mutex.Lock() defer dataListener.mutex.Unlock() if r.dataListener.subscribed[conf.ServiceKey()] != nil { diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 9a4874db2..97c61f2e4 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -37,6 +37,10 @@ import ( "github.com/apache/dubbo-go/remoting" ) +var ( + DefaultTTL = 10 * time.Minute +) + // nolint type ZkEventListener struct { client *ZookeeperClient @@ -197,10 +201,18 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen var ( failTimes int + ttl time.Duration event chan struct{} zkEvent zk.Event ) event = make(chan struct{}, 4) + ttl = DefaultTTL + timeout, err := time.ParseDuration(conf.GetParam(constant.REGISTRY_TTL_KEY, constant.DEFAULT_REG_TTL)) + if err == nil { + ttl = timeout + } else { + logger.Warnf("wrong configuration for registry ttl, error:=%+v", err) + } defer close(event) for { // get current children for a zkPath @@ -302,18 +314,29 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen }(dubboPath, listener) } } - select { - case zkEvent = <-childEventCh: - logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", - zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err) - if zkEvent.Type != zk.EventNodeChildrenChanged { - continue + // Periodically update provider information + ticker := time.NewTicker(ttl) + WATCH: + for { + select { + case <-ticker.C: + l.handleZkNodeEvent(zkEvent.Path, children, listener) + case zkEvent = <-childEventCh: + logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", + zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err) + ticker.Stop() + if zkEvent.Type != zk.EventNodeChildrenChanged { + break WATCH + } + l.handleZkNodeEvent(zkEvent.Path, children, listener) + break WATCH + case <-l.client.Done(): + logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) + ticker.Stop() + return } - l.handleZkNodeEvent(zkEvent.Path, children, listener) - case <-l.client.Done(): - logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) - return } + } } -- GitLab