Skip to content
Snippets Groups Projects
directory.go 16.8 KiB
Newer Older
AlexStocks's avatar
AlexStocks committed
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
fangyincheng's avatar
fangyincheng committed

vito.he's avatar
vito.he committed
package directory

import (
	"fmt"
	"net/url"
	"os"
	"sync"
)

import (
	perrors "github.com/pkg/errors"

	"github.com/apache/dubbo-go/cluster"
	"github.com/apache/dubbo-go/cluster/directory"
	"github.com/apache/dubbo-go/cluster/router/chain"
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/common/extension"
	"github.com/apache/dubbo-go/common/logger"
	"github.com/apache/dubbo-go/config"
	"github.com/apache/dubbo-go/config_center"
	_ "github.com/apache/dubbo-go/config_center/configurator"
	"github.com/apache/dubbo-go/protocol"
	"github.com/apache/dubbo-go/protocol/protocolwrapper"
	"github.com/apache/dubbo-go/registry"
	"github.com/apache/dubbo-go/remoting"
)

func init() {
	extension.SetDefaultRegistryDirectory(NewRegistryDirectory)
// RegistryDirectory implementation of Directory:
// Invoker list returned from this Directory's list method have been filtered by Routers
type RegistryDirectory struct {
vito.he's avatar
vito.he committed
	directory.BaseDirectory
vito.he's avatar
vito.he committed
	cacheInvokers                  []protocol.Invoker
	listenerLock                   sync.Mutex
	serviceType                    string
	registry                       registry.Registry
flycash's avatar
flycash committed
	cacheInvokersMap               *sync.Map // use sync.map
Ian Luo's avatar
Ian Luo committed
	consumerURL                    *common.URL
vito.he's avatar
vito.he committed
	cacheOriginUrl                 *common.URL
	configurators                  []config_center.Configurator
	consumerConfigurationListener  *consumerConfigurationListener
	referenceConfigurationListener *referenceConfigurationListener
AlexStocks's avatar
AlexStocks committed
	//serviceKey                     string
	//forbidden                      atomic.Bool
	registerLock sync.Mutex // this lock if for register
renzhiyuan02's avatar
renzhiyuan02 committed
// NewRegistryDirectory will create a new RegistryDirectory
func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {
fangyincheng's avatar
fangyincheng committed
		return nil, perrors.Errorf("url is invalid, suburl can not be nil")
cvictory's avatar
cvictory committed
	logger.Debugf("new RegistryDirectory for service :%s.", url.Key())
	dir := &RegistryDirectory{
vito.he's avatar
vito.he committed
		BaseDirectory:    directory.NewBaseDirectory(url),
		cacheInvokers:    []protocol.Invoker{},
		cacheInvokersMap: &sync.Map{},
		serviceType:      url.SubURL.Service(),
		registry:         registry,
vito.he's avatar
vito.he committed
	}
Ian Luo's avatar
Ian Luo committed
	dir.consumerURL = dir.getConsumerUrl(url.SubURL)
Ian Luo's avatar
Ian Luo committed
	if routerChain, err := chain.NewRouterChain(dir.consumerURL); err == nil {
		dir.BaseDirectory.SetRouterChain(routerChain)
	} else {
		logger.Warnf("fail to create router chain with url: %s, err is: %v", url.SubURL, err)
	}

vito.he's avatar
vito.he committed
	dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)

	go dir.subscribe(url.SubURL)
vito.he's avatar
vito.he committed
	return dir, nil
flycash's avatar
flycash committed
// subscribe from registry
func (dir *RegistryDirectory) subscribe(url *common.URL) {
cvictory's avatar
cvictory committed
	logger.Debugf("subscribe service :%s for RegistryDirectory.", url.Key())
邹毅贤's avatar
邹毅贤 committed
	dir.consumerConfigurationListener.addNotifyListener(dir)
	dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
AlexStocks's avatar
AlexStocks committed
	if err := dir.registry.Subscribe(url, dir); err != nil {
		logger.Error("registry.Subscribe(url:%v, dir:%v) = error:%v", url, dir, err)
	}
renzhiyuan02's avatar
renzhiyuan02 committed
// Notify monitor changes from registry,and update the cacheServices
func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) {
	if event == nil {
		return
	}
	go dir.refreshInvokers(event)
// NotifyAll notify the events that are complete Service Event List.
cvictory's avatar
cvictory committed
// After notify the address, the callback func will be invoked.
func (dir *RegistryDirectory) NotifyAll(events []*registry.ServiceEvent, callback func()) {
	go dir.refreshAllInvokers(events, callback)
}

// refreshInvokers refreshes service's events.
func (dir *RegistryDirectory) refreshInvokers(event *registry.ServiceEvent) {
cvictory's avatar
cvictory committed
	if event != nil {
cvictory's avatar
cvictory committed
		logger.Debugf("refresh invokers with %+v", event)
cvictory's avatar
cvictory committed
	} else {
		logger.Debug("refresh invokers with nil")
	}

	var oldInvoker protocol.Invoker
	if event != nil {
		oldInvoker, _ = dir.cacheInvokerByEvent(event)
	}
	dir.setNewInvokers()
	if oldInvoker != nil {
		oldInvoker.Destroy()
// refreshAllInvokers the argument is the complete list of the service events,  we can safely assume any cached invoker
// not in the incoming list can be removed.  The Action of serviceEvent should be EventTypeUpdate.
cvictory's avatar
cvictory committed
func (dir *RegistryDirectory) refreshAllInvokers(events []*registry.ServiceEvent, callback func()) {
	var (
		oldInvokers []protocol.Invoker
		addEvents   []*registry.ServiceEvent
	)
	dir.overrideUrl(dir.GetDirectoryUrl())
	referenceUrl := dir.GetDirectoryUrl().SubURL
	// loop the events to check the Action should be EventTypeUpdate.
	for _, event := range events {
		if event.Action != remoting.EventTypeUpdate {
			panic("Your implements of register center is wrong, " +
				"please check the Action of ServiceEvent should be EventTypeUpdate")
cvictory's avatar
cvictory committed
		// Originally it will Merge URL many times, now we just execute once.
		// MergeUrl is executed once and put the result into Event. After this, the key will get from Event.Key().
		newUrl := dir.convertUrl(event)
		newUrl = common.MergeUrl(newUrl, referenceUrl)
		dir.overrideUrl(newUrl)
		event.Update(newUrl)
cvictory's avatar
cvictory committed
	// After notify all addresses, do some callback.
	defer callback()
	func() {
		// this lock is work at batch update of InvokeCache
		dir.registerLock.Lock()
		defer dir.registerLock.Unlock()
		// get need clear invokers from original invoker list
		dir.cacheInvokersMap.Range(func(k, v interface{}) bool {
			if !dir.eventMatched(k.(string), events) {
				if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil {
					oldInvokers = append(oldInvokers, invoker)
				}
			}
			return true
		})
		// get need add invokers from events
		for _, event := range events {
cvictory's avatar
cvictory committed
			// Get the key from Event.Key()
			if _, ok := dir.cacheInvokersMap.Load(event.Key()); !ok {
vito.he's avatar
vito.he committed
		}
		// loop the updateEvents
		for _, event := range addEvents {
			logger.Debugf("registry update, result{%s}", event)
cvictory's avatar
cvictory committed
			logger.Infof("selector add service url{%s}", event.Service)
			if constant.ROUTER_PROTOCOL == event.Service.Protocol {
				dir.configRouters()
			}
cvictory's avatar
cvictory committed
			if oldInvoker, _ := dir.doCacheInvoker(event.Service); oldInvoker != nil {
				oldInvokers = append(oldInvokers, oldInvoker)
			}
		}
	}()
	dir.setNewInvokers()
	// destroy unused invokers
	for _, invoker := range oldInvokers {
		go invoker.Destroy()
	}
}

// eventMatched checks if a cached invoker appears in the incoming invoker list, if no, then it is safe to remove.
func (dir *RegistryDirectory) eventMatched(key string, events []*registry.ServiceEvent) bool {
	for _, event := range events {
		if dir.invokerCacheKey(event) == key {
			return true
邹毅贤's avatar
邹毅贤 committed
		}
	return false
}
// invokerCacheKey generates the key in the cache for a given ServiceEvent.
func (dir *RegistryDirectory) invokerCacheKey(event *registry.ServiceEvent) string {
cvictory's avatar
cvictory committed
	// If the url is merged, then return Event.Key() directly.
	if event.Updated() {
		return event.Key()
	}
	referenceUrl := dir.GetDirectoryUrl().SubURL
	newUrl := common.MergeUrl(event.Service, referenceUrl)
	event.Update(newUrl)
	return newUrl.Key()
}

// setNewInvokers groups the invokers from the cache first, then set the result to both directory and router chain.
// The router chain is skipped when none is configured, which happens when
// NewRegistryDirectory failed to build one; List handles that case the same way.
func (dir *RegistryDirectory) setNewInvokers() {
	newInvokers := dir.toGroupInvokers()
	dir.listenerLock.Lock()
	defer dir.listenerLock.Unlock()
	dir.cacheInvokers = newInvokers
	if routerChain := dir.RouterChain(); routerChain != nil {
		routerChain.SetInvokers(newInvokers)
	}
}

// cacheInvokerByEvent caches invokers from the service event
func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent) (protocol.Invoker, error) {
	// judge is override or others
	if event != nil {
		u := dir.convertUrl(event)
		switch event.Action {
		case remoting.EventTypeAdd, remoting.EventTypeUpdate:
			logger.Infof("selector add service url{%s}", event.Service)
			// FIXME: routers are built in every address notification?
			if constant.ROUTER_PROTOCOL == u.Protocol {
				dir.configRouters()
			}
			return dir.cacheInvoker(u), nil
		case remoting.EventTypeDel:
			logger.Infof("selector delete service url{%s}", event.Service)
			return dir.uncacheInvoker(u), nil
		default:
			return nil, fmt.Errorf("illegal event type: %v", event.Action)
		}
Ian Luo's avatar
Ian Luo committed
	}
	return nil, nil
}

// configRouters configures dynamic routers into the router chain, but, the current impl is incorrect, see FIXME above.
func (dir *RegistryDirectory) configRouters() {
	var urls []*common.URL
	for _, v := range config.GetRouterURLSet().Values() {
		urls = append(urls, v.(*common.URL))
pantianying's avatar
pantianying committed
	}

	if len(urls) > 0 {
		dir.SetRouters(urls)
	}
}

// convertUrl processes override:// and router://
func (dir *RegistryDirectory) convertUrl(res *registry.ServiceEvent) *common.URL {
haohongfan's avatar
haohongfan committed
	ret := res.Service
	if ret.Protocol == constant.OVERRIDE_PROTOCOL || // 1.for override url in 2.6.x
		ret.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.CONFIGURATORS_CATEGORY {
		dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(ret))
		ret = nil
	} else if ret.Protocol == constant.ROUTER_PROTOCOL || // 2.for router
		ret.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY {
		ret = nil
	}
	return ret
func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker {
watermelo's avatar
watermelo committed
	var (
		err             error
		newInvokersList []protocol.Invoker
	)
	groupInvokersMap := make(map[string][]protocol.Invoker)

	dir.cacheInvokersMap.Range(func(key, value interface{}) bool {
		newInvokersList = append(newInvokersList, value.(protocol.Invoker))
		return true
	})
	for _, invoker := range newInvokersList {
		group := invoker.GetUrl().GetParam(constant.GROUP_KEY, "")
AlexStocks's avatar
AlexStocks committed
		groupInvokersMap[group] = append(groupInvokersMap[group], invoker)
watermelo's avatar
watermelo committed
	groupInvokersList := make([]protocol.Invoker, 0, len(groupInvokersMap))
	if len(groupInvokersMap) == 1 {
flycash's avatar
flycash committed
		// len is 1 it means no group setting ,so do not need cluster again
		for _, invokers := range groupInvokersMap {
			groupInvokersList = invokers
		}
	} else {
		for _, invokers := range groupInvokersMap {
vito.he's avatar
vito.he committed
			staticDir := directory.NewStaticDirectory(invokers)
watermelo's avatar
watermelo committed
			cst := extension.GetCluster(dir.GetUrl().SubURL.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
			err = staticDir.BuildRouterChain(invokers)
			if err != nil {
				logger.Error(err)
				continue
			}
			groupInvokersList = append(groupInvokersList, cst.Join(staticDir))
// uncacheInvoker will return abandoned Invoker, if no Invoker to be abandoned, return nil.
// A nil url (convertUrl yields one for override and router events) has no
// cache entry, so nil is returned for it as well.
func (dir *RegistryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
	if url == nil {
		return nil
	}
	return dir.uncacheInvokerWithKey(url.Key())
}

func (dir *RegistryDirectory) uncacheInvokerWithKey(key string) protocol.Invoker {
	logger.Debugf("service will be deleted in cache invokers: invokers key is  %s!", key)
	protocol.RemoveUrlKeyUnhealthyStatus(key)
	if cacheInvoker, ok := dir.cacheInvokersMap.Load(key); ok {
		dir.cacheInvokersMap.Delete(key)
pantianying's avatar
pantianying committed
		return cacheInvoker.(protocol.Invoker)
	}
	return nil
// cacheInvoker will return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
vito.he's avatar
vito.he committed
	dir.overrideUrl(dir.GetDirectoryUrl())
	referenceUrl := dir.GetDirectoryUrl().SubURL

	if url == nil && dir.cacheOriginUrl != nil {
		url = dir.cacheOriginUrl
Ian Luo's avatar
Ian Luo committed
	} else {
		dir.cacheOriginUrl = url
vito.he's avatar
vito.he committed
	}
	if url == nil {
		logger.Error("URL is nil ,pls check if service url is subscribe successfully!")
pantianying's avatar
pantianying committed
		return nil
vito.he's avatar
vito.he committed
	}
flycash's avatar
flycash committed
	// check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol
	if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
vito.he's avatar
vito.he committed
		newUrl := common.MergeUrl(url, referenceUrl)
		dir.overrideUrl(newUrl)
cvictory's avatar
cvictory committed
		if v, ok := dir.doCacheInvoker(newUrl); ok {
			return v
pantianying's avatar
pantianying committed
	return nil
cvictory's avatar
cvictory committed
func (dir *RegistryDirectory) doCacheInvoker(newUrl *common.URL) (protocol.Invoker, bool) {
	key := newUrl.Key()
	if cacheInvoker, ok := dir.cacheInvokersMap.Load(key); !ok {
		logger.Debugf("service will be added in cache invokers: invokers url is  %s!", newUrl)
		newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
		if newInvoker != nil {
			dir.cacheInvokersMap.Store(key, newInvoker)
		}
	} else {
		// if cached invoker has the same URL with the new URL, then no need to re-refer, and no need to destroy
		// the old invoker.
		if common.GetCompareURLEqualFunc()(newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) {
cvictory's avatar
cvictory committed
			return nil, true
		}

		logger.Debugf("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl())
		newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
		if newInvoker != nil {
			dir.cacheInvokersMap.Store(key, newInvoker)
			return cacheInvoker.(protocol.Invoker), true
		}
	}
	return nil, false
}

// List selected protocol invokers from the directory
func (dir *RegistryDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
aliiohs's avatar
aliiohs committed
	invokers := dir.cacheInvokers
邹毅贤's avatar
邹毅贤 committed
	routerChain := dir.RouterChain()
aliiohs's avatar
aliiohs committed

邹毅贤's avatar
邹毅贤 committed
	if routerChain == nil {
		return invokers
aliiohs's avatar
aliiohs committed
	}
Ian Luo's avatar
Ian Luo committed
	return routerChain.Route(dir.consumerURL, invocation)
renzhiyuan's avatar
renzhiyuan committed
// IsAvailable  whether the directory is available
func (dir *RegistryDirectory) IsAvailable() bool {
	if !dir.BaseDirectory.IsAvailable() {
		return dir.BaseDirectory.IsAvailable()
AlexStocks's avatar
AlexStocks committed
	}

	for _, ivk := range dir.cacheInvokers {
		if ivk.IsAvailable() {
			return true
	return false
renzhiyuan02's avatar
renzhiyuan02 committed
// Destroy method
func (dir *RegistryDirectory) Destroy() {
flycash's avatar
flycash committed
	// TODO:unregister & unsubscribe
	dir.BaseDirectory.Destroy(func() {
pantianying's avatar
pantianying committed
		invokers := dir.cacheInvokers
		dir.cacheInvokers = []protocol.Invoker{}
		for _, ivk := range invokers {
			ivk.Destroy()
		}
	})
高辛格's avatar
高辛格 committed

func (dir *RegistryDirectory) overrideUrl(targetUrl *common.URL) {
vito.he's avatar
vito.he committed
	doOverrideUrl(dir.configurators, targetUrl)
	doOverrideUrl(dir.consumerConfigurationListener.Configurators(), targetUrl)
	doOverrideUrl(dir.referenceConfigurationListener.Configurators(), targetUrl)
}

// getConsumerUrl derives the consumer:// URL for the reference URL c. The
// result copies every parameter of c, adds the "pid", "ip" and "protocol"
// parameters, and uses the local IP and the path of c.
func (dir *RegistryDirectory) getConsumerUrl(c *common.URL) *common.URL {
	pid := fmt.Sprintf("%d", os.Getpid())
	ip := common.GetLocalIp()

	// Start from a copy of the reference parameters.
	values := url.Values{}
	c.RangeParams(func(k, v string) bool {
		values.Add(k, v)
		return true
	})

	// Then append the consumer-specific entries.
	extras := [][2]string{
		{"pid", pid},
		{"ip", ip},
		{"protocol", c.Protocol},
	}
	for _, kv := range extras {
		values.Add(kv[0], kv[1])
	}

	return common.NewURLWithOptions(
		common.WithProtocol("consumer"),
		common.WithIp(ip),
		common.WithPath(c.Path),
		common.WithParams(values),
	)
}

vito.he's avatar
vito.he committed
func doOverrideUrl(configurators []config_center.Configurator, targetUrl *common.URL) {
	for _, v := range configurators {
		v.Configure(targetUrl)
	}
}
vito.he's avatar
vito.he committed

type referenceConfigurationListener struct {
vito.he's avatar
vito.he committed
	registry.BaseConfigurationListener
	directory *RegistryDirectory
vito.he's avatar
vito.he committed
	url       *common.URL
}

func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL) *referenceConfigurationListener {
vito.he's avatar
vito.he committed
	listener := &referenceConfigurationListener{directory: dir, url: url}
	listener.InitWith(
		url.EncodedServiceKey()+constant.CONFIGURATORS_SUFFIX,
		listener,
		extension.GetDefaultConfiguratorFunc(),
	)
vito.he's avatar
vito.he committed
	return listener
}

renzhiyuan's avatar
renzhiyuan committed
// Process handle events and update Invokers
func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
vito.he's avatar
vito.he committed
	l.BaseConfigurationListener.Process(event)
	// FIXME: this doesn't trigger dir.overrideUrl()
vito.he's avatar
vito.he committed
}

type consumerConfigurationListener struct {
	registry.BaseConfigurationListener
	listeners []registry.NotifyListener
	directory *RegistryDirectory
func newConsumerConfigurationListener(dir *RegistryDirectory) *consumerConfigurationListener {
vito.he's avatar
vito.he committed
	listener := &consumerConfigurationListener{directory: dir}
	listener.InitWith(
		config.GetConsumerConfig().ApplicationConfig.Name+constant.CONFIGURATORS_SUFFIX,
		listener,
		extension.GetDefaultConfiguratorFunc(),
	)
vito.he's avatar
vito.he committed
	return listener
}
高辛格's avatar
高辛格 committed

vito.he's avatar
vito.he committed
func (l *consumerConfigurationListener) addNotifyListener(listener registry.NotifyListener) {
	l.listeners = append(l.listeners, listener)
}
高辛格's avatar
高辛格 committed

renzhiyuan02's avatar
renzhiyuan02 committed
// Process handles events from Configuration Center and update Invokers
vito.he's avatar
vito.he committed
func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
	l.BaseConfigurationListener.Process(event)
	// FIXME: this doesn't trigger dir.overrideUrl()
vito.he's avatar
vito.he committed
}