Skip to content
Snippets Groups Projects
directory.go 12 KiB
Newer Older
AlexStocks's avatar
AlexStocks committed
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
fangyincheng's avatar
fangyincheng committed

vito.he's avatar
vito.he committed
package directory

import (
	"sync"
)

import (
fangyincheng's avatar
fangyincheng committed
	perrors "github.com/pkg/errors"
aliiohs's avatar
aliiohs committed
	"go.uber.org/atomic"
	"github.com/apache/dubbo-go/cluster"
	"github.com/apache/dubbo-go/cluster/directory"
	"github.com/apache/dubbo-go/cluster/router/chain"
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/common/extension"
fangyincheng's avatar
fangyincheng committed
	"github.com/apache/dubbo-go/common/logger"
vito.he's avatar
vito.he committed
	"github.com/apache/dubbo-go/config"
	"github.com/apache/dubbo-go/config_center"
vito.he's avatar
vito.he committed
	_ "github.com/apache/dubbo-go/config_center/configurator"
	"github.com/apache/dubbo-go/protocol"
	"github.com/apache/dubbo-go/protocol/protocolwrapper"
	"github.com/apache/dubbo-go/registry"
	"github.com/apache/dubbo-go/remoting"
func init() {
	extension.SetDefaultRegistryDirectory(NewRegistryDirectory)
// RegistryDirectory is an implementation of Directory.
// The Invoker list returned from this Directory's List method has been filtered by Routers.
type RegistryDirectory struct {
vito.he's avatar
vito.he committed
	directory.BaseDirectory
vito.he's avatar
vito.he committed
	cacheInvokers                  []protocol.Invoker
	listenerLock                   sync.Mutex
	serviceType                    string
	registry                       registry.Registry
flycash's avatar
flycash committed
	cacheInvokersMap               *sync.Map // use sync.map
Ian Luo's avatar
Ian Luo committed
	consumerURL                    *common.URL
vito.he's avatar
vito.he committed
	cacheOriginUrl                 *common.URL
	configurators                  []config_center.Configurator
	consumerConfigurationListener  *consumerConfigurationListener
	referenceConfigurationListener *referenceConfigurationListener
	serviceKey                     string
	forbidden                      atomic.Bool
renzhiyuan02's avatar
renzhiyuan02 committed
// NewRegistryDirectory will create a new RegistryDirectory
func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {
fangyincheng's avatar
fangyincheng committed
		return nil, perrors.Errorf("url is invalid, suburl can not be nil")
	dir := &RegistryDirectory{
vito.he's avatar
vito.he committed
		BaseDirectory:    directory.NewBaseDirectory(url),
		cacheInvokers:    []protocol.Invoker{},
		cacheInvokersMap: &sync.Map{},
		serviceType:      url.SubURL.Service(),
		registry:         registry,
vito.he's avatar
vito.he committed
	}
Ian Luo's avatar
Ian Luo committed
	dir.consumerURL = dir.getConsumerUrl(url.SubURL)
Ian Luo's avatar
Ian Luo committed
	if routerChain, err := chain.NewRouterChain(dir.consumerURL); err == nil {
		dir.BaseDirectory.SetRouterChain(routerChain)
	} else {
		logger.Warnf("fail to create router chain with url: %s, err is: %v", url.SubURL, err)
	}

vito.he's avatar
vito.he committed
	dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)

	go dir.subscribe(url.SubURL)
vito.he's avatar
vito.he committed
	return dir, nil
flycash's avatar
flycash committed
// subscribe from registry
func (dir *RegistryDirectory) subscribe(url *common.URL) {
邹毅贤's avatar
邹毅贤 committed
	dir.consumerConfigurationListener.addNotifyListener(dir)
	dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
	dir.registry.Subscribe(url, dir)
renzhiyuan02's avatar
renzhiyuan02 committed
// Notify monitors changes from the registry and updates the cacheServices
func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) {
vito.he's avatar
vito.he committed
	go dir.update(event)
// update the cacheServices and subscribe service from registry
func (dir *RegistryDirectory) update(res *registry.ServiceEvent) {
	if res == nil {
		return
	}
fangyincheng's avatar
fangyincheng committed
	logger.Debugf("registry update, result{%s}", res)
	logger.Debugf("update service name: %s!", res.Service)
func (dir *RegistryDirectory) refreshInvokers(res *registry.ServiceEvent) {
pantianying's avatar
pantianying committed
	var (
		url        *common.URL
		oldInvoker protocol.Invoker = nil
	)
flycash's avatar
flycash committed
	// judge is override or others
vito.he's avatar
vito.he committed
	if res != nil {
		url = &res.Service
flycash's avatar
flycash committed
		// 1.for override url in 2.6.x
vito.he's avatar
vito.he committed
		if url.Protocol == constant.OVERRIDE_PROTOCOL ||
			url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.CONFIGURATORS_CATEGORY {
			dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(url))
			url = nil
flycash's avatar
flycash committed
		} else if url.Protocol == constant.ROUTER_PROTOCOL || // 2.for router
vito.he's avatar
vito.he committed
			url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY {
			url = nil
vito.he's avatar
vito.he committed
		}
pantianying's avatar
pantianying committed
		switch res.Action {
邹毅贤's avatar
邹毅贤 committed
		case remoting.EventTypeAdd, remoting.EventTypeUpdate:
pantianying's avatar
pantianying committed
			logger.Infof("selector add service url{%s}", res.Service)
邹毅贤's avatar
邹毅贤 committed

			var urls []*common.URL
flycash's avatar
flycash committed
			for _, v := range config.GetRouterURLSet().Values() {
邹毅贤's avatar
邹毅贤 committed
				urls = append(urls, v.(*common.URL))
			}

邹毅贤's avatar
邹毅贤 committed
			if len(urls) > 0 {
邹毅贤's avatar
邹毅贤 committed
				dir.SetRouters(urls)
邹毅贤's avatar
邹毅贤 committed
			}
			oldInvoker = dir.cacheInvoker(url)
邹毅贤's avatar
邹毅贤 committed
		case remoting.EventTypeDel:
pantianying's avatar
pantianying committed
			oldInvoker = dir.uncacheInvoker(url)
邹毅贤's avatar
邹毅贤 committed
			logger.Infof("selector delete service url{%s}", res.Service)
		default:
			return
		}
	newInvokers := dir.toGroupInvokers()
	dir.listenerLock.Lock()
	dir.cacheInvokers = newInvokers
Ian Luo's avatar
Ian Luo committed
	if res != nil {
		dir.RouterChain().SetInvokers(newInvokers)
	}
pantianying's avatar
pantianying committed
	dir.listenerLock.Unlock()
	// After dir.cacheInvokers is updated,destroy the oldInvoker
	// Ensure that no request will enter the oldInvoker
	if oldInvoker != nil {
		oldInvoker.Destroy()
	}

func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker {
watermelo's avatar
watermelo committed
	var (
		err             error
		newInvokersList []protocol.Invoker
	)
	groupInvokersMap := make(map[string][]protocol.Invoker)

	dir.cacheInvokersMap.Range(func(key, value interface{}) bool {
		newInvokersList = append(newInvokersList, value.(protocol.Invoker))
		return true
	})
	for _, invoker := range newInvokersList {
		group := invoker.GetUrl().GetParam(constant.GROUP_KEY, "")

		if _, ok := groupInvokersMap[group]; ok {
			groupInvokersMap[group] = append(groupInvokersMap[group], invoker)
		} else {
vito.he's avatar
vito.he committed
			groupInvokersMap[group] = []protocol.Invoker{invoker}
watermelo's avatar
watermelo committed
	groupInvokersList := make([]protocol.Invoker, 0, len(groupInvokersMap))
	if len(groupInvokersMap) == 1 {
flycash's avatar
flycash committed
		// len is 1 it means no group setting ,so do not need cluster again
		for _, invokers := range groupInvokersMap {
			groupInvokersList = invokers
		}
	} else {
		for _, invokers := range groupInvokersMap {
vito.he's avatar
vito.he committed
			staticDir := directory.NewStaticDirectory(invokers)
watermelo's avatar
watermelo committed
			cst := extension.GetCluster(dir.GetUrl().SubURL.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
			err = staticDir.BuildRouterChain(invokers)
			if err != nil {
				logger.Error(err)
				continue
			}
			groupInvokersList = append(groupInvokersList, cst.Join(staticDir))
// uncacheInvoker returns the abandoned Invoker, or nil if there is no Invoker to abandon
func (dir *RegistryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
fangyincheng's avatar
fangyincheng committed
	logger.Debugf("service will be deleted in cache invokers: invokers key is  %s!", url.Key())
pantianying's avatar
pantianying committed
	if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok {
		dir.cacheInvokersMap.Delete(url.Key())
		return cacheInvoker.(protocol.Invoker)
	}
	return nil
// cacheInvoker returns the abandoned Invoker, or nil if there is no Invoker to abandon
func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
vito.he's avatar
vito.he committed
	dir.overrideUrl(dir.GetDirectoryUrl())
	referenceUrl := dir.GetDirectoryUrl().SubURL

	if url == nil && dir.cacheOriginUrl != nil {
		url = dir.cacheOriginUrl
Ian Luo's avatar
Ian Luo committed
	} else {
		dir.cacheOriginUrl = url
vito.he's avatar
vito.he committed
	}
	if url == nil {
		logger.Error("URL is nil ,pls check if service url is subscribe successfully!")
pantianying's avatar
pantianying committed
		return nil
vito.he's avatar
vito.he committed
	}
flycash's avatar
flycash committed
	// check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol
	if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
vito.he's avatar
vito.he committed
		newUrl := common.MergeUrl(url, referenceUrl)
		dir.overrideUrl(newUrl)
		if cacheInvoker, ok := dir.cacheInvokersMap.Load(newUrl.Key()); !ok {
pantianying's avatar
pantianying committed
			logger.Debugf("service will be added in cache invokers: invokers url is  %s!", newUrl)
vito.he's avatar
vito.he committed
			newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
vito.he's avatar
vito.he committed
				dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
		} else {
pantianying's avatar
pantianying committed
			logger.Debugf("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl())
			newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
			if newInvoker != nil {
				dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
pantianying's avatar
pantianying committed
				return cacheInvoker.(protocol.Invoker)
pantianying's avatar
pantianying committed
	return nil
// List selected protocol invokers from the directory
func (dir *RegistryDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
aliiohs's avatar
aliiohs committed
	invokers := dir.cacheInvokers
邹毅贤's avatar
邹毅贤 committed
	routerChain := dir.RouterChain()
aliiohs's avatar
aliiohs committed

邹毅贤's avatar
邹毅贤 committed
	if routerChain == nil {
		return invokers
aliiohs's avatar
aliiohs committed
	}
Ian Luo's avatar
Ian Luo committed
	return routerChain.Route(dir.consumerURL, invocation)
renzhiyuan's avatar
renzhiyuan committed
// IsAvailable reports whether the directory is available
func (dir *RegistryDirectory) IsAvailable() bool {
	if !dir.BaseDirectory.IsAvailable() {
		return dir.BaseDirectory.IsAvailable()
AlexStocks's avatar
AlexStocks committed
	}

	for _, ivk := range dir.cacheInvokers {
		if ivk.IsAvailable() {
			return true
	return false
renzhiyuan02's avatar
renzhiyuan02 committed
// Destroy destroys the directory together with all of its cached invokers
func (dir *RegistryDirectory) Destroy() {
flycash's avatar
flycash committed
	// TODO:unregister & unsubscribe
	dir.BaseDirectory.Destroy(func() {
pantianying's avatar
pantianying committed
		invokers := dir.cacheInvokers
		dir.cacheInvokers = []protocol.Invoker{}
		for _, ivk := range invokers {
			ivk.Destroy()
		}
	})
高辛格's avatar
高辛格 committed

func (dir *RegistryDirectory) overrideUrl(targetUrl *common.URL) {
vito.he's avatar
vito.he committed
	doOverrideUrl(dir.configurators, targetUrl)
	doOverrideUrl(dir.consumerConfigurationListener.Configurators(), targetUrl)
	doOverrideUrl(dir.referenceConfigurationListener.Configurators(), targetUrl)
}

// getConsumerUrl builds a URL with the "consumer" protocol from the reference
// URL c.
//
// Every parameter of c is copied, then three more are appended: "pid" (the
// current process id), "ip" (the local IP) and "protocol" (c's own protocol).
// The returned URL keeps c's path and carries the local IP as its IP.
//
// Looking up the local IP is best effort: a failure is logged as a warning and
// the URL is still built with whatever value the lookup returned.
func (dir *RegistryDirectory) getConsumerUrl(c *common.URL) *common.URL {
	processID := fmt.Sprintf("%d", os.Getpid())
	localIP, err := gxnet.GetLocalIP()
	if err != nil {
		// Do not abort: the consumer URL is still usable without an IP, but the
		// failure must be visible instead of being silently dropped.
		logger.Warnf("fail to get local ip for consumer url, err is: %v", err)
	}

	params := url.Values{}
	c.RangeParams(func(key, value string) bool {
		params.Add(key, value)
		return true
	})

	params.Add("pid", processID)
	params.Add("ip", localIP)
	params.Add("protocol", c.Protocol)

	return common.NewURLWithOptions(common.WithProtocol("consumer"), common.WithIp(localIP), common.WithPath(c.Path),
		common.WithParams(params))
}

vito.he's avatar
vito.he committed
// doOverrideUrl hands targetUrl to each configurator, one after another, in
// slice order. A nil or empty slice results in no calls.
func doOverrideUrl(configurators []config_center.Configurator, targetUrl *common.URL) {
	for idx := range configurators {
		configurators[idx].Configure(targetUrl)
	}
}
vito.he's avatar
vito.he committed

type referenceConfigurationListener struct {
vito.he's avatar
vito.he committed
	registry.BaseConfigurationListener
	directory *RegistryDirectory
vito.he's avatar
vito.he committed
	url       *common.URL
}

func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL) *referenceConfigurationListener {
vito.he's avatar
vito.he committed
	listener := &referenceConfigurationListener{directory: dir, url: url}
	listener.InitWith(
		url.EncodedServiceKey()+constant.CONFIGURATORS_SUFFIX,
		listener,
		extension.GetDefaultConfiguratorFunc(),
	)
vito.he's avatar
vito.he committed
	return listener
}

renzhiyuan's avatar
renzhiyuan committed
// Process handle events and update Invokers
func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
vito.he's avatar
vito.he committed
	l.BaseConfigurationListener.Process(event)
	l.directory.refreshInvokers(nil)
}

type consumerConfigurationListener struct {
	registry.BaseConfigurationListener
	listeners []registry.NotifyListener
	directory *RegistryDirectory
func newConsumerConfigurationListener(dir *RegistryDirectory) *consumerConfigurationListener {
vito.he's avatar
vito.he committed
	listener := &consumerConfigurationListener{directory: dir}
	listener.InitWith(
		config.GetConsumerConfig().ApplicationConfig.Name+constant.CONFIGURATORS_SUFFIX,
		listener,
		extension.GetDefaultConfiguratorFunc(),
	)
vito.he's avatar
vito.he committed
	return listener
}
高辛格's avatar
高辛格 committed

vito.he's avatar
vito.he committed
// addNotifyListener appends listener to the notify listeners kept by l.
func (l *consumerConfigurationListener) addNotifyListener(listener registry.NotifyListener) {
	registered := append(l.listeners, listener)
	l.listeners = registered
}
高辛格's avatar
高辛格 committed

renzhiyuan02's avatar
renzhiyuan02 committed
// Process handles events from Configuration Center and update Invokers
vito.he's avatar
vito.he committed
func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
	l.BaseConfigurationListener.Process(event)
	l.directory.refreshInvokers(nil)
vito.he's avatar
vito.he committed
}