Skip to content
Snippets Groups Projects
Unverified Commit 3df419d0 authored by Ian Luo's avatar Ian Luo Committed by GitHub
Browse files

fix: issue #1151 - router from CONF_ROUTER_FILE_PATH doesn't take affect (#1176)


* fix: issue #1151 - router from CONF_ROUTER_FILE_PATH doesn't take affect

* reformat the code

Co-authored-by: default avatarAlexStocks <alexstocks@foxmail.com>
parent 86151f32
Branches main
No related tags found
No related merge requests found
......@@ -74,42 +74,46 @@ func (dir *BaseDirectory) GetDirectoryUrl() *common.URL {
return dir.url
}
// SetRouters Convert url to routers and add them into dir.routerChain
func (dir *BaseDirectory) SetRouters(urls []*common.URL) {
// AddRouters Convert url to routers and add them into dir.routerChain
func (dir *BaseDirectory) AddRouters(urls []*common.URL) {
if len(urls) == 0 {
return
}
routers := make([]router.PriorityRouter, 0, len(urls))
rc := dir.routerChain
for _, url := range urls {
routerKey := url.GetParam(constant.ROUTER_KEY, "")
if len(routerKey) == 0 {
continue
}
if url.Protocol == constant.CONDITION_ROUTE_PROTOCOL {
if !dir.isProperRouter(url) {
continue
}
if r := dir.buildRouter(url); r != nil {
routers = append(routers, r)
}
factory := extension.GetRouterFactory(url.Protocol)
r, err := factory.NewPriorityRouter(url, rc.GetNotifyChan())
if err != nil {
logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err)
return
}
routers = append(routers, r)
}
logger.Infof("Init file condition router success, size: %v", len(routers))
dir.mutex.Lock()
rc.AddRouters(routers)
dir.routerChain.AddRouters(routers)
dir.mutex.Unlock()
}
func (dir *BaseDirectory) buildRouter(url *common.URL) router.PriorityRouter {
routerKey := url.GetParam(constant.ROUTER_KEY, "")
if len(routerKey) == 0 {
return nil
}
if url.Protocol == constant.CONDITION_ROUTE_PROTOCOL {
if !dir.isProperRouter(url) {
return nil
}
}
factory := extension.GetRouterFactory(url.Protocol)
r, err := factory.NewPriorityRouter(url, dir.routerChain.GetNotifyChan())
if err != nil {
logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err)
return nil
}
return r
}
func (dir *BaseDirectory) isProperRouter(url *common.URL) bool {
app := url.GetParam(constant.APPLICATION_KEY, "")
dirApp := dir.GetURL().GetParam(constant.APPLICATION_KEY, "")
......
......@@ -60,7 +60,7 @@ func TestBuildRouterChain(t *testing.T) {
routeURL.AddParam(constant.INTERFACE_KEY, "mock-app")
routerURLs := make([]*common.URL, 0)
routerURLs = append(routerURLs, routeURL)
directory.SetRouters(routerURLs)
directory.AddRouters(routerURLs)
chain := directory.RouterChain()
assert.NotNil(t, chain)
......
......@@ -18,6 +18,7 @@
package tag
import (
"encoding/base64"
"net/url"
"strconv"
"sync"
......@@ -41,7 +42,7 @@ type FileTagRouter struct {
router *tagRouter
routerRule *RouterRule
url *common.URL
//force bool
// force bool
}
// NewFileTagRouter Create file tag router instance with content (from config file)
......@@ -54,21 +55,19 @@ func NewFileTagRouter(content []byte) (*FileTagRouter, error) {
}
fileRouter.routerRule = rule
notify := make(chan struct{})
fileRouter.url = common.NewURLWithOptions(
common.WithProtocol(constant.TAG_ROUTE_PROTOCOL),
common.WithParams(url.Values{}),
common.WithParamsValue(constant.RULE_KEY, base64.URLEncoding.EncodeToString(content)),
common.WithParamsValue(constant.ForceUseTag, strconv.FormatBool(rule.Force)),
common.WithParamsValue(constant.RouterPriority, strconv.Itoa(rule.Priority)),
common.WithParamsValue(constant.ROUTER_KEY, constant.TAG_ROUTE_PROTOCOL))
fileRouter.router, err = NewTagRouter(fileRouter.URL(), notify)
return fileRouter, err
}
// URL Return URL in file tag router n
func (f *FileTagRouter) URL() *common.URL {
f.parseOnce.Do(func() {
routerRule := f.routerRule
f.url = common.NewURLWithOptions(
common.WithProtocol(constant.TAG_ROUTE_PROTOCOL),
common.WithParams(url.Values{}),
common.WithParamsValue(constant.ForceUseTag, strconv.FormatBool(routerRule.Force)),
common.WithParamsValue(constant.RouterPriority, strconv.Itoa(routerRule.Priority)),
common.WithParamsValue(constant.ROUTER_KEY, constant.TAG_ROUTE_PROTOCOL))
})
return f.url
}
......@@ -78,9 +77,5 @@ func (f *FileTagRouter) Priority() int64 {
}
func (f *FileTagRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
if invokers.IsEmpty() {
return invokers
}
// FIXME: I believe this is incorrect.
return f.Route(invokers, cache, url, invocation)
return invokers
}
......@@ -53,7 +53,7 @@ type addrMetadata struct {
// application name
application string
// is rule a runtime rule
//ruleRuntime bool
// ruleRuntime bool
// is rule a force rule
ruleForce bool
// is rule a valid rule
......@@ -84,12 +84,25 @@ func NewTagRouter(url *common.URL, notify chan struct{}) (*tagRouter, error) {
if url == nil {
return nil, perrors.Errorf("Illegal route URL!")
}
return &tagRouter{
r := &tagRouter{
url: url,
enabled: url.GetParamBool(constant.RouterEnabled, true),
priority: url.GetParamInt(constant.RouterPriority, 0),
notify: notify,
}, nil
}
if content, err := url.GetParamAndDecoded(constant.RULE_KEY); err != nil {
return nil, err
} else if content != "" {
if rule, err := getRule(content); err != nil {
return nil, err
} else {
r.tagRouterRule = rule
r.ruleChanged = true
}
}
return r, nil
}
// nolint
......
......@@ -67,12 +67,12 @@ const (
const (
CONFIGURATORS_CATEGORY = "configurators"
ROUTER_CATEGORY = "category"
DEFAULT_CATEGORY = PROVIDER_CATEGORY
DYNAMIC_CONFIGURATORS_CATEGORY = "dynamicconfigurators"
APP_DYNAMIC_CONFIGURATORS_CATEGORY = "appdynamicconfigurators"
DEFAULT_CATEGORY = PROVIDER_CATEGORY
PROVIDER_CATEGORY = "providers"
CONSUMER_CATEGORY = "consumers"
ROUTER_CATEGORY = "routers"
)
const (
......
......@@ -86,6 +86,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.
if routerChain, err := chain.NewRouterChain(dir.consumerURL); err == nil {
dir.BaseDirectory.SetRouterChain(routerChain)
dir.AddRouters(fileBasedRouterUrls())
} else {
logger.Warnf("fail to create router chain with url: %s, err is: %v", url.SubURL, err)
}
......@@ -185,18 +186,21 @@ func (dir *RegistryDirectory) refreshAllInvokers(events []*registry.ServiceEvent
}
}
// loop the updateEvents
var routerUrls []*common.URL
for _, event := range addEvents {
logger.Debugf("registry update, result{%s}", event)
if event.Service != nil {
logger.Infof("selector add service url{%s}", event.Service.String())
}
if event != nil && event.Service != nil && constant.ROUTER_PROTOCOL == event.Service.Protocol {
dir.configRouters()
routerUrls = append(routerUrls, event.Service)
continue
}
if oldInvoker, _ := dir.doCacheInvoker(event.Service); oldInvoker != nil {
oldInvokers = append(oldInvokers, oldInvoker)
}
}
dir.AddRouters(append(fileBasedRouterUrls(), routerUrls...))
}()
dir.setNewInvokers()
// destroy unused invokers
......@@ -245,7 +249,8 @@ func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent)
case remoting.EventTypeAdd, remoting.EventTypeUpdate:
logger.Infof("selector add service url{%s}", event.Service)
if u != nil && constant.ROUTER_PROTOCOL == u.Protocol {
dir.configRouters()
dir.AddRouters(append(fileBasedRouterUrls(), u))
return nil, nil
}
return dir.cacheInvoker(u), nil
case remoting.EventTypeDel:
......@@ -258,18 +263,6 @@ func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent)
return nil, nil
}
// configRouters configures dynamic routers into the router chain, but, the current impl is incorrect, see FIXME above.
func (dir *RegistryDirectory) configRouters() {
var urls []*common.URL
for _, v := range config.GetRouterURLSet().Values() {
urls = append(urls, v.(*common.URL))
}
if len(urls) > 0 {
dir.SetRouters(urls)
}
}
// convertUrl processes override:// and router://
func (dir *RegistryDirectory) convertUrl(res *registry.ServiceEvent) *common.URL {
ret := res.Service
......@@ -460,6 +453,15 @@ func doOverrideUrl(configurators []config_center.Configurator, targetUrl *common
}
}
// build static configured router from local router config
func fileBasedRouterUrls() []*common.URL {
var urls []*common.URL
for _, v := range config.GetRouterURLSet().Values() {
urls = append(urls, v.(*common.URL))
}
return urls
}
type referenceConfigurationListener struct {
registry.BaseConfigurationListener
directory *RegistryDirectory
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment