Skip to content
Snippets Groups Projects
Commit 7ad26fd6 authored by wangwx's avatar wangwx
Browse files

try to fix zk problem

parent abcc4df4
No related branches found
No related tags found
No related merge requests found
......@@ -37,6 +37,7 @@ const (
DEFAULT_RETRIES_INT = 2
DEFAULT_PROTOCOL = "dubbo"
DEFAULT_REG_TIMEOUT = "10s"
DEFAULT_REG_TTL = "10m"
DEFAULT_CLUSTER = "failover"
DEFAULT_FAILBACK_TIMES = "3"
DEFAULT_FAILBACK_TIMES_INT = 3
......
......@@ -97,6 +97,7 @@ const (
ROLE_KEY = "registry.role"
REGISTRY_DEFAULT_KEY = "registry.default"
REGISTRY_TIMEOUT_KEY = "registry.timeout"
REGISTRY_TTL_KEY = "registry.ttl"
)
const (
......
......@@ -39,6 +39,7 @@ type RegistryConfig struct {
// I changed "type" to "protocol" ,the same as "protocol" field in java class RegistryConfig
TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second
Group string `yaml:"group" json:"group,omitempty" property:"group"`
TTL string `yaml:"ttl" default:"10m" json:"ttl,omitempty" property:"ttl"` // unit: minute
// for registry
Address string `yaml:"address" json:"address,omitempty" property:"address"`
Username string `yaml:"username" json:"username,omitempty" property:"username"`
......@@ -118,6 +119,7 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values {
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(int(roleType)))
urlMap.Set(constant.REGISTRY_KEY, c.Protocol)
urlMap.Set(constant.REGISTRY_TIMEOUT_KEY, c.TimeoutStr)
urlMap.Set(constant.REGISTRY_TTL_KEY, c.TTL)
for k, v := range c.Params {
urlMap.Set(k, v)
}
......
......@@ -243,6 +243,8 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen
var zkListener *RegistryConfigurationListener
dataListener := r.dataListener
ttl := r.GetParam(constant.REGISTRY_TTL_KEY, constant.DEFAULT_REG_TTL)
conf.SetParam(constant.REGISTRY_TTL_KEY, ttl)
dataListener.mutex.Lock()
defer dataListener.mutex.Unlock()
if r.dataListener.subscribed[conf.ServiceKey()] != nil {
......
......@@ -37,6 +37,10 @@ import (
"github.com/apache/dubbo-go/remoting"
)
var (
DefaultTTL = 10 * time.Minute
)
// nolint
type ZkEventListener struct {
client *ZookeeperClient
......@@ -197,10 +201,18 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
var (
failTimes int
ttl time.Duration
event chan struct{}
zkEvent zk.Event
)
event = make(chan struct{}, 4)
ttl = DefaultTTL
timeout, err := time.ParseDuration(conf.GetParam(constant.REGISTRY_TTL_KEY, constant.DEFAULT_REG_TTL))
if err == nil {
ttl = timeout
} else {
logger.Warnf("wrong configuration for registry ttl, error:=%+v", err)
}
defer close(event)
for {
// get current children for a zkPath
......@@ -302,18 +314,29 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
}(dubboPath, listener)
}
}
select {
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
if zkEvent.Type != zk.EventNodeChildrenChanged {
continue
// Periodically update provider information
ticker := time.NewTicker(ttl)
WATCH:
for {
select {
case <-ticker.C:
l.handleZkNodeEvent(zkEvent.Path, children, listener)
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
ticker.Stop()
if zkEvent.Type != zk.EventNodeChildrenChanged {
break WATCH
}
l.handleZkNodeEvent(zkEvent.Path, children, listener)
break WATCH
case <-l.client.Done():
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
ticker.Stop()
return
}
l.handleZkNodeEvent(zkEvent.Path, children, listener)
case <-l.client.Done():
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
return
}
}
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment