diff --git a/remoting/kubernetes/client.go b/remoting/kubernetes/client.go index 9be25cb0eeaab7ca6c398cb1eaf984b1df2f9359..dcb28c37d2d6a0665046425053abedaab7b607fe 100644 --- a/remoting/kubernetes/client.go +++ b/remoting/kubernetes/client.go @@ -291,6 +291,10 @@ func (c *Client) watchPods() error { return nil } +type resourceVersionGetter interface { + GetResourceVersion() string +} + // watchPods // try to notify func (c *Client) watchPodsLoop() { @@ -316,61 +320,49 @@ func (c *Client) watchPodsLoop() { logger.Infof("the old kubernetes client broken, collect the resource status from resource version (%s)", c.lastResourceVersion) - select { - case <-c.ctx.Done(): - // the client stopped - logger.Info("the kubernetes client stopped") - return - - default: + for { + select { + // double check ctx + case <-c.ctx.Done(): + logger.Info("the kubernetes client stopped, resultChan len %d", len(wc.ResultChan())) + return - for { - select { - // double check ctx - case <-c.ctx.Done(): - logger.Info("the kubernetes client stopped") + // get one element from result-chan + case event, ok := <-wc.ResultChan(): + if !ok { + wc.Stop() + logger.Info("kubernetes watch chan die, create new") goto onceWatch + } - // get one element from result-chan - case event, ok := <-wc.ResultChan(): - if !ok { - wc.Stop() - logger.Info("kubernetes watch chan die, create new") - goto onceWatch - } - - if event.Type == watch.Error { - // watched a error event - logger.Warnf("kubernetes watch api report err (%#v)", event) - continue - } - - type resourceVersionGetter interface { - GetResourceVersion() string - } - - o, ok := event.Object.(resourceVersionGetter) - if !ok { - continue - } - - // record the last resource version avoid to sync all pod - c.lastResourceVersion = o.GetResourceVersion() - logger.Infof("kubernetes get the current resource version %v", c.lastResourceVersion) - - // check event object type - p, ok := event.Object.(*v1.Pod) - if !ok { - // not a pod - continue - } - - // handle the watched pod - go c.handleWatchedPodEvent(p, event.Type) + if event.Type == watch.Error { + // watched a error event + logger.Warnf("kubernetes watch api report err (%#v)", event) + continue } + + o, ok := event.Object.(resourceVersionGetter) + if !ok { + logger.Warnf("kubernetes response object not a versioned object") + continue + } + + // record the last resource version avoid to sync all pod + c.lastResourceVersion = o.GetResourceVersion() + logger.Infof("kubernetes get the current resource version %v", c.lastResourceVersion) + + // check event object type + p, ok := event.Object.(*v1.Pod) + if !ok { + // not a pod + continue + } + + // handle the watched pod + go c.handleWatchedPodEvent(p, event.Type) } - onceWatch: } + onceWatch: } }