diff --git a/registry/kubernetes/listener_test.go b/registry/kubernetes/listener_test.go index c9ff6260880546e9a5f3297f7cf63b039521e7ec..f89154e32e4ef3b6423d6970f9bfc0dd0207d7dd 100644 --- a/registry/kubernetes/listener_test.go +++ b/registry/kubernetes/listener_test.go @@ -237,6 +237,7 @@ func (s *KubernetesRegistryTestSuite) SetupSuite() { if err := os.Setenv(nameSpaceKey, s.currentPod.GetNamespace()); err != nil { t.Fatal(err) } + } func (s *KubernetesRegistryTestSuite) TestDataChange() { diff --git a/remoting/kubernetes/listener.go b/remoting/kubernetes/listener.go index c8cc10f36af9a164a259d220dc09a0bd3dea213f..c87559e7f4b5496dc9cced711b8aabbb4b630a51 100644 --- a/remoting/kubernetes/listener.go +++ b/remoting/kubernetes/listener.go @@ -33,7 +33,7 @@ import ( type EventListener struct { client *Client - keyMapLock sync.Mutex + keyMapLock sync.RWMutex keyMap map[string]struct{} wg sync.WaitGroup } @@ -157,9 +157,9 @@ func timeSecondDuration(sec int) time.Duration { // --------> ListenServiceNodeEvent func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) { - l.keyMapLock.Lock() + l.keyMapLock.RLock() _, ok := l.keyMap[key] - l.keyMapLock.Unlock() + l.keyMapLock.RUnlock() if ok { logger.Warnf("kubernetes-store key %s has already been listened.", key) return diff --git a/remoting/kubernetes/store_test.go b/remoting/kubernetes/store_test.go index 2bc8b277a76d940908e30c224fb9d4c608828fae..11036adb409cd456f89732978f939e715950a193 100644 --- a/remoting/kubernetes/store_test.go +++ b/remoting/kubernetes/store_test.go @@ -19,7 +19,6 @@ package kubernetes import ( "context" - "fmt" "strconv" "sync" "testing" @@ -43,11 +42,10 @@ func TestStore(t *testing.T) { defer wg.Done() w, err := s.Watch("key-1", false) if err != nil { - fmt.Println("watch spec result", err) - return + t.Fatal(err) } for e := range w.ResultChan() { - fmt.Printf("consumer %s got %s\n", w.ID(), e.Key) + t.Logf("consumer %s got %s\n", w.ID(), e.Key) } }() } @@ -59,11 +57,10 @@ func TestStore(t *testing.T) { defer wg.Done() w, err := s.Watch("key", true) if err != nil { - fmt.Println("watch prefix result", err) - return + t.Fatal(err) } for e := range w.ResultChan() { - fmt.Printf("prefix consumer %s got %s\n", w.ID(), e.Key) + t.Logf("prefix consumer %s got %s\n", w.ID(), e.Key) } }() }