Newer
Older
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
"fmt"
"net/url"
"os"

AlexStocks
committed
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/router/chain"

AlexStocks
committed
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
_ "github.com/apache/dubbo-go/config_center/configurator"

AlexStocks
committed
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/protocolwrapper"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
extension.SetDefaultRegistryDirectory(NewRegistryDirectory)

AlexStocks
committed
// RegistryDirectory implementation of Directory:
// Invoker list returned from this Directory's list method have been filtered by Routers
cacheInvokers []protocol.Invoker
listenerLock sync.Mutex
serviceType string
registry registry.Registry
cacheOriginUrl *common.URL
configurators []config_center.Configurator
consumerConfigurationListener *consumerConfigurationListener
referenceConfigurationListener *referenceConfigurationListener
//serviceKey string
//forbidden atomic.Bool
registerLock sync.Mutex // this lock if for register
// NewRegistryDirectory will create a new RegistryDirectory
func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {
if url.SubURL == nil {
return nil, perrors.Errorf("url is invalid, suburl can not be nil")
logger.Debugf("new RegistryDirectory for service :%s.", url.Key())
cacheInvokers: []protocol.Invoker{},
if routerChain, err := chain.NewRouterChain(dir.consumerURL); err == nil {
dir.BaseDirectory.SetRouterChain(routerChain)
} else {
logger.Warnf("fail to create router chain with url: %s, err is: %v", url.SubURL, err)
}
dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)
func (dir *RegistryDirectory) subscribe(url *common.URL) {
logger.Debugf("subscribe service :%s for RegistryDirectory.", url.Key())
dir.consumerConfigurationListener.addNotifyListener(dir)
dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
if err := dir.registry.Subscribe(url, dir); err != nil {
logger.Error("registry.Subscribe(url:%v, dir:%v) = error:%v", url, dir, err)
}
// Notify monitor changes from registry,and update the cacheServices
func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) {
if event == nil {
return
}
go dir.refreshInvokers(event)

cvictory
committed
// NotifyAll notify the events that are complete Service Event List.
// After notify the address, the callback func will be invoked.
func (dir *RegistryDirectory) NotifyAll(events []*registry.ServiceEvent, callback func()) {
go dir.refreshAllInvokers(events, callback)
}
// refreshInvokers refreshes service's events.
func (dir *RegistryDirectory) refreshInvokers(event *registry.ServiceEvent) {
} else {
logger.Debug("refresh invokers with nil")
}
var oldInvoker protocol.Invoker
if event != nil {
oldInvoker, _ = dir.cacheInvokerByEvent(event)
}
dir.setNewInvokers()
if oldInvoker != nil {
oldInvoker.Destroy()
// refreshAllInvokers the argument is the complete list of the service events, we can safely assume any cached invoker

cvictory
committed
// not in the incoming list can be removed. The Action of serviceEvent should be EventTypeUpdate.
func (dir *RegistryDirectory) refreshAllInvokers(events []*registry.ServiceEvent, callback func()) {
var (
oldInvokers []protocol.Invoker
addEvents []*registry.ServiceEvent
)
dir.overrideUrl(dir.GetDirectoryUrl())
referenceUrl := dir.GetDirectoryUrl().SubURL

cvictory
committed
// loop the events to check the Action should be EventTypeUpdate.

cvictory
committed
if event.Action != remoting.EventTypeUpdate {
panic("Your implements of register center is wrong, " +
"please check the Action of ServiceEvent should be EventTypeUpdate")
}
// Originally it will Merge URL many times, now we just execute once.
// MergeUrl is executed once and put the result into Event. After this, the key will get from Event.Key().
newUrl := dir.convertUrl(event)
newUrl = common.MergeUrl(newUrl, referenceUrl)
dir.overrideUrl(newUrl)
event.Update(newUrl)
}
// After notify all addresses, do some callback.
defer callback()

cvictory
committed
func() {
// this lock is work at batch update of InvokeCache
dir.registerLock.Lock()
defer dir.registerLock.Unlock()
// get need clear invokers from original invoker list
dir.cacheInvokersMap.Range(func(k, v interface{}) bool {
if !dir.eventMatched(k.(string), events) {

cvictory
committed
// delete unused invoker from cache
if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil {
oldInvokers = append(oldInvokers, invoker)
}
}
return true
})

cvictory
committed
// get need add invokers from events
for _, event := range events {
// Get the key from Event.Key()
if _, ok := dir.cacheInvokersMap.Load(event.Key()); !ok {

cvictory
committed
addEvents = append(addEvents, event)
}

cvictory
committed
// loop the updateEvents
for _, event := range addEvents {
logger.Debugf("registry update, result{%s}", event)
logger.Infof("selector add service url{%s}", event.Service)
if constant.ROUTER_PROTOCOL == event.Service.Protocol {
dir.configRouters()
}
if oldInvoker, _ := dir.doCacheInvoker(event.Service); oldInvoker != nil {

cvictory
committed
oldInvokers = append(oldInvokers, oldInvoker)
}
}
}()
dir.setNewInvokers()
// destroy unused invokers
}
}
// eventMatched checks if a cached invoker appears in the incoming invoker list, if no, then it is safe to remove.
func (dir *RegistryDirectory) eventMatched(key string, events []*registry.ServiceEvent) bool {
for _, event := range events {
if dir.invokerCacheKey(event) == key {
// invokerCacheKey generates the key in the cache for a given ServiceEvent.
func (dir *RegistryDirectory) invokerCacheKey(event *registry.ServiceEvent) string {
// If the url is merged, then return Event.Key() directly.
if event.Updated() {
return event.Key()
}
newUrl := common.MergeUrl(event.Service, referenceUrl)
event.Update(newUrl)
return newUrl.Key()
}
// setNewInvokers groups the invokers from the cache first, then set the result to both directory and router chain.
func (dir *RegistryDirectory) setNewInvokers() {
newInvokers := dir.toGroupInvokers()
dir.cacheInvokers = newInvokers
dir.RouterChain().SetInvokers(newInvokers)
}
// cacheInvokerByEvent caches invokers from the service event
func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent) (protocol.Invoker, error) {
// judge is override or others
if event != nil {
u := dir.convertUrl(event)
switch event.Action {
case remoting.EventTypeAdd, remoting.EventTypeUpdate:
logger.Infof("selector add service url{%s}", event.Service)
// FIXME: routers are built in every address notification?
if constant.ROUTER_PROTOCOL == u.Protocol {
dir.configRouters()
}
return dir.cacheInvoker(u), nil
case remoting.EventTypeDel:
logger.Infof("selector delete service url{%s}", event.Service)
return dir.uncacheInvoker(u), nil
default:
return nil, fmt.Errorf("illegal event type: %v", event.Action)
}
return nil, nil
}
// configRouters configures dynamic routers into the router chain, but, the current impl is incorrect, see FIXME above.
func (dir *RegistryDirectory) configRouters() {
var urls []*common.URL
for _, v := range config.GetRouterURLSet().Values() {
urls = append(urls, v.(*common.URL))
if len(urls) > 0 {
dir.SetRouters(urls)
}
}
// convertUrl processes override:// and router://
func (dir *RegistryDirectory) convertUrl(res *registry.ServiceEvent) *common.URL {
if ret.Protocol == constant.OVERRIDE_PROTOCOL || // 1.for override url in 2.6.x
ret.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.CONFIGURATORS_CATEGORY {
dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(ret))
ret = nil
} else if ret.Protocol == constant.ROUTER_PROTOCOL || // 2.for router
ret.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY {
ret = nil
}
return ret
func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker {
var (
err error
newInvokersList []protocol.Invoker
)
groupInvokersMap := make(map[string][]protocol.Invoker)
dir.cacheInvokersMap.Range(func(key, value interface{}) bool {
newInvokersList = append(newInvokersList, value.(protocol.Invoker))
return true
})
for _, invoker := range newInvokersList {
group := invoker.GetUrl().GetParam(constant.GROUP_KEY, "")
groupInvokersMap[group] = append(groupInvokersMap[group], invoker)
groupInvokersList := make([]protocol.Invoker, 0, len(groupInvokersMap))
if len(groupInvokersMap) == 1 {
// len is 1 it means no group setting ,so do not need cluster again
for _, invokers := range groupInvokersMap {
groupInvokersList = invokers
}
} else {
for _, invokers := range groupInvokersMap {
staticDir := directory.NewStaticDirectory(invokers)
cst := extension.GetCluster(dir.GetUrl().SubURL.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
err = staticDir.BuildRouterChain(invokers)
if err != nil {
logger.Error(err)
continue
}
groupInvokersList = append(groupInvokersList, cst.Join(staticDir))
}
}
return groupInvokersList
// uncacheInvoker will return abandoned Invoker, if no Invoker to be abandoned, return nil
func (dir *RegistryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
return dir.uncacheInvokerWithKey(url.Key())
}
func (dir *RegistryDirectory) uncacheInvokerWithKey(key string) protocol.Invoker {
logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", key)
protocol.RemoveUrlKeyUnhealthyStatus(key)
if cacheInvoker, ok := dir.cacheInvokersMap.Load(key); ok {
dir.cacheInvokersMap.Delete(key)
// cacheInvoker will return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
dir.overrideUrl(dir.GetDirectoryUrl())
referenceUrl := dir.GetDirectoryUrl().SubURL
if url == nil && dir.cacheOriginUrl != nil {
url = dir.cacheOriginUrl
}
if url == nil {
logger.Error("URL is nil ,pls check if service url is subscribe successfully!")
// check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol
if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
newUrl := common.MergeUrl(url, referenceUrl)
dir.overrideUrl(newUrl)
func (dir *RegistryDirectory) doCacheInvoker(newUrl *common.URL) (protocol.Invoker, bool) {
key := newUrl.Key()
if cacheInvoker, ok := dir.cacheInvokersMap.Load(key); !ok {
logger.Debugf("service will be added in cache invokers: invokers url is %s!", newUrl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(key, newInvoker)
}
} else {
// if cached invoker has the same URL with the new URL, then no need to re-refer, and no need to destroy
// the old invoker.
if common.GetCompareURLEqualFunc()(newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) {
return nil, true
}
logger.Debugf("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl())
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(key, newInvoker)
return cacheInvoker.(protocol.Invoker), true
}
}
return nil, false
}
// List selected protocol invokers from the directory
func (dir *RegistryDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
// IsAvailable whether the directory is available
func (dir *RegistryDirectory) IsAvailable() bool {
if !dir.BaseDirectory.IsAvailable() {
return dir.BaseDirectory.IsAvailable()
}
for _, ivk := range dir.cacheInvokers {
if ivk.IsAvailable() {
return true
func (dir *RegistryDirectory) Destroy() {
invokers := dir.cacheInvokers
dir.cacheInvokers = []protocol.Invoker{}
for _, ivk := range invokers {
func (dir *RegistryDirectory) overrideUrl(targetUrl *common.URL) {
doOverrideUrl(dir.configurators, targetUrl)
doOverrideUrl(dir.consumerConfigurationListener.Configurators(), targetUrl)
doOverrideUrl(dir.referenceConfigurationListener.Configurators(), targetUrl)
}
func (dir *RegistryDirectory) getConsumerUrl(c *common.URL) *common.URL {
processID := fmt.Sprintf("%d", os.Getpid())
params := url.Values{}
c.RangeParams(func(key, value string) bool {
params.Add(key, value)
return true
})
params.Add("pid", processID)
params.Add("ip", localIP)
params.Add("protocol", c.Protocol)
return common.NewURLWithOptions(common.WithProtocol("consumer"), common.WithIp(localIP), common.WithPath(c.Path),
common.WithParams(params))
}
func doOverrideUrl(configurators []config_center.Configurator, targetUrl *common.URL) {
for _, v := range configurators {
v.Configure(targetUrl)
}
}
func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL) *referenceConfigurationListener {
listener := &referenceConfigurationListener{directory: dir, url: url}
listener.InitWith(
url.EncodedServiceKey()+constant.CONFIGURATORS_SUFFIX,
listener,
extension.GetDefaultConfiguratorFunc(),
)
func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers(nil)
}
type consumerConfigurationListener struct {
registry.BaseConfigurationListener
listeners []registry.NotifyListener
func newConsumerConfigurationListener(dir *RegistryDirectory) *consumerConfigurationListener {
listener := &consumerConfigurationListener{directory: dir}
listener.InitWith(
config.GetConsumerConfig().ApplicationConfig.Name+constant.CONFIGURATORS_SUFFIX,
listener,
extension.GetDefaultConfiguratorFunc(),
)
func (l *consumerConfigurationListener) addNotifyListener(listener registry.NotifyListener) {
l.listeners = append(l.listeners, listener)
}
// Process handles events from Configuration Center and update Invokers
func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers(nil)