From 3df419d0e3e3b3e5c6f79afcc3527fe160943efb Mon Sep 17 00:00:00 2001 From: Ian Luo <ian.luo@gmail.com> Date: Sat, 15 May 2021 18:13:35 +0800 Subject: [PATCH] fix: issue #1151 - router from CONF_ROUTER_FILE_PATH doesn't take affect (#1176) * fix: issue #1151 - router from CONF_ROUTER_FILE_PATH doesn't take affect * reformat the code Co-authored-by: AlexStocks <alexstocks@foxmail.com> --- cluster/directory/base_directory.go | 48 +++++++++++++----------- cluster/directory/base_directory_test.go | 2 +- cluster/router/tag/file.go | 25 +++++------- cluster/router/tag/tag_router.go | 19 ++++++++-- common/constant/default.go | 4 +- registry/directory/directory.go | 30 ++++++++------- 6 files changed, 71 insertions(+), 57 deletions(-) diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go index f969e7a24..0d530d9c0 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base_directory.go @@ -74,42 +74,46 @@ func (dir *BaseDirectory) GetDirectoryUrl() *common.URL { return dir.url } -// SetRouters Convert url to routers and add them into dir.routerChain -func (dir *BaseDirectory) SetRouters(urls []*common.URL) { +// AddRouters Convert url to routers and add them into dir.routerChain +func (dir *BaseDirectory) AddRouters(urls []*common.URL) { if len(urls) == 0 { return } routers := make([]router.PriorityRouter, 0, len(urls)) - - rc := dir.routerChain - for _, url := range urls { - routerKey := url.GetParam(constant.ROUTER_KEY, "") - - if len(routerKey) == 0 { - continue - } - if url.Protocol == constant.CONDITION_ROUTE_PROTOCOL { - if !dir.isProperRouter(url) { - continue - } + if r := dir.buildRouter(url); r != nil { + routers = append(routers, r) } - factory := extension.GetRouterFactory(url.Protocol) - r, err := factory.NewPriorityRouter(url, rc.GetNotifyChan()) - if err != nil { - logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err) - return - } - routers = append(routers, r) } logger.Infof("Init file condition router success, size: %v", len(routers)) dir.mutex.Lock() - rc.AddRouters(routers) + dir.routerChain.AddRouters(routers) dir.mutex.Unlock() } +func (dir *BaseDirectory) buildRouter(url *common.URL) router.PriorityRouter { + routerKey := url.GetParam(constant.ROUTER_KEY, "") + if len(routerKey) == 0 { + return nil + } + + if url.Protocol == constant.CONDITION_ROUTE_PROTOCOL { + if !dir.isProperRouter(url) { + return nil + } + } + + factory := extension.GetRouterFactory(url.Protocol) + r, err := factory.NewPriorityRouter(url, dir.routerChain.GetNotifyChan()) + if err != nil { + logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err) + return nil + } + return r +} + func (dir *BaseDirectory) isProperRouter(url *common.URL) bool { app := url.GetParam(constant.APPLICATION_KEY, "") dirApp := dir.GetURL().GetParam(constant.APPLICATION_KEY, "") diff --git a/cluster/directory/base_directory_test.go b/cluster/directory/base_directory_test.go index 7130c3eb5..c0f084d57 100644 --- a/cluster/directory/base_directory_test.go +++ b/cluster/directory/base_directory_test.go @@ -60,7 +60,7 @@ func TestBuildRouterChain(t *testing.T) { routeURL.AddParam(constant.INTERFACE_KEY, "mock-app") routerURLs := make([]*common.URL, 0) routerURLs = append(routerURLs, routeURL) - directory.SetRouters(routerURLs) + directory.AddRouters(routerURLs) chain := directory.RouterChain() assert.NotNil(t, chain) diff --git a/cluster/router/tag/file.go b/cluster/router/tag/file.go index 81c25109d..4d7847981 100644 --- a/cluster/router/tag/file.go +++ b/cluster/router/tag/file.go @@ -18,6 +18,7 @@ package tag import ( + "encoding/base64" "net/url" "strconv" "sync" @@ -41,7 +42,7 @@ type FileTagRouter struct { router *tagRouter routerRule *RouterRule url *common.URL - //force bool + // force bool } // NewFileTagRouter Create file tag router instance with content (from config file) @@ -54,21 +55,19 @@ func NewFileTagRouter(content []byte) (*FileTagRouter, error) { } fileRouter.routerRule = rule notify := make(chan struct{}) + fileRouter.url = common.NewURLWithOptions( + common.WithProtocol(constant.TAG_ROUTE_PROTOCOL), + common.WithParams(url.Values{}), + common.WithParamsValue(constant.RULE_KEY, base64.URLEncoding.EncodeToString(content)), + common.WithParamsValue(constant.ForceUseTag, strconv.FormatBool(rule.Force)), + common.WithParamsValue(constant.RouterPriority, strconv.Itoa(rule.Priority)), + common.WithParamsValue(constant.ROUTER_KEY, constant.TAG_ROUTE_PROTOCOL)) fileRouter.router, err = NewTagRouter(fileRouter.URL(), notify) return fileRouter, err } // URL Return URL in file tag router n func (f *FileTagRouter) URL() *common.URL { - f.parseOnce.Do(func() { - routerRule := f.routerRule - f.url = common.NewURLWithOptions( - common.WithProtocol(constant.TAG_ROUTE_PROTOCOL), - common.WithParams(url.Values{}), - common.WithParamsValue(constant.ForceUseTag, strconv.FormatBool(routerRule.Force)), - common.WithParamsValue(constant.RouterPriority, strconv.Itoa(routerRule.Priority)), - common.WithParamsValue(constant.ROUTER_KEY, constant.TAG_ROUTE_PROTOCOL)) - }) return f.url } @@ -78,9 +77,5 @@ func (f *FileTagRouter) Priority() int64 { } func (f *FileTagRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap { - if invokers.IsEmpty() { - return invokers - } - // FIXME: I believe this is incorrect. - return f.Route(invokers, cache, url, invocation) + return invokers } diff --git a/cluster/router/tag/tag_router.go b/cluster/router/tag/tag_router.go index 03d52647e..524bb417a 100644 --- a/cluster/router/tag/tag_router.go +++ b/cluster/router/tag/tag_router.go @@ -53,7 +53,7 @@ type addrMetadata struct { // application name application string // is rule a runtime rule - //ruleRuntime bool + // ruleRuntime bool // is rule a force rule ruleForce bool // is rule a valid rule @@ -84,12 +84,25 @@ func NewTagRouter(url *common.URL, notify chan struct{}) (*tagRouter, error) { if url == nil { return nil, perrors.Errorf("Illegal route URL!") } - return &tagRouter{ + r := &tagRouter{ url: url, enabled: url.GetParamBool(constant.RouterEnabled, true), priority: url.GetParamInt(constant.RouterPriority, 0), notify: notify, - }, nil + } + + if content, err := url.GetParamAndDecoded(constant.RULE_KEY); err != nil { + return nil, err + } else if content != "" { + if rule, err := getRule(content); err != nil { + return nil, err + } else { + r.tagRouterRule = rule + r.ruleChanged = true + } + } + + return r, nil } // nolint diff --git a/common/constant/default.go b/common/constant/default.go index bbe022cb2..da8cd20a6 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -67,12 +67,12 @@ const ( const ( CONFIGURATORS_CATEGORY = "configurators" - ROUTER_CATEGORY = "category" - DEFAULT_CATEGORY = PROVIDER_CATEGORY DYNAMIC_CONFIGURATORS_CATEGORY = "dynamicconfigurators" APP_DYNAMIC_CONFIGURATORS_CATEGORY = "appdynamicconfigurators" + DEFAULT_CATEGORY = PROVIDER_CATEGORY PROVIDER_CATEGORY = "providers" CONSUMER_CATEGORY = "consumers" + ROUTER_CATEGORY = "routers" ) const ( diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 263b248e7..04b109390 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -86,6 +86,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster. if routerChain, err := chain.NewRouterChain(dir.consumerURL); err == nil { dir.BaseDirectory.SetRouterChain(routerChain) + dir.AddRouters(fileBasedRouterUrls()) } else { logger.Warnf("fail to create router chain with url: %s, err is: %v", url.SubURL, err) } @@ -185,18 +186,21 @@ func (dir *RegistryDirectory) refreshAllInvokers(events []*registry.ServiceEvent } } // loop the updateEvents + var routerUrls []*common.URL for _, event := range addEvents { logger.Debugf("registry update, result{%s}", event) if event.Service != nil { logger.Infof("selector add service url{%s}", event.Service.String()) } if event != nil && event.Service != nil && constant.ROUTER_PROTOCOL == event.Service.Protocol { - dir.configRouters() + routerUrls = append(routerUrls, event.Service) + continue } if oldInvoker, _ := dir.doCacheInvoker(event.Service); oldInvoker != nil { oldInvokers = append(oldInvokers, oldInvoker) } } + dir.AddRouters(append(fileBasedRouterUrls(), routerUrls...)) }() dir.setNewInvokers() // destroy unused invokers @@ -245,7 +249,8 @@ func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent) case remoting.EventTypeAdd, remoting.EventTypeUpdate: logger.Infof("selector add service url{%s}", event.Service) if u != nil && constant.ROUTER_PROTOCOL == u.Protocol { - dir.configRouters() + dir.AddRouters(append(fileBasedRouterUrls(), u)) + return nil, nil } return dir.cacheInvoker(u), nil case remoting.EventTypeDel: @@ -258,18 +263,6 @@ func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent) return nil, nil } -// configRouters configures dynamic routers into the router chain, but, the current impl is incorrect, see FIXME above. -func (dir *RegistryDirectory) configRouters() { - var urls []*common.URL - for _, v := range config.GetRouterURLSet().Values() { - urls = append(urls, v.(*common.URL)) - } - - if len(urls) > 0 { - dir.SetRouters(urls) - } -} - // convertUrl processes override:// and router:// func (dir *RegistryDirectory) convertUrl(res *registry.ServiceEvent) *common.URL { ret := res.Service @@ -460,6 +453,15 @@ func doOverrideUrl(configurators []config_center.Configurator, targetUrl *common } } +// build static configured router from local router config +func fileBasedRouterUrls() []*common.URL { + var urls []*common.URL + for _, v := range config.GetRouterURLSet().Values() { + urls = append(urls, v.(*common.URL)) + } + return urls +} + type referenceConfigurationListener struct { registry.BaseConfigurationListener directory *RegistryDirectory -- GitLab