Skip to content
Snippets Groups Projects
Commit bd59cfcc authored by wangwx's avatar wangwx
Browse files

fix ut error

parent a53ed2bc
No related branches found
No related tags found
No related merge requests found
......@@ -56,12 +56,13 @@ func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_cen
l.subscribed[url.ServiceKey()] = listener
}
// UnSubscribeURL is used to set a watch listener for url
// UnSubscribeURL is used to unset a watch listener for url
func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.ConfigurationListener {
if l.closed {
return nil
}
listener := l.subscribed[url.ServiceKey()]
listener.(*RegistryConfigurationListener).Close()
delete(l.subscribed, url.ServiceKey())
return listener
}
......
......@@ -77,7 +77,7 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) {
if err != nil {
return nil, err
}
r.WaitGroup().Add(1) //zk client start successful, then wg +1
go zookeeper.HandleClientRestart(r)
r.listener = zookeeper.NewZkEventListener(r.client)
......@@ -129,7 +129,7 @@ func (r *zkRegistry) InitListeners() {
defer oldDataListener.mutex.Unlock()
r.dataListener.closed = true
recovered := r.dataListener.subscribed
if len(recovered) > 0 {
if recovered != nil && len(recovered) > 0 {
// recover all subscribed url
for _, oldListener := range recovered {
var (
......@@ -178,6 +178,7 @@ func (r *zkRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error)
// CloseAndNilClient closes listeners and clear client
func (r *zkRegistry) CloseAndNilClient() {
r.client.Close()
r.client = nil
}
......@@ -250,7 +251,8 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen
dataListener.mutex.Lock()
defer dataListener.mutex.Unlock()
if r.dataListener.subscribed[conf.ServiceKey()] != nil {
zkListener, _ = r.dataListener.subscribed[conf.ServiceKey()].(*RegistryConfigurationListener)
zkListener, _ := r.dataListener.subscribed[conf.ServiceKey()].(*RegistryConfigurationListener)
if zkListener != nil {
r.listenerLock.Lock()
defer r.listenerLock.Unlock()
......@@ -282,11 +284,7 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen
//Interested register to dataconfig.
r.dataListener.SubscribeURL(conf, zkListener)
go r.listener.ListenServiceEvent(
conf,
fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(conf.Service())),
r.dataListener,
)
go r.listener.ListenServiceEvent(conf, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(conf.Service())), r.dataListener)
return zkListener, nil
}
......@@ -297,9 +295,9 @@ func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationL
r.dataListener.mutex.Lock()
configurationListener := r.dataListener.subscribed[conf.ServiceKey()]
if configurationListener != nil {
rcListener, _ := configurationListener.(*RegistryConfigurationListener)
if rcListener != nil {
if rcListener.isClosed {
zkListener, _ := configurationListener.(*RegistryConfigurationListener)
if zkListener != nil {
if zkListener.isClosed {
r.dataListener.mutex.Unlock()
return nil, perrors.New("configListener already been closed")
}
......
......@@ -37,11 +37,14 @@ func Test_Register(t *testing.T) {
regURL, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithParamsValue("serviceid", "soa.mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
ts, reg, _ := newMockZkRegistry(regURL)
ts, reg, err := newMockZkRegistry(regURL)
if err != nil {
assert.NoError(t, err)
}
defer func() {
_ = ts.Stop()
}()
err := reg.Register(url)
err = reg.Register(url)
children, _ := reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers")
assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26cluster%3Dmock%26.*.serviceid%3Dsoa.mock", children)
assert.NoError(t, err)
......
......@@ -47,6 +47,7 @@ type ZkEventListener struct {
pathMapLock sync.Mutex
pathMap map[string]struct{}
wg sync.WaitGroup
exit chan struct{}
}
// NewZkEventListener returns a EventListener instance
......@@ -54,6 +55,7 @@ func NewZkEventListener(client *ZookeeperClient) *ZkEventListener {
return &ZkEventListener{
client: client,
pathMap: make(map[string]struct{}),
exit: make(chan struct{}),
}
}
......@@ -116,6 +118,8 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDeleted}", zkPath)
return true
}
case <-l.exit:
return false
}
}
}
......@@ -244,6 +248,10 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)):
l.client.UnregisterEvent(zkPath, &event)
continue
case <-l.exit:
l.client.UnregisterEvent(zkPath, &event)
logger.Warnf("listen(path{%s}) goroutine exit now...", zkPath)
return
case <-event:
logger.Infof("get zk.EventNodeDataChange notify event")
l.client.UnregisterEvent(zkPath, &event)
......@@ -331,6 +339,10 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
}
l.handleZkNodeEvent(zkEvent.Path, children, listener)
break WATCH
case <-l.exit:
logger.Warnf("listen(path{%s}) goroutine exit now...", zkPath)
ticker.Stop()
return
}
}
......@@ -360,5 +372,6 @@ func (l *ZkEventListener) ListenServiceEvent(conf *common.URL, zkPath string, li
// Close will let client listen exit
func (l *ZkEventListener) Close() {
close(l.exit)
l.wg.Wait()
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment