Skip to content
Snippets Groups Projects
Commit 883f30a6 authored by scott's avatar scott
Browse files

Fix time gap

parent 84da2bd6
No related branches found
No related tags found
No related merge requests found
...@@ -70,6 +70,9 @@ type Client struct { ...@@ -70,6 +70,9 @@ type Client struct {
ns string ns string
// current resource version
lastResourceVersion string
// the memory watcherSet // the memory watcherSet
watcherSet WatcherSet watcherSet WatcherSet
...@@ -255,6 +258,9 @@ func (c *Client) initWatchSet() error { ...@@ -255,6 +258,9 @@ func (c *Client) initWatchSet() error {
return perrors.WithMessagef(err, "list pods in namespace (%s)", c.ns) return perrors.WithMessagef(err, "list pods in namespace (%s)", c.ns)
} }
// set resource version
c.lastResourceVersion = pods.GetResourceVersion()
for _, pod := range pods.Items { for _, pod := range pods.Items {
logger.Debugf("got the pod (name: %s), (label: %v), (annotations: %v)", pod.Name, pod.GetLabels(), pod.GetAnnotations()) logger.Debugf("got the pod (name: %s), (label: %v), (annotations: %v)", pod.Name, pod.GetLabels(), pod.GetAnnotations())
c.handleWatchedPodEvent(&pod, watch.Added) c.handleWatchedPodEvent(&pod, watch.Added)
...@@ -271,6 +277,7 @@ func (c *Client) watchPods() error { ...@@ -271,6 +277,7 @@ func (c *Client) watchPods() error {
watcher, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{ watcher, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(), LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
Watch: true, Watch: true,
ResourceVersion: c.lastResourceVersion,
}) })
if err != nil { if err != nil {
return perrors.WithMessagef(err, "try to watch the namespace (%s) pods", c.ns) return perrors.WithMessagef(err, "try to watch the namespace (%s) pods", c.ns)
...@@ -294,14 +301,12 @@ func (c *Client) watchPodsLoop() { ...@@ -294,14 +301,12 @@ func (c *Client) watchPodsLoop() {
logger.Info("watchPodsLoop goroutine game over") logger.Info("watchPodsLoop goroutine game over")
}() }()
var lastResourceVersion string
for { for {
wc, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{ wc, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(), LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
Watch: true, Watch: true,
ResourceVersion: lastResourceVersion, ResourceVersion: c.lastResourceVersion,
}) })
if err != nil { if err != nil {
logger.Warnf("watch the namespace (%s) pods: %v, retry after 2 seconds", c.ns, err) logger.Warnf("watch the namespace (%s) pods: %v, retry after 2 seconds", c.ns, err)
...@@ -309,7 +314,7 @@ func (c *Client) watchPodsLoop() { ...@@ -309,7 +314,7 @@ func (c *Client) watchPodsLoop() {
continue continue
} }
logger.Infof("the old kubernetes client broken, collect the resource status from resource version (%s)", lastResourceVersion) logger.Infof("the old kubernetes client broken, collect the resource status from resource version (%s)", c.lastResourceVersion)
select { select {
case <-c.ctx.Done(): case <-c.ctx.Done():
...@@ -350,8 +355,8 @@ func (c *Client) watchPodsLoop() { ...@@ -350,8 +355,8 @@ func (c *Client) watchPodsLoop() {
} }
// record the last resource version avoid to sync all pod // record the last resource version avoid to sync all pod
lastResourceVersion = o.GetResourceVersion() c.lastResourceVersion = o.GetResourceVersion()
logger.Infof("kuberentes get the current resource version %v", lastResourceVersion) logger.Infof("kubernetes get the current resource version %v", c.lastResourceVersion)
// check event object type // check event object type
p, ok := event.Object.(*v1.Pod) p, ok := event.Object.(*v1.Pod)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment