Skip to content
Snippets Groups Projects
directory.go 7.28 KiB
Newer Older
vito.he's avatar
vito.he committed
package directory

import (
	"sync"
	"time"
)

import (
	log "github.com/AlexStocks/log4go"
	jerrors "github.com/juju/errors"
)

import (
	"github.com/dubbo/go-for-apache-dubbo/cluster/directory"
vito.he's avatar
vito.he committed
	"github.com/dubbo/go-for-apache-dubbo/common"
	"github.com/dubbo/go-for-apache-dubbo/common/constant"
	"github.com/dubbo/go-for-apache-dubbo/common/extension"
	"github.com/dubbo/go-for-apache-dubbo/protocol"
	"github.com/dubbo/go-for-apache-dubbo/protocol/protocolwrapper"
	"github.com/dubbo/go-for-apache-dubbo/registry"
fangyincheng's avatar
fangyincheng committed
const RegistryConnDelay = 3

vito.he's avatar
vito.he committed
type Options struct {
	serviceTTL time.Duration
}
type Option func(*Options)

type registryDirectory struct {
vito.he's avatar
vito.he committed
	directory.BaseDirectory
	cacheInvokers    []protocol.Invoker
	listenerLock     sync.Mutex
	serviceType      string
vito.he's avatar
vito.he committed
	registry         registry.Registry
	cacheInvokersMap *sync.Map //use sync.map
	//cacheInvokersMap map[string]protocol.Invoker
vito.he's avatar
vito.he committed
	Options
func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...Option) (*registryDirectory, error) {
vito.he's avatar
vito.he committed
	options := Options{
		//default 300s
		serviceTTL: time.Duration(300e9),
	}
	for _, opt := range opts {
		opt(&options)
	}
	if url.SubURL == nil {
		return nil, jerrors.Errorf("url is invalid, suburl can not be nil")
	}
	return &registryDirectory{
vito.he's avatar
vito.he committed
		BaseDirectory:    directory.NewBaseDirectory(url),
		cacheInvokers:    []protocol.Invoker{},
		cacheInvokersMap: &sync.Map{},
		serviceType:      url.SubURL.Service(),
		registry:         registry,
		Options:          options,
vito.he's avatar
vito.he committed
//subscibe from registry
func (dir *registryDirectory) Subscribe(url common.URL) {
	for {
		if !dir.registry.IsAvailable() {
			log.Warn("event listener game over.")
			return
		}

		listener, err := dir.registry.Subscribe(url)
		if err != nil {
			if !dir.registry.IsAvailable() {
				log.Warn("event listener game over.")
				return
			}
			log.Warn("getListener() = err:%s", jerrors.ErrorStack(err))
fangyincheng's avatar
fangyincheng committed
			time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
			continue
		}

		for {
			if serviceEvent, err := listener.Next(); err != nil {
				log.Warn("Selector.watch() = error{%v}", jerrors.ErrorStack(err))
				listener.Close()
fangyincheng's avatar
fangyincheng committed
				time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
				return
			} else {
				log.Info("update begin, service event: %v", serviceEvent.String())
				go dir.update(serviceEvent)
vito.he's avatar
vito.he committed
//subscribe service from registry , and update the cacheServices
func (dir *registryDirectory) update(res *registry.ServiceEvent) {
	if res == nil {
		return
	}

	log.Debug("registry update, result{%s}", res)

vito.he's avatar
vito.he committed
	log.Debug("update service name: %s!", res.Service)
func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
	switch res.Action {
vito.he's avatar
vito.he committed
	case registry.ServiceAdd:
		//dir.cacheService.Add(res.Path, dir.serviceTTL)
		dir.cacheInvoker(res.Service)
vito.he's avatar
vito.he committed
	case registry.ServiceDel:
		//dir.cacheService.Del(res.Path, dir.serviceTTL)
		dir.uncacheInvoker(res.Service)
		log.Info("selector delete service url{%s}", res.Service)
	default:
		return
	}

	newInvokers := dir.toGroupInvokers()

	dir.listenerLock.Lock()
	defer dir.listenerLock.Unlock()
	dir.cacheInvokers = newInvokers
}

func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {

	newInvokersList := []protocol.Invoker{}
	groupInvokersMap := make(map[string][]protocol.Invoker)
	groupInvokersList := []protocol.Invoker{}

	dir.cacheInvokersMap.Range(func(key, value interface{}) bool {
		newInvokersList = append(newInvokersList, value.(protocol.Invoker))
		return true
	})
	for _, invoker := range newInvokersList {
		group := invoker.GetUrl().GetParam(constant.GROUP_KEY, "")

		if _, ok := groupInvokersMap[group]; ok {
			groupInvokersMap[group] = append(groupInvokersMap[group], invoker)
		} else {
vito.he's avatar
vito.he committed
			groupInvokersMap[group] = []protocol.Invoker{invoker}
	if len(groupInvokersMap) == 1 {
		//len is 1 it means no group setting ,so do not need cluster again
		groupInvokersList = groupInvokersMap[""]
	} else {
		for _, invokers := range groupInvokersMap {
vito.he's avatar
vito.he committed
			staticDir := directory.NewStaticDirectory(invokers)
			cluster := extension.GetCluster(dir.GetUrl().SubURL.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
			groupInvokersList = append(groupInvokersList, cluster.Join(staticDir))
		}
	}

	return groupInvokersList
func (dir *registryDirectory) uncacheInvoker(url common.URL) {
vito.he's avatar
vito.he committed
	log.Debug("service will be deleted in cache invokers: invokers key is  %s!", url.Key())
	dir.cacheInvokersMap.Delete(url.Key())
func (dir *registryDirectory) cacheInvoker(url common.URL) {
	referenceUrl := dir.GetUrl().SubURL
	//check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol
	if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
		url = mergeUrl(url, referenceUrl)

		if _, ok := dir.cacheInvokersMap.Load(url.Key()); !ok {
vito.he's avatar
vito.he committed
			log.Debug("service will be added in cache invokers: invokers key is  %s!", url.Key())
			newInvoker := extension.GetProtocolExtension(protocolwrapper.FILTER).Refer(url)
				dir.cacheInvokersMap.Store(url.Key(), newInvoker)
		}
	}
}

//select the protocol invokers from the directory
func (dir *registryDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
	//TODO:router
	return dir.cacheInvokers
func (dir *registryDirectory) IsAvailable() bool {
	if !dir.BaseDirectory.IsAvailable() {
		return dir.BaseDirectory.IsAvailable()
	} else {
		for _, ivk := range dir.cacheInvokers {
			if ivk.IsAvailable() {
				return true
			}
		}
	}
	return false
func (dir *registryDirectory) Destroy() {
	//TODO:unregister & unsubscribe
	dir.BaseDirectory.Destroy(func() {
		for _, ivk := range dir.cacheInvokers {
			ivk.Destroy()
		}
		dir.cacheInvokers = []protocol.Invoker{}
	})
vito.he's avatar
vito.he committed
// configuration  > reference config >service config
//  in this function we should merge the reference local url config into the service url from registry.
vito.he's avatar
vito.he committed
//TODO configuration merge, in the future , the configuration center's config should merge too.
vito.he's avatar
vito.he committed
func mergeUrl(serviceUrl common.URL, referenceUrl *common.URL) common.URL {
	mergedUrl := serviceUrl
	var methodConfigMergeFcn = []func(method string){}

vito.he's avatar
vito.he committed
	//loadBalance strategy config
	if v := referenceUrl.Params.Get(constant.LOADBALANCE_KEY); v != "" {
		mergedUrl.Params.Set(constant.LOADBALANCE_KEY, v)
	}
	methodConfigMergeFcn = append(methodConfigMergeFcn, func(method string) {
		if v := referenceUrl.Params.Get(method + "." + constant.LOADBALANCE_KEY); v != "" {
			mergedUrl.Params.Set(method+"."+constant.LOADBALANCE_KEY, v)
		}
	})
vito.he's avatar
vito.he committed

	//cluster strategy config
	if v := referenceUrl.Params.Get(constant.CLUSTER_KEY); v != "" {
		mergedUrl.Params.Set(constant.CLUSTER_KEY, v)
	}
	methodConfigMergeFcn = append(methodConfigMergeFcn, func(method string) {
		if v := referenceUrl.Params.Get(method + "." + constant.CLUSTER_KEY); v != "" {
			mergedUrl.Params.Set(method+"."+constant.CLUSTER_KEY, v)
		}
	})

	//remote timestamp
	if v := serviceUrl.Params.Get(constant.TIMESTAMP_KEY); v != "" {
		mergedUrl.Params.Set(constant.REMOTE_TIMESTAMP_KEY, v)
		mergedUrl.Params.Set(constant.TIMESTAMP_KEY, referenceUrl.Params.Get(constant.TIMESTAMP_KEY))
	}

	//finally execute methodConfigMergeFcn
	for _, method := range referenceUrl.Methods {
		for _, fcn := range methodConfigMergeFcn {
			fcn("methods." + method)
		}
	}
	return mergedUrl