Skip to content
Snippets Groups Projects
Commit cc499c21 authored by 邹毅贤's avatar 邹毅贤
Browse files

Merge branch 'develop' into feature/addRegistryComment

# Conflicts:
#	registry/etcdv3/registry.go
parents 32df9e88 41879daf
No related branches found
No related tags found
No related merge requests found
Showing
with 162 additions and 133 deletions
......@@ -176,5 +176,5 @@ About dubbo-go benchmarking report, please refer to [dubbo benchmarking report](
If you are using [apache/dubbo-go](github.com/apache/dubbo-go) and think that it helps you or want do some contributions to it, please add your company to to [the user list](https://github.com/apache/dubbo-go/issues/2) to let us know your needs.
![ctrip](https://pic.c-ctrip.com/common/c_logo2013.png)
![Excellent Health Technology Group](https://raw.githubusercontent.com/dajiiu/photo/static/mirror/haozhuo_logo.png)
![Excellent Health Technology Group](https://user-images.githubusercontent.com/52339367/84628582-80512200-af1b-11ea-945a-c6b4b9ad31f2.png)
![tuya](https://raw.githubusercontent.com/pantianying/go-tool/master/picture/logo_2-removebg-preview.png)
......@@ -175,5 +175,5 @@ go test ./... -coverprofile=coverage.txt -covermode=atomic
若你正在使用 [apache/dubbo-go](github.com/apache/dubbo-go) 且认为其有用或者向对其做改进,请忝列贵司信息于 [用户列表](https://github.com/apache/dubbo-go/issues/2),以便我们知晓之。
![ctrip](https://pic.c-ctrip.com/common/c_logo2013.png)
![Excellent Health Technology Group](https://raw.githubusercontent.com/dajiiu/photo/static/mirror/haozhuo_logo.png)
![Excellent Health Technology Group](https://user-images.githubusercontent.com/52339367/84628582-80512200-af1b-11ea-945a-c6b4b9ad31f2.png)
![tuya](https://raw.githubusercontent.com/pantianying/go-tool/master/picture/logo_2-removebg-preview.png)
......@@ -20,7 +20,7 @@ set zkJarPath=remoting/zookeeper/zookeeper-4unittest/contrib/fatjar
set zkJar=%zkJarPath%/%zkJarName%
if not exist "%zkJar%" (
md %zkJarPath%
md "%zkJarPath%"
curl -L %remoteJarUrl% -o %zkJar%
)
......
......@@ -19,6 +19,7 @@ package cluster_impl
import (
"context"
"fmt"
"strings"
"testing"
)
......@@ -32,6 +33,7 @@ import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
......@@ -39,7 +41,8 @@ import (
)
var (
availableUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
availableUrl, _ = common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider",
constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
)
func registerAvailable(invoker *mock.MockInvoker) protocol.Invoker {
......
......@@ -88,6 +88,10 @@ func (invoker *baseClusterInvoker) checkWhetherDestroyed() error {
func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
var selectedInvoker protocol.Invoker
if len(invokers) <= 0 {
return selectedInvoker
}
url := invokers[0].GetUrl()
sticky := url.GetParamBool(constant.STICKY_KEY, false)
//Get the service method sticky config if have
......@@ -97,19 +101,17 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p
invoker.stickyInvoker = nil
}
if sticky && invoker.stickyInvoker != nil && (invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) {
if invoker.availablecheck && invoker.stickyInvoker.IsAvailable() {
return invoker.stickyInvoker
}
if sticky && invoker.availablecheck &&
invoker.stickyInvoker != nil && invoker.stickyInvoker.IsAvailable() &&
(invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) {
return invoker.stickyInvoker
}
selectedInvoker = invoker.doSelectInvoker(lb, invocation, invokers, invoked)
if sticky {
invoker.stickyInvoker = selectedInvoker
}
return selectedInvoker
}
func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
......
......@@ -33,7 +33,7 @@ import (
"github.com/apache/dubbo-go/protocol/invocation"
)
func Test_StickyNormal(t *testing.T) {
func TestStickyNormal(t *testing.T) {
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
......@@ -43,12 +43,15 @@ func Test_StickyNormal(t *testing.T) {
base := &baseClusterInvoker{}
base.availablecheck = true
invoked := []protocol.Invoker{}
result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked)
result1 := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked)
tmpRandomBalance := loadbalance.NewRandomLoadBalance()
tmpInvocation := invocation.NewRPCInvocation("getUser", nil, nil)
result := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
result1 := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
assert.Equal(t, result, result1)
}
func Test_StickyNormalWhenError(t *testing.T) {
func TestStickyNormalWhenError(t *testing.T) {
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
......
......@@ -20,6 +20,7 @@ package cluster_impl
import (
"context"
"errors"
"fmt"
"testing"
)
......@@ -32,6 +33,7 @@ import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
......@@ -39,10 +41,11 @@ import (
)
var (
broadcastUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
broadcastUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
)
func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol.Invoker {
func registerBroadcast(mockInvokers ...*mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
invokers := []protocol.Invoker{}
......@@ -59,7 +62,7 @@ func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol
return clusterInvoker
}
func Test_BroadcastInvokeSuccess(t *testing.T) {
func TestBroadcastInvokeSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
......@@ -72,13 +75,13 @@ func Test_BroadcastInvokeSuccess(t *testing.T) {
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
}
clusterInvoker := registerBroadcast(t, invokers...)
clusterInvoker := registerBroadcast(invokers...)
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
}
func Test_BroadcastInvokeFailed(t *testing.T) {
func TestBroadcastInvokeFailed(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
......@@ -102,7 +105,7 @@ func Test_BroadcastInvokeFailed(t *testing.T) {
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
}
clusterInvoker := registerBroadcast(t, invokers...)
clusterInvoker := registerBroadcast(invokers...)
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockFailedResult.Err, result.Error())
......
......@@ -19,6 +19,7 @@ package cluster_impl
import (
"context"
"fmt"
"sync"
"testing"
"time"
......@@ -34,6 +35,7 @@ import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
......@@ -41,11 +43,12 @@ import (
)
var (
failbackUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
failbackUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
)
// registerFailback register failbackCluster to cluster extension.
func registerFailback(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker {
func registerFailback(invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failbackCluster := NewFailbackCluster()
......@@ -60,12 +63,12 @@ func registerFailback(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker
}
// success firstly, failback should return origin invoke result.
func Test_FailbackSuceess(t *testing.T) {
func TestFailbackSuceess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
......@@ -77,12 +80,12 @@ func Test_FailbackSuceess(t *testing.T) {
}
// failed firstly, success later after one retry.
func Test_FailbackRetryOneSuccess(t *testing.T) {
func TestFailbackRetryOneSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
......@@ -95,7 +98,7 @@ func Test_FailbackRetryOneSuccess(t *testing.T) {
wg.Add(1)
now := time.Now()
mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= 5)
wg.Done()
......@@ -120,12 +123,12 @@ func Test_FailbackRetryOneSuccess(t *testing.T) {
}
// failed firstly, and failed again after ech retry time.
func Test_FailbackRetryFailed(t *testing.T) {
func TestFailbackRetryFailed(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
......@@ -141,7 +144,7 @@ func Test_FailbackRetryFailed(t *testing.T) {
// add retry call that eventually failed.
for i := 0; i < retries; i++ {
j := i + 1
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= int64(5*j))
wg.Done()
......@@ -166,12 +169,12 @@ func Test_FailbackRetryFailed(t *testing.T) {
}
// add 10 tasks but all failed firstly, and failed again with one retry.
func Test_FailbackRetryFailed10Times(t *testing.T) {
func TestFailbackRetryFailed10Times(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
clusterInvoker.maxRetries = 10
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
......@@ -184,7 +187,7 @@ func Test_FailbackRetryFailed10Times(t *testing.T) {
var wg sync.WaitGroup
wg.Add(10)
now := time.Now()
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= 5)
wg.Done()
......@@ -208,12 +211,12 @@ func Test_FailbackRetryFailed10Times(t *testing.T) {
assert.Equal(t, int64(0), clusterInvoker.taskList.Len())
}
func Test_FailbackOutOfLimit(t *testing.T) {
func TestFailbackOutOfLimit(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
clusterInvoker.failbackTasks = 1
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
......
......@@ -19,6 +19,7 @@ package cluster_impl
import (
"context"
"fmt"
"testing"
)
......@@ -32,6 +33,7 @@ import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
......@@ -39,11 +41,12 @@ import (
)
var (
failfastUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
failfastUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
)
// registerFailfast register failfastCluster to cluster extension.
func registerFailfast(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker {
func registerFailfast(invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failfastCluster := NewFailFastCluster()
......@@ -57,12 +60,12 @@ func registerFailfast(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker
return clusterInvoker
}
func Test_FailfastInvokeSuccess(t *testing.T) {
func TestFailfastInvokeSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailfast(t, invoker)
clusterInvoker := registerFailfast(invoker)
invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes()
......@@ -77,12 +80,12 @@ func Test_FailfastInvokeSuccess(t *testing.T) {
assert.Equal(t, 0, res.tried)
}
func Test_FailfastInvokeFail(t *testing.T) {
func TestFailfastInvokeFail(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailfast(t, invoker)
clusterInvoker := registerFailfast(invoker)
invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes()
......
......@@ -45,52 +45,35 @@ func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
var (
result protocol.Result
invoked []protocol.Invoker
providers []string
)
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
if err := invoker.checkInvokers(invokers, invocation); err != nil {
return &protocol.RPCResult{Err: err}
}
loadbalance := getLoadBalance(invokers[0], invocation)
methodName := invocation.MethodName()
url := invokers[0].GetUrl()
//get reties
retriesConfig := url.GetParam(constant.RETRIES_KEY, constant.DEFAULT_RETRIES)
retries := getRetries(invokers, methodName)
loadBalance := getLoadBalance(invokers[0], invocation)
//Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.RETRIES_KEY, ""); len(v) != 0 {
retriesConfig = v
}
retries, err := strconv.Atoi(retriesConfig)
if err != nil || retries < 0 {
logger.Error("Your retries config is invalid,pls do a check. And will use the default retries configuration instead.")
retries = constant.DEFAULT_RETRIES_INT
}
invoked := []protocol.Invoker{}
providers := []string{}
var result protocol.Result
if retries > len(invokers) {
retries = len(invokers)
}
for i := 0; i <= retries; i++ {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if i > 0 {
err := invoker.checkWhetherDestroyed()
if err != nil {
if err := invoker.checkWhetherDestroyed(); err != nil {
return &protocol.RPCResult{Err: err}
}
invokers = invoker.directory.List(invocation)
err = invoker.checkInvokers(invokers, invocation)
if err != nil {
if err := invoker.checkInvokers(invokers, invocation); err != nil {
return &protocol.RPCResult{Err: err}
}
}
ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)
ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked)
if ivk == nil {
continue
}
......@@ -100,13 +83,40 @@ func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation pr
if result.Error() != nil {
providers = append(providers, ivk.GetUrl().Key())
continue
} else {
return result
}
return result
}
ip, _ := gxnet.GetLocalIP()
return &protocol.RPCResult{Err: perrors.Errorf("Failed to invoke the method %v in the service %v. Tried %v times of "+
"the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.",
methodName, invoker.GetUrl().Service(), retries, providers, len(providers), len(invokers), invoker.directory.GetUrl(), ip, constant.Version, result.Error().Error(),
)}
invokerSvc := invoker.GetUrl().Service()
invokerUrl := invoker.directory.GetUrl()
return &protocol.RPCResult{
Err: perrors.Errorf("Failed to invoke the method %v in the service %v. Tried %v times of the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.",
methodName, invokerSvc, retries, providers, len(providers), len(invokers), invokerUrl, ip, constant.Version, result.Error().Error(),
)}
}
func getRetries(invokers []protocol.Invoker, methodName string) int {
if len(invokers) <= 0 {
return constant.DEFAULT_RETRIES_INT
}
url := invokers[0].GetUrl()
//get reties
retriesConfig := url.GetParam(constant.RETRIES_KEY, constant.DEFAULT_RETRIES)
//Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.RETRIES_KEY, ""); len(v) != 0 {
retriesConfig = v
}
retries, err := strconv.Atoi(retriesConfig)
if err != nil || retries < 0 {
logger.Error("Your retries config is invalid,pls do a check. And will use the default retries configuration instead.")
retries = constant.DEFAULT_RETRIES_INT
}
if retries > len(invokers) {
retries = len(invokers)
}
return retries
}
......@@ -101,7 +101,7 @@ func (bi *MockInvoker) Destroy() {
var count int
func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocations ...*invocation.RPCInvocation) protocol.Result {
func normalInvoke(successCount int, urlParam url.Values, invocations ...*invocation.RPCInvocation) protocol.Result {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failoverCluster := NewFailoverCluster()
......@@ -119,40 +119,40 @@ func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocatio
return clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
}
func Test_FailoverInvokeSuccess(t *testing.T) {
func TestFailoverInvokeSuccess(t *testing.T) {
urlParams := url.Values{}
result := normalInvoke(t, 3, urlParams)
result := normalInvoke(3, urlParams)
assert.NoError(t, result.Error())
count = 0
}
func Test_FailoverInvokeFail(t *testing.T) {
func TestFailoverInvokeFail(t *testing.T) {
urlParams := url.Values{}
result := normalInvoke(t, 4, urlParams)
result := normalInvoke(4, urlParams)
assert.Errorf(t, result.Error(), "error")
count = 0
}
func Test_FailoverInvoke1(t *testing.T) {
func TestFailoverInvoke1(t *testing.T) {
urlParams := url.Values{}
urlParams.Set(constant.RETRIES_KEY, "3")
result := normalInvoke(t, 4, urlParams)
result := normalInvoke(4, urlParams)
assert.NoError(t, result.Error())
count = 0
}
func Test_FailoverInvoke2(t *testing.T) {
func TestFailoverInvoke2(t *testing.T) {
urlParams := url.Values{}
urlParams.Set(constant.RETRIES_KEY, "2")
urlParams.Set("methods.test."+constant.RETRIES_KEY, "3")
ivc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test"))
result := normalInvoke(t, 4, urlParams, ivc)
result := normalInvoke(4, urlParams, ivc)
assert.NoError(t, result.Error())
count = 0
}
func Test_FailoverDestroy(t *testing.T) {
func TestFailoverDestroy(t *testing.T) {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failoverCluster := NewFailoverCluster()
......@@ -170,5 +170,4 @@ func Test_FailoverDestroy(t *testing.T) {
count = 0
clusterInvoker.Destroy()
assert.Equal(t, false, clusterInvoker.IsAvailable())
}
......@@ -19,6 +19,7 @@ package cluster_impl
import (
"context"
"fmt"
"testing"
)
......@@ -32,6 +33,7 @@ import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
......@@ -39,11 +41,12 @@ import (
)
var (
failsafeUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
failsafeUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
)
// registerFailsafe register failsafeCluster to cluster extension.
func registerFailsafe(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker {
func registerFailsafe(invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failsafeCluster := NewFailsafeCluster()
......@@ -57,12 +60,12 @@ func registerFailsafe(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker
return clusterInvoker
}
func Test_FailSafeInvokeSuccess(t *testing.T) {
func TestFailSafeInvokeSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailsafe(t, invoker)
clusterInvoker := registerFailsafe(invoker)
invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes()
......@@ -76,12 +79,12 @@ func Test_FailSafeInvokeSuccess(t *testing.T) {
assert.True(t, res.success)
}
func Test_FailSafeInvokeFail(t *testing.T) {
func TestFailSafeInvokeFail(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailsafe(t, invoker)
clusterInvoker := registerFailsafe(invoker)
invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes()
......
......@@ -46,14 +46,12 @@ func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker {
// Invoke ...
func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
err := invoker.checkWhetherDestroyed()
if err != nil {
if err := invoker.checkWhetherDestroyed(); err != nil {
return &protocol.RPCResult{Err: err}
}
invokers := invoker.directory.List(invocation)
err = invoker.checkInvokers(invokers, invocation)
if err != nil {
if err := invoker.checkInvokers(invokers, invocation); err != nil {
return &protocol.RPCResult{Err: err}
}
......@@ -63,11 +61,9 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro
if forks < 0 || forks > len(invokers) {
selected = invokers
} else {
selected = make([]protocol.Invoker, 0)
loadbalance := getLoadBalance(invokers[0], invocation)
loadBalance := getLoadBalance(invokers[0], invocation)
for i := 0; i < forks; i++ {
ivk := invoker.doSelect(loadbalance, invocation, invokers, selected)
if ivk != nil {
if ivk := invoker.doSelect(loadBalance, invocation, invokers, selected); ivk != nil {
selected = append(selected, ivk)
}
}
......@@ -77,8 +73,7 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro
for _, ivk := range selected {
go func(k protocol.Invoker) {
result := k.Invoke(ctx, invocation)
err := resultQ.Put(result)
if err != nil {
if err := resultQ.Put(result); err != nil {
logger.Errorf("resultQ put failed with exception: %v.\n", err)
}
}(ivk)
......@@ -99,6 +94,5 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro
if !ok {
return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but not legal resp", selected)}
}
return result
}
......@@ -19,6 +19,7 @@ package cluster_impl
import (
"context"
"fmt"
"strconv"
"sync"
"testing"
......@@ -42,10 +43,11 @@ import (
)
var (
forkingUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
forkingUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
)
func registerForking(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol.Invoker {
func registerForking(mockInvokers ...*mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance(loadbalance.RoundRobin, loadbalance.NewRoundRobinLoadBalance)
invokers := []protocol.Invoker{}
......@@ -62,7 +64,7 @@ func registerForking(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol.I
return clusterInvoker
}
func Test_ForkingInvokeSuccess(t *testing.T) {
func TestForkingInvokeSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
......@@ -79,20 +81,20 @@ func Test_ForkingInvokeSuccess(t *testing.T) {
invokers = append(invokers, invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation) protocol.Result {
func(protocol.Invocation) protocol.Result {
wg.Done()
return mockResult
})
}
clusterInvoker := registerForking(t, invokers...)
clusterInvoker := registerForking(invokers...)
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
wg.Wait()
}
func Test_ForkingInvokeTimeout(t *testing.T) {
func TestForkingInvokeTimeout(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
......@@ -108,14 +110,14 @@ func Test_ForkingInvokeTimeout(t *testing.T) {
invokers = append(invokers, invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation) protocol.Result {
func(protocol.Invocation) protocol.Result {
time.Sleep(2 * time.Second)
wg.Done()
return mockResult
})
}
clusterInvoker := registerForking(t, invokers...)
clusterInvoker := registerForking(invokers...)
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.NotNil(t, result)
......@@ -123,7 +125,7 @@ func Test_ForkingInvokeTimeout(t *testing.T) {
wg.Wait()
}
func Test_ForkingInvokeHalfTimeout(t *testing.T) {
func TestForkingInvokeHalfTimeout(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
......@@ -140,13 +142,13 @@ func Test_ForkingInvokeHalfTimeout(t *testing.T) {
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
if i == 1 {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation) protocol.Result {
func(protocol.Invocation) protocol.Result {
wg.Done()
return mockResult
})
} else {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation) protocol.Result {
func(protocol.Invocation) protocol.Result {
time.Sleep(2 * time.Second)
wg.Done()
return mockResult
......@@ -154,7 +156,7 @@ func Test_ForkingInvokeHalfTimeout(t *testing.T) {
}
}
clusterInvoker := registerForking(t, invokers...)
clusterInvoker := registerForking(invokers...)
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
......
......@@ -33,7 +33,7 @@ import (
"github.com/apache/dubbo-go/protocol/invocation"
)
func Test_RegAwareInvokeSuccess(t *testing.T) {
func TestRegAwareInvokeSuccess(t *testing.T) {
regAwareCluster := NewRegistryAwareCluster()
......
......@@ -19,6 +19,7 @@ package directory
import (
"encoding/base64"
"fmt"
"testing"
)
......@@ -33,19 +34,20 @@ import (
"github.com/apache/dubbo-go/common/constant"
)
var (
url, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
anyUrl, _ = common.NewURL(fmt.Sprintf("condition://%s/com.foo.BarService", constant.ANYHOST_VALUE))
)
func TestNewBaseDirectory(t *testing.T) {
url, _ := common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
directory := NewBaseDirectory(&url)
assert.NotNil(t, directory)
assert.Equal(t, url, directory.GetUrl())
assert.Equal(t, &url, directory.GetDirectoryUrl())
}
func TestBuildRouterChain(t *testing.T) {
url, _ := common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
directory := NewBaseDirectory(&url)
assert.NotNil(t, directory)
......@@ -62,9 +64,8 @@ func TestBuildRouterChain(t *testing.T) {
}
func getRouteUrl(rule string) *common.URL {
url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService")
url.AddParam("rule", rule)
url.AddParam("force", "true")
url.AddParam(constant.ROUTER_KEY, "router")
anyUrl.AddParam("rule", rule)
anyUrl.AddParam("force", "true")
anyUrl.AddParam(constant.ROUTER_KEY, "router")
return &url
}
......@@ -32,7 +32,7 @@ import (
"github.com/apache/dubbo-go/protocol/invocation"
)
func Test_StaticDirList(t *testing.T) {
func TestStaticDirList(t *testing.T) {
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
......@@ -45,7 +45,7 @@ func Test_StaticDirList(t *testing.T) {
assert.Len(t, list, 10)
}
func Test_StaticDirDestroy(t *testing.T) {
func TestStaticDirDestroy(t *testing.T) {
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
......
......@@ -28,6 +28,7 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
......@@ -37,7 +38,7 @@ func TestLeastActiveSelect(t *testing.T) {
var invokers []protocol.Invoker
url, _ := common.NewURL("dubbo://192.168.1.0:20000/org.apache.demo.HelloService")
url, _ := common.NewURL(fmt.Sprintf("dubbo://%s:%d/org.apache.demo.HelloService", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
invokers = append(invokers, protocol.NewBaseInvoker(url))
i := loadBalance.Select(invokers, &invocation.RPCInvocation{})
assert.True(t, i.GetUrl().URLEqual(url))
......
......@@ -36,7 +36,7 @@ import (
"github.com/apache/dubbo-go/protocol/invocation"
)
func Test_RandomlbSelect(t *testing.T) {
func TestRandomlbSelect(t *testing.T) {
randomlb := NewRandomLoadBalance()
invokers := []protocol.Invoker{}
......@@ -53,7 +53,7 @@ func Test_RandomlbSelect(t *testing.T) {
randomlb.Select(invokers, &invocation.RPCInvocation{})
}
func Test_RandomlbSelectWeight(t *testing.T) {
func TestRandomlbSelectWeight(t *testing.T) {
randomlb := NewRandomLoadBalance()
invokers := []protocol.Invoker{}
......@@ -84,7 +84,7 @@ func Test_RandomlbSelectWeight(t *testing.T) {
})
}
func Test_RandomlbSelectWarmup(t *testing.T) {
func TestRandomlbSelectWarmup(t *testing.T) {
randomlb := NewRandomLoadBalance()
invokers := []protocol.Invoker{}
......
......@@ -29,6 +29,7 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
......@@ -38,7 +39,8 @@ func TestRoundRobinSelect(t *testing.T) {
var invokers []protocol.Invoker
url, _ := common.NewURL("dubbo://192.168.1.0:20000/org.apache.demo.HelloService")
url, _ := common.NewURL(fmt.Sprintf("dubbo://%s:%d/org.apache.demo.HelloService",
constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
invokers = append(invokers, protocol.NewBaseInvoker(url))
i := loadBalance.Select(invokers, &invocation.RPCInvocation{})
assert.True(t, i.GetUrl().URLEqual(url))
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment