From 376adec84ca58cd176d515b22927ae71aa0044ef Mon Sep 17 00:00:00 2001
From: cvictory <shenglicao2@gmail.com>
Date: Wed, 16 Sep 2020 15:05:42 +0800
Subject: [PATCH] fix #750.  1.fix empty ServiceEvent, we should clear all
 invokers 2.we want to notify the address with complete address list.

---
 registry/directory/directory.go      | 83 +++++++++++++++++-----------
 registry/directory/directory_test.go | 24 ++++++++
 registry/mock_registry.go            | 68 +++++++++++++++++------
 registry/nacos/service_discovery.go  |  3 +-
 registry/protocol/protocol.go        | 15 +++--
 registry/registry.go                 |  6 +-
 6 files changed, 144 insertions(+), 55 deletions(-)

diff --git a/registry/directory/directory.go b/registry/directory/directory.go
index 5d4a890c6..148483604 100644
--- a/registry/directory/directory.go
+++ b/registry/directory/directory.go
@@ -104,44 +104,65 @@ func (dir *RegistryDirectory) subscribe(url *common.URL) {
 }
 
 // Notify monitor changes from registry,and update the cacheServices
-func (dir *RegistryDirectory) Notify(events ...*registry.ServiceEvent) {
-	go dir.refreshInvokers(events...)
+func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) {
+	if event == nil {
+		return
+	}
+	go dir.refreshInvokers(event)
 }
 
-// refreshInvokers refreshes service's events. It supports two modes: incremental mode and batch mode. If a single
-// service event is passed in, then it is incremental mode, and if an array of service events are passed in, it is
-// batch mode, in this mode, we assume the registry center have the complete list of the service events, therefore
-// in this case, we can safely assume any cached invoker not in the incoming list can be removed. It is necessary
-// since in batch mode, the register center handles the different type of events by itself, then notify the directory
-// a batch of 'Update' events, instead of omit the different type of event one by one.
-func (dir *RegistryDirectory) refreshInvokers(events ...*registry.ServiceEvent) {
-	var oldInvokers []protocol.Invoker
-
-	// in batch mode, it is safe to remove since we have the complete list of events.
-	if len(events) > 1 {
-		dir.cacheInvokersMap.Range(func(k, v interface{}) bool {
-			if !dir.eventMatched(k.(string), events) {
-				if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil {
-					oldInvokers = append(oldInvokers, invoker)
-				}
-			}
-			return true
-		})
+func (dir *RegistryDirectory) NotifyAll(events []*registry.ServiceEvent) {
+	go dir.refreshAllInvokers(events)
+}
+
+// refreshInvokers refreshes service's events.
+func (dir *RegistryDirectory) refreshInvokers(event *registry.ServiceEvent) {
+	logger.Debugf("refresh invokers with %+v", event)
+	var oldInvoker protocol.Invoker
+	if event != nil {
+		oldInvoker, _ = dir.cacheInvokerByEvent(event)
+	}
+	dir.setNewInvokers()
+	if oldInvoker != nil {
+		oldInvoker.Destroy()
 	}
+}
+
+// refreshAllInvokers the argument is the complete list of the service events,  we can safely assume any cached invoker
+// not in the incoming list can be removed.  It will ignore Action of serviceEvent.
+func (dir *RegistryDirectory) refreshAllInvokers(events []*registry.ServiceEvent) {
+	var (
+		oldInvokers []protocol.Invoker
+		addEvents   []*registry.ServiceEvent
+	)
 
+	// get need clear invokers from original invoker list
+	dir.cacheInvokersMap.Range(func(k, v interface{}) bool {
+		if !dir.eventMatched(k.(string), events) {
+			// delete unused invoker from cache
+			if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil {
+				oldInvokers = append(oldInvokers, invoker)
+			}
+		}
+		return true
+	})
+	// get need add invokers from events
 	for _, event := range events {
+		// Is the key (url.Key()) of cacheInvokersMap the best way?
+		if _, ok := dir.cacheInvokersMap.Load(event.Service.Key()); !ok {
+			event.Action = remoting.EventTypeAdd
+			addEvents = append(addEvents, event)
+		}
+	}
+	// loop the addEvents
+	for _, event := range addEvents {
 		logger.Debugf("registry update, result{%s}", event)
 		if oldInvoker, _ := dir.cacheInvokerByEvent(event); oldInvoker != nil {
 			oldInvokers = append(oldInvokers, oldInvoker)
 		}
 	}
-
-	if len(events) > 0 {
-		dir.setNewInvokers()
-	}
-
-	// After dir.cacheInvokers is updated,destroy the oldInvoker
-	// Ensure that no request will enter the oldInvoker
+	dir.setNewInvokers()
+	// destroy unused invokers
 	for _, invoker := range oldInvokers {
 		invoker.Destroy()
 	}
@@ -296,7 +317,7 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
 		newUrl := common.MergeUrl(url, referenceUrl)
 		dir.overrideUrl(newUrl)
 		if cacheInvoker, ok := dir.cacheInvokersMap.Load(newUrl.Key()); !ok {
-			logger.Debugf("service will be added in cache invokers: invokers url is  %s!", newUrl)
+			logger.Debugf("service will be added in cache invokers: invokers url is  %+v!", newUrl)
 			newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
 			if newInvoker != nil {
 				dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
@@ -407,7 +428,7 @@ func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL)
 func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
 	l.BaseConfigurationListener.Process(event)
 	// FIXME: this doesn't trigger dir.overrideUrl()
-	l.directory.refreshInvokers()
+	l.directory.refreshInvokers(nil)
 }
 
 type consumerConfigurationListener struct {
@@ -434,5 +455,5 @@ func (l *consumerConfigurationListener) addNotifyListener(listener registry.Noti
 func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
 	l.BaseConfigurationListener.Process(event)
 	// FIXME: this doesn't trigger dir.overrideUrl()
-	l.directory.refreshInvokers()
+	l.directory.refreshInvokers(nil)
 }
diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go
index f2b2f8edd..6c85ea08f 100644
--- a/registry/directory/directory_test.go
+++ b/registry/directory/directory_test.go
@@ -192,6 +192,30 @@ func Test_toGroupInvokers(t *testing.T) {
 	assert.Len(t, groupInvokers, 2)
 }
 
+func Test_RefreshUrl(t *testing.T) {
+	registryDirectory, mockRegistry := normalRegistryDir()
+	providerUrl, _ := common.NewURL("dubbo://0.0.0.0:20011/org.apache.dubbo-go.mockService",
+		common.WithParamsValue(constant.CLUSTER_KEY, "mock1"),
+		common.WithParamsValue(constant.GROUP_KEY, "group"),
+		common.WithParamsValue(constant.VERSION_KEY, "1.0.0"))
+	providerUrl2, _ := common.NewURL("dubbo://0.0.0.0:20012/org.apache.dubbo-go.mockService",
+		common.WithParamsValue(constant.CLUSTER_KEY, "mock1"),
+		common.WithParamsValue(constant.GROUP_KEY, "group"),
+		common.WithParamsValue(constant.VERSION_KEY, "1.0.0"))
+	time.Sleep(1e9)
+	assert.Len(t, registryDirectory.cacheInvokers, 3)
+	mockRegistry.MockEvent(&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: providerUrl})
+	time.Sleep(1e9)
+	assert.Len(t, registryDirectory.cacheInvokers, 4)
+	mockRegistry.MockEvents([]*registry.ServiceEvent{&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: providerUrl}})
+	time.Sleep(1e9)
+	assert.Len(t, registryDirectory.cacheInvokers, 1)
+	mockRegistry.MockEvents([]*registry.ServiceEvent{&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: providerUrl},
+		&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: providerUrl2}})
+	time.Sleep(1e9)
+	assert.Len(t, registryDirectory.cacheInvokers, 2)
+}
+
 func normalRegistryDir(noMockEvent ...bool) (*RegistryDirectory, *registry.MockRegistry) {
 	extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter)
 
diff --git a/registry/mock_registry.go b/registry/mock_registry.go
index 10561d0f4..d84b52945 100644
--- a/registry/mock_registry.go
+++ b/registry/mock_registry.go
@@ -32,14 +32,16 @@ import (
 
 // MockRegistry is used as mock registry
 type MockRegistry struct {
-	listener  *listener
-	destroyed *atomic.Bool
+	listener   *listener
+	destroyed  *atomic.Bool
+	allAddress chan []*ServiceEvent
 }
 
 // NewMockRegistry creates a mock registry
 func NewMockRegistry(url *common.URL) (Registry, error) {
 	registry := &MockRegistry{
-		destroyed: atomic.NewBool(false),
+		destroyed:  atomic.NewBool(false),
+		allAddress: make(chan []*ServiceEvent),
 	}
 	listener := &listener{count: 0, registry: registry, listenChan: make(chan *ServiceEvent)}
 	registry.listener = listener
@@ -80,22 +82,12 @@ func (r *MockRegistry) subscribe(*common.URL) (Listener, error) {
 func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) error {
 	go func() {
 		for {
-			if !r.IsAvailable() {
-				logger.Warnf("event listener game over.")
-				time.Sleep(time.Duration(3) * time.Second)
-				return
-			}
-
-			listener, err := r.subscribe(url)
-			if err != nil {
-				if !r.IsAvailable() {
-					logger.Warnf("event listener game over.")
-					return
-				}
-				time.Sleep(time.Duration(3) * time.Second)
+			t, listener := r.checkLoopSubscribe(url)
+			if t == 0 {
 				continue
+			} else if t == -1 {
+				return
 			}
-
 			for {
 				serviceEvent, err := listener.Next()
 				if err != nil {
@@ -109,6 +101,24 @@ func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener)
 			}
 		}
 	}()
+	go func() {
+		for {
+			t, _ := r.checkLoopSubscribe(url)
+			if t == 0 {
+				continue
+			} else if t == -1 {
+				return
+			}
+
+			for {
+				select {
+				case e := <-r.allAddress:
+					notifyListener.NotifyAll(e)
+					break
+				}
+			}
+		}
+	}()
 	return nil
 }
 
@@ -138,3 +148,27 @@ func (*listener) Close() {
 func (r *MockRegistry) MockEvent(event *ServiceEvent) {
 	r.listener.listenChan <- event
 }
+
+// nolint
+func (r *MockRegistry) MockEvents(events []*ServiceEvent) {
+	r.allAddress <- events
+}
+
+func (r *MockRegistry) checkLoopSubscribe(url *common.URL) (int, Listener) {
+	if !r.IsAvailable() {
+		logger.Warnf("event listener game over.")
+		time.Sleep(time.Duration(3) * time.Second)
+		return -1, nil
+	}
+
+	listener, err := r.subscribe(url)
+	if err != nil {
+		if !r.IsAvailable() {
+			logger.Warnf("event listener game over.")
+			return -1, nil
+		}
+		time.Sleep(time.Duration(3) * time.Second)
+		return 0, nil
+	}
+	return 1, listener
+}
diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go
index 0e5ad8e69..b38e150e5 100644
--- a/registry/nacos/service_discovery.go
+++ b/registry/nacos/service_discovery.go
@@ -141,7 +141,8 @@ func (n *nacosServiceDiscovery) GetInstances(serviceName string) []registry.Serv
 		GroupName:   n.group,
 	})
 	if err != nil {
-		logger.Errorf("Could not query the instances for service: " + serviceName + ", group: " + n.group)
+		logger.Errorf("Could not query the instances for service: %+v, group: %+v . It happened err %+v",
+			serviceName, n.group, err)
 		return make([]registry.ServiceInstance, 0, 0)
 	}
 	res := make([]registry.ServiceInstance, 0, len(instances))
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index 69a31ef2f..00f7e8b2b 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -241,18 +241,23 @@ func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Inv
 }
 
 // Notify will be triggered when a service change notification is received.
-func (nl *overrideSubscribeListener) Notify(events ...*registry.ServiceEvent) {
-	if len(events) == 0 {
-		return
-	}
+func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) {
 
-	event := events[0]
 	if isMatched(&(event.Service), nl.url) && event.Action == remoting.EventTypeAdd {
 		nl.configurator = extension.GetDefaultConfigurator(&(event.Service))
 		nl.doOverrideIfNecessary()
 	}
 }
 
+func (nl *overrideSubscribeListener) NotifyAll(events []*registry.ServiceEvent) {
+	if len(events) == 0 {
+		return
+	}
+
+	event := events[0]
+	nl.Notify(event)
+}
+
 func (nl *overrideSubscribeListener) doOverrideIfNecessary() {
 	providerUrl := getProviderUrl(nl.originInvoker)
 	key := getCacheKey(providerUrl)
diff --git a/registry/registry.go b/registry/registry.go
index 2225d2c1f..e61576ac8 100644
--- a/registry/registry.go
+++ b/registry/registry.go
@@ -72,7 +72,11 @@ type NotifyListener interface {
 	// events are passed in, it's considered as a complete list, on the other side, if one single event is
 	// passed in, then it's a incremental event. Pls. note when a list (instead of single event) comes,
 	// the impl of NotifyListener may abandon the accumulated result from previous notifications.
-	Notify(...*ServiceEvent)
+	Notify(*ServiceEvent)
+	// Notify the events are complete Service Event List.
+	// The argument of events []*ServiceEvent is equal to urls []*URL, because Action of ServiceEvent will be ignored.
+	// If your registry center can only get all urls but can't get individual event, you should use this one.
+	NotifyAll([]*ServiceEvent)
 }
 
 // Listener Deprecated!
-- 
GitLab