diff --git a/remoting/kubernetes/client.go b/remoting/kubernetes/client.go index fbabd55beff3500d4346a44a157441cf09ea5674..0c9ffd2b914e6ad584023725867a3aaa4b641224 100644 --- a/remoting/kubernetes/client.go +++ b/remoting/kubernetes/client.go @@ -152,6 +152,8 @@ func newMockClient(namespace string, mockClientGenerator func() (kubernetes.Inte return nil, perrors.WithMessage(err, "init watcherSet") } + c.lastResourceVersion = c.currentPod.GetResourceVersion() + // start kubernetes watch loop if err := c.watchPods(); err != nil { return nil, perrors.WithMessage(err, "watch pods") @@ -324,7 +326,7 @@ func (c *Client) watchPodsLoop() { select { // double check ctx case <-c.ctx.Done(): - logger.Info("the kubernetes client stopped, resultChan len %d", len(wc.ResultChan())) + logger.Infof("the kubernetes client stopped, resultChan len %d", len(wc.ResultChan())) return // get one element from result-chan @@ -358,6 +360,7 @@ func (c *Client) watchPodsLoop() { continue } + logger.Debugf("kubernetes got pod %#v", p) // handle the watched pod go c.handleWatchedPodEvent(p, event.Type) } @@ -456,10 +459,14 @@ func (c *Client) readCurrentPod() (*v1.Pod, error) { // create k/v pair in watcher-set func (c *Client) Create(k, v string) error { + // the read current pod must be lock, protect every + // create operation can be atomic + c.lock.Lock() + defer c.lock.Unlock() + // 1. accord old pod && (k, v) assemble new pod dubbo annotion v // 2. get patch data // 3. PATCH the pod - currentPod, err := c.readCurrentPod() if err != nil { return perrors.WithMessage(err, "read current pod") @@ -480,9 +487,8 @@ func (c *Client) Create(k, v string) error { return perrors.WithMessage(err, "patch current pod") } - c.lock.Lock() c.currentPod = updatedPod - c.lock.Unlock() + logger.Debugf("put the @key = %s @value = %s success", k, v) // not update the watcherSet, the watcherSet should be write by the watchPodsLoop return nil } diff --git a/remoting/kubernetes/client_test.go b/remoting/kubernetes/client_test.go index 985aba6a94f7ccab15f8c1b92c0bcc3768b15a39..342285b345b5e45682fe792d35f2f910e7d86d9d 100644 --- a/remoting/kubernetes/client_test.go +++ b/remoting/kubernetes/client_test.go @@ -19,7 +19,10 @@ package kubernetes import ( "encoding/json" + "fmt" + "net/http" "os" + "runtime" "strings" "sync" "testing" @@ -216,11 +219,15 @@ func (s *KubernetesClientTestSuite) initClient() *Client { if err != nil { t.Fatal(err) } + + time.Sleep(time.Second) return client } func (s *KubernetesClientTestSuite) SetupSuite() { + runtime.GOMAXPROCS(1) + t := s.T() // 1. install test data @@ -236,6 +243,8 @@ func (s *KubernetesClientTestSuite) SetupSuite() { t.Fatal(err) } + go http.ListenAndServe(":6061", nil) + } func (s *KubernetesClientTestSuite) TestReadCurrentPodName() { @@ -336,14 +345,15 @@ func (s *KubernetesClientTestSuite) TestClientGetChildrenKVList() { if err != nil { t.Fatal(err) } - i := 0 + wg.Done() + i := 0 for { select { case e := <-wc: i++ - t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value) + fmt.Printf("got event %v k %s v %s\n", e.EventType, e.Key, e.Value) if i == 3 { // already sync all event syncDataComplete <- struct{}{} diff --git a/remoting/kubernetes/watch.go b/remoting/kubernetes/watch.go index 835bcdb9b7c1df99c56d140a382974fffe10594d..c99a3ebcc041f2fed0160f1f286e72937d2c9aee 100644 --- a/remoting/kubernetes/watch.go +++ b/remoting/kubernetes/watch.go @@ -186,6 +186,8 @@ func (s *watcherSetImpl) Put(watcherEvent *WatcherEvent) error { // notify watcher for _, w := range s.watchers { + w := w + if !strings.Contains(watcherEvent.Key, w.interested.key) { // this watcher no interest in this element continue