diff --git a/cluster/loadbalance/round_robin.go b/cluster/loadbalance/round_robin.go
new file mode 100644
index 0000000000000000000000000000000000000000..e173e211c3630f4d4786edc2c1a09709fd7bf0a1
--- /dev/null
+++ b/cluster/loadbalance/round_robin.go
@@ -0,0 +1,153 @@
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package loadbalance
+
+import (
+	"math"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+import (
+	"github.com/apache/dubbo-go/cluster"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/protocol"
+)
+
+const (
+	RoundRobin = "roundrobin"
+
+	COMPLETE = 0
+	UPDATING = 1
+)
+
+var (
+	methodWeightMap sync.Map            // [string]invokers
+	state           int32    = COMPLETE // update lock acquired ?
+	recyclePeriod   int64    = 60 * time.Second.Nanoseconds()
+)
+
+func init() {
+	extension.SetLoadbalance(RoundRobin, NewRoundRobinLoadBalance)
+}
+
+type roundRobinLoadBalance struct{}
+
+func NewRoundRobinLoadBalance() cluster.LoadBalance {
+	return &roundRobinLoadBalance{}
+}
+
+func (lb *roundRobinLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
+	count := len(invokers)
+	if count == 0 {
+		return nil
+	}
+	if count == 1 {
+		return invokers[0]
+	}
+
+	key := invokers[0].GetUrl().Path + "." + invocation.MethodName()
+	cache, _ := methodWeightMap.LoadOrStore(key, &cachedInvokers{})
+	cachedInvokers := cache.(*cachedInvokers)
+
+	var (
+		clean               = false
+		totalWeight         = int64(0)
+		maxCurrentWeight    = int64(math.MinInt64)
+		now                 = time.Now()
+		selectedInvoker     protocol.Invoker
+		selectedWeightRobin *weightedRoundRobin
+	)
+
+	for _, invoker := range invokers {
+		var weight = GetWeight(invoker, invocation)
+		if weight < 0 {
+			weight = 0
+		}
+
+		identifier := invoker.GetUrl().Key()
+		loaded, found := cachedInvokers.LoadOrStore(identifier, &weightedRoundRobin{weight: weight})
+		weightRobin := loaded.(*weightedRoundRobin)
+		if !found {
+			clean = true
+		}
+
+		if weightRobin.Weight() != weight {
+			weightRobin.setWeight(weight)
+		}
+
+		currentWeight := weightRobin.increaseCurrent()
+		weightRobin.lastUpdate = &now
+
+		if currentWeight > maxCurrentWeight {
+			maxCurrentWeight = currentWeight
+			selectedInvoker = invoker
+			selectedWeightRobin = weightRobin
+		}
+		totalWeight += weight
+	}
+
+	cleanIfRequired(clean, cachedInvokers, &now)
+
+	if selectedWeightRobin != nil {
+		selectedWeightRobin.Current(totalWeight)
+		return selectedInvoker
+	}
+
+	// should never happen
+	return invokers[0]
+}
+
+func cleanIfRequired(clean bool, invokers *cachedInvokers, now *time.Time) {
+	if clean && atomic.CompareAndSwapInt32(&state, COMPLETE, UPDATING) {
+		defer atomic.CompareAndSwapInt32(&state, UPDATING, COMPLETE)
+		invokers.Range(func(identify, robin interface{}) bool {
+			weightedRoundRobin := robin.(*weightedRoundRobin)
+			elapsed := now.Sub(*weightedRoundRobin.lastUpdate).Nanoseconds()
+			if elapsed > recyclePeriod {
+				invokers.Delete(identify)
+			}
+			return true
+		})
+	}
+}
+
+// Record the weight of the invoker
+type weightedRoundRobin struct {
+	weight     int64
+	current    int64
+	lastUpdate *time.Time
+}
+
+func (robin *weightedRoundRobin) Weight() int64 {
+	return atomic.LoadInt64(&robin.weight)
+}
+
+func (robin *weightedRoundRobin) setWeight(weight int64) {
+	robin.weight = weight
+	robin.current = 0
+}
+
+func (robin *weightedRoundRobin) increaseCurrent() int64 {
+	return atomic.AddInt64(&robin.current, robin.weight)
+}
+
+func (robin *weightedRoundRobin) Current(delta int64) {
+	atomic.AddInt64(&robin.current, -1*delta)
+}
+
+type cachedInvokers struct {
+	sync.Map /*[string]weightedRoundRobin*/
+}
diff --git a/cluster/loadbalance/round_robin_test.go b/cluster/loadbalance/round_robin_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..e261884b55971d95ccfbfdc34e32a07dc256fcca
--- /dev/null
+++ b/cluster/loadbalance/round_robin_test.go
@@ -0,0 +1,59 @@
+package loadbalance
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"testing"
+)
+
+import (
+	"github.com/stretchr/testify/assert"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/invocation"
+)
+
+func TestRoundRobinSelect(t *testing.T) {
+	loadBalance := NewRoundRobinLoadBalance()
+
+	var invokers []protocol.Invoker
+
+	url, _ := common.NewURL(context.TODO(), "dubbo://192.168.1.0:20000/org.apache.demo.HelloService")
+	invokers = append(invokers, protocol.NewBaseInvoker(url))
+	i := loadBalance.Select(invokers, &invocation.RPCInvocation{})
+	assert.True(t, i.GetUrl().URLEqual(url))
+
+	for i := 1; i < 10; i++ {
+		url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService", i))
+		invokers = append(invokers, protocol.NewBaseInvoker(url))
+	}
+	loadBalance.Select(invokers, &invocation.RPCInvocation{})
+}
+
+func TestRoundRobinByWeight(t *testing.T) {
+	loadBalance := NewRoundRobinLoadBalance()
+
+	var invokers []protocol.Invoker
+	loop := 10
+	for i := 1; i <= loop; i++ {
+		url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService?weight=%v", i, i))
+		invokers = append(invokers, protocol.NewBaseInvoker(url))
+	}
+
+	loop = (1 + loop) * loop / 2
+	selected := make(map[protocol.Invoker]int)
+
+	for i := 1; i <= loop; i++ {
+		invoker := loadBalance.Select(invokers, &invocation.RPCInvocation{})
+		selected[invoker]++
+	}
+
+	for _, i := range invokers {
+		w, _ := strconv.Atoi(i.GetUrl().GetParam("weight", "-1"))
+		assert.True(t, selected[i] == w)
+	}
+}
diff --git a/cluster/loadbalance/util.go b/cluster/loadbalance/util.go
index 736952159d83a9f4ade2c2d0cb190b60f1ef3fbe..7e0c2e265073c0a96032a6dd3294a6d73c1a4001 100644
--- a/cluster/loadbalance/util.go
+++ b/cluster/loadbalance/util.go
@@ -28,7 +28,8 @@ import (
 
 func GetWeight(invoker protocol.Invoker, invocation protocol.Invocation) int64 {
 	url := invoker.GetUrl()
-	weight := url.GetMethodParamInt(invocation.MethodName(), constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT)
+	weight := url.GetMethodParamInt64(invocation.MethodName(), constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT)
+
 	if weight > 0 {
 		//get service register time an do warm up time
 		now := time.Now().Unix()
diff --git a/common/constant/key.go b/common/constant/key.go
index 985dd4f80f531c107ef11cee1282da8742d7d62a..e49ddae7259bc8fd07351e38964594f2b28d3795 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -67,3 +67,8 @@ const (
 	OWNER_KEY        = "owner"
 	ENVIRONMENT_KEY  = "environment"
 )
+
+const (
+	CONFIG_NAMESPACE_KEY = "config.namespace"
+	CONFIG_TIMEOUT_KET   = "config.timeout"
+)
diff --git a/common/extension/config_center.go b/common/extension/config_center.go
new file mode 100644
index 0000000000000000000000000000000000000000..be4b62ccdd9c36500c306c7f16abd054f91ae86b
--- /dev/null
+++ b/common/extension/config_center.go
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package extension
+
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/config_center"
+)
+
+var (
+	configCenters = make(map[string]func(config *common.URL) (config_center.DynamicConfiguration, error))
+)
+
+func SetConfigCenter(name string, v func(config *common.URL) (config_center.DynamicConfiguration, error)) {
+	configCenters[name] = v
+}
+
+func GetConfigCenter(name string, config *common.URL) (config_center.DynamicConfiguration, error) {
+	if configCenters[name] == nil {
+		panic("config center for " + name + " is not existing, make sure you have import the package.")
+	}
+	return configCenters[name](config)
+
+}
diff --git a/common/url.go b/common/url.go
index 115167ee3ebab6718dfd8a229f2a370e5b79a982..4fb1af767fa7db6e84aefd3e33afddc099ebfcb1 100644
--- a/common/url.go
+++ b/common/url.go
@@ -20,6 +20,7 @@ package common
 import (
 	"context"
 	"fmt"
+	"math"
 	"net"
 	"net/url"
 	"strconv"
@@ -288,6 +289,15 @@ func (c URL) GetMethodParamInt(method string, key string, d int64) int64 {
 	return int64(r)
 }
 
+func (c URL) GetMethodParamInt64(method string, key string, d int64) int64 {
+	r := c.GetMethodParamInt(method, key, math.MinInt64)
+	if r == math.MinInt64 {
+		return c.GetParamInt(key, d)
+	}
+
+	return r
+}
+
 func (c URL) GetMethodParam(method string, key string, d string) string {
 	var r string
 	if r = c.Params.Get("methods." + method + "." + key); r == "" {
diff --git a/config_center/dynamic_configuration.go b/config_center/dynamic_configuration.go
new file mode 100644
index 0000000000000000000000000000000000000000..8115fed8203438c9371a1b7fa20815843a2e81fd
--- /dev/null
+++ b/config_center/dynamic_configuration.go
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config_center
+
+import (
+	"time"
+)
+import (
+	"github.com/apache/dubbo-go/remoting"
+)
+
+//////////////////////////////////////////
+// DynamicConfiguration
+//////////////////////////////////////////
+const DEFAULT_GROUP = "dubbo"
+const DEFAULT_CONFIG_TIMEOUT = "10s"
+
+type DynamicConfiguration interface {
+	AddListener(string, remoting.ConfigurationListener, ...Option)
+	RemoveListener(string, remoting.ConfigurationListener, ...Option)
+	GetConfig(string, ...Option) string
+	GetConfigs(string, ...Option) string
+}
+
+type Options struct {
+	Group   string
+	Timeout time.Duration
+}
+
+type Option func(*Options)
+
+func WithGroup(group string) Option {
+	return func(opt *Options) {
+		opt.Group = group
+	}
+}
+
+func WithTimeout(time time.Duration) Option {
+	return func(opt *Options) {
+		opt.Timeout = time
+	}
+}
diff --git a/config_center/zookeeper/dynamic_configuration.go b/config_center/zookeeper/dynamic_configuration.go
new file mode 100644
index 0000000000000000000000000000000000000000..c998c586fb0a22223acaee634cb5f187e04223fc
--- /dev/null
+++ b/config_center/zookeeper/dynamic_configuration.go
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zookeeper
+
+import (
+	"sync"
+)
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/config_center"
+	"github.com/apache/dubbo-go/remoting"
+	"github.com/apache/dubbo-go/remoting/zookeeper"
+)
+
+const ZK_CLIENT = "zk config_center"
+
+type ZookeeperDynamicConfiguration struct {
+	url      common.URL
+	rootPath string
+	wg       sync.WaitGroup
+	cltLock  sync.Mutex
+	done     chan struct{}
+	client   *zookeeper.ZookeeperClient
+
+	listenerLock sync.Mutex
+	listener     *zookeeper.ZkEventListener
+}
+
+func NewZookeeperDynamicConfiguration(url common.URL) (config_center.DynamicConfiguration, error) {
+	c := &ZookeeperDynamicConfiguration{
+		url:      url,
+		rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config",
+	}
+	err := zookeeper.ValidateZookeeperClient(c, zookeeper.WithZkName(ZK_CLIENT))
+	if err != nil {
+		return nil, err
+	}
+	c.wg.Add(1)
+	go zookeeper.HandleClientRestart(c)
+
+	c.listener = zookeeper.NewZkEventListener(c.client)
+	//c.configListener = NewRegistryConfigurationListener(c.client, c)
+	//c.dataListener = NewRegistryDataListener(c.configListener)
+	return c, nil
+
+}
+
+func (*ZookeeperDynamicConfiguration) AddListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) {
+
+}
+
+func (*ZookeeperDynamicConfiguration) RemoveListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) {
+
+}
+
+func (*ZookeeperDynamicConfiguration) GetConfig(key string, opions ...config_center.Option) string {
+	return ""
+}
+
+func (*ZookeeperDynamicConfiguration) GetConfigs(key string, opions ...config_center.Option) string {
+	return ""
+}
+
+func (r *ZookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient {
+	return r.client
+}
+
+func (r *ZookeeperDynamicConfiguration) SetZkClient(client *zookeeper.ZookeeperClient) {
+	r.client = client
+}
+
+func (r *ZookeeperDynamicConfiguration) ZkClientLock() *sync.Mutex {
+	return &r.cltLock
+}
+
+func (r *ZookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup {
+	return &r.wg
+}
+
+func (r *ZookeeperDynamicConfiguration) GetDone() chan struct{} {
+	return r.done
+}
+
+func (r *ZookeeperDynamicConfiguration) GetUrl() common.URL {
+	return r.url
+}
+
+func (r *ZookeeperDynamicConfiguration) Destroy() {
+	if r.listener != nil {
+		r.listener.Close()
+	}
+	close(r.done)
+	r.wg.Wait()
+	r.closeConfigs()
+}
+
+func (r *ZookeeperDynamicConfiguration) IsAvailable() bool {
+	select {
+	case <-r.done:
+		return false
+	default:
+		return true
+	}
+}
+
+func (r *ZookeeperDynamicConfiguration) closeConfigs() {
+	r.cltLock.Lock()
+	defer r.cltLock.Unlock()
+	logger.Infof("begin to close provider zk client")
+	// 鍏堝叧闂棫client锛屼互鍏抽棴tmp node
+	r.client.Close()
+	r.client = nil
+}
+
+func (r *ZookeeperDynamicConfiguration) RestartCallBack() bool {
+	return true
+}
diff --git a/registry/directory/directory.go b/registry/directory/directory.go
index 32841be5f68b07b1f87d2080834de83debb53893..763a6149b355aadeff212b8e2f05ae51f6457045 100644
--- a/registry/directory/directory.go
+++ b/registry/directory/directory.go
@@ -18,6 +18,7 @@
 package directory
 
 import (
+	"github.com/apache/dubbo-go/remoting"
 	"sync"
 	"time"
 )
@@ -130,10 +131,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) {
 func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
 
 	switch res.Action {
-	case registry.ServiceAdd:
+	case remoting.Add:
 		//dir.cacheService.Add(res.Path, dir.serviceTTL)
 		dir.cacheInvoker(res.Service)
-	case registry.ServiceDel:
+	case remoting.Del:
 		//dir.cacheService.Del(res.Path, dir.serviceTTL)
 		dir.uncacheInvoker(res.Service)
 		logger.Infof("selector delete service url{%s}", res.Service)
diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go
index eafd3f7c17a47520bd91e99176814753f5d7ddef..99cf93a23f7d93afdc9790a1399f30da13e857ac 100644
--- a/registry/directory/directory_test.go
+++ b/registry/directory/directory_test.go
@@ -19,6 +19,7 @@ package directory
 
 import (
 	"context"
+	"github.com/apache/dubbo-go/remoting"
 	"net/url"
 	"strconv"
 	"testing"
@@ -50,7 +51,7 @@ func TestSubscribe_Delete(t *testing.T) {
 	registryDirectory, mockRegistry := normalRegistryDir()
 	time.Sleep(1e9)
 	assert.Len(t, registryDirectory.cacheInvokers, 3)
-	mockRegistry.MockEvent(&registry.ServiceEvent{Action: registry.ServiceDel, Service: *common.NewURLWithOptions("TEST0", common.WithProtocol("dubbo"))})
+	mockRegistry.MockEvent(&registry.ServiceEvent{Action: remoting.Del, Service: *common.NewURLWithOptions("TEST0", common.WithProtocol("dubbo"))})
 	time.Sleep(1e9)
 	assert.Len(t, registryDirectory.cacheInvokers, 2)
 }
@@ -80,7 +81,7 @@ func TestSubscribe_Group(t *testing.T) {
 	urlmap.Set(constant.GROUP_KEY, "group1")
 	urlmap.Set(constant.CLUSTER_KEY, "failover") //to test merge url
 	for i := 0; i < 3; i++ {
-		mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
+		mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
 			common.WithParams(urlmap))})
 	}
 	//for group2
@@ -88,7 +89,7 @@ func TestSubscribe_Group(t *testing.T) {
 	urlmap2.Set(constant.GROUP_KEY, "group2")
 	urlmap2.Set(constant.CLUSTER_KEY, "failover") //to test merge url
 	for i := 0; i < 3; i++ {
-		mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
+		mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
 			common.WithParams(urlmap2))})
 	}
 
@@ -128,7 +129,7 @@ func normalRegistryDir() (*registryDirectory, *registry.MockRegistry) {
 
 	go registryDirectory.Subscribe(*common.NewURLWithOptions("testservice"))
 	for i := 0; i < 3; i++ {
-		mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"))})
+		mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"))})
 	}
 	return registryDirectory, mockRegistry.(*registry.MockRegistry)
 }
diff --git a/registry/event.go b/registry/event.go
index ef51bbbe1710b1fdb166d34af58c630a5203b53a..24f5b72e8b27d4dc727e72d641d8bae3e00ff165 100644
--- a/registry/event.go
+++ b/registry/event.go
@@ -25,38 +25,19 @@ import (
 
 import (
 	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/remoting"
 )
 
 func init() {
 	rand.Seed(time.Now().UnixNano())
 }
 
-//////////////////////////////////////////
-// service url event type
-//////////////////////////////////////////
-
-type ServiceEventType int
-
-const (
-	ServiceAdd = iota
-	ServiceDel
-)
-
-var serviceEventTypeStrings = [...]string{
-	"add service",
-	"delete service",
-}
-
-func (t ServiceEventType) String() string {
-	return serviceEventTypeStrings[t]
-}
-
 //////////////////////////////////////////
 // service event
 //////////////////////////////////////////
 
 type ServiceEvent struct {
-	Action  ServiceEventType
+	Action  remoting.EventType
 	Service common.URL
 }
 
diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go
index 711b876ef2bb2339423a56423155441ae54657f3..4f3cf1e19ad28cb10be8adf85351c1f2eb4d4622 100644
--- a/registry/zookeeper/listener.go
+++ b/registry/zookeeper/listener.go
@@ -19,295 +19,64 @@ package zookeeper
 
 import (
 	"context"
-	"fmt"
-	"path"
-	"sync"
-	"time"
 )
-
 import (
 	perrors "github.com/pkg/errors"
-	"github.com/samuel/go-zookeeper/zk"
 )
-
 import (
 	"github.com/apache/dubbo-go/common"
 	"github.com/apache/dubbo-go/common/logger"
 	"github.com/apache/dubbo-go/registry"
+	"github.com/apache/dubbo-go/remoting"
+	zk "github.com/apache/dubbo-go/remoting/zookeeper"
 )
 
-const (
-	MaxFailTimes = 15
-)
-
-type zkEvent struct {
-	res *registry.ServiceEvent
-	err error
+type RegistryDataListener struct {
+	interestedURL []*common.URL
+	listener      *RegistryConfigurationListener
 }
 
-func (e zkEvent) String() string {
-	return fmt.Sprintf("err:%s, res:%s", e.err, e.res)
+func NewRegistryDataListener(listener *RegistryConfigurationListener) *RegistryDataListener {
+	return &RegistryDataListener{listener: listener, interestedURL: []*common.URL{}}
 }
-
-type zkEventListener struct {
-	client         *zookeeperClient
-	events         chan zkEvent
-	serviceMapLock sync.Mutex
-	serviceMap     map[string]struct{}
-	wg             sync.WaitGroup
-	registry       *zkRegistry
+func (l *RegistryDataListener) AddInterestedURL(url *common.URL) {
+	l.interestedURL = append(l.interestedURL, url)
 }
 
-func newZkEventListener(registry *zkRegistry, client *zookeeperClient) *zkEventListener {
-	return &zkEventListener{
-		client:     client,
-		registry:   registry,
-		events:     make(chan zkEvent, 32),
-		serviceMap: make(map[string]struct{}),
+func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
+	serviceURL, err := common.NewURL(context.TODO(), eventType.Content)
+	if err != nil {
+		logger.Errorf("Listen NewURL(r{%s}) = error{%v}", eventType.Content, err)
+		return false
 	}
-}
-
-func (l *zkEventListener) listenServiceNodeEvent(zkPath string) bool {
-	l.wg.Add(1)
-	defer l.wg.Done()
-	var zkEvent zk.Event
-	for {
-		keyEventCh, err := l.client.existW(zkPath)
-		if err != nil {
-			logger.Errorf("existW{key:%s} = error{%v}", zkPath, err)
-			return false
-		}
-
-		select {
-		case zkEvent = <-keyEventCh:
-			logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
-				zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, stateToString(zkEvent.State), zkEvent.Err)
-			switch zkEvent.Type {
-			case zk.EventNodeDataChanged:
-				logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDataChanged}", zkPath)
-			case zk.EventNodeCreated:
-				logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeCreated}", zkPath)
-			case zk.EventNotWatching:
-				logger.Warnf("zk.ExistW(key{%s}) = event{EventNotWatching}", zkPath)
-			case zk.EventNodeDeleted:
-				logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDeleted}", zkPath)
-				return true
-			}
-		case <-l.client.done():
-			return false
+	for _, v := range l.interestedURL {
+		if serviceURL.URLEqual(*v) {
+			l.listener.Process(&remoting.ConfigChangeEvent{Value: serviceURL, ConfigType: eventType.Action})
+			return true
 		}
 	}
 
 	return false
 }
 
-func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, conf common.URL) {
-	contains := func(s []string, e string) bool {
-		for _, a := range s {
-			if a == e {
-				return true
-			}
-		}
-
-		return false
-	}
-
-	newChildren, err := l.client.getChildren(zkPath)
-	if err != nil {
-		logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
-		return
-	}
-
-	// a node was added -- listen the new node
-	var (
-		newNode    string
-		serviceURL common.URL
-	)
-	for _, n := range newChildren {
-		if contains(children, n) {
-			continue
-		}
-
-		newNode = path.Join(zkPath, n)
-		logger.Infof("add zkNode{%s}", newNode)
-		//context.TODO
-		serviceURL, err = common.NewURL(context.TODO(), n)
-		if err != nil {
-			logger.Errorf("NewURL(%s) = error{%v}", n, perrors.WithStack(err))
-			continue
-		}
-		if !conf.URLEqual(serviceURL) {
-			logger.Warnf("serviceURL{%s} is not compatible with SubURL{%#v}", serviceURL.Key(), conf.Key())
-			continue
-		}
-		logger.Infof("add serviceURL{%s}", serviceURL)
-		l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceAdd, Service: serviceURL}, nil}
-		// listen l service node
-		go func(node string, serviceURL common.URL) {
-			logger.Infof("delete zkNode{%s}", node)
-			if l.listenServiceNodeEvent(node) {
-				logger.Infof("delete serviceURL{%s}", serviceURL)
-				l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil}
-			}
-			logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
-		}(newNode, serviceURL)
-	}
-
-	// old node was deleted
-	var oldNode string
-	for _, n := range children {
-		if contains(newChildren, n) {
-			continue
-		}
-
-		oldNode = path.Join(zkPath, n)
-		logger.Warnf("delete zkPath{%s}", oldNode)
-		serviceURL, err = common.NewURL(context.TODO(), n)
-		if !conf.URLEqual(serviceURL) {
-			logger.Warnf("serviceURL{%s} has been deleted is not compatible with SubURL{%#v}", serviceURL.Key(), conf.Key())
-			continue
-		}
-		logger.Warnf("delete serviceURL{%s}", serviceURL)
-		if err != nil {
-			logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
-			continue
-		}
-		l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil}
-	}
+type RegistryConfigurationListener struct {
+	client   *zk.ZookeeperClient
+	registry *zkRegistry
+	events   chan *remoting.ConfigChangeEvent
 }
 
-func (l *zkEventListener) listenDirEvent(zkPath string, conf common.URL) {
-	l.wg.Add(1)
-	defer l.wg.Done()
-
-	var (
-		failTimes int
-		event     chan struct{}
-		zkEvent   zk.Event
-	)
-	event = make(chan struct{}, 4)
-	defer close(event)
-	for {
-		// get current children for a zkPath
-		children, childEventCh, err := l.client.getChildrenW(zkPath)
-		if err != nil {
-			failTimes++
-			if MaxFailTimes <= failTimes {
-				failTimes = MaxFailTimes
-			}
-			logger.Errorf("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
-			// clear the event channel
-		CLEAR:
-			for {
-				select {
-				case <-event:
-				default:
-					break CLEAR
-				}
-			}
-			l.client.registerEvent(zkPath, &event)
-			select {
-			case <-time.After(timeSecondDuration(failTimes * RegistryConnDelay)):
-				l.client.unregisterEvent(zkPath, &event)
-				continue
-			case <-l.client.done():
-				l.client.unregisterEvent(zkPath, &event)
-				logger.Warnf("client.done(), listen(path{%s}, ReferenceConfig{%#v}) goroutine exit now...", zkPath, conf)
-				return
-			case <-event:
-				logger.Infof("get zk.EventNodeDataChange notify event")
-				l.client.unregisterEvent(zkPath, &event)
-				l.handleZkNodeEvent(zkPath, nil, conf)
-				continue
-			}
-		}
-		failTimes = 0
-
-		select {
-		case zkEvent = <-childEventCh:
-			logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
-				zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, stateToString(zkEvent.State), zkEvent.Err)
-			if zkEvent.Type != zk.EventNodeChildrenChanged {
-				continue
-			}
-			l.handleZkNodeEvent(zkEvent.Path, children, conf)
-		case <-l.client.done():
-			logger.Warnf("client.done(), listen(path{%s}, ReferenceConfig{%#v}) goroutine exit now...", zkPath, conf)
-			return
-		}
-	}
+func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener {
+	reg.wg.Add(1)
+	return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *remoting.ConfigChangeEvent, 32)}
 }
-
-// this func is invoked by ZkConsumerRegistry::Registe/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener
-// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent
-//                            |
-//                            --------> listenServiceNodeEvent
-func (l *zkEventListener) listenServiceEvent(conf common.URL) {
-	var (
-		err        error
-		zkPath     string
-		dubboPath  string
-		children   []string
-		serviceURL common.URL
-	)
-
-	zkPath = fmt.Sprintf("/dubbo%s/providers", conf.Path)
-
-	l.serviceMapLock.Lock()
-	_, ok := l.serviceMap[zkPath]
-	l.serviceMapLock.Unlock()
-	if ok {
-		logger.Warnf("@zkPath %s has already been listened.", zkPath)
-		return
-	}
-
-	l.serviceMapLock.Lock()
-	l.serviceMap[zkPath] = struct{}{}
-	l.serviceMapLock.Unlock()
-
-	logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath)
-	children, err = l.client.getChildren(zkPath)
-	if err != nil {
-		children = nil
-		logger.Errorf("fail to get children of zk path{%s}", zkPath)
-	}
-
-	for _, c := range children {
-		serviceURL, err = common.NewURL(context.TODO(), c)
-		if err != nil {
-			logger.Errorf("NewURL(r{%s}) = error{%v}", c, err)
-			continue
-		}
-		if !conf.URLEqual(serviceURL) {
-			logger.Warnf("serviceURL %v is not compatible with SubURL %v", serviceURL.Key(), conf.Key())
-			continue
-		}
-		logger.Debugf("add serviceUrl{%s}", serviceURL)
-		l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceAdd, Service: serviceURL}, nil}
-
-		// listen l service node
-		dubboPath = path.Join(zkPath, c)
-		logger.Infof("listen dubbo service key{%s}", dubboPath)
-		go func(zkPath string, serviceURL common.URL) {
-			if l.listenServiceNodeEvent(dubboPath) {
-				logger.Debugf("delete serviceUrl{%s}", serviceURL)
-				l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil}
-			}
-			logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
-		}(dubboPath, serviceURL)
-	}
-
-	logger.Infof("listen dubbo path{%s}", zkPath)
-	go func(zkPath string, conf common.URL) {
-		l.listenDirEvent(zkPath, conf)
-		logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
-	}(zkPath, conf)
+func (l *RegistryConfigurationListener) Process(configType *remoting.ConfigChangeEvent) {
+	l.events <- configType
 }
 
-func (l *zkEventListener) Next() (*registry.ServiceEvent, error) {
+func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
 	for {
 		select {
-		case <-l.client.done():
+		case <-l.client.Done():
 			logger.Warnf("listener's zk client connection is broken, so zk event listener exit now.")
 			return nil, perrors.New("listener stopped")
 
@@ -317,29 +86,21 @@ func (l *zkEventListener) Next() (*registry.ServiceEvent, error) {
 
 		case e := <-l.events:
 			logger.Debugf("got zk event %s", e)
-			if e.err != nil {
-				return nil, perrors.WithStack(e.err)
-			}
-			if e.res.Action == registry.ServiceDel && !l.valid() {
-				logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.res)
+			if e.ConfigType == remoting.Del && !l.valid() {
+				logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
 				continue
 			}
 			//r.update(e.res)
 			//write to invoker
 			//r.outerEventCh <- e.res
-			return e.res, nil
+			return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil
 		}
 	}
 }
-
-func (l *zkEventListener) valid() bool {
-	return l.client.zkConnValid()
+func (l *RegistryConfigurationListener) Close() {
+	l.registry.wg.Done()
 }
 
-func (l *zkEventListener) Close() {
-	l.registry.listenerLock.Lock()
-	l.client.Close()
-	l.registry.listenerLock.Unlock()
-	l.registry.wg.Done()
-	l.wg.Wait()
+func (l *RegistryConfigurationListener) valid() bool {
+	return l.client.ZkConnValid()
 }
diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go
index 5302c84c8e445026d6d9c68221249a46d263e09c..ff6a9939694acf89f9a4e73e9566d20d55bf9c0e 100644
--- a/registry/zookeeper/registry.go
+++ b/registry/zookeeper/registry.go
@@ -40,13 +40,12 @@ import (
 	"github.com/apache/dubbo-go/common/logger"
 	"github.com/apache/dubbo-go/common/utils"
 	"github.com/apache/dubbo-go/registry"
+	"github.com/apache/dubbo-go/remoting/zookeeper"
 	"github.com/apache/dubbo-go/version"
 )
 
 const (
-	defaultTimeout    = int64(10e9)
-	RegistryZkClient  = "zk registry"
-	RegistryConnDelay = 3
+	RegistryZkClient = "zk registry"
 )
 
 var (
@@ -73,14 +72,16 @@ type zkRegistry struct {
 	done  chan struct{}
 
 	cltLock  sync.Mutex
-	client   *zookeeperClient
+	client   *zookeeper.ZookeeperClient
 	services map[string]common.URL // service name + protocol -> service config
 
-	listenerLock sync.Mutex
-	listener     *zkEventListener
-
+	listenerLock   sync.Mutex
+	listener       *zookeeper.ZkEventListener
+	dataListener   *RegistryDataListener
+	configListener *RegistryConfigurationListener
 	//for provider
 	zkPath map[string]int // key = protocol://ip:port/interface
+
 }
 
 func newZkRegistry(url *common.URL) (registry.Registry, error) {
@@ -97,30 +98,28 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) {
 		zkPath:   make(map[string]int),
 	}
 
-	//if r.SubURL.Name == "" {
-	//	r.SubURL.Name = RegistryZkClient
-	//}
-	//if r.Version == "" {
-	//	r.Version = version.Version
-	//}
-
-	err = r.validateZookeeperClient()
+	err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient))
 	if err != nil {
 		return nil, err
 	}
 
 	r.wg.Add(1)
-	go r.handleZkRestart()
+	go zookeeper.HandleClientRestart(r)
 
-	//if r.RoleType == registry.CONSUMER {
-	//	r.wg.Add(1)
-	//	go r.listen()
-	//}
+	r.listener = zookeeper.NewZkEventListener(r.client)
+	r.configListener = NewRegistryConfigurationListener(r.client, r)
+	r.dataListener = NewRegistryDataListener(r.configListener)
 
 	return r, nil
 }
 
-func newMockZkRegistry(url *common.URL) (*zk.TestCluster, *zkRegistry, error) {
+type Options struct {
+	client *zookeeper.ZookeeperClient
+}
+
+type Option func(*Options)
+
+func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestCluster, *zkRegistry, error) {
 	var (
 		err error
 		r   *zkRegistry
@@ -136,139 +135,78 @@ func newMockZkRegistry(url *common.URL) (*zk.TestCluster, *zkRegistry, error) {
 		zkPath:   make(map[string]int),
 	}
 
-	c, r.client, _, err = newMockZookeeperClient("test", 15*time.Second)
+	c, r.client, _, err = zookeeper.NewMockZookeeperClient("test", 15*time.Second, opts...)
 	if err != nil {
 		return nil, nil, err
 	}
-
 	r.wg.Add(1)
-	go r.handleZkRestart()
+	go zookeeper.HandleClientRestart(r)
 
-	//if r.RoleType == registry.CONSUMER {
-	//	r.wg.Add(1)
-	//	go r.listen()
-	//}
+	r.listener = zookeeper.NewZkEventListener(r.client)
+	r.configListener = NewRegistryConfigurationListener(r.client, r)
+	r.dataListener = NewRegistryDataListener(r.configListener)
 
 	return c, r, nil
 }
+func (r *zkRegistry) ZkClient() *zookeeper.ZookeeperClient {
+	return r.client
+}
+
+func (r *zkRegistry) SetZkClient(client *zookeeper.ZookeeperClient) {
+	r.client = client
+}
+
+func (r *zkRegistry) ZkClientLock() *sync.Mutex {
+	return &r.cltLock
+}
+
+func (r *zkRegistry) WaitGroup() *sync.WaitGroup {
+	return &r.wg
+}
+
+func (r *zkRegistry) GetDone() chan struct{} {
+	return r.done
+}
+
 func (r *zkRegistry) GetUrl() common.URL {
 	return *r.URL
 }
 
 func (r *zkRegistry) Destroy() {
-	if r.listener != nil {
-		r.listener.Close()
+	if r.configListener != nil {
+		r.configListener.Close()
 	}
 	close(r.done)
 	r.wg.Wait()
 	r.closeRegisters()
 }
 
-func (r *zkRegistry) validateZookeeperClient() error {
-	var (
-		err error
-	)
+func (r *zkRegistry) RestartCallBack() bool {
 
-	err = nil
-	r.cltLock.Lock()
-	defer r.cltLock.Unlock()
-	if r.client == nil {
-		//in dubbp ,every registry only connect one node ,so this is []string{r.Address}
-		timeout, err := time.ParseDuration(r.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
-		if err != nil {
-			logger.Errorf("timeout config %v is invalid ,err is %v",
-				r.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
-			return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", r.Location)
-		}
-		r.client, err = newZookeeperClient(RegistryZkClient, []string{r.Location}, timeout)
-		if err != nil {
-			logger.Warnf("newZookeeperClient(name{%s}, zk addresss{%v}, timeout{%d}) = error{%v}",
-				RegistryZkClient, r.Location, timeout.String(), err)
-			return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", r.Location)
-		}
-	}
-	if r.client.conn == nil {
-		var event <-chan zk.Event
-		r.client.conn, event, err = zk.Connect(r.client.zkAddrs, r.client.timeout)
-		if err == nil {
-			r.client.wait.Add(1)
-			go r.client.handleZkEvent(event)
-		}
+	// copy r.services
+	services := []common.URL{}
+	for _, confIf := range r.services {
+		services = append(services, confIf)
 	}
 
-	return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", r.PrimitiveURL)
-}
-
-func (r *zkRegistry) handleZkRestart() {
-	var (
-		err       error
-		flag      bool
-		failTimes int
-		confIf    common.URL
-	)
-
-	defer r.wg.Done()
-LOOP:
-	for {
-		select {
-		case <-r.done:
-			logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
-			break LOOP
-			// re-register all services
-		case <-r.client.done():
-			r.cltLock.Lock()
-			r.client.Close()
-			r.client = nil
-			r.cltLock.Unlock()
-
-			// 鎺k锛岀洿鑷虫垚鍔�
-			failTimes = 0
-			for {
-				select {
-				case <-r.done:
-					logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
-					break LOOP
-				case <-time.After(time.Duration(1e9 * failTimes * RegistryConnDelay)): // 闃叉鐤媯閲嶈繛zk
-				}
-				err = r.validateZookeeperClient()
-				logger.Infof("ZkProviderRegistry.validateZookeeperClient(zkAddr{%s}) = error{%#v}",
-					r.client.zkAddrs, perrors.WithStack(err))
-				if err == nil {
-					// copy r.services
-					services := []common.URL{}
-					for _, confIf = range r.services {
-						services = append(services, confIf)
-					}
-
-					flag = true
-					for _, confIf = range services {
-						err = r.register(confIf)
-						if err != nil {
-							logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
-								confIf, perrors.WithStack(err))
-							flag = false
-							break
-						}
-						logger.Infof("success to re-register service :%v", confIf.Key())
-					}
-					if flag {
-						break
-					}
-				}
-				failTimes++
-				if MaxFailTimes <= failTimes {
-					failTimes = MaxFailTimes
-				}
-			}
+	flag := true
+	for _, confIf := range services {
+		err := r.register(confIf)
+		if err != nil {
+			logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
+				confIf, perrors.WithStack(err))
+			flag = false
+			break
 		}
+		logger.Infof("success to re-register service :%v", confIf.Key())
 	}
+	return flag
 }
 
 func (r *zkRegistry) Register(conf common.URL) error {
 	var (
-		ok       bool
-		err      error
-		listener *zkEventListener
+		ok  bool
+		err error
 	)
 	role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
 	switch role {
@@ -291,12 +229,6 @@ func (r *zkRegistry) Register(conf common.URL) error {
 		r.cltLock.Unlock()
 		logger.Debugf("(consumerZkConsumerRegistry)Register(conf{%#v})", conf)
 
-		r.listenerLock.Lock()
-		listener = r.listener
-		r.listenerLock.Unlock()
-		if listener != nil {
-			go listener.listenServiceEvent(conf)
-		}
 	case common.PROVIDER:
 
 		// 妫€楠屾湇鍔℃槸鍚﹀凡缁忔敞鍐岃繃
@@ -337,7 +269,7 @@ func (r *zkRegistry) register(c common.URL) error {
 		//conf       config.URL
 	)
 
-	err = r.validateZookeeperClient()
+	err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient))
 	if err != nil {
 		return perrors.WithStack(err)
 	}
@@ -428,6 +360,7 @@ func (r *zkRegistry) register(c common.URL) error {
 
 		dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, (common.RoleType(common.CONSUMER)).String())
 		logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL)
+
 	default:
 		return perrors.Errorf("@c{%v} type is not referencer or provider", c)
 	}
@@ -464,44 +397,37 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
 }
 
 func (r *zkRegistry) Subscribe(conf common.URL) (registry.Listener, error) {
-	r.wg.Add(1)
 	return r.getListener(conf)
 }
 
-func (r *zkRegistry) getListener(conf common.URL) (*zkEventListener, error) {
+func (r *zkRegistry) getListener(conf common.URL) (*RegistryConfigurationListener, error) {
 	var (
-		zkListener *zkEventListener
+		zkListener *RegistryConfigurationListener
 	)
 
 	r.listenerLock.Lock()
-	zkListener = r.listener
+	zkListener = r.configListener
 	r.listenerLock.Unlock()
-	if zkListener != nil {
-		return zkListener, nil
-	}
+	if r.listener == nil {
+		r.cltLock.Lock()
+		client := r.client
+		r.cltLock.Unlock()
+		if client == nil {
+			return nil, perrors.New("zk connection broken")
+		}
 
-	r.cltLock.Lock()
-	client := r.client
-	r.cltLock.Unlock()
-	if client == nil {
-		return nil, perrors.New("zk connection broken")
-	}
+		// new client & listener
+		listener := zookeeper.NewZkEventListener(r.client)
 
-	// new client & listener
-	zkListener = newZkEventListener(r, client)
+		r.listenerLock.Lock()
+		r.listener = listener
+		r.listenerLock.Unlock()
+	}
 
-	r.listenerLock.Lock()
-	r.listener = zkListener
-	r.listenerLock.Unlock()
+	//娉ㄥ唽鍒癲ataconfig鐨刬nterested
+	r.dataListener.AddInterestedURL(&conf)
 
-	// listen
-	r.cltLock.Lock()
-	for _, svs := range r.services {
-		if svs.URLEqual(conf) {
-			go zkListener.listenServiceEvent(svs)
-		}
-	}
-	r.cltLock.Unlock()
+	go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo%s/providers", conf.Path), r.dataListener)
 
 	return zkListener, nil
 }
diff --git a/registry/zookeeper/registry_test.go b/registry/zookeeper/registry_test.go
index 2e85e12f3a87df880b78dc70760efaa7e6dd8203..ba2755fa9923d9e6c11a1908594a176ace458691 100644
--- a/registry/zookeeper/registry_test.go
+++ b/registry/zookeeper/registry_test.go
@@ -31,6 +31,7 @@ import (
 import (
 	"github.com/apache/dubbo-go/common"
 	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/remoting/zookeeper"
 )
 
 func Test_Register(t *testing.T) {
@@ -40,7 +41,7 @@ func Test_Register(t *testing.T) {
 	ts, reg, err := newMockZkRegistry(&regurl)
 	defer ts.Stop()
 	err = reg.Register(url)
-	children, _ := reg.client.getChildren("/dubbo/com.ikurento.user.UserProvider/providers")
+	children, _ := reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers")
 	assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*provider", children)
 	assert.NoError(t, err)
 }
@@ -49,7 +50,6 @@ func Test_Subscribe(t *testing.T) {
 	regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
 	url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
 	ts, reg, err := newMockZkRegistry(&regurl)
-	defer ts.Stop()
 
 	//provider register
 	err = reg.Register(url)
@@ -61,8 +61,8 @@ func Test_Subscribe(t *testing.T) {
 
 	//consumer register
 	regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
-	_, reg2, err := newMockZkRegistry(&regurl)
-	reg2.client = reg.client
+	_, reg2, err := newMockZkRegistry(&regurl, zookeeper.WithTestCluster(ts))
+
 	err = reg2.Register(url)
 	listener, err := reg2.Subscribe(url)
 
@@ -71,8 +71,8 @@ func Test_Subscribe(t *testing.T) {
 	if err != nil {
 		return
 	}
-	assert.Regexp(t, ".*ServiceEvent{Action{add service}.*", serviceEvent.String())
-
+	assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String())
+	defer ts.Stop()
 }
 
 func Test_ConsumerDestory(t *testing.T) {
diff --git a/remoting/listener.go b/remoting/listener.go
new file mode 100644
index 0000000000000000000000000000000000000000..37f75d46522c8b2405029725ba2178167ee19668
--- /dev/null
+++ b/remoting/listener.go
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package remoting
+
+import "fmt"
+
+type ConfigurationListener interface {
+	Process(*ConfigChangeEvent)
+}
+
+type DataListener interface {
+	DataChange(eventType Event) bool //bool is return for interface implement is interesting
+}
+
+type ConfigChangeEvent struct {
+	Key        string
+	Value      interface{}
+	ConfigType EventType
+}
+
+func (c ConfigChangeEvent) String() string {
+	return fmt.Sprintf("ConfigChangeEvent{key = %v , value = %v , changeType = %v}", c.Key, c.Value, c.ConfigType)
+}
+
+//////////////////////////////////////////
+// event type
+//////////////////////////////////////////
+
+type EventType int
+
+const (
+	Add = iota
+	Del
+)
+
+var serviceEventTypeStrings = [...]string{
+	"add",
+	"delete",
+}
+
+func (t EventType) String() string {
+	return serviceEventTypeStrings[t]
+}
+
+//////////////////////////////////////////
+// service event
+//////////////////////////////////////////
+
+type Event struct {
+	Path    string
+	Action  EventType
+	Content string
+}
+
+func (e Event) String() string {
+	return fmt.Sprintf("Event{Action{%s}, Content{%s}}", e.Action, e.Content)
+}
diff --git a/registry/zookeeper/zk_client.go b/remoting/zookeeper/client.go
similarity index 66%
rename from registry/zookeeper/zk_client.go
rename to remoting/zookeeper/client.go
index bd273ff409f0bda2f0bf7e5f5cbdeaaad83fcce3..93382ed265ef2189cd0ad41bbe72adb8de606844 100644
--- a/registry/zookeeper/zk_client.go
+++ b/remoting/zookeeper/client.go
@@ -30,25 +30,31 @@ import (
 )
 
 import (
+	"github.com/apache/dubbo-go/common/constant"
 	"github.com/apache/dubbo-go/common/logger"
 )
 
+const (
+	ConnDelay    = 3
+	MaxFailTimes = 15
+)
+
 var (
 	errNilZkClientConn = perrors.New("zookeeperclient{conn} is nil")
 )
 
-type zookeeperClient struct {
+type ZookeeperClient struct {
 	name          string
-	zkAddrs       []string
+	ZkAddrs       []string
 	sync.Mutex    // for conn
-	conn          *zk.Conn
-	timeout       time.Duration
+	Conn          *zk.Conn
+	Timeout       time.Duration
 	exit          chan struct{}
-	wait          sync.WaitGroup
+	Wait          sync.WaitGroup
 	eventRegistry map[string][]*chan struct{}
 }
 
-func stateToString(state zk.State) string {
+func StateToString(state zk.State) string {
 	switch state {
 	case zk.StateDisconnected:
 		return "zookeeper disconnected"
@@ -79,55 +85,128 @@ func stateToString(state zk.State) string {
 	return "zookeeper unknown state"
 }
 
-func timeSecondDuration(sec int) time.Duration {
-	return time.Duration(sec) * time.Second
+type Options struct {
+	zkName string
+	client *ZookeeperClient
+
+	ts *zk.TestCluster
+}
+
+type Option func(*Options)
+
+func WithZkName(name string) Option {
+	return func(opt *Options) {
+		opt.zkName = name
+	}
 }
 
-func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*zookeeperClient, error) {
+func ValidateZookeeperClient(container ZkClientContainer, opts ...Option) error {
+	var (
+		err error
+	)
+	opions := &Options{}
+	for _, opt := range opts {
+		opt(opions)
+	}
+
+	err = nil
+
+	lock := container.ZkClientLock()
+	url := container.GetUrl()
+
+	lock.Lock()
+	defer lock.Unlock()
+
+	if container.ZkClient() == nil {
+		//in dubbp ,every registry only connect one node ,so this is []string{r.Address}
+		timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
+		if err != nil {
+			logger.Errorf("timeout config %v is invalid ,err is %v",
+				url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
+			return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location)
+		}
+		newClient, err := newZookeeperClient(opions.zkName, []string{url.Location}, timeout)
+		if err != nil {
+			logger.Warnf("newZookeeperClient(name{%s}, zk addresss{%v}, timeout{%d}) = error{%v}",
+				opions.zkName, url.Location, timeout.String(), err)
+			return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location)
+		}
+		container.SetZkClient(newClient)
+	}
+
+	if container.ZkClient().Conn == nil {
+		var event <-chan zk.Event
+		container.ZkClient().Conn, event, err = zk.Connect(container.ZkClient().ZkAddrs, container.ZkClient().Timeout)
+		if err == nil {
+			container.ZkClient().Wait.Add(1)
+			go container.ZkClient().HandleZkEvent(event)
+		}
+	}
+
+	return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.PrimitiveURL)
+}
+
+func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*ZookeeperClient, error) {
 	var (
 		err   error
 		event <-chan zk.Event
-		z     *zookeeperClient
+		z     *ZookeeperClient
 	)
 
-	z = &zookeeperClient{
+	z = &ZookeeperClient{
 		name:          name,
-		zkAddrs:       zkAddrs,
-		timeout:       timeout,
+		ZkAddrs:       zkAddrs,
+		Timeout:       timeout,
 		exit:          make(chan struct{}),
 		eventRegistry: make(map[string][]*chan struct{}),
 	}
 	// connect to zookeeper
-	z.conn, event, err = zk.Connect(zkAddrs, timeout)
+	z.Conn, event, err = zk.Connect(zkAddrs, timeout)
 	if err != nil {
 		return nil, perrors.WithMessagef(err, "zk.Connect(zkAddrs:%+v)", zkAddrs)
 	}
 
-	z.wait.Add(1)
-	go z.handleZkEvent(event)
+	z.Wait.Add(1)
+	go z.HandleZkEvent(event)
 
 	return z, nil
 }
 
-func newMockZookeeperClient(name string, timeout time.Duration) (*zk.TestCluster, *zookeeperClient, <-chan zk.Event, error) {
+func WithTestCluster(ts *zk.TestCluster) Option {
+	return func(opt *Options) {
+		opt.ts = ts
+	}
+}
+
+func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) (*zk.TestCluster, *ZookeeperClient, <-chan zk.Event, error) {
 	var (
 		err   error
 		event <-chan zk.Event
-		z     *zookeeperClient
+		z     *ZookeeperClient
+		ts    *zk.TestCluster
 	)
 
-	z = &zookeeperClient{
+	z = &ZookeeperClient{
 		name:          name,
-		zkAddrs:       []string{},
-		timeout:       timeout,
+		ZkAddrs:       []string{},
+		Timeout:       timeout,
 		exit:          make(chan struct{}),
 		eventRegistry: make(map[string][]*chan struct{}),
 	}
-	// connect to zookeeper
 
-	ts, err := zk.StartTestCluster(1, nil, nil)
-	if err != nil {
-		return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect")
+	opions := &Options{}
+	for _, opt := range opts {
+		opt(opions)
+	}
+
+	// connect to zookeeper
+	if opions.ts != nil {
+		ts = opions.ts
+	} else {
+		ts, err = zk.StartTestCluster(1, nil, nil)
+		if err != nil {
+			return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect")
+		}
 	}
 
 	//callbackChan := make(chan zk.Event)
@@ -135,7 +214,7 @@ func newMockZookeeperClient(name string, timeout time.Duration) (*zk.TestCluster
 	//	callbackChan <- event
 	//}
 
-	z.conn, event, err = ts.ConnectWithOptions(timeout)
+	z.Conn, event, err = ts.ConnectWithOptions(timeout)
 	if err != nil {
 		return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect")
 	}
@@ -144,15 +223,15 @@ func newMockZookeeperClient(name string, timeout time.Duration) (*zk.TestCluster
 	return ts, z, event, nil
 }
 
-func (z *zookeeperClient) handleZkEvent(session <-chan zk.Event) {
+func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
 	var (
 		state int
 		event zk.Event
 	)
 
 	defer func() {
-		z.wait.Done()
-		logger.Infof("zk{path:%v, name:%s} connection goroutine game over.", z.zkAddrs, z.name)
+		z.Wait.Done()
+		logger.Infof("zk{path:%v, name:%s} connection goroutine game over.", z.ZkAddrs, z.name)
 	}()
 
 LOOP:
@@ -162,15 +241,15 @@ LOOP:
 			break LOOP
 		case event = <-session:
 			logger.Warnf("client{%s} get a zookeeper event{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
-				z.name, event.Type, event.Server, event.Path, event.State, stateToString(event.State), event.Err)
+				z.name, event.Type, event.Server, event.Path, event.State, StateToString(event.State), event.Err)
 			switch (int)(event.State) {
 			case (int)(zk.StateDisconnected):
-				logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.zkAddrs, z.name)
+				logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.ZkAddrs, z.name)
 				z.stop()
 				z.Lock()
-				if z.conn != nil {
-					z.conn.Close()
-					z.conn = nil
+				if z.Conn != nil {
+					z.Conn.Close()
+					z.Conn = nil
 				}
 				z.Unlock()
 				break LOOP
@@ -202,7 +281,7 @@ LOOP:
 	}
 }
 
-func (z *zookeeperClient) registerEvent(zkPath string, event *chan struct{}) {
+func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
 	if zkPath == "" || event == nil {
 		return
 	}
@@ -215,7 +294,7 @@ func (z *zookeeperClient) registerEvent(zkPath string, event *chan struct{}) {
 	z.Unlock()
 }
 
-func (z *zookeeperClient) unregisterEvent(zkPath string, event *chan struct{}) {
+func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
 	if zkPath == "" {
 		return
 	}
@@ -244,11 +323,11 @@ func (z *zookeeperClient) unregisterEvent(zkPath string, event *chan struct{}) {
 	z.Unlock()
 }
 
-func (z *zookeeperClient) done() <-chan struct{} {
+func (z *ZookeeperClient) Done() <-chan struct{} {
 	return z.exit
 }
 
-func (z *zookeeperClient) stop() bool {
+func (z *ZookeeperClient) stop() bool {
 	select {
 	case <-z.exit:
 		return true
@@ -259,7 +338,7 @@ func (z *zookeeperClient) stop() bool {
 	return false
 }
 
-func (z *zookeeperClient) zkConnValid() bool {
+func (z *ZookeeperClient) ZkConnValid() bool {
 	select {
 	case <-z.exit:
 		return false
@@ -268,7 +347,7 @@ func (z *zookeeperClient) zkConnValid() bool {
 
 	valid := true
 	z.Lock()
-	if z.conn == nil {
+	if z.Conn == nil {
 		valid = false
 	}
 	z.Unlock()
@@ -276,23 +355,23 @@ func (z *zookeeperClient) zkConnValid() bool {
 	return valid
 }
 
-func (z *zookeeperClient) Close() {
+func (z *ZookeeperClient) Close() {
 	if z == nil {
 		return
 	}
 
 	z.stop()
-	z.wait.Wait()
+	z.Wait.Wait()
 	z.Lock()
-	if z.conn != nil {
-		z.conn.Close()
-		z.conn = nil
+	if z.Conn != nil {
+		z.Conn.Close()
+		z.Conn = nil
 	}
 	z.Unlock()
-	logger.Warnf("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.zkAddrs)
+	logger.Warnf("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs)
 }
 
-func (z *zookeeperClient) Create(basePath string) error {
+func (z *ZookeeperClient) Create(basePath string) error {
 	var (
 		err     error
 		tmpPath string
@@ -303,8 +382,8 @@ func (z *zookeeperClient) Create(basePath string) error {
 		tmpPath = path.Join(tmpPath, "/", str)
 		err = errNilZkClientConn
 		z.Lock()
-		if z.conn != nil {
-			_, err = z.conn.Create(tmpPath, []byte(""), 0, zk.WorldACL(zk.PermAll))
+		if z.Conn != nil {
+			_, err = z.Conn.Create(tmpPath, []byte(""), 0, zk.WorldACL(zk.PermAll))
 		}
 		z.Unlock()
 		if err != nil {
@@ -320,22 +399,22 @@ func (z *zookeeperClient) Create(basePath string) error {
 	return nil
 }
 
-func (z *zookeeperClient) Delete(basePath string) error {
+func (z *ZookeeperClient) Delete(basePath string) error {
 	var (
 		err error
 	)
 
 	err = errNilZkClientConn
 	z.Lock()
-	if z.conn != nil {
-		err = z.conn.Delete(basePath, -1)
+	if z.Conn != nil {
+		err = z.Conn.Delete(basePath, -1)
 	}
 	z.Unlock()
 
 	return perrors.WithMessagef(err, "Delete(basePath:%s)", basePath)
 }
 
-func (z *zookeeperClient) RegisterTemp(basePath string, node string) (string, error) {
+func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, error) {
 	var (
 		err     error
 		data    []byte
@@ -347,8 +426,8 @@ func (z *zookeeperClient) RegisterTemp(basePath string, node string) (string, er
 	data = []byte("")
 	zkPath = path.Join(basePath) + "/" + node
 	z.Lock()
-	if z.conn != nil {
-		tmpPath, err = z.conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
+	if z.Conn != nil {
+		tmpPath, err = z.Conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
 	}
 	z.Unlock()
 	//if err != nil && err != zk.ErrNodeExists {
@@ -361,7 +440,7 @@ func (z *zookeeperClient) RegisterTemp(basePath string, node string) (string, er
 	return tmpPath, nil
 }
 
-func (z *zookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, error) {
+func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, error) {
 	var (
 		err     error
 		tmpPath string
@@ -369,8 +448,8 @@ func (z *zookeeperClient) RegisterTempSeq(basePath string, data []byte) (string,
 
 	err = errNilZkClientConn
 	z.Lock()
-	if z.conn != nil {
-		tmpPath, err = z.conn.Create(
+	if z.Conn != nil {
+		tmpPath, err = z.Conn.Create(
 			path.Join(basePath)+"/",
 			data,
 			zk.FlagEphemeral|zk.FlagSequence,
@@ -389,7 +468,7 @@ func (z *zookeeperClient) RegisterTempSeq(basePath string, data []byte) (string,
 	return tmpPath, nil
 }
 
-func (z *zookeeperClient) getChildrenW(path string) ([]string, <-chan zk.Event, error) {
+func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, error) {
 	var (
 		err      error
 		children []string
@@ -399,8 +478,8 @@ func (z *zookeeperClient) getChildrenW(path string) ([]string, <-chan zk.Event,
 
 	err = errNilZkClientConn
 	z.Lock()
-	if z.conn != nil {
-		children, stat, event, err = z.conn.ChildrenW(path)
+	if z.Conn != nil {
+		children, stat, event, err = z.Conn.ChildrenW(path)
 	}
 	z.Unlock()
 	if err != nil {
@@ -420,7 +499,7 @@ func (z *zookeeperClient) getChildrenW(path string) ([]string, <-chan zk.Event,
 	return children, event, nil
 }
 
-func (z *zookeeperClient) getChildren(path string) ([]string, error) {
+func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
 	var (
 		err      error
 		children []string
@@ -429,8 +508,8 @@ func (z *zookeeperClient) getChildren(path string) ([]string, error) {
 
 	err = errNilZkClientConn
 	z.Lock()
-	if z.conn != nil {
-		children, stat, err = z.conn.Children(path)
+	if z.Conn != nil {
+		children, stat, err = z.Conn.Children(path)
 	}
 	z.Unlock()
 	if err != nil {
@@ -450,7 +529,7 @@ func (z *zookeeperClient) getChildren(path string) ([]string, error) {
 	return children, nil
 }
 
-func (z *zookeeperClient) existW(zkPath string) (<-chan zk.Event, error) {
+func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) {
 	var (
 		exist bool
 		err   error
@@ -459,8 +538,8 @@ func (z *zookeeperClient) existW(zkPath string) (<-chan zk.Event, error) {
 
 	err = errNilZkClientConn
 	z.Lock()
-	if z.conn != nil {
-		exist, _, event, err = z.conn.ExistsW(zkPath)
+	if z.Conn != nil {
+		exist, _, event, err = z.Conn.ExistsW(zkPath)
 	}
 	z.Unlock()
 	if err != nil {
diff --git a/registry/zookeeper/zk_client_test.go b/remoting/zookeeper/client_test.go
similarity index 92%
rename from registry/zookeeper/zk_client_test.go
rename to remoting/zookeeper/client_test.go
index ff98bb5bc783da0908c5f3cbe28c94385b0bd1db..4a71ebd6107c499bafe7baa7112e31dd53dfdfd4 100644
--- a/registry/zookeeper/zk_client_test.go
+++ b/remoting/zookeeper/client_test.go
@@ -93,7 +93,8 @@ func verifyEventOrder(t *testing.T, c <-chan zk.Event, expectedEvent []zk.EventT
 //}
 
 func Test_newMockZookeeperClient(t *testing.T) {
-	ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second)
+	ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
+	assert.NoError(t, err)
 	defer ts.Stop()
 	states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
 	verifyEventStateOrder(t, event, states, "event channel")
@@ -103,7 +104,7 @@ func Test_newMockZookeeperClient(t *testing.T) {
 }
 
 func TestCreate(t *testing.T) {
-	ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second)
+	ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
 	defer ts.Stop()
 	err := z.Create("test1/test2/test3/test4")
 	assert.NoError(t, err)
@@ -113,7 +114,7 @@ func TestCreate(t *testing.T) {
 }
 
 func TestCreateDelete(t *testing.T) {
-	ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second)
+	ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
 	defer ts.Stop()
 
 	states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
@@ -126,7 +127,7 @@ func TestCreateDelete(t *testing.T) {
 }
 
 func TestRegisterTemp(t *testing.T) {
-	ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second)
+	ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
 	defer ts.Stop()
 	err := z.Create("/test1/test2/test3")
 	assert.NoError(t, err)
@@ -139,7 +140,7 @@ func TestRegisterTemp(t *testing.T) {
 }
 
 func TestRegisterTempSeq(t *testing.T) {
-	ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second)
+	ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
 	defer ts.Stop()
 	err := z.Create("/test1/test2/test3")
 	assert.NoError(t, err)
diff --git a/remoting/zookeeper/container.go b/remoting/zookeeper/container.go
new file mode 100644
index 0000000000000000000000000000000000000000..f869b32444a58c3396ec5520b7b8f859f4accd6f
--- /dev/null
+++ b/remoting/zookeeper/container.go
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zookeeper
+
+import (
+	"sync"
+	"time"
+)
+import (
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/logger"
+)
+
+type ZkClientContainer interface {
+	ZkClient() *ZookeeperClient
+	SetZkClient(*ZookeeperClient)
+	ZkClientLock() *sync.Mutex
+	WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container
+	GetDone() chan struct{}     //for zk client control
+	RestartCallBack() bool
+	common.Node
+}
+
+func HandleClientRestart(r ZkClientContainer) {
+	var (
+		err error
+
+		failTimes int
+	)
+
+	defer r.WaitGroup().Done()
+LOOP:
+	for {
+		select {
+		case <-r.GetDone():
+			logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
+			break LOOP
+			// re-register all services
+		case <-r.ZkClient().Done():
+			r.ZkClientLock().Lock()
+			r.ZkClient().Close()
+			zkName := r.ZkClient().name
+			zkAddress := r.ZkClient().ZkAddrs
+			r.SetZkClient(nil)
+			r.ZkClientLock().Unlock()
+
+			// 鎺k锛岀洿鑷虫垚鍔�
+			failTimes = 0
+			for {
+				select {
+				case <-r.GetDone():
+					logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
+					break LOOP
+				case <-time.After(time.Duration(1e9 * failTimes * ConnDelay)): // 闃叉鐤媯閲嶈繛zk
+				}
+				err = ValidateZookeeperClient(r, WithZkName(zkName))
+				logger.Infof("ZkProviderRegistry.validateZookeeperClient(zkAddr{%s}) = error{%#v}",
+					zkAddress, perrors.WithStack(err))
+				if err == nil {
+					if r.RestartCallBack() {
+						break
+					}
+				}
+				failTimes++
+				if MaxFailTimes <= failTimes {
+					failTimes = MaxFailTimes
+				}
+			}
+		}
+	}
+}
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
new file mode 100644
index 0000000000000000000000000000000000000000..703d06f84eacef8f830a75846c6130bb36239a1e
--- /dev/null
+++ b/remoting/zookeeper/listener.go
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zookeeper
+
+import (
+	"path"
+	"sync"
+	"time"
+)
+
+import (
+	perrors "github.com/pkg/errors"
+	"github.com/samuel/go-zookeeper/zk"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+type ZkEventListener struct {
+	client      *ZookeeperClient
+	pathMapLock sync.Mutex
+	pathMap     map[string]struct{}
+	wg          sync.WaitGroup
+}
+
+func NewZkEventListener(client *ZookeeperClient) *ZkEventListener {
+	return &ZkEventListener{
+		client:  client,
+		pathMap: make(map[string]struct{}),
+	}
+}
+func (l *ZkEventListener) SetClient(client *ZookeeperClient) {
+	l.client = client
+}
+func (l *ZkEventListener) listenServiceNodeEvent(zkPath string) bool {
+	l.wg.Add(1)
+	defer l.wg.Done()
+	var zkEvent zk.Event
+	for {
+		keyEventCh, err := l.client.ExistW(zkPath)
+		if err != nil {
+			logger.Errorf("existW{key:%s} = error{%v}", zkPath, err)
+			return false
+		}
+
+		select {
+		case zkEvent = <-keyEventCh:
+			logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
+				zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
+			switch zkEvent.Type {
+			case zk.EventNodeDataChanged:
+				logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDataChanged}", zkPath)
+			case zk.EventNodeCreated:
+				logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeCreated}", zkPath)
+			case zk.EventNotWatching:
+				logger.Warnf("zk.ExistW(key{%s}) = event{EventNotWatching}", zkPath)
+			case zk.EventNodeDeleted:
+				logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDeleted}", zkPath)
+				return true
+			}
+		case <-l.client.Done():
+			return false
+		}
+	}
+
+	return false
+}
+
+func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, listener remoting.DataListener) {
+	contains := func(s []string, e string) bool {
+		for _, a := range s {
+			if a == e {
+				return true
+			}
+		}
+
+		return false
+	}
+
+	newChildren, err := l.client.GetChildren(zkPath)
+	if err != nil {
+		logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
+		return
+	}
+
+	// a node was added -- listen the new node
+	var (
+		newNode string
+	)
+	for _, n := range newChildren {
+		if contains(children, n) {
+			continue
+		}
+
+		newNode = path.Join(zkPath, n)
+		logger.Infof("add zkNode{%s}", newNode)
+		if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: n}) {
+			continue
+		}
+		// listen l service node
+		go func(node string) {
+			logger.Infof("delete zkNode{%s}", node)
+			if l.listenServiceNodeEvent(node) {
+				logger.Infof("delete content{%s}", n)
+				listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: n})
+			}
+			logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
+		}(newNode)
+	}
+
+	// old node was deleted
+	var oldNode string
+	for _, n := range children {
+		if contains(newChildren, n) {
+			continue
+		}
+
+		oldNode = path.Join(zkPath, n)
+		logger.Warnf("delete zkPath{%s}", oldNode)
+		if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: n}) {
+			continue
+		}
+		logger.Warnf("delete content{%s}", n)
+		if err != nil {
+			logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
+			continue
+		}
+		listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: n})
+	}
+}
+
+func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataListener) {
+	l.wg.Add(1)
+	defer l.wg.Done()
+
+	var (
+		failTimes int
+		event     chan struct{}
+		zkEvent   zk.Event
+	)
+	event = make(chan struct{}, 4)
+	defer close(event)
+	for {
+		// get current children for a zkPath
+		children, childEventCh, err := l.client.GetChildrenW(zkPath)
+		if err != nil {
+			failTimes++
+			if MaxFailTimes <= failTimes {
+				failTimes = MaxFailTimes
+			}
+			logger.Errorf("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
+			// clear the event channel
+		CLEAR:
+			for {
+				select {
+				case <-event:
+				default:
+					break CLEAR
+				}
+			}
+			l.client.RegisterEvent(zkPath, &event)
+			select {
+			case <-time.After(timeSecondDuration(failTimes * ConnDelay)):
+				l.client.UnregisterEvent(zkPath, &event)
+				continue
+			case <-l.client.Done():
+				l.client.UnregisterEvent(zkPath, &event)
+				logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
+				return
+			case <-event:
+				logger.Infof("get zk.EventNodeDataChange notify event")
+				l.client.UnregisterEvent(zkPath, &event)
+				l.handleZkNodeEvent(zkPath, nil, listener)
+				continue
+			}
+		}
+		failTimes = 0
+
+		select {
+		case zkEvent = <-childEventCh:
+			logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
+				zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
+			if zkEvent.Type != zk.EventNodeChildrenChanged {
+				continue
+			}
+			l.handleZkNodeEvent(zkEvent.Path, children, listener)
+		case <-l.client.Done():
+			logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
+			return
+		}
+	}
+}
+
+func timeSecondDuration(sec int) time.Duration {
+	return time.Duration(sec) * time.Second
+}
+
+// this func is invoked by ZkConsumerRegistry::Registe/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener
+// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent
+//                            |
+//                            --------> listenServiceNodeEvent
+func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) {
+	var (
+		err        error
+		dubboPath  string
+		children   []string
+		serviceURL common.URL
+	)
+
+	l.pathMapLock.Lock()
+	_, ok := l.pathMap[zkPath]
+	l.pathMapLock.Unlock()
+	if ok {
+		logger.Warnf("@zkPath %s has already been listened.", zkPath)
+		return
+	}
+
+	l.pathMapLock.Lock()
+	l.pathMap[zkPath] = struct{}{}
+	l.pathMapLock.Unlock()
+
+	logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath)
+	children, err = l.client.GetChildren(zkPath)
+	if err != nil {
+		children = nil
+		logger.Errorf("fail to get children of zk path{%s}", zkPath)
+	}
+
+	for _, c := range children {
+		if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: c}) {
+			continue
+		}
+
+		// listen l service node
+		dubboPath = path.Join(zkPath, c)
+		logger.Infof("listen dubbo service key{%s}", dubboPath)
+		go func(zkPath string, serviceURL common.URL) {
+			if l.listenServiceNodeEvent(dubboPath) {
+				logger.Debugf("delete serviceUrl{%s}", serviceURL)
+				listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: c})
+			}
+			logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
+		}(dubboPath, serviceURL)
+	}
+
+	logger.Infof("listen dubbo path{%s}", zkPath)
+	go func(zkPath string, listener remoting.DataListener) {
+		l.listenDirEvent(zkPath, listener)
+		logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
+	}(zkPath, listener)
+}
+
+func (l *ZkEventListener) valid() bool {
+	return l.client.ZkConnValid()
+}
+
+func (l *ZkEventListener) Close() {
+	l.wg.Wait()
+}
diff --git a/remoting/zookeeper/zookeeper-4unitest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar b/remoting/zookeeper/zookeeper-4unitest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar
new file mode 100644
index 0000000000000000000000000000000000000000..839531b8b8762a9c19e334a5cbf79314cb16f945
Binary files /dev/null and b/remoting/zookeeper/zookeeper-4unitest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar differ