diff --git a/remoting/kubernetes/client.go b/remoting/kubernetes/client.go
index 90e74aa8f2fd8911d5eca052ba2abf9204caf1aa..34601e56014a840da9edbc7482db27e73f017dcf 100644
--- a/remoting/kubernetes/client.go
+++ b/remoting/kubernetes/client.go
@@ -605,26 +605,26 @@ func (c *Client) GetChildren(k string) ([]string, []string, error) {
 
 // Watch
 // watch on spec key
-func (c *Client) Watch(k string) (<-chan *Object, error) {
+func (c *Client) Watch(k string) (<-chan *Object, <-chan struct{}, error) {
 
 	w, err := c.store.Watch(k, false)
 	if err != nil {
-		return nil, perrors.WithMessagef(err, "watch on (%s)", k)
+		return nil, nil, perrors.WithMessagef(err, "watch on (%s)", k)
 	}
 
-	return w.ResultChan(), nil
+	return w.ResultChan(), w.done(), nil
 }
 
 // Watch
 // watch on spec prefix
-func (c *Client) WatchWithPrefix(prefix string) (<-chan *Object, error) {
+func (c *Client) WatchWithPrefix(prefix string) (<-chan *Object, <-chan struct{}, error) {
 
 	w, err := c.store.Watch(prefix, true)
 	if err != nil {
-		return nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix)
+		return nil, nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix)
 	}
 
-	return w.ResultChan(), nil
+	return w.ResultChan(), w.done(), nil
 }
 
 // Valid
diff --git a/remoting/kubernetes/client_test.go b/remoting/kubernetes/client_test.go
index aedd7ced83f47bf80efa09d194e97f16840ebbba..49fcad608afa0869f51e084abaad452a40f94d1f 100644
--- a/remoting/kubernetes/client_test.go
+++ b/remoting/kubernetes/client_test.go
@@ -19,14 +19,16 @@ package kubernetes
 
 import (
 	"encoding/json"
+	"net/http"
 	"os"
 	"strings"
 	"sync"
 	"testing"
 	"time"
+)
 
+import (
 	"github.com/stretchr/testify/suite"
-
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/fake"
@@ -234,6 +236,8 @@ func (s *KubernetesClientTestSuite) SetupSuite() {
 	if err := os.Setenv(nameSpaceKey, s.currentPod.GetNamespace()); err != nil {
 		t.Fatal(err)
 	}
+
+	go http.ListenAndServe(":6061", nil)
 }
 
 func (s *KubernetesClientTestSuite) TestReadCurrentPodName() {
@@ -329,16 +333,23 @@ func (s *KubernetesClientTestSuite) TestClientGetChildrenKVList() {
 	go func() {
 		defer wg.Done()
 
-		wc, err := client.WatchWithPrefix(prefix)
+		wc, done, err := client.WatchWithPrefix(prefix)
 		if err != nil {
 			t.Fatal(err)
 		}
 		i := 0
-		for e := range wc {
-			i++
-			t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
-			if i == 3 {
-				// already sync all event
+
+		for {
+			select {
+			case e := <-wc:
+				i++
+				t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
+				if i == 3 {
+					// already sync all event
+					return
+				}
+			case <-done:
+				t.Log("the store watcher was stopped")
 				return
 			}
 		}
@@ -395,17 +406,22 @@ func (s *KubernetesClientTestSuite) TestClientWatchPrefix() {
 
 	go func() {
 
-		wc, err := client.WatchWithPrefix(prefix)
+		wc, done, err := client.WatchWithPrefix(prefix)
 		if err != nil {
 			t.Fatal(err)
 		}
 
 		wg.Done()
 
-		for e := range wc {
-			t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
+		for {
+			select {
+			case e := <-wc:
+				t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
+			case <-done:
+				t.Log("the store watcher was stopped")
+				return
+			}
 		}
-
 	}()
 
 	// must wait the watch goroutine work
@@ -446,14 +462,20 @@ func (s *KubernetesClientTestSuite) TestClientWatch() {
 
 	go func() {
 
-		wc, err := client.Watch(prefix)
+		wc, done, err := client.Watch(prefix)
 		if err != nil {
 			t.Fatal(err)
 		}
 		wg.Done()
 
-		for e := range wc {
-			t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
+		for {
+			select {
+			case e := <-wc:
+				t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
+			case <-done:
+				t.Log("the store watcher was stopped")
+				return
+			}
 		}
 
 	}()
diff --git a/remoting/kubernetes/listener.go b/remoting/kubernetes/listener.go
index da6d8351590c29ab6ce7f53cee3263d235620c6f..0425aaf92d13c95e04204df2a4490fc9fc1ebc04 100644
--- a/remoting/kubernetes/listener.go
+++ b/remoting/kubernetes/listener.go
@@ -52,7 +52,7 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.
 	l.wg.Add(1)
 	defer l.wg.Done()
 	for {
-		wc, err := l.client.Watch(key)
+		wc, done, err := l.client.Watch(key)
 		if err != nil {
 			logger.Warnf("watch exist{key:%s} = error{%v}", key, err)
 			return false
@@ -65,7 +65,12 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.
 			logger.Warnf("kubernetes client stopped")
 			return false
 
-			// handle kubernetes-store events
+		// store watcher stopped
+		case <-done:
+			logger.Warnf("kubernetes store watcher stopped")
+			return false
+
+		// handle kubernetes-store events
 		case e, ok := <-wc:
 			if !ok {
 				logger.Warnf("kubernetes-store watch-chan closed")
@@ -123,7 +128,7 @@ func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener
 	l.wg.Add(1)
 	defer l.wg.Done()
 	for {
-		wc, err := l.client.WatchWithPrefix(prefix)
+		wc, done, err := l.client.WatchWithPrefix(prefix)
 		if err != nil {
 			logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err)
 		}
@@ -134,6 +139,11 @@ func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener
 			logger.Warnf("kubernetes client stopped")
 			return
 
+		// watcher stopped
+		case <-done:
+			logger.Warnf("kubernetes store watcher stopped")
+			return
+
 			// kuberentes-store event stream
 		case e, ok := <-wc:
 
diff --git a/remoting/kubernetes/store.go b/remoting/kubernetes/store.go
index 51365c14c7c4709515f88bd5995c777dbaf25dd9..a2cc99419825965e812eabd07bc14559f2d59ab4 100644
--- a/remoting/kubernetes/store.go
+++ b/remoting/kubernetes/store.go
@@ -110,9 +110,9 @@ type storeImpl struct {
 	watchers         map[uint64]*watcher
 }
 
-// on stop
+// wait exit
 // when the store was closed
-func (s *storeImpl) onStop() {
+func (s *storeImpl) waitExit() {
 
 	select {
 	case <-s.ctx.Done():
@@ -124,7 +124,7 @@ func (s *storeImpl) onStop() {
 
 		for _, w := range watchers {
 			// stop data stream
-			close(w.ch)
+			// close(w.ch)
 			// stop watcher
 			w.stop()
 		}
@@ -149,8 +149,6 @@ func (s *storeImpl) Put(object *Object) error {
 
 	sendMsg := func(object *Object, w *watcher) {
 
-		s.lock.Lock()
-		defer s.lock.Unlock()
 		select {
 		case <-w.done():
 			// the watcher already stop
@@ -329,6 +327,6 @@ func newStore(ctx context.Context) Store {
 		cache:    map[string]*Object{},
 		watchers: map[uint64]*watcher{},
 	}
-	go s.onStop()
+	go s.waitExit()
 	return s
 }
diff --git a/remoting/kubernetes/store_test.go b/remoting/kubernetes/store_test.go
index 11036adb409cd456f89732978f939e715950a193..fc6df0602cee194a40e2287289e2fec42a54b79a 100644
--- a/remoting/kubernetes/store_test.go
+++ b/remoting/kubernetes/store_test.go
@@ -44,8 +44,15 @@ func TestStore(t *testing.T) {
 			if err != nil {
 				t.Fatal(err)
 			}
-			for e := range w.ResultChan() {
-				t.Logf("consumer %s got %s\n", w.ID(), e.Key)
+			for {
+				select {
+				case e := <-w.ResultChan():
+					t.Logf("consumer %s got %s\n", w.ID(), e.Key)
+
+				case <-w.done():
+					t.Logf("consumer %s stopped", w.ID())
+					return
+				}
 			}
 		}()
 	}
@@ -59,8 +66,16 @@ func TestStore(t *testing.T) {
 			if err != nil {
 				t.Fatal(err)
 			}
-			for e := range w.ResultChan() {
-				t.Logf("prefix consumer %s got %s\n", w.ID(), e.Key)
+
+			for {
+				select {
+				case e := <-w.ResultChan():
+					t.Logf("prefix consumer %s got %s\n", w.ID(), e.Key)
+
+				case <-w.done():
+					t.Logf("prefix consumer %s stopped", w.ID())
+					return
+				}
 			}
 		}()
 	}