diff --git a/cluster/cluster_impl/failover_cluster.go b/cluster/cluster_impl/failover_cluster.go index d30a743e034dafabad87381cdaa356e7603b74d1..d02a0c581006b2be9f6440efdb27c9b9470e6b16 100644 --- a/cluster/cluster_impl/failover_cluster.go +++ b/cluster/cluster_impl/failover_cluster.go @@ -43,3 +43,7 @@ func NewFailoverCluster() cluster.Cluster { func (cluster *failoverCluster) Join(directory cluster.Directory) protocol.Invoker { return newFailoverClusterInvoker(directory) } + +func GetFailoverName() string { + return name +} diff --git a/cluster/cluster_impl/registry_aware_cluster_invoker.go b/cluster/cluster_impl/registry_aware_cluster_invoker.go deleted file mode 100644 index cded5bf16432e6b0c590e15b81c28369889a5f88..0000000000000000000000000000000000000000 --- a/cluster/cluster_impl/registry_aware_cluster_invoker.go +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cluster_impl - -import ( - "context" -) -import ( - "github.com/apache/dubbo-go/cluster" - "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/protocol" -) - -type registryAwareClusterInvoker struct { - baseClusterInvoker -} - -func newRegistryAwareClusterInvoker(directory cluster.Directory) protocol.Invoker { - return ®istryAwareClusterInvoker{ - baseClusterInvoker: newBaseClusterInvoker(directory), - } -} - -func (invoker *registryAwareClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { - invokers := invoker.directory.List(invocation) - //First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'default' key. - for _, invoker := range invokers { - if invoker.IsAvailable() && invoker.GetUrl().GetParam(constant.REGISTRY_DEFAULT_KEY, "false") == "true" { - return invoker.Invoke(ctx, invocation) - } - } - - //If none of the invokers has a local signal, pick the first one available. - for _, invoker := range invokers { - if invoker.IsAvailable() { - return invoker.Invoke(ctx, invocation) - } - } - return nil -} diff --git a/cluster/cluster_impl/registry_aware_cluster_test.go b/cluster/cluster_impl/registry_aware_cluster_test.go deleted file mode 100644 index 74584b44800fce3342956f4237a63ffbbabf5544..0000000000000000000000000000000000000000 --- a/cluster/cluster_impl/registry_aware_cluster_test.go +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cluster_impl - -import ( - "context" - "fmt" - "testing" -) -import ( - "github.com/stretchr/testify/assert" -) - -import ( - "github.com/apache/dubbo-go/cluster/directory" - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/protocol" - "github.com/apache/dubbo-go/protocol/invocation" -) - -func TestRegAwareInvokeSuccess(t *testing.T) { - - regAwareCluster := NewRegistryAwareCluster() - - invokers := []protocol.Invoker{} - for i := 0; i < 10; i++ { - url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) - invokers = append(invokers, NewMockInvoker(url, 1)) - } - - staticDir := directory.NewStaticDirectory(invokers) - clusterInvoker := regAwareCluster.Join(staticDir) - result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) - assert.NoError(t, result.Error()) - count = 0 -} - -func TestDestroy(t *testing.T) { - regAwareCluster := NewRegistryAwareCluster() - - invokers := []protocol.Invoker{} - for i := 0; i < 10; i++ { - url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) - invokers = append(invokers, NewMockInvoker(url, 1)) - } - - staticDir := directory.NewStaticDirectory(invokers) - clusterInvoker := regAwareCluster.Join(staticDir) - assert.Equal(t, true, clusterInvoker.IsAvailable()) - result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) - assert.NoError(t, result.Error()) - count = 0 - clusterInvoker.Destroy() - assert.Equal(t, false, clusterInvoker.IsAvailable()) - -} diff --git a/cluster/cluster_impl/registry_aware_cluster.go b/cluster/cluster_impl/zone_aware_cluster.go similarity index 70% rename from cluster/cluster_impl/registry_aware_cluster.go rename to cluster/cluster_impl/zone_aware_cluster.go index fcefa52862a39eece98dca8660e62d9ca144e955..eadc5da6d8ebe23afc82156394162278755f227c 100644 --- a/cluster/cluster_impl/registry_aware_cluster.go +++ b/cluster/cluster_impl/zone_aware_cluster.go @@ -23,17 +23,23 @@ import ( "github.com/apache/dubbo-go/protocol" ) -type registryAwareCluster struct{} +type zoneAwareCluster struct{} + +const zoneAware = "zoneAware" func init() { - extension.SetCluster("registryAware", NewRegistryAwareCluster) + extension.SetCluster(zoneAware, NewZoneAwareCluster) +} + +// NewZoneAwareCluster ... +func NewZoneAwareCluster() cluster.Cluster { + return &zoneAwareCluster{} } -// NewRegistryAwareCluster returns a registry aware cluster instance -func NewRegistryAwareCluster() cluster.Cluster { - return ®istryAwareCluster{} +func (cluster *zoneAwareCluster) Join(directory cluster.Directory) protocol.Invoker { + return newZoneAwareClusterInvoker(directory) } -func (cluster *registryAwareCluster) Join(directory cluster.Directory) protocol.Invoker { - return newRegistryAwareClusterInvoker(directory) +func GetZoneAwareName() string { + return zoneAware } diff --git a/cluster/cluster_impl/zone_aware_cluster_invoker.go b/cluster/cluster_impl/zone_aware_cluster_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..5e9b63326cc90b242d737a3832770b9795859539 --- /dev/null +++ b/cluster/cluster_impl/zone_aware_cluster_invoker.go @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "context" + "fmt" + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/protocol" +) + +// When there're more than one registry for subscription. +// +// This extension provides a strategy to decide how to distribute traffics among them: +// 1. registry marked as 'preferred=true' has the highest priority. +// 2. check the zone the current request belongs, pick the registry that has the same zone first. +// 3. Evenly balance traffic between all registries based on each registry's weight. +// 4. Pick anyone that's available. +type zoneAwareClusterInvoker struct { + baseClusterInvoker +} + +func newZoneAwareClusterInvoker(directory cluster.Directory) protocol.Invoker { + return &zoneAwareClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } +} + +func (invoker *zoneAwareClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { + invokers := invoker.directory.List(invocation) + + err := invoker.checkInvokers(invokers, invocation) + if err != nil { + return &protocol.RPCResult{Err: err} + } + + // First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'preferred' key. + for _, invoker := range invokers { + if invoker.IsAvailable() && "true" == invoker.GetUrl().GetParam(constant.REGISTRY_KEY+"."+constant.PREFERRED_KEY, "false") { + return invoker.Invoke(ctx, invocation) + } + } + + // providers in the registry with the same zone + zone := invocation.AttachmentsByKey(constant.REGISTRY_ZONE, "") + if "" != zone { + for _, invoker := range invokers { + if invoker.IsAvailable() && zone == invoker.GetUrl().GetParam(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, "") { + return invoker.Invoke(ctx, invocation) + } + } + + force := invocation.AttachmentsByKey(constant.REGISTRY_ZONE_FORCE, "") + if "true" == force { + return &protocol.RPCResult{ + Err: fmt.Errorf("no registry instance in zone or no available providers in the registry, zone: %v, "+ + " registries: %v", zone, invoker.GetUrl()), + } + } + } + + // load balance among all registries, with registry weight count in. + loadBalance := getLoadBalance(invokers[0], invocation) + ivk := invoker.doSelect(loadBalance, invocation, invokers, nil) + if ivk != nil && ivk.IsAvailable() { + return ivk.Invoke(ctx, invocation) + } + + // If none of the invokers has a preferred signal or is picked by the loadBalancer, pick the first one available. + for _, invoker := range invokers { + if invoker.IsAvailable() { + return invoker.Invoke(ctx, invocation) + } + } + + return &protocol.RPCResult{ + Err: fmt.Errorf("no provider available in %v", invokers), + } +} diff --git a/cluster/cluster_impl/zone_aware_cluster_invoker_test.go b/cluster/cluster_impl/zone_aware_cluster_invoker_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a7d550c604558a8a82298b03ac9669f3672505ba --- /dev/null +++ b/cluster/cluster_impl/zone_aware_cluster_invoker_test.go @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "context" + "fmt" + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/mock" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_ZoneWareInvokerWithPreferredSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + // In Go versions 1.14+, if you pass a *testing.T into gomock.NewController(t) you no longer need to call ctrl.Finish(). + //defer ctrl.Finish() + + mockResult := &protocol.RPCResult{Attrs: map[string]string{constant.PREFERRED_KEY: "true"}, Rest: rest{tried: 0, success: true}} + + var invokers []protocol.Invoker + for i := 0; i < 2; i++ { + url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + invoker := mock.NewMockInvoker(ctrl) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() + invoker.EXPECT().GetUrl().Return(url).AnyTimes() + if 0 == i { + url.SetParam(constant.REGISTRY_KEY+"."+constant.PREFERRED_KEY, "true") + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( + func(invocation protocol.Invocation) protocol.Result { + return mockResult + }) + } else { + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( + func(invocation protocol.Invocation) protocol.Result { + return &protocol.RPCResult{} + }) + } + + invokers = append(invokers, invoker) + } + + zoneAwareCluster := NewZoneAwareCluster() + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := zoneAwareCluster.Join(staticDir) + + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) + + assert.Equal(t, mockResult, result) +} + +func Test_ZoneWareInvokerWithWeightSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + // In Go versions 1.14+, if you pass a *testing.T into gomock.NewController(t) you no longer need to call ctrl.Finish(). + //defer ctrl.Finish() + + w1 := "50" + w2 := "200" + + var invokers []protocol.Invoker + for i := 0; i < 2; i++ { + url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + invoker := mock.NewMockInvoker(ctrl) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() + invoker.EXPECT().GetUrl().Return(url).AnyTimes() + if 1 == i { + url.SetParam(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, w1) + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( + func(invocation protocol.Invocation) protocol.Result { + return &protocol.RPCResult{Attrs: map[string]string{constant.WEIGHT_KEY: w1}, Rest: rest{tried: 0, success: true}} + }).MaxTimes(100) + } else { + url.SetParam(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, w2) + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( + func(invocation protocol.Invocation) protocol.Result { + return &protocol.RPCResult{Attrs: map[string]string{constant.WEIGHT_KEY: w2}, Rest: rest{tried: 0, success: true}} + }).MaxTimes(100) + } + invokers = append(invokers, invoker) + } + + zoneAwareCluster := NewZoneAwareCluster() + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := zoneAwareCluster.Join(staticDir) + + var w2Count, w1Count int + loop := 50 + for i := 0; i < loop; i++ { + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) + if w2 == result.Attachment(constant.WEIGHT_KEY, "0") { + w2Count++ + } + if w1 == result.Attachment(constant.WEIGHT_KEY, "0") { + w1Count++ + } + assert.NoError(t, result.Error()) + } + t.Logf("loop count : %d, w1 value : %s | count : %d, w2 value : %s | count : %d", loop, + w1, w1Count, w2, w2Count) +} + +func Test_ZoneWareInvokerWithZoneSuccess(t *testing.T) { + var zoneArray = []string{"hangzhou", "shanghai"} + + ctrl := gomock.NewController(t) + // In Go versions 1.14+, if you pass a *testing.T into gomock.NewController(t) you no longer need to call ctrl.Finish(). + //defer ctrl.Finish() + + var invokers []protocol.Invoker + for i := 0; i < 2; i++ { + zoneValue := zoneArray[i] + url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + url.SetParam(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, zoneValue) + + invoker := mock.NewMockInvoker(ctrl) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() + invoker.EXPECT().GetUrl().Return(url).AnyTimes() + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( + func(invocation protocol.Invocation) protocol.Result { + return &protocol.RPCResult{Attrs: map[string]string{constant.ZONE_KEY: zoneValue}, Rest: rest{tried: 0, success: true}} + }) + invokers = append(invokers, invoker) + } + + zoneAwareCluster := NewZoneAwareCluster() + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := zoneAwareCluster.Join(staticDir) + + inv := &invocation.RPCInvocation{} + // zone hangzhou + hz := zoneArray[0] + inv.SetAttachments(constant.REGISTRY_ZONE, hz) + + result := clusterInvoker.Invoke(context.Background(), inv) + + assert.Equal(t, hz, result.Attachment(constant.ZONE_KEY, "")) +} + +func Test_ZoneWareInvokerWithZoneForceFail(t *testing.T) { + ctrl := gomock.NewController(t) + // In Go versions 1.14+, if you pass a *testing.T into gomock.NewController(t) you no longer need to call ctrl.Finish(). + //defer ctrl.Finish() + + var invokers []protocol.Invoker + for i := 0; i < 2; i++ { + url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + + invoker := mock.NewMockInvoker(ctrl) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() + invoker.EXPECT().GetUrl().Return(url).AnyTimes() + invokers = append(invokers, invoker) + } + + zoneAwareCluster := NewZoneAwareCluster() + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := zoneAwareCluster.Join(staticDir) + + inv := &invocation.RPCInvocation{} + // zone hangzhou + inv.SetAttachments(constant.REGISTRY_ZONE, "hangzhou") + inv.SetAttachments(constant.REGISTRY_ZONE_FORCE, "true") + + result := clusterInvoker.Invoke(context.Background(), inv) + + assert.NotNil(t, result.Error()) +} diff --git a/cluster/loadbalance/util.go b/cluster/loadbalance/util.go index b6c013852bf55ce7eb67e4fa18802a938141d283..c76bdbae7e5c591e6ac1fb28ff78afe7b5b42df7 100644 --- a/cluster/loadbalance/util.go +++ b/cluster/loadbalance/util.go @@ -17,34 +17,43 @@ package loadbalance -import ( - "time" -) - import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" + "time" ) // GetWeight gets weight for load balance strategy func GetWeight(invoker protocol.Invoker, invocation protocol.Invocation) int64 { + var weight int64 url := invoker.GetUrl() - weight := url.GetMethodParamInt64(invocation.MethodName(), constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT) + // Multiple registry scenario, load balance among multiple registries. + isRegIvk := url.GetParamBool(constant.REGISTRY_KEY+"."+constant.REGISTRY_LABEL_KEY, false) + if isRegIvk { + weight = url.GetParamInt(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT) + } else { + weight = url.GetMethodParamInt64(invocation.MethodName(), constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT) - if weight > 0 { - //get service register time an do warm up time - now := time.Now().Unix() - timestamp := url.GetParamInt(constant.REMOTE_TIMESTAMP_KEY, now) - if uptime := now - timestamp; uptime > 0 { - warmup := url.GetParamInt(constant.WARMUP_KEY, constant.DEFAULT_WARMUP) - if uptime < warmup { - if ww := float64(uptime) / float64(warmup) / float64(weight); ww < 1 { - weight = 1 - } else if int64(ww) <= weight { - weight = int64(ww) + if weight > 0 { + //get service register time an do warm up time + now := time.Now().Unix() + timestamp := url.GetParamInt(constant.REMOTE_TIMESTAMP_KEY, now) + if uptime := now - timestamp; uptime > 0 { + warmup := url.GetParamInt(constant.WARMUP_KEY, constant.DEFAULT_WARMUP) + if uptime < warmup { + if ww := float64(uptime) / float64(warmup) / float64(weight); ww < 1 { + weight = 1 + } else if int64(ww) <= weight { + weight = int64(ww) + } } } } } + + if weight < 0 { + weight = 0 + } + return weight } diff --git a/common/constant/key.go b/common/constant/key.go index 5be63fe82ce61d41b0845e2e076e48c3aebcd0cb..ded6de9efac8df24996368cc85ba50150bfc2fe8 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -89,6 +89,11 @@ const ( ROLE_KEY = "registry.role" REGISTRY_DEFAULT_KEY = "registry.default" REGISTRY_TIMEOUT_KEY = "registry.timeout" + REGISTRY_LABEL_KEY = "label" + PREFERRED_KEY = "preferred" + ZONE_KEY = "zone" + REGISTRY_ZONE = "registry_zone" + REGISTRY_ZONE_FORCE = "registry_zone_force" ) const ( diff --git a/config/config_loader_test.go b/config/config_loader_test.go index a21a4998aae9eb6e0bd0632c7248cc39e7bbd9fa..0351289f46f3b19c28fc258446e2a4e50022ba8b 100644 --- a/config/config_loader_test.go +++ b/config/config_loader_test.go @@ -71,7 +71,7 @@ func TestLoad(t *testing.T) { SetProviderService(ms) extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster) extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory) Load() @@ -100,7 +100,7 @@ func TestLoadWithSingleReg(t *testing.T) { SetProviderService(ms) extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster) extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory) Load() @@ -129,7 +129,7 @@ func TestWithNoRegLoad(t *testing.T) { SetProviderService(ms) extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster) extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory) Load() diff --git a/config/reference_config.go b/config/reference_config.go index 5b7a8e9eac676e10276775cab327ae4de1eddf86..af645e2e308dc2707e25544611064a7fd5bd2429 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -20,6 +20,7 @@ package config import ( "context" "fmt" + "github.com/apache/dubbo-go/cluster/cluster_impl" "net/url" "strconv" "time" @@ -145,10 +146,21 @@ func (c *ReferenceConfig) Refer(_ interface{}) { } } if regUrl != nil { - cluster := extension.GetCluster("registryAware") + // for multi-subscription scenario, use 'zone-aware' policy by default + cluster := extension.GetCluster(cluster_impl.GetZoneAwareName()) + // The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker c.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) } else { - cluster := extension.GetCluster(c.Cluster) + // not a registry url, must be direct invoke. + clu := cluster_impl.GetFailoverName() + if len(invokers) > 0 { + u := invokers[0].GetUrl() + if nil != &u { + clu = u.GetParam(constant.CLUSTER_KEY, cluster_impl.GetZoneAwareName()) + } + } + + cluster := extension.GetCluster(clu) c.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) } } diff --git a/config/reference_config_test.go b/config/reference_config_test.go index e43f5aa40af84b9f15a5595ce23696b6a1bae9a4..3d38dff8647c65f1e10c24693a4ecd1b40239286 100644 --- a/config/reference_config_test.go +++ b/config/reference_config_test.go @@ -179,10 +179,10 @@ func doInitConsumerWithSingleRegistry() { } } -func TestReferMultireg(t *testing.T) { +func TestReferMultiReg(t *testing.T) { doInitConsumer() extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster) for _, reference := range consumerConfig.References { reference.Refer(nil) @@ -195,7 +195,7 @@ func TestReferMultireg(t *testing.T) { func TestRefer(t *testing.T) { doInitConsumer() extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster) for _, reference := range consumerConfig.References { reference.Refer(nil) @@ -209,7 +209,7 @@ func TestRefer(t *testing.T) { func TestReferAsync(t *testing.T) { doInitConsumerAsync() extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster) for _, reference := range consumerConfig.References { reference.Refer(nil) @@ -267,7 +267,7 @@ func TestReferMultiP2PWithReg(t *testing.T) { func TestImplement(t *testing.T) { doInitConsumer() extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster) for _, reference := range consumerConfig.References { reference.Refer(nil) reference.Implement(&MockService{}) diff --git a/config/registry_config.go b/config/registry_config.go index ef527c827e9dac4cd2762f579d30254e9e51150f..92a174d8fba57e6fdcb0daa841a4fb7548590620 100644 --- a/config/registry_config.go +++ b/config/registry_config.go @@ -40,11 +40,17 @@ type RegistryConfig struct { TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second Group string `yaml:"group" json:"group,omitempty" property:"group"` // for registry - Address string `yaml:"address" json:"address,omitempty" property:"address"` - Username string `yaml:"username" json:"username,omitempty" property:"username"` - Password string `yaml:"password" json:"password,omitempty" property:"password"` - Simplified bool `yaml:"simplified" json:"simplified,omitempty" property:"simplified"` - Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` + Address string `yaml:"address" json:"address,omitempty" property:"address"` + Username string `yaml:"username" json:"username,omitempty" property:"username"` + Password string `yaml:"password" json:"password,omitempty" property:"password"` + Simplified bool `yaml:"simplified" json:"simplified,omitempty" property:"simplified"` + // Always use this registry first if set to true, useful when subscribe to multiple registries + Preferred bool `yaml:"preferred" json:"params,omitempty" property:"preferred"` + // The region where the registry belongs, usually used to isolate traffics + Zone string `yaml:"zone" json:"params,omitempty" property:"zone"` + // Affects traffic distribution among registries, useful when subscribe to multiple registries Take effect only when no preferred registry is specified. + Weight int64 `yaml:"weight" json:"params,omitempty" property:"weight"` + Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` } // UnmarshalYAML unmarshals the RegistryConfig by @unmarshal function @@ -118,6 +124,11 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values { urlMap.Set(constant.ROLE_KEY, strconv.Itoa(int(roleType))) urlMap.Set(constant.REGISTRY_KEY, c.Protocol) urlMap.Set(constant.REGISTRY_TIMEOUT_KEY, c.TimeoutStr) + // multi registry invoker weight label for load balance + urlMap.Set(constant.REGISTRY_KEY+"."+constant.REGISTRY_LABEL_KEY, strconv.FormatBool(true)) + urlMap.Set(constant.REGISTRY_KEY+"."+constant.PREFERRED_KEY, strconv.FormatBool(c.Preferred)) + urlMap.Set(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, c.Zone) + urlMap.Set(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, strconv.FormatInt(c.Weight, 10)) for k, v := range c.Params { urlMap.Set(k, v) }