Skip to content
Snippets Groups Projects
directory.go 9.66 KiB
Newer Older
AlexStocks's avatar
AlexStocks committed
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
fangyincheng's avatar
fangyincheng committed

vito.he's avatar
vito.he committed
package directory

import (
	"sync"
	"time"
)

import (
fangyincheng's avatar
fangyincheng committed
	perrors "github.com/pkg/errors"
	"github.com/apache/dubbo-go/cluster/directory"
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/common/extension"
fangyincheng's avatar
fangyincheng committed
	"github.com/apache/dubbo-go/common/logger"
vito.he's avatar
vito.he committed
	"github.com/apache/dubbo-go/config"
	"github.com/apache/dubbo-go/config_center"
vito.he's avatar
vito.he committed
	_ "github.com/apache/dubbo-go/config_center/configurator"
	"github.com/apache/dubbo-go/protocol"
	"github.com/apache/dubbo-go/protocol/protocolwrapper"
	"github.com/apache/dubbo-go/registry"
	"github.com/apache/dubbo-go/remoting"
vito.he's avatar
vito.he committed
type Options struct {
	serviceTTL time.Duration
}
vito.he's avatar
vito.he committed
type Option func(*Options)

type registryDirectory struct {
vito.he's avatar
vito.he committed
	directory.BaseDirectory
vito.he's avatar
vito.he committed
	cacheInvokers                  []protocol.Invoker
	listenerLock                   sync.Mutex
	serviceType                    string
	registry                       registry.Registry
	cacheInvokersMap               *sync.Map //use sync.map
	cacheOriginUrl                 *common.URL
	configurators                  []config_center.Configurator
	consumerConfigurationListener  *consumerConfigurationListener
	referenceConfigurationListener *referenceConfigurationListener
vito.he's avatar
vito.he committed
	Options
// NewRegistryDirectory ...
func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...Option) (*registryDirectory, error) {
vito.he's avatar
vito.he committed
	options := Options{
		//default 300s
		serviceTTL: time.Duration(300e9),
	}
	for _, opt := range opts {
		opt(&options)
	}
fangyincheng's avatar
fangyincheng committed
		return nil, perrors.Errorf("url is invalid, suburl can not be nil")
vito.he's avatar
vito.he committed
	dir := &registryDirectory{
vito.he's avatar
vito.he committed
		BaseDirectory:    directory.NewBaseDirectory(url),
		cacheInvokers:    []protocol.Invoker{},
		cacheInvokersMap: &sync.Map{},
		serviceType:      url.SubURL.Service(),
		registry:         registry,
		Options:          options,
vito.he's avatar
vito.he committed
	}
	dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)
	return dir, nil
高辛格's avatar
高辛格 committed
//subscribe from registry
func (dir *registryDirectory) Subscribe(url *common.URL) {
vito.he's avatar
vito.he committed
	dir.consumerConfigurationListener.addNotifyListener(dir)
	dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
	dir.registry.Subscribe(url, dir)
vito.he's avatar
vito.he committed
func (dir *registryDirectory) Notify(event *registry.ServiceEvent) {
	go dir.update(event)
高辛格's avatar
高辛格 committed
//subscribe service from registry, and update the cacheServices
func (dir *registryDirectory) update(res *registry.ServiceEvent) {
	if res == nil {
		return
	}
fangyincheng's avatar
fangyincheng committed
	logger.Debugf("registry update, result{%s}", res)
	logger.Debugf("update service name: %s!", res.Service)
func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
vito.he's avatar
vito.he committed
	var url *common.URL
	//judge is override or others
	if res != nil {
		url = &res.Service
		//1.for override url in 2.6.x
		if url.Protocol == constant.OVERRIDE_PROTOCOL ||
			url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.CONFIGURATORS_CATEGORY {
			dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(url))
			url = nil
vito.he's avatar
vito.he committed
		} else if url.Protocol == constant.ROUTER_PROTOCOL || //2.for router
			url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY {
			url = nil
vito.he's avatar
vito.he committed
			//TODO: router
		}
pantianying's avatar
pantianying committed
		switch res.Action {
		case remoting.EventTypeAdd, remoting.EventTypeUpdate:
			//dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL)
			dir.cacheInvoker(url)
		case remoting.EventTypeDel:
			//dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL)
			dir.uncacheInvoker(url)
			logger.Infof("selector delete service url{%s}", res.Service)
		default:
			return
		}
	newInvokers := dir.toGroupInvokers()
	dir.listenerLock.Lock()
	defer dir.listenerLock.Unlock()
	dir.cacheInvokers = newInvokers
}

func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
	newInvokersList := []protocol.Invoker{}
	groupInvokersMap := make(map[string][]protocol.Invoker)
	groupInvokersList := []protocol.Invoker{}

	dir.cacheInvokersMap.Range(func(key, value interface{}) bool {
		newInvokersList = append(newInvokersList, value.(protocol.Invoker))
		return true
	})
	for _, invoker := range newInvokersList {
		group := invoker.GetUrl().GetParam(constant.GROUP_KEY, "")

		if _, ok := groupInvokersMap[group]; ok {
			groupInvokersMap[group] = append(groupInvokersMap[group], invoker)
		} else {
vito.he's avatar
vito.he committed
			groupInvokersMap[group] = []protocol.Invoker{invoker}
	if len(groupInvokersMap) == 1 {
		//len is 1 it means no group setting ,so do not need cluster again
		for _, invokers := range groupInvokersMap {
			groupInvokersList = invokers
		}
	} else {
		for _, invokers := range groupInvokersMap {
vito.he's avatar
vito.he committed
			staticDir := directory.NewStaticDirectory(invokers)
			cluster := extension.GetCluster(dir.GetUrl().SubURL.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
			groupInvokersList = append(groupInvokersList, cluster.Join(staticDir))
		}
	}

	return groupInvokersList
vito.he's avatar
vito.he committed
func (dir *registryDirectory) uncacheInvoker(url *common.URL) {
fangyincheng's avatar
fangyincheng committed
	logger.Debugf("service will be deleted in cache invokers: invokers key is  %s!", url.Key())
	dir.cacheInvokersMap.Delete(url.Key())
vito.he's avatar
vito.he committed
func (dir *registryDirectory) cacheInvoker(url *common.URL) {
	dir.overrideUrl(dir.GetDirectoryUrl())
	referenceUrl := dir.GetDirectoryUrl().SubURL

	if url == nil && dir.cacheOriginUrl != nil {
		url = dir.cacheOriginUrl
	} else {
		dir.cacheOriginUrl = url
	}
	if url == nil {
		logger.Error("URL is nil ,pls check if service url is subscribe successfully!")
		return
	}
	//check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol
	if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
vito.he's avatar
vito.he committed
		newUrl := common.MergeUrl(url, referenceUrl)
		dir.overrideUrl(newUrl)
		if cacheInvoker, ok := dir.cacheInvokersMap.Load(newUrl.Key()); !ok {
			logger.Infof("service will be added in cache invokers: invokers url is  %s!", newUrl)
vito.he's avatar
vito.he committed
			newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
vito.he's avatar
vito.he committed
				dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
		} else {
			logger.Infof("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl())
			newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
			if newInvoker != nil {
				dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
				cacheInvoker.(protocol.Invoker).Destroy()
			}
		}
	}
}

//select the protocol invokers from the directory
func (dir *registryDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
	//TODO:router
	return dir.cacheInvokers
func (dir *registryDirectory) IsAvailable() bool {
	if !dir.BaseDirectory.IsAvailable() {
		return dir.BaseDirectory.IsAvailable()
AlexStocks's avatar
AlexStocks committed
	}

	for _, ivk := range dir.cacheInvokers {
		if ivk.IsAvailable() {
			return true
	return false
func (dir *registryDirectory) Destroy() {
	//TODO:unregister & unsubscribe
	dir.BaseDirectory.Destroy(func() {
		for _, ivk := range dir.cacheInvokers {
			ivk.Destroy()
		}
		dir.cacheInvokers = []protocol.Invoker{}
	})
高辛格's avatar
高辛格 committed

vito.he's avatar
vito.he committed
func (dir *registryDirectory) overrideUrl(targetUrl *common.URL) {
	doOverrideUrl(dir.configurators, targetUrl)
	doOverrideUrl(dir.consumerConfigurationListener.Configurators(), targetUrl)
	doOverrideUrl(dir.referenceConfigurationListener.Configurators(), targetUrl)
}

func doOverrideUrl(configurators []config_center.Configurator, targetUrl *common.URL) {
	for _, v := range configurators {
		v.Configure(targetUrl)
	}
}
vito.he's avatar
vito.he committed

type referenceConfigurationListener struct {
vito.he's avatar
vito.he committed
	registry.BaseConfigurationListener
vito.he's avatar
vito.he committed
	directory *registryDirectory
	url       *common.URL
}

vito.he's avatar
vito.he committed
func newReferenceConfigurationListener(dir *registryDirectory, url *common.URL) *referenceConfigurationListener {
	listener := &referenceConfigurationListener{directory: dir, url: url}
	listener.InitWith(
		url.EncodedServiceKey()+constant.CONFIGURATORS_SUFFIX,
		listener,
		extension.GetDefaultConfiguratorFunc(),
	)
vito.he's avatar
vito.he committed
	return listener
}

func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
vito.he's avatar
vito.he committed
	l.BaseConfigurationListener.Process(event)
	l.directory.refreshInvokers(nil)
}

type consumerConfigurationListener struct {
	registry.BaseConfigurationListener
	listeners []registry.NotifyListener
	directory *registryDirectory
}

func newConsumerConfigurationListener(dir *registryDirectory) *consumerConfigurationListener {
	listener := &consumerConfigurationListener{directory: dir}
	listener.InitWith(
		config.GetConsumerConfig().ApplicationConfig.Name+constant.CONFIGURATORS_SUFFIX,
		listener,
		extension.GetDefaultConfiguratorFunc(),
	)
vito.he's avatar
vito.he committed
	return listener
}
高辛格's avatar
高辛格 committed

vito.he's avatar
vito.he committed
func (l *consumerConfigurationListener) addNotifyListener(listener registry.NotifyListener) {
	l.listeners = append(l.listeners, listener)
}
高辛格's avatar
高辛格 committed

vito.he's avatar
vito.he committed
func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
	l.BaseConfigurationListener.Process(event)
	l.directory.refreshInvokers(nil)
vito.he's avatar
vito.he committed
}