diff --git a/registry/kubernetes/listener.go b/registry/kubernetes/listener.go index f8869fea7b77541eb929624cc1fa708c1218d7dd..ac6f8af8a2dbe5b072a580a65dadb3902eb7b750 100644 --- a/registry/kubernetes/listener.go +++ b/registry/kubernetes/listener.go @@ -103,7 +103,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { return nil, perrors.New("listener stopped") case e := <-l.events: - logger.Infof("got kubernetes event %#v", e) + logger.Debugf("got kubernetes event %#v", e) if e.ConfigType == remoting.EventTypeDel && !l.registry.client.Valid() { select { case <-l.registry.Done(): diff --git a/registry/kubernetes/registry.go b/registry/kubernetes/registry.go index 69a3fca18816054738024fb5e1ab7541734444f7..9fa6b83941621e8a4b9cf55d738d9b6a02dcc7f0 100644 --- a/registry/kubernetes/registry.go +++ b/registry/kubernetes/registry.go @@ -160,7 +160,7 @@ func newKubernetesRegistry(url *common.URL) (registry.Registry, error) { go r.HandleClientRestart() r.InitListeners() - logger.Debugf("the kubernetes registry started") + logger.Debugf("kubernetes registry started") return r, nil } diff --git a/remoting/kubernetes/client.go b/remoting/kubernetes/client.go index 430667fec7bfc749baaf93a1974b53a8e85db11f..240257dbf55028a203bf9d419da0698fbfa9f8a3 100644 --- a/remoting/kubernetes/client.go +++ b/remoting/kubernetes/client.go @@ -70,7 +70,7 @@ func newClient(url common.URL) (*Client, error) { if r == common.CONSUMER { // only consumer have to start informer factory - c.controller.Run() + c.controller.startALLInformers() } return c, nil } @@ -185,8 +185,8 @@ func ValidateClient(container clientFacade) error { newClient, err := newClient(container.GetUrl()) if err != nil { - logger.Warnf("new kubernetes client (namespace{%s}: %v)", "", err) - return perrors.WithMessagef(err, "new kubernetes client (:%+v)", "") + logger.Warnf("new kubernetes client: %v)", err) + return perrors.WithMessage(err, "new kubernetes client") } container.SetClient(newClient) } @@ -212,6 +212,6 @@ func NewMockClient(podList *v1.PodList) (*Client, error) { controller: controller, } - c.controller.Run() + c.controller.startALLInformers() return c, nil } diff --git a/remoting/kubernetes/listener.go b/remoting/kubernetes/listener.go index 4c198c66cc3e02006291a195af9d023ec5a02340..a5e7a544fadfc249426d34ce68081ab3d4b01bdb 100644 --- a/remoting/kubernetes/listener.go +++ b/remoting/kubernetes/listener.go @@ -81,8 +81,6 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting. } } } - - return false } // return true mean the event type is DELETE diff --git a/remoting/kubernetes/registry_controller.go b/remoting/kubernetes/registry_controller.go index 75799b3100ab3ea6a8a5d2a9aacb62014c734238..b9cb129942d71fc354a75252ea9d39cf9d04ee4e 100644 --- a/remoting/kubernetes/registry_controller.go +++ b/remoting/kubernetes/registry_controller.go @@ -52,21 +52,20 @@ import ( ) const ( - // kubernetes suggest resync - defaultResync = 5 * time.Minute -) - -const ( - // kubernetes inject the var + // kubernetes inject env var podNameKey = "HOSTNAME" nameSpaceKey = "NAMESPACE" needWatchedNameSpaceKey = "DUBBO_NAMESPACE" + // all pod annotation key DubboIOAnnotationKey = "dubbo.io/annotation" - + // all pod label key and value pair DubboIOLabelKey = "dubbo.io/label" DubboIOConsumerLabelValue = "dubbo.io.consumer" DubboIOProviderLabelValue = "dubbo.io.provider" + + // kubernetes suggest resync + defaultResync = 5 * time.Minute ) var ( @@ -246,9 +245,9 @@ func (c *dubboRegistryController) initPodInformer() error { for _, ns := range strings.Split(needWatchedNameSpaceList, ",") { c.needWatchedNamespace[ns] = struct{}{} } - // current work namespace should be watched c.needWatchedNamespace[c.namespace] = struct{}{} + c.queue = workqueue.New() // init all watch needed pod-informer @@ -309,13 +308,11 @@ func (c *dubboRegistryController) deletePod(obj interface{}) { }) } -func (c *dubboRegistryController) Run() { - +func (c *dubboRegistryController) startALLInformers() { logger.Debugf("starting namespaced informer-factory") for _, factory := range c.namespacedInformerFactory { go factory.Start(c.ctx.Done()) } - logger.Debugf("finish start namespaced informer-factory") } // run @@ -326,6 +323,7 @@ func (c *dubboRegistryController) run() { return } + defer logger.Warn("dubbo registry controller work stopped") defer c.queue.ShutDown() for ns, podInformer := range c.namespacedPodInformers { @@ -344,7 +342,6 @@ func (c *dubboRegistryController) run() { } func (c *dubboRegistryController) work() { - defer logger.Warn("dubbo registry controller work stopped") for c.processNextWorkItem() { } } @@ -365,7 +362,6 @@ func (c *dubboRegistryController) processNextWorkItem() bool { // handleWatchedPodEvent // handle watched pod event func (c *dubboRegistryController) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) { - logger.Debugf("get @type = %s event from @pod = %s", eventType, p.GetName()) for ak, av := range p.GetAnnotations() { @@ -382,7 +378,6 @@ func (c *dubboRegistryController) handleWatchedPodEvent(p *v1.Pod, eventType wat } for _, o := range ol { - switch eventType { case watch.Added: // if pod is added, the record always be create @@ -396,15 +391,12 @@ func (c *dubboRegistryController) handleWatchedPodEvent(p *v1.Pod, eventType wat return } - logger.Debugf("putting object (%#v) to watcherSet", o) - + logger.Debugf("putting @key=%s @value=%s to watcherSet", o.Key, o.Value) if err := c.watcherSet.Put(o); err != nil { logger.Errorf("put (%#v) to cache watcherSet: %v ", o, err) return } - } - } } @@ -557,7 +549,6 @@ func (c *dubboRegistryController) getPatch(oldPod, newPod *v1.Pod) ([]byte, erro // marshalRecord // marshal the kubernetes dubbo annotation value func (c *dubboRegistryController) marshalRecord(ol []*WatcherEvent) (string, error) { - msg, err := json.Marshal(ol) if err != nil { return "", perrors.WithMessage(err, "json encode object list")