diff --git a/cluster/cluster_impl/base_cluster_invoker.go b/cluster/cluster_impl/base_cluster_invoker.go index bbdfa715d7cdc461689e60a5a41171ad5c9770e1..b826a6b9abeda40c7a2190215adc432df888d715 100644 --- a/cluster/cluster_impl/base_cluster_invoker.go +++ b/cluster/cluster_impl/base_cluster_invoker.go @@ -18,6 +18,7 @@ package cluster_impl import ( + "context" gxnet "github.com/dubbogo/gost/net" perrors "github.com/pkg/errors" "go.uber.org/atomic" @@ -36,6 +37,7 @@ type baseClusterInvoker struct { availablecheck bool destroyed *atomic.Bool stickyInvoker protocol.Invoker + interceptor cluster.ClusterInterceptor } func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker { @@ -146,6 +148,20 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc return selectedInvoker } +func (invoker *baseClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { + if invoker.interceptor != nil { + invoker.interceptor.BeforeInvoker(ctx, invocation) + + result := invoker.interceptor.DoInvoke(ctx, invocation) + + invoker.interceptor.AfterInvoker(ctx, invocation) + + return result + } + + return nil +} + func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) bool { for _, i := range invoked { if i == selectedInvoker { diff --git a/cluster/cluster_impl/failover_cluster.go b/cluster/cluster_impl/failover_cluster.go index 6ef0fbf95118e79e3faf3586f11d077bc15c33f7..4c09fd16d3eb5ebaf7833443ee0f8a980c81b822 100644 --- a/cluster/cluster_impl/failover_cluster.go +++ b/cluster/cluster_impl/failover_cluster.go @@ -19,16 +19,15 @@ package cluster_impl import ( "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/protocol" ) type failoverCluster struct{} -const name = "failover" - func init() { - extension.SetCluster(name, NewFailoverCluster) + extension.SetCluster(constant.FAILOVER_CLUSTER_NAME, NewFailoverCluster) } // NewFailoverCluster returns a failover cluster instance @@ -44,7 +43,3 @@ func NewFailoverCluster() cluster.Cluster { func (cluster *failoverCluster) Join(directory cluster.Directory) protocol.Invoker { return newFailoverClusterInvoker(directory) } - -func GetFailoverName() string { - return name -} diff --git a/cluster/cluster_impl/zone_aware_cluster.go b/cluster/cluster_impl/zone_aware_cluster.go index 2e19be476278a364a8d6a932150476a73ce55f5d..7439db2d374f73571f3c8b95d8faca6cb98d4cd0 100644 --- a/cluster/cluster_impl/zone_aware_cluster.go +++ b/cluster/cluster_impl/zone_aware_cluster.go @@ -19,16 +19,15 @@ package cluster_impl import ( "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/protocol" ) type zoneAwareCluster struct{} -const zoneAware = "zoneAware" - func init() { - extension.SetCluster(zoneAware, NewZoneAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, NewZoneAwareCluster) } // NewZoneAwareCluster returns a zoneaware cluster instance. @@ -43,8 +42,3 @@ func NewZoneAwareCluster() cluster.Cluster { func (cluster *zoneAwareCluster) Join(directory cluster.Directory) protocol.Invoker { return newZoneAwareClusterInvoker(directory) } - -// GetZoneAwareName get cluster name -func GetZoneAwareName() string { - return zoneAware -} diff --git a/cluster/cluster_impl/zone_aware_cluster_invoker.go b/cluster/cluster_impl/zone_aware_cluster_invoker.go index dd4d319b4031e447ba42ef817dc2ba179d982569..0f52b0442c235d570e4cc9d8491aa80a9df5842d 100644 --- a/cluster/cluster_impl/zone_aware_cluster_invoker.go +++ b/cluster/cluster_impl/zone_aware_cluster_invoker.go @@ -40,13 +40,16 @@ type zoneAwareClusterInvoker struct { } func newZoneAwareClusterInvoker(directory cluster.Directory) protocol.Invoker { - return &zoneAwareClusterInvoker{ + invoke := &zoneAwareClusterInvoker{ baseClusterInvoker: newBaseClusterInvoker(directory), } + // add self to interceptor + invoke.interceptor = invoke + return invoke } // nolint -func (invoker *zoneAwareClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { +func (invoker *zoneAwareClusterInvoker) DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { invokers := invoker.directory.List(invocation) err := invoker.checkInvokers(invokers, invocation) @@ -63,16 +66,16 @@ func (invoker *zoneAwareClusterInvoker) Invoke(ctx context.Context, invocation p } // providers in the registry with the same zone - zone := invocation.AttachmentsByKey(constant.REGISTRY_ZONE, "") + key := constant.REGISTRY_KEY + "." + constant.ZONE_KEY + zone := invocation.AttachmentsByKey(key, "") if "" != zone { for _, invoker := range invokers { - key := constant.REGISTRY_KEY + "." + constant.ZONE_KEY if invoker.IsAvailable() && matchParam(zone, key, "", invoker) { return invoker.Invoke(ctx, invocation) } } - force := invocation.AttachmentsByKey(constant.REGISTRY_ZONE_FORCE, "") + force := invocation.AttachmentsByKey(constant.REGISTRY_KEY+"."+constant.ZONE_FORCE_KEY, "") if "true" == force { return &protocol.RPCResult{ Err: fmt.Errorf("no registry instance in zone or "+ @@ -101,6 +104,30 @@ func (invoker *zoneAwareClusterInvoker) Invoke(ctx context.Context, invocation p } } +func (invoker *zoneAwareClusterInvoker) BeforeInvoker(ctx context.Context, invocation protocol.Invocation) { + key := constant.REGISTRY_KEY + "." + constant.ZONE_FORCE_KEY + force := ctx.Value(key) + + if force != nil { + switch value := force.(type) { + case bool: + if value { + invocation.SetAttachments(key, "true") + } + case string: + if "true" == value { + invocation.SetAttachments(key, "true") + } + default: + // ignore + } + } +} + +func (invoker *zoneAwareClusterInvoker) AfterInvoker(ctx context.Context, invocation protocol.Invocation) { + +} + func matchParam(target, key, def string, invoker protocol.Invoker) bool { return target == invoker.GetUrl().GetParam(key, def) } diff --git a/cluster/cluster_impl/zone_aware_cluster_invoker_test.go b/cluster/cluster_impl/zone_aware_cluster_invoker_test.go index 4ac865972a5675224298183e6fc0f501bb370197..cd201a42c759354ca536ea3e9e77116d89ea8b4b 100644 --- a/cluster/cluster_impl/zone_aware_cluster_invoker_test.go +++ b/cluster/cluster_impl/zone_aware_cluster_invoker_test.go @@ -37,7 +37,7 @@ import ( "github.com/apache/dubbo-go/protocol/mock" ) -func Test_ZoneWareInvokerWithPreferredSuccess(t *testing.T) { +func TestZoneWareInvokerWithPreferredSuccess(t *testing.T) { ctrl := gomock.NewController(t) // In Go versions 1.14+, if you pass a *testing.T // into gomock.NewController(t) you no longer need to call ctrl.Finish(). @@ -78,7 +78,7 @@ func Test_ZoneWareInvokerWithPreferredSuccess(t *testing.T) { assert.Equal(t, mockResult, result) } -func Test_ZoneWareInvokerWithWeightSuccess(t *testing.T) { +func TestZoneWareInvokerWithWeightSuccess(t *testing.T) { ctrl := gomock.NewController(t) // In Go versions 1.14+, if you pass a *testing.T // into gomock.NewController(t) you no longer need to call ctrl.Finish(). @@ -134,7 +134,7 @@ func Test_ZoneWareInvokerWithWeightSuccess(t *testing.T) { w1, w1Count, w2, w2Count) } -func Test_ZoneWareInvokerWithZoneSuccess(t *testing.T) { +func TestZoneWareInvokerWithZoneSuccess(t *testing.T) { var zoneArray = []string{"hangzhou", "shanghai"} ctrl := gomock.NewController(t) @@ -167,14 +167,14 @@ func Test_ZoneWareInvokerWithZoneSuccess(t *testing.T) { inv := &invocation.RPCInvocation{} // zone hangzhou hz := zoneArray[0] - inv.SetAttachments(constant.REGISTRY_ZONE, hz) + inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, hz) result := clusterInvoker.Invoke(context.Background(), inv) assert.Equal(t, hz, result.Attachment(constant.ZONE_KEY, "")) } -func Test_ZoneWareInvokerWithZoneForceFail(t *testing.T) { +func TestZoneWareInvokerWithZoneForceFail(t *testing.T) { ctrl := gomock.NewController(t) // In Go versions 1.14+, if you pass a *testing.T // into gomock.NewController(t) you no longer need to call ctrl.Finish(). @@ -196,8 +196,9 @@ func Test_ZoneWareInvokerWithZoneForceFail(t *testing.T) { inv := &invocation.RPCInvocation{} // zone hangzhou - inv.SetAttachments(constant.REGISTRY_ZONE, "hangzhou") - inv.SetAttachments(constant.REGISTRY_ZONE_FORCE, "true") + inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, "hangzhou") + // zone force + inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_FORCE_KEY, "true") result := clusterInvoker.Invoke(context.Background(), inv) diff --git a/cluster/cluster_interceptor.go b/cluster/cluster_interceptor.go new file mode 100644 index 0000000000000000000000000000000000000000..d78dc15edab36f082f1ba9c24affcf47106a59c1 --- /dev/null +++ b/cluster/cluster_interceptor.go @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster + +import ( + "context" + "github.com/apache/dubbo-go/protocol" +) + +// ClusterInterceptor +// Extension - ClusterInterceptor +type ClusterInterceptor interface { + // Before DoInvoke method + BeforeInvoker(ctx context.Context, invocation protocol.Invocation) + + // After DoInvoke method + AfterInvoker(ctx context.Context, invocation protocol.Invocation) + + // Corresponding cluster invoke + DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result +} diff --git a/common/constant/cluster.go b/common/constant/cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..6894f3595ea8dfdc83f0ce372bb7f22a47e3e434 --- /dev/null +++ b/common/constant/cluster.go @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package constant + +// nolint +const ( + FAILOVER_CLUSTER_NAME = "failover" + ZONEAWARE_CLUSTER_NAME = "zoneAware" +) diff --git a/common/constant/key.go b/common/constant/key.go index f7189b7c2589d1461e333828c5a18c8d6cc6bbd0..5c4b0ad669662e3f17a799fa8bf4500c35e932ad 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -100,8 +100,7 @@ const ( REGISTRY_LABEL_KEY = "label" PREFERRED_KEY = "preferred" ZONE_KEY = "zone" - REGISTRY_ZONE = "registry_zone" - REGISTRY_ZONE_FORCE = "registry_zone_force" + ZONE_FORCE_KEY = "zone.force" REGISTRY_TTL_KEY = "registry.ttl" ) diff --git a/config/config_loader_test.go b/config/config_loader_test.go index 103a978b258c3f4732e0d1b81c4ccfaac3aa6c10..a219b9f465c71186e8cd482a7bbca32a03cdb396 100644 --- a/config/config_loader_test.go +++ b/config/config_loader_test.go @@ -72,7 +72,7 @@ func TestLoad(t *testing.T) { SetProviderService(ms) extension.SetProtocol("registry", GetProtocol) - extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory) Load() @@ -101,7 +101,7 @@ func TestLoadWithSingleReg(t *testing.T) { SetProviderService(ms) extension.SetProtocol("registry", GetProtocol) - extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory) Load() @@ -130,7 +130,7 @@ func TestWithNoRegLoad(t *testing.T) { SetProviderService(ms) extension.SetProtocol("registry", GetProtocol) - extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory) Load() diff --git a/config/reference_config.go b/config/reference_config.go index 1ce60d3677ae5ef304d6b1050a3863fb9efd01e0..bbc875192c7a87354ccc81e28ea05bbc3bb71149 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -26,7 +26,6 @@ import ( ) import ( - "github.com/apache/dubbo-go/cluster/cluster_impl" "github.com/creasty/defaults" gxstrings "github.com/dubbogo/gost/strings" ) @@ -148,26 +147,26 @@ func (c *ReferenceConfig) Refer(_ interface{}) { } // TODO(decouple from directory, config should not depend on directory module) + var hitClu string if regUrl != nil { // for multi-subscription scenario, use 'zone-aware' policy by default - cluster := extension.GetCluster(cluster_impl.GetZoneAwareName()) - // The invoker wrap sequence would be: - // ZoneAwareClusterInvoker(StaticDirectory) -> - // FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker - c.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) + hitClu = constant.ZONEAWARE_CLUSTER_NAME } else { // not a registry url, must be direct invoke. - clu := cluster_impl.GetFailoverName() + hitClu = constant.FAILOVER_CLUSTER_NAME if len(invokers) > 0 { u := invokers[0].GetUrl() if nil != &u { - clu = u.GetParam(constant.CLUSTER_KEY, cluster_impl.GetZoneAwareName()) + hitClu = u.GetParam(constant.CLUSTER_KEY, constant.ZONEAWARE_CLUSTER_NAME) } } - - cluster := extension.GetCluster(clu) - c.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) } + + cluster := extension.GetCluster(hitClu) + // If 'zone-aware' policy select, the invoker wrap sequence would be: + // ZoneAwareClusterInvoker(StaticDirectory) -> + // FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker + c.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) } // create proxy diff --git a/config/reference_config_test.go b/config/reference_config_test.go index 1e31d7d44528d32a2ac7bf972e651ac11fe1c89b..e45780159615c8896627b003fd45c35af86d3f02 100644 --- a/config/reference_config_test.go +++ b/config/reference_config_test.go @@ -190,7 +190,7 @@ func doInitConsumerWithSingleRegistry() { func TestReferMultiReg(t *testing.T) { doInitConsumer() extension.SetProtocol("registry", GetProtocol) - extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) for _, reference := range consumerConfig.References { reference.Refer(nil) @@ -203,7 +203,7 @@ func TestReferMultiReg(t *testing.T) { func TestRefer(t *testing.T) { doInitConsumer() extension.SetProtocol("registry", GetProtocol) - extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) for _, reference := range consumerConfig.References { reference.Refer(nil) @@ -217,7 +217,7 @@ func TestRefer(t *testing.T) { func TestReferAsync(t *testing.T) { doInitConsumerAsync() extension.SetProtocol("registry", GetProtocol) - extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) for _, reference := range consumerConfig.References { reference.Refer(nil) @@ -275,7 +275,7 @@ func TestReferMultiP2PWithReg(t *testing.T) { func TestImplement(t *testing.T) { doInitConsumer() extension.SetProtocol("registry", GetProtocol) - extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) for _, reference := range consumerConfig.References { reference.Refer(nil) reference.Implement(&MockService{}) diff --git a/config/registry_config.go b/config/registry_config.go index 2adae5d0379dd1571cf923001b5ed15cee219564..89566c428ed14f460c0f214358c9fa05d529ddb6 100644 --- a/config/registry_config.go +++ b/config/registry_config.go @@ -46,9 +46,11 @@ type RegistryConfig struct { Password string `yaml:"password" json:"password,omitempty" property:"password"` Simplified bool `yaml:"simplified" json:"simplified,omitempty" property:"simplified"` // Always use this registry first if set to true, useful when subscribe to multiple registries - Preferred bool `yaml:"preferred" json:"params,omitempty" property:"preferred"` + Preferred bool `yaml:"preferred" json:"preferred,omitempty" property:"preferred"` // The region where the registry belongs, usually used to isolate traffics - Zone string `yaml:"zone" json:"params,omitempty" property:"zone"` + Zone string `yaml:"zone" json:"zone,omitempty" property:"zone"` + //// Force must user the region, property zone is specified. + //ZoneForce bool `yaml:"zoneForce" json:"zoneForce,omitempty" property:"zoneForce"` // Affects traffic distribution among registries, // useful when subscribe to multiple registries Take effect only when no preferred registry is specified. Weight int64 `yaml:"weight" json:"params,omitempty" property:"weight"` @@ -130,6 +132,7 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values { urlMap.Set(constant.REGISTRY_KEY+"."+constant.REGISTRY_LABEL_KEY, strconv.FormatBool(true)) urlMap.Set(constant.REGISTRY_KEY+"."+constant.PREFERRED_KEY, strconv.FormatBool(c.Preferred)) urlMap.Set(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, c.Zone) + //urlMap.Set(constant.REGISTRY_KEY+"."+constant.ZONE_FORCE_KEY, strconv.FormatBool(c.ZoneForce)) urlMap.Set(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, strconv.FormatInt(c.Weight, 10)) urlMap.Set(constant.REGISTRY_TTL_KEY, c.TTL) for k, v := range c.Params { diff --git a/protocol/invocation.go b/protocol/invocation.go index ba5949794c0120874ebdf31cfb1fd9c7d8ac08e4..296ec0540c1eed69c30e1b1477be038a4a9cc00e 100644 --- a/protocol/invocation.go +++ b/protocol/invocation.go @@ -41,6 +41,8 @@ type Invocation interface { Attributes() map[string]interface{} // AttributeByKey gets attribute by key , if nil then return default value AttributeByKey(string, interface{}) interface{} + // SetAttachments sets attribute by @key and @value. + SetAttachments(key string, value string) // Invoker gets the invoker in current context. Invoker() Invoker }