Skip to content
Snippets Groups Projects
Commit e35f5e1f authored by LaurenceLiZhixin's avatar LaurenceLiZhixin
Browse files

fix: fix merge conflict

parents b27e77b7 b771ef24
No related branches found
No related tags found
No related merge requests found
Showing
with 375 additions and 116 deletions
<!-- Thanks for sending a pull request!
Read https://github.com/apache/dubbo-go/blob/master/contributing.md before commit pull request.
Read https://github.com/apache/dubbo-go/blob/master/contributing.md before commit pull request.
-->
**What this PR does**:
......
......@@ -35,7 +35,7 @@ jobs:
uses: actions/checkout@v2
- name: Cache dependencies
uses: actions/cache@v2
uses: actions/cache@v2.1.4
with:
# Cache
path: ~/go/pkg/mod
......@@ -69,9 +69,9 @@ jobs:
run: |
make verify
- name: Integrate Test
run: |
chmod +x integrate_test.sh && ./integrate_test.sh ${{github.event.pull_request.head.repo.full_name}} ${{github.event.pull_request.head.sha}}
# - name: Integrate Test
# run: |
# chmod +x integrate_test.sh && ./integrate_test.sh ${{github.event.pull_request.head.repo.full_name}} ${{github.event.pull_request.head.sha}}
- name: Post Coverage
run: bash <(curl -s https://codecov.io/bash)
......
# Release Notes
---
## 1.5.6
### New Features
- [Add dubbo-go-cli telnet tool](https://github.com/apache/dubbo-go/pull/818)
- [Add Prox ImplementFunc to allow override impl](https://github.com/apache/dubbo-go/pull/1019)
- [Add read configuration path from the command line when start](https://github.com/apache/dubbo-go/pull/1039)
### Enhancement
- [introduce ConfigPostProcessor extension](https://github.com/apache/dubbo-go/pull/943)
- [Impl extension of two urls comparison](https://github.com/apache/dubbo-go/pull/854)
- [using event-driven to let router send signal to notify channel](https://github.com/apache/dubbo-go/pull/976)
- [lint codes](https://github.com/apache/dubbo-go/pull/941)
### Bugfixes
- [Fix: generic struct2MapAll key of map keep type](https://github.com/apache/dubbo-go/pull/928)
- [Fix: when events empty, delete all the invokers](https://github.com/apache/dubbo-go/pull/758)
- [Fix: file service discovery run in windows](https://github.com/apache/dubbo-go/pull/932)
- [Fix: make metadata report work without serviceDiscovery](https://github.com/apache/dubbo-go/pull/948)
- [Fix: consumer invoker cache set nil after the ZK connection is lost](https://github.com/apache/dubbo-go/pull/985)
- [Fix: integration test in Github action](https://github.com/apache/dubbo-go/pull/1012)
- [Fix: etcd exit panic](https://github.com/apache/dubbo-go/pull/1013)
- [Fix: when connect to provider fail, will occur panic](https://github.com/apache/dubbo-go/pull/1021)
- [Fix: support getty send Length, when the data transfer failed](https://github.com/apache/dubbo-go/pull/1028)
Milestone: [https://github.com/apache/dubbo-go/milestone/7](https://github.com/apache/dubbo-go/milestone/7?closed=1)
## 1.5.5
### New Features
- [Add Address notification batch mode](https://github.com/apache/dubbo-go/pull/741)
- [Add dubbo-gen stream support](https://github.com/apache/dubbo-go/pull/794)
- [Add Change verify to Makefile](https://github.com/apache/dubbo-go/pull/831)
- [Add more automatic components](https://github.com/apache/dubbo-go/pull/832)
- [Add grpc max message size config](https://github.com/apache/dubbo-go/pull/824)
### Enhancement
- [when it need local ip, it will get it every time. We can get local ip once, and reused it](https://github.com/apache/dubbo-go/pull/807)
- [enhance client's connectivity](https://github.com/apache/dubbo-go/pull/800)
- [Imp: get local ip once and reused it](https://github.com/apache/dubbo-go/pull/808)
- [Remove unmeaning logic](https://github.com/apache/dubbo-go/pull/855)
### Bugfixes
- [Fix: nacos registry can not get namespaceId](https://github.com/apache/dubbo-go/pull/778) [@peaman](https://github.com/peaman)
- [Fix: url encode](https://github.com/apache/dubbo-go/pull/802)
- [Fix: try to fix too many files open error](https://github.com/apache/dubbo-go/pull/797)
- [Fix: refact heartbeat](https://github.com/apache/dubbo-go/pull/889)
- [Fix: router_config add &url to url](https://github.com/apache/dubbo-go/pull/910)
- [Fix: Router chain can not build immediately when started](https://github.com/apache/dubbo-go/pull/927)
- [Fix: client block until timeout when provider return with PackageResponse_Exception](https://github.com/apache/dubbo-go/pull/926)
- [Fix: URL.String() data race panic](https://github.com/apache/dubbo-go/pull/944)
- [Fix: generic "encode hessian.Object"](https://github.com/apache/dubbo-go/pull/945)
### Dependencies
- [Bump github.com/mitchellh/mapstructure from 1.2.3 to 1.3.3](https://github.com/apache/dubbo-go/pull/838)
- [Bump github.com/go-resty/resty/v2 from 2.1.0 to 2.3.0](https://github.com/apache/dubbo-go/pull/837)
- [Bump github.com/opentracing/opentracing-go from 1.1.0 to 1.2.0](https://github.com/apache/dubbo-go/pull/836)
- [Bump github.com/creasty/defaults from 1.3.0 to 1.5.1](https://github.com/apache/dubbo-go/pull/835)
- [Bump github.com/dubbogo/gost from 1.9.1 to 1.9.2](https://github.com/apache/dubbo-go/pull/834)
- [Bump github.com/zouyx/agollo/v3 from 3.4.4 to 3.4.5](https://github.com/apache/dubbo-go/pull/845)
- [Bump github.com/golang/mock from 1.3.1 to 1.4.4](https://github.com/apache/dubbo-go/pull/844)
- [Bump github.com/nacos-group/nacos-sdk-go from 1.0.0 to 1.0.1](https://github.com/apache/dubbo-go/pull/843)
- [Bump github.com/magiconair/properties from 1.8.1 to 1.8.4](https://github.com/apache/dubbo-go/pull/861)
- [Bump github.com/prometheus/client_golang from 1.1.0 to 1.8.0 ](https://github.com/apache/dubbo-go/pull/860)
- [Bump go.uber.org/atomic from 1.6.0 to 1.7.0](https://github.com/apache/dubbo-go/pull/859)
- [](https://github.com/apache/dubbo-go/pull/843)
Milestone: [https://github.com/apache/dubbo-go/milestone/5](https://github.com/apache/dubbo-go/milestone/5?closed=1)
## 1.4.5
### Bugfixes
......@@ -7,6 +75,51 @@
## 1.5.4
### Bugfixes
- [Fix etcd cluster reconnect](https://github.com/apache/dubbo-go/pull/828)
- [Fix zookeeper deadlock problem](https://github.com/apache/dubbo-go/pull/826)
- [Fix generic struct2MapAll](https://github.com/apache/dubbo-go/pull/822)
- [Fix Consumer panic when restart provider](https://github.com/apache/dubbo-go/pull/803)
- [Fix etcd can not registry](https://github.com/apache/dubbo-go/pull/819) [@lin-jianjun](https://github.com/lin-jianjun)
- [Fix cannot call go provider service when used by java dubbo 2.7.7 version](https://github.com/apache/dubbo-go/pull/815) [@jack15083](https://github.com/jack15083)
- [Fix go client quit abnormally when it connects java server](https://github.com/apache/dubbo-go/pull/820) [@wenxuwan](https://github.com/wenxuwan)
- [Fix sentinel windows issue](https://github.com/apache/dubbo-go/pull/821) [@louyuting](https://github.com/louyuting)
- [Fix metadata default port](https://github.com/apache/dubbo-go/pull/821) [@sanxun0325](https://github.com/sanxun0325)
- [Fix consul can not destory](https://github.com/apache/dubbo-go/pull/788) [@LaurenceLiZhixin](https://github.com/LaurenceLiZhixin)
Milestone: [https://github.com/apache/dubbo-go/milestone/6](https://github.com/apache/dubbo-go/milestone/6?closed=1)
## 1.5.3
### New Features
- [Add consul service discovery](https://github.com/apache/dubbo-go/pull/701) [@zhangshen023](https://github.com/zhangshen023)
- [Add File system service discovery](https://github.com/apache/dubbo-go/pull/732) [@DogBaoBao](https://github.com/DogBaoBao)
- [Migrate travis Ci to Github Actions](https://github.com/apache/dubbo-go/pull/752) [@sdttttt](https://github.com/sdttttt)
- [Add sentinel-golang flow control/circuit breaker](https://github.com/apache/dubbo-go/pull/748) [@louyuting](https://github.com/louyuting)
- [Add dubbo-go docs and blog into doc directory](https://github.com/apache/dubbo-go/pull/767) [@oaoit](https://github.com/oaoit)
### Enhancement
- [Add address notification batch mode](https://github.com/apache/dubbo-go/pull/741) [@beiwei30](https://github.com/beiwei30)
- [Refactor network and codec model](https://github.com/apache/dubbo-go/pull/673) [@fangyincheng](https://github.com/fangyincheng) [@georgehao](https://github.com/georgehao)
- [Remove unnecessary return and judgement](https://github.com/apache/dubbo-go/pull/730) [@YongHaoWu](https://github.com/YongHaoWu)
- [Improve exporter append method](https://github.com/apache/dubbo-go/pull/722) [@gaoxinge](https://github.com/gaoxinge)
- [Refactor for proxyInvoker cannot be extended](https://github.com/apache/dubbo-go/pull/747) [@cvictory](https://github.com/cvictory)
- [Refactor attachment type from map\[string\]stiring to map\[string\]interface{}](https://github.com/apache/dubbo-go/pull/713) [@cvictory](https://github.com/cvictory)
- [Improve map access concurrency](https://github.com/apache/dubbo-go/pull/739) [@skyao](https://github.com/skyao)
- [Improve code quantity](https://github.com/apache/dubbo-go/pull/763) [@gaoxinge](https://github.com/gaoxinge)
### Bugfixes
- [Fix etcdv3 lease](https://github.com/apache/dubbo-go/pull/738) [@zhangshen023](https://github.com/zhangshen023)
- [Fix rename SethealthChecker to SetHealthChecker](https://github.com/apache/dubbo-go/pull/746) [@watermelo](https://github.com/watermelo)
- [Fix init config problem in HystrixFilter](https://github.com/apache/dubbo-go/pull/731) [@YGrylls](https://github.com/YGrylls)
- [Fix zookeeper listener report error after started](https://github.com/apache/dubbo-go/pull/735) [@wenxuwan](https://github.com/wenxuwan)
Milestone: [https://github.com/apache/dubbo-go/milestone/4](https://github.com/apache/dubbo-go/milestone/4?closed=1)
Project: [https://github.com/apache/dubbo-go/projects/10](https://github.com/apache/dubbo-go/projects/10)
## 1.5.4
### Bugfixes
- [Fix etcd cluster reconnect](https://github.com/apache/dubbo-go/pull/828)
- [Fix zookeeper deadlock problem](https://github.com/apache/dubbo-go/pull/826)
......@@ -171,7 +284,7 @@ Project: [https://github.com/apache/dubbo-go/projects/8](https://github.com/apac
- [Add grpc protocol](https://github.com/apache/dubbo-go/pull/311)
### Enhancement
- [The SIGSYS and SIGSTOP are not supported in windows platform](https://github.com/apache/dubbo-go/pull/262)
- [Error should be returned when `NewURL` failed](https://github.com/apache/dubbo-go/pull/266)
- [Split config center GetConfig method](https://github.com/apache/dubbo-go/pull/267)
......@@ -181,7 +294,7 @@ Project: [https://github.com/apache/dubbo-go/projects/8](https://github.com/apac
- [Change zk version and add base_registry](https://github.com/apache/dubbo-go/pull/355)
### Bugfixes
- [Fix negative wait group count](https://github.com/apache/dubbo-go/pull/253)
- [After disconnection with ZK registry, cosumer can't listen to provider changes](https://github.com/apache/dubbo-go/pull/258)
- [The generic filter and default reference filters lack ','](https://github.com/apache/dubbo-go/pull/260)
......
# Apache Dubbo-go [中文](./README_CN.md) #
[![Build Status](https://travis-ci.org/apache/dubbo-go.svg?branch=master)](https://travis-ci.org/apache/dubbo-go)
[![Build Status](https://github.com/apache/dubbo-go/workflows/CI/badge.svg)](https://travis-ci.org/apache/dubbo-go)
[![codecov](https://codecov.io/gh/apache/dubbo-go/branch/master/graph/badge.svg)](https://codecov.io/gh/apache/dubbo-go)
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/github.com/apache/dubbo-go?tab=doc)
[![Go Report Card](https://goreportcard.com/badge/github.com/apache/dubbo-go)](https://goreportcard.com/report/github.com/apache/dubbo-go)
......@@ -8,7 +8,7 @@
---
Apache Dubbo Go Implementation.
Apache Dubbo Go Implementation to bridge the gap between java and go.
## License
......@@ -45,7 +45,7 @@ Both extension module and layered project architecture is according to Apache Du
![dubbo go extend](./doc/pic/arch/dubbo-go-ext.png)
If you wanna know more about dubbo-go, please visit this reference [Project Architecture design](https://github.com/apache/dubbo-go/wiki/dubbo-go-V1.0-design)
If you want to know more about dubbo-go, please visit this reference [Project Architecture design](https://github.com/apache/dubbo-go/wiki/dubbo-go-V1.0-design)
## Feature list ##
......@@ -179,8 +179,21 @@ If you are willing to do some code contributions and document contributions to [
If u want to communicate with our community, pls scan the following [dubbobo Ding-Ding QR code](https://mmbiz.qpic.cn/mmbiz_jpg/yvBJb5IiafvnHVBdtia30dxA2hKotr9DEckWsZ7aOJcDWDaSVMGwLmYv8GRgIQtqb4C2svicp8nVkMmGy7yKC5tyA/640?wx_fmt=jpeg&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1) or search our commnity DingDing group code 31363295.
<a href="http://alexstocks.github.io/html/dubbogo.html">
<img src="./doc/pic/misc/dubbogo-dingding.png" height="80" width="80"></a>
<div>
<table>
<tbody>
<tr></tr>
<tr>
<td align="center" valign="middle">
<a href="http://alexstocks.github.io/html/dubbogo.html" target="_blank">
<img width="80px" height="85px" src="./doc/pic/misc/dubbogo-dingding.png">
</a>
</td>
</tr>
<tr></tr>
</tbody>
</table>
</div>
If u want to visit the wechat group, pls add my wechat AlexanderStocks.
......@@ -194,7 +207,7 @@ About dubbo-go benchmarking report, please refer to [dubbo benchmarking report](
## [User List](https://github.com/apache/dubbo-go/issues/2)
If you are using [apache/dubbo-go](github.com/apache/dubbo-go) and think that it helps you or want do some contributions to it, please add your company to to [the user list](https://github.com/apache/dubbo-go/issues/2) to let us know your needs.
If you are using [apache/dubbo-go](https://github.com/apache/dubbo-go) and think that it helps you or want do some contributions to it, please add your company to to [the user list](https://github.com/apache/dubbo-go/issues/2) to let us know your needs.
<div>
......
# Apache Dubbo-go [English](./README.md) #
[![Build Status](https://travis-ci.org/apache/dubbo-go.svg?branch=master)](https://travis-ci.org/apache/dubbo-go)
[![Build Status](https://github.com/apache/dubbo-go/workflows/CI/badge.svg)](https://travis-ci.org/apache/dubbo-go)
[![codecov](https://codecov.io/gh/apache/dubbo-go/branch/master/graph/badge.svg)](https://codecov.io/gh/apache/dubbo-go)
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/github.com/apache/dubbo-go?tab=doc)
[![Go Report Card](https://goreportcard.com/badge/github.com/apache/dubbo-go)](https://goreportcard.com/report/github.com/apache/dubbo-go)
![license](https://img.shields.io/badge/license-Apache--2.0-green.svg)
---
Apache Dubbo Go 语言实现
Apache Dubbo Go 语言实现,架起java和go之间的桥梁,与 gRPC/Spring Cloud 生态互联互通,带领Java生态享受云原生时代的技术红利。
## 证书 ##
......@@ -177,12 +178,25 @@ make test
如果想访问官方钉钉群,请在钉钉中搜索社区群号 31363295 或者 扫描如下[二维码](https://mmbiz.qpic.cn/mmbiz_jpg/yvBJb5IiafvnHVBdtia30dxA2hKotr9DEckWsZ7aOJcDWDaSVMGwLmYv8GRgIQtqb4C2svicp8nVkMmGy7yKC5tyA/640?wx_fmt=jpeg&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1)
<a href="http://alexstocks.github.io/html/dubbogo.html">
<img src="./doc/pic/misc/dubbogo-dingding.png" height="80" width="80"></a>
<div>
<table>
<tbody>
<tr></tr>
<tr>
<td align="center" valign="middle">
<a href="http://alexstocks.github.io/html/dubbogo.html" target="_blank">
<img width="80px" height="85px" src="./doc/pic/misc/dubbogo-dingding.png">
</a>
</td>
</tr>
<tr></tr>
</tbody>
</table>
</div>
如果想加入到社区微信群,可以先添加社区负责人 于雨 的微信 AlexanderStocks 。
如果想加入到社区微信群,可以先添加社区负责人 于雨 的微信 AlexanderStocks 。添加微信之前,请先给 dubbo-go 点 star 作为对项目的支持,添加好友时请报上 github ID 以进行验证。
作为一个维护已经帮助构建了经受多家大型微服务系统的社区,我们足以为现有的成绩感到自豪。社区欢迎能提出建设性意见者。
作为一个维护已经帮助构建了经受多家大型微服务系统的社区,我们足以为现有的成绩感到自豪。社区欢迎能提出建设性意见者,只知索取者和喷子请绕行
## 性能测试 ##
......@@ -192,7 +206,7 @@ make test
## [User List](https://github.com/apache/dubbo-go/issues/2)
若你正在使用 [apache/dubbo-go](github.com/apache/dubbo-go) 且认为其有用或者想对其做改进,请添列贵司信息于 [用户列表](https://github.com/apache/dubbo-go/issues/2),以便我们知晓。
若你正在使用 [apache/dubbo-go](https://github.com/apache/dubbo-go) 且认为其有用或者想对其做改进,请添列贵司信息于 [用户列表](https://github.com/apache/dubbo-go/issues/2),以便我们知晓。
<div>
<table>
......
......@@ -121,41 +121,49 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p
func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
if len(invokers) == 0 {
logger.Errorf("the invokers of %s is nil. ", invocation.Invoker().GetUrl().ServiceKey())
return nil
}
go protocol.TryRefreshBlackList()
if len(invokers) == 1 {
return invokers[0]
if invokers[0].IsAvailable() {
return invokers[0]
}
protocol.SetInvokerUnhealthyStatus(invokers[0])
logger.Errorf("the invokers of %s is nil. ", invokers[0].GetUrl().ServiceKey())
return nil
}
selectedInvoker := lb.Select(invokers, invocation)
//judge to if the selectedInvoker is invoked
//judge if the selected Invoker is invoked and available
if (!selectedInvoker.IsAvailable() && invoker.availablecheck) || isInvoked(selectedInvoker, invoked) {
protocol.SetInvokerUnhealthyStatus(selectedInvoker)
otherInvokers := getOtherInvokers(invokers, selectedInvoker)
// do reselect
var reslectInvokers []protocol.Invoker
for _, invoker := range invokers {
if !invoker.IsAvailable() {
for i := 0; i < 3; i++ {
if len(otherInvokers) == 0 {
// no other ivk to reselect, return to fallback
break
}
reselectedInvoker := lb.Select(otherInvokers, invocation)
if isInvoked(reselectedInvoker, invoked) {
otherInvokers = getOtherInvokers(otherInvokers, reselectedInvoker)
continue
}
if !reselectedInvoker.IsAvailable() {
logger.Infof("the invoker of %s is not available, maybe some network error happened or the server is shutdown.",
invoker.GetUrl().Ip)
protocol.SetInvokerUnhealthyStatus(reselectedInvoker)
otherInvokers = getOtherInvokers(otherInvokers, reselectedInvoker)
continue
}
if !isInvoked(invoker, invoked) {
reslectInvokers = append(reslectInvokers, invoker)
}
}
if len(reslectInvokers) > 0 {
selectedInvoker = lb.Select(reslectInvokers, invocation)
} else {
logger.Errorf("all %d invokers is unavailable for %s.", len(invokers), selectedInvoker.GetUrl().String())
return nil
return reselectedInvoker
}
} else {
return selectedInvoker
}
return selectedInvoker
logger.Errorf("all %d invokers is unavailable for %s.", len(invokers), selectedInvoker.GetUrl().String())
return nil
}
func (invoker *baseClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
......@@ -194,3 +202,13 @@ func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cl
}
return extension.GetLoadbalance(lb)
}
func getOtherInvokers(invokers []protocol.Invoker, invoker protocol.Invoker) []protocol.Invoker {
otherInvokers := make([]protocol.Invoker, 0)
for _, i := range invokers {
if i != invoker {
otherInvokers = append(otherInvokers, i)
}
}
return otherInvokers
}
......@@ -72,6 +72,8 @@ func TestFailbackSuceess(t *testing.T) {
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
invoker.EXPECT().IsAvailable().Return(true)
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
......@@ -88,6 +90,7 @@ func TestFailbackRetryOneSuccess(t *testing.T) {
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
invoker.EXPECT().IsAvailable().Return(true)
// failed at first
mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")}
......@@ -98,6 +101,7 @@ func TestFailbackRetryOneSuccess(t *testing.T) {
wg.Add(1)
now := time.Now()
mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().IsAvailable().Return(true)
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= 5)
......@@ -131,6 +135,7 @@ func TestFailbackRetryFailed(t *testing.T) {
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult)
......@@ -177,6 +182,7 @@ func TestFailbackRetryFailed10Times(t *testing.T) {
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
clusterInvoker.maxRetries = 10
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
// 10 task should failed firstly.
......@@ -220,6 +226,7 @@ func TestFailbackOutOfLimit(t *testing.T) {
clusterInvoker.failbackTasks = 1
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).Times(11)
......
......@@ -53,6 +53,7 @@ func registerFailfast(invoker *mock.MockInvoker) protocol.Invoker {
invokers := []protocol.Invoker{}
invokers = append(invokers, invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failfastUrl)
staticDir := directory.NewStaticDirectory(invokers)
......@@ -67,6 +68,7 @@ func TestFailfastInvokeSuccess(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailfast(invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes()
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
......@@ -87,6 +89,7 @@ func TestFailfastInvokeFail(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailfast(invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes()
mockResult := &protocol.RPCResult{Err: perrors.New("error")}
......
......@@ -52,6 +52,7 @@ func registerFailsafe(invoker *mock.MockInvoker) protocol.Invoker {
invokers := []protocol.Invoker{}
invokers = append(invokers, invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failbackUrl)
......@@ -67,6 +68,8 @@ func TestFailSafeInvokeSuccess(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailsafe(invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes()
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
......@@ -85,6 +88,7 @@ func TestFailSafeInvokeFail(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailsafe(invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes()
......
......@@ -82,6 +82,8 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) {
routers := make([]router.PriorityRouter, 0, len(urls))
rc := dir.routerChain
for _, url := range urls {
routerKey := url.GetParam(constant.ROUTER_KEY, "")
......@@ -94,7 +96,7 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) {
}
}
factory := extension.GetRouterFactory(url.Protocol)
r, err := factory.NewPriorityRouter(url)
r, err := factory.NewPriorityRouter(url, rc.GetNotifyChan())
if err != nil {
logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err)
return
......@@ -104,10 +106,8 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) {
logger.Infof("Init file condition router success, size: %v", len(routers))
dir.mutex.Lock()
rc := dir.routerChain
dir.mutex.Unlock()
rc.AddRouters(routers)
dir.mutex.Unlock()
}
func (dir *BaseDirectory) isProperRouter(url *common.URL) bool {
......
......@@ -28,6 +28,7 @@ import (
)
import (
"github.com/apache/dubbo-go/cluster/router/chain"
_ "github.com/apache/dubbo-go/cluster/router/condition"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
......@@ -50,7 +51,9 @@ func TestBuildRouterChain(t *testing.T) {
regURL := url
regURL.AddParam(constant.INTERFACE_KEY, "mock-app")
directory := NewBaseDirectory(regURL)
var err error
directory.routerChain, err = chain.NewRouterChain(regURL)
assert.Nil(t, err)
localIP := common.GetLocalIp()
rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP))
routeURL := getRouteURL(rule, anyURL)
......
......@@ -29,4 +29,6 @@ type Chain interface {
SetInvokers([]protocol.Invoker)
// AddRouters Add routers
AddRouters([]PriorityRouter)
// GetNotifyChan get notify channel of this chain
GetNotifyChan() chan struct{}
}
......@@ -20,12 +20,12 @@ package chain
import (
"sort"
"sync"
"sync/atomic"
"time"
)
import (
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)
import (
......@@ -38,9 +38,7 @@ import (
)
const (
timeInterval = 5 * time.Second
timeThreshold = 2 * time.Second
countThreshold = 5
timeInterval = 5 * time.Second
)
// RouterChain Router chain
......@@ -65,8 +63,10 @@ type RouterChain struct {
notify chan struct{}
// Address cache
cache atomic.Value
// init
init sync.Once
}
func (c *RouterChain) GetNotifyChan() chan struct{} {
return c.notify
}
// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
......@@ -104,6 +104,9 @@ func (c *RouterChain) AddRouters(routers []router.PriorityRouter) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.routers = newRouters
go func() {
c.notify <- struct{}{}
}()
}
// SetInvokers receives updated invokers from registry center. If the times of notification exceeds countThreshold and
......@@ -113,32 +116,21 @@ func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
c.invokers = invokers
c.mutex.Unlock()
// it should trigger init router for first call
c.init.Do(func() {
go func() {
c.notify <- struct{}{}
}()
})
c.count++
now := time.Now()
if c.count >= countThreshold && now.Sub(c.last) >= timeThreshold {
c.last = now
c.count = 0
go func() {
c.notify <- struct{}{}
}()
}
go func() {
c.notify <- struct{}{}
}()
}
// loop listens on events to update the address cache when it's necessary, either when it receives notification
// from address update, or when timeInterval exceeds.
// loop listens on events to update the address cache when it receives notification
// from address update,
func (c *RouterChain) loop() {
ticker := time.NewTicker(timeInterval)
for {
select {
case <-ticker.C:
c.buildCache()
if protocol.GetAndRefreshState() {
c.buildCache()
}
case <-c.notify:
c.buildCache()
}
......@@ -235,9 +227,15 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
if len(routerFactories) == 0 {
return nil, perrors.Errorf("No routerFactory exits , create one please")
}
chain := &RouterChain{
last: time.Now(),
notify: make(chan struct{}),
}
routers := make([]router.PriorityRouter, 0, len(routerFactories))
for key, routerFactory := range routerFactories {
r, err := routerFactory().NewPriorityRouter(url)
r, err := routerFactory().NewPriorityRouter(url, chain.notify)
if r == nil || err != nil {
logger.Errorf("router chain build router fail! routerFactories key:%s error:%s", key, err.Error())
continue
......@@ -250,12 +248,10 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
sortRouter(newRouters)
chain := &RouterChain{
builtinRouters: routers,
routers: newRouters,
last: time.Now(),
notify: make(chan struct{}),
}
routerNeedsUpdateInit := atomic.Bool{}
routerNeedsUpdateInit.Store(false)
chain.routers = newRouters
chain.builtinRouters = routers
if url != nil {
chain.url = url
}
......
......@@ -148,7 +148,12 @@ conditions:
url := getConditionRouteUrl(applicationKey)
assert.NotNil(t, url)
factory := extension.GetRouterFactory(url.Protocol)
r, err := factory.NewPriorityRouter(url)
notify := make(chan struct{})
go func() {
for range notify {
}
}()
r, err := factory.NewPriorityRouter(url, notify)
assert.Nil(t, err)
assert.NotNil(t, r)
......
......@@ -34,14 +34,15 @@ const (
// AppRouter For listen application router with config center
type AppRouter struct {
listenableRouter
notify interface{}
}
// NewAppRouter Init AppRouter by url
func NewAppRouter(url *common.URL) (*AppRouter, error) {
func NewAppRouter(url *common.URL, notify chan struct{}) (*AppRouter, error) {
if url == nil {
return nil, perrors.Errorf("No route URL for create app router!")
}
appRouter, err := newListenableRouter(url, url.GetParam(constant.APPLICATION_KEY, ""))
appRouter, err := newListenableRouter(url, url.GetParam(constant.APPLICATION_KEY, ""), notify)
if err != nil {
return nil, err
}
......
......@@ -81,7 +81,12 @@ conditions:
assert.NotNil(t, configuration)
appRouteURL := getAppRouteURL(routerKey)
appRouter, err := NewAppRouter(appRouteURL)
notify := make(chan struct{})
go func() {
for range notify {
}
}()
appRouter, err := NewAppRouter(appRouteURL, notify)
assert.Nil(t, err)
assert.NotNil(t, appRouter)
......@@ -109,7 +114,7 @@ force: true
runtime: false
conditions:
- => host != 172.22.3.91
- host = 192.168.199.208 => host = 192.168.199.208
- host = 192.168.199.208 => host = 192.168.199.208
`
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
......@@ -132,7 +137,12 @@ conditions:
assert.NotNil(t, configuration)
appRouteURL := getAppRouteURL(routerKey)
appRouter, err := NewAppRouter(appRouteURL)
notify := make(chan struct{})
go func() {
for range notify {
}
}()
appRouter, err := NewAppRouter(appRouteURL, notify)
assert.Nil(t, err)
assert.NotNil(t, appRouter)
......@@ -174,7 +184,12 @@ conditions:
assert.NotNil(t, configuration)
appRouteURL := getAppRouteURL(routerKey)
appRouter, err := NewAppRouter(appRouteURL)
notify := make(chan struct{})
go func() {
for range notify {
}
}()
appRouter, err := NewAppRouter(appRouteURL, notify)
assert.Nil(t, err)
assert.NotNil(t, appRouter)
......
......@@ -37,8 +37,8 @@ func newConditionRouterFactory() router.PriorityRouterFactory {
}
// NewPriorityRouter creates ConditionRouterFactory by URL
func (c *ConditionRouterFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) {
return NewConditionRouter(url)
func (c *ConditionRouterFactory) NewPriorityRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) {
return NewConditionRouter(url, notify)
}
// NewRouter Create FileRouterFactory by Content
......@@ -54,6 +54,6 @@ func newAppRouterFactory() router.PriorityRouterFactory {
}
// NewPriorityRouter creates AppRouterFactory by URL
func (c *AppRouterFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) {
return NewAppRouter(url)
func (c *AppRouterFactory) NewPriorityRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) {
return NewAppRouter(url, notify)
}
......@@ -132,37 +132,47 @@ func (bi *MockInvoker) Destroy() {
func TestRoute_matchWhen(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("=> host = 1.2.3.4"))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
notify := make(chan struct{})
go func() {
for range notify {
}
}()
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
cUrl, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, factory1111Ip))
matchWhen := router.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, true, matchWhen)
rule1 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1))
router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1), notify)
matchWhen1 := router1.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, true, matchWhen1)
rule2 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 & host !=1.1.1.1 => host = 1.2.3.4"))
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2))
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2), notify)
matchWhen2 := router2.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, false, matchWhen2)
rule3 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.4 & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3))
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3), notify)
matchWhen3 := router3.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, true, matchWhen3)
rule4 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4))
router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4), notify)
matchWhen4 := router4.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, true, matchWhen4)
rule5 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.1 => host = 1.2.3.4"))
router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5))
router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5), notify)
matchWhen5 := router5.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, false, matchWhen5)
rule6 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.2 => host = 1.2.3.4"))
router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6))
router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6), notify)
matchWhen6 := router6.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, true, matchWhen6)
}
func TestRoute_matchFilter(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
t.Logf("The local ip is %s", localIP)
url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService?default.serialization=fastjson")
......@@ -175,12 +185,12 @@ func TestRoute_matchFilter(t *testing.T) {
rule4 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.2,10.20.3.3,10.20.3.4"))
rule5 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host != 10.20.3.3"))
rule6 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " serialization = fastjson"))
router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1))
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2))
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3))
router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4))
router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5))
router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6))
router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1), notify)
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2), notify)
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3), notify)
router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4), notify)
router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5), notify)
router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6), notify)
cUrl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
ret1 := router1.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), cUrl, &invocation.RPCInvocation{})
ret2 := router2.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), cUrl, &invocation.RPCInvocation{})
......@@ -198,9 +208,14 @@ func TestRoute_matchFilter(t *testing.T) {
}
func TestRoute_methodRoute(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("getFoo"), invocation.WithParameterTypes([]reflect.Type{}), invocation.WithArguments([]interface{}{}))
rule := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
url, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=setFoo,getFoo,findFoo")
matchWhen := r.(*ConditionRouter).MatchWhen(url, inv)
assert.Equal(t, true, matchWhen)
......@@ -209,42 +224,57 @@ func TestRoute_methodRoute(t *testing.T) {
assert.Equal(t, true, matchWhen)
url2, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip))
rule2 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host!=1.1.1.1 => host = 1.2.3.4"))
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2))
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2), notify)
matchWhen = router2.(*ConditionRouter).MatchWhen(url2, inv)
assert.Equal(t, false, matchWhen)
url3, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip))
rule3 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host=1.1.1.1 => host = 1.2.3.4"))
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3))
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3), notify)
matchWhen = router3.(*ConditionRouter).MatchWhen(url3, inv)
assert.Equal(t, true, matchWhen)
}
func TestRoute_ReturnFalse(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
url, _ := common.NewURL("")
localIP := common.GetLocalIp()
invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => false"))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 0, len(ret.ToArray()))
}
func TestRoute_ReturnEmpty(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
url, _ := common.NewURL("")
invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => "))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 0, len(ret.ToArray()))
}
func TestRoute_ReturnAll(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
urlString := "dubbo://" + localIP + "/com.foo.BarService"
dubboURL, _ := common.NewURL(urlString)
......@@ -255,7 +285,7 @@ func TestRoute_ReturnAll(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, len(invokers), len(ret.ToArray()))
}
......@@ -265,6 +295,11 @@ func TestRoute_HostFilter(t *testing.T) {
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
notify := make(chan struct{})
go func() {
for range notify {
}
}()
invoker1 := NewMockInvoker(url1, 1)
invoker2 := NewMockInvoker(url2, 2)
invoker3 := NewMockInvoker(url3, 3)
......@@ -272,7 +307,7 @@ func TestRoute_HostFilter(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 2, len(ret.ToArray()))
assert.Equal(t, invoker2, invokers[ret.ToArray()[0]])
......@@ -280,6 +315,11 @@ func TestRoute_HostFilter(t *testing.T) {
}
func TestRoute_Empty_HostFilter(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
......@@ -291,7 +331,7 @@ func TestRoute_Empty_HostFilter(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte(" => " + " host = " + localIP))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 2, len(ret.ToArray()))
assert.Equal(t, invoker2, invokers[ret.ToArray()[0]])
......@@ -299,6 +339,11 @@ func TestRoute_Empty_HostFilter(t *testing.T) {
}
func TestRoute_False_HostFilter(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
......@@ -310,7 +355,7 @@ func TestRoute_False_HostFilter(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 2, len(ret.ToArray()))
assert.Equal(t, invoker2, invokers[ret.ToArray()[0]])
......@@ -318,6 +363,11 @@ func TestRoute_False_HostFilter(t *testing.T) {
}
func TestRoute_Placeholder(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
......@@ -329,7 +379,7 @@ func TestRoute_Placeholder(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = $host"))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 2, len(ret.ToArray()))
assert.Equal(t, invoker2, invokers[ret.ToArray()[0]])
......@@ -337,6 +387,11 @@ func TestRoute_Placeholder(t *testing.T) {
}
func TestRoute_NoForce(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
......@@ -348,12 +403,17 @@ func TestRoute_NoForce(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf(factoryHostIp1234Format, localIP)))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithNoForce(rule))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithNoForce(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, len(invokers), len(ret.ToArray()))
}
func TestRoute_Force(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
......@@ -365,7 +425,7 @@ func TestRoute_Force(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf(factoryHostIp1234Format, localIP)))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithForce(rule, "true"))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithForce(rule, "true"), notify)
fileredInvokers := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 0, len(fileredInvokers.ToArray()))
}
......
......@@ -47,8 +47,9 @@ type listenableRouter struct {
conditionRouters []*ConditionRouter
routerRule *RouterRule
url *common.URL
//force bool
priority int64
force bool
priority int64
notify chan struct{}
}
// RouterRule Get RouterRule instance from listenableRouter
......@@ -56,7 +57,7 @@ func (l *listenableRouter) RouterRule() *RouterRule {
return l.routerRule
}
func newListenableRouter(url *common.URL, ruleKey string) (*AppRouter, error) {
func newListenableRouter(url *common.URL, ruleKey string, notify chan struct{}) (*AppRouter, error) {
if ruleKey == "" {
return nil, perrors.Errorf("NewListenableRouter ruleKey is nil, can't create Listenable router")
}
......@@ -64,6 +65,7 @@ func newListenableRouter(url *common.URL, ruleKey string) (*AppRouter, error) {
l.url = url
l.priority = listenableRouterDefaultPriority
l.notify = notify
routerKey := ruleKey + constant.ConditionRouterRuleSuffix
// add listener
......@@ -110,6 +112,9 @@ func (l *listenableRouter) Process(event *config_center.ConfigChangeEvent) {
return
}
l.generateConditions(routerRule)
go func() {
l.notify <- struct{}{}
}()
}
func (l *listenableRouter) generateConditions(rule *RouterRule) {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment