Skip to content
Snippets Groups Projects
Commit 376adec8 authored by cvictory's avatar cvictory
Browse files

fix #750. 1.fix empty ServiceEvent, we should clear all invokers 2.we want to...

fix #750.  1.fix empty ServiceEvent, we should clear all invokers 2.we want to notify the address with complete address list.
parent 38c27b5e
No related branches found
No related tags found
No related merge requests found
......@@ -104,44 +104,65 @@ func (dir *RegistryDirectory) subscribe(url *common.URL) {
}
// Notify monitor changes from registry,and update the cacheServices
func (dir *RegistryDirectory) Notify(events ...*registry.ServiceEvent) {
go dir.refreshInvokers(events...)
func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) {
if event == nil {
return
}
go dir.refreshInvokers(event)
}
// refreshInvokers refreshes service's events. It supports two modes: incremental mode and batch mode. If a single
// service event is passed in, then it is incremental mode, and if an array of service events are passed in, it is
// batch mode, in this mode, we assume the registry center have the complete list of the service events, therefore
// in this case, we can safely assume any cached invoker not in the incoming list can be removed. It is necessary
// since in batch mode, the register center handles the different type of events by itself, then notify the directory
// a batch of 'Update' events, instead of omit the different type of event one by one.
func (dir *RegistryDirectory) refreshInvokers(events ...*registry.ServiceEvent) {
var oldInvokers []protocol.Invoker
// in batch mode, it is safe to remove since we have the complete list of events.
if len(events) > 1 {
dir.cacheInvokersMap.Range(func(k, v interface{}) bool {
if !dir.eventMatched(k.(string), events) {
if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil {
oldInvokers = append(oldInvokers, invoker)
}
}
return true
})
func (dir *RegistryDirectory) NotifyAll(events []*registry.ServiceEvent) {
go dir.refreshAllInvokers(events)
}
// refreshInvokers refreshes service's events.
func (dir *RegistryDirectory) refreshInvokers(event *registry.ServiceEvent) {
logger.Debugf("refresh invokers with %+v", event)
var oldInvoker protocol.Invoker
if event != nil {
oldInvoker, _ = dir.cacheInvokerByEvent(event)
}
dir.setNewInvokers()
if oldInvoker != nil {
oldInvoker.Destroy()
}
}
// refreshAllInvokers the argument is the complete list of the service events, we can safely assume any cached invoker
// not in the incoming list can be removed. It will ignore Action of serviceEvent.
func (dir *RegistryDirectory) refreshAllInvokers(events []*registry.ServiceEvent) {
var (
oldInvokers []protocol.Invoker
addEvents []*registry.ServiceEvent
)
// get need clear invokers from original invoker list
dir.cacheInvokersMap.Range(func(k, v interface{}) bool {
if !dir.eventMatched(k.(string), events) {
// delete unused invoker from cache
if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil {
oldInvokers = append(oldInvokers, invoker)
}
}
return true
})
// get need add invokers from events
for _, event := range events {
// Is the key (url.Key()) of cacheInvokersMap the best way?
if _, ok := dir.cacheInvokersMap.Load(event.Service.Key()); !ok {
event.Action = remoting.EventTypeAdd
addEvents = append(addEvents, event)
}
}
// loop the addEvents
for _, event := range addEvents {
logger.Debugf("registry update, result{%s}", event)
if oldInvoker, _ := dir.cacheInvokerByEvent(event); oldInvoker != nil {
oldInvokers = append(oldInvokers, oldInvoker)
}
}
if len(events) > 0 {
dir.setNewInvokers()
}
// After dir.cacheInvokers is updated,destroy the oldInvoker
// Ensure that no request will enter the oldInvoker
dir.setNewInvokers()
// destroy unused invokers
for _, invoker := range oldInvokers {
invoker.Destroy()
}
......@@ -296,7 +317,7 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
newUrl := common.MergeUrl(url, referenceUrl)
dir.overrideUrl(newUrl)
if cacheInvoker, ok := dir.cacheInvokersMap.Load(newUrl.Key()); !ok {
logger.Debugf("service will be added in cache invokers: invokers url is %s!", newUrl)
logger.Debugf("service will be added in cache invokers: invokers url is %+v!", newUrl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
......@@ -407,7 +428,7 @@ func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL)
func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers()
l.directory.refreshInvokers(nil)
}
type consumerConfigurationListener struct {
......@@ -434,5 +455,5 @@ func (l *consumerConfigurationListener) addNotifyListener(listener registry.Noti
func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers()
l.directory.refreshInvokers(nil)
}
......@@ -192,6 +192,30 @@ func Test_toGroupInvokers(t *testing.T) {
assert.Len(t, groupInvokers, 2)
}
func Test_RefreshUrl(t *testing.T) {
registryDirectory, mockRegistry := normalRegistryDir()
providerUrl, _ := common.NewURL("dubbo://0.0.0.0:20011/org.apache.dubbo-go.mockService",
common.WithParamsValue(constant.CLUSTER_KEY, "mock1"),
common.WithParamsValue(constant.GROUP_KEY, "group"),
common.WithParamsValue(constant.VERSION_KEY, "1.0.0"))
providerUrl2, _ := common.NewURL("dubbo://0.0.0.0:20012/org.apache.dubbo-go.mockService",
common.WithParamsValue(constant.CLUSTER_KEY, "mock1"),
common.WithParamsValue(constant.GROUP_KEY, "group"),
common.WithParamsValue(constant.VERSION_KEY, "1.0.0"))
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 3)
mockRegistry.MockEvent(&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: providerUrl})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 4)
mockRegistry.MockEvents([]*registry.ServiceEvent{&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: providerUrl}})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 1)
mockRegistry.MockEvents([]*registry.ServiceEvent{&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: providerUrl},
&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: providerUrl2}})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 2)
}
func normalRegistryDir(noMockEvent ...bool) (*RegistryDirectory, *registry.MockRegistry) {
extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter)
......
......@@ -32,14 +32,16 @@ import (
// MockRegistry is used as mock registry
type MockRegistry struct {
listener *listener
destroyed *atomic.Bool
listener *listener
destroyed *atomic.Bool
allAddress chan []*ServiceEvent
}
// NewMockRegistry creates a mock registry
func NewMockRegistry(url *common.URL) (Registry, error) {
registry := &MockRegistry{
destroyed: atomic.NewBool(false),
destroyed: atomic.NewBool(false),
allAddress: make(chan []*ServiceEvent),
}
listener := &listener{count: 0, registry: registry, listenChan: make(chan *ServiceEvent)}
registry.listener = listener
......@@ -80,22 +82,12 @@ func (r *MockRegistry) subscribe(*common.URL) (Listener, error) {
func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) error {
go func() {
for {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
time.Sleep(time.Duration(3) * time.Second)
return
}
listener, err := r.subscribe(url)
if err != nil {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
return
}
time.Sleep(time.Duration(3) * time.Second)
t, listener := r.checkLoopSubscribe(url)
if t == 0 {
continue
} else if t == -1 {
return
}
for {
serviceEvent, err := listener.Next()
if err != nil {
......@@ -109,6 +101,24 @@ func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener)
}
}
}()
go func() {
for {
t, _ := r.checkLoopSubscribe(url)
if t == 0 {
continue
} else if t == -1 {
return
}
for {
select {
case e := <-r.allAddress:
notifyListener.NotifyAll(e)
break
}
}
}
}()
return nil
}
......@@ -138,3 +148,27 @@ func (*listener) Close() {
func (r *MockRegistry) MockEvent(event *ServiceEvent) {
r.listener.listenChan <- event
}
// nolint
func (r *MockRegistry) MockEvents(events []*ServiceEvent) {
r.allAddress <- events
}
func (r *MockRegistry) checkLoopSubscribe(url *common.URL) (int, Listener) {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
time.Sleep(time.Duration(3) * time.Second)
return -1, nil
}
listener, err := r.subscribe(url)
if err != nil {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
return -1, nil
}
time.Sleep(time.Duration(3) * time.Second)
return 0, nil
}
return 1, listener
}
......@@ -141,7 +141,8 @@ func (n *nacosServiceDiscovery) GetInstances(serviceName string) []registry.Serv
GroupName: n.group,
})
if err != nil {
logger.Errorf("Could not query the instances for service: " + serviceName + ", group: " + n.group)
logger.Errorf("Could not query the instances for service: %+v, group: %+v . It happened err %+v",
serviceName, n.group, err)
return make([]registry.ServiceInstance, 0, 0)
}
res := make([]registry.ServiceInstance, 0, len(instances))
......
......@@ -241,18 +241,23 @@ func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Inv
}
// Notify will be triggered when a service change notification is received.
func (nl *overrideSubscribeListener) Notify(events ...*registry.ServiceEvent) {
if len(events) == 0 {
return
}
func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) {
event := events[0]
if isMatched(&(event.Service), nl.url) && event.Action == remoting.EventTypeAdd {
nl.configurator = extension.GetDefaultConfigurator(&(event.Service))
nl.doOverrideIfNecessary()
}
}
func (nl *overrideSubscribeListener) NotifyAll(events []*registry.ServiceEvent) {
if len(events) == 0 {
return
}
event := events[0]
nl.Notify(event)
}
func (nl *overrideSubscribeListener) doOverrideIfNecessary() {
providerUrl := getProviderUrl(nl.originInvoker)
key := getCacheKey(providerUrl)
......
......@@ -72,7 +72,11 @@ type NotifyListener interface {
// events are passed in, it's considered as a complete list, on the other side, if one single event is
// passed in, then it's a incremental event. Pls. note when a list (instead of single event) comes,
// the impl of NotifyListener may abandon the accumulated result from previous notifications.
Notify(...*ServiceEvent)
Notify(*ServiceEvent)
// Notify the events are complete Service Event List.
// The argument of events []*ServiceEvent is equal to urls []*URL, because Action of ServiceEvent will be ignored.
// If your registry center can only get all urls but can't get individual event, you should use this one.
NotifyAll([]*ServiceEvent)
}
// Listener Deprecated!
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment