Skip to content
Snippets Groups Projects
Commit 3a3e8a7d authored by scott's avatar scott
Browse files

Fix remote/kubernetes sendMsg locker

parent 8120caa8
No related branches found
No related tags found
No related merge requests found
......@@ -605,26 +605,26 @@ func (c *Client) GetChildren(k string) ([]string, []string, error) {
// Watch
// watch on spec key
func (c *Client) Watch(k string) (<-chan *Object, error) {
func (c *Client) Watch(k string) (<-chan *Object, <-chan struct{}, error) {
w, err := c.store.Watch(k, false)
if err != nil {
return nil, perrors.WithMessagef(err, "watch on (%s)", k)
return nil, nil, perrors.WithMessagef(err, "watch on (%s)", k)
}
return w.ResultChan(), nil
return w.ResultChan(), w.done(), nil
}
// Watch
// watch on spec prefix
func (c *Client) WatchWithPrefix(prefix string) (<-chan *Object, error) {
func (c *Client) WatchWithPrefix(prefix string) (<-chan *Object, <-chan struct{}, error) {
w, err := c.store.Watch(prefix, true)
if err != nil {
return nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix)
return nil, nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix)
}
return w.ResultChan(), nil
return w.ResultChan(), w.done(), nil
}
// Valid
......
......@@ -19,14 +19,16 @@ package kubernetes
import (
"encoding/json"
"net/http"
"os"
"strings"
"sync"
"testing"
"time"
)
import (
"github.com/stretchr/testify/suite"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
......@@ -234,6 +236,8 @@ func (s *KubernetesClientTestSuite) SetupSuite() {
if err := os.Setenv(nameSpaceKey, s.currentPod.GetNamespace()); err != nil {
t.Fatal(err)
}
go http.ListenAndServe(":6061", nil)
}
func (s *KubernetesClientTestSuite) TestReadCurrentPodName() {
......@@ -329,16 +333,23 @@ func (s *KubernetesClientTestSuite) TestClientGetChildrenKVList() {
go func() {
defer wg.Done()
wc, err := client.WatchWithPrefix(prefix)
wc, done, err := client.WatchWithPrefix(prefix)
if err != nil {
t.Fatal(err)
}
i := 0
for e := range wc {
i++
t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
if i == 3 {
// already sync all event
for {
select {
case e := <-wc:
i++
t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
if i == 3 {
// already sync all event
return
}
case <-done:
t.Log("the store watcher was stopped")
return
}
}
......@@ -395,17 +406,22 @@ func (s *KubernetesClientTestSuite) TestClientWatchPrefix() {
go func() {
wc, err := client.WatchWithPrefix(prefix)
wc, done, err := client.WatchWithPrefix(prefix)
if err != nil {
t.Fatal(err)
}
wg.Done()
for e := range wc {
t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
for {
select {
case e := <-wc:
t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
case <-done:
t.Log("the store watcher was stopped")
return
}
}
}()
// must wait the watch goroutine work
......@@ -446,14 +462,20 @@ func (s *KubernetesClientTestSuite) TestClientWatch() {
go func() {
wc, err := client.Watch(prefix)
wc, done, err := client.Watch(prefix)
if err != nil {
t.Fatal(err)
}
wg.Done()
for e := range wc {
t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
for {
select {
case e := <-wc:
t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
case <-done:
t.Log("the store watcher was stopped")
return
}
}
}()
......
......@@ -52,7 +52,7 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.
l.wg.Add(1)
defer l.wg.Done()
for {
wc, err := l.client.Watch(key)
wc, done, err := l.client.Watch(key)
if err != nil {
logger.Warnf("watch exist{key:%s} = error{%v}", key, err)
return false
......@@ -65,7 +65,12 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.
logger.Warnf("kubernetes client stopped")
return false
// handle kubernetes-store events
// store watcher stopped
case <-done:
logger.Warnf("kubernetes store watcher stopped")
return false
// handle kubernetes-store events
case e, ok := <-wc:
if !ok {
logger.Warnf("kubernetes-store watch-chan closed")
......@@ -123,7 +128,7 @@ func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener
l.wg.Add(1)
defer l.wg.Done()
for {
wc, err := l.client.WatchWithPrefix(prefix)
wc, done, err := l.client.WatchWithPrefix(prefix)
if err != nil {
logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err)
}
......@@ -134,6 +139,11 @@ func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener
logger.Warnf("kubernetes client stopped")
return
// watcher stopped
case <-done:
logger.Warnf("kubernetes store watcher stopped")
return
// kuberentes-store event stream
case e, ok := <-wc:
......
......@@ -110,9 +110,9 @@ type storeImpl struct {
watchers map[uint64]*watcher
}
// on stop
// wait exit
// when the store was closed
func (s *storeImpl) onStop() {
func (s *storeImpl) waitExit() {
select {
case <-s.ctx.Done():
......@@ -124,7 +124,7 @@ func (s *storeImpl) onStop() {
for _, w := range watchers {
// stop data stream
close(w.ch)
// close(w.ch)
// stop watcher
w.stop()
}
......@@ -149,8 +149,6 @@ func (s *storeImpl) Put(object *Object) error {
sendMsg := func(object *Object, w *watcher) {
s.lock.Lock()
defer s.lock.Unlock()
select {
case <-w.done():
// the watcher already stop
......@@ -329,6 +327,6 @@ func newStore(ctx context.Context) Store {
cache: map[string]*Object{},
watchers: map[uint64]*watcher{},
}
go s.onStop()
go s.waitExit()
return s
}
......@@ -44,8 +44,15 @@ func TestStore(t *testing.T) {
if err != nil {
t.Fatal(err)
}
for e := range w.ResultChan() {
t.Logf("consumer %s got %s\n", w.ID(), e.Key)
for {
select {
case e := <-w.ResultChan():
t.Logf("consumer %s got %s\n", w.ID(), e.Key)
case <-w.done():
t.Logf("consumer %s stopped", w.ID())
return
}
}
}()
}
......@@ -59,8 +66,16 @@ func TestStore(t *testing.T) {
if err != nil {
t.Fatal(err)
}
for e := range w.ResultChan() {
t.Logf("prefix consumer %s got %s\n", w.ID(), e.Key)
for {
select {
case e := <-w.ResultChan():
t.Logf("prefix consumer %s got %s\n", w.ID(), e.Key)
case <-w.done():
t.Logf("prefix consumer %s stopped", w.ID())
return
}
}
}()
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment