Skip to content
Snippets Groups Projects
Commit 287da826 authored by flycash's avatar flycash
Browse files

Merge develop

parents e8ca1632 93a655f4
No related branches found
No related tags found
No related merge requests found
Showing
with 156 additions and 111 deletions
...@@ -20,7 +20,7 @@ set zkJarPath=remoting/zookeeper/zookeeper-4unittest/contrib/fatjar ...@@ -20,7 +20,7 @@ set zkJarPath=remoting/zookeeper/zookeeper-4unittest/contrib/fatjar
set zkJar=%zkJarPath%/%zkJarName% set zkJar=%zkJarPath%/%zkJarName%
if not exist "%zkJar%" ( if not exist "%zkJar%" (
md %zkJarPath% md "%zkJarPath%"
curl -L %remoteJarUrl% -o %zkJar% curl -L %remoteJarUrl% -o %zkJar%
) )
......
...@@ -21,7 +21,8 @@ import ( ...@@ -21,7 +21,8 @@ import (
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
) )
// Cluster ... // Cluster
// Extension - Cluster
type Cluster interface { type Cluster interface {
Join(Directory) protocol.Invoker Join(Directory) protocol.Invoker
} }
...@@ -31,7 +31,9 @@ func init() { ...@@ -31,7 +31,9 @@ func init() {
extension.SetCluster(available, NewAvailableCluster) extension.SetCluster(available, NewAvailableCluster)
} }
// NewAvailableCluster ... // NewAvailableCluster returns a cluster instance
//
// Obtain available service providers
func NewAvailableCluster() cluster.Cluster { func NewAvailableCluster() cluster.Cluster {
return &availableCluster{} return &availableCluster{}
} }
......
...@@ -35,7 +35,7 @@ type availableClusterInvoker struct { ...@@ -35,7 +35,7 @@ type availableClusterInvoker struct {
baseClusterInvoker baseClusterInvoker
} }
// NewAvailableClusterInvoker ... // NewAvailableClusterInvoker returns a cluster invoker instance
func NewAvailableClusterInvoker(directory cluster.Directory) protocol.Invoker { func NewAvailableClusterInvoker(directory cluster.Directory) protocol.Invoker {
return &availableClusterInvoker{ return &availableClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory), baseClusterInvoker: newBaseClusterInvoker(directory),
......
...@@ -19,6 +19,7 @@ package cluster_impl ...@@ -19,6 +19,7 @@ package cluster_impl
import ( import (
"context" "context"
"fmt"
"strings" "strings"
"testing" "testing"
) )
...@@ -32,6 +33,7 @@ import ( ...@@ -32,6 +33,7 @@ import (
"github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance" "github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation" "github.com/apache/dubbo-go/protocol/invocation"
...@@ -39,7 +41,8 @@ import ( ...@@ -39,7 +41,8 @@ import (
) )
var ( var (
availableUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") availableUrl, _ = common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider",
constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
) )
func registerAvailable(invoker *mock.MockInvoker) protocol.Invoker { func registerAvailable(invoker *mock.MockInvoker) protocol.Invoker {
......
...@@ -88,6 +88,10 @@ func (invoker *baseClusterInvoker) checkWhetherDestroyed() error { ...@@ -88,6 +88,10 @@ func (invoker *baseClusterInvoker) checkWhetherDestroyed() error {
func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker { func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
var selectedInvoker protocol.Invoker var selectedInvoker protocol.Invoker
if len(invokers) <= 0 {
return selectedInvoker
}
url := invokers[0].GetUrl() url := invokers[0].GetUrl()
sticky := url.GetParamBool(constant.STICKY_KEY, false) sticky := url.GetParamBool(constant.STICKY_KEY, false)
//Get the service method sticky config if have //Get the service method sticky config if have
...@@ -97,19 +101,17 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p ...@@ -97,19 +101,17 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p
invoker.stickyInvoker = nil invoker.stickyInvoker = nil
} }
if sticky && invoker.stickyInvoker != nil && (invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) { if sticky && invoker.availablecheck &&
if invoker.availablecheck && invoker.stickyInvoker.IsAvailable() { invoker.stickyInvoker != nil && invoker.stickyInvoker.IsAvailable() &&
return invoker.stickyInvoker (invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) {
} return invoker.stickyInvoker
} }
selectedInvoker = invoker.doSelectInvoker(lb, invocation, invokers, invoked) selectedInvoker = invoker.doSelectInvoker(lb, invocation, invokers, invoked)
if sticky { if sticky {
invoker.stickyInvoker = selectedInvoker invoker.stickyInvoker = selectedInvoker
} }
return selectedInvoker return selectedInvoker
} }
func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker { func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
......
...@@ -33,7 +33,7 @@ import ( ...@@ -33,7 +33,7 @@ import (
"github.com/apache/dubbo-go/protocol/invocation" "github.com/apache/dubbo-go/protocol/invocation"
) )
func Test_StickyNormal(t *testing.T) { func TestStickyNormal(t *testing.T) {
invokers := []protocol.Invoker{} invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
...@@ -43,12 +43,15 @@ func Test_StickyNormal(t *testing.T) { ...@@ -43,12 +43,15 @@ func Test_StickyNormal(t *testing.T) {
base := &baseClusterInvoker{} base := &baseClusterInvoker{}
base.availablecheck = true base.availablecheck = true
invoked := []protocol.Invoker{} invoked := []protocol.Invoker{}
result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked)
result1 := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked) tmpRandomBalance := loadbalance.NewRandomLoadBalance()
tmpInvocation := invocation.NewRPCInvocation("getUser", nil, nil)
result := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
result1 := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
assert.Equal(t, result, result1) assert.Equal(t, result, result1)
} }
func Test_StickyNormalWhenError(t *testing.T) { func TestStickyNormalWhenError(t *testing.T) {
invokers := []protocol.Invoker{} invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
......
...@@ -31,7 +31,10 @@ func init() { ...@@ -31,7 +31,10 @@ func init() {
extension.SetCluster(broadcast, NewBroadcastCluster) extension.SetCluster(broadcast, NewBroadcastCluster)
} }
// NewBroadcastCluster ... // NewBroadcastCluster returns a broadcast cluster instance.
//
// Calling all providers' broadcast one by one. All errors will be reported.
// It is usually used to notify all providers to update local resource information such as caches or logs.
func NewBroadcastCluster() cluster.Cluster { func NewBroadcastCluster() cluster.Cluster {
return &broadcastCluster{} return &broadcastCluster{}
} }
......
...@@ -20,6 +20,7 @@ package cluster_impl ...@@ -20,6 +20,7 @@ package cluster_impl
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"testing" "testing"
) )
...@@ -32,6 +33,7 @@ import ( ...@@ -32,6 +33,7 @@ import (
"github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance" "github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation" "github.com/apache/dubbo-go/protocol/invocation"
...@@ -39,10 +41,11 @@ import ( ...@@ -39,10 +41,11 @@ import (
) )
var ( var (
broadcastUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") broadcastUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
) )
func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol.Invoker { func registerBroadcast(mockInvokers ...*mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
invokers := []protocol.Invoker{} invokers := []protocol.Invoker{}
...@@ -59,7 +62,7 @@ func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol ...@@ -59,7 +62,7 @@ func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol
return clusterInvoker return clusterInvoker
} }
func Test_BroadcastInvokeSuccess(t *testing.T) { func TestBroadcastInvokeSuccess(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
...@@ -72,13 +75,13 @@ func Test_BroadcastInvokeSuccess(t *testing.T) { ...@@ -72,13 +75,13 @@ func Test_BroadcastInvokeSuccess(t *testing.T) {
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
} }
clusterInvoker := registerBroadcast(t, invokers...) clusterInvoker := registerBroadcast(invokers...)
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockResult, result) assert.Equal(t, mockResult, result)
} }
func Test_BroadcastInvokeFailed(t *testing.T) { func TestBroadcastInvokeFailed(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
...@@ -102,7 +105,7 @@ func Test_BroadcastInvokeFailed(t *testing.T) { ...@@ -102,7 +105,7 @@ func Test_BroadcastInvokeFailed(t *testing.T) {
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
} }
clusterInvoker := registerBroadcast(t, invokers...) clusterInvoker := registerBroadcast(invokers...)
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockFailedResult.Err, result.Error()) assert.Equal(t, mockFailedResult.Err, result.Error())
......
...@@ -31,7 +31,10 @@ func init() { ...@@ -31,7 +31,10 @@ func init() {
extension.SetCluster(failback, NewFailbackCluster) extension.SetCluster(failback, NewFailbackCluster)
} }
// NewFailbackCluster ... // NewFailbackCluster returns a failback cluster instance
//
// Failure automatically restored, failed to record the background request,
// regular retransmission. Usually used for message notification operations.
func NewFailbackCluster() cluster.Cluster { func NewFailbackCluster() cluster.Cluster {
return &failbackCluster{} return &failbackCluster{}
} }
......
...@@ -19,6 +19,7 @@ package cluster_impl ...@@ -19,6 +19,7 @@ package cluster_impl
import ( import (
"context" "context"
"fmt"
"sync" "sync"
"testing" "testing"
"time" "time"
...@@ -34,6 +35,7 @@ import ( ...@@ -34,6 +35,7 @@ import (
"github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance" "github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation" "github.com/apache/dubbo-go/protocol/invocation"
...@@ -41,11 +43,12 @@ import ( ...@@ -41,11 +43,12 @@ import (
) )
var ( var (
failbackUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") failbackUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
) )
// registerFailback register failbackCluster to cluster extension. // registerFailback register failbackCluster to cluster extension.
func registerFailback(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { func registerFailback(invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failbackCluster := NewFailbackCluster() failbackCluster := NewFailbackCluster()
...@@ -60,12 +63,12 @@ func registerFailback(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker ...@@ -60,12 +63,12 @@ func registerFailback(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker
} }
// success firstly, failback should return origin invoke result. // success firstly, failback should return origin invoke result.
func Test_FailbackSuceess(t *testing.T) { func TestFailbackSuceess(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
...@@ -77,12 +80,12 @@ func Test_FailbackSuceess(t *testing.T) { ...@@ -77,12 +80,12 @@ func Test_FailbackSuceess(t *testing.T) {
} }
// failed firstly, success later after one retry. // failed firstly, success later after one retry.
func Test_FailbackRetryOneSuccess(t *testing.T) { func TestFailbackRetryOneSuccess(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
...@@ -95,7 +98,7 @@ func Test_FailbackRetryOneSuccess(t *testing.T) { ...@@ -95,7 +98,7 @@ func Test_FailbackRetryOneSuccess(t *testing.T) {
wg.Add(1) wg.Add(1)
now := time.Now() now := time.Now()
mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second) delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= 5) assert.True(t, delta >= 5)
wg.Done() wg.Done()
...@@ -120,12 +123,12 @@ func Test_FailbackRetryOneSuccess(t *testing.T) { ...@@ -120,12 +123,12 @@ func Test_FailbackRetryOneSuccess(t *testing.T) {
} }
// failed firstly, and failed again after ech retry time. // failed firstly, and failed again after ech retry time.
func Test_FailbackRetryFailed(t *testing.T) { func TestFailbackRetryFailed(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
...@@ -141,7 +144,7 @@ func Test_FailbackRetryFailed(t *testing.T) { ...@@ -141,7 +144,7 @@ func Test_FailbackRetryFailed(t *testing.T) {
// add retry call that eventually failed. // add retry call that eventually failed.
for i := 0; i < retries; i++ { for i := 0; i < retries; i++ {
j := i + 1 j := i + 1
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second) delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= int64(5*j)) assert.True(t, delta >= int64(5*j))
wg.Done() wg.Done()
...@@ -166,12 +169,12 @@ func Test_FailbackRetryFailed(t *testing.T) { ...@@ -166,12 +169,12 @@ func Test_FailbackRetryFailed(t *testing.T) {
} }
// add 10 tasks but all failed firstly, and failed again with one retry. // add 10 tasks but all failed firstly, and failed again with one retry.
func Test_FailbackRetryFailed10Times(t *testing.T) { func TestFailbackRetryFailed10Times(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
clusterInvoker.maxRetries = 10 clusterInvoker.maxRetries = 10
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
...@@ -184,7 +187,7 @@ func Test_FailbackRetryFailed10Times(t *testing.T) { ...@@ -184,7 +187,7 @@ func Test_FailbackRetryFailed10Times(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(10) wg.Add(10)
now := time.Now() now := time.Now()
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second) delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= 5) assert.True(t, delta >= 5)
wg.Done() wg.Done()
...@@ -208,12 +211,12 @@ func Test_FailbackRetryFailed10Times(t *testing.T) { ...@@ -208,12 +211,12 @@ func Test_FailbackRetryFailed10Times(t *testing.T) {
assert.Equal(t, int64(0), clusterInvoker.taskList.Len()) assert.Equal(t, int64(0), clusterInvoker.taskList.Len())
} }
func Test_FailbackOutOfLimit(t *testing.T) { func TestFailbackOutOfLimit(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
clusterInvoker.failbackTasks = 1 clusterInvoker.failbackTasks = 1
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
......
...@@ -31,7 +31,10 @@ func init() { ...@@ -31,7 +31,10 @@ func init() {
extension.SetCluster(failfast, NewFailFastCluster) extension.SetCluster(failfast, NewFailFastCluster)
} }
// NewFailFastCluster ... // NewFailFastCluster returns a failfast cluster instance.
//
// Fast failure, only made a call, failure immediately error. Usually used for non-idempotent write operations,
// such as adding records.
func NewFailFastCluster() cluster.Cluster { func NewFailFastCluster() cluster.Cluster {
return &failfastCluster{} return &failfastCluster{}
} }
......
...@@ -19,6 +19,7 @@ package cluster_impl ...@@ -19,6 +19,7 @@ package cluster_impl
import ( import (
"context" "context"
"fmt"
"testing" "testing"
) )
...@@ -32,6 +33,7 @@ import ( ...@@ -32,6 +33,7 @@ import (
"github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance" "github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation" "github.com/apache/dubbo-go/protocol/invocation"
...@@ -39,11 +41,12 @@ import ( ...@@ -39,11 +41,12 @@ import (
) )
var ( var (
failfastUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") failfastUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
) )
// registerFailfast register failfastCluster to cluster extension. // registerFailfast register failfastCluster to cluster extension.
func registerFailfast(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { func registerFailfast(invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failfastCluster := NewFailFastCluster() failfastCluster := NewFailFastCluster()
...@@ -57,12 +60,12 @@ func registerFailfast(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker ...@@ -57,12 +60,12 @@ func registerFailfast(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker
return clusterInvoker return clusterInvoker
} }
func Test_FailfastInvokeSuccess(t *testing.T) { func TestFailfastInvokeSuccess(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailfast(t, invoker) clusterInvoker := registerFailfast(invoker)
invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes() invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes()
...@@ -77,12 +80,12 @@ func Test_FailfastInvokeSuccess(t *testing.T) { ...@@ -77,12 +80,12 @@ func Test_FailfastInvokeSuccess(t *testing.T) {
assert.Equal(t, 0, res.tried) assert.Equal(t, 0, res.tried)
} }
func Test_FailfastInvokeFail(t *testing.T) { func TestFailfastInvokeFail(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailfast(t, invoker) clusterInvoker := registerFailfast(invoker)
invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes() invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes()
......
...@@ -31,7 +31,11 @@ func init() { ...@@ -31,7 +31,11 @@ func init() {
extension.SetCluster(name, NewFailoverCluster) extension.SetCluster(name, NewFailoverCluster)
} }
// NewFailoverCluster ... // NewFailoverCluster returns a failover cluster instance
//
// Failure automatically switch, when there is a failure,
// retry the other server (default). Usually used for read operations,
// but retries can result in longer delays.
func NewFailoverCluster() cluster.Cluster { func NewFailoverCluster() cluster.Cluster {
return &failoverCluster{} return &failoverCluster{}
} }
......
...@@ -45,52 +45,35 @@ func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker { ...@@ -45,52 +45,35 @@ func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {
} }
func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
var (
result protocol.Result
invoked []protocol.Invoker
providers []string
)
invokers := invoker.directory.List(invocation) invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation) if err := invoker.checkInvokers(invokers, invocation); err != nil {
if err != nil {
return &protocol.RPCResult{Err: err} return &protocol.RPCResult{Err: err}
} }
loadbalance := getLoadBalance(invokers[0], invocation)
methodName := invocation.MethodName() methodName := invocation.MethodName()
url := invokers[0].GetUrl() retries := getRetries(invokers, methodName)
loadBalance := getLoadBalance(invokers[0], invocation)
//get reties
retriesConfig := url.GetParam(constant.RETRIES_KEY, constant.DEFAULT_RETRIES)
//Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.RETRIES_KEY, ""); len(v) != 0 {
retriesConfig = v
}
retries, err := strconv.Atoi(retriesConfig)
if err != nil || retries < 0 {
logger.Error("Your retries config is invalid,pls do a check. And will use the default retries configuration instead.")
retries = constant.DEFAULT_RETRIES_INT
}
invoked := []protocol.Invoker{}
providers := []string{}
var result protocol.Result
if retries > len(invokers) {
retries = len(invokers)
}
for i := 0; i <= retries; i++ { for i := 0; i <= retries; i++ {
//Reselect before retry to avoid a change of candidate `invokers`. //Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy. //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if i > 0 { if i > 0 {
err := invoker.checkWhetherDestroyed() if err := invoker.checkWhetherDestroyed(); err != nil {
if err != nil {
return &protocol.RPCResult{Err: err} return &protocol.RPCResult{Err: err}
} }
invokers = invoker.directory.List(invocation) invokers = invoker.directory.List(invocation)
err = invoker.checkInvokers(invokers, invocation) if err := invoker.checkInvokers(invokers, invocation); err != nil {
if err != nil {
return &protocol.RPCResult{Err: err} return &protocol.RPCResult{Err: err}
} }
} }
ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked) ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked)
if ivk == nil { if ivk == nil {
continue continue
} }
...@@ -100,13 +83,40 @@ func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation pr ...@@ -100,13 +83,40 @@ func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation pr
if result.Error() != nil { if result.Error() != nil {
providers = append(providers, ivk.GetUrl().Key()) providers = append(providers, ivk.GetUrl().Key())
continue continue
} else {
return result
} }
return result
} }
ip, _ := gxnet.GetLocalIP() ip, _ := gxnet.GetLocalIP()
return &protocol.RPCResult{Err: perrors.Errorf("Failed to invoke the method %v in the service %v. Tried %v times of "+ invokerSvc := invoker.GetUrl().Service()
"the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.", invokerUrl := invoker.directory.GetUrl()
methodName, invoker.GetUrl().Service(), retries, providers, len(providers), len(invokers), invoker.directory.GetUrl(), ip, constant.Version, result.Error().Error(), return &protocol.RPCResult{
)} Err: perrors.Errorf("Failed to invoke the method %v in the service %v. Tried %v times of the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.",
methodName, invokerSvc, retries, providers, len(providers), len(invokers), invokerUrl, ip, constant.Version, result.Error().Error(),
)}
}
func getRetries(invokers []protocol.Invoker, methodName string) int {
if len(invokers) <= 0 {
return constant.DEFAULT_RETRIES_INT
}
url := invokers[0].GetUrl()
//get reties
retriesConfig := url.GetParam(constant.RETRIES_KEY, constant.DEFAULT_RETRIES)
//Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.RETRIES_KEY, ""); len(v) != 0 {
retriesConfig = v
}
retries, err := strconv.Atoi(retriesConfig)
if err != nil || retries < 0 {
logger.Error("Your retries config is invalid,pls do a check. And will use the default retries configuration instead.")
retries = constant.DEFAULT_RETRIES_INT
}
if retries > len(invokers) {
retries = len(invokers)
}
return retries
} }
...@@ -101,7 +101,7 @@ func (bi *MockInvoker) Destroy() { ...@@ -101,7 +101,7 @@ func (bi *MockInvoker) Destroy() {
var count int var count int
func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocations ...*invocation.RPCInvocation) protocol.Result { func normalInvoke(successCount int, urlParam url.Values, invocations ...*invocation.RPCInvocation) protocol.Result {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failoverCluster := NewFailoverCluster() failoverCluster := NewFailoverCluster()
...@@ -119,40 +119,40 @@ func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocatio ...@@ -119,40 +119,40 @@ func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocatio
return clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) return clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
} }
func Test_FailoverInvokeSuccess(t *testing.T) { func TestFailoverInvokeSuccess(t *testing.T) {
urlParams := url.Values{} urlParams := url.Values{}
result := normalInvoke(t, 3, urlParams) result := normalInvoke(3, urlParams)
assert.NoError(t, result.Error()) assert.NoError(t, result.Error())
count = 0 count = 0
} }
func Test_FailoverInvokeFail(t *testing.T) { func TestFailoverInvokeFail(t *testing.T) {
urlParams := url.Values{} urlParams := url.Values{}
result := normalInvoke(t, 4, urlParams) result := normalInvoke(4, urlParams)
assert.Errorf(t, result.Error(), "error") assert.Errorf(t, result.Error(), "error")
count = 0 count = 0
} }
func Test_FailoverInvoke1(t *testing.T) { func TestFailoverInvoke1(t *testing.T) {
urlParams := url.Values{} urlParams := url.Values{}
urlParams.Set(constant.RETRIES_KEY, "3") urlParams.Set(constant.RETRIES_KEY, "3")
result := normalInvoke(t, 4, urlParams) result := normalInvoke(4, urlParams)
assert.NoError(t, result.Error()) assert.NoError(t, result.Error())
count = 0 count = 0
} }
func Test_FailoverInvoke2(t *testing.T) { func TestFailoverInvoke2(t *testing.T) {
urlParams := url.Values{} urlParams := url.Values{}
urlParams.Set(constant.RETRIES_KEY, "2") urlParams.Set(constant.RETRIES_KEY, "2")
urlParams.Set("methods.test."+constant.RETRIES_KEY, "3") urlParams.Set("methods.test."+constant.RETRIES_KEY, "3")
ivc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test")) ivc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test"))
result := normalInvoke(t, 4, urlParams, ivc) result := normalInvoke(4, urlParams, ivc)
assert.NoError(t, result.Error()) assert.NoError(t, result.Error())
count = 0 count = 0
} }
func Test_FailoverDestroy(t *testing.T) { func TestFailoverDestroy(t *testing.T) {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failoverCluster := NewFailoverCluster() failoverCluster := NewFailoverCluster()
...@@ -170,5 +170,4 @@ func Test_FailoverDestroy(t *testing.T) { ...@@ -170,5 +170,4 @@ func Test_FailoverDestroy(t *testing.T) {
count = 0 count = 0
clusterInvoker.Destroy() clusterInvoker.Destroy()
assert.Equal(t, false, clusterInvoker.IsAvailable()) assert.Equal(t, false, clusterInvoker.IsAvailable())
} }
...@@ -31,7 +31,10 @@ func init() { ...@@ -31,7 +31,10 @@ func init() {
extension.SetCluster(failsafe, NewFailsafeCluster) extension.SetCluster(failsafe, NewFailsafeCluster)
} }
// NewFailsafeCluster ... // NewFailsafeCluster returns a failsafe cluster instance.
//
// Failure of security, anomalies, directly ignored. Usually it is
// used to write audit logs and other operations.
func NewFailsafeCluster() cluster.Cluster { func NewFailsafeCluster() cluster.Cluster {
return &failsafeCluster{} return &failsafeCluster{}
} }
......
...@@ -19,6 +19,7 @@ package cluster_impl ...@@ -19,6 +19,7 @@ package cluster_impl
import ( import (
"context" "context"
"fmt"
"testing" "testing"
) )
...@@ -32,6 +33,7 @@ import ( ...@@ -32,6 +33,7 @@ import (
"github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance" "github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation" "github.com/apache/dubbo-go/protocol/invocation"
...@@ -39,11 +41,12 @@ import ( ...@@ -39,11 +41,12 @@ import (
) )
var ( var (
failsafeUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") failsafeUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
) )
// registerFailsafe register failsafeCluster to cluster extension. // registerFailsafe register failsafeCluster to cluster extension.
func registerFailsafe(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { func registerFailsafe(invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failsafeCluster := NewFailsafeCluster() failsafeCluster := NewFailsafeCluster()
...@@ -57,12 +60,12 @@ func registerFailsafe(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker ...@@ -57,12 +60,12 @@ func registerFailsafe(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker
return clusterInvoker return clusterInvoker
} }
func Test_FailSafeInvokeSuccess(t *testing.T) { func TestFailSafeInvokeSuccess(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailsafe(t, invoker) clusterInvoker := registerFailsafe(invoker)
invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes() invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes()
...@@ -76,12 +79,12 @@ func Test_FailSafeInvokeSuccess(t *testing.T) { ...@@ -76,12 +79,12 @@ func Test_FailSafeInvokeSuccess(t *testing.T) {
assert.True(t, res.success) assert.True(t, res.success)
} }
func Test_FailSafeInvokeFail(t *testing.T) { func TestFailSafeInvokeFail(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailsafe(t, invoker) clusterInvoker := registerFailsafe(invoker)
invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes() invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes()
......
...@@ -31,7 +31,10 @@ func init() { ...@@ -31,7 +31,10 @@ func init() {
extension.SetCluster(forking, NewForkingCluster) extension.SetCluster(forking, NewForkingCluster)
} }
// NewForkingCluster ... // NewForkingCluster returns a forking cluster instance.
//
// Multiple servers are invoked in parallel, returning as soon as one succeeds.
// Usually it is used for real-time demanding read operations while wasting more service resources.
func NewForkingCluster() cluster.Cluster { func NewForkingCluster() cluster.Cluster {
return &forkingCluster{} return &forkingCluster{}
} }
......
...@@ -46,14 +46,12 @@ func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker { ...@@ -46,14 +46,12 @@ func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker {
// Invoke ... // Invoke ...
func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
err := invoker.checkWhetherDestroyed() if err := invoker.checkWhetherDestroyed(); err != nil {
if err != nil {
return &protocol.RPCResult{Err: err} return &protocol.RPCResult{Err: err}
} }
invokers := invoker.directory.List(invocation) invokers := invoker.directory.List(invocation)
err = invoker.checkInvokers(invokers, invocation) if err := invoker.checkInvokers(invokers, invocation); err != nil {
if err != nil {
return &protocol.RPCResult{Err: err} return &protocol.RPCResult{Err: err}
} }
...@@ -63,11 +61,9 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro ...@@ -63,11 +61,9 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro
if forks < 0 || forks > len(invokers) { if forks < 0 || forks > len(invokers) {
selected = invokers selected = invokers
} else { } else {
selected = make([]protocol.Invoker, 0) loadBalance := getLoadBalance(invokers[0], invocation)
loadbalance := getLoadBalance(invokers[0], invocation)
for i := 0; i < forks; i++ { for i := 0; i < forks; i++ {
ivk := invoker.doSelect(loadbalance, invocation, invokers, selected) if ivk := invoker.doSelect(loadBalance, invocation, invokers, selected); ivk != nil {
if ivk != nil {
selected = append(selected, ivk) selected = append(selected, ivk)
} }
} }
...@@ -77,8 +73,7 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro ...@@ -77,8 +73,7 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro
for _, ivk := range selected { for _, ivk := range selected {
go func(k protocol.Invoker) { go func(k protocol.Invoker) {
result := k.Invoke(ctx, invocation) result := k.Invoke(ctx, invocation)
err := resultQ.Put(result) if err := resultQ.Put(result); err != nil {
if err != nil {
logger.Errorf("resultQ put failed with exception: %v.\n", err) logger.Errorf("resultQ put failed with exception: %v.\n", err)
} }
}(ivk) }(ivk)
...@@ -99,6 +94,5 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro ...@@ -99,6 +94,5 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro
if !ok { if !ok {
return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but not legal resp", selected)} return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but not legal resp", selected)}
} }
return result return result
} }
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment