From ef96233a15757e43876348e7b93bbb4580c19b21 Mon Sep 17 00:00:00 2001
From: Joe Zou <yixian.zou@gmail.com>
Date: Thu, 24 Sep 2020 23:27:53 +0800
Subject: [PATCH] Merge pull request #741 from beiwei30/address-batch-mode

Address notification batch mode
# Conflicts:
#	registry/directory/directory.go
---
 cluster/router/chan.go          |   7 ++
 common/url.go                   |  28 ++++++
 registry/directory/directory.go | 172 ++++++++++++++++++++++----------
 registry/protocol/protocol.go   |   7 +-
 registry/registry.go            |   7 +-
 5 files changed, 164 insertions(+), 57 deletions(-)

diff --git a/cluster/router/chan.go b/cluster/router/chan.go
index 6904e1734..e3e84b81f 100644
--- a/cluster/router/chan.go
+++ b/cluster/router/chan.go
@@ -17,9 +17,16 @@
 
 package router
 
+import (
+	"github.com/apache/dubbo-go/protocol"
+)
+
 // Chain
 type Chain interface {
 	router
 	// AddRouters Add routers
 	AddRouters([]PriorityRouter)
+
+	// SetInvokers notify router chain of the initial addresses from registry at the first time. Notify whenever addresses in registry change.
+	SetInvokers(invokers []protocol.Invoker)
 }
diff --git a/common/url.go b/common/url.go
index 02281c7c1..5a3e57406 100644
--- a/common/url.go
+++ b/common/url.go
@@ -657,6 +657,34 @@ func (c *URL) CloneWithParams(reserveParams []string) *URL {
 	)
 }
 
+// IsEquals compares if two URLs equals with each other. Excludes are all parameter keys which should ignored.
+func IsEquals(left URL, right URL, excludes ...string) bool {
+	if left.Ip != right.Ip || left.Port != right.Port {
+		return false
+	}
+
+	leftMap := left.ToMap()
+	rightMap := right.ToMap()
+	for _, exclude := range excludes {
+		delete(leftMap, exclude)
+		delete(rightMap, exclude)
+	}
+
+	if len(leftMap) != len(rightMap) {
+		return false
+	}
+
+	for lk, lv := range leftMap {
+		if rv, ok := rightMap[lk]; !ok {
+			return false
+		} else if lv != rv {
+			return false
+		}
+	}
+
+	return true
+}
+
 func mergeNormalParam(mergedUrl *URL, referenceUrl *URL, paramKeys []string) []func(method string) {
 	methodConfigMergeFcn := make([]func(method string), 0, len(paramKeys))
 	for _, paramKey := range paramKeys {
diff --git a/registry/directory/directory.go b/registry/directory/directory.go
index 2fbf9410f..8871a2a26 100644
--- a/registry/directory/directory.go
+++ b/registry/directory/directory.go
@@ -18,6 +18,7 @@
 package directory
 
 import (
+	"fmt"
 	"sync"
 )
 
@@ -89,69 +90,120 @@ func (dir *RegistryDirectory) subscribe(url *common.URL) {
 }
 
 // Notify monitor changes from registry,and update the cacheServices
-func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) {
-	go dir.update(event)
+func (dir *RegistryDirectory) Notify(events ...*registry.ServiceEvent) {
+	go dir.refreshInvokers(events...)
 }
 
-// update the cacheServices and subscribe service from registry
-func (dir *RegistryDirectory) update(res *registry.ServiceEvent) {
-	if res == nil {
-		return
+// refreshInvokers refreshes service's events. It supports two modes: incremental mode and batch mode. If a single
+// service event is passed in, then it is incremental mode, and if an array of service events are passed in, it is
+// batch mode, in this mode, we assume the registry center have the complete list of the service events, therefore
+// in this case, we can safely assume any cached invoker not in the incoming list can be removed. It is necessary
+// since in batch mode, the register center handles the different type of events by itself, then notify the directory
+// a batch of 'Update' events, instead of omit the different type of event one by one.
+func (dir *RegistryDirectory) refreshInvokers(events ...*registry.ServiceEvent) {
+	var oldInvokers []protocol.Invoker
+
+	// in batch mode, it is safe to remove since we have the complete list of events.
+	if len(events) > 1 {
+		dir.cacheInvokersMap.Range(func(k, v interface{}) bool {
+			if !dir.eventMatched(k.(string), events) {
+				if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil {
+					oldInvokers = append(oldInvokers, invoker)
+				}
+			}
+			return true
+		})
 	}
-	logger.Debugf("registry update, result{%s}", res)
-	logger.Debugf("update service name: %s!", res.Service)
-	dir.refreshInvokers(res)
-}
-
-func (dir *RegistryDirectory) refreshInvokers(res *registry.ServiceEvent) {
-	var (
-		url        *common.URL
-		oldInvoker protocol.Invoker = nil
-	)
-	// judge is override or others
-	if res != nil {
-		url = &res.Service
-		// 1.for override url in 2.6.x
-		if url.Protocol == constant.OVERRIDE_PROTOCOL ||
-			url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.CONFIGURATORS_CATEGORY {
-			dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(url))
-			url = nil
-		} else if url.Protocol == constant.ROUTER_PROTOCOL || // 2.for router
-			url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY {
-			url = nil
 
+	for _, event := range events {
+		logger.Debugf("registry update, result{%s}", event)
+		if oldInvoker, _ := dir.cacheInvokerByEvent(event); oldInvoker != nil {
+			oldInvokers = append(oldInvokers, oldInvoker)
 		}
-		switch res.Action {
-		case remoting.EventTypeAdd, remoting.EventTypeUpdate:
-			logger.Infof("selector add service url{%s}", res.Service)
+	}
 
-			var urls []*common.URL
-			for _, v := range config.GetRouterURLSet().Values() {
-				urls = append(urls, v.(*common.URL))
-			}
+	if len(events) > 0 {
+		dir.setNewInvokers()
+	}
 
-			if len(urls) > 0 {
-				dir.SetRouters(urls)
-			}
-			oldInvoker = dir.cacheInvoker(url)
-		case remoting.EventTypeDel:
-			oldInvoker = dir.uncacheInvoker(url)
-			logger.Infof("selector delete service url{%s}", res.Service)
-		default:
-			return
+	// After dir.cacheInvokers is updated,destroy the oldInvoker
+	// Ensure that no request will enter the oldInvoker
+	for _, invoker := range oldInvokers {
+		invoker.Destroy()
+	}
+}
+
+// eventMatched checks if a cached invoker appears in the incoming invoker list, if no, then it is safe to remove.
+func (dir *RegistryDirectory) eventMatched(key string, events []*registry.ServiceEvent) bool {
+	for _, event := range events {
+		if dir.invokerCacheKey(&event.Service) == key {
+			return true
 		}
 	}
+	return false
+}
 
+// invokerCacheKey generates the key in the cache for a given URL.
+func (dir *RegistryDirectory) invokerCacheKey(url *common.URL) string {
+	referenceUrl := dir.GetDirectoryUrl().SubURL
+	newUrl := common.MergeUrl(url, referenceUrl)
+	return newUrl.Key()
+}
+
+// setNewInvokers groups the invokers from the cache first, then set the result to both directory and router chain.
+func (dir *RegistryDirectory) setNewInvokers() {
 	newInvokers := dir.toGroupInvokers()
 	dir.listenerLock.Lock()
+	defer dir.listenerLock.Unlock()
 	dir.cacheInvokers = newInvokers
-	dir.listenerLock.Unlock()
-	// After dir.cacheInvokers is updated,destroy the oldInvoker
-	// Ensure that no request will enter the oldInvoker
-	if oldInvoker != nil {
-		oldInvoker.Destroy()
+	dir.RouterChain().SetInvokers(newInvokers)
+}
+
+// cacheInvokerByEvent caches invokers from the service event
+func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent) (protocol.Invoker, error) {
+	// judge is override or others
+	if event != nil {
+		u := dir.convertUrl(event)
+		switch event.Action {
+		case remoting.EventTypeAdd, remoting.EventTypeUpdate:
+			logger.Infof("selector add service url{%s}", event.Service)
+			// FIXME: routers are built in every address notification?
+			dir.configRouters()
+			return dir.cacheInvoker(u), nil
+		case remoting.EventTypeDel:
+			logger.Infof("selector delete service url{%s}", event.Service)
+			return dir.uncacheInvoker(u), nil
+		default:
+			return nil, fmt.Errorf("illegal event type: %v", event.Action)
+		}
+	}
+	return nil, nil
+}
+
+// configRouters configures dynamic routers into the router chain, but, the current impl is incorrect, see FIXME above.
+func (dir *RegistryDirectory) configRouters() {
+	var urls []*common.URL
+	for _, v := range config.GetRouterURLSet().Values() {
+		urls = append(urls, v.(*common.URL))
 	}
 
+	if len(urls) > 0 {
+		dir.SetRouters(urls)
+	}
+}
+
+// convertUrl processes override:// and router://
+func (dir *RegistryDirectory) convertUrl(res *registry.ServiceEvent) *common.URL {
+	ret := &res.Service
+	if ret.Protocol == constant.OVERRIDE_PROTOCOL || // 1.for override url in 2.6.x
+		ret.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.CONFIGURATORS_CATEGORY {
+		dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(ret))
+		ret = nil
+	} else if ret.Protocol == constant.ROUTER_PROTOCOL || // 2.for router
+		ret.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY {
+		ret = nil
+	}
+	return ret
 }
 
 func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker {
@@ -197,11 +249,15 @@ func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker {
 	return groupInvokersList
 }
 
-// uncacheInvoker will return abandoned Invoker,if no Invoker to be abandoned,return nil
+// uncacheInvoker will return abandoned Invoker, if no Invoker to be abandoned, return nil
 func (dir *RegistryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
-	logger.Debugf("service will be deleted in cache invokers: invokers key is  %s!", url.Key())
-	if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok {
-		dir.cacheInvokersMap.Delete(url.Key())
+	return dir.uncacheInvokerWithKey(url.Key())
+}
+
+func (dir *RegistryDirectory) uncacheInvokerWithKey(key string) protocol.Invoker {
+	logger.Debugf("service will be deleted in cache invokers: invokers key is  %s!", key)
+	if cacheInvoker, ok := dir.cacheInvokersMap.Load(key); ok {
+		dir.cacheInvokersMap.Delete(key)
 		return cacheInvoker.(protocol.Invoker)
 	}
 	return nil
@@ -232,6 +288,12 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
 				dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
 			}
 		} else {
+			// if cached invoker has the same URL with the new URL, then no need to re-refer, and no need to destroy
+			// the old invoker.
+			if common.IsEquals(*newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) {
+				return nil
+			}
+
 			logger.Debugf("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl())
 			newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
 			if newInvoker != nil {
@@ -312,7 +374,8 @@ func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL)
 // Process handle events and update Invokers
 func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
 	l.BaseConfigurationListener.Process(event)
-	l.directory.refreshInvokers(nil)
+	// FIXME: this doesn't trigger dir.overrideUrl()
+	l.directory.refreshInvokers()
 }
 
 type consumerConfigurationListener struct {
@@ -338,5 +401,6 @@ func (l *consumerConfigurationListener) addNotifyListener(listener registry.Noti
 // Process handles events from Configuration Center and update Invokers
 func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
 	l.BaseConfigurationListener.Process(event)
-	l.directory.refreshInvokers(nil)
+	// FIXME: this doesn't trigger dir.overrideUrl()
+	l.directory.refreshInvokers()
 }
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index e004d6dfe..69a31ef2f 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -241,7 +241,12 @@ func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Inv
 }
 
 // Notify will be triggered when a service change notification is received.
-func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) {
+func (nl *overrideSubscribeListener) Notify(events ...*registry.ServiceEvent) {
+	if len(events) == 0 {
+		return
+	}
+
+	event := events[0]
 	if isMatched(&(event.Service), nl.url) && event.Action == remoting.EventTypeAdd {
 		nl.configurator = extension.GetDefaultConfigurator(&(event.Service))
 		nl.doOverrideIfNecessary()
diff --git a/registry/registry.go b/registry/registry.go
index 855b487d4..2225d2c1f 100644
--- a/registry/registry.go
+++ b/registry/registry.go
@@ -68,8 +68,11 @@ type Registry interface {
 
 // nolint
 type NotifyListener interface {
-	// Notify supports notifications on the service interface and the dimension of the data type.
-	Notify(*ServiceEvent)
+	// Notify supports notifications on the service interface and the dimension of the data type. When a list of
+	// events are passed in, it's considered as a complete list, on the other side, if one single event is
+	// passed in, then it's a incremental event. Pls. note when a list (instead of single event) comes,
+	// the impl of NotifyListener may abandon the accumulated result from previous notifications.
+	Notify(...*ServiceEvent)
 }
 
 // Listener Deprecated!
-- 
GitLab