diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go index d1025a152b599d70c40bba5bc16009d8d5adee37..309cd4429c7256eccab556369db9b0dd264e7480 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base_directory.go @@ -82,6 +82,8 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) { routers := make([]router.PriorityRouter, 0, len(urls)) + rc := dir.routerChain + for _, url := range urls { routerKey := url.GetParam(constant.ROUTER_KEY, "") @@ -94,7 +96,7 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) { } } factory := extension.GetRouterFactory(url.Protocol) - r, err := factory.NewPriorityRouter(url) + r, err := factory.NewPriorityRouter(url, rc.GetNotifyChan()) if err != nil { logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err) return @@ -104,10 +106,8 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) { logger.Infof("Init file condition router success, size: %v", len(routers)) dir.mutex.Lock() - rc := dir.routerChain - dir.mutex.Unlock() - rc.AddRouters(routers) + dir.mutex.Unlock() } func (dir *BaseDirectory) isProperRouter(url *common.URL) bool { diff --git a/cluster/router/chain.go b/cluster/router/chain.go index 3614d0a5a3d6cfb462ef63149ae99da2c4541b8d..cb33cf927fb982ec8e62133d0c68f2dce5e142f3 100644 --- a/cluster/router/chain.go +++ b/cluster/router/chain.go @@ -29,4 +29,6 @@ type Chain interface { SetInvokers([]protocol.Invoker) // AddRouters Add routers AddRouters([]PriorityRouter) + // GetNotifyChan get notify channel of this chain + GetNotifyChan() chan struct{} } diff --git a/cluster/router/chain/chain.go b/cluster/router/chain/chain.go index d707173ccffcdc54850517d2676ab97fe9a14c1d..57788975e79e226db760302bc9b603a678103ff5 100644 --- a/cluster/router/chain/chain.go +++ b/cluster/router/chain/chain.go @@ -63,8 +63,10 @@ type RouterChain struct { notify chan struct{} // Address cache cache atomic.Value - // routerNeedsUpdate - routerNeedsUpdate atomic.Bool +} + +func (c *RouterChain) GetNotifyChan() chan struct{} { + return c.notify } // Route Loop routers in RouterChain and call Route method to determine the target invokers list. @@ -102,7 +104,7 @@ func (c *RouterChain) AddRouters(routers []router.PriorityRouter) { c.mutex.Lock() defer c.mutex.Unlock() c.routers = newRouters - c.routerNeedsUpdate.Store(true) + c.notify <- struct{}{} } // SetInvokers receives updated invokers from registry center. If the times of notification exceeds countThreshold and @@ -124,9 +126,8 @@ func (c *RouterChain) loop() { for { select { case <-ticker.C: - if protocol.GetAndRefreshState() || c.routerNeedsUpdate.Load() { + if protocol.GetAndRefreshState() { c.buildCache() - c.routerNeedsUpdate.Store(false) } case <-c.notify: c.buildCache() @@ -224,9 +225,15 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) { if len(routerFactories) == 0 { return nil, perrors.Errorf("No routerFactory exits , create one please") } + + chain := &RouterChain{ + last: time.Now(), + notify: make(chan struct{}), + } + routers := make([]router.PriorityRouter, 0, len(routerFactories)) for key, routerFactory := range routerFactories { - r, err := routerFactory().NewPriorityRouter(url) + r, err := routerFactory().NewPriorityRouter(url, chain.notify) if r == nil || err != nil { logger.Errorf("router chain build router fail! routerFactories key:%s error:%s", key, err.Error()) continue @@ -241,13 +248,8 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) { routerNeedsUpdateInit := atomic.Bool{} routerNeedsUpdateInit.Store(false) - chain := &RouterChain{ - builtinRouters: routers, - routers: newRouters, - last: time.Now(), - notify: make(chan struct{}), - routerNeedsUpdate: routerNeedsUpdateInit, - } + chain.routers = newRouters + chain.builtinRouters = routers if url != nil { chain.url = url } diff --git a/cluster/router/chain/chain_test.go b/cluster/router/chain/chain_test.go index b21990b08c5b960c407a395cddf7ef7518ba5822..f274d1b3182c6253d19135fc4c677d70630580e8 100644 --- a/cluster/router/chain/chain_test.go +++ b/cluster/router/chain/chain_test.go @@ -142,7 +142,8 @@ conditions: url := getConditionRouteUrl(applicationKey) assert.NotNil(t, url) factory := extension.GetRouterFactory(url.Protocol) - r, err := factory.NewPriorityRouter(url) + notify := make(chan struct{}) + r, err := factory.NewPriorityRouter(url, notify) assert.Nil(t, err) assert.NotNil(t, r) diff --git a/cluster/router/condition/app_router.go b/cluster/router/condition/app_router.go index 056e32851c11696c80d18a2a55b109fcdae06627..64b3914b87003419c04755b584b06f66d7f05435 100644 --- a/cluster/router/condition/app_router.go +++ b/cluster/router/condition/app_router.go @@ -34,14 +34,15 @@ const ( // AppRouter For listen application router with config center type AppRouter struct { listenableRouter + notify interface{} } // NewAppRouter Init AppRouter by url -func NewAppRouter(url *common.URL) (*AppRouter, error) { +func NewAppRouter(url *common.URL, notify chan struct{}) (*AppRouter, error) { if url == nil { return nil, perrors.Errorf("No route URL for create app router!") } - appRouter, err := newListenableRouter(url, url.GetParam(constant.APPLICATION_KEY, "")) + appRouter, err := newListenableRouter(url, url.GetParam(constant.APPLICATION_KEY, ""), notify) if err != nil { return nil, err } diff --git a/cluster/router/condition/app_router_test.go b/cluster/router/condition/app_router_test.go index 879abc5cc8b607ee9245ce632800b056ff740cc5..c47aaaf80b8ffcac7653eeca869d8130227066a8 100644 --- a/cluster/router/condition/app_router_test.go +++ b/cluster/router/condition/app_router_test.go @@ -78,7 +78,8 @@ conditions: assert.NotNil(t, configuration) appRouteURL := getAppRouteURL(routerKey) - appRouter, err := NewAppRouter(appRouteURL) + notify := make(chan struct{}) + appRouter, err := NewAppRouter(appRouteURL, notify) assert.Nil(t, err) assert.NotNil(t, appRouter) @@ -126,7 +127,8 @@ conditions: assert.NotNil(t, configuration) appRouteURL := getAppRouteURL(routerKey) - appRouter, err := NewAppRouter(appRouteURL) + notify := make(chan struct{}) + appRouter, err := NewAppRouter(appRouteURL, notify) assert.Nil(t, err) assert.NotNil(t, appRouter) @@ -165,7 +167,8 @@ conditions: assert.NotNil(t, configuration) appRouteURL := getAppRouteURL(routerKey) - appRouter, err := NewAppRouter(appRouteURL) + notify := make(chan struct{}) + appRouter, err := NewAppRouter(appRouteURL, notify) assert.Nil(t, err) assert.NotNil(t, appRouter) diff --git a/cluster/router/condition/factory.go b/cluster/router/condition/factory.go index f8d3e130102d4311f8b1ddb1055aece8a0633296..2c1f03516a88be2846a74029a990c67554201f05 100644 --- a/cluster/router/condition/factory.go +++ b/cluster/router/condition/factory.go @@ -37,8 +37,8 @@ func newConditionRouterFactory() router.PriorityRouterFactory { } // NewPriorityRouter creates ConditionRouterFactory by URL -func (c *ConditionRouterFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) { - return NewConditionRouter(url) +func (c *ConditionRouterFactory) NewPriorityRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) { + return NewConditionRouter(url, notify) } // NewRouter Create FileRouterFactory by Content @@ -54,6 +54,6 @@ func newAppRouterFactory() router.PriorityRouterFactory { } // NewPriorityRouter creates AppRouterFactory by URL -func (c *AppRouterFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) { - return NewAppRouter(url) +func (c *AppRouterFactory) NewPriorityRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) { + return NewAppRouter(url, notify) } diff --git a/cluster/router/condition/factory_test.go b/cluster/router/condition/factory_test.go index c916588eeeef68b796c35ee6b3b175e8de859863..c8cc9c9761b4835f94b83e0cb2a7ac3a91608e50 100644 --- a/cluster/router/condition/factory_test.go +++ b/cluster/router/condition/factory_test.go @@ -132,37 +132,39 @@ func (bi *MockInvoker) Destroy() { func TestRoute_matchWhen(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("=> host = 1.2.3.4")) - router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) + notify := make(chan struct{}) + router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify) cUrl, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, factory1111Ip)) matchWhen := router.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, true, matchWhen) rule1 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) - router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1)) + router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1), notify) matchWhen1 := router1.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, true, matchWhen1) rule2 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 & host !=1.1.1.1 => host = 1.2.3.4")) - router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2)) + router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2), notify) matchWhen2 := router2.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, false, matchWhen2) rule3 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.4 & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) - router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3)) + router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3), notify) matchWhen3 := router3.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, true, matchWhen3) rule4 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) - router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4)) + router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4), notify) matchWhen4 := router4.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, true, matchWhen4) rule5 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.1 => host = 1.2.3.4")) - router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5)) + router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5), notify) matchWhen5 := router5.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, false, matchWhen5) rule6 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.2 => host = 1.2.3.4")) - router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6)) + router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6), notify) matchWhen6 := router6.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, true, matchWhen6) } func TestRoute_matchFilter(t *testing.T) { + notify := make(chan struct{}) localIP := common.GetLocalIp() t.Logf("The local ip is %s", localIP) url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService?default.serialization=fastjson") @@ -175,12 +177,12 @@ func TestRoute_matchFilter(t *testing.T) { rule4 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.2,10.20.3.3,10.20.3.4")) rule5 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host != 10.20.3.3")) rule6 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " serialization = fastjson")) - router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1)) - router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2)) - router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3)) - router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4)) - router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5)) - router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6)) + router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1), notify) + router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2), notify) + router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3), notify) + router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4), notify) + router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5), notify) + router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6), notify) cUrl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) ret1 := router1.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), cUrl, &invocation.RPCInvocation{}) ret2 := router2.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), cUrl, &invocation.RPCInvocation{}) @@ -198,9 +200,10 @@ func TestRoute_matchFilter(t *testing.T) { } func TestRoute_methodRoute(t *testing.T) { + notify := make(chan struct{}) inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("getFoo"), invocation.WithParameterTypes([]reflect.Type{}), invocation.WithArguments([]interface{}{})) rule := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) - r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) + r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify) url, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=setFoo,getFoo,findFoo") matchWhen := r.(*ConditionRouter).MatchWhen(url, inv) assert.Equal(t, true, matchWhen) @@ -209,42 +212,45 @@ func TestRoute_methodRoute(t *testing.T) { assert.Equal(t, true, matchWhen) url2, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip)) rule2 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host!=1.1.1.1 => host = 1.2.3.4")) - router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2)) + router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2), notify) matchWhen = router2.(*ConditionRouter).MatchWhen(url2, inv) assert.Equal(t, false, matchWhen) url3, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip)) rule3 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host=1.1.1.1 => host = 1.2.3.4")) - router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3)) + router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3), notify) matchWhen = router3.(*ConditionRouter).MatchWhen(url3, inv) assert.Equal(t, true, matchWhen) } func TestRoute_ReturnFalse(t *testing.T) { + notify := make(chan struct{}) url, _ := common.NewURL("") localIP := common.GetLocalIp() invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)} inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => false")) curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) - r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) + r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify) ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv) assert.Equal(t, 0, len(ret.ToArray())) } func TestRoute_ReturnEmpty(t *testing.T) { + notify := make(chan struct{}) localIP := common.GetLocalIp() url, _ := common.NewURL("") invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)} inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => ")) curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) - r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) + r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify) ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv) assert.Equal(t, 0, len(ret.ToArray())) } func TestRoute_ReturnAll(t *testing.T) { + notify := make(chan struct{}) localIP := common.GetLocalIp() urlString := "dubbo://" + localIP + "/com.foo.BarService" dubboURL, _ := common.NewURL(urlString) @@ -255,7 +261,7 @@ func TestRoute_ReturnAll(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP)) curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) - r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) + r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify) ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv) assert.Equal(t, len(invokers), len(ret.ToArray())) } @@ -265,6 +271,7 @@ func TestRoute_HostFilter(t *testing.T) { url1, _ := common.NewURL(factory333URL) url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) + notify := make(chan struct{}) invoker1 := NewMockInvoker(url1, 1) invoker2 := NewMockInvoker(url2, 2) invoker3 := NewMockInvoker(url3, 3) @@ -272,7 +279,7 @@ func TestRoute_HostFilter(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP)) curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) - r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) + r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify) ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv) assert.Equal(t, 2, len(ret.ToArray())) assert.Equal(t, invoker2, invokers[ret.ToArray()[0]]) @@ -280,6 +287,7 @@ func TestRoute_HostFilter(t *testing.T) { } func TestRoute_Empty_HostFilter(t *testing.T) { + notify := make(chan struct{}) localIP := common.GetLocalIp() url1, _ := common.NewURL(factory333URL) url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) @@ -291,7 +299,7 @@ func TestRoute_Empty_HostFilter(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte(" => " + " host = " + localIP)) curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) - r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) + r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify) ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv) assert.Equal(t, 2, len(ret.ToArray())) assert.Equal(t, invoker2, invokers[ret.ToArray()[0]]) @@ -299,6 +307,7 @@ func TestRoute_Empty_HostFilter(t *testing.T) { } func TestRoute_False_HostFilter(t *testing.T) { + notify := make(chan struct{}) localIP := common.GetLocalIp() url1, _ := common.NewURL(factory333URL) url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) @@ -310,7 +319,7 @@ func TestRoute_False_HostFilter(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP)) curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) - r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) + r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify) ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv) assert.Equal(t, 2, len(ret.ToArray())) assert.Equal(t, invoker2, invokers[ret.ToArray()[0]]) @@ -318,6 +327,7 @@ func TestRoute_False_HostFilter(t *testing.T) { } func TestRoute_Placeholder(t *testing.T) { + notify := make(chan struct{}) localIP := common.GetLocalIp() url1, _ := common.NewURL(factory333URL) url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) @@ -329,7 +339,7 @@ func TestRoute_Placeholder(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = $host")) curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) - r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) + r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify) ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv) assert.Equal(t, 2, len(ret.ToArray())) assert.Equal(t, invoker2, invokers[ret.ToArray()[0]]) @@ -337,6 +347,7 @@ func TestRoute_Placeholder(t *testing.T) { } func TestRoute_NoForce(t *testing.T) { + notify := make(chan struct{}) localIP := common.GetLocalIp() url1, _ := common.NewURL(factory333URL) url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) @@ -348,12 +359,13 @@ func TestRoute_NoForce(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf(factoryHostIp1234Format, localIP))) curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) - r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithNoForce(rule)) + r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithNoForce(rule), notify) ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv) assert.Equal(t, len(invokers), len(ret.ToArray())) } func TestRoute_Force(t *testing.T) { + notify := make(chan struct{}) localIP := common.GetLocalIp() url1, _ := common.NewURL(factory333URL) url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) @@ -365,7 +377,7 @@ func TestRoute_Force(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf(factoryHostIp1234Format, localIP))) curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) - r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithForce(rule, "true")) + r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithForce(rule, "true"), notify) fileredInvokers := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv) assert.Equal(t, 0, len(fileredInvokers.ToArray())) } diff --git a/cluster/router/condition/listenable_router.go b/cluster/router/condition/listenable_router.go index 0b47310dbfe9987a593bc2d0d949a76f08052114..1e35ac8c5ff0743f462f60850cfe4bf6ca0b241a 100644 --- a/cluster/router/condition/listenable_router.go +++ b/cluster/router/condition/listenable_router.go @@ -49,6 +49,7 @@ type listenableRouter struct { url *common.URL force bool priority int64 + notify chan struct{} } // RouterRule Get RouterRule instance from listenableRouter @@ -56,7 +57,7 @@ func (l *listenableRouter) RouterRule() *RouterRule { return l.routerRule } -func newListenableRouter(url *common.URL, ruleKey string) (*AppRouter, error) { +func newListenableRouter(url *common.URL, ruleKey string, notify chan struct{}) (*AppRouter, error) { if ruleKey == "" { return nil, perrors.Errorf("NewListenableRouter ruleKey is nil, can't create Listenable router") } @@ -64,6 +65,7 @@ func newListenableRouter(url *common.URL, ruleKey string) (*AppRouter, error) { l.url = url l.priority = listenableRouterDefaultPriority + l.notify = notify routerKey := ruleKey + constant.ConditionRouterRuleSuffix // add listener @@ -110,6 +112,7 @@ func (l *listenableRouter) Process(event *config_center.ConfigChangeEvent) { return } l.generateConditions(routerRule) + l.notify <- struct{}{} } func (l *listenableRouter) generateConditions(rule *RouterRule) { diff --git a/cluster/router/condition/router.go b/cluster/router/condition/router.go index 0817b32843938346610aa1e66c626e4895468372..d543ca3f94644370179c18a9c3a3f9a00268461e 100644 --- a/cluster/router/condition/router.go +++ b/cluster/router/condition/router.go @@ -62,6 +62,7 @@ type ConditionRouter struct { enabled bool WhenCondition map[string]MatchPair ThenCondition map[string]MatchPair + notify chan struct{} } // NewConditionRouterWithRule Init condition router by raw rule @@ -111,7 +112,7 @@ func NewConditionRouterWithRule(rule string) (*ConditionRouter, error) { } // NewConditionRouter Init condition router by URL -func NewConditionRouter(url *common.URL) (*ConditionRouter, error) { +func NewConditionRouter(url *common.URL, notify chan struct{}) (*ConditionRouter, error) { if url == nil { return nil, perrors.Errorf("Illegal route URL!") } @@ -135,6 +136,7 @@ func NewConditionRouter(url *common.URL) (*ConditionRouter, error) { router.priority = url.GetParamInt(constant.RouterPriority, defaultPriority) router.Force = url.GetParamBool(constant.RouterForce, false) router.enabled = url.GetParamBool(constant.RouterEnabled, true) + router.notify = notify return router, nil } diff --git a/cluster/router/condition/router_test.go b/cluster/router/condition/router_test.go index a344b64efb5d7be1fe7e5d2a5a4c9d206cfcf5da..f3dc1fe788d61adf6e25342f00a0ed8d29b2b529 100644 --- a/cluster/router/condition/router_test.go +++ b/cluster/router/condition/router_test.go @@ -58,8 +58,9 @@ func TestParseRule(t *testing.T) { } func TestNewConditionRouter(t *testing.T) { + notify := make(chan struct{}) url, _ := common.NewURL(`condition://0.0.0.0:?application=mock-app&category=routers&force=true&priority=1&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D`) - router, err := NewConditionRouter(url) + router, err := NewConditionRouter(url, notify) assert.Nil(t, err) assert.Equal(t, true, router.Enabled()) assert.Equal(t, true, router.Force) @@ -69,22 +70,22 @@ func TestNewConditionRouter(t *testing.T) { assert.EqualValues(t, router.WhenCondition, whenRule) assert.EqualValues(t, router.ThenCondition, thenRule) - router, err = NewConditionRouter(nil) + router, err = NewConditionRouter(nil, notify) assert.Nil(t, router) assert.Error(t, err) url, _ = common.NewURL(`condition://0.0.0.0:?application=mock-app&category=routers&force=true&priority=1&router=condition&rule=YSAmT4gYiAmIGQ%3D`) - router, err = NewConditionRouter(url) + router, err = NewConditionRouter(url, notify) assert.Nil(t, router) assert.Error(t, err) url, _ = common.NewURL(`condition://0.0.0.0:?application=mock-app&category=routers&force=true&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D`) - router, err = NewConditionRouter(url) + router, err = NewConditionRouter(url, notify) assert.Nil(t, err) assert.Equal(t, int64(150), router.Priority()) url, _ = common.NewURL(`condition://0.0.0.0:?category=routers&force=true&interface=mock-service&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D`) - router, err = NewConditionRouter(url) + router, err = NewConditionRouter(url, notify) assert.Nil(t, err) assert.Equal(t, int64(140), router.Priority()) } diff --git a/cluster/router/conncheck/conn_check_route.go b/cluster/router/conncheck/conn_check_route.go index 8d94db409bbf896eb27ecdf74c813e34dfd298aa..97f049d85bb0369982e0a84f27a4c6b3b8925a55 100644 --- a/cluster/router/conncheck/conn_check_route.go +++ b/cluster/router/conncheck/conn_check_route.go @@ -40,12 +40,14 @@ const ( type ConnCheckRouter struct { url *common.URL checker router.ConnChecker + notify chan struct{} } // NewConnCheckRouter construct an NewConnCheckRouter via url -func NewConnCheckRouter(url *common.URL) (router.PriorityRouter, error) { +func NewConnCheckRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) { r := &ConnCheckRouter{ - url: url, + url: url, + notify: notify, } checkerName := url.GetParam(constant.HEALTH_CHECKER, constant.DEFAULT_HEALTH_CHECKER) r.checker = extension.GetConnChecker(checkerName, url) diff --git a/cluster/router/conncheck/conn_check_route_test.go b/cluster/router/conncheck/conn_check_route_test.go index 95b8d4a928b7ee92d0d2cf9d5e8f8f62442e77d0..c9aff023fb862015588eecbda104c647d3683660 100644 --- a/cluster/router/conncheck/conn_check_route_test.go +++ b/cluster/router/conncheck/conn_check_route_test.go @@ -48,11 +48,12 @@ const ( func TestConnCheckRouterRoute(t *testing.T) { defer protocol.CleanAllStatus() + notify := make(chan struct{}) consumerURL, _ := common.NewURL(connCheck1001URL) url1, _ := common.NewURL(fmt.Sprintf(connCheckRouteUrlFormat, connCheckRoute1010IP)) url2, _ := common.NewURL(fmt.Sprintf(connCheckRouteUrlFormat, connCheckRoute1011IP)) url3, _ := common.NewURL(fmt.Sprintf(connCheckRouteUrlFormat, connCheckRoute1012IP)) - hcr, _ := NewConnCheckRouter(consumerURL) + hcr, _ := NewConnCheckRouter(consumerURL, notify) var invokers []protocol.Invoker invoker1 := NewMockInvoker(url1) diff --git a/cluster/router/conncheck/factory.go b/cluster/router/conncheck/factory.go index 12498d18d85ea72b31a36c882ee46b5bc6769c01..a7b19aaea668b31006f8caa384067a3051ddb5ce 100644 --- a/cluster/router/conncheck/factory.go +++ b/cluster/router/conncheck/factory.go @@ -39,6 +39,6 @@ func newConnCheckRouteFactory() router.PriorityRouterFactory { } // NewPriorityRouter construct a new NewConnCheckRouter via url -func (f *ConnCheckRouteFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) { - return NewConnCheckRouter(url) +func (f *ConnCheckRouteFactory) NewPriorityRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) { + return NewConnCheckRouter(url, notify) } diff --git a/cluster/router/healthcheck/factory.go b/cluster/router/healthcheck/factory.go index 40c9dd7ab9e431a16833507ee4093ff7fbff8c95..a9054c7714417426bcf916dddfe61099df69e26e 100644 --- a/cluster/router/healthcheck/factory.go +++ b/cluster/router/healthcheck/factory.go @@ -38,6 +38,6 @@ func newHealthCheckRouteFactory() router.PriorityRouterFactory { } // NewPriorityRouter construct a new NewHealthCheckRouter via url -func (f *HealthCheckRouteFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) { - return NewHealthCheckRouter(url) +func (f *HealthCheckRouteFactory) NewPriorityRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) { + return NewHealthCheckRouter(url, notify) } diff --git a/cluster/router/healthcheck/health_check_route.go b/cluster/router/healthcheck/health_check_route.go index 42a802f947fd938c0f0b5a87dea70837b9cd22fa..bd9543e1b27586408293bb9dcbed35a44036d3c7 100644 --- a/cluster/router/healthcheck/health_check_route.go +++ b/cluster/router/healthcheck/health_check_route.go @@ -41,13 +41,15 @@ type HealthCheckRouter struct { url *common.URL enabled bool checker router.HealthChecker + notify chan struct{} } // NewHealthCheckRouter construct an HealthCheckRouter via url -func NewHealthCheckRouter(url *common.URL) (router.PriorityRouter, error) { +func NewHealthCheckRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) { r := &HealthCheckRouter{ url: url, enabled: url.GetParamBool(constant.HEALTH_ROUTE_ENABLED_KEY, false), + notify: notify, } if r.enabled { checkerName := url.GetParam(constant.HEALTH_CHECKER, constant.DEFAULT_HEALTH_CHECKER) diff --git a/cluster/router/healthcheck/health_check_route_test.go b/cluster/router/healthcheck/health_check_route_test.go index f088be531ade713d30d29f51cdb2ff1f2ae55584..62b690dcd6fd2144a5265d8e9b6ce41d482df14f 100644 --- a/cluster/router/healthcheck/health_check_route_test.go +++ b/cluster/router/healthcheck/health_check_route_test.go @@ -49,12 +49,13 @@ const ( func TestHealthCheckRouterRoute(t *testing.T) { defer protocol.CleanAllStatus() + notify := make(chan struct{}) consumerURL, _ := common.NewURL(healthCheck1001URL) consumerURL.SetParam(constant.HEALTH_ROUTE_ENABLED_KEY, "true") url1, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1010IP)) url2, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1011IP)) url3, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1012IP)) - hcr, _ := NewHealthCheckRouter(consumerURL) + hcr, _ := NewHealthCheckRouter(consumerURL, notify) var invokers []protocol.Invoker invoker1 := NewMockInvoker(url1) @@ -112,13 +113,14 @@ func TestHealthCheckRouterRoute(t *testing.T) { func TestNewHealthCheckRouter(t *testing.T) { defer protocol.CleanAllStatus() + notify := make(chan struct{}) url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) - hcr, _ := NewHealthCheckRouter(url) + hcr, _ := NewHealthCheckRouter(url, notify) h := hcr.(*HealthCheckRouter) assert.Nil(t, h.checker) url.SetParam(constant.HEALTH_ROUTE_ENABLED_KEY, "true") - hcr, _ = NewHealthCheckRouter(url) + hcr, _ = NewHealthCheckRouter(url, notify) h = hcr.(*HealthCheckRouter) assert.NotNil(t, h.checker) @@ -130,7 +132,7 @@ func TestNewHealthCheckRouter(t *testing.T) { url.SetParam(constant.CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, "500") url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") - hcr, _ = NewHealthCheckRouter(url) + hcr, _ = NewHealthCheckRouter(url, notify) h = hcr.(*HealthCheckRouter) dhc = h.checker.(*DefaultHealthChecker) assert.Equal(t, dhc.outStandingRequestConutLimit, int32(1000)) diff --git a/cluster/router/router.go b/cluster/router/router.go index 8a19dcf8cc2540ab0a30b8b6de8521e9759012ee..1d71554f6b547bce86a2f960ceebfcf6c2a1daa2 100644 --- a/cluster/router/router.go +++ b/cluster/router/router.go @@ -30,7 +30,7 @@ import ( // PriorityRouterFactory creates creates priority router with url type PriorityRouterFactory interface { // NewPriorityRouter creates router instance with URL - NewPriorityRouter(*common.URL) (PriorityRouter, error) + NewPriorityRouter(*common.URL, chan struct{}) (PriorityRouter, error) } // FilePriorityRouterFactory creates priority router with parse config file diff --git a/cluster/router/tag/factory.go b/cluster/router/tag/factory.go index a5d989cd31453f6d02eee9c5902dc3666defe4fe..fd2c15cddfe521a66f5ad4c5072fa0fb325a3201 100644 --- a/cluster/router/tag/factory.go +++ b/cluster/router/tag/factory.go @@ -37,8 +37,8 @@ func NewTagRouterFactory() router.PriorityRouterFactory { // NewPriorityRouter create a tagRouter by tagRouterFactory with a url // The url contains router configuration information -func (c *tagRouterFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) { - return NewTagRouter(url) +func (c *tagRouterFactory) NewPriorityRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) { + return NewTagRouter(url, notify) } // NewFileRouter create a tagRouter by profile content diff --git a/cluster/router/tag/factory_test.go b/cluster/router/tag/factory_test.go index b350bb2a915ce9abae98ba46bda346fba7ad6b5b..85a0ff010ae185e8fde5a6c4cb2face823476bac 100644 --- a/cluster/router/tag/factory_test.go +++ b/cluster/router/tag/factory_test.go @@ -39,7 +39,8 @@ func TestTagRouterFactoryNewRouter(t *testing.T) { u1, err := common.NewURL(fmt.Sprintf(factoryFormat, factoryLocalIP)) assert.Nil(t, err) factory := NewTagRouterFactory() - tagRouter, e := factory.NewPriorityRouter(u1) + notify := make(chan struct{}) + tagRouter, e := factory.NewPriorityRouter(u1, notify) assert.Nil(t, e) assert.NotNil(t, tagRouter) } diff --git a/cluster/router/tag/file.go b/cluster/router/tag/file.go index 94daf1508eb7b3f4d8a8cacdbd6ed634be6852da..85507f7eb189739f8e9354bb9a726b4148598a02 100644 --- a/cluster/router/tag/file.go +++ b/cluster/router/tag/file.go @@ -45,6 +45,7 @@ type FileTagRouter struct { } // NewFileTagRouter Create file tag router instance with content (from config file) +// todo fix this router, now it is useless, tag router is nil func NewFileTagRouter(content []byte) (*FileTagRouter, error) { fileRouter := &FileTagRouter{} rule, err := getRule(string(content)) @@ -52,7 +53,8 @@ func NewFileTagRouter(content []byte) (*FileTagRouter, error) { return nil, perrors.Errorf("yaml.Unmarshal() failed , error:%v", perrors.WithStack(err)) } fileRouter.routerRule = rule - fileRouter.router, err = NewTagRouter(fileRouter.URL()) + notify := make(chan struct{}) + fileRouter.router, err = NewTagRouter(fileRouter.URL(), notify) return fileRouter, err } diff --git a/cluster/router/tag/tag_router.go b/cluster/router/tag/tag_router.go index c7f53047c1e2beee4545a10302e9f307f06c33d8..87f944a25a175dc7a2ef319b87f3322795df51c7 100644 --- a/cluster/router/tag/tag_router.go +++ b/cluster/router/tag/tag_router.go @@ -76,10 +76,11 @@ type tagRouter struct { application string ruleChanged bool mutex sync.RWMutex + notify chan struct{} } // NewTagRouter returns a tagRouter instance if url is not nil -func NewTagRouter(url *common.URL) (*tagRouter, error) { +func NewTagRouter(url *common.URL, notify chan struct{}) (*tagRouter, error) { if url == nil { return nil, perrors.Errorf("Illegal route URL!") } @@ -87,6 +88,7 @@ func NewTagRouter(url *common.URL) (*tagRouter, error) { url: url, enabled: url.GetParamBool(constant.RouterEnabled, true), priority: url.GetParamInt(constant.RouterPriority, 0), + notify: notify, }, nil } @@ -191,6 +193,7 @@ func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) { defer c.mutex.Unlock() c.tagRouterRule = routerRule c.ruleChanged = true + c.notify <- struct{}{} } // URL gets the url of tagRouter diff --git a/cluster/router/tag/tag_router_test.go b/cluster/router/tag/tag_router_test.go index 3f7b979f3d52d51ab50c94ea32e87b5eb4f9591e..68fd0623716e402a8563d462fc9f5e7987358c73 100644 --- a/cluster/router/tag/tag_router_test.go +++ b/cluster/router/tag/tag_router_test.go @@ -120,17 +120,19 @@ func (bi *MockInvoker) Destroy() { func TestTagRouterPriority(t *testing.T) { u1, err := common.NewURL(tagRouterTestUserConsumerTag) + notify := make(chan struct{}) assert.Nil(t, err) - tagRouter, e := NewTagRouter(u1) + tagRouter, e := NewTagRouter(u1, notify) assert.Nil(t, e) p := tagRouter.Priority() assert.Equal(t, int64(0), p) } func TestTagRouterRouteForce(t *testing.T) { + notify := make(chan struct{}) u1, e1 := common.NewURL(tagRouterTestUserConsumerTag) assert.Nil(t, e1) - tagRouter, e := NewTagRouter(u1) + tagRouter, e := NewTagRouter(u1, notify) assert.Nil(t, e) u2, e2 := common.NewURL(tagRouterTestHangZhouUrl) @@ -162,7 +164,8 @@ func TestTagRouterRouteForce(t *testing.T) { func TestTagRouterRouteNoForce(t *testing.T) { u1, e1 := common.NewURL(tagRouterTestUserConsumer) assert.Nil(t, e1) - tagRouter, e := NewTagRouter(u1) + notify := make(chan struct{}) + tagRouter, e := NewTagRouter(u1, notify) assert.Nil(t, e) u2, e2 := common.NewURL(tagRouterTestHangZhouUrl) @@ -225,7 +228,8 @@ func TestRouteBeijingInvoker(t *testing.T) { invokers = append(invokers, inv2, inv3, inv4, inv5) url, _ := common.NewURL(tagRouterTestBeijingUrl) - tagRouter, _ := NewTagRouter(url) + notify := make(chan struct{}) + tagRouter, _ := NewTagRouter(url, notify) rb := roaring.NewBitmap() rb.AddRange(0, uint64(len(invokers))) @@ -301,7 +305,8 @@ tags: url, e1 := common.NewURL(tagRouterTestUserConsumerTag) suite.Nil(e1) - tagRouter, err := NewTagRouter(url) + notify := make(chan struct{}) + tagRouter, err := NewTagRouter(url, notify) suite.Nil(err) suite.NotNil(tagRouter) suite.route = tagRouter @@ -365,7 +370,8 @@ func (suite *DynamicTagRouter) TestDynamicTagRouterByNoTagAndAddressMatch() { func TestProcess(t *testing.T) { u1, err := common.NewURL(tagRouterTestUserConsumerTag) assert.Nil(t, err) - tagRouter, e := NewTagRouter(u1) + notify := make(chan struct{}) + tagRouter, e := NewTagRouter(u1, notify) assert.Nil(t, e) assert.NotNil(t, tagRouter) @@ -383,12 +389,18 @@ tags: - name: hangzhou addresses: [192.168.1.3, 192.168.1.4] ` + go func() { + for { + <-notify + } + }() tagRouter.Process(&config_center.ConfigChangeEvent{Value: testYML, ConfigType: remoting.EventTypeAdd}) assert.NotNil(t, tagRouter.tagRouterRule) assert.Equal(t, []string{"beijing", "hangzhou"}, tagRouter.tagRouterRule.getTagNames()) assert.Equal(t, []string{"192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4"}, tagRouter.tagRouterRule.getAddresses()) assert.Equal(t, []string{"192.168.1.3", "192.168.1.4"}, tagRouter.tagRouterRule.getTagNameToAddresses()["hangzhou"]) assert.Equal(t, []string{"beijing"}, tagRouter.tagRouterRule.getAddressToTagNames()["192.168.1.1"]) + tagRouter.Process(&config_center.ConfigChangeEvent{ConfigType: remoting.EventTypeDel}) assert.Nil(t, tagRouter.tagRouterRule) }