Skip to content
Snippets Groups Projects
Commit c2e00e55 authored by Patrick's avatar Patrick
Browse files

Merge remote-tracking branch 'flycash/2.7.5-bk' into zk-servicediscovery

parents dd6e4bdc be56ab1c
No related branches found
No related tags found
No related merge requests found
Showing
with 149 additions and 101 deletions
...@@ -176,5 +176,5 @@ About dubbo-go benchmarking report, please refer to [dubbo benchmarking report]( ...@@ -176,5 +176,5 @@ About dubbo-go benchmarking report, please refer to [dubbo benchmarking report](
If you are using [apache/dubbo-go](github.com/apache/dubbo-go) and think that it helps you or want do some contributions to it, please add your company to to [the user list](https://github.com/apache/dubbo-go/issues/2) to let us know your needs. If you are using [apache/dubbo-go](github.com/apache/dubbo-go) and think that it helps you or want do some contributions to it, please add your company to to [the user list](https://github.com/apache/dubbo-go/issues/2) to let us know your needs.
![ctrip](https://pic.c-ctrip.com/common/c_logo2013.png) ![ctrip](https://pic.c-ctrip.com/common/c_logo2013.png)
![Excellent Health Technology Group](https://raw.githubusercontent.com/dajiiu/photo/static/mirror/haozhuo_logo.png) ![Excellent Health Technology Group](https://user-images.githubusercontent.com/52339367/84628582-80512200-af1b-11ea-945a-c6b4b9ad31f2.png)
![tuya](https://raw.githubusercontent.com/pantianying/go-tool/master/picture/logo_2-removebg-preview.png) ![tuya](https://raw.githubusercontent.com/pantianying/go-tool/master/picture/logo_2-removebg-preview.png)
...@@ -175,5 +175,5 @@ go test ./... -coverprofile=coverage.txt -covermode=atomic ...@@ -175,5 +175,5 @@ go test ./... -coverprofile=coverage.txt -covermode=atomic
若你正在使用 [apache/dubbo-go](github.com/apache/dubbo-go) 且认为其有用或者向对其做改进,请忝列贵司信息于 [用户列表](https://github.com/apache/dubbo-go/issues/2),以便我们知晓之。 若你正在使用 [apache/dubbo-go](github.com/apache/dubbo-go) 且认为其有用或者向对其做改进,请忝列贵司信息于 [用户列表](https://github.com/apache/dubbo-go/issues/2),以便我们知晓之。
![ctrip](https://pic.c-ctrip.com/common/c_logo2013.png) ![ctrip](https://pic.c-ctrip.com/common/c_logo2013.png)
![Excellent Health Technology Group](https://raw.githubusercontent.com/dajiiu/photo/static/mirror/haozhuo_logo.png) ![Excellent Health Technology Group](https://user-images.githubusercontent.com/52339367/84628582-80512200-af1b-11ea-945a-c6b4b9ad31f2.png)
![tuya](https://raw.githubusercontent.com/pantianying/go-tool/master/picture/logo_2-removebg-preview.png) ![tuya](https://raw.githubusercontent.com/pantianying/go-tool/master/picture/logo_2-removebg-preview.png)
...@@ -20,7 +20,7 @@ set zkJarPath=remoting/zookeeper/zookeeper-4unittest/contrib/fatjar ...@@ -20,7 +20,7 @@ set zkJarPath=remoting/zookeeper/zookeeper-4unittest/contrib/fatjar
set zkJar=%zkJarPath%/%zkJarName% set zkJar=%zkJarPath%/%zkJarName%
if not exist "%zkJar%" ( if not exist "%zkJar%" (
md %zkJarPath% md "%zkJarPath%"
curl -L %remoteJarUrl% -o %zkJar% curl -L %remoteJarUrl% -o %zkJar%
) )
......
...@@ -21,7 +21,8 @@ import ( ...@@ -21,7 +21,8 @@ import (
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
) )
// Cluster ... // Cluster
// Extension - Cluster
type Cluster interface { type Cluster interface {
Join(Directory) protocol.Invoker Join(Directory) protocol.Invoker
} }
...@@ -31,7 +31,9 @@ func init() { ...@@ -31,7 +31,9 @@ func init() {
extension.SetCluster(available, NewAvailableCluster) extension.SetCluster(available, NewAvailableCluster)
} }
// NewAvailableCluster ... // NewAvailableCluster returns a cluster instance
//
// Obtain available service providers
func NewAvailableCluster() cluster.Cluster { func NewAvailableCluster() cluster.Cluster {
return &availableCluster{} return &availableCluster{}
} }
......
...@@ -35,7 +35,7 @@ type availableClusterInvoker struct { ...@@ -35,7 +35,7 @@ type availableClusterInvoker struct {
baseClusterInvoker baseClusterInvoker
} }
// NewAvailableClusterInvoker ... // NewAvailableClusterInvoker returns a cluster invoker instance
func NewAvailableClusterInvoker(directory cluster.Directory) protocol.Invoker { func NewAvailableClusterInvoker(directory cluster.Directory) protocol.Invoker {
return &availableClusterInvoker{ return &availableClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory), baseClusterInvoker: newBaseClusterInvoker(directory),
......
...@@ -19,6 +19,7 @@ package cluster_impl ...@@ -19,6 +19,7 @@ package cluster_impl
import ( import (
"context" "context"
"fmt"
"strings" "strings"
"testing" "testing"
) )
...@@ -32,6 +33,7 @@ import ( ...@@ -32,6 +33,7 @@ import (
"github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance" "github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation" "github.com/apache/dubbo-go/protocol/invocation"
...@@ -39,7 +41,8 @@ import ( ...@@ -39,7 +41,8 @@ import (
) )
var ( var (
availableUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") availableUrl, _ = common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider",
constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
) )
func registerAvailable(invoker *mock.MockInvoker) protocol.Invoker { func registerAvailable(invoker *mock.MockInvoker) protocol.Invoker {
......
...@@ -88,6 +88,10 @@ func (invoker *baseClusterInvoker) checkWhetherDestroyed() error { ...@@ -88,6 +88,10 @@ func (invoker *baseClusterInvoker) checkWhetherDestroyed() error {
func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker { func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
var selectedInvoker protocol.Invoker var selectedInvoker protocol.Invoker
if len(invokers) <= 0 {
return selectedInvoker
}
url := invokers[0].GetUrl() url := invokers[0].GetUrl()
sticky := url.GetParamBool(constant.STICKY_KEY, false) sticky := url.GetParamBool(constant.STICKY_KEY, false)
//Get the service method sticky config if have //Get the service method sticky config if have
...@@ -97,19 +101,17 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p ...@@ -97,19 +101,17 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p
invoker.stickyInvoker = nil invoker.stickyInvoker = nil
} }
if sticky && invoker.stickyInvoker != nil && (invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) { if sticky && invoker.availablecheck &&
if invoker.availablecheck && invoker.stickyInvoker.IsAvailable() { invoker.stickyInvoker != nil && invoker.stickyInvoker.IsAvailable() &&
return invoker.stickyInvoker (invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) {
} return invoker.stickyInvoker
} }
selectedInvoker = invoker.doSelectInvoker(lb, invocation, invokers, invoked) selectedInvoker = invoker.doSelectInvoker(lb, invocation, invokers, invoked)
if sticky { if sticky {
invoker.stickyInvoker = selectedInvoker invoker.stickyInvoker = selectedInvoker
} }
return selectedInvoker return selectedInvoker
} }
func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker { func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
......
...@@ -33,7 +33,7 @@ import ( ...@@ -33,7 +33,7 @@ import (
"github.com/apache/dubbo-go/protocol/invocation" "github.com/apache/dubbo-go/protocol/invocation"
) )
func Test_StickyNormal(t *testing.T) { func TestStickyNormal(t *testing.T) {
invokers := []protocol.Invoker{} invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
...@@ -43,12 +43,15 @@ func Test_StickyNormal(t *testing.T) { ...@@ -43,12 +43,15 @@ func Test_StickyNormal(t *testing.T) {
base := &baseClusterInvoker{} base := &baseClusterInvoker{}
base.availablecheck = true base.availablecheck = true
invoked := []protocol.Invoker{} invoked := []protocol.Invoker{}
result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked)
result1 := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked) tmpRandomBalance := loadbalance.NewRandomLoadBalance()
tmpInvocation := invocation.NewRPCInvocation("getUser", nil, nil)
result := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
result1 := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
assert.Equal(t, result, result1) assert.Equal(t, result, result1)
} }
func Test_StickyNormalWhenError(t *testing.T) { func TestStickyNormalWhenError(t *testing.T) {
invokers := []protocol.Invoker{} invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
......
...@@ -31,7 +31,10 @@ func init() { ...@@ -31,7 +31,10 @@ func init() {
extension.SetCluster(broadcast, NewBroadcastCluster) extension.SetCluster(broadcast, NewBroadcastCluster)
} }
// NewBroadcastCluster ... // NewBroadcastCluster returns a broadcast cluster instance.
//
// Calling all providers' broadcast one by one. All errors will be reported.
// It is usually used to notify all providers to update local resource information such as caches or logs.
func NewBroadcastCluster() cluster.Cluster { func NewBroadcastCluster() cluster.Cluster {
return &broadcastCluster{} return &broadcastCluster{}
} }
......
...@@ -20,6 +20,7 @@ package cluster_impl ...@@ -20,6 +20,7 @@ package cluster_impl
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"testing" "testing"
) )
...@@ -32,6 +33,7 @@ import ( ...@@ -32,6 +33,7 @@ import (
"github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance" "github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation" "github.com/apache/dubbo-go/protocol/invocation"
...@@ -39,10 +41,11 @@ import ( ...@@ -39,10 +41,11 @@ import (
) )
var ( var (
broadcastUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") broadcastUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
) )
func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol.Invoker { func registerBroadcast(mockInvokers ...*mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
invokers := []protocol.Invoker{} invokers := []protocol.Invoker{}
...@@ -59,7 +62,7 @@ func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol ...@@ -59,7 +62,7 @@ func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol
return clusterInvoker return clusterInvoker
} }
func Test_BroadcastInvokeSuccess(t *testing.T) { func TestBroadcastInvokeSuccess(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
...@@ -72,13 +75,13 @@ func Test_BroadcastInvokeSuccess(t *testing.T) { ...@@ -72,13 +75,13 @@ func Test_BroadcastInvokeSuccess(t *testing.T) {
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
} }
clusterInvoker := registerBroadcast(t, invokers...) clusterInvoker := registerBroadcast(invokers...)
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockResult, result) assert.Equal(t, mockResult, result)
} }
func Test_BroadcastInvokeFailed(t *testing.T) { func TestBroadcastInvokeFailed(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
...@@ -102,7 +105,7 @@ func Test_BroadcastInvokeFailed(t *testing.T) { ...@@ -102,7 +105,7 @@ func Test_BroadcastInvokeFailed(t *testing.T) {
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
} }
clusterInvoker := registerBroadcast(t, invokers...) clusterInvoker := registerBroadcast(invokers...)
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockFailedResult.Err, result.Error()) assert.Equal(t, mockFailedResult.Err, result.Error())
......
...@@ -31,7 +31,10 @@ func init() { ...@@ -31,7 +31,10 @@ func init() {
extension.SetCluster(failback, NewFailbackCluster) extension.SetCluster(failback, NewFailbackCluster)
} }
// NewFailbackCluster ... // NewFailbackCluster returns a failback cluster instance
//
// Failure automatically restored, failed to record the background request,
// regular retransmission. Usually used for message notification operations.
func NewFailbackCluster() cluster.Cluster { func NewFailbackCluster() cluster.Cluster {
return &failbackCluster{} return &failbackCluster{}
} }
......
...@@ -19,6 +19,7 @@ package cluster_impl ...@@ -19,6 +19,7 @@ package cluster_impl
import ( import (
"context" "context"
"fmt"
"sync" "sync"
"testing" "testing"
"time" "time"
...@@ -34,6 +35,7 @@ import ( ...@@ -34,6 +35,7 @@ import (
"github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance" "github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation" "github.com/apache/dubbo-go/protocol/invocation"
...@@ -41,11 +43,12 @@ import ( ...@@ -41,11 +43,12 @@ import (
) )
var ( var (
failbackUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") failbackUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
) )
// registerFailback register failbackCluster to cluster extension. // registerFailback register failbackCluster to cluster extension.
func registerFailback(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { func registerFailback(invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failbackCluster := NewFailbackCluster() failbackCluster := NewFailbackCluster()
...@@ -60,12 +63,12 @@ func registerFailback(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker ...@@ -60,12 +63,12 @@ func registerFailback(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker
} }
// success firstly, failback should return origin invoke result. // success firstly, failback should return origin invoke result.
func Test_FailbackSuceess(t *testing.T) { func TestFailbackSuceess(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
...@@ -77,12 +80,12 @@ func Test_FailbackSuceess(t *testing.T) { ...@@ -77,12 +80,12 @@ func Test_FailbackSuceess(t *testing.T) {
} }
// failed firstly, success later after one retry. // failed firstly, success later after one retry.
func Test_FailbackRetryOneSuccess(t *testing.T) { func TestFailbackRetryOneSuccess(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
...@@ -95,7 +98,7 @@ func Test_FailbackRetryOneSuccess(t *testing.T) { ...@@ -95,7 +98,7 @@ func Test_FailbackRetryOneSuccess(t *testing.T) {
wg.Add(1) wg.Add(1)
now := time.Now() now := time.Now()
mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second) delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= 5) assert.True(t, delta >= 5)
wg.Done() wg.Done()
...@@ -120,12 +123,12 @@ func Test_FailbackRetryOneSuccess(t *testing.T) { ...@@ -120,12 +123,12 @@ func Test_FailbackRetryOneSuccess(t *testing.T) {
} }
// failed firstly, and failed again after ech retry time. // failed firstly, and failed again after ech retry time.
func Test_FailbackRetryFailed(t *testing.T) { func TestFailbackRetryFailed(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
...@@ -141,7 +144,7 @@ func Test_FailbackRetryFailed(t *testing.T) { ...@@ -141,7 +144,7 @@ func Test_FailbackRetryFailed(t *testing.T) {
// add retry call that eventually failed. // add retry call that eventually failed.
for i := 0; i < retries; i++ { for i := 0; i < retries; i++ {
j := i + 1 j := i + 1
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second) delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= int64(5*j)) assert.True(t, delta >= int64(5*j))
wg.Done() wg.Done()
...@@ -166,12 +169,12 @@ func Test_FailbackRetryFailed(t *testing.T) { ...@@ -166,12 +169,12 @@ func Test_FailbackRetryFailed(t *testing.T) {
} }
// add 10 tasks but all failed firstly, and failed again with one retry. // add 10 tasks but all failed firstly, and failed again with one retry.
func Test_FailbackRetryFailed10Times(t *testing.T) { func TestFailbackRetryFailed10Times(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
clusterInvoker.maxRetries = 10 clusterInvoker.maxRetries = 10
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
...@@ -184,7 +187,7 @@ func Test_FailbackRetryFailed10Times(t *testing.T) { ...@@ -184,7 +187,7 @@ func Test_FailbackRetryFailed10Times(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(10) wg.Add(10)
now := time.Now() now := time.Now()
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second) delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= 5) assert.True(t, delta >= 5)
wg.Done() wg.Done()
...@@ -208,12 +211,12 @@ func Test_FailbackRetryFailed10Times(t *testing.T) { ...@@ -208,12 +211,12 @@ func Test_FailbackRetryFailed10Times(t *testing.T) {
assert.Equal(t, int64(0), clusterInvoker.taskList.Len()) assert.Equal(t, int64(0), clusterInvoker.taskList.Len())
} }
func Test_FailbackOutOfLimit(t *testing.T) { func TestFailbackOutOfLimit(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
clusterInvoker.failbackTasks = 1 clusterInvoker.failbackTasks = 1
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
......
...@@ -31,7 +31,10 @@ func init() { ...@@ -31,7 +31,10 @@ func init() {
extension.SetCluster(failfast, NewFailFastCluster) extension.SetCluster(failfast, NewFailFastCluster)
} }
// NewFailFastCluster ... // NewFailFastCluster returns a failfast cluster instance.
//
// Fast failure, only made a call, failure immediately error. Usually used for non-idempotent write operations,
// such as adding records.
func NewFailFastCluster() cluster.Cluster { func NewFailFastCluster() cluster.Cluster {
return &failfastCluster{} return &failfastCluster{}
} }
......
...@@ -19,6 +19,7 @@ package cluster_impl ...@@ -19,6 +19,7 @@ package cluster_impl
import ( import (
"context" "context"
"fmt"
"testing" "testing"
) )
...@@ -32,6 +33,7 @@ import ( ...@@ -32,6 +33,7 @@ import (
"github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance" "github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation" "github.com/apache/dubbo-go/protocol/invocation"
...@@ -39,11 +41,12 @@ import ( ...@@ -39,11 +41,12 @@ import (
) )
var ( var (
failfastUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") failfastUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
) )
// registerFailfast register failfastCluster to cluster extension. // registerFailfast register failfastCluster to cluster extension.
func registerFailfast(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { func registerFailfast(invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failfastCluster := NewFailFastCluster() failfastCluster := NewFailFastCluster()
...@@ -57,12 +60,12 @@ func registerFailfast(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker ...@@ -57,12 +60,12 @@ func registerFailfast(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker
return clusterInvoker return clusterInvoker
} }
func Test_FailfastInvokeSuccess(t *testing.T) { func TestFailfastInvokeSuccess(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailfast(t, invoker) clusterInvoker := registerFailfast(invoker)
invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes() invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes()
...@@ -77,12 +80,12 @@ func Test_FailfastInvokeSuccess(t *testing.T) { ...@@ -77,12 +80,12 @@ func Test_FailfastInvokeSuccess(t *testing.T) {
assert.Equal(t, 0, res.tried) assert.Equal(t, 0, res.tried)
} }
func Test_FailfastInvokeFail(t *testing.T) { func TestFailfastInvokeFail(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailfast(t, invoker) clusterInvoker := registerFailfast(invoker)
invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes() invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes()
......
...@@ -31,7 +31,11 @@ func init() { ...@@ -31,7 +31,11 @@ func init() {
extension.SetCluster(name, NewFailoverCluster) extension.SetCluster(name, NewFailoverCluster)
} }
// NewFailoverCluster ... // NewFailoverCluster returns a failover cluster instance
//
// Failure automatically switch, when there is a failure,
// retry the other server (default). Usually used for read operations,
// but retries can result in longer delays.
func NewFailoverCluster() cluster.Cluster { func NewFailoverCluster() cluster.Cluster {
return &failoverCluster{} return &failoverCluster{}
} }
......
...@@ -45,52 +45,35 @@ func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker { ...@@ -45,52 +45,35 @@ func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {
} }
func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
var (
result protocol.Result
invoked []protocol.Invoker
providers []string
)
invokers := invoker.directory.List(invocation) invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation) if err := invoker.checkInvokers(invokers, invocation); err != nil {
if err != nil {
return &protocol.RPCResult{Err: err} return &protocol.RPCResult{Err: err}
} }
loadbalance := getLoadBalance(invokers[0], invocation)
methodName := invocation.MethodName() methodName := invocation.MethodName()
url := invokers[0].GetUrl() retries := getRetries(invokers, methodName)
loadBalance := getLoadBalance(invokers[0], invocation)
//get reties
retriesConfig := url.GetParam(constant.RETRIES_KEY, constant.DEFAULT_RETRIES)
//Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.RETRIES_KEY, ""); len(v) != 0 {
retriesConfig = v
}
retries, err := strconv.Atoi(retriesConfig)
if err != nil || retries < 0 {
logger.Error("Your retries config is invalid,pls do a check. And will use the default retries configuration instead.")
retries = constant.DEFAULT_RETRIES_INT
}
invoked := []protocol.Invoker{}
providers := []string{}
var result protocol.Result
if retries > len(invokers) {
retries = len(invokers)
}
for i := 0; i <= retries; i++ { for i := 0; i <= retries; i++ {
//Reselect before retry to avoid a change of candidate `invokers`. //Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy. //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if i > 0 { if i > 0 {
err := invoker.checkWhetherDestroyed() if err := invoker.checkWhetherDestroyed(); err != nil {
if err != nil {
return &protocol.RPCResult{Err: err} return &protocol.RPCResult{Err: err}
} }
invokers = invoker.directory.List(invocation) invokers = invoker.directory.List(invocation)
err = invoker.checkInvokers(invokers, invocation) if err := invoker.checkInvokers(invokers, invocation); err != nil {
if err != nil {
return &protocol.RPCResult{Err: err} return &protocol.RPCResult{Err: err}
} }
} }
ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked) ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked)
if ivk == nil { if ivk == nil {
continue continue
} }
...@@ -100,13 +83,40 @@ func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation pr ...@@ -100,13 +83,40 @@ func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation pr
if result.Error() != nil { if result.Error() != nil {
providers = append(providers, ivk.GetUrl().Key()) providers = append(providers, ivk.GetUrl().Key())
continue continue
} else {
return result
} }
return result
} }
ip, _ := gxnet.GetLocalIP() ip, _ := gxnet.GetLocalIP()
return &protocol.RPCResult{Err: perrors.Errorf("Failed to invoke the method %v in the service %v. Tried %v times of "+ invokerSvc := invoker.GetUrl().Service()
"the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.", invokerUrl := invoker.directory.GetUrl()
methodName, invoker.GetUrl().Service(), retries, providers, len(providers), len(invokers), invoker.directory.GetUrl(), ip, constant.Version, result.Error().Error(), return &protocol.RPCResult{
)} Err: perrors.Errorf("Failed to invoke the method %v in the service %v. Tried %v times of the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.",
methodName, invokerSvc, retries, providers, len(providers), len(invokers), invokerUrl, ip, constant.Version, result.Error().Error(),
)}
}
func getRetries(invokers []protocol.Invoker, methodName string) int {
if len(invokers) <= 0 {
return constant.DEFAULT_RETRIES_INT
}
url := invokers[0].GetUrl()
//get reties
retriesConfig := url.GetParam(constant.RETRIES_KEY, constant.DEFAULT_RETRIES)
//Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.RETRIES_KEY, ""); len(v) != 0 {
retriesConfig = v
}
retries, err := strconv.Atoi(retriesConfig)
if err != nil || retries < 0 {
logger.Error("Your retries config is invalid,pls do a check. And will use the default retries configuration instead.")
retries = constant.DEFAULT_RETRIES_INT
}
if retries > len(invokers) {
retries = len(invokers)
}
return retries
} }
...@@ -101,7 +101,7 @@ func (bi *MockInvoker) Destroy() { ...@@ -101,7 +101,7 @@ func (bi *MockInvoker) Destroy() {
var count int var count int
func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocations ...*invocation.RPCInvocation) protocol.Result { func normalInvoke(successCount int, urlParam url.Values, invocations ...*invocation.RPCInvocation) protocol.Result {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failoverCluster := NewFailoverCluster() failoverCluster := NewFailoverCluster()
...@@ -119,40 +119,40 @@ func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocatio ...@@ -119,40 +119,40 @@ func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocatio
return clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) return clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
} }
func Test_FailoverInvokeSuccess(t *testing.T) { func TestFailoverInvokeSuccess(t *testing.T) {
urlParams := url.Values{} urlParams := url.Values{}
result := normalInvoke(t, 3, urlParams) result := normalInvoke(3, urlParams)
assert.NoError(t, result.Error()) assert.NoError(t, result.Error())
count = 0 count = 0
} }
func Test_FailoverInvokeFail(t *testing.T) { func TestFailoverInvokeFail(t *testing.T) {
urlParams := url.Values{} urlParams := url.Values{}
result := normalInvoke(t, 4, urlParams) result := normalInvoke(4, urlParams)
assert.Errorf(t, result.Error(), "error") assert.Errorf(t, result.Error(), "error")
count = 0 count = 0
} }
func Test_FailoverInvoke1(t *testing.T) { func TestFailoverInvoke1(t *testing.T) {
urlParams := url.Values{} urlParams := url.Values{}
urlParams.Set(constant.RETRIES_KEY, "3") urlParams.Set(constant.RETRIES_KEY, "3")
result := normalInvoke(t, 4, urlParams) result := normalInvoke(4, urlParams)
assert.NoError(t, result.Error()) assert.NoError(t, result.Error())
count = 0 count = 0
} }
func Test_FailoverInvoke2(t *testing.T) { func TestFailoverInvoke2(t *testing.T) {
urlParams := url.Values{} urlParams := url.Values{}
urlParams.Set(constant.RETRIES_KEY, "2") urlParams.Set(constant.RETRIES_KEY, "2")
urlParams.Set("methods.test."+constant.RETRIES_KEY, "3") urlParams.Set("methods.test."+constant.RETRIES_KEY, "3")
ivc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test")) ivc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test"))
result := normalInvoke(t, 4, urlParams, ivc) result := normalInvoke(4, urlParams, ivc)
assert.NoError(t, result.Error()) assert.NoError(t, result.Error())
count = 0 count = 0
} }
func Test_FailoverDestroy(t *testing.T) { func TestFailoverDestroy(t *testing.T) {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failoverCluster := NewFailoverCluster() failoverCluster := NewFailoverCluster()
...@@ -170,5 +170,4 @@ func Test_FailoverDestroy(t *testing.T) { ...@@ -170,5 +170,4 @@ func Test_FailoverDestroy(t *testing.T) {
count = 0 count = 0
clusterInvoker.Destroy() clusterInvoker.Destroy()
assert.Equal(t, false, clusterInvoker.IsAvailable()) assert.Equal(t, false, clusterInvoker.IsAvailable())
} }
...@@ -31,7 +31,10 @@ func init() { ...@@ -31,7 +31,10 @@ func init() {
extension.SetCluster(failsafe, NewFailsafeCluster) extension.SetCluster(failsafe, NewFailsafeCluster)
} }
// NewFailsafeCluster ... // NewFailsafeCluster returns a failsafe cluster instance.
//
// Failure of security, anomalies, directly ignored. Usually it is
// used to write audit logs and other operations.
func NewFailsafeCluster() cluster.Cluster { func NewFailsafeCluster() cluster.Cluster {
return &failsafeCluster{} return &failsafeCluster{}
} }
......
...@@ -19,6 +19,7 @@ package cluster_impl ...@@ -19,6 +19,7 @@ package cluster_impl
import ( import (
"context" "context"
"fmt"
"testing" "testing"
) )
...@@ -32,6 +33,7 @@ import ( ...@@ -32,6 +33,7 @@ import (
"github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance" "github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation" "github.com/apache/dubbo-go/protocol/invocation"
...@@ -39,11 +41,12 @@ import ( ...@@ -39,11 +41,12 @@ import (
) )
var ( var (
failsafeUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") failsafeUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
) )
// registerFailsafe register failsafeCluster to cluster extension. // registerFailsafe register failsafeCluster to cluster extension.
func registerFailsafe(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { func registerFailsafe(invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failsafeCluster := NewFailsafeCluster() failsafeCluster := NewFailsafeCluster()
...@@ -57,12 +60,12 @@ func registerFailsafe(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker ...@@ -57,12 +60,12 @@ func registerFailsafe(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker
return clusterInvoker return clusterInvoker
} }
func Test_FailSafeInvokeSuccess(t *testing.T) { func TestFailSafeInvokeSuccess(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailsafe(t, invoker) clusterInvoker := registerFailsafe(invoker)
invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes() invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes()
...@@ -76,12 +79,12 @@ func Test_FailSafeInvokeSuccess(t *testing.T) { ...@@ -76,12 +79,12 @@ func Test_FailSafeInvokeSuccess(t *testing.T) {
assert.True(t, res.success) assert.True(t, res.success)
} }
func Test_FailSafeInvokeFail(t *testing.T) { func TestFailSafeInvokeFail(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailsafe(t, invoker) clusterInvoker := registerFailsafe(invoker)
invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes() invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes()
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment