/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package chain

import (
	"sort"
	"sync"
	"sync/atomic"
	"time"
)

import (
	perrors "github.com/pkg/errors"
)

import (
	"github.com/apache/dubbo-go/cluster/router"
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/common/extension"
	"github.com/apache/dubbo-go/common/logger"
	"github.com/apache/dubbo-go/protocol"
)

const (
	// countThreshold is the number of SetInvokers calls that must accumulate
	// before SetInvokers signals for an address cache rebuild.
	countThreshold = 5
	// timeThreshold is the minimum time that must have elapsed since the chain's
	// `last` timestamp before SetInvokers signals for an address cache rebuild.
	timeThreshold = 2 * time.Second
	// timeInterval is the period of the ticker created by loop.
	timeInterval = 5 * time.Second
)

邹毅贤's avatar
邹毅贤 committed
// RouterChain Router chain
邹毅贤's avatar
邹毅贤 committed
type RouterChain struct {
邹毅贤's avatar
邹毅贤 committed
	// Full list of addresses from registry, classified by method name.
邹毅贤's avatar
邹毅贤 committed
	invokers []protocol.Invoker
邹毅贤's avatar
邹毅贤 committed
	// Containing all routers, reconstruct every time 'route://' urls change.
邹毅贤's avatar
邹毅贤 committed
	routers []router.PriorityRouter
邹毅贤's avatar
邹毅贤 committed
	// Fixed router instances: ConfigConditionRouter, TagRouter, e.g., the rule for each instance may change but the
	// instance will never delete or recreate.
邹毅贤's avatar
邹毅贤 committed
	builtinRouters []router.PriorityRouter
邹毅贤's avatar
邹毅贤 committed

	mutex sync.RWMutex
邹毅贤's avatar
邹毅贤 committed

haohongfan's avatar
haohongfan committed
	url *common.URL
Ian Luo's avatar
Ian Luo committed

	// The times of address notification since last update for address cache
	count int64
Ian Luo's avatar
Ian Luo committed
	// The timestamp of last update for address cache
	last time.Time
	// Channel for notify to update the address cache
Ian Luo's avatar
Ian Luo committed
	notify chan struct{}
邹毅贤's avatar
邹毅贤 committed
}

邹毅贤's avatar
邹毅贤 committed
// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
func (c *RouterChain) Route(url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
	cache := c.loadCache()
	if cache == nil {
		c.mutex.RLock()
		defer c.mutex.RUnlock()
		return c.invokers
	}
Ian Luo's avatar
Ian Luo committed
	bitmap := cache.bitmap
	for _, r := range c.copyRouters() {
		bitmap = r.Route(bitmap, cache, url, invocation)
	}

	indexes := bitmap.ToArray()
	finalInvokers := make([]protocol.Invoker, len(indexes))
Ian Luo's avatar
Ian Luo committed
		finalInvokers[i] = cache.invokers[index]
Ian Luo's avatar
Ian Luo committed
	return finalInvokers
邹毅贤's avatar
邹毅贤 committed
}
邹毅贤's avatar
邹毅贤 committed

// AddRouters Add routers to router chain
// New a array add builtinRouters which is not sorted in RouterChain and routers
// Sort the array
// Replace router array in RouterChain
邹毅贤's avatar
邹毅贤 committed
func (c *RouterChain) AddRouters(routers []router.PriorityRouter) {
	newRouters := make([]router.PriorityRouter, 0, len(c.builtinRouters)+len(routers))
邹毅贤's avatar
邹毅贤 committed
	newRouters = append(newRouters, c.builtinRouters...)
	newRouters = append(newRouters, routers...)
	sortRouter(newRouters)
邹毅贤's avatar
邹毅贤 committed
	c.mutex.Lock()
	defer c.mutex.Unlock()
邹毅贤's avatar
邹毅贤 committed
	c.routers = newRouters
}

Ian Luo's avatar
Ian Luo committed
// SetInvokers receives updated invokers from registry center. If the times of notification exceeds countThreshold and
// time interval exceeds timeThreshold since last cache update, then notify to update the cache.
func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
	c.mutex.Lock()
	c.invokers = invokers
	c.mutex.Unlock()

	c.count++
	now := time.Now()
	if c.count >= countThreshold && now.Sub(c.last) >= timeThreshold {
		c.last = now
		c.count = 0
		go func() {
Ian Luo's avatar
Ian Luo committed
			c.notify <- struct{}{}
Ian Luo's avatar
Ian Luo committed
// loop listens on events to update the address cache when it's necessary, either when it receives notification
// from address update, or when timeInterval exceeds.
func (c *RouterChain) loop() {
	ticker := time.NewTicker(timeInterval)
			c.buildCache()
Ian Luo's avatar
Ian Luo committed
		case <-c.notify:
Ian Luo's avatar
Ian Luo committed
// copyRouters make a snapshot copy from RouterChain's router list.
func (c *RouterChain) copyRouters() []router.PriorityRouter {
	c.mutex.RLock()
Ian Luo's avatar
Ian Luo committed
	defer c.mutex.RUnlock()
Ian Luo's avatar
Ian Luo committed
	ret := make([]router.PriorityRouter, 0, len(c.routers))
	ret = append(ret, c.routers...)
	return ret
// copyInvokers copies a snapshot of the received invokers.
func (c *RouterChain) copyInvokers() []protocol.Invoker {
	c.mutex.RLock()
	defer c.mutex.RUnlock()
	if c.invokers == nil || len(c.invokers) == 0 {
		return nil
	ret := make([]protocol.Invoker, 0, len(c.invokers))
	ret = append(ret, c.invokers...)
	return ret
// loadCache loads cache from sync.Value to guarantee the visibility
func (c *RouterChain) loadCache() *InvokerCache {
	v := c.cache.Load()
	if v == nil {
		return nil
// copyInvokerIfNecessary returns the invoker snapshot to build the next cache from, avoiding a copy when possible.
//
// The snapshot held by the given cache (nil when cache is nil) is returned unchanged as long as isInvokersChanged
// reports no difference to the chain's current invokers. Otherwise a fresh copy of the chain's invokers is returned,
// or nil when the chain currently holds none.
func (c *RouterChain) copyInvokerIfNecessary(cache *InvokerCache) []protocol.Invoker {
	var invokers []protocol.Invoker
	if cache != nil {
		invokers = cache.invokers
	}

	c.mutex.RLock()
	defer c.mutex.RUnlock()
	if !isInvokersChanged(invokers, c.invokers) {
		return invokers
	}

	// Copy inline instead of calling copyInvokers: that helper takes the read lock again, and sync.RWMutex
	// prohibits recursive read locking (it can deadlock when a writer is queued between the two RLock calls).
	if len(c.invokers) == 0 {
		return nil
	}
	ret := make([]protocol.Invoker, 0, len(c.invokers))
	return append(ret, c.invokers...)
}

// buildCache builds address cache with the new invokers for all poolable routers.
func (c *RouterChain) buildCache() {
	origin := c.loadCache()
	invokers := c.copyInvokerIfNecessary(origin)
	if invokers == nil || len(invokers) == 0 {
Ian Luo's avatar
Ian Luo committed
	var (
		mutex sync.Mutex
		wg    sync.WaitGroup
	)
	cache := BuildCache(invokers)
	for _, r := range c.copyRouters() {
		if p, ok := r.(router.Poolable); ok {
			wg.Add(1)
			go func(p router.Poolable) {
beiwei.ly's avatar
beiwei.ly committed
				defer wg.Done()
				pool, info := poolRouter(p, origin, invokers)
				mutex.Lock()
Ian Luo's avatar
Ian Luo committed
				defer mutex.Unlock()
Ian Luo's avatar
Ian Luo committed
				cache.pools[p.Name()] = pool
				cache.metadatas[p.Name()] = info
邹毅贤's avatar
邹毅贤 committed
// URL Return URL in RouterChain
haohongfan's avatar
haohongfan committed
func (c *RouterChain) URL() *common.URL {
邹毅贤's avatar
邹毅贤 committed
	return c.url
}

邹毅贤's avatar
邹毅贤 committed
// NewRouterChain Use url to init router chain
// Loop routerFactories and call NewRouter method
邹毅贤's avatar
邹毅贤 committed
func NewRouterChain(url *common.URL) (*RouterChain, error) {
邹毅贤's avatar
邹毅贤 committed
	routerFactories := extension.GetRouterFactories()
邹毅贤's avatar
邹毅贤 committed
	if len(routerFactories) == 0 {
邹毅贤's avatar
邹毅贤 committed
		return nil, perrors.Errorf("No routerFactory exits , create one please")
邹毅贤's avatar
邹毅贤 committed
	}
邹毅贤's avatar
邹毅贤 committed
	routers := make([]router.PriorityRouter, 0, len(routerFactories))
邹毅贤's avatar
邹毅贤 committed
	for key, routerFactory := range routerFactories {
邹毅贤's avatar
邹毅贤 committed
		r, err := routerFactory().NewPriorityRouter(url)
邹毅贤's avatar
邹毅贤 committed
		if r == nil || err != nil {
邹毅贤's avatar
邹毅贤 committed
			logger.Errorf("router chain build router fail! routerFactories key:%s  error:%s", key, err.Error())
邹毅贤's avatar
邹毅贤 committed
			continue
		}
		routers = append(routers, r)
	}

邹毅贤's avatar
邹毅贤 committed
	newRouters := make([]router.PriorityRouter, len(routers))
邹毅贤's avatar
邹毅贤 committed
	copy(newRouters, routers)

	sortRouter(newRouters)

邹毅贤's avatar
邹毅贤 committed
	chain := &RouterChain{
邹毅贤's avatar
邹毅贤 committed
		builtinRouters: routers,
		routers:        newRouters,
		last:           time.Now(),
Ian Luo's avatar
Ian Luo committed
		notify:         make(chan struct{}),
邹毅贤's avatar
邹毅贤 committed
	}
邹毅贤's avatar
邹毅贤 committed
	if url != nil {
haohongfan's avatar
haohongfan committed
		chain.url = url
邹毅贤's avatar
邹毅贤 committed
	}
邹毅贤's avatar
邹毅贤 committed

	go chain.loop()
邹毅贤's avatar
邹毅贤 committed
	return chain, nil
邹毅贤's avatar
邹毅贤 committed
}

Ian Luo's avatar
Ian Luo committed
// poolRouter calls poolable router's Pool() to create new address pool and address metadata if necessary.
// If the corresponding cache entry exists, and the poolable router answers no need to re-pool (possibly because its
// rule doesn't change), and the address list doesn't change, then the existing data will be re-used.
func poolRouter(p router.Poolable, origin *InvokerCache, invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
	name := p.Name()
	if isCacheMiss(origin, name) || p.ShouldPool() || &(origin.invokers) != &invokers {
		logger.Debugf("build address cache for router %q", name)
		return p.Pool(invokers)
	}
Ian Luo's avatar
Ian Luo committed

	logger.Debugf("reuse existing address cache for router %q", name)
	return origin.pools[name], origin.metadatas[name]
Ian Luo's avatar
Ian Luo committed
// isCacheMiss reports whether the cache lacks a usable entry for the poolable router named key.
// It returns true when the cache is nil, or the cache's pools are nil, or the cache's invokers snapshot is nil,
// or the entry for key is nil or absent; otherwise it returns false.
func isCacheMiss(cache *InvokerCache, key string) bool {
	switch {
	case cache == nil:
		return true
	case cache.pools == nil, cache.invokers == nil:
		return true
	default:
		return cache.pools[key] == nil
	}
}

Ian Luo's avatar
Ian Luo committed
// isInvokersChanged compares new invokers on the right changes, compared with the old invokers on the left.
func isInvokersChanged(left []protocol.Invoker, right []protocol.Invoker) bool {
	if len(right) != len(left) {
		return true
	}

	for _, r := range right {
		found := false
		for _, l := range left {
Ian Luo's avatar
Ian Luo committed
			if common.IsEquals(l.GetUrl(), r.GetUrl(), constant.TIMESTAMP_KEY, constant.REMOTE_TIMESTAMP_KEY) {
				found = true
				break
			}
		}
		if !found {
			return true
		}
	}
	return false
}

邹毅贤's avatar
邹毅贤 committed
// sortRouter Sort router instance by priority with stable algorithm
邹毅贤's avatar
邹毅贤 committed
func sortRouter(routers []router.PriorityRouter) {
邹毅贤's avatar
邹毅贤 committed
	sort.Stable(byPriority(routers))
邹毅贤's avatar
邹毅贤 committed
}

邹毅贤's avatar
邹毅贤 committed
// byPriority Sort by priority
邹毅贤's avatar
邹毅贤 committed
type byPriority []router.PriorityRouter
邹毅贤's avatar
邹毅贤 committed

邹毅贤's avatar
邹毅贤 committed
func (a byPriority) Len() int           { return len(a) }
func (a byPriority) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a byPriority) Less(i, j int) bool { return a[i].Priority() < a[j].Priority() }