Skip to content
Snippets Groups Projects
Unverified Commit f0017a86 authored by Joe Zou's avatar Joe Zou Committed by GitHub
Browse files

Merge pull request #659 from DogBaoBao/zoneware

Ftr: Nearest first for multiple registry
parents 2aafdeb0 4dfc8aac
No related branches found
No related tags found
No related merge requests found
Showing with 510 additions and 40 deletions
......@@ -17,6 +17,10 @@
package cluster_impl
import (
"context"
)
import (
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
......@@ -36,6 +40,7 @@ type baseClusterInvoker struct {
availablecheck bool
destroyed *atomic.Bool
stickyInvoker protocol.Invoker
interceptor cluster.ClusterInterceptor
}
func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker {
......@@ -146,6 +151,20 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc
return selectedInvoker
}
func (invoker *baseClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
if invoker.interceptor != nil {
invoker.interceptor.BeforeInvoker(ctx, invocation)
result := invoker.interceptor.DoInvoke(ctx, invocation)
invoker.interceptor.AfterInvoker(ctx, invocation)
return result
}
return nil
}
func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) bool {
for _, i := range invoked {
if i == selectedInvoker {
......
......@@ -19,16 +19,15 @@ package cluster_impl
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)
type failoverCluster struct{}
const name = "failover"
func init() {
extension.SetCluster(name, NewFailoverCluster)
extension.SetCluster(constant.FAILOVER_CLUSTER_NAME, NewFailoverCluster)
}
// NewFailoverCluster returns a failover cluster instance
......
......@@ -19,22 +19,26 @@ package cluster_impl
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)
type registryAwareCluster struct{}
type zoneAwareCluster struct{}
func init() {
extension.SetCluster("registryAware", NewRegistryAwareCluster)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, NewZoneAwareCluster)
}
// NewRegistryAwareCluster returns a registry aware cluster instance
func NewRegistryAwareCluster() cluster.Cluster {
return &registryAwareCluster{}
// NewZoneAwareCluster returns a zoneaware cluster instance.
//
// More than one registry for subscription.
// Usually it is used for choose between registries.
func NewZoneAwareCluster() cluster.Cluster {
return &zoneAwareCluster{}
}
// nolint
func (cluster *registryAwareCluster) Join(directory cluster.Directory) protocol.Invoker {
return newRegistryAwareClusterInvoker(directory)
// Join returns a zoneAwareClusterInvoker instance
func (cluster *zoneAwareCluster) Join(directory cluster.Directory) protocol.Invoker {
return newZoneAwareClusterInvoker(directory)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cluster_impl
import (
"context"
"fmt"
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
)
// When there're more than one registry for subscription.
//
// This extension provides a strategy to decide how to distribute traffics among them:
// 1. registry marked as 'preferred=true' has the highest priority.
// 2. check the zone the current request belongs, pick the registry that has the same zone first.
// 3. Evenly balance traffic between all registries based on each registry's weight.
// 4. Pick anyone that's available.
type zoneAwareClusterInvoker struct {
baseClusterInvoker
}
func newZoneAwareClusterInvoker(directory cluster.Directory) protocol.Invoker {
invoke := &zoneAwareClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
// add self to interceptor
invoke.interceptor = invoke
return invoke
}
// nolint
func (invoker *zoneAwareClusterInvoker) DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
return &protocol.RPCResult{Err: err}
}
// First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'preferred' key.
for _, invoker := range invokers {
key := constant.REGISTRY_KEY + "." + constant.PREFERRED_KEY
if invoker.IsAvailable() && matchParam("true", key, "false", invoker) {
return invoker.Invoke(ctx, invocation)
}
}
// providers in the registry with the same zone
key := constant.REGISTRY_KEY + "." + constant.ZONE_KEY
zone := invocation.AttachmentsByKey(key, "")
if "" != zone {
for _, invoker := range invokers {
if invoker.IsAvailable() && matchParam(zone, key, "", invoker) {
return invoker.Invoke(ctx, invocation)
}
}
force := invocation.AttachmentsByKey(constant.REGISTRY_KEY+"."+constant.ZONE_FORCE_KEY, "")
if "true" == force {
return &protocol.RPCResult{
Err: fmt.Errorf("no registry instance in zone or "+
"no available providers in the registry, zone: %v, "+
" registries: %v", zone, invoker.GetUrl()),
}
}
}
// load balance among all registries, with registry weight count in.
loadBalance := getLoadBalance(invokers[0], invocation)
ivk := invoker.doSelect(loadBalance, invocation, invokers, nil)
if ivk != nil && ivk.IsAvailable() {
return ivk.Invoke(ctx, invocation)
}
// If none of the invokers has a preferred signal or is picked by the loadBalancer, pick the first one available.
for _, invoker := range invokers {
if invoker.IsAvailable() {
return invoker.Invoke(ctx, invocation)
}
}
return &protocol.RPCResult{
Err: fmt.Errorf("no provider available in %v", invokers),
}
}
func (invoker *zoneAwareClusterInvoker) BeforeInvoker(ctx context.Context, invocation protocol.Invocation) {
key := constant.REGISTRY_KEY + "." + constant.ZONE_FORCE_KEY
force := ctx.Value(key)
if force != nil {
switch value := force.(type) {
case bool:
if value {
invocation.SetAttachments(key, "true")
}
case string:
if "true" == value {
invocation.SetAttachments(key, "true")
}
default:
// ignore
}
}
}
func (invoker *zoneAwareClusterInvoker) AfterInvoker(ctx context.Context, invocation protocol.Invocation) {
}
func matchParam(target, key, def string, invoker protocol.Invoker) bool {
return target == invoker.GetUrl().GetParam(key, def)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cluster_impl
import (
"context"
"fmt"
"testing"
)
import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/protocol/mock"
)
func TestZoneWareInvokerWithPreferredSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
// In Go versions 1.14+, if you pass a *testing.T
// into gomock.NewController(t) you no longer need to call ctrl.Finish().
//defer ctrl.Finish()
mockResult := &protocol.RPCResult{
Attrs: map[string]string{constant.PREFERRED_KEY: "true"},
Rest: rest{tried: 0, success: true}}
var invokers []protocol.Invoker
for i := 0; i < 2; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
invoker := mock.NewMockInvoker(ctrl)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(url).AnyTimes()
if 0 == i {
url.SetParam(constant.REGISTRY_KEY+"."+constant.PREFERRED_KEY, "true")
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation) protocol.Result {
return mockResult
})
} else {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation) protocol.Result {
return &protocol.RPCResult{}
})
}
invokers = append(invokers, invoker)
}
zoneAwareCluster := NewZoneAwareCluster()
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := zoneAwareCluster.Join(staticDir)
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
}
func TestZoneWareInvokerWithWeightSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
// In Go versions 1.14+, if you pass a *testing.T
// into gomock.NewController(t) you no longer need to call ctrl.Finish().
//defer ctrl.Finish()
w1 := "50"
w2 := "200"
var invokers []protocol.Invoker
for i := 0; i < 2; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
invoker := mock.NewMockInvoker(ctrl)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(url).AnyTimes()
url.SetParam(constant.REGISTRY_KEY+"."+constant.REGISTRY_LABEL_KEY, "true")
if 1 == i {
url.SetParam(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, w1)
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation) protocol.Result {
return &protocol.RPCResult{
Attrs: map[string]string{constant.WEIGHT_KEY: w1},
Rest: rest{tried: 0, success: true}}
}).MaxTimes(100)
} else {
url.SetParam(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, w2)
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation) protocol.Result {
return &protocol.RPCResult{
Attrs: map[string]string{constant.WEIGHT_KEY: w2},
Rest: rest{tried: 0, success: true}}
}).MaxTimes(100)
}
invokers = append(invokers, invoker)
}
zoneAwareCluster := NewZoneAwareCluster()
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := zoneAwareCluster.Join(staticDir)
var w2Count, w1Count int
loop := 50
for i := 0; i < loop; i++ {
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
if w2 == result.Attachment(constant.WEIGHT_KEY, "0") {
w2Count++
}
if w1 == result.Attachment(constant.WEIGHT_KEY, "0") {
w1Count++
}
assert.NoError(t, result.Error())
}
t.Logf("loop count : %d, w1 height : %s | count : %d, w2 height : %s | count : %d", loop,
w1, w1Count, w2, w2Count)
}
func TestZoneWareInvokerWithZoneSuccess(t *testing.T) {
var zoneArray = []string{"hangzhou", "shanghai"}
ctrl := gomock.NewController(t)
// In Go versions 1.14+, if you pass a *testing.T
// into gomock.NewController(t) you no longer need to call ctrl.Finish().
//defer ctrl.Finish()
var invokers []protocol.Invoker
for i := 0; i < 2; i++ {
zoneValue := zoneArray[i]
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
url.SetParam(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, zoneValue)
invoker := mock.NewMockInvoker(ctrl)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(url).AnyTimes()
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation) protocol.Result {
return &protocol.RPCResult{
Attrs: map[string]string{constant.ZONE_KEY: zoneValue},
Rest: rest{tried: 0, success: true}}
})
invokers = append(invokers, invoker)
}
zoneAwareCluster := NewZoneAwareCluster()
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := zoneAwareCluster.Join(staticDir)
inv := &invocation.RPCInvocation{}
// zone hangzhou
hz := zoneArray[0]
inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, hz)
result := clusterInvoker.Invoke(context.Background(), inv)
assert.Equal(t, hz, result.Attachment(constant.ZONE_KEY, ""))
}
func TestZoneWareInvokerWithZoneForceFail(t *testing.T) {
ctrl := gomock.NewController(t)
// In Go versions 1.14+, if you pass a *testing.T
// into gomock.NewController(t) you no longer need to call ctrl.Finish().
//defer ctrl.Finish()
var invokers []protocol.Invoker
for i := 0; i < 2; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
invoker := mock.NewMockInvoker(ctrl)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(url).AnyTimes()
invokers = append(invokers, invoker)
}
zoneAwareCluster := NewZoneAwareCluster()
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := zoneAwareCluster.Join(staticDir)
inv := &invocation.RPCInvocation{}
// zone hangzhou
inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, "hangzhou")
// zone force
inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_FORCE_KEY, "true")
result := clusterInvoker.Invoke(context.Background(), inv)
assert.NotNil(t, result.Error())
}
......@@ -15,42 +15,25 @@
* limitations under the License.
*/
package cluster_impl
package cluster
import (
"context"
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
)
type registryAwareClusterInvoker struct {
baseClusterInvoker
}
func newRegistryAwareClusterInvoker(directory cluster.Directory) protocol.Invoker {
return &registryAwareClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
}
// ClusterInterceptor
// Extension - ClusterInterceptor
type ClusterInterceptor interface {
// Before DoInvoke method
BeforeInvoker(ctx context.Context, invocation protocol.Invocation)
// nolint
func (invoker *registryAwareClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
//First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'default' key.
for _, invoker := range invokers {
if invoker.IsAvailable() && invoker.GetUrl().GetParam(constant.REGISTRY_DEFAULT_KEY, "false") == "true" {
return invoker.Invoke(ctx, invocation)
}
}
// After DoInvoke method
AfterInvoker(ctx context.Context, invocation protocol.Invocation)
//If none of the invokers has a local signal, pick the first one available.
for _, invoker := range invokers {
if invoker.IsAvailable() {
return invoker.Invoke(ctx, invocation)
}
}
return nil
// Corresponding cluster invoke
DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result
}
......@@ -28,23 +28,35 @@ import (
// GetWeight gets weight for load balance strategy
func GetWeight(invoker protocol.Invoker, invocation protocol.Invocation) int64 {
var weight int64
url := invoker.GetUrl()
weight := url.GetMethodParamInt64(invocation.MethodName(), constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT)
// Multiple registry scenario, load balance among multiple registries.
isRegIvk := url.GetParamBool(constant.REGISTRY_KEY+"."+constant.REGISTRY_LABEL_KEY, false)
if isRegIvk {
weight = url.GetParamInt(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT)
} else {
weight = url.GetMethodParamInt64(invocation.MethodName(), constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT)
if weight > 0 {
//get service register time an do warm up time
now := time.Now().Unix()
timestamp := url.GetParamInt(constant.REMOTE_TIMESTAMP_KEY, now)
if uptime := now - timestamp; uptime > 0 {
warmup := url.GetParamInt(constant.WARMUP_KEY, constant.DEFAULT_WARMUP)
if uptime < warmup {
if ww := float64(uptime) / float64(warmup) / float64(weight); ww < 1 {
weight = 1
} else if int64(ww) <= weight {
weight = int64(ww)
if weight > 0 {
//get service register time an do warm up time
now := time.Now().Unix()
timestamp := url.GetParamInt(constant.REMOTE_TIMESTAMP_KEY, now)
if uptime := now - timestamp; uptime > 0 {
warmup := url.GetParamInt(constant.WARMUP_KEY, constant.DEFAULT_WARMUP)
if uptime < warmup {
if ww := float64(uptime) / float64(warmup) / float64(weight); ww < 1 {
weight = 1
} else if int64(ww) <= weight {
weight = int64(ww)
}
}
}
}
}
if weight < 0 {
weight = 0
}
return weight
}
......@@ -15,57 +15,10 @@
* limitations under the License.
*/
package cluster_impl
package constant
import (
"context"
"fmt"
"testing"
// nolint
const (
FAILOVER_CLUSTER_NAME = "failover"
ZONEAWARE_CLUSTER_NAME = "zoneAware"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
func TestRegAwareInvokeSuccess(t *testing.T) {
regAwareCluster := NewRegistryAwareCluster()
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
invokers = append(invokers, NewMockInvoker(url, 1))
}
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := regAwareCluster.Join(staticDir)
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.NoError(t, result.Error())
count = 0
}
func TestDestroy(t *testing.T) {
regAwareCluster := NewRegistryAwareCluster()
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
invokers = append(invokers, NewMockInvoker(url, 1))
}
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := regAwareCluster.Join(staticDir)
assert.Equal(t, true, clusterInvoker.IsAvailable())
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.NoError(t, result.Error())
count = 0
clusterInvoker.Destroy()
assert.Equal(t, false, clusterInvoker.IsAvailable())
}
......@@ -97,6 +97,10 @@ const (
ROLE_KEY = "registry.role"
REGISTRY_DEFAULT_KEY = "registry.default"
REGISTRY_TIMEOUT_KEY = "registry.timeout"
REGISTRY_LABEL_KEY = "label"
PREFERRED_KEY = "preferred"
ZONE_KEY = "zone"
ZONE_FORCE_KEY = "zone.force"
REGISTRY_TTL_KEY = "registry.ttl"
)
......
......@@ -72,7 +72,7 @@ func TestLoad(t *testing.T) {
SetProviderService(ms)
extension.SetProtocol("registry", GetProtocol)
extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory)
Load()
......@@ -101,7 +101,7 @@ func TestLoadWithSingleReg(t *testing.T) {
SetProviderService(ms)
extension.SetProtocol("registry", GetProtocol)
extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory)
Load()
......@@ -130,7 +130,7 @@ func TestWithNoRegLoad(t *testing.T) {
SetProviderService(ms)
extension.SetProtocol("registry", GetProtocol)
extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory)
Load()
......
......@@ -147,13 +147,26 @@ func (c *ReferenceConfig) Refer(_ interface{}) {
}
// TODO(decouple from directory, config should not depend on directory module)
var hitClu string
if regUrl != nil {
cluster := extension.GetCluster("registryAware")
c.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
// for multi-subscription scenario, use 'zone-aware' policy by default
hitClu = constant.ZONEAWARE_CLUSTER_NAME
} else {
cluster := extension.GetCluster(c.Cluster)
c.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
// not a registry url, must be direct invoke.
hitClu = constant.FAILOVER_CLUSTER_NAME
if len(invokers) > 0 {
u := invokers[0].GetUrl()
if nil != &u {
hitClu = u.GetParam(constant.CLUSTER_KEY, constant.ZONEAWARE_CLUSTER_NAME)
}
}
}
cluster := extension.GetCluster(hitClu)
// If 'zone-aware' policy select, the invoker wrap sequence would be:
// ZoneAwareClusterInvoker(StaticDirectory) ->
// FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
c.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
}
// create proxy
......
......@@ -187,10 +187,10 @@ func doInitConsumerWithSingleRegistry() {
}
}
func TestReferMultireg(t *testing.T) {
func TestReferMultiReg(t *testing.T) {
doInitConsumer()
extension.SetProtocol("registry", GetProtocol)
extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
for _, reference := range consumerConfig.References {
reference.Refer(nil)
......@@ -203,7 +203,7 @@ func TestReferMultireg(t *testing.T) {
func TestRefer(t *testing.T) {
doInitConsumer()
extension.SetProtocol("registry", GetProtocol)
extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
for _, reference := range consumerConfig.References {
reference.Refer(nil)
......@@ -217,7 +217,7 @@ func TestRefer(t *testing.T) {
func TestReferAsync(t *testing.T) {
doInitConsumerAsync()
extension.SetProtocol("registry", GetProtocol)
extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
for _, reference := range consumerConfig.References {
reference.Refer(nil)
......@@ -275,7 +275,7 @@ func TestReferMultiP2PWithReg(t *testing.T) {
func TestImplement(t *testing.T) {
doInitConsumer()
extension.SetProtocol("registry", GetProtocol)
extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
for _, reference := range consumerConfig.References {
reference.Refer(nil)
reference.Implement(&MockService{})
......
......@@ -41,11 +41,20 @@ type RegistryConfig struct {
Group string `yaml:"group" json:"group,omitempty" property:"group"`
TTL string `yaml:"ttl" default:"10m" json:"ttl,omitempty" property:"ttl"` // unit: minute
// for registry
Address string `yaml:"address" json:"address,omitempty" property:"address"`
Username string `yaml:"username" json:"username,omitempty" property:"username"`
Password string `yaml:"password" json:"password,omitempty" property:"password"`
Simplified bool `yaml:"simplified" json:"simplified,omitempty" property:"simplified"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
Address string `yaml:"address" json:"address,omitempty" property:"address"`
Username string `yaml:"username" json:"username,omitempty" property:"username"`
Password string `yaml:"password" json:"password,omitempty" property:"password"`
Simplified bool `yaml:"simplified" json:"simplified,omitempty" property:"simplified"`
// Always use this registry first if set to true, useful when subscribe to multiple registries
Preferred bool `yaml:"preferred" json:"preferred,omitempty" property:"preferred"`
// The region where the registry belongs, usually used to isolate traffics
Zone string `yaml:"zone" json:"zone,omitempty" property:"zone"`
//// Force must user the region, property zone is specified.
//ZoneForce bool `yaml:"zoneForce" json:"zoneForce,omitempty" property:"zoneForce"`
// Affects traffic distribution among registries,
// useful when subscribe to multiple registries Take effect only when no preferred registry is specified.
Weight int64 `yaml:"weight" json:"params,omitempty" property:"weight"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
}
// UnmarshalYAML unmarshals the RegistryConfig by @unmarshal function
......@@ -119,6 +128,12 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values {
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(int(roleType)))
urlMap.Set(constant.REGISTRY_KEY, c.Protocol)
urlMap.Set(constant.REGISTRY_TIMEOUT_KEY, c.TimeoutStr)
// multi registry invoker weight label for load balance
urlMap.Set(constant.REGISTRY_KEY+"."+constant.REGISTRY_LABEL_KEY, strconv.FormatBool(true))
urlMap.Set(constant.REGISTRY_KEY+"."+constant.PREFERRED_KEY, strconv.FormatBool(c.Preferred))
urlMap.Set(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, c.Zone)
//urlMap.Set(constant.REGISTRY_KEY+"."+constant.ZONE_FORCE_KEY, strconv.FormatBool(c.ZoneForce))
urlMap.Set(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, strconv.FormatInt(c.Weight, 10))
urlMap.Set(constant.REGISTRY_TTL_KEY, c.TTL)
for k, v := range c.Params {
urlMap.Set(k, v)
......
......@@ -41,6 +41,8 @@ type Invocation interface {
Attributes() map[string]interface{}
// AttributeByKey gets attribute by key , if nil then return default value
AttributeByKey(string, interface{}) interface{}
// SetAttachments sets attribute by @key and @value.
SetAttachments(key string, value string)
// Invoker gets the invoker in current context.
Invoker() Invoker
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment