Skip to content
Snippets Groups Projects
Commit b226b13f authored by xg.gao's avatar xg.gao
Browse files

Merge branch 'develop' into consul

parents b5617bcc 04673ca4
No related branches found
No related tags found
No related merge requests found
Showing
with 41 additions and 6 deletions
Apache Dubbo Go
Apache Dubbo-go
Copyright 2018-2020 The Apache Software Foundation
This product includes software developed at
......
......@@ -38,6 +38,7 @@ func NewAvailableCluster() cluster.Cluster {
return &availableCluster{}
}
// Join returns a baseClusterInvoker instance
func (cluser *availableCluster) Join(directory cluster.Directory) protocol.Invoker {
return NewAvailableClusterInvoker(directory)
}
......@@ -39,6 +39,7 @@ func NewBroadcastCluster() cluster.Cluster {
return &broadcastCluster{}
}
// Join returns a baseClusterInvoker instance
func (cluster *broadcastCluster) Join(directory cluster.Directory) protocol.Invoker {
return newBroadcastClusterInvoker(directory)
}
......@@ -36,6 +36,7 @@ func newBroadcastClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}
// nolint
func (invoker *broadcastClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
......
......@@ -39,6 +39,7 @@ func NewFailbackCluster() cluster.Cluster {
return &failbackCluster{}
}
// Join returns a baseClusterInvoker instance
func (cluster *failbackCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailbackClusterInvoker(directory)
}
......@@ -126,6 +126,7 @@ func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err
}
}
// nolint
func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
if err := invoker.checkInvokers(invokers, invocation); err != nil {
......
......@@ -39,6 +39,7 @@ func NewFailFastCluster() cluster.Cluster {
return &failfastCluster{}
}
// Join returns a baseClusterInvoker instance
func (cluster *failfastCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailFastClusterInvoker(directory)
}
......@@ -35,6 +35,7 @@ func newFailFastClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}
// nolint
func (invoker *failfastClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
......
......@@ -40,6 +40,7 @@ func NewFailoverCluster() cluster.Cluster {
return &failoverCluster{}
}
// Join returns a baseClusterInvoker instance
func (cluster *failoverCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailoverClusterInvoker(directory)
}
......@@ -19,6 +19,7 @@ package cluster_impl
import (
"context"
"fmt"
"strconv"
)
......@@ -44,6 +45,7 @@ func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}
// nolint
func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
var (
result protocol.Result
......@@ -91,8 +93,10 @@ func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation pr
invokerSvc := invoker.GetUrl().Service()
invokerUrl := invoker.directory.GetUrl()
return &protocol.RPCResult{
Err: perrors.Errorf("Failed to invoke the method %v in the service %v. Tried %v times of the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.",
methodName, invokerSvc, retries, providers, len(providers), len(invokers), invokerUrl, ip, constant.Version, result.Error().Error(),
Err: perrors.Wrap(result.Error(), fmt.Sprintf("Failed to invoke the method %v in the service %v. "+
"Tried %v times of the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. "+
"Last error is %+v.", methodName, invokerSvc, retries, providers, len(providers), len(invokers),
invokerUrl, ip, constant.Version, result.Error().Error()),
)}
}
......
......@@ -43,6 +43,7 @@ import (
// mock invoker
// ///////////////////////////
// nolint
type MockInvoker struct {
url common.URL
available bool
......@@ -51,6 +52,7 @@ type MockInvoker struct {
successCount int
}
// nolint
func NewMockInvoker(url common.URL, successCount int) *MockInvoker {
return &MockInvoker{
url: url,
......@@ -60,23 +62,28 @@ func NewMockInvoker(url common.URL, successCount int) *MockInvoker {
}
}
// nolint
func (bi *MockInvoker) GetUrl() common.URL {
return bi.url
}
// nolint
func (bi *MockInvoker) IsAvailable() bool {
return bi.available
}
// nolint
func (bi *MockInvoker) IsDestroyed() bool {
return bi.destroyed
}
// nolint
type rest struct {
tried int
success bool
}
// nolint
func (bi *MockInvoker) Invoke(c context.Context, invocation protocol.Invocation) protocol.Result {
count++
var (
......@@ -93,14 +100,17 @@ func (bi *MockInvoker) Invoke(c context.Context, invocation protocol.Invocation)
return result
}
// nolint
func (bi *MockInvoker) Destroy() {
logger.Infof("Destroy invoker: %v", bi.GetUrl().String())
bi.destroyed = true
bi.available = false
}
// nolint
var count int
// nolint
func normalInvoke(successCount int, urlParam url.Values, invocations ...*invocation.RPCInvocation) protocol.Result {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failoverCluster := NewFailoverCluster()
......@@ -119,6 +129,7 @@ func normalInvoke(successCount int, urlParam url.Values, invocations ...*invocat
return clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
}
// nolint
func TestFailoverInvokeSuccess(t *testing.T) {
urlParams := url.Values{}
result := normalInvoke(3, urlParams)
......@@ -126,6 +137,7 @@ func TestFailoverInvokeSuccess(t *testing.T) {
count = 0
}
// nolint
func TestFailoverInvokeFail(t *testing.T) {
urlParams := url.Values{}
result := normalInvoke(4, urlParams)
......@@ -133,6 +145,7 @@ func TestFailoverInvokeFail(t *testing.T) {
count = 0
}
// nolint
func TestFailoverInvoke1(t *testing.T) {
urlParams := url.Values{}
urlParams.Set(constant.RETRIES_KEY, "3")
......@@ -141,6 +154,7 @@ func TestFailoverInvoke1(t *testing.T) {
count = 0
}
// nolint
func TestFailoverInvoke2(t *testing.T) {
urlParams := url.Values{}
urlParams.Set(constant.RETRIES_KEY, "2")
......@@ -152,6 +166,7 @@ func TestFailoverInvoke2(t *testing.T) {
count = 0
}
// nolint
func TestFailoverDestroy(t *testing.T) {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failoverCluster := NewFailoverCluster()
......
......@@ -39,6 +39,7 @@ func NewFailsafeCluster() cluster.Cluster {
return &failsafeCluster{}
}
// Join returns a baseClusterInvoker instance
func (cluster *failsafeCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailsafeClusterInvoker(directory)
}
......@@ -45,6 +45,7 @@ func newFailsafeClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}
// nolint
func (invoker *failsafeClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
......
......@@ -39,6 +39,7 @@ func NewForkingCluster() cluster.Cluster {
return &forkingCluster{}
}
// Join returns a baseClusterInvoker instance
func (cluster *forkingCluster) Join(directory cluster.Directory) protocol.Invoker {
return newForkingClusterInvoker(directory)
}
......@@ -44,7 +44,7 @@ func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}
// Invoke ...
// nolint
func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
if err := invoker.checkWhetherDestroyed(); err != nil {
return &protocol.RPCResult{Err: err}
......
......@@ -33,6 +33,7 @@ func NewMockCluster() cluster.Cluster {
return &mockCluster{}
}
// nolint
func (cluster *mockCluster) Join(directory cluster.Directory) protocol.Invoker {
return protocol.NewBaseInvoker(directory.GetUrl())
}
......@@ -34,6 +34,7 @@ func NewRegistryAwareCluster() cluster.Cluster {
return &registryAwareCluster{}
}
// nolint
func (cluster *registryAwareCluster) Join(directory cluster.Directory) protocol.Invoker {
return newRegistryAwareClusterInvoker(directory)
}
......@@ -36,6 +36,7 @@ func newRegistryAwareClusterInvoker(directory cluster.Directory) protocol.Invoke
}
}
// nolint
func (invoker *registryAwareClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
//First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'default' key.
......
......@@ -38,9 +38,9 @@ import (
)
const (
// ConsistentHash ...
// ConsistentHash consistent hash
ConsistentHash = "consistenthash"
// HashNodes ...
// HashNodes hash nodes
HashNodes = "hash.nodes"
// HashArguments key of hash arguments in url
HashArguments = "hash.arguments"
......@@ -157,6 +157,7 @@ func (c *ConsistentHashSelector) selectForKey(hash uint32) protocol.Invoker {
return c.virtualInvokers[c.keys[idx]]
}
// nolint
func (c *ConsistentHashSelector) hash(digest [16]byte, i int) uint32 {
return uint32((digest[3+i*4]&0xFF)<<24) | uint32((digest[2+i*4]&0xFF)<<16) |
uint32((digest[1+i*4]&0xFF)<<8) | uint32(digest[i*4]&0xFF)&0xFFFFFFF
......
......@@ -46,6 +46,7 @@ func NewLeastActiveLoadBalance() cluster.LoadBalance {
return &leastActiveLoadBalance{}
}
// Select gets invoker based on least active load balancing strategy
func (lb *leastActiveLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
count := len(invokers)
if count == 0 {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment