Skip to content
Snippets Groups Projects
Commit 7a227733 authored by AlexStocks's avatar AlexStocks
Browse files

merge 1.5

parents 281ef51b 1fc310ab
No related branches found
No related tags found
No related merge requests found
Showing
with 375 additions and 116 deletions
<!-- Thanks for sending a pull request!
Read https://github.com/apache/dubbo-go/blob/master/contributing.md before commit pull request.
Read https://github.com/apache/dubbo-go/blob/master/contributing.md before commit pull request.
-->
**What this PR does**:
......
......@@ -35,7 +35,7 @@ jobs:
uses: actions/checkout@v2
- name: Cache dependencies
uses: actions/cache@v2
uses: actions/cache@v2.1.4
with:
# Cache
path: ~/go/pkg/mod
......@@ -69,9 +69,9 @@ jobs:
run: |
make verify
- name: Integrate Test
run: |
chmod +x integrate_test.sh && ./integrate_test.sh ${{github.event.pull_request.head.repo.full_name}} ${{github.event.pull_request.head.sha}}
# - name: Integrate Test
# run: |
# chmod +x integrate_test.sh && ./integrate_test.sh ${{github.event.pull_request.head.repo.full_name}} ${{github.event.pull_request.head.sha}}
- name: Post Coverage
run: bash <(curl -s https://codecov.io/bash)
......
# Release Notes
---
## 1.5.6
### New Features
- [Add dubbo-go-cli telnet tool](https://github.com/apache/dubbo-go/pull/818)
- [Add Prox ImplementFunc to allow override impl](https://github.com/apache/dubbo-go/pull/1019)
- [Add read configuration path from the command line when start](https://github.com/apache/dubbo-go/pull/1039)
### Enhancement
- [introduce ConfigPostProcessor extension](https://github.com/apache/dubbo-go/pull/943)
- [Impl extension of two urls comparison](https://github.com/apache/dubbo-go/pull/854)
- [using event-driven to let router send signal to notify channel](https://github.com/apache/dubbo-go/pull/976)
- [lint codes](https://github.com/apache/dubbo-go/pull/941)
### Bugfixes
- [Fix: generic struct2MapAll key of map keep type](https://github.com/apache/dubbo-go/pull/928)
- [Fix: when events empty, delete all the invokers](https://github.com/apache/dubbo-go/pull/758)
- [Fix: file service discovery run in windows](https://github.com/apache/dubbo-go/pull/932)
- [Fix: make metadata report work without serviceDiscovery](https://github.com/apache/dubbo-go/pull/948)
- [Fix: consumer invoker cache set nil after the ZK connection is lost](https://github.com/apache/dubbo-go/pull/985)
- [Fix: integration test in Github action](https://github.com/apache/dubbo-go/pull/1012)
- [Fix: etcd exit panic](https://github.com/apache/dubbo-go/pull/1013)
- [Fix: when connect to provider fail, will occur panic](https://github.com/apache/dubbo-go/pull/1021)
- [Fix: support getty send Length, when the data transfer failed](https://github.com/apache/dubbo-go/pull/1028)
Milestone: [https://github.com/apache/dubbo-go/milestone/7](https://github.com/apache/dubbo-go/milestone/7?closed=1)
## 1.5.5
### New Features
- [Add Address notification batch mode](https://github.com/apache/dubbo-go/pull/741)
- [Add dubbo-gen stream support](https://github.com/apache/dubbo-go/pull/794)
- [Add Change verify to Makefile](https://github.com/apache/dubbo-go/pull/831)
- [Add more automatic components](https://github.com/apache/dubbo-go/pull/832)
- [Add grpc max message size config](https://github.com/apache/dubbo-go/pull/824)
### Enhancement
- [when it need local ip, it will get it every time. We can get local ip once, and reused it](https://github.com/apache/dubbo-go/pull/807)
- [enhance client's connectivity](https://github.com/apache/dubbo-go/pull/800)
- [Imp: get local ip once and reused it](https://github.com/apache/dubbo-go/pull/808)
- [Remove unmeaning logic](https://github.com/apache/dubbo-go/pull/855)
### Bugfixes
- [Fix: nacos registry can not get namespaceId](https://github.com/apache/dubbo-go/pull/778) [@peaman](https://github.com/peaman)
- [Fix: url encode](https://github.com/apache/dubbo-go/pull/802)
- [Fix: try to fix too many files open error](https://github.com/apache/dubbo-go/pull/797)
- [Fix: refact heartbeat](https://github.com/apache/dubbo-go/pull/889)
- [Fix: router_config add &url to url](https://github.com/apache/dubbo-go/pull/910)
- [Fix: Router chain can not build immediately when started](https://github.com/apache/dubbo-go/pull/927)
- [Fix: client block until timeout when provider return with PackageResponse_Exception](https://github.com/apache/dubbo-go/pull/926)
- [Fix: URL.String() data race panic](https://github.com/apache/dubbo-go/pull/944)
- [Fix: generic "encode hessian.Object"](https://github.com/apache/dubbo-go/pull/945)
### Dependencies
- [Bump github.com/mitchellh/mapstructure from 1.2.3 to 1.3.3](https://github.com/apache/dubbo-go/pull/838)
- [Bump github.com/go-resty/resty/v2 from 2.1.0 to 2.3.0](https://github.com/apache/dubbo-go/pull/837)
- [Bump github.com/opentracing/opentracing-go from 1.1.0 to 1.2.0](https://github.com/apache/dubbo-go/pull/836)
- [Bump github.com/creasty/defaults from 1.3.0 to 1.5.1](https://github.com/apache/dubbo-go/pull/835)
- [Bump github.com/dubbogo/gost from 1.9.1 to 1.9.2](https://github.com/apache/dubbo-go/pull/834)
- [Bump github.com/zouyx/agollo/v3 from 3.4.4 to 3.4.5](https://github.com/apache/dubbo-go/pull/845)
- [Bump github.com/golang/mock from 1.3.1 to 1.4.4](https://github.com/apache/dubbo-go/pull/844)
- [Bump github.com/nacos-group/nacos-sdk-go from 1.0.0 to 1.0.1](https://github.com/apache/dubbo-go/pull/843)
- [Bump github.com/magiconair/properties from 1.8.1 to 1.8.4](https://github.com/apache/dubbo-go/pull/861)
- [Bump github.com/prometheus/client_golang from 1.1.0 to 1.8.0 ](https://github.com/apache/dubbo-go/pull/860)
- [Bump go.uber.org/atomic from 1.6.0 to 1.7.0](https://github.com/apache/dubbo-go/pull/859)
- [](https://github.com/apache/dubbo-go/pull/843)
Milestone: [https://github.com/apache/dubbo-go/milestone/5](https://github.com/apache/dubbo-go/milestone/5?closed=1)
## 1.4.5
### Bugfixes
......@@ -7,6 +75,51 @@
## 1.5.4
### Bugfixes
- [Fix etcd cluster reconnect](https://github.com/apache/dubbo-go/pull/828)
- [Fix zookeeper deadlock problem](https://github.com/apache/dubbo-go/pull/826)
- [Fix generic struct2MapAll](https://github.com/apache/dubbo-go/pull/822)
- [Fix Consumer panic when restart provider](https://github.com/apache/dubbo-go/pull/803)
- [Fix etcd can not registry](https://github.com/apache/dubbo-go/pull/819) [@lin-jianjun](https://github.com/lin-jianjun)
- [Fix cannot call go provider service when used by java dubbo 2.7.7 version](https://github.com/apache/dubbo-go/pull/815) [@jack15083](https://github.com/jack15083)
- [Fix go client quit abnormally when it connects java server](https://github.com/apache/dubbo-go/pull/820) [@wenxuwan](https://github.com/wenxuwan)
- [Fix sentinel windows issue](https://github.com/apache/dubbo-go/pull/821) [@louyuting](https://github.com/louyuting)
- [Fix metadata default port](https://github.com/apache/dubbo-go/pull/821) [@sanxun0325](https://github.com/sanxun0325)
- [Fix consul can not destory](https://github.com/apache/dubbo-go/pull/788) [@LaurenceLiZhixin](https://github.com/LaurenceLiZhixin)
Milestone: [https://github.com/apache/dubbo-go/milestone/6](https://github.com/apache/dubbo-go/milestone/6?closed=1)
## 1.5.3
### New Features
- [Add consul service discovery](https://github.com/apache/dubbo-go/pull/701) [@zhangshen023](https://github.com/zhangshen023)
- [Add File system service discovery](https://github.com/apache/dubbo-go/pull/732) [@DogBaoBao](https://github.com/DogBaoBao)
- [Migrate travis Ci to Github Actions](https://github.com/apache/dubbo-go/pull/752) [@sdttttt](https://github.com/sdttttt)
- [Add sentinel-golang flow control/circuit breaker](https://github.com/apache/dubbo-go/pull/748) [@louyuting](https://github.com/louyuting)
- [Add dubbo-go docs and blog into doc directory](https://github.com/apache/dubbo-go/pull/767) [@oaoit](https://github.com/oaoit)
### Enhancement
- [Add address notification batch mode](https://github.com/apache/dubbo-go/pull/741) [@beiwei30](https://github.com/beiwei30)
- [Refactor network and codec model](https://github.com/apache/dubbo-go/pull/673) [@fangyincheng](https://github.com/fangyincheng) [@georgehao](https://github.com/georgehao)
- [Remove unnecessary return and judgement](https://github.com/apache/dubbo-go/pull/730) [@YongHaoWu](https://github.com/YongHaoWu)
- [Improve exporter append method](https://github.com/apache/dubbo-go/pull/722) [@gaoxinge](https://github.com/gaoxinge)
- [Refactor for proxyInvoker cannot be extended](https://github.com/apache/dubbo-go/pull/747) [@cvictory](https://github.com/cvictory)
- [Refactor attachment type from map\[string\]stiring to map\[string\]interface{}](https://github.com/apache/dubbo-go/pull/713) [@cvictory](https://github.com/cvictory)
- [Improve map access concurrency](https://github.com/apache/dubbo-go/pull/739) [@skyao](https://github.com/skyao)
- [Improve code quantity](https://github.com/apache/dubbo-go/pull/763) [@gaoxinge](https://github.com/gaoxinge)
### Bugfixes
- [Fix etcdv3 lease](https://github.com/apache/dubbo-go/pull/738) [@zhangshen023](https://github.com/zhangshen023)
- [Fix rename SethealthChecker to SetHealthChecker](https://github.com/apache/dubbo-go/pull/746) [@watermelo](https://github.com/watermelo)
- [Fix init config problem in HystrixFilter](https://github.com/apache/dubbo-go/pull/731) [@YGrylls](https://github.com/YGrylls)
- [Fix zookeeper listener report error after started](https://github.com/apache/dubbo-go/pull/735) [@wenxuwan](https://github.com/wenxuwan)
Milestone: [https://github.com/apache/dubbo-go/milestone/4](https://github.com/apache/dubbo-go/milestone/4?closed=1)
Project: [https://github.com/apache/dubbo-go/projects/10](https://github.com/apache/dubbo-go/projects/10)
## 1.5.4
### Bugfixes
- [Fix etcd cluster reconnect](https://github.com/apache/dubbo-go/pull/828)
- [Fix zookeeper deadlock problem](https://github.com/apache/dubbo-go/pull/826)
......@@ -171,7 +284,7 @@ Project: [https://github.com/apache/dubbo-go/projects/8](https://github.com/apac
- [Add grpc protocol](https://github.com/apache/dubbo-go/pull/311)
### Enhancement
- [The SIGSYS and SIGSTOP are not supported in windows platform](https://github.com/apache/dubbo-go/pull/262)
- [Error should be returned when `NewURL` failed](https://github.com/apache/dubbo-go/pull/266)
- [Split config center GetConfig method](https://github.com/apache/dubbo-go/pull/267)
......@@ -181,7 +294,7 @@ Project: [https://github.com/apache/dubbo-go/projects/8](https://github.com/apac
- [Change zk version and add base_registry](https://github.com/apache/dubbo-go/pull/355)
### Bugfixes
- [Fix negative wait group count](https://github.com/apache/dubbo-go/pull/253)
- [After disconnection with ZK registry, cosumer can't listen to provider changes](https://github.com/apache/dubbo-go/pull/258)
- [The generic filter and default reference filters lack ','](https://github.com/apache/dubbo-go/pull/260)
......
# Apache Dubbo-go [中文](./README_CN.md) #
[![Build Status](https://travis-ci.org/apache/dubbo-go.svg?branch=master)](https://travis-ci.org/apache/dubbo-go)
[![Build Status](https://github.com/apache/dubbo-go/workflows/CI/badge.svg)](https://travis-ci.org/apache/dubbo-go)
[![codecov](https://codecov.io/gh/apache/dubbo-go/branch/master/graph/badge.svg)](https://codecov.io/gh/apache/dubbo-go)
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/github.com/apache/dubbo-go?tab=doc)
[![Go Report Card](https://goreportcard.com/badge/github.com/apache/dubbo-go)](https://goreportcard.com/report/github.com/apache/dubbo-go)
......@@ -8,7 +8,7 @@
---
Apache Dubbo Go Implementation.
Apache Dubbo Go Implementation to bridge the gap between java and go.
## License
......@@ -45,7 +45,7 @@ Both extension module and layered project architecture is according to Apache Du
![dubbo go extend](./doc/pic/arch/dubbo-go-ext.png)
If you wanna know more about dubbo-go, please visit this reference [Project Architecture design](https://github.com/apache/dubbo-go/wiki/dubbo-go-V1.0-design)
If you want to know more about dubbo-go, please visit this reference [Project Architecture design](https://github.com/apache/dubbo-go/wiki/dubbo-go-V1.0-design)
## Feature list ##
......@@ -179,8 +179,21 @@ If you are willing to do some code contributions and document contributions to [
If u want to communicate with our community, pls scan the following [dubbobo Ding-Ding QR code](https://mmbiz.qpic.cn/mmbiz_jpg/yvBJb5IiafvnHVBdtia30dxA2hKotr9DEckWsZ7aOJcDWDaSVMGwLmYv8GRgIQtqb4C2svicp8nVkMmGy7yKC5tyA/640?wx_fmt=jpeg&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1) or search our commnity DingDing group code 31363295.
<a href="http://alexstocks.github.io/html/dubbogo.html">
<img src="./doc/pic/misc/dubbogo-dingding.png" height="80" width="80"></a>
<div>
<table>
<tbody>
<tr></tr>
<tr>
<td align="center" valign="middle">
<a href="http://alexstocks.github.io/html/dubbogo.html" target="_blank">
<img width="80px" height="85px" src="./doc/pic/misc/dubbogo-dingding.png">
</a>
</td>
</tr>
<tr></tr>
</tbody>
</table>
</div>
If u want to visit the wechat group, pls add my wechat AlexanderStocks.
......@@ -194,7 +207,7 @@ About dubbo-go benchmarking report, please refer to [dubbo benchmarking report](
## [User List](https://github.com/apache/dubbo-go/issues/2)
If you are using [apache/dubbo-go](github.com/apache/dubbo-go) and think that it helps you or want do some contributions to it, please add your company to to [the user list](https://github.com/apache/dubbo-go/issues/2) to let us know your needs.
If you are using [apache/dubbo-go](https://github.com/apache/dubbo-go) and think that it helps you or want do some contributions to it, please add your company to to [the user list](https://github.com/apache/dubbo-go/issues/2) to let us know your needs.
<div>
......
# Apache Dubbo-go [English](./README.md) #
[![Build Status](https://travis-ci.org/apache/dubbo-go.svg?branch=master)](https://travis-ci.org/apache/dubbo-go)
[![Build Status](https://github.com/apache/dubbo-go/workflows/CI/badge.svg)](https://travis-ci.org/apache/dubbo-go)
[![codecov](https://codecov.io/gh/apache/dubbo-go/branch/master/graph/badge.svg)](https://codecov.io/gh/apache/dubbo-go)
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/github.com/apache/dubbo-go?tab=doc)
[![Go Report Card](https://goreportcard.com/badge/github.com/apache/dubbo-go)](https://goreportcard.com/report/github.com/apache/dubbo-go)
![license](https://img.shields.io/badge/license-Apache--2.0-green.svg)
---
Apache Dubbo Go 语言实现
Apache Dubbo Go 语言实现,架起java和go之间的桥梁,与 gRPC/Spring Cloud 生态互联互通,带领Java生态享受云原生时代的技术红利。
## 证书 ##
......@@ -177,12 +178,25 @@ make test
如果想访问官方钉钉群,请在钉钉中搜索社区群号 31363295 或者 扫描如下[二维码](https://mmbiz.qpic.cn/mmbiz_jpg/yvBJb5IiafvnHVBdtia30dxA2hKotr9DEckWsZ7aOJcDWDaSVMGwLmYv8GRgIQtqb4C2svicp8nVkMmGy7yKC5tyA/640?wx_fmt=jpeg&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1)
<a href="http://alexstocks.github.io/html/dubbogo.html">
<img src="./doc/pic/misc/dubbogo-dingding.png" height="80" width="80"></a>
<div>
<table>
<tbody>
<tr></tr>
<tr>
<td align="center" valign="middle">
<a href="http://alexstocks.github.io/html/dubbogo.html" target="_blank">
<img width="80px" height="85px" src="./doc/pic/misc/dubbogo-dingding.png">
</a>
</td>
</tr>
<tr></tr>
</tbody>
</table>
</div>
如果想加入到社区微信群,可以先添加社区负责人 于雨 的微信 AlexanderStocks 。
如果想加入到社区微信群,可以先添加社区负责人 于雨 的微信 AlexanderStocks 。添加微信之前,请先给 dubbo-go 点 star 作为对项目的支持,添加好友时请报上 github ID 以进行验证。
作为一个维护已经帮助构建了经受多家大型微服务系统的社区,我们足以为现有的成绩感到自豪。社区欢迎能提出建设性意见者。
作为一个维护已经帮助构建了经受多家大型微服务系统的社区,我们足以为现有的成绩感到自豪。社区欢迎能提出建设性意见者,只知索取者和喷子请绕行
## 性能测试 ##
......@@ -192,7 +206,7 @@ make test
## [User List](https://github.com/apache/dubbo-go/issues/2)
若你正在使用 [apache/dubbo-go](github.com/apache/dubbo-go) 且认为其有用或者想对其做改进,请添列贵司信息于 [用户列表](https://github.com/apache/dubbo-go/issues/2),以便我们知晓。
若你正在使用 [apache/dubbo-go](https://github.com/apache/dubbo-go) 且认为其有用或者想对其做改进,请添列贵司信息于 [用户列表](https://github.com/apache/dubbo-go/issues/2),以便我们知晓。
<div>
<table>
......
......@@ -121,41 +121,49 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p
func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
if len(invokers) == 0 {
logger.Errorf("the invokers of %s is nil. ", invocation.Invoker().GetUrl().ServiceKey())
return nil
}
go protocol.TryRefreshBlackList()
if len(invokers) == 1 {
return invokers[0]
if invokers[0].IsAvailable() {
return invokers[0]
}
protocol.SetInvokerUnhealthyStatus(invokers[0])
logger.Errorf("the invokers of %s is nil. ", invokers[0].GetUrl().ServiceKey())
return nil
}
selectedInvoker := lb.Select(invokers, invocation)
//judge to if the selectedInvoker is invoked
//judge if the selected Invoker is invoked and available
if (!selectedInvoker.IsAvailable() && invoker.availablecheck) || isInvoked(selectedInvoker, invoked) {
protocol.SetInvokerUnhealthyStatus(selectedInvoker)
otherInvokers := getOtherInvokers(invokers, selectedInvoker)
// do reselect
var reslectInvokers []protocol.Invoker
for _, invoker := range invokers {
if !invoker.IsAvailable() {
for i := 0; i < 3; i++ {
if len(otherInvokers) == 0 {
// no other ivk to reselect, return to fallback
break
}
reselectedInvoker := lb.Select(otherInvokers, invocation)
if isInvoked(reselectedInvoker, invoked) {
otherInvokers = getOtherInvokers(otherInvokers, reselectedInvoker)
continue
}
if !reselectedInvoker.IsAvailable() {
logger.Infof("the invoker of %s is not available, maybe some network error happened or the server is shutdown.",
invoker.GetUrl().Ip)
protocol.SetInvokerUnhealthyStatus(reselectedInvoker)
otherInvokers = getOtherInvokers(otherInvokers, reselectedInvoker)
continue
}
if !isInvoked(invoker, invoked) {
reslectInvokers = append(reslectInvokers, invoker)
}
}
if len(reslectInvokers) > 0 {
selectedInvoker = lb.Select(reslectInvokers, invocation)
} else {
logger.Errorf("all %d invokers is unavailable for %s.", len(invokers), selectedInvoker.GetUrl().String())
return nil
return reselectedInvoker
}
} else {
return selectedInvoker
}
return selectedInvoker
logger.Errorf("all %d invokers is unavailable for %s.", len(invokers), selectedInvoker.GetUrl().String())
return nil
}
func (invoker *baseClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
......@@ -194,3 +202,13 @@ func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cl
}
return extension.GetLoadbalance(lb)
}
func getOtherInvokers(invokers []protocol.Invoker, invoker protocol.Invoker) []protocol.Invoker {
otherInvokers := make([]protocol.Invoker, 0)
for _, i := range invokers {
if i != invoker {
otherInvokers = append(otherInvokers, i)
}
}
return otherInvokers
}
......@@ -72,6 +72,8 @@ func TestFailbackSuceess(t *testing.T) {
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
invoker.EXPECT().IsAvailable().Return(true)
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
......@@ -88,6 +90,7 @@ func TestFailbackRetryOneSuccess(t *testing.T) {
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
invoker.EXPECT().IsAvailable().Return(true)
// failed at first
mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")}
......@@ -98,6 +101,7 @@ func TestFailbackRetryOneSuccess(t *testing.T) {
wg.Add(1)
now := time.Now()
mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().IsAvailable().Return(true)
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= 5)
......@@ -131,6 +135,7 @@ func TestFailbackRetryFailed(t *testing.T) {
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult)
......@@ -177,6 +182,7 @@ func TestFailbackRetryFailed10Times(t *testing.T) {
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
clusterInvoker.maxRetries = 10
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
// 10 task should failed firstly.
......@@ -220,6 +226,7 @@ func TestFailbackOutOfLimit(t *testing.T) {
clusterInvoker.failbackTasks = 1
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).Times(11)
......
......@@ -53,6 +53,7 @@ func registerFailfast(invoker *mock.MockInvoker) protocol.Invoker {
invokers := []protocol.Invoker{}
invokers = append(invokers, invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failfastUrl)
staticDir := directory.NewStaticDirectory(invokers)
......@@ -67,6 +68,7 @@ func TestFailfastInvokeSuccess(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailfast(invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes()
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
......@@ -87,6 +89,7 @@ func TestFailfastInvokeFail(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailfast(invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes()
mockResult := &protocol.RPCResult{Err: perrors.New("error")}
......
......@@ -52,6 +52,7 @@ func registerFailsafe(invoker *mock.MockInvoker) protocol.Invoker {
invokers := []protocol.Invoker{}
invokers = append(invokers, invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failbackUrl)
......@@ -67,6 +68,8 @@ func TestFailSafeInvokeSuccess(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailsafe(invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes()
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
......@@ -85,6 +88,7 @@ func TestFailSafeInvokeFail(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailsafe(invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes()
......
......@@ -82,6 +82,8 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) {
routers := make([]router.PriorityRouter, 0, len(urls))
rc := dir.routerChain
for _, url := range urls {
routerKey := url.GetParam(constant.ROUTER_KEY, "")
......@@ -94,7 +96,7 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) {
}
}
factory := extension.GetRouterFactory(url.Protocol)
r, err := factory.NewPriorityRouter(url)
r, err := factory.NewPriorityRouter(url, rc.GetNotifyChan())
if err != nil {
logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err)
return
......@@ -104,10 +106,8 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) {
logger.Infof("Init file condition router success, size: %v", len(routers))
dir.mutex.Lock()
rc := dir.routerChain
dir.mutex.Unlock()
rc.AddRouters(routers)
dir.mutex.Unlock()
}
func (dir *BaseDirectory) isProperRouter(url *common.URL) bool {
......
......@@ -28,6 +28,7 @@ import (
)
import (
"github.com/apache/dubbo-go/cluster/router/chain"
_ "github.com/apache/dubbo-go/cluster/router/condition"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
......@@ -50,7 +51,9 @@ func TestBuildRouterChain(t *testing.T) {
regURL := url
regURL.AddParam(constant.INTERFACE_KEY, "mock-app")
directory := NewBaseDirectory(regURL)
var err error
directory.routerChain, err = chain.NewRouterChain(regURL)
assert.Nil(t, err)
localIP := common.GetLocalIp()
rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP))
routeURL := getRouteURL(rule, anyURL)
......
......@@ -29,4 +29,6 @@ type Chain interface {
SetInvokers([]protocol.Invoker)
// AddRouters Add routers
AddRouters([]PriorityRouter)
// GetNotifyChan get notify channel of this chain
GetNotifyChan() chan struct{}
}
......@@ -20,12 +20,12 @@ package chain
import (
"sort"
"sync"
"sync/atomic"
"time"
)
import (
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)
import (
......@@ -38,9 +38,7 @@ import (
)
const (
timeInterval = 5 * time.Second
timeThreshold = 2 * time.Second
countThreshold = 5
timeInterval = 5 * time.Second
)
// RouterChain Router chain
......@@ -65,8 +63,10 @@ type RouterChain struct {
notify chan struct{}
// Address cache
cache atomic.Value
// init
init sync.Once
}
func (c *RouterChain) GetNotifyChan() chan struct{} {
return c.notify
}
// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
......@@ -104,6 +104,9 @@ func (c *RouterChain) AddRouters(routers []router.PriorityRouter) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.routers = newRouters
go func() {
c.notify <- struct{}{}
}()
}
// SetInvokers receives updated invokers from registry center. If the times of notification exceeds countThreshold and
......@@ -113,32 +116,21 @@ func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
c.invokers = invokers
c.mutex.Unlock()
// it should trigger init router for first call
c.init.Do(func() {
go func() {
c.notify <- struct{}{}
}()
})
c.count++
now := time.Now()
if c.count >= countThreshold && now.Sub(c.last) >= timeThreshold {
c.last = now
c.count = 0
go func() {
c.notify <- struct{}{}
}()
}
go func() {
c.notify <- struct{}{}
}()
}
// loop listens on events to update the address cache when it's necessary, either when it receives notification
// from address update, or when timeInterval exceeds.
// loop listens on events to update the address cache when it receives notification
// from address update,
func (c *RouterChain) loop() {
ticker := time.NewTicker(timeInterval)
for {
select {
case <-ticker.C:
c.buildCache()
if protocol.GetAndRefreshState() {
c.buildCache()
}
case <-c.notify:
c.buildCache()
}
......@@ -235,9 +227,15 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
if len(routerFactories) == 0 {
return nil, perrors.Errorf("No routerFactory exits , create one please")
}
chain := &RouterChain{
last: time.Now(),
notify: make(chan struct{}),
}
routers := make([]router.PriorityRouter, 0, len(routerFactories))
for key, routerFactory := range routerFactories {
r, err := routerFactory().NewPriorityRouter(url)
r, err := routerFactory().NewPriorityRouter(url, chain.notify)
if r == nil || err != nil {
logger.Errorf("router chain build router fail! routerFactories key:%s error:%s", key, err.Error())
continue
......@@ -250,12 +248,10 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
sortRouter(newRouters)
chain := &RouterChain{
builtinRouters: routers,
routers: newRouters,
last: time.Now(),
notify: make(chan struct{}),
}
routerNeedsUpdateInit := atomic.Bool{}
routerNeedsUpdateInit.Store(false)
chain.routers = newRouters
chain.builtinRouters = routers
if url != nil {
chain.url = url
}
......
......@@ -148,7 +148,12 @@ conditions:
url := getConditionRouteUrl(applicationKey)
assert.NotNil(t, url)
factory := extension.GetRouterFactory(url.Protocol)
r, err := factory.NewPriorityRouter(url)
notify := make(chan struct{})
go func() {
for range notify {
}
}()
r, err := factory.NewPriorityRouter(url, notify)
assert.Nil(t, err)
assert.NotNil(t, r)
......
......@@ -34,14 +34,15 @@ const (
// AppRouter For listen application router with config center
type AppRouter struct {
listenableRouter
notify interface{}
}
// NewAppRouter Init AppRouter by url
func NewAppRouter(url *common.URL) (*AppRouter, error) {
func NewAppRouter(url *common.URL, notify chan struct{}) (*AppRouter, error) {
if url == nil {
return nil, perrors.Errorf("No route URL for create app router!")
}
appRouter, err := newListenableRouter(url, url.GetParam(constant.APPLICATION_KEY, ""))
appRouter, err := newListenableRouter(url, url.GetParam(constant.APPLICATION_KEY, ""), notify)
if err != nil {
return nil, err
}
......
......@@ -81,7 +81,12 @@ conditions:
assert.NotNil(t, configuration)
appRouteURL := getAppRouteURL(routerKey)
appRouter, err := NewAppRouter(appRouteURL)
notify := make(chan struct{})
go func() {
for range notify {
}
}()
appRouter, err := NewAppRouter(appRouteURL, notify)
assert.Nil(t, err)
assert.NotNil(t, appRouter)
......@@ -109,7 +114,7 @@ force: true
runtime: false
conditions:
- => host != 172.22.3.91
- host = 192.168.199.208 => host = 192.168.199.208
- host = 192.168.199.208 => host = 192.168.199.208
`
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
......@@ -132,7 +137,12 @@ conditions:
assert.NotNil(t, configuration)
appRouteURL := getAppRouteURL(routerKey)
appRouter, err := NewAppRouter(appRouteURL)
notify := make(chan struct{})
go func() {
for range notify {
}
}()
appRouter, err := NewAppRouter(appRouteURL, notify)
assert.Nil(t, err)
assert.NotNil(t, appRouter)
......@@ -174,7 +184,12 @@ conditions:
assert.NotNil(t, configuration)
appRouteURL := getAppRouteURL(routerKey)
appRouter, err := NewAppRouter(appRouteURL)
notify := make(chan struct{})
go func() {
for range notify {
}
}()
appRouter, err := NewAppRouter(appRouteURL, notify)
assert.Nil(t, err)
assert.NotNil(t, appRouter)
......
......@@ -37,8 +37,8 @@ func newConditionRouterFactory() router.PriorityRouterFactory {
}
// NewPriorityRouter creates ConditionRouterFactory by URL
func (c *ConditionRouterFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) {
return NewConditionRouter(url)
func (c *ConditionRouterFactory) NewPriorityRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) {
return NewConditionRouter(url, notify)
}
// NewRouter Create FileRouterFactory by Content
......@@ -54,6 +54,6 @@ func newAppRouterFactory() router.PriorityRouterFactory {
}
// NewPriorityRouter creates AppRouterFactory by URL
func (c *AppRouterFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) {
return NewAppRouter(url)
func (c *AppRouterFactory) NewPriorityRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) {
return NewAppRouter(url, notify)
}
......@@ -132,37 +132,47 @@ func (bi *MockInvoker) Destroy() {
func TestRoute_matchWhen(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("=> host = 1.2.3.4"))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
notify := make(chan struct{})
go func() {
for range notify {
}
}()
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
cUrl, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, factory1111Ip))
matchWhen := router.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, true, matchWhen)
rule1 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1))
router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1), notify)
matchWhen1 := router1.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, true, matchWhen1)
rule2 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 & host !=1.1.1.1 => host = 1.2.3.4"))
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2))
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2), notify)
matchWhen2 := router2.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, false, matchWhen2)
rule3 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.4 & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3))
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3), notify)
matchWhen3 := router3.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, true, matchWhen3)
rule4 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4))
router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4), notify)
matchWhen4 := router4.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, true, matchWhen4)
rule5 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.1 => host = 1.2.3.4"))
router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5))
router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5), notify)
matchWhen5 := router5.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, false, matchWhen5)
rule6 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.2 => host = 1.2.3.4"))
router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6))
router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6), notify)
matchWhen6 := router6.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, true, matchWhen6)
}
func TestRoute_matchFilter(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
t.Logf("The local ip is %s", localIP)
url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService?default.serialization=fastjson")
......@@ -175,12 +185,12 @@ func TestRoute_matchFilter(t *testing.T) {
rule4 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.2,10.20.3.3,10.20.3.4"))
rule5 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host != 10.20.3.3"))
rule6 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " serialization = fastjson"))
router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1))
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2))
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3))
router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4))
router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5))
router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6))
router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1), notify)
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2), notify)
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3), notify)
router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4), notify)
router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5), notify)
router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6), notify)
cUrl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
ret1 := router1.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), cUrl, &invocation.RPCInvocation{})
ret2 := router2.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), cUrl, &invocation.RPCInvocation{})
......@@ -198,9 +208,14 @@ func TestRoute_matchFilter(t *testing.T) {
}
func TestRoute_methodRoute(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("getFoo"), invocation.WithParameterTypes([]reflect.Type{}), invocation.WithArguments([]interface{}{}))
rule := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
url, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=setFoo,getFoo,findFoo")
matchWhen := r.(*ConditionRouter).MatchWhen(url, inv)
assert.Equal(t, true, matchWhen)
......@@ -209,42 +224,57 @@ func TestRoute_methodRoute(t *testing.T) {
assert.Equal(t, true, matchWhen)
url2, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip))
rule2 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host!=1.1.1.1 => host = 1.2.3.4"))
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2))
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2), notify)
matchWhen = router2.(*ConditionRouter).MatchWhen(url2, inv)
assert.Equal(t, false, matchWhen)
url3, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip))
rule3 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host=1.1.1.1 => host = 1.2.3.4"))
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3))
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3), notify)
matchWhen = router3.(*ConditionRouter).MatchWhen(url3, inv)
assert.Equal(t, true, matchWhen)
}
func TestRoute_ReturnFalse(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
url, _ := common.NewURL("")
localIP := common.GetLocalIp()
invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => false"))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 0, len(ret.ToArray()))
}
func TestRoute_ReturnEmpty(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
url, _ := common.NewURL("")
invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => "))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 0, len(ret.ToArray()))
}
func TestRoute_ReturnAll(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
urlString := "dubbo://" + localIP + "/com.foo.BarService"
dubboURL, _ := common.NewURL(urlString)
......@@ -255,7 +285,7 @@ func TestRoute_ReturnAll(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, len(invokers), len(ret.ToArray()))
}
......@@ -265,6 +295,11 @@ func TestRoute_HostFilter(t *testing.T) {
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
notify := make(chan struct{})
go func() {
for range notify {
}
}()
invoker1 := NewMockInvoker(url1, 1)
invoker2 := NewMockInvoker(url2, 2)
invoker3 := NewMockInvoker(url3, 3)
......@@ -272,7 +307,7 @@ func TestRoute_HostFilter(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 2, len(ret.ToArray()))
assert.Equal(t, invoker2, invokers[ret.ToArray()[0]])
......@@ -280,6 +315,11 @@ func TestRoute_HostFilter(t *testing.T) {
}
func TestRoute_Empty_HostFilter(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
......@@ -291,7 +331,7 @@ func TestRoute_Empty_HostFilter(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte(" => " + " host = " + localIP))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 2, len(ret.ToArray()))
assert.Equal(t, invoker2, invokers[ret.ToArray()[0]])
......@@ -299,6 +339,11 @@ func TestRoute_Empty_HostFilter(t *testing.T) {
}
func TestRoute_False_HostFilter(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
......@@ -310,7 +355,7 @@ func TestRoute_False_HostFilter(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 2, len(ret.ToArray()))
assert.Equal(t, invoker2, invokers[ret.ToArray()[0]])
......@@ -318,6 +363,11 @@ func TestRoute_False_HostFilter(t *testing.T) {
}
func TestRoute_Placeholder(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
......@@ -329,7 +379,7 @@ func TestRoute_Placeholder(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = $host"))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 2, len(ret.ToArray()))
assert.Equal(t, invoker2, invokers[ret.ToArray()[0]])
......@@ -337,6 +387,11 @@ func TestRoute_Placeholder(t *testing.T) {
}
func TestRoute_NoForce(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
......@@ -348,12 +403,17 @@ func TestRoute_NoForce(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf(factoryHostIp1234Format, localIP)))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithNoForce(rule))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithNoForce(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, len(invokers), len(ret.ToArray()))
}
func TestRoute_Force(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
......@@ -365,7 +425,7 @@ func TestRoute_Force(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf(factoryHostIp1234Format, localIP)))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithForce(rule, "true"))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithForce(rule, "true"), notify)
fileredInvokers := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 0, len(fileredInvokers.ToArray()))
}
......
......@@ -47,8 +47,9 @@ type listenableRouter struct {
conditionRouters []*ConditionRouter
routerRule *RouterRule
url *common.URL
//force bool
priority int64
force bool
priority int64
notify chan struct{}
}
// RouterRule Get RouterRule instance from listenableRouter
......@@ -56,7 +57,7 @@ func (l *listenableRouter) RouterRule() *RouterRule {
return l.routerRule
}
func newListenableRouter(url *common.URL, ruleKey string) (*AppRouter, error) {
func newListenableRouter(url *common.URL, ruleKey string, notify chan struct{}) (*AppRouter, error) {
if ruleKey == "" {
return nil, perrors.Errorf("NewListenableRouter ruleKey is nil, can't create Listenable router")
}
......@@ -64,6 +65,7 @@ func newListenableRouter(url *common.URL, ruleKey string) (*AppRouter, error) {
l.url = url
l.priority = listenableRouterDefaultPriority
l.notify = notify
routerKey := ruleKey + constant.ConditionRouterRuleSuffix
// add listener
......@@ -110,6 +112,9 @@ func (l *listenableRouter) Process(event *config_center.ConfigChangeEvent) {
return
}
l.generateConditions(routerRule)
go func() {
l.notify <- struct{}{}
}()
}
func (l *listenableRouter) generateConditions(rule *RouterRule) {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment