diff --git a/common/constant/default.go b/common/constant/default.go index c1c404e089ea90899d2b599b01cd5980c3e92ab1..629aa32392a0151046eaaea67287618eae02158d 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -37,6 +37,7 @@ const ( DEFAULT_RETRIES_INT = 2 DEFAULT_PROTOCOL = "dubbo" DEFAULT_REG_TIMEOUT = "10s" + DEFAULT_REG_TTL = "15m" DEFAULT_CLUSTER = "failover" DEFAULT_FAILBACK_TIMES = "3" DEFAULT_FAILBACK_TIMES_INT = 3 diff --git a/common/constant/key.go b/common/constant/key.go index cd23dd0f1ad3e000fe54c251d9563bcf12ba86c7..ea9bad9d501e7db63c69b0157e568f816ca7ba81 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -97,6 +97,7 @@ const ( ROLE_KEY = "registry.role" REGISTRY_DEFAULT_KEY = "registry.default" REGISTRY_TIMEOUT_KEY = "registry.timeout" + REGISTRY_TTL_KEY = "registry.ttl" ) const ( diff --git a/config/registry_config.go b/config/registry_config.go index ef527c827e9dac4cd2762f579d30254e9e51150f..703606b836906a8dbe9964e74da0edbe76991f48 100644 --- a/config/registry_config.go +++ b/config/registry_config.go @@ -39,6 +39,7 @@ type RegistryConfig struct { // I changed "type" to "protocol" ,the same as "protocol" field in java class RegistryConfig TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second Group string `yaml:"group" json:"group,omitempty" property:"group"` + TTL string `yaml:"ttl" default:"10m" json:"ttl,omitempty" property:"ttl"` // unit: minute // for registry Address string `yaml:"address" json:"address,omitempty" property:"address"` Username string `yaml:"username" json:"username,omitempty" property:"username"` @@ -118,6 +119,7 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values { urlMap.Set(constant.ROLE_KEY, strconv.Itoa(int(roleType))) urlMap.Set(constant.REGISTRY_KEY, c.Protocol) urlMap.Set(constant.REGISTRY_TIMEOUT_KEY, c.TimeoutStr) + urlMap.Set(constant.REGISTRY_TTL_KEY, c.TTL) for k, v := range c.Params { urlMap.Set(k, v) } diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 8f2ac1023b8ad34938b9996b480e3bbc4adbaaea..e8ee51beb70b5a08ec60b213c5342ef52972c59f 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -243,6 +243,8 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen var zkListener *RegistryConfigurationListener dataListener := r.dataListener + ttl := r.GetParam(constant.REGISTRY_TTL_KEY, constant.DEFAULT_REG_TTL) + conf.SetParam(constant.REGISTRY_TTL_KEY, ttl) dataListener.mutex.Lock() defer dataListener.mutex.Unlock() if r.dataListener.subscribed[conf.ServiceKey()] != nil { diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 9a4874db24696d90e4fcc7d9d987f5888f1be599..4f50c18ab61ba6faf373dfd0f831c14ae7ab6d5d 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -37,6 +37,10 @@ import ( "github.com/apache/dubbo-go/remoting" ) +var ( + defaultTTL = 15 * time.Minute +) + // nolint type ZkEventListener struct { client *ZookeeperClient @@ -197,10 +201,20 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen var ( failTimes int + ttl time.Duration event chan struct{} zkEvent zk.Event ) event = make(chan struct{}, 4) + ttl = defaultTTL + if conf != nil { + timeout, err := time.ParseDuration(conf.GetParam(constant.REGISTRY_TTL_KEY, constant.DEFAULT_REG_TTL)) + if err == nil { + ttl = timeout + } else { + logger.Warnf("wrong configuration for registry ttl, error:=%+v, using default value %v instead", err, defaultTTL) + } + } defer close(event) for { // get current children for a zkPath @@ -302,18 +316,29 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen }(dubboPath, listener) } } - select { - case zkEvent = <-childEventCh: - logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", - zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err) - if zkEvent.Type != zk.EventNodeChildrenChanged { - continue + // Periodically update provider information + ticker := time.NewTicker(ttl) + WATCH: + for { + select { + case <-ticker.C: + l.handleZkNodeEvent(zkEvent.Path, children, listener) + case zkEvent = <-childEventCh: + logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", + zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err) + ticker.Stop() + if zkEvent.Type != zk.EventNodeChildrenChanged { + break WATCH + } + l.handleZkNodeEvent(zkEvent.Path, children, listener) + break WATCH + case <-l.client.Done(): + logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) + ticker.Stop() + return } - l.handleZkNodeEvent(zkEvent.Path, children, listener) - case <-l.client.Done(): - logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) - return } + } }