From 3a3e8a7db4f88e5cffac4a813ff2389a8c16c81a Mon Sep 17 00:00:00 2001 From: scott <scottwangsxll@gmail.com> Date: Sun, 15 Mar 2020 15:56:26 +0800 Subject: [PATCH] Fix remote/kubernetes sendMsg locker --- remoting/kubernetes/client.go | 12 +++---- remoting/kubernetes/client_test.go | 50 +++++++++++++++++++++--------- remoting/kubernetes/listener.go | 16 ++++++++-- remoting/kubernetes/store.go | 10 +++--- remoting/kubernetes/store_test.go | 23 +++++++++++--- 5 files changed, 78 insertions(+), 33 deletions(-) diff --git a/remoting/kubernetes/client.go b/remoting/kubernetes/client.go index 90e74aa8f..34601e560 100644 --- a/remoting/kubernetes/client.go +++ b/remoting/kubernetes/client.go @@ -605,26 +605,26 @@ func (c *Client) GetChildren(k string) ([]string, []string, error) { // Watch // watch on spec key -func (c *Client) Watch(k string) (<-chan *Object, error) { +func (c *Client) Watch(k string) (<-chan *Object, <-chan struct{}, error) { w, err := c.store.Watch(k, false) if err != nil { - return nil, perrors.WithMessagef(err, "watch on (%s)", k) + return nil, nil, perrors.WithMessagef(err, "watch on (%s)", k) } - return w.ResultChan(), nil + return w.ResultChan(), w.done(), nil } // Watch // watch on spec prefix -func (c *Client) WatchWithPrefix(prefix string) (<-chan *Object, error) { +func (c *Client) WatchWithPrefix(prefix string) (<-chan *Object, <-chan struct{}, error) { w, err := c.store.Watch(prefix, true) if err != nil { - return nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix) + return nil, nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix) } - return w.ResultChan(), nil + return w.ResultChan(), w.done(), nil } // Valid diff --git a/remoting/kubernetes/client_test.go b/remoting/kubernetes/client_test.go index aedd7ced8..49fcad608 100644 --- a/remoting/kubernetes/client_test.go +++ b/remoting/kubernetes/client_test.go @@ -19,14 +19,16 @@ package kubernetes import ( "encoding/json" + "net/http" "os" "strings" "sync" "testing" "time" +) +import ( "github.com/stretchr/testify/suite" - v1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" @@ -234,6 +236,8 @@ func (s *KubernetesClientTestSuite) SetupSuite() { if err := os.Setenv(nameSpaceKey, s.currentPod.GetNamespace()); err != nil { t.Fatal(err) } + + go http.ListenAndServe(":6061", nil) } func (s *KubernetesClientTestSuite) TestReadCurrentPodName() { @@ -329,16 +333,23 @@ func (s *KubernetesClientTestSuite) TestClientGetChildrenKVList() { go func() { defer wg.Done() - wc, err := client.WatchWithPrefix(prefix) + wc, done, err := client.WatchWithPrefix(prefix) if err != nil { t.Fatal(err) } i := 0 - for e := range wc { - i++ - t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value) - if i == 3 { - // already sync all event + + for { + select { + case e := <-wc: + i++ + t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value) + if i == 3 { + // already sync all event + return + } + case <-done: + t.Log("the store watcher was stopped") return } } @@ -395,17 +406,22 @@ func (s *KubernetesClientTestSuite) TestClientWatchPrefix() { go func() { - wc, err := client.WatchWithPrefix(prefix) + wc, done, err := client.WatchWithPrefix(prefix) if err != nil { t.Fatal(err) } wg.Done() - for e := range wc { - t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value) + for { + select { + case e := <-wc: + t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value) + case <-done: + t.Log("the store watcher was stopped") + return + } } - }() // must wait the watch goroutine work @@ -446,14 +462,20 @@ func (s *KubernetesClientTestSuite) TestClientWatch() { go func() { - wc, err := client.Watch(prefix) + wc, done, err := client.Watch(prefix) if err != nil { t.Fatal(err) } wg.Done() - for e := range wc { - t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value) + for { + select { + case e := <-wc: + t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value) + case <-done: + t.Log("the store watcher was stopped") + return + } } }() diff --git a/remoting/kubernetes/listener.go b/remoting/kubernetes/listener.go index da6d83515..0425aaf92 100644 --- a/remoting/kubernetes/listener.go +++ b/remoting/kubernetes/listener.go @@ -52,7 +52,7 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting. l.wg.Add(1) defer l.wg.Done() for { - wc, err := l.client.Watch(key) + wc, done, err := l.client.Watch(key) if err != nil { logger.Warnf("watch exist{key:%s} = error{%v}", key, err) return false @@ -65,7 +65,12 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting. logger.Warnf("kubernetes client stopped") return false - // handle kubernetes-store events + // store watcher stopped + case <-done: + logger.Warnf("kubernetes store watcher stopped") + return false + + // handle kubernetes-store events case e, ok := <-wc: if !ok { logger.Warnf("kubernetes-store watch-chan closed") @@ -123,7 +128,7 @@ func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener l.wg.Add(1) defer l.wg.Done() for { - wc, err := l.client.WatchWithPrefix(prefix) + wc, done, err := l.client.WatchWithPrefix(prefix) if err != nil { logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err) } @@ -134,6 +139,11 @@ func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener logger.Warnf("kubernetes client stopped") return + // watcher stopped + case <-done: + logger.Warnf("kubernetes store watcher stopped") + return + // kuberentes-store event stream case e, ok := <-wc: diff --git a/remoting/kubernetes/store.go b/remoting/kubernetes/store.go index 51365c14c..a2cc99419 100644 --- a/remoting/kubernetes/store.go +++ b/remoting/kubernetes/store.go @@ -110,9 +110,9 @@ type storeImpl struct { watchers map[uint64]*watcher } -// on stop +// wait exit // when the store was closed -func (s *storeImpl) onStop() { +func (s *storeImpl) waitExit() { select { case <-s.ctx.Done(): @@ -124,7 +124,7 @@ func (s *storeImpl) onStop() { for _, w := range watchers { // stop data stream - close(w.ch) + // close(w.ch) // stop watcher w.stop() } @@ -149,8 +149,6 @@ func (s *storeImpl) Put(object *Object) error { sendMsg := func(object *Object, w *watcher) { - s.lock.Lock() - defer s.lock.Unlock() select { case <-w.done(): // the watcher already stop @@ -329,6 +327,6 @@ func newStore(ctx context.Context) Store { cache: map[string]*Object{}, watchers: map[uint64]*watcher{}, } - go s.onStop() + go s.waitExit() return s } diff --git a/remoting/kubernetes/store_test.go b/remoting/kubernetes/store_test.go index 11036adb4..fc6df0602 100644 --- a/remoting/kubernetes/store_test.go +++ b/remoting/kubernetes/store_test.go @@ -44,8 +44,15 @@ func TestStore(t *testing.T) { if err != nil { t.Fatal(err) } - for e := range w.ResultChan() { - t.Logf("consumer %s got %s\n", w.ID(), e.Key) + for { + select { + case e := <-w.ResultChan(): + t.Logf("consumer %s got %s\n", w.ID(), e.Key) + + case <-w.done(): + t.Logf("consumer %s stopped", w.ID()) + return + } } }() } @@ -59,8 +66,16 @@ func TestStore(t *testing.T) { if err != nil { t.Fatal(err) } - for e := range w.ResultChan() { - t.Logf("prefix consumer %s got %s\n", w.ID(), e.Key) + + for { + select { + case e := <-w.ResultChan(): + t.Logf("prefix consumer %s got %s\n", w.ID(), e.Key) + + case <-w.done(): + t.Logf("prefix consumer %s stopped", w.ID()) + return + } } }() } -- GitLab