Skip to content
Snippets Groups Projects
Commit a192291b authored by Ian Luo's avatar Ian Luo
Browse files

address batch mode notification

parent bb197cc7
No related branches found
No related tags found
No related merge requests found
......@@ -104,72 +104,129 @@ func (dir *RegistryDirectory) subscribe(url *common.URL) {
}
// Notify monitor changes from registry,and update the cacheServices
func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) {
go dir.update(event)
func (dir *RegistryDirectory) Notify(event ...*registry.ServiceEvent) {
go dir.update(event...)
}
// update the cacheServices and subscribe service from registry
func (dir *RegistryDirectory) update(res *registry.ServiceEvent) {
if res == nil {
return
func (dir *RegistryDirectory) update(events ...*registry.ServiceEvent) {
for _, event := range events {
logger.Debugf("registry update, result{%s}", event)
logger.Debugf("update service name: %s!", event.Service)
}
logger.Debugf("registry update, result{%s}", res)
logger.Debugf("update service name: %s!", res.Service)
dir.refreshInvokers(res)
dir.refreshInvokers(events...)
}
func (dir *RegistryDirectory) refreshInvokers(res *registry.ServiceEvent) {
var (
url *common.URL
oldInvoker protocol.Invoker = nil
)
// judge is override or others
if res != nil {
url = &res.Service
// 1.for override url in 2.6.x
if url.Protocol == constant.OVERRIDE_PROTOCOL ||
url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.CONFIGURATORS_CATEGORY {
dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(url))
url = nil
} else if url.Protocol == constant.ROUTER_PROTOCOL || // 2.for router
url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY {
url = nil
// refreshInvokers refreshes service's events. It supports two modes: incremental mode and batch mode. If a single
// service event is passed in, then it is incremental mode, and if an array of service events are passed in, it is
// batch mode, in this mode, we assume the registry center have the complete list of the service events, therefore
// in this case, we can safely assume any cached invoker not in the incoming list can be removed. It is necessary
// since in batch mode, the register center handles the different type of events by itself, then notify the directory
// a batch of 'Update' events, instead of omit the different type of event one by one.
func (dir *RegistryDirectory) refreshInvokers(events ...*registry.ServiceEvent) {
var oldInvokers []protocol.Invoker
// in batch mode, it is safe to remove since we have the complete list of events.
if events != nil && len(events) > 1 {
dir.cacheInvokersMap.Range(func(k, v interface{}) bool {
if !dir.eventMatched(k.(string), events) {
if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil {
oldInvokers = append(oldInvokers, invoker)
}
}
return true
})
}
for _, event := range events {
if oldInvoker, _ := dir.cacheInvokerByEvent(event); oldInvoker != nil {
oldInvokers = append(oldInvokers, oldInvoker)
}
switch res.Action {
case remoting.EventTypeAdd, remoting.EventTypeUpdate:
logger.Infof("selector add service url{%s}", res.Service)
}
var urls []*common.URL
for _, v := range config.GetRouterURLSet().Values() {
urls = append(urls, v.(*common.URL))
}
if events != nil && len(events) > 0 {
dir.setNewInvokers()
}
if len(urls) > 0 {
dir.SetRouters(urls)
}
oldInvoker = dir.cacheInvoker(url)
case remoting.EventTypeDel:
oldInvoker = dir.uncacheInvoker(url)
logger.Infof("selector delete service url{%s}", res.Service)
default:
return
// After dir.cacheInvokers is updated,destroy the oldInvoker
// Ensure that no request will enter the oldInvoker
for _, invoker := range oldInvokers {
invoker.Destroy()
}
}
// eventMatched checks if a cached invoker appears in the incoming invoker list, if no, then it is safe to remove.
func (dir *RegistryDirectory) eventMatched(key string, events []*registry.ServiceEvent) bool {
for _, event := range events {
if dir.invokerCacheKey(&event.Service) == key {
return true
}
}
return false
}
// invokerCacheKey generates the key in the cache for a given URL.
func (dir *RegistryDirectory) invokerCacheKey(url *common.URL) string {
referenceUrl := dir.GetDirectoryUrl().SubURL
newUrl := common.MergeUrl(url, referenceUrl)
return newUrl.Key()
}
// setNewInvokers groups the invokers from the cache first, then set the result to both directory and router chain.
func (dir *RegistryDirectory) setNewInvokers() {
newInvokers := dir.toGroupInvokers()
dir.listenerLock.Lock()
defer dir.listenerLock.Unlock()
dir.cacheInvokers = newInvokers
if res != nil {
dir.RouterChain().SetInvokers(newInvokers)
dir.RouterChain().SetInvokers(newInvokers)
}
// cacheInvokerByEvent caches invokers from the service event
func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent) (protocol.Invoker, error) {
// judge is override or others
if event != nil {
u := dir.convertUrl(event)
switch event.Action {
case remoting.EventTypeAdd, remoting.EventTypeUpdate:
logger.Infof("selector add service url{%s}", event.Service)
// FIXME: routers are built in every address notification?
dir.configRouters()
return dir.cacheInvoker(u), nil
case remoting.EventTypeDel:
logger.Infof("selector delete service url{%s}", event.Service)
return dir.uncacheInvoker(u), nil
default:
return nil, fmt.Errorf("illegal event type: %v", event.Action)
}
}
dir.listenerLock.Unlock()
// After dir.cacheInvokers is updated,destroy the oldInvoker
// Ensure that no request will enter the oldInvoker
if oldInvoker != nil {
oldInvoker.Destroy()
return nil, nil
}
// configRouters configures dynamic routers into the router chain, but, the current impl is incorrect, see FIXME above.
func (dir *RegistryDirectory) configRouters() {
var urls []*common.URL
for _, v := range config.GetRouterURLSet().Values() {
urls = append(urls, v.(*common.URL))
}
if len(urls) > 0 {
dir.SetRouters(urls)
}
}
// convertUrl processes override:// and router://
func (dir *RegistryDirectory) convertUrl(res *registry.ServiceEvent) *common.URL {
ret := &res.Service
if ret.Protocol == constant.OVERRIDE_PROTOCOL || // 1.for override url in 2.6.x
ret.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.CONFIGURATORS_CATEGORY {
dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(ret))
ret = nil
} else if ret.Protocol == constant.ROUTER_PROTOCOL || // 2.for router
ret.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY {
ret = nil
}
return ret
}
func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker {
......@@ -215,11 +272,15 @@ func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker {
return groupInvokersList
}
// uncacheInvoker will return abandoned Invoker,if no Invoker to be abandoned,return nil
// uncacheInvoker will return abandoned Invoker, if no Invoker to be abandoned, return nil
func (dir *RegistryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key())
if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok {
dir.cacheInvokersMap.Delete(url.Key())
return dir.uncacheInvokerWithKey(url.Key())
}
func (dir *RegistryDirectory) uncacheInvokerWithKey(key string) protocol.Invoker {
logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", key)
if cacheInvoker, ok := dir.cacheInvokersMap.Load(key); ok {
dir.cacheInvokersMap.Delete(key)
return cacheInvoker.(protocol.Invoker)
}
return nil
......@@ -250,6 +311,12 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
}
} else {
// if cached invoker has the same URL with the new URL, then no need to re-refer, and no need to destroy
// the old invoker.
if common.IsEquals(*newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) {
return nil
}
logger.Debugf("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl())
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
......@@ -348,7 +415,8 @@ func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL)
// Process handle events and update Invokers
func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
l.directory.refreshInvokers(nil)
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers()
}
type consumerConfigurationListener struct {
......@@ -374,5 +442,6 @@ func (l *consumerConfigurationListener) addNotifyListener(listener registry.Noti
// Process handles events from Configuration Center and update Invokers
func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
l.directory.refreshInvokers(nil)
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers()
}
......@@ -242,7 +242,12 @@ func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Inv
}
// Notify will be triggered when a service change notification is received.
func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) {
func (nl *overrideSubscribeListener) Notify(events ...*registry.ServiceEvent) {
if events == nil || len(events) == 0 {
return
}
event := events[0]
if isMatched(&(event.Service), nl.url) && event.Action == remoting.EventTypeAdd {
nl.configurator = extension.GetDefaultConfigurator(&(event.Service))
nl.doOverrideIfNecessary()
......
......@@ -69,7 +69,7 @@ type Registry interface {
// nolint
type NotifyListener interface {
// Notify supports notifications on the service interface and the dimension of the data type.
Notify(*ServiceEvent)
Notify(...*ServiceEvent)
}
// Listener Deprecated!
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment