Skip to content
Snippets Groups Projects
Unverified Commit 5ec05193 authored by lzp0412's avatar lzp0412 Committed by GitHub
Browse files

Merge pull request #15 from apache/develop

Develop
parents 862bf03e 9697b2b5
No related branches found
No related tags found
No related merge requests found
Showing
with 181 additions and 125 deletions
notifications:
commits: commits@dubbo.apache.org
issues: notifications@dubbo.apache.org
pullrequests: notifications@dubbo.apache.org
jira_options: link label link label
language: go
dist: trusty
sudo: required
# define the dependence env
language: go
os:
- linux
go:
- "1.13"
services:
- docker
env:
- GO111MODULE=on
install: true
# define ci-stage
script:
# license-check
- echo 'start license check'
- go fmt ./... && [[ -z `git status -s` ]]
- sh before_validate_license.sh
- chmod u+x /tmp/tools/license/license-header-checker
- /tmp/tools/license/license-header-checker -v -a -r -i vendor /tmp/tools/license/license.txt . go && [[ -z `git status -s` ]]
# unit-test
- echo 'start unit-test'
- chmod u+x before_ut.sh && ./before_ut.sh
- go mod vendor && go test ./... -coverprofile=coverage.txt -covermode=atomic
# integrate-test
- chmod +x integrate_test.sh && ./integrate_test.sh
after_success:
- bash <(curl -s https://codecov.io/bash)
......
......@@ -16,7 +16,7 @@ Apache License, Version 2.0
## Release note ##
[v1.4.0-rc1 - Mar 12, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.4.0-rc1)
[v1.4.0 - Mar 17, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.4.0)
[v1.3.0 - Mar 1, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.3.0)
......@@ -176,5 +176,5 @@ About dubbo-go benchmarking report, please refer to [dubbo benchmarking report](
If you are using [apache/dubbo-go](github.com/apache/dubbo-go) and think that it helps you or want do some contributions to it, please add your company to to [the user list](https://github.com/apache/dubbo-go/issues/2) to let us know your needs.
![ctrip](https://pic.c-ctrip.com/common/c_logo2013.png)
![Excellent Health Technology Group](https://raw.githubusercontent.com/dajiiu/photo/static/mirror/haozhuo_logo.png)
![Excellent Health Technology Group](https://user-images.githubusercontent.com/52339367/84628582-80512200-af1b-11ea-945a-c6b4b9ad31f2.png)
![tuya](https://raw.githubusercontent.com/pantianying/go-tool/master/picture/logo_2-removebg-preview.png)
......@@ -15,7 +15,7 @@ Apache License, Version 2.0
## 发布日志 ##
[v1.4.0-rc1 - 2020年3月12](https://github.com/apache/dubbo-go/releases/tag/v1.4.0-rc1)
[v1.4.0 - 2020年3月17](https://github.com/apache/dubbo-go/releases/tag/v1.4.0)
[v1.3.0 - 2020年3月1日](https://github.com/apache/dubbo-go/releases/tag/v1.3.0)
......@@ -175,5 +175,5 @@ go test ./... -coverprofile=coverage.txt -covermode=atomic
若你正在使用 [apache/dubbo-go](github.com/apache/dubbo-go) 且认为其有用或者向对其做改进,请忝列贵司信息于 [用户列表](https://github.com/apache/dubbo-go/issues/2),以便我们知晓之。
![ctrip](https://pic.c-ctrip.com/common/c_logo2013.png)
![Excellent Health Technology Group](https://raw.githubusercontent.com/dajiiu/photo/static/mirror/haozhuo_logo.png)
![Excellent Health Technology Group](https://user-images.githubusercontent.com/52339367/84628582-80512200-af1b-11ea-945a-c6b4b9ad31f2.png)
![tuya](https://raw.githubusercontent.com/pantianying/go-tool/master/picture/logo_2-removebg-preview.png)
......@@ -20,7 +20,7 @@ set zkJarPath=remoting/zookeeper/zookeeper-4unittest/contrib/fatjar
set zkJar=%zkJarPath%/%zkJarName%
if not exist "%zkJar%" (
md %zkJarPath%
md "%zkJarPath%"
curl -L %remoteJarUrl% -o %zkJar%
)
......
......@@ -21,7 +21,8 @@ import (
"github.com/apache/dubbo-go/protocol"
)
// Cluster ...
// Cluster
// Extension - Cluster
type Cluster interface {
Join(Directory) protocol.Invoker
}
......@@ -31,7 +31,9 @@ func init() {
extension.SetCluster(available, NewAvailableCluster)
}
// NewAvailableCluster ...
// NewAvailableCluster returns a cluster instance
//
// Obtain available service providers
func NewAvailableCluster() cluster.Cluster {
return &availableCluster{}
}
......
......@@ -35,7 +35,7 @@ type availableClusterInvoker struct {
baseClusterInvoker
}
// NewAvailableClusterInvoker ...
// NewAvailableClusterInvoker returns a cluster invoker instance
func NewAvailableClusterInvoker(directory cluster.Directory) protocol.Invoker {
return &availableClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
......
......@@ -19,6 +19,7 @@ package cluster_impl
import (
"context"
"fmt"
"strings"
"testing"
)
......@@ -32,6 +33,7 @@ import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
......@@ -39,10 +41,11 @@ import (
)
var (
availableUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
availableUrl, _ = common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider",
constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
)
func registerAvailable(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker {
func registerAvailable(invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
availableCluster := NewAvailableCluster()
......@@ -60,7 +63,7 @@ func TestAvailableClusterInvokerSuccess(t *testing.T) {
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerAvailable(t, invoker)
clusterInvoker := registerAvailable(invoker)
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().IsAvailable().Return(true)
......@@ -76,7 +79,7 @@ func TestAvailableClusterInvokerNoAvail(t *testing.T) {
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerAvailable(t, invoker)
clusterInvoker := registerAvailable(invoker)
invoker.EXPECT().IsAvailable().Return(false)
......
......@@ -87,8 +87,11 @@ func (invoker *baseClusterInvoker) checkWhetherDestroyed() error {
}
func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
var selectedInvoker protocol.Invoker
if len(invokers) <= 0 {
return selectedInvoker
}
url := invokers[0].GetUrl()
sticky := url.GetParamBool(constant.STICKY_KEY, false)
//Get the service method sticky config if have
......@@ -98,19 +101,17 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p
invoker.stickyInvoker = nil
}
if sticky && invoker.stickyInvoker != nil && (invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) {
if invoker.availablecheck && invoker.stickyInvoker.IsAvailable() {
return invoker.stickyInvoker
}
if sticky && invoker.availablecheck &&
invoker.stickyInvoker != nil && invoker.stickyInvoker.IsAvailable() &&
(invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) {
return invoker.stickyInvoker
}
selectedInvoker = invoker.doSelectInvoker(lb, invocation, invokers, invoked)
if sticky {
invoker.stickyInvoker = selectedInvoker
}
return selectedInvoker
}
func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
......
......@@ -33,25 +33,33 @@ import (
"github.com/apache/dubbo-go/protocol/invocation"
)
func Test_StickyNormal(t *testing.T) {
const (
baseClusterInvokerMethodName = "getUser"
baseClusterInvokerFormat = "dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider"
)
func TestStickyNormal(t *testing.T) {
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
url, _ := common.NewURL(fmt.Sprintf(baseClusterInvokerFormat, i))
url.SetParam("sticky", "true")
invokers = append(invokers, NewMockInvoker(url, 1))
}
base := &baseClusterInvoker{}
base.availablecheck = true
invoked := []protocol.Invoker{}
result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked)
result1 := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked)
tmpRandomBalance := loadbalance.NewRandomLoadBalance()
tmpInvocation := invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil)
result := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
result1 := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
assert.Equal(t, result, result1)
}
func Test_StickyNormalWhenError(t *testing.T) {
func TestStickyNormalWhenError(t *testing.T) {
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
url, _ := common.NewURL(fmt.Sprintf(baseClusterInvokerFormat, i))
url.SetParam("sticky", "true")
invokers = append(invokers, NewMockInvoker(url, 1))
}
......@@ -59,8 +67,8 @@ func Test_StickyNormalWhenError(t *testing.T) {
base.availablecheck = true
invoked := []protocol.Invoker{}
result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked)
result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked)
invoked = append(invoked, result)
result1 := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked)
result1 := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked)
assert.NotEqual(t, result, result1)
}
......@@ -31,7 +31,10 @@ func init() {
extension.SetCluster(broadcast, NewBroadcastCluster)
}
// NewBroadcastCluster ...
// NewBroadcastCluster returns a broadcast cluster instance.
//
// Calling all providers' broadcast one by one. All errors will be reported.
// It is usually used to notify all providers to update local resource information such as caches or logs.
func NewBroadcastCluster() cluster.Cluster {
return &broadcastCluster{}
}
......
......@@ -20,6 +20,7 @@ package cluster_impl
import (
"context"
"errors"
"fmt"
"testing"
)
......@@ -32,6 +33,7 @@ import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
......@@ -39,10 +41,11 @@ import (
)
var (
broadcastUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
broadcastUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
)
func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol.Invoker {
func registerBroadcast(mockInvokers ...*mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
invokers := []protocol.Invoker{}
......@@ -59,7 +62,7 @@ func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol
return clusterInvoker
}
func Test_BroadcastInvokeSuccess(t *testing.T) {
func TestBroadcastInvokeSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
......@@ -72,13 +75,13 @@ func Test_BroadcastInvokeSuccess(t *testing.T) {
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
}
clusterInvoker := registerBroadcast(t, invokers...)
clusterInvoker := registerBroadcast(invokers...)
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
}
func Test_BroadcastInvokeFailed(t *testing.T) {
func TestBroadcastInvokeFailed(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
......@@ -102,7 +105,7 @@ func Test_BroadcastInvokeFailed(t *testing.T) {
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
}
clusterInvoker := registerBroadcast(t, invokers...)
clusterInvoker := registerBroadcast(invokers...)
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockFailedResult.Err, result.Error())
......
......@@ -31,7 +31,10 @@ func init() {
extension.SetCluster(failback, NewFailbackCluster)
}
// NewFailbackCluster ...
// NewFailbackCluster returns a failback cluster instance
//
// Failure automatically restored, failed to record the background request,
// regular retransmission. Usually used for message notification operations.
func NewFailbackCluster() cluster.Cluster {
return &failbackCluster{}
}
......
......@@ -72,6 +72,19 @@ func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
return invoker
}
func (invoker *failbackClusterInvoker) tryTimerTaskProc(ctx context.Context, retryTask *retryTimerTask) {
invoked := make([]protocol.Invoker, 0)
invoked = append(invoked, retryTask.lastInvoker)
retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
var result protocol.Result
result = retryInvoker.Invoke(ctx, retryTask.invocation)
if result.Error() != nil {
retryTask.lastInvoker = retryInvoker
invoker.checkRetry(retryTask, result.Error())
}
}
func (invoker *failbackClusterInvoker) process(ctx context.Context) {
invoker.ticker = time.NewTicker(time.Second * 1)
for range invoker.ticker.C {
......@@ -91,25 +104,11 @@ func (invoker *failbackClusterInvoker) process(ctx context.Context) {
}
// ignore return. the get must success.
_, err = invoker.taskList.Get(1)
if err != nil {
if _, err = invoker.taskList.Get(1); err != nil {
logger.Warnf("get task found err: %v\n", err)
break
}
go func(retryTask *retryTimerTask) {
invoked := make([]protocol.Invoker, 0)
invoked = append(invoked, retryTask.lastInvoker)
retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
var result protocol.Result
result = retryInvoker.Invoke(ctx, retryTask.invocation)
if result.Error() != nil {
retryTask.lastInvoker = retryInvoker
invoker.checkRetry(retryTask, result.Error())
}
}(retryTask)
go invoker.tryTimerTaskProc(ctx, retryTask)
}
}
}
......@@ -129,29 +128,26 @@ func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err
func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
if err := invoker.checkInvokers(invokers, invocation); err != nil {
logger.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
invocation.MethodName(), invoker.GetUrl().Service(), err)
return &protocol.RPCResult{}
}
url := invokers[0].GetUrl()
methodName := invocation.MethodName()
//Get the service loadbalance config
url := invokers[0].GetUrl()
lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
//Get the service method loadbalance config if have
methodName := invocation.MethodName()
if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" {
lb = v
}
loadbalance := extension.GetLoadbalance(lb)
loadBalance := extension.GetLoadbalance(lb)
invoked := make([]protocol.Invoker, 0, len(invokers))
var result protocol.Result
ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)
ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked)
//DO INVOKE
result = ivk.Invoke(ctx, invocation)
result := ivk.Invoke(ctx, invocation)
if result.Error() != nil {
invoker.once.Do(func() {
invoker.taskList = queue.New(invoker.failbackTasks)
......@@ -164,7 +160,7 @@ func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation pr
return &protocol.RPCResult{}
}
timerTask := newRetryTimerTask(loadbalance, invocation, invokers, ivk)
timerTask := newRetryTimerTask(loadBalance, invocation, invokers, ivk)
invoker.taskList.Put(timerTask)
logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
......@@ -172,7 +168,6 @@ func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation pr
// ignore
return &protocol.RPCResult{}
}
return result
}
......
......@@ -19,6 +19,7 @@ package cluster_impl
import (
"context"
"fmt"
"sync"
"testing"
"time"
......@@ -34,6 +35,7 @@ import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
......@@ -41,11 +43,12 @@ import (
)
var (
failbackUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
failbackUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
)
// registerFailback register failbackCluster to cluster extension.
func registerFailback(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker {
func registerFailback(invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failbackCluster := NewFailbackCluster()
......@@ -60,12 +63,12 @@ func registerFailback(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker
}
// success firstly, failback should return origin invoke result.
func Test_FailbackSuceess(t *testing.T) {
func TestFailbackSuceess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
......@@ -77,12 +80,12 @@ func Test_FailbackSuceess(t *testing.T) {
}
// failed firstly, success later after one retry.
func Test_FailbackRetryOneSuccess(t *testing.T) {
func TestFailbackRetryOneSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
......@@ -95,7 +98,7 @@ func Test_FailbackRetryOneSuccess(t *testing.T) {
wg.Add(1)
now := time.Now()
mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= 5)
wg.Done()
......@@ -120,12 +123,12 @@ func Test_FailbackRetryOneSuccess(t *testing.T) {
}
// failed firstly, and failed again after ech retry time.
func Test_FailbackRetryFailed(t *testing.T) {
func TestFailbackRetryFailed(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
......@@ -141,7 +144,7 @@ func Test_FailbackRetryFailed(t *testing.T) {
// add retry call that eventually failed.
for i := 0; i < retries; i++ {
j := i + 1
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= int64(5*j))
wg.Done()
......@@ -166,12 +169,12 @@ func Test_FailbackRetryFailed(t *testing.T) {
}
// add 10 tasks but all failed firstly, and failed again with one retry.
func Test_FailbackRetryFailed10Times(t *testing.T) {
func TestFailbackRetryFailed10Times(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
clusterInvoker.maxRetries = 10
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
......@@ -184,7 +187,7 @@ func Test_FailbackRetryFailed10Times(t *testing.T) {
var wg sync.WaitGroup
wg.Add(10)
now := time.Now()
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= 5)
wg.Done()
......@@ -208,12 +211,12 @@ func Test_FailbackRetryFailed10Times(t *testing.T) {
assert.Equal(t, int64(0), clusterInvoker.taskList.Len())
}
func Test_FailbackOutOfLimit(t *testing.T) {
func TestFailbackOutOfLimit(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
clusterInvoker.failbackTasks = 1
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
......
......@@ -31,7 +31,10 @@ func init() {
extension.SetCluster(failfast, NewFailFastCluster)
}
// NewFailFastCluster ...
// NewFailFastCluster returns a failfast cluster instance.
//
// Fast failure, only made a call, failure immediately error. Usually used for non-idempotent write operations,
// such as adding records.
func NewFailFastCluster() cluster.Cluster {
return &failfastCluster{}
}
......
......@@ -19,6 +19,7 @@ package cluster_impl
import (
"context"
"fmt"
"testing"
)
......@@ -32,6 +33,7 @@ import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
......@@ -39,11 +41,12 @@ import (
)
var (
failfastUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
failfastUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
)
// registerFailfast register failfastCluster to cluster extension.
func registerFailfast(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker {
func registerFailfast(invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failfastCluster := NewFailFastCluster()
......@@ -57,12 +60,12 @@ func registerFailfast(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker
return clusterInvoker
}
func Test_FailfastInvokeSuccess(t *testing.T) {
func TestFailfastInvokeSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailfast(t, invoker)
clusterInvoker := registerFailfast(invoker)
invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes()
......@@ -77,12 +80,12 @@ func Test_FailfastInvokeSuccess(t *testing.T) {
assert.Equal(t, 0, res.tried)
}
func Test_FailfastInvokeFail(t *testing.T) {
func TestFailfastInvokeFail(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailfast(t, invoker)
clusterInvoker := registerFailfast(invoker)
invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes()
......
......@@ -31,7 +31,11 @@ func init() {
extension.SetCluster(name, NewFailoverCluster)
}
// NewFailoverCluster ...
// NewFailoverCluster returns a failover cluster instance
//
// Failure automatically switch, when there is a failure,
// retry the other server (default). Usually used for read operations,
// but retries can result in longer delays.
func NewFailoverCluster() cluster.Cluster {
return &failoverCluster{}
}
......
......@@ -45,52 +45,35 @@ func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
var (
result protocol.Result
invoked []protocol.Invoker
providers []string
)
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
if err := invoker.checkInvokers(invokers, invocation); err != nil {
return &protocol.RPCResult{Err: err}
}
loadbalance := getLoadBalance(invokers[0], invocation)
methodName := invocation.MethodName()
url := invokers[0].GetUrl()
//get reties
retriesConfig := url.GetParam(constant.RETRIES_KEY, constant.DEFAULT_RETRIES)
retries := getRetries(invokers, methodName)
loadBalance := getLoadBalance(invokers[0], invocation)
//Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.RETRIES_KEY, ""); len(v) != 0 {
retriesConfig = v
}
retries, err := strconv.Atoi(retriesConfig)
if err != nil || retries < 0 {
logger.Error("Your retries config is invalid,pls do a check. And will use the default retries configuration instead.")
retries = constant.DEFAULT_RETRIES_INT
}
invoked := []protocol.Invoker{}
providers := []string{}
var result protocol.Result
if retries > len(invokers) {
retries = len(invokers)
}
for i := 0; i <= retries; i++ {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if i > 0 {
err := invoker.checkWhetherDestroyed()
if err != nil {
if err := invoker.checkWhetherDestroyed(); err != nil {
return &protocol.RPCResult{Err: err}
}
invokers = invoker.directory.List(invocation)
err = invoker.checkInvokers(invokers, invocation)
if err != nil {
if err := invoker.checkInvokers(invokers, invocation); err != nil {
return &protocol.RPCResult{Err: err}
}
}
ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)
ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked)
if ivk == nil {
continue
}
......@@ -100,13 +83,40 @@ func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation pr
if result.Error() != nil {
providers = append(providers, ivk.GetUrl().Key())
continue
} else {
return result
}
return result
}
ip, _ := gxnet.GetLocalIP()
return &protocol.RPCResult{Err: perrors.Errorf("Failed to invoke the method %v in the service %v. Tried %v times of "+
"the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.",
methodName, invoker.GetUrl().Service(), retries, providers, len(providers), len(invokers), invoker.directory.GetUrl(), ip, constant.Version, result.Error().Error(),
)}
invokerSvc := invoker.GetUrl().Service()
invokerUrl := invoker.directory.GetUrl()
return &protocol.RPCResult{
Err: perrors.Errorf("Failed to invoke the method %v in the service %v. Tried %v times of the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.",
methodName, invokerSvc, retries, providers, len(providers), len(invokers), invokerUrl, ip, constant.Version, result.Error().Error(),
)}
}
func getRetries(invokers []protocol.Invoker, methodName string) int {
if len(invokers) <= 0 {
return constant.DEFAULT_RETRIES_INT
}
url := invokers[0].GetUrl()
//get reties
retriesConfig := url.GetParam(constant.RETRIES_KEY, constant.DEFAULT_RETRIES)
//Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.RETRIES_KEY, ""); len(v) != 0 {
retriesConfig = v
}
retries, err := strconv.Atoi(retriesConfig)
if err != nil || retries < 0 {
logger.Error("Your retries config is invalid,pls do a check. And will use the default retries configuration instead.")
retries = constant.DEFAULT_RETRIES_INT
}
if retries > len(invokers) {
retries = len(invokers)
}
return retries
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment