Skip to content
Snippets Groups Projects
Commit 6cd6aacc authored by Joe Zou's avatar Joe Zou Committed by GitHub
Browse files

Merge pull request #741 from beiwei30/address-batch-mode

Address notification batch mode
parents 14ef8e30 26659177
No related branches found
No related tags found
No related merge requests found
......@@ -104,72 +104,120 @@ func (dir *RegistryDirectory) subscribe(url *common.URL) {
}
// Notify monitor changes from registry,and update the cacheServices
func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) {
go dir.update(event)
func (dir *RegistryDirectory) Notify(events ...*registry.ServiceEvent) {
go dir.refreshInvokers(events...)
}
// update the cacheServices and subscribe service from registry
func (dir *RegistryDirectory) update(res *registry.ServiceEvent) {
if res == nil {
return
// refreshInvokers refreshes service's events. It supports two modes: incremental mode and batch mode. If a single
// service event is passed in, then it is incremental mode, and if an array of service events are passed in, it is
// batch mode, in this mode, we assume the registry center have the complete list of the service events, therefore
// in this case, we can safely assume any cached invoker not in the incoming list can be removed. It is necessary
// since in batch mode, the register center handles the different type of events by itself, then notify the directory
// a batch of 'Update' events, instead of omit the different type of event one by one.
func (dir *RegistryDirectory) refreshInvokers(events ...*registry.ServiceEvent) {
var oldInvokers []protocol.Invoker
// in batch mode, it is safe to remove since we have the complete list of events.
if len(events) > 1 {
dir.cacheInvokersMap.Range(func(k, v interface{}) bool {
if !dir.eventMatched(k.(string), events) {
if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil {
oldInvokers = append(oldInvokers, invoker)
}
}
return true
})
}
logger.Debugf("registry update, result{%s}", res)
logger.Debugf("update service name: %s!", res.Service)
dir.refreshInvokers(res)
}
func (dir *RegistryDirectory) refreshInvokers(res *registry.ServiceEvent) {
var (
url *common.URL
oldInvoker protocol.Invoker = nil
)
// judge is override or others
if res != nil {
url = &res.Service
// 1.for override url in 2.6.x
if url.Protocol == constant.OVERRIDE_PROTOCOL ||
url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.CONFIGURATORS_CATEGORY {
dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(url))
url = nil
} else if url.Protocol == constant.ROUTER_PROTOCOL || // 2.for router
url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY {
url = nil
for _, event := range events {
logger.Debugf("registry update, result{%s}", event)
if oldInvoker, _ := dir.cacheInvokerByEvent(event); oldInvoker != nil {
oldInvokers = append(oldInvokers, oldInvoker)
}
switch res.Action {
case remoting.EventTypeAdd, remoting.EventTypeUpdate:
logger.Infof("selector add service url{%s}", res.Service)
}
var urls []*common.URL
for _, v := range config.GetRouterURLSet().Values() {
urls = append(urls, v.(*common.URL))
}
if len(events) > 0 {
dir.setNewInvokers()
}
if len(urls) > 0 {
dir.SetRouters(urls)
}
oldInvoker = dir.cacheInvoker(url)
case remoting.EventTypeDel:
oldInvoker = dir.uncacheInvoker(url)
logger.Infof("selector delete service url{%s}", res.Service)
default:
return
// After dir.cacheInvokers is updated,destroy the oldInvoker
// Ensure that no request will enter the oldInvoker
for _, invoker := range oldInvokers {
invoker.Destroy()
}
}
// eventMatched checks if a cached invoker appears in the incoming invoker list, if no, then it is safe to remove.
func (dir *RegistryDirectory) eventMatched(key string, events []*registry.ServiceEvent) bool {
for _, event := range events {
if dir.invokerCacheKey(&event.Service) == key {
return true
}
}
return false
}
// invokerCacheKey generates the key in the cache for a given URL.
func (dir *RegistryDirectory) invokerCacheKey(url *common.URL) string {
referenceUrl := dir.GetDirectoryUrl().SubURL
newUrl := common.MergeUrl(url, referenceUrl)
return newUrl.Key()
}
// setNewInvokers groups the invokers from the cache first, then set the result to both directory and router chain.
func (dir *RegistryDirectory) setNewInvokers() {
newInvokers := dir.toGroupInvokers()
dir.listenerLock.Lock()
defer dir.listenerLock.Unlock()
dir.cacheInvokers = newInvokers
if res != nil {
dir.RouterChain().SetInvokers(newInvokers)
dir.RouterChain().SetInvokers(newInvokers)
}
// cacheInvokerByEvent caches invokers from the service event
func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent) (protocol.Invoker, error) {
// judge is override or others
if event != nil {
u := dir.convertUrl(event)
switch event.Action {
case remoting.EventTypeAdd, remoting.EventTypeUpdate:
logger.Infof("selector add service url{%s}", event.Service)
// FIXME: routers are built in every address notification?
dir.configRouters()
return dir.cacheInvoker(u), nil
case remoting.EventTypeDel:
logger.Infof("selector delete service url{%s}", event.Service)
return dir.uncacheInvoker(u), nil
default:
return nil, fmt.Errorf("illegal event type: %v", event.Action)
}
}
dir.listenerLock.Unlock()
// After dir.cacheInvokers is updated,destroy the oldInvoker
// Ensure that no request will enter the oldInvoker
if oldInvoker != nil {
oldInvoker.Destroy()
return nil, nil
}
// configRouters configures dynamic routers into the router chain, but, the current impl is incorrect, see FIXME above.
func (dir *RegistryDirectory) configRouters() {
var urls []*common.URL
for _, v := range config.GetRouterURLSet().Values() {
urls = append(urls, v.(*common.URL))
}
if len(urls) > 0 {
dir.SetRouters(urls)
}
}
// convertUrl processes override:// and router://
func (dir *RegistryDirectory) convertUrl(res *registry.ServiceEvent) *common.URL {
ret := &res.Service
if ret.Protocol == constant.OVERRIDE_PROTOCOL || // 1.for override url in 2.6.x
ret.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.CONFIGURATORS_CATEGORY {
dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(ret))
ret = nil
} else if ret.Protocol == constant.ROUTER_PROTOCOL || // 2.for router
ret.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY {
ret = nil
}
return ret
}
func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker {
......@@ -215,11 +263,15 @@ func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker {
return groupInvokersList
}
// uncacheInvoker will return abandoned Invoker,if no Invoker to be abandoned,return nil
// uncacheInvoker will return abandoned Invoker, if no Invoker to be abandoned, return nil
func (dir *RegistryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key())
if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok {
dir.cacheInvokersMap.Delete(url.Key())
return dir.uncacheInvokerWithKey(url.Key())
}
func (dir *RegistryDirectory) uncacheInvokerWithKey(key string) protocol.Invoker {
logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", key)
if cacheInvoker, ok := dir.cacheInvokersMap.Load(key); ok {
dir.cacheInvokersMap.Delete(key)
return cacheInvoker.(protocol.Invoker)
}
return nil
......@@ -250,6 +302,12 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
}
} else {
// if cached invoker has the same URL with the new URL, then no need to re-refer, and no need to destroy
// the old invoker.
if common.IsEquals(*newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) {
return nil
}
logger.Debugf("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl())
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
......@@ -348,7 +406,8 @@ func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL)
// Process handle events and update Invokers
func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
l.directory.refreshInvokers(nil)
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers()
}
type consumerConfigurationListener struct {
......@@ -374,5 +433,6 @@ func (l *consumerConfigurationListener) addNotifyListener(listener registry.Noti
// Process handles events from Configuration Center and update Invokers
func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
l.directory.refreshInvokers(nil)
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers()
}
......@@ -242,7 +242,12 @@ func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Inv
}
// Notify will be triggered when a service change notification is received.
func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) {
func (nl *overrideSubscribeListener) Notify(events ...*registry.ServiceEvent) {
if len(events) == 0 {
return
}
event := events[0]
if isMatched(&(event.Service), nl.url) && event.Action == remoting.EventTypeAdd {
nl.configurator = extension.GetDefaultConfigurator(&(event.Service))
nl.doOverrideIfNecessary()
......
......@@ -68,8 +68,11 @@ type Registry interface {
// nolint
type NotifyListener interface {
// Notify supports notifications on the service interface and the dimension of the data type.
Notify(*ServiceEvent)
// Notify supports notifications on the service interface and the dimension of the data type. When a list of
// events are passed in, it's considered as a complete list, on the other side, if one single event is
// passed in, then it's a incremental event. Pls. note when a list (instead of single event) comes,
// the impl of NotifyListener may abandon the accumulated result from previous notifications.
Notify(...*ServiceEvent)
}
// Listener Deprecated!
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment