Skip to content
Snippets Groups Projects
Commit a2eea0b7 authored by cvictory's avatar cvictory
Browse files

fix review issue: add constraint of Action value of ServiceEvent in NotifyAll func

parent 35ccd359
No related branches found
No related tags found
No related merge requests found
...@@ -67,6 +67,7 @@ type RegistryDirectory struct { ...@@ -67,6 +67,7 @@ type RegistryDirectory struct {
referenceConfigurationListener *referenceConfigurationListener referenceConfigurationListener *referenceConfigurationListener
serviceKey string serviceKey string
forbidden atomic.Bool forbidden atomic.Bool
registerLock sync.Mutex // this lock if for register
} }
// NewRegistryDirectory will create a new RegistryDirectory // NewRegistryDirectory will create a new RegistryDirectory
...@@ -111,6 +112,7 @@ func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) { ...@@ -111,6 +112,7 @@ func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) {
go dir.refreshInvokers(event) go dir.refreshInvokers(event)
} }
// NotifyAll notify the events that are complete Service Event List.
func (dir *RegistryDirectory) NotifyAll(events []*registry.ServiceEvent) { func (dir *RegistryDirectory) NotifyAll(events []*registry.ServiceEvent) {
go dir.refreshAllInvokers(events) go dir.refreshAllInvokers(events)
} }
...@@ -129,38 +131,49 @@ func (dir *RegistryDirectory) refreshInvokers(event *registry.ServiceEvent) { ...@@ -129,38 +131,49 @@ func (dir *RegistryDirectory) refreshInvokers(event *registry.ServiceEvent) {
} }
// refreshAllInvokers the argument is the complete list of the service events, we can safely assume any cached invoker // refreshAllInvokers the argument is the complete list of the service events, we can safely assume any cached invoker
// not in the incoming list can be removed. It will ignore Action of serviceEvent. // not in the incoming list can be removed. The Action of serviceEvent should be EventTypeUpdate.
func (dir *RegistryDirectory) refreshAllInvokers(events []*registry.ServiceEvent) { func (dir *RegistryDirectory) refreshAllInvokers(events []*registry.ServiceEvent) {
var ( var (
oldInvokers []protocol.Invoker oldInvokers []protocol.Invoker
addEvents []*registry.ServiceEvent addEvents []*registry.ServiceEvent
) )
// loop the events to check the Action should be EventTypeUpdate.
// get need clear invokers from original invoker list
dir.cacheInvokersMap.Range(func(k, v interface{}) bool {
if !dir.eventMatched(k.(string), events) {
// delete unused invoker from cache
if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil {
oldInvokers = append(oldInvokers, invoker)
}
}
return true
})
// get need add invokers from events
for _, event := range events { for _, event := range events {
// Is the key (url.Key()) of cacheInvokersMap the best way? if event.Action != remoting.EventTypeUpdate {
if _, ok := dir.cacheInvokersMap.Load(event.Service.Key()); !ok { panic("Your implements of register center is wrong, " +
event.Action = remoting.EventTypeAdd "please check the Action of ServiceEvent should be EventTypeUpdate")
addEvents = append(addEvents, event) return
} }
} }
// loop the addEvents func() {
for _, event := range addEvents { // this lock is work at batch update of InvokeCache
logger.Debugf("registry update, result{%s}", event) dir.registerLock.Lock()
if oldInvoker, _ := dir.cacheInvokerByEvent(event); oldInvoker != nil { defer dir.registerLock.Unlock()
oldInvokers = append(oldInvokers, oldInvoker) // get need clear invokers from original invoker list
dir.cacheInvokersMap.Range(func(k, v interface{}) bool {
if !dir.eventMatched(k.(string), events) {
// delete unused invoker from cache
if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil {
oldInvokers = append(oldInvokers, invoker)
}
}
return true
})
// get need add invokers from events
for _, event := range events {
// Is the key (url.Key()) of cacheInvokersMap the best way?
if _, ok := dir.cacheInvokersMap.Load(event.Service.Key()); !ok {
addEvents = append(addEvents, event)
}
} }
} // loop the updateEvents
for _, event := range addEvents {
logger.Debugf("registry update, result{%s}", event)
if oldInvoker, _ := dir.cacheInvokerByEvent(event); oldInvoker != nil {
oldInvokers = append(oldInvokers, oldInvoker)
}
}
}()
dir.setNewInvokers() dir.setNewInvokers()
// destroy unused invokers // destroy unused invokers
for _, invoker := range oldInvokers { for _, invoker := range oldInvokers {
......
...@@ -73,8 +73,8 @@ type NotifyListener interface { ...@@ -73,8 +73,8 @@ type NotifyListener interface {
// passed in, then it's a incremental event. Pls. note when a list (instead of single event) comes, // passed in, then it's a incremental event. Pls. note when a list (instead of single event) comes,
// the impl of NotifyListener may abandon the accumulated result from previous notifications. // the impl of NotifyListener may abandon the accumulated result from previous notifications.
Notify(*ServiceEvent) Notify(*ServiceEvent)
// Notify the events are complete Service Event List. // NotifyAll the events are complete Service Event List.
// The argument of events []*ServiceEvent is equal to urls []*URL, because Action of ServiceEvent will be ignored. // The argument of events []*ServiceEvent is equal to urls []*URL, The Action of serviceEvent should be EventTypeUpdate.
// If your registry center can only get all urls but can't get individual event, you should use this one. // If your registry center can only get all urls but can't get individual event, you should use this one.
NotifyAll([]*ServiceEvent) NotifyAll([]*ServiceEvent)
} }
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment