diff --git a/cluster/cluster_impl/base_cluster_invoker.go b/cluster/cluster_impl/base_cluster_invoker.go index bbdfa715d7cdc461689e60a5a41171ad5c9770e1..ced5b15cb9a1c2292ca866f6f7478ce2b23a30b9 100644 --- a/cluster/cluster_impl/base_cluster_invoker.go +++ b/cluster/cluster_impl/base_cluster_invoker.go @@ -17,6 +17,10 @@ package cluster_impl +import ( + "context" +) + import ( gxnet "github.com/dubbogo/gost/net" perrors "github.com/pkg/errors" @@ -36,6 +40,7 @@ type baseClusterInvoker struct { availablecheck bool destroyed *atomic.Bool stickyInvoker protocol.Invoker + interceptor cluster.ClusterInterceptor } func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker { @@ -146,6 +151,20 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc return selectedInvoker } +func (invoker *baseClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { + if invoker.interceptor != nil { + invoker.interceptor.BeforeInvoker(ctx, invocation) + + result := invoker.interceptor.DoInvoke(ctx, invocation) + + invoker.interceptor.AfterInvoker(ctx, invocation) + + return result + } + + return nil +} + func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) bool { for _, i := range invoked { if i == selectedInvoker { diff --git a/cluster/cluster_impl/failover_cluster.go b/cluster/cluster_impl/failover_cluster.go index 254cc097e3e7f0cc70dfcff01212562bc6febe97..4c09fd16d3eb5ebaf7833443ee0f8a980c81b822 100644 --- a/cluster/cluster_impl/failover_cluster.go +++ b/cluster/cluster_impl/failover_cluster.go @@ -19,16 +19,15 @@ package cluster_impl import ( "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/protocol" ) type failoverCluster struct{} -const name = "failover" - func init() { - extension.SetCluster(name, NewFailoverCluster) + extension.SetCluster(constant.FAILOVER_CLUSTER_NAME, NewFailoverCluster) } // NewFailoverCluster returns a failover cluster instance diff --git a/cluster/cluster_impl/registry_aware_cluster_invoker.go b/cluster/cluster_impl/registry_aware_cluster_invoker.go deleted file mode 100644 index 7840da5218090a995298e718a4bbce9d7f5480b4..0000000000000000000000000000000000000000 --- a/cluster/cluster_impl/registry_aware_cluster_invoker.go +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cluster_impl - -import ( - "context" -) -import ( - "github.com/apache/dubbo-go/cluster" - "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/protocol" -) - -type registryAwareClusterInvoker struct { - baseClusterInvoker -} - -func newRegistryAwareClusterInvoker(directory cluster.Directory) protocol.Invoker { - return ®istryAwareClusterInvoker{ - baseClusterInvoker: newBaseClusterInvoker(directory), - } -} - -// nolint -func (invoker *registryAwareClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { - invokers := invoker.directory.List(invocation) - //First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'default' key. - for _, invoker := range invokers { - if invoker.IsAvailable() && invoker.GetUrl().GetParam(constant.REGISTRY_DEFAULT_KEY, "false") == "true" { - return invoker.Invoke(ctx, invocation) - } - } - - //If none of the invokers has a local signal, pick the first one available. - for _, invoker := range invokers { - if invoker.IsAvailable() { - return invoker.Invoke(ctx, invocation) - } - } - return nil -} diff --git a/cluster/cluster_impl/registry_aware_cluster_test.go b/cluster/cluster_impl/registry_aware_cluster_test.go deleted file mode 100644 index 74584b44800fce3342956f4237a63ffbbabf5544..0000000000000000000000000000000000000000 --- a/cluster/cluster_impl/registry_aware_cluster_test.go +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cluster_impl - -import ( - "context" - "fmt" - "testing" -) -import ( - "github.com/stretchr/testify/assert" -) - -import ( - "github.com/apache/dubbo-go/cluster/directory" - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/protocol" - "github.com/apache/dubbo-go/protocol/invocation" -) - -func TestRegAwareInvokeSuccess(t *testing.T) { - - regAwareCluster := NewRegistryAwareCluster() - - invokers := []protocol.Invoker{} - for i := 0; i < 10; i++ { - url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) - invokers = append(invokers, NewMockInvoker(url, 1)) - } - - staticDir := directory.NewStaticDirectory(invokers) - clusterInvoker := regAwareCluster.Join(staticDir) - result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) - assert.NoError(t, result.Error()) - count = 0 -} - -func TestDestroy(t *testing.T) { - regAwareCluster := NewRegistryAwareCluster() - - invokers := []protocol.Invoker{} - for i := 0; i < 10; i++ { - url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) - invokers = append(invokers, NewMockInvoker(url, 1)) - } - - staticDir := directory.NewStaticDirectory(invokers) - clusterInvoker := regAwareCluster.Join(staticDir) - assert.Equal(t, true, clusterInvoker.IsAvailable()) - result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) - assert.NoError(t, result.Error()) - count = 0 - clusterInvoker.Destroy() - assert.Equal(t, false, clusterInvoker.IsAvailable()) - -} diff --git a/cluster/cluster_impl/registry_aware_cluster.go b/cluster/cluster_impl/zone_aware_cluster.go similarity index 63% rename from cluster/cluster_impl/registry_aware_cluster.go rename to cluster/cluster_impl/zone_aware_cluster.go index f4c08973713e6b3736b4ca1ed0d6e5275cf61d01..7439db2d374f73571f3c8b95d8faca6cb98d4cd0 100644 --- a/cluster/cluster_impl/registry_aware_cluster.go +++ b/cluster/cluster_impl/zone_aware_cluster.go @@ -19,22 +19,26 @@ package cluster_impl import ( "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/protocol" ) -type registryAwareCluster struct{} +type zoneAwareCluster struct{} func init() { - extension.SetCluster("registryAware", NewRegistryAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, NewZoneAwareCluster) } -// NewRegistryAwareCluster returns a registry aware cluster instance -func NewRegistryAwareCluster() cluster.Cluster { - return ®istryAwareCluster{} +// NewZoneAwareCluster returns a zoneaware cluster instance. +// +// More than one registry for subscription. +// Usually it is used for choose between registries. +func NewZoneAwareCluster() cluster.Cluster { + return &zoneAwareCluster{} } -// nolint -func (cluster *registryAwareCluster) Join(directory cluster.Directory) protocol.Invoker { - return newRegistryAwareClusterInvoker(directory) +// Join returns a zoneAwareClusterInvoker instance +func (cluster *zoneAwareCluster) Join(directory cluster.Directory) protocol.Invoker { + return newZoneAwareClusterInvoker(directory) } diff --git a/cluster/cluster_impl/zone_aware_cluster_invoker.go b/cluster/cluster_impl/zone_aware_cluster_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..0f52b0442c235d570e4cc9d8491aa80a9df5842d --- /dev/null +++ b/cluster/cluster_impl/zone_aware_cluster_invoker.go @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "context" + "fmt" +) + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/protocol" +) + +// When there're more than one registry for subscription. +// +// This extension provides a strategy to decide how to distribute traffics among them: +// 1. registry marked as 'preferred=true' has the highest priority. +// 2. check the zone the current request belongs, pick the registry that has the same zone first. +// 3. Evenly balance traffic between all registries based on each registry's weight. +// 4. Pick anyone that's available. +type zoneAwareClusterInvoker struct { + baseClusterInvoker +} + +func newZoneAwareClusterInvoker(directory cluster.Directory) protocol.Invoker { + invoke := &zoneAwareClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } + // add self to interceptor + invoke.interceptor = invoke + return invoke +} + +// nolint +func (invoker *zoneAwareClusterInvoker) DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { + invokers := invoker.directory.List(invocation) + + err := invoker.checkInvokers(invokers, invocation) + if err != nil { + return &protocol.RPCResult{Err: err} + } + + // First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'preferred' key. + for _, invoker := range invokers { + key := constant.REGISTRY_KEY + "." + constant.PREFERRED_KEY + if invoker.IsAvailable() && matchParam("true", key, "false", invoker) { + return invoker.Invoke(ctx, invocation) + } + } + + // providers in the registry with the same zone + key := constant.REGISTRY_KEY + "." + constant.ZONE_KEY + zone := invocation.AttachmentsByKey(key, "") + if "" != zone { + for _, invoker := range invokers { + if invoker.IsAvailable() && matchParam(zone, key, "", invoker) { + return invoker.Invoke(ctx, invocation) + } + } + + force := invocation.AttachmentsByKey(constant.REGISTRY_KEY+"."+constant.ZONE_FORCE_KEY, "") + if "true" == force { + return &protocol.RPCResult{ + Err: fmt.Errorf("no registry instance in zone or "+ + "no available providers in the registry, zone: %v, "+ + " registries: %v", zone, invoker.GetUrl()), + } + } + } + + // load balance among all registries, with registry weight count in. + loadBalance := getLoadBalance(invokers[0], invocation) + ivk := invoker.doSelect(loadBalance, invocation, invokers, nil) + if ivk != nil && ivk.IsAvailable() { + return ivk.Invoke(ctx, invocation) + } + + // If none of the invokers has a preferred signal or is picked by the loadBalancer, pick the first one available. + for _, invoker := range invokers { + if invoker.IsAvailable() { + return invoker.Invoke(ctx, invocation) + } + } + + return &protocol.RPCResult{ + Err: fmt.Errorf("no provider available in %v", invokers), + } +} + +func (invoker *zoneAwareClusterInvoker) BeforeInvoker(ctx context.Context, invocation protocol.Invocation) { + key := constant.REGISTRY_KEY + "." + constant.ZONE_FORCE_KEY + force := ctx.Value(key) + + if force != nil { + switch value := force.(type) { + case bool: + if value { + invocation.SetAttachments(key, "true") + } + case string: + if "true" == value { + invocation.SetAttachments(key, "true") + } + default: + // ignore + } + } +} + +func (invoker *zoneAwareClusterInvoker) AfterInvoker(ctx context.Context, invocation protocol.Invocation) { + +} + +func matchParam(target, key, def string, invoker protocol.Invoker) bool { + return target == invoker.GetUrl().GetParam(key, def) +} diff --git a/cluster/cluster_impl/zone_aware_cluster_invoker_test.go b/cluster/cluster_impl/zone_aware_cluster_invoker_test.go new file mode 100644 index 0000000000000000000000000000000000000000..cd201a42c759354ca536ea3e9e77116d89ea8b4b --- /dev/null +++ b/cluster/cluster_impl/zone_aware_cluster_invoker_test.go @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "context" + "fmt" + "testing" +) + +import ( + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/mock" +) + +func TestZoneWareInvokerWithPreferredSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + // In Go versions 1.14+, if you pass a *testing.T + // into gomock.NewController(t) you no longer need to call ctrl.Finish(). + //defer ctrl.Finish() + + mockResult := &protocol.RPCResult{ + Attrs: map[string]string{constant.PREFERRED_KEY: "true"}, + Rest: rest{tried: 0, success: true}} + + var invokers []protocol.Invoker + for i := 0; i < 2; i++ { + url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + invoker := mock.NewMockInvoker(ctrl) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() + invoker.EXPECT().GetUrl().Return(url).AnyTimes() + if 0 == i { + url.SetParam(constant.REGISTRY_KEY+"."+constant.PREFERRED_KEY, "true") + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( + func(invocation protocol.Invocation) protocol.Result { + return mockResult + }) + } else { + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( + func(invocation protocol.Invocation) protocol.Result { + return &protocol.RPCResult{} + }) + } + + invokers = append(invokers, invoker) + } + + zoneAwareCluster := NewZoneAwareCluster() + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := zoneAwareCluster.Join(staticDir) + + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) + + assert.Equal(t, mockResult, result) +} + +func TestZoneWareInvokerWithWeightSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + // In Go versions 1.14+, if you pass a *testing.T + // into gomock.NewController(t) you no longer need to call ctrl.Finish(). + //defer ctrl.Finish() + + w1 := "50" + w2 := "200" + + var invokers []protocol.Invoker + for i := 0; i < 2; i++ { + url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + invoker := mock.NewMockInvoker(ctrl) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() + invoker.EXPECT().GetUrl().Return(url).AnyTimes() + url.SetParam(constant.REGISTRY_KEY+"."+constant.REGISTRY_LABEL_KEY, "true") + if 1 == i { + url.SetParam(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, w1) + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( + func(invocation protocol.Invocation) protocol.Result { + return &protocol.RPCResult{ + Attrs: map[string]string{constant.WEIGHT_KEY: w1}, + Rest: rest{tried: 0, success: true}} + }).MaxTimes(100) + } else { + url.SetParam(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, w2) + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( + func(invocation protocol.Invocation) protocol.Result { + return &protocol.RPCResult{ + Attrs: map[string]string{constant.WEIGHT_KEY: w2}, + Rest: rest{tried: 0, success: true}} + }).MaxTimes(100) + } + invokers = append(invokers, invoker) + } + + zoneAwareCluster := NewZoneAwareCluster() + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := zoneAwareCluster.Join(staticDir) + + var w2Count, w1Count int + loop := 50 + for i := 0; i < loop; i++ { + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) + if w2 == result.Attachment(constant.WEIGHT_KEY, "0") { + w2Count++ + } + if w1 == result.Attachment(constant.WEIGHT_KEY, "0") { + w1Count++ + } + assert.NoError(t, result.Error()) + } + t.Logf("loop count : %d, w1 height : %s | count : %d, w2 height : %s | count : %d", loop, + w1, w1Count, w2, w2Count) +} + +func TestZoneWareInvokerWithZoneSuccess(t *testing.T) { + var zoneArray = []string{"hangzhou", "shanghai"} + + ctrl := gomock.NewController(t) + // In Go versions 1.14+, if you pass a *testing.T + // into gomock.NewController(t) you no longer need to call ctrl.Finish(). + //defer ctrl.Finish() + + var invokers []protocol.Invoker + for i := 0; i < 2; i++ { + zoneValue := zoneArray[i] + url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + url.SetParam(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, zoneValue) + + invoker := mock.NewMockInvoker(ctrl) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() + invoker.EXPECT().GetUrl().Return(url).AnyTimes() + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( + func(invocation protocol.Invocation) protocol.Result { + return &protocol.RPCResult{ + Attrs: map[string]string{constant.ZONE_KEY: zoneValue}, + Rest: rest{tried: 0, success: true}} + }) + invokers = append(invokers, invoker) + } + + zoneAwareCluster := NewZoneAwareCluster() + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := zoneAwareCluster.Join(staticDir) + + inv := &invocation.RPCInvocation{} + // zone hangzhou + hz := zoneArray[0] + inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, hz) + + result := clusterInvoker.Invoke(context.Background(), inv) + + assert.Equal(t, hz, result.Attachment(constant.ZONE_KEY, "")) +} + +func TestZoneWareInvokerWithZoneForceFail(t *testing.T) { + ctrl := gomock.NewController(t) + // In Go versions 1.14+, if you pass a *testing.T + // into gomock.NewController(t) you no longer need to call ctrl.Finish(). + //defer ctrl.Finish() + + var invokers []protocol.Invoker + for i := 0; i < 2; i++ { + url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + + invoker := mock.NewMockInvoker(ctrl) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() + invoker.EXPECT().GetUrl().Return(url).AnyTimes() + invokers = append(invokers, invoker) + } + + zoneAwareCluster := NewZoneAwareCluster() + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := zoneAwareCluster.Join(staticDir) + + inv := &invocation.RPCInvocation{} + // zone hangzhou + inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, "hangzhou") + // zone force + inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_FORCE_KEY, "true") + + result := clusterInvoker.Invoke(context.Background(), inv) + + assert.NotNil(t, result.Error()) +} diff --git a/cluster/cluster_interceptor.go b/cluster/cluster_interceptor.go new file mode 100644 index 0000000000000000000000000000000000000000..a627e81365781e83932668872cd94a2cdec3b71e --- /dev/null +++ b/cluster/cluster_interceptor.go @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster + +import ( + "context" +) + +import ( + "github.com/apache/dubbo-go/protocol" +) + +// ClusterInterceptor +// Extension - ClusterInterceptor +type ClusterInterceptor interface { + // Before DoInvoke method + BeforeInvoker(ctx context.Context, invocation protocol.Invocation) + + // After DoInvoke method + AfterInvoker(ctx context.Context, invocation protocol.Invocation) + + // Corresponding cluster invoke + DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result +} diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go index 8a7e0c0e8359c69faf0e504adeb1778c38a08e04..651c90ff6862bd75e604b0572bb15e583a00065e 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base_directory.go @@ -85,15 +85,21 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) { for _, url := range urls { routerKey := url.GetParam(constant.ROUTER_KEY, "") - if len(routerKey) > 0 { - factory := extension.GetRouterFactory(url.Protocol) - r, err := factory.NewPriorityRouter(url) - if err != nil { - logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err) - return + if len(routerKey) == 0 { + continue + } + if url.Protocol == constant.CONDITION_ROUTE_PROTOCOL { + if !dir.isProperRouter(url) { + continue } - routers = append(routers, r) } + factory := extension.GetRouterFactory(url.Protocol) + r, err := factory.NewPriorityRouter(url) + if err != nil { + logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err) + return + } + routers = append(routers, r) } logger.Infof("Init file condition router success, size: %v", len(routers)) @@ -104,6 +110,21 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) { rc.AddRouters(routers) } +func (dir *BaseDirectory) isProperRouter(url *common.URL) bool { + app := url.GetParam(constant.APPLICATION_KEY, "") + serviceKey := dir.GetUrl().ServiceKey() + if serviceKey == "" { + serviceKey = dir.GetUrl().SubURL.ServiceKey() + } + if len(app) > 0 && app == dir.GetUrl().GetParam(constant.APPLICATION_KEY, "") { + return true + } + if url.ServiceKey() == serviceKey { + return true + } + return false +} + // Destroy Destroy func (dir *BaseDirectory) Destroy(doDestroy func()) { if dir.destroyed.CAS(false, true) { diff --git a/cluster/directory/base_directory_test.go b/cluster/directory/base_directory_test.go index 8b60163b79b7120829e51f69238474a127133fb4..a2b62dfa008e6cd17b1200d93cd235da17d03905 100644 --- a/cluster/directory/base_directory_test.go +++ b/cluster/directory/base_directory_test.go @@ -37,7 +37,7 @@ import ( var ( url, _ = common.NewURL( fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) - anyUrl, _ = common.NewURL(fmt.Sprintf("condition://%s/com.foo.BarService", constant.ANYHOST_VALUE)) + anyURL, _ = common.NewURL(fmt.Sprintf("condition://%s/com.foo.BarService", constant.ANYHOST_VALUE)) ) func TestNewBaseDirectory(t *testing.T) { @@ -48,13 +48,17 @@ func TestNewBaseDirectory(t *testing.T) { } func TestBuildRouterChain(t *testing.T) { - directory := NewBaseDirectory(&url) + + regURL := url + regURL.AddParam(constant.INTERFACE_KEY, "mock-app") + directory := NewBaseDirectory(®URL) assert.NotNil(t, directory) localIP, _ := gxnet.GetLocalIP() rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP)) - routeURL := getRouteUrl(rule) + routeURL := getRouteURL(rule, anyURL) + routeURL.AddParam(constant.INTERFACE_KEY, "mock-app") routerURLs := make([]*common.URL, 0) routerURLs = append(routerURLs, routeURL) directory.SetRouters(routerURLs) @@ -63,9 +67,53 @@ func TestBuildRouterChain(t *testing.T) { assert.NotNil(t, chain) } -func getRouteUrl(rule string) *common.URL { - anyUrl.AddParam("rule", rule) - anyUrl.AddParam("force", "true") - anyUrl.AddParam(constant.ROUTER_KEY, "router") - return &url +func getRouteURL(rule string, u common.URL) *common.URL { + ru := u + ru.AddParam("rule", rule) + ru.AddParam("force", "true") + ru.AddParam(constant.ROUTER_KEY, "router") + return &ru +} + +func TestIsProperRouter(t *testing.T) { + regURL := url + regURL.AddParam(constant.APPLICATION_KEY, "mock-app") + d := NewBaseDirectory(®URL) + localIP, _ := gxnet.GetLocalIP() + rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP)) + routeURL := getRouteURL(rule, anyURL) + routeURL.AddParam(constant.APPLICATION_KEY, "mock-app") + rst := d.isProperRouter(routeURL) + assert.True(t, rst) + + regURL.AddParam(constant.APPLICATION_KEY, "") + regURL.AddParam(constant.INTERFACE_KEY, "com.foo.BarService") + d = NewBaseDirectory(®URL) + routeURL = getRouteURL(rule, anyURL) + routeURL.AddParam(constant.INTERFACE_KEY, "com.foo.BarService") + rst = d.isProperRouter(routeURL) + assert.True(t, rst) + + regURL.AddParam(constant.APPLICATION_KEY, "") + regURL.AddParam(constant.INTERFACE_KEY, "") + d = NewBaseDirectory(®URL) + routeURL = getRouteURL(rule, anyURL) + rst = d.isProperRouter(routeURL) + assert.True(t, rst) + + regURL.SetParam(constant.APPLICATION_KEY, "") + regURL.SetParam(constant.INTERFACE_KEY, "") + d = NewBaseDirectory(®URL) + routeURL = getRouteURL(rule, anyURL) + routeURL.AddParam(constant.APPLICATION_KEY, "mock-service") + rst = d.isProperRouter(routeURL) + assert.False(t, rst) + + regURL.SetParam(constant.APPLICATION_KEY, "") + regURL.SetParam(constant.INTERFACE_KEY, "") + d = NewBaseDirectory(®URL) + routeURL = getRouteURL(rule, anyURL) + routeURL.AddParam(constant.INTERFACE_KEY, "mock-service") + rst = d.isProperRouter(routeURL) + assert.False(t, rst) } diff --git a/cluster/loadbalance/util.go b/cluster/loadbalance/util.go index b6c013852bf55ce7eb67e4fa18802a938141d283..684ffe11a72058e188fdbcc5f8aa56fe16073619 100644 --- a/cluster/loadbalance/util.go +++ b/cluster/loadbalance/util.go @@ -28,23 +28,35 @@ import ( // GetWeight gets weight for load balance strategy func GetWeight(invoker protocol.Invoker, invocation protocol.Invocation) int64 { + var weight int64 url := invoker.GetUrl() - weight := url.GetMethodParamInt64(invocation.MethodName(), constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT) + // Multiple registry scenario, load balance among multiple registries. + isRegIvk := url.GetParamBool(constant.REGISTRY_KEY+"."+constant.REGISTRY_LABEL_KEY, false) + if isRegIvk { + weight = url.GetParamInt(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT) + } else { + weight = url.GetMethodParamInt64(invocation.MethodName(), constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT) - if weight > 0 { - //get service register time an do warm up time - now := time.Now().Unix() - timestamp := url.GetParamInt(constant.REMOTE_TIMESTAMP_KEY, now) - if uptime := now - timestamp; uptime > 0 { - warmup := url.GetParamInt(constant.WARMUP_KEY, constant.DEFAULT_WARMUP) - if uptime < warmup { - if ww := float64(uptime) / float64(warmup) / float64(weight); ww < 1 { - weight = 1 - } else if int64(ww) <= weight { - weight = int64(ww) + if weight > 0 { + //get service register time an do warm up time + now := time.Now().Unix() + timestamp := url.GetParamInt(constant.REMOTE_TIMESTAMP_KEY, now) + if uptime := now - timestamp; uptime > 0 { + warmup := url.GetParamInt(constant.WARMUP_KEY, constant.DEFAULT_WARMUP) + if uptime < warmup { + if ww := float64(uptime) / float64(warmup) / float64(weight); ww < 1 { + weight = 1 + } else if int64(ww) <= weight { + weight = int64(ww) + } } } } } + + if weight < 0 { + weight = 0 + } + return weight } diff --git a/cluster/router/chain/chain_test.go b/cluster/router/chain/chain_test.go index c1f723525f5307e7732f0ea1ecc27eca7ba09c8d..065f0e49d9a2a465c7bb26a9541b34ad569bb93a 100644 --- a/cluster/router/chain/chain_test.go +++ b/cluster/router/chain/chain_test.go @@ -66,7 +66,9 @@ func TestNewRouterChain(t *testing.T) { err = z.Create(path) assert.NoError(t, err) - testyml := `enabled: true + testyml := `scope: application +key: mock-app +enabled: true force: true runtime: false conditions: @@ -93,7 +95,7 @@ conditions: assert.NotNil(t, appRouter) assert.NotNil(t, appRouter.RouterRule()) rule := appRouter.RouterRule() - assert.Equal(t, "", rule.Scope) + assert.Equal(t, "application", rule.Scope) assert.True(t, rule.Force) assert.True(t, rule.Enabled) assert.True(t, rule.Valid) @@ -101,7 +103,7 @@ conditions: assert.Equal(t, testyml, rule.RawRule) assert.Equal(t, false, rule.Runtime) assert.Equal(t, false, rule.Dynamic) - assert.Equal(t, "", rule.Key) + assert.Equal(t, "mock-app", rule.Key) } func TestNewRouterChainURLNil(t *testing.T) { @@ -116,7 +118,9 @@ func TestRouterChainAddRouters(t *testing.T) { err = z.Create(path) assert.NoError(t, err) - testyml := `enabled: true + testyml := `scope: application +key: mock-app +enabled: true force: true runtime: false conditions: @@ -182,7 +186,9 @@ func TestRouterChainRouteAppRouter(t *testing.T) { err = z.Create(path) assert.NoError(t, err) - testyml := `enabled: true + testyml := `scope: application +key: mock-app +enabled: true force: true runtime: false conditions: diff --git a/cluster/router/condition/app_router_test.go b/cluster/router/condition/app_router_test.go index 8b38f2dd6136b4d31f46e7214c0ad1359537b198..53465f90516fed1d5cf94b8ecc077cbfde6c743f 100644 --- a/cluster/router/condition/app_router_test.go +++ b/cluster/router/condition/app_router_test.go @@ -52,7 +52,9 @@ var ( func TestNewAppRouter(t *testing.T) { - testYML := `enabled: true + testYML := `scope: application +key: mock-app +enabled: true force: true runtime: false conditions: @@ -83,7 +85,7 @@ conditions: assert.NotNil(t, appRouter) assert.NotNil(t, appRouter.RouterRule()) rule := appRouter.RouterRule() - assert.Equal(t, "", rule.Scope) + assert.Equal(t, "application", rule.Scope) assert.True(t, rule.Force) assert.True(t, rule.Enabled) assert.True(t, rule.Valid) @@ -91,13 +93,15 @@ conditions: assert.Equal(t, testYML, rule.RawRule) assert.Equal(t, false, rule.Runtime) assert.Equal(t, false, rule.Dynamic) - assert.Equal(t, "", rule.Key) + assert.Equal(t, "mock-app", rule.Key) assert.Equal(t, 0, rule.Priority) } func TestGenerateConditions(t *testing.T) { - testYML := `enabled: true + testYML := `scope: application +key: mock-app +enabled: true force: true runtime: false conditions: @@ -135,7 +139,9 @@ conditions: func TestProcess(t *testing.T) { - testYML := `enabled: true + testYML := `scope: application +key: mock-app +enabled: true force: true runtime: false conditions: @@ -165,7 +171,8 @@ conditions: assert.Equal(t, 1, len(appRouter.conditionRouters)) - testNewYML := ` + testNewYML := `scope: application +key: mock-app enabled: true force: true runtime: false diff --git a/cluster/router/condition/file.go b/cluster/router/condition/file.go index eabdf1c263446140b359b3e791238b020cecb50c..996db7443faff88d3baa1a2363f98fa62623d121 100644 --- a/cluster/router/condition/file.go +++ b/cluster/router/condition/file.go @@ -20,6 +20,7 @@ package condition import ( "encoding/base64" "net/url" + "regexp" "strconv" "strings" "sync" @@ -71,11 +72,53 @@ func (f *FileConditionRouter) URL() common.URL { common.WithParamsValue(constant.RouterPriority, strconv.Itoa(routerRule.Priority)), common.WithParamsValue(constant.RULE_KEY, base64.URLEncoding.EncodeToString([]byte(rule))), common.WithParamsValue(constant.ROUTER_KEY, constant.CONDITION_ROUTE_PROTOCOL), - common.WithParamsValue(constant.CATEGORY_KEY, constant.ROUTERS_CATEGORY)) + common.WithParamsValue(constant.CATEGORY_KEY, constant.ROUTERS_CATEGORY), + ) + if routerRule.Scope == constant.RouterApplicationScope { + f.url.AddParam(constant.APPLICATION_KEY, routerRule.Key) + return + } + grp, srv, ver, e := parseServiceRouterKey(routerRule.Key) + if e != nil { + return + } + if len(grp) > 0 { + f.url.AddParam(constant.GROUP_KEY, grp) + } + if len(ver) > 0 { + f.url.AddParam(constant.VERSION_KEY, ver) + } + if len(srv) > 0 { + f.url.AddParam(constant.INTERFACE_KEY, srv) + } }) return f.url } +// The input value must follow [{group}/]{service}[:{version}] pattern +// the returning strings are representing group, service, version respectively. +// input: mock-group/mock-service:1.0.0 ==> "mock-group", "mock-service", "1.0.0" +// input: mock-group/mock-service ==> "mock-group", "mock-service", "" +// input: mock-service:1.0.0 ==> "", "mock-service", "1.0.0" +// For more samples, please refer to unit test. +func parseServiceRouterKey(key string) (string, string, string, error) { + if len(strings.TrimSpace(key)) == 0 { + return "", "", "", nil + } + reg := regexp.MustCompile(`(.*/{1})?([^:/]+)(:{1}[^:]*)?`) + strs := reg.FindAllStringSubmatch(key, -1) + if strs == nil || len(strs) > 1 { + return "", "", "", perrors.Errorf("Invalid key, service key must follow [{group}/]{service}[:{version}] pattern") + } + if len(strs[0]) != 4 { + return "", "", "", perrors.Errorf("Parse service router key failed") + } + grp := strings.TrimSpace(strings.TrimRight(strs[0][1], "/")) + srv := strings.TrimSpace(strs[0][2]) + ver := strings.TrimSpace(strings.TrimLeft(strs[0][3], ":")) + return grp, srv, ver, nil +} + func parseCondition(conditions []string) string { var when, then string for _, condition := range conditions { diff --git a/cluster/router/condition/file_test.go b/cluster/router/condition/file_test.go index 3092b12ff88dcacc9108c7cdd46ba1ac9f74eb2b..bd19a0d18c6692af181ffef77c5cd3f9fc16d67d 100644 --- a/cluster/router/condition/file_test.go +++ b/cluster/router/condition/file_test.go @@ -26,7 +26,9 @@ import ( ) func TestLoadYmlConfig(t *testing.T) { - router, e := NewFileConditionRouter([]byte(`priority: 1 + router, e := NewFileConditionRouter([]byte(`scope: application +key: mock-app +priority: 1 force: true conditions : - "a => b" @@ -47,12 +49,78 @@ func TestParseCondition(t *testing.T) { } func TestFileRouterURL(t *testing.T) { - router, e := NewFileConditionRouter([]byte(`priority: 1 + router, e := NewFileConditionRouter([]byte(`scope: application +key: mock-app +priority: 1 force: true conditions : - "a => b" - "c => d"`)) assert.Nil(t, e) assert.NotNil(t, router) - assert.Equal(t, "condition://0.0.0.0:?category=routers&force=true&priority=1&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D", router.URL().String()) + assert.Equal(t, "condition://0.0.0.0:?application=mock-app&category=routers&force=true&priority=1&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D", router.URL().String()) + + router, e = NewFileConditionRouter([]byte(`scope: service +key: mock-service +priority: 1 +force: true +conditions : + - "a => b" + - "c => d"`)) + assert.Nil(t, e) + assert.NotNil(t, router) + assert.Equal(t, "condition://0.0.0.0:?category=routers&force=true&interface=mock-service&priority=1&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D", router.URL().String()) + + router, e = NewFileConditionRouter([]byte(`scope: service +key: grp1/mock-service:v1 +priority: 1 +force: true +conditions : + - "a => b" + - "c => d"`)) + assert.Nil(t, e) + assert.NotNil(t, router) + assert.Equal(t, "condition://0.0.0.0:?category=routers&force=true&group=grp1&interface=mock-service&priority=1&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D&version=v1", router.URL().String()) +} + +func TestParseServiceRouterKey(t *testing.T) { + testString := " mock-group / mock-service:1.0.0" + grp, srv, ver, err := parseServiceRouterKey(testString) + assert.Equal(t, "mock-group", grp) + assert.Equal(t, "mock-service", srv) + assert.Equal(t, "1.0.0", ver) + + testString = "mock-group/mock-service" + grp, srv, ver, err = parseServiceRouterKey(testString) + assert.Equal(t, "mock-group", grp) + assert.Equal(t, "mock-service", srv) + assert.Equal(t, "", ver) + + testString = "mock-service:1.0.0" + grp, srv, ver, err = parseServiceRouterKey(testString) + assert.Equal(t, "", grp) + assert.Equal(t, "mock-service", srv) + assert.Equal(t, "1.0.0", ver) + + testString = "mock-service" + grp, srv, ver, err = parseServiceRouterKey(testString) + assert.Equal(t, "", grp) + assert.Equal(t, "mock-service", srv) + assert.Equal(t, "", ver) + + testString = "/mock-service:" + grp, srv, ver, err = parseServiceRouterKey(testString) + assert.Equal(t, "", grp) + assert.Equal(t, "mock-service", srv) + assert.Equal(t, "", ver) + + testString = "grp:mock-service:123" + grp, srv, ver, err = parseServiceRouterKey(testString) + assert.Error(t, err) + + testString = "" + grp, srv, ver, err = parseServiceRouterKey(testString) + assert.Equal(t, "", grp) + assert.Equal(t, "", srv) + assert.Equal(t, "", ver) } diff --git a/cluster/router/condition/router.go b/cluster/router/condition/router.go index 40a251573f5e73d40032972313565d98b288b1b1..800293da6c59f07a4bff1b6b832f482273394cec 100644 --- a/cluster/router/condition/router.go +++ b/cluster/router/condition/router.go @@ -117,7 +117,13 @@ func NewConditionRouter(url *common.URL) (*ConditionRouter, error) { } router.url = url - router.priority = url.GetParamInt(constant.RouterPriority, 0) + var defaultPriority int64 = 0 + if url.GetParam(constant.APPLICATION_KEY, "") != "" { + defaultPriority = 150 + } else if url.GetParam(constant.INTERFACE_KEY, "") != "" { + defaultPriority = 140 + } + router.priority = url.GetParamInt(constant.RouterPriority, defaultPriority) router.Force = url.GetParamBool(constant.RouterForce, false) router.enabled = url.GetParamBool(constant.RouterEnabled, true) diff --git a/cluster/router/condition/router_rule.go b/cluster/router/condition/router_rule.go index ce397d6cc0f51519123dd427709e8dba42d72a20..9f2bf2d41a941398698f110f77a7a3051a0c9c11 100644 --- a/cluster/router/condition/router_rule.go +++ b/cluster/router/condition/router_rule.go @@ -28,12 +28,13 @@ import ( import ( "github.com/apache/dubbo-go/cluster/router" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/yaml" ) // RouterRule RouterRule config read from config file or config center type RouterRule struct { - router.BaseRouterRule `yaml:",inline""` + router.BaseRouterRule `yaml:",inline"` Conditions []string } @@ -57,7 +58,7 @@ func getRule(rawRule string) (*RouterRule, error) { return r, err } r.RawRule = rawRule - if len(r.Conditions) != 0 { + if len(r.Conditions) > 0 && len(r.Key) > 0 && (r.Scope == constant.RouterApplicationScope || r.Scope == constant.RouterServiceScope) { r.Valid = true } diff --git a/cluster/router/condition/router_rule_test.go b/cluster/router/condition/router_rule_test.go index 675acaec912b413d8fa3d1a25463b1fd4813a7f5..369b14f08a6a650b36fe63a2d890c04fe7b892a5 100644 --- a/cluster/router/condition/router_rule_test.go +++ b/cluster/router/condition/router_rule_test.go @@ -32,6 +32,7 @@ import ( func TestGetRule(t *testing.T) { testyml := ` scope: application +key: test-provider runtime: true force: false conditions: @@ -50,10 +51,31 @@ conditions: assert.True(t, rule.Runtime) assert.Equal(t, false, rule.Force) assert.Equal(t, testyml, rule.RawRule) - assert.True(t, true, rule.Valid) + assert.True(t, rule.Valid) assert.Equal(t, false, rule.Enabled) assert.Equal(t, false, rule.Dynamic) - assert.Equal(t, "", rule.Key) + assert.Equal(t, "test-provider", rule.Key) + + testyml = ` +key: test-provider +runtime: true +force: false +conditions: + - > + method!=sayHello =>` + rule, e = getRule(testyml) + assert.Nil(t, e) + assert.False(t, rule.Valid) + + testyml = ` +scope: noApplication +key: test-provider +conditions: + - > + method!=sayHello =>` + rule, e = getRule(testyml) + assert.Nil(t, e) + assert.False(t, rule.Valid) } func TestIsMatchGlobPattern(t *testing.T) { diff --git a/cluster/router/condition/router_test.go b/cluster/router/condition/router_test.go new file mode 100644 index 0000000000000000000000000000000000000000..c27a1d9552a331d7e67af0eb3d444c480758891e --- /dev/null +++ b/cluster/router/condition/router_test.go @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package condition + +import ( + "testing" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/dubbogo/gost/container/set" + "github.com/stretchr/testify/assert" +) + +func TestParseRule(t *testing.T) { + testString := `` + matchPair, err := parseRule(testString) + assert.Nil(t, err) + assert.EqualValues(t, matchPair, make(map[string]MatchPair, 16)) + + testString = `method!=sayHello&application=sayGoodBye` + matchPair, err = parseRule(testString) + assert.Nil(t, err) + assert.EqualValues(t, matchPair["method"].Mismatches, gxset.NewSet("sayHello")) + assert.EqualValues(t, matchPair["application"].Matches, gxset.NewSet("sayGoodBye")) + + testString = `noRule` + matchPair, err = parseRule(testString) + assert.Nil(t, err) + assert.EqualValues(t, matchPair["noRule"].Mismatches, gxset.NewSet()) + assert.EqualValues(t, matchPair["noRule"].Matches, gxset.NewSet()) + + testString = `method!=sayHello,sayGoodBye` + matchPair, err = parseRule(testString) + assert.Nil(t, err) + assert.EqualValues(t, matchPair["method"].Mismatches, gxset.NewSet(`sayHello`, `sayGoodBye`)) + + testString = `method!=sayHello,sayGoodDay=sayGoodBye` + matchPair, err = parseRule(testString) + assert.Nil(t, err) + assert.EqualValues(t, matchPair["method"].Mismatches, gxset.NewSet(`sayHello`, `sayGoodDay`)) + assert.EqualValues(t, matchPair["method"].Matches, gxset.NewSet(`sayGoodBye`)) +} + +func TestNewConditionRouter(t *testing.T) { + url, _ := common.NewURL(`condition://0.0.0.0:?application=mock-app&category=routers&force=true&priority=1&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D`) + router, err := NewConditionRouter(&url) + assert.Nil(t, err) + assert.Equal(t, true, router.Enabled()) + assert.Equal(t, true, router.Force) + assert.Equal(t, int64(1), router.Priority()) + whenRule, _ := parseRule("a & c") + thenRule, _ := parseRule("b & d") + assert.EqualValues(t, router.WhenCondition, whenRule) + assert.EqualValues(t, router.ThenCondition, thenRule) + + router, err = NewConditionRouter(nil) + assert.Error(t, err) + + url, _ = common.NewURL(`condition://0.0.0.0:?application=mock-app&category=routers&force=true&priority=1&router=condition&rule=YSAmT4gYiAmIGQ%3D`) + router, err = NewConditionRouter(&url) + assert.Error(t, err) + + url, _ = common.NewURL(`condition://0.0.0.0:?application=mock-app&category=routers&force=true&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D`) + router, err = NewConditionRouter(&url) + assert.Nil(t, err) + assert.Equal(t, int64(150), router.Priority()) + + url, _ = common.NewURL(`condition://0.0.0.0:?category=routers&force=true&interface=mock-service&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D`) + router, err = NewConditionRouter(&url) + assert.Nil(t, err) + assert.Equal(t, int64(140), router.Priority()) +} diff --git a/cluster/router/tag/router_rule.go b/cluster/router/tag/router_rule.go index 926446dcb2f18fa2fd4639a9246a85f435d75d45..fcf5542fdd5f5cf285f4060a234cf2ef2c4185ed 100644 --- a/cluster/router/tag/router_rule.go +++ b/cluster/router/tag/router_rule.go @@ -24,7 +24,7 @@ import ( // RouterRule RouterRule config read from config file or config center type RouterRule struct { - router.BaseRouterRule `yaml:",inline""` + router.BaseRouterRule `yaml:",inline"` } func getRule(rawRule string) (*RouterRule, error) { diff --git a/common/constant/cluster.go b/common/constant/cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..6894f3595ea8dfdc83f0ce372bb7f22a47e3e434 --- /dev/null +++ b/common/constant/cluster.go @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package constant + +// nolint +const ( + FAILOVER_CLUSTER_NAME = "failover" + ZONEAWARE_CLUSTER_NAME = "zoneAware" +) diff --git a/common/constant/default.go b/common/constant/default.go index c1c404e089ea90899d2b599b01cd5980c3e92ab1..629aa32392a0151046eaaea67287618eae02158d 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -37,6 +37,7 @@ const ( DEFAULT_RETRIES_INT = 2 DEFAULT_PROTOCOL = "dubbo" DEFAULT_REG_TIMEOUT = "10s" + DEFAULT_REG_TTL = "15m" DEFAULT_CLUSTER = "failover" DEFAULT_FAILBACK_TIMES = "3" DEFAULT_FAILBACK_TIMES_INT = 3 diff --git a/common/constant/key.go b/common/constant/key.go index cd23dd0f1ad3e000fe54c251d9563bcf12ba86c7..0aa6912e4beda60c7e37f53b7504f1ecd1abc572 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -97,6 +97,11 @@ const ( ROLE_KEY = "registry.role" REGISTRY_DEFAULT_KEY = "registry.default" REGISTRY_TIMEOUT_KEY = "registry.timeout" + REGISTRY_LABEL_KEY = "label" + PREFERRED_KEY = "preferred" + ZONE_KEY = "zone" + ZONE_FORCE_KEY = "zone.force" + REGISTRY_TTL_KEY = "registry.ttl" ) const ( @@ -196,7 +201,14 @@ const ( RouterEnabled = "enabled" // Priority Priority key in router module RouterPriority = "priority" - + // RouterScope Scope key in router module + RouterScope = "scope" + // RouterApplicationScope Scope key in router module + RouterApplicationScope = "application" + // RouterServiceScope Scope key in router module + RouterServiceScope = "service" + // RouterRuleKey defines the key of the router, service's/application's name + RouterRuleKey = "key" // ForceUseTag is the tag in attachment ForceUseTag = "dubbo.force.tag" Tagkey = "dubbo.tag" diff --git a/common/url.go b/common/url.go index 807d0ed5eff4ecb70d3adeb8524b841d0ec92a58..ec6dce9175596e4f1774614f8f0cb978d181f300 100644 --- a/common/url.go +++ b/common/url.go @@ -381,7 +381,7 @@ func (c URL) Service() string { if service != "" { return service } else if c.SubURL != nil { - service = c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")) + service = c.SubURL.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")) if service != "" { // if url.path is "" then return suburl's path, special for registry url return service } diff --git a/common/yaml/yaml.go b/common/yaml/yaml.go index 5edda1b3c7751e8171528d121148b6c3c60fe128..d7e1ca4e898ce64f316b2abf8cb9e3324eb31e32 100644 --- a/common/yaml/yaml.go +++ b/common/yaml/yaml.go @@ -27,7 +27,7 @@ import ( "gopkg.in/yaml.v2" ) -// loadYMLConfig Load yml config byte from file +// LoadYMLConfig Load yml config byte from file func LoadYMLConfig(confProFile string) ([]byte, error) { if len(confProFile) == 0 { return nil, perrors.Errorf("application configure(provider) file name is nil") @@ -40,7 +40,7 @@ func LoadYMLConfig(confProFile string) ([]byte, error) { return ioutil.ReadFile(confProFile) } -// unmarshalYMLConfig Load yml config byte from file, then unmarshal to object +// UnmarshalYMLConfig Load yml config byte from file, then unmarshal to object func UnmarshalYMLConfig(confProFile string, out interface{}) ([]byte, error) { confFileStream, err := LoadYMLConfig(confProFile) if err != nil { @@ -49,6 +49,12 @@ func UnmarshalYMLConfig(confProFile string, out interface{}) ([]byte, error) { return confFileStream, yaml.Unmarshal(confFileStream, out) } +//UnmarshalYML unmarshals decodes the first document found within the in byte slice and assigns decoded values into the out value. func UnmarshalYML(data []byte, out interface{}) error { return yaml.Unmarshal(data, out) } + +// MarshalYML serializes the value provided into a YAML document. +func MarshalYML(in interface{}) ([]byte, error) { + return yaml.Marshal(in) +} diff --git a/config/base_config.go b/config/base_config.go index 0ba5bc7ef98cb30a13890b93a659c467adcbf73b..22a0832731daff6c9957d4913a3784c9b268b11f 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -29,11 +29,8 @@ import ( ) import ( - "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/config" - "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" - "github.com/apache/dubbo-go/config_center" ) type multiConfiger interface { @@ -52,7 +49,6 @@ type BaseConfig struct { // application config ApplicationConfig *ApplicationConfig `yaml:"application" json:"application,omitempty" property:"application"` - configCenterUrl *common.URL prefix string fatherConfig interface{} EventDispatcherType string `default:"direct" yaml:"event_dispatcher_type" json:"event_dispatcher_type,omitempty"` @@ -72,74 +68,6 @@ func (c *BaseConfig) GetRemoteConfig(name string) (config *RemoteConfig, ok bool return } -// startConfigCenter will start the config center. -// it will prepare the environment -func (c *BaseConfig) startConfigCenter() error { - url, err := common.NewURL(c.ConfigCenterConfig.Address, - common.WithProtocol(c.ConfigCenterConfig.Protocol), common.WithParams(c.ConfigCenterConfig.GetUrlMap())) - if err != nil { - return err - } - c.configCenterUrl = &url - if c.prepareEnvironment() != nil { - return perrors.WithMessagef(err, "start config center error!") - } - // c.fresh() - return err -} - -func (c *BaseConfig) prepareEnvironment() error { - factory := extension.GetConfigCenterFactory(c.ConfigCenterConfig.Protocol) - dynamicConfig, err := factory.GetDynamicConfiguration(c.configCenterUrl) - config.GetEnvInstance().SetDynamicConfiguration(dynamicConfig) - if err != nil { - logger.Errorf("Get dynamic configuration error , error message is %v", err) - return perrors.WithStack(err) - } - content, err := dynamicConfig.GetProperties(c.ConfigCenterConfig.ConfigFile, config_center.WithGroup(c.ConfigCenterConfig.Group)) - if err != nil { - logger.Errorf("Get config content in dynamic configuration error , error message is %v", err) - return perrors.WithStack(err) - } - var appGroup string - var appContent string - if providerConfig != nil && providerConfig.ApplicationConfig != nil && - reflect.ValueOf(c.fatherConfig).Elem().Type().Name() == "ProviderConfig" { - appGroup = providerConfig.ApplicationConfig.Name - } else if consumerConfig != nil && consumerConfig.ApplicationConfig != nil && - reflect.ValueOf(c.fatherConfig).Elem().Type().Name() == "ConsumerConfig" { - appGroup = consumerConfig.ApplicationConfig.Name - } - - if len(appGroup) != 0 { - configFile := c.ConfigCenterConfig.AppConfigFile - if len(configFile) == 0 { - configFile = c.ConfigCenterConfig.ConfigFile - } - appContent, err = dynamicConfig.GetProperties(configFile, config_center.WithGroup(appGroup)) - if err != nil { - return perrors.WithStack(err) - } - } - // global config file - mapContent, err := dynamicConfig.Parser().Parse(content) - if err != nil { - return perrors.WithStack(err) - } - config.GetEnvInstance().UpdateExternalConfigMap(mapContent) - - // appGroup config file - if len(appContent) != 0 { - appMapConent, err := dynamicConfig.Parser().Parse(appContent) - if err != nil { - return perrors.WithStack(err) - } - config.GetEnvInstance().UpdateAppExternalConfigMap(appMapConent) - } - - return nil -} - func getKeyPrefix(val reflect.Value) []string { var ( prefix string diff --git a/config/base_config_test.go b/config/base_config_test.go index 6db6a8dcb84de3fdefe94cce87338b9efe28246c..849a9c4586c0c8cd2d74e3dd1011aaab466f0e93 100644 --- a/config/base_config_test.go +++ b/config/base_config_test.go @@ -28,8 +28,6 @@ import ( import ( "github.com/apache/dubbo-go/common/config" - "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/config_center" _ "github.com/apache/dubbo-go/config_center/apollo" ) @@ -282,23 +280,6 @@ func TestRefreshProvider(t *testing.T) { assert.Equal(t, "20001", father.Protocols["jsonrpc1"].Port) } -func TestStartConfigCenter(t *testing.T) { - extension.SetConfigCenterFactory("mock", func() config_center.DynamicConfigurationFactory { - return &config_center.MockDynamicConfigurationFactory{} - }) - c := &BaseConfig{ConfigCenterConfig: &ConfigCenterConfig{ - Protocol: "mock", - Address: "172.0.0.1", - Group: "dubbo", - ConfigFile: "mockDubbo.properties", - }} - err := c.startConfigCenter() - assert.NoError(t, err) - b, v := config.GetEnvInstance().Configuration().Back().Value.(*config.InmemoryConfiguration).GetProperty("dubbo.application.organization") - assert.True(t, b) - assert.Equal(t, "ikurento.com", v) -} - func TestInitializeStruct(t *testing.T) { testConsumerConfig := &ConsumerConfig{} tp := reflect.TypeOf(ConsumerConfig{}) diff --git a/config/config_center_config.go b/config/config_center_config.go index c9133dc26df0b05e3bb61df0f612d0e2914e98bb..0fc4007940d9b1ac2456c9b2d379493bb5d8edb0 100644 --- a/config/config_center_config.go +++ b/config/config_center_config.go @@ -20,6 +20,7 @@ package config import ( "context" "net/url" + "reflect" "time" ) @@ -28,7 +29,13 @@ import ( ) import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/config" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config_center" + perrors "github.com/pkg/errors" ) // ConfigCenterConfig is configuration for config center @@ -52,6 +59,7 @@ type ConfigCenterConfig struct { AppConfigFile string `default:"dubbo.properties" yaml:"app_config_file" json:"app_config_file,omitempty"` AppId string `default:"dubbo" yaml:"app_id" json:"app_id,omitempty"` TimeoutStr string `yaml:"timeout" json:"timeout,omitempty"` + RemoteRef string `required:"false" yaml:"remote_ref" json:"remote_ref,omitempty"` timeout time.Duration } @@ -77,3 +85,94 @@ func (c *ConfigCenterConfig) GetUrlMap() url.Values { urlMap.Set(constant.CONFIG_LOG_DIR_KEY, c.LogDir) return urlMap } + +type configCenter struct { +} + +// toURL will compatible with baseConfig.ConfigCenterConfig.Address and baseConfig.ConfigCenterConfig.RemoteRef before 1.6.0 +// After 1.6.0 will not compatible, only baseConfig.ConfigCenterConfig.RemoteRef +func (b *configCenter) toURL(baseConfig BaseConfig) (common.URL, error) { + if len(baseConfig.ConfigCenterConfig.Address) > 0 { + return common.NewURL(baseConfig.ConfigCenterConfig.Address, + common.WithProtocol(baseConfig.ConfigCenterConfig.Protocol), common.WithParams(baseConfig.ConfigCenterConfig.GetUrlMap())) + } + + remoteRef := baseConfig.ConfigCenterConfig.RemoteRef + rc, ok := baseConfig.GetRemoteConfig(remoteRef) + + if !ok { + return common.URL{}, perrors.New("Could not find out the remote ref config, name: " + remoteRef) + } + + newURL, err := rc.toURL() + if err == nil { + newURL.SetParams(baseConfig.ConfigCenterConfig.GetUrlMap()) + } + return newURL, err +} + +// startConfigCenter will start the config center. +// it will prepare the environment +func (b *configCenter) startConfigCenter(baseConfig BaseConfig) error { + url, err := b.toURL(baseConfig) + if err != nil { + return err + } + if err = b.prepareEnvironment(baseConfig, &url); err != nil { + return perrors.WithMessagef(err, "start config center error!") + } + // c.fresh() + return nil +} + +func (b *configCenter) prepareEnvironment(baseConfig BaseConfig, configCenterUrl *common.URL) error { + factory := extension.GetConfigCenterFactory(configCenterUrl.Protocol) + dynamicConfig, err := factory.GetDynamicConfiguration(configCenterUrl) + if err != nil { + logger.Errorf("Get dynamic configuration error , error message is %v", err) + return perrors.WithStack(err) + } + config.GetEnvInstance().SetDynamicConfiguration(dynamicConfig) + content, err := dynamicConfig.GetProperties(baseConfig.ConfigCenterConfig.ConfigFile, config_center.WithGroup(baseConfig.ConfigCenterConfig.Group)) + if err != nil { + logger.Errorf("Get config content in dynamic configuration error , error message is %v", err) + return perrors.WithStack(err) + } + var appGroup string + var appContent string + if providerConfig != nil && providerConfig.ApplicationConfig != nil && + reflect.ValueOf(baseConfig.fatherConfig).Elem().Type().Name() == "ProviderConfig" { + appGroup = providerConfig.ApplicationConfig.Name + } else if consumerConfig != nil && consumerConfig.ApplicationConfig != nil && + reflect.ValueOf(baseConfig.fatherConfig).Elem().Type().Name() == "ConsumerConfig" { + appGroup = consumerConfig.ApplicationConfig.Name + } + + if len(appGroup) != 0 { + configFile := baseConfig.ConfigCenterConfig.AppConfigFile + if len(configFile) == 0 { + configFile = baseConfig.ConfigCenterConfig.ConfigFile + } + appContent, err = dynamicConfig.GetProperties(configFile, config_center.WithGroup(appGroup)) + if err != nil { + return perrors.WithStack(err) + } + } + // global config file + mapContent, err := dynamicConfig.Parser().Parse(content) + if err != nil { + return perrors.WithStack(err) + } + config.GetEnvInstance().UpdateExternalConfigMap(mapContent) + + // appGroup config file + if len(appContent) != 0 { + appMapConent, err := dynamicConfig.Parser().Parse(appContent) + if err != nil { + return perrors.WithStack(err) + } + config.GetEnvInstance().UpdateAppExternalConfigMap(appMapConent) + } + + return nil +} diff --git a/config/config_center_config_test.go b/config/config_center_config_test.go new file mode 100644 index 0000000000000000000000000000000000000000..2299167bb62a019f6854ea48d61442f5bde7e646 --- /dev/null +++ b/config/config_center_config_test.go @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common/config" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config_center" +) + +func TestStartConfigCenter(t *testing.T) { + extension.SetConfigCenterFactory("mock", func() config_center.DynamicConfigurationFactory { + return &config_center.MockDynamicConfigurationFactory{} + }) + baseConfig := &BaseConfig{ConfigCenterConfig: &ConfigCenterConfig{ + Protocol: "mock", + Address: "172.0.0.1", + Group: "dubbo", + ConfigFile: "mockDubbo.properties", + }} + + c := &configCenter{} + err := c.startConfigCenter(*baseConfig) + assert.NoError(t, err) + b, v := config.GetEnvInstance().Configuration().Back().Value.(*config.InmemoryConfiguration).GetProperty("dubbo.application.organization") + assert.True(t, b) + assert.Equal(t, "ikurento.com", v) +} + +func TestStartConfigCenterWithRemoteRef(t *testing.T) { + extension.SetConfigCenterFactory("mock", func() config_center.DynamicConfigurationFactory { + return &config_center.MockDynamicConfigurationFactory{} + }) + m := make(map[string]*RemoteConfig) + m["mock"] = &RemoteConfig{Protocol: "mock", Address: "172.0.0.1"} + baseConfig := &BaseConfig{ + Remotes: m, + ConfigCenterConfig: &ConfigCenterConfig{ + Group: "dubbo", + RemoteRef: "mock", + ConfigFile: "mockDubbo.properties", + }} + + c := &configCenter{} + err := c.startConfigCenter(*baseConfig) + assert.NoError(t, err) + b, v := config.GetEnvInstance().Configuration().Back().Value.(*config.InmemoryConfiguration).GetProperty("dubbo.application.organization") + assert.True(t, b) + assert.Equal(t, "ikurento.com", v) +} + +func TestStartConfigCenterWithRemoteRefError(t *testing.T) { + extension.SetConfigCenterFactory("mock", func() config_center.DynamicConfigurationFactory { + return &config_center.MockDynamicConfigurationFactory{} + }) + m := make(map[string]*RemoteConfig) + m["mock"] = &RemoteConfig{Address: "172.0.0.1"} + baseConfig := &BaseConfig{ + Remotes: m, + ConfigCenterConfig: &ConfigCenterConfig{ + Protocol: "mock", + Group: "dubbo", + RemoteRef: "mock", + ConfigFile: "mockDubbo.properties", + }} + + c := &configCenter{} + err := c.startConfigCenter(*baseConfig) + assert.Error(t, err) +} diff --git a/config/config_loader_test.go b/config/config_loader_test.go index 01d2ca812a278ee8fb80feb584673e2ebe470a01..a219b9f465c71186e8cd482a7bbca32a03cdb396 100644 --- a/config/config_loader_test.go +++ b/config/config_loader_test.go @@ -72,7 +72,7 @@ func TestLoad(t *testing.T) { SetProviderService(ms) extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory) Load() @@ -101,7 +101,7 @@ func TestLoadWithSingleReg(t *testing.T) { SetProviderService(ms) extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory) Load() @@ -130,7 +130,7 @@ func TestWithNoRegLoad(t *testing.T) { SetProviderService(ms) extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory) Load() diff --git a/config/consumer_config.go b/config/consumer_config.go index 48f29f0e70028a7c057ee3831b45afa72446f3d0..177531209225b7a4b25751352d9777c0d4ee260c 100644 --- a/config/consumer_config.go +++ b/config/consumer_config.go @@ -41,7 +41,8 @@ import ( // ConsumerConfig is Consumer default configuration type ConsumerConfig struct { BaseConfig `yaml:",inline"` - Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` + configCenter + Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` // client Connect_Timeout string `default:"100ms" yaml:"connect_timeout" json:"connect_timeout,omitempty" property:"connect_timeout"` ConnectTimeout time.Duration @@ -125,13 +126,6 @@ func ConsumerInit(confConFile string) error { func configCenterRefreshConsumer() error { //fresh it var err error - if consumerConfig.ConfigCenterConfig != nil { - consumerConfig.SetFatherConfig(consumerConfig) - if err = consumerConfig.startConfigCenter(); err != nil { - return perrors.Errorf("start config center error , error message is {%v}", perrors.WithStack(err)) - } - consumerConfig.fresh() - } if consumerConfig.Request_Timeout != "" { if consumerConfig.RequestTimeout, err = time.ParseDuration(consumerConfig.Request_Timeout); err != nil { return perrors.WithMessagef(err, "time.ParseDuration(Request_Timeout{%#v})", consumerConfig.Request_Timeout) @@ -142,5 +136,12 @@ func configCenterRefreshConsumer() error { return perrors.WithMessagef(err, "time.ParseDuration(Connect_Timeout{%#v})", consumerConfig.Connect_Timeout) } } + if consumerConfig.ConfigCenterConfig != nil { + consumerConfig.SetFatherConfig(consumerConfig) + if err = consumerConfig.startConfigCenter((*consumerConfig).BaseConfig); err != nil { + return perrors.Errorf("start config center error , error message is {%v}", perrors.WithStack(err)) + } + consumerConfig.fresh() + } return nil } diff --git a/config/provider_config.go b/config/provider_config.go index 7cd3c1e98bfb8c35abb2b414b782ec709d0a8d0d..c710e48dc233a62837b31a89828e9c612eaff093 100644 --- a/config/provider_config.go +++ b/config/provider_config.go @@ -37,7 +37,8 @@ import ( // ProviderConfig is the default configuration of service provider type ProviderConfig struct { - BaseConfig `yaml:",inline"` + BaseConfig `yaml:",inline"` + configCenter Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty" property:"proxy_factory"` Services map[string]*ServiceConfig `yaml:"services" json:"services,omitempty" property:"services"` @@ -101,7 +102,7 @@ func configCenterRefreshProvider() error { // fresh it if providerConfig.ConfigCenterConfig != nil { providerConfig.fatherConfig = providerConfig - if err := providerConfig.startConfigCenter(); err != nil { + if err := providerConfig.startConfigCenter((*providerConfig).BaseConfig); err != nil { return perrors.Errorf("start config center error , error message is {%v}", perrors.WithStack(err)) } providerConfig.fresh() diff --git a/config/reference_config.go b/config/reference_config.go index e9a895d57a90d9fd2d5d08dcd8706e5b2d058174..bbc875192c7a87354ccc81e28ea05bbc3bb71149 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -147,13 +147,26 @@ func (c *ReferenceConfig) Refer(_ interface{}) { } // TODO(decouple from directory, config should not depend on directory module) + var hitClu string if regUrl != nil { - cluster := extension.GetCluster("registryAware") - c.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) + // for multi-subscription scenario, use 'zone-aware' policy by default + hitClu = constant.ZONEAWARE_CLUSTER_NAME } else { - cluster := extension.GetCluster(c.Cluster) - c.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) + // not a registry url, must be direct invoke. + hitClu = constant.FAILOVER_CLUSTER_NAME + if len(invokers) > 0 { + u := invokers[0].GetUrl() + if nil != &u { + hitClu = u.GetParam(constant.CLUSTER_KEY, constant.ZONEAWARE_CLUSTER_NAME) + } + } } + + cluster := extension.GetCluster(hitClu) + // If 'zone-aware' policy select, the invoker wrap sequence would be: + // ZoneAwareClusterInvoker(StaticDirectory) -> + // FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker + c.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) } // create proxy diff --git a/config/reference_config_test.go b/config/reference_config_test.go index 45cdb2dfaca32f918bc067cf489a3c6fc4820dbc..e45780159615c8896627b003fd45c35af86d3f02 100644 --- a/config/reference_config_test.go +++ b/config/reference_config_test.go @@ -187,10 +187,10 @@ func doInitConsumerWithSingleRegistry() { } } -func TestReferMultireg(t *testing.T) { +func TestReferMultiReg(t *testing.T) { doInitConsumer() extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) for _, reference := range consumerConfig.References { reference.Refer(nil) @@ -203,7 +203,7 @@ func TestReferMultireg(t *testing.T) { func TestRefer(t *testing.T) { doInitConsumer() extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) for _, reference := range consumerConfig.References { reference.Refer(nil) @@ -217,7 +217,7 @@ func TestRefer(t *testing.T) { func TestReferAsync(t *testing.T) { doInitConsumerAsync() extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) for _, reference := range consumerConfig.References { reference.Refer(nil) @@ -275,7 +275,7 @@ func TestReferMultiP2PWithReg(t *testing.T) { func TestImplement(t *testing.T) { doInitConsumer() extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) for _, reference := range consumerConfig.References { reference.Refer(nil) reference.Implement(&MockService{}) diff --git a/config/registry_config.go b/config/registry_config.go index ef527c827e9dac4cd2762f579d30254e9e51150f..89566c428ed14f460c0f214358c9fa05d529ddb6 100644 --- a/config/registry_config.go +++ b/config/registry_config.go @@ -39,12 +39,22 @@ type RegistryConfig struct { // I changed "type" to "protocol" ,the same as "protocol" field in java class RegistryConfig TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second Group string `yaml:"group" json:"group,omitempty" property:"group"` + TTL string `yaml:"ttl" default:"10m" json:"ttl,omitempty" property:"ttl"` // unit: minute // for registry - Address string `yaml:"address" json:"address,omitempty" property:"address"` - Username string `yaml:"username" json:"username,omitempty" property:"username"` - Password string `yaml:"password" json:"password,omitempty" property:"password"` - Simplified bool `yaml:"simplified" json:"simplified,omitempty" property:"simplified"` - Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` + Address string `yaml:"address" json:"address,omitempty" property:"address"` + Username string `yaml:"username" json:"username,omitempty" property:"username"` + Password string `yaml:"password" json:"password,omitempty" property:"password"` + Simplified bool `yaml:"simplified" json:"simplified,omitempty" property:"simplified"` + // Always use this registry first if set to true, useful when subscribe to multiple registries + Preferred bool `yaml:"preferred" json:"preferred,omitempty" property:"preferred"` + // The region where the registry belongs, usually used to isolate traffics + Zone string `yaml:"zone" json:"zone,omitempty" property:"zone"` + //// Force must user the region, property zone is specified. + //ZoneForce bool `yaml:"zoneForce" json:"zoneForce,omitempty" property:"zoneForce"` + // Affects traffic distribution among registries, + // useful when subscribe to multiple registries Take effect only when no preferred registry is specified. + Weight int64 `yaml:"weight" json:"params,omitempty" property:"weight"` + Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` } // UnmarshalYAML unmarshals the RegistryConfig by @unmarshal function @@ -118,6 +128,13 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values { urlMap.Set(constant.ROLE_KEY, strconv.Itoa(int(roleType))) urlMap.Set(constant.REGISTRY_KEY, c.Protocol) urlMap.Set(constant.REGISTRY_TIMEOUT_KEY, c.TimeoutStr) + // multi registry invoker weight label for load balance + urlMap.Set(constant.REGISTRY_KEY+"."+constant.REGISTRY_LABEL_KEY, strconv.FormatBool(true)) + urlMap.Set(constant.REGISTRY_KEY+"."+constant.PREFERRED_KEY, strconv.FormatBool(c.Preferred)) + urlMap.Set(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, c.Zone) + //urlMap.Set(constant.REGISTRY_KEY+"."+constant.ZONE_FORCE_KEY, strconv.FormatBool(c.ZoneForce)) + urlMap.Set(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, strconv.FormatInt(c.Weight, 10)) + urlMap.Set(constant.REGISTRY_TTL_KEY, c.TTL) for k, v := range c.Params { urlMap.Set(k, v) } diff --git a/config/remote_config.go b/config/remote_config.go index 5e0330c571715d99e63688ee944c61f8e48117bb..55380dd5a05b47b5b4677b32daf73b37376673d0 100644 --- a/config/remote_config.go +++ b/config/remote_config.go @@ -22,6 +22,11 @@ import ( ) import ( + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/logger" ) @@ -30,6 +35,7 @@ import ( // so that other module, like config center, registry could reuse the config // but now, only metadata report, metadata service, service discovery use this structure type RemoteConfig struct { + Protocol string `yaml:"protocol" json:"protocol,omitempty"` Address string `yaml:"address" json:"address,omitempty"` TimeoutStr string `default:"5s" yaml:"timeout" json:"timeout,omitempty"` Username string `yaml:"username" json:"username,omitempty" property:"username"` @@ -56,3 +62,15 @@ func (rc *RemoteConfig) GetParam(key string, def string) string { } return param } + +func (rc *RemoteConfig) toURL() (common.URL, error) { + if len(rc.Protocol) == 0 { + return common.URL{}, perrors.Errorf("Must provide protocol in RemoteConfig.") + } + return common.NewURL(rc.Address, + common.WithUsername(rc.Username), + common.WithPassword(rc.Password), + common.WithLocation(rc.Address), + common.WithProtocol(rc.Protocol), + ) +} diff --git a/config/router_config.go b/config/router_config.go index 16a2bec918d0f9a2de2174324e78ca21e853dabf..ed42577ed3cce2e5a1ab0da290f0d5450553d8fb 100644 --- a/config/router_config.go +++ b/config/router_config.go @@ -23,6 +23,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/cluster/router" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/yaml" @@ -32,16 +33,37 @@ var ( routerURLSet = gxset.NewSet() ) +// LocalRouterRules defines the local router config structure +type LocalRouterRules struct { + RouterRules []interface{} `yaml:"routerRules"` +} + // RouterInit Load config file to init router config func RouterInit(confRouterFile string) error { - fileRouterFactories := extension.GetFileRouterFactories() bytes, err := yaml.LoadYMLConfig(confRouterFile) if err != nil { return perrors.Errorf("ioutil.ReadFile(file:%s) = error:%v", confRouterFile, perrors.WithStack(err)) } - logger.Warnf("get fileRouterFactories len{%+v})", len(fileRouterFactories)) - for k, factory := range fileRouterFactories { - r, e := factory.NewFileRouter(bytes) + routerRules := &LocalRouterRules{} + err = yaml.UnmarshalYML(bytes, routerRules) + if err != nil { + return perrors.Errorf("Load router file %s failed due to error: %v", confRouterFile, perrors.WithStack(err)) + } + if len(routerRules.RouterRules) == 0 { + return perrors.Errorf("No router configurations in file %s", confRouterFile) + } + fileRouterFactories := extension.GetFileRouterFactories() + for _, v := range routerRules.RouterRules { + content, _ := yaml.MarshalYML(v) + err = initRouterConfig(content, fileRouterFactories) + } + return err +} + +func initRouterConfig(content []byte, factories map[string]router.FilePriorityRouterFactory) error { + logger.Warnf("get fileRouterFactories len{%+v})", len(factories)) + for k, factory := range factories { + r, e := factory.NewFileRouter(content) if e == nil { url := r.URL() routerURLSet.Add(&url) @@ -52,6 +74,7 @@ func RouterInit(confRouterFile string) error { return perrors.Errorf("no file router exists for parse %s , implement router.FIleRouterFactory please.", confRouterFile) } +// GetRouterURLSet exposes the routerURLSet func GetRouterURLSet() *gxset.HashSet { return routerURLSet } diff --git a/config/router_config_test.go b/config/router_config_test.go index 72e51c1c82562b03736fd0afef79b78d83d6f4f3..13af7056d5280ef4cca3c0f9ede9397407df7478 100644 --- a/config/router_config_test.go +++ b/config/router_config_test.go @@ -23,6 +23,7 @@ import ( ) import ( + "github.com/dubbogo/gost/container/set" "github.com/stretchr/testify/assert" ) @@ -31,6 +32,7 @@ import ( ) const testYML = "testdata/router_config.yml" +const testMultiRouterYML = "testdata/router_multi_config.yml" const errorTestYML = "testdata/router_config_error.yml" func TestString(t *testing.T) { @@ -63,4 +65,10 @@ func TestRouterInit(t *testing.T) { assert.NoError(t, errPro) assert.Equal(t, 1, routerURLSet.Size()) + + routerURLSet = gxset.NewSet() + errPro = RouterInit(testMultiRouterYML) + assert.NoError(t, errPro) + + assert.Equal(t, 2, routerURLSet.Size()) } diff --git a/config/service_config_test.go b/config/service_config_test.go index 0f7e404f6e336b8ad254e4007825ecbfcaf9d78b..d2bbda0c49ee18dfeb8e6475203f09d4c2fbb3b9 100644 --- a/config/service_config_test.go +++ b/config/service_config_test.go @@ -153,7 +153,7 @@ func TestExport(t *testing.T) { providerConfig = nil } -func TestgetRandomPort(t *testing.T) { +func TestGetRandomPort(t *testing.T) { protocolConfigs := make([]*ProtocolConfig, 0, 3) ip, err := gxnet.GetLocalIP() diff --git a/config/testdata/router_config.yml b/config/testdata/router_config.yml index f6b91f5da7d95256e6279924b884bfd450c45a08..1845650d93c152585c4d4caf60ab7aa3d5b04887 100644 --- a/config/testdata/router_config.yml +++ b/config/testdata/router_config.yml @@ -1,6 +1,9 @@ # dubbo router yaml configure file -priority: 1 -force: true -conditions : - - "a => b" - - "c => d" \ No newline at end of file +routerRules: + - scope: application + key: mock-app + priority: 1 + force: true + conditions : + - "a => b" + - "c => d" \ No newline at end of file diff --git a/config/testdata/router_config_error.yml b/config/testdata/router_config_error.yml index 37894ac96474281d10131b53d8e644f10a18b14e..74e89cc52ec772a52c3c424a94276075fb099f8c 100644 --- a/config/testdata/router_config_error.yml +++ b/config/testdata/router_config_error.yml @@ -1,6 +1,7 @@ # dubbo router yaml configure file -priority: 1 -force: true -noConditions : - - "a => b" - - "c => d" \ No newline at end of file +routerRules: + - priority: 1 + force: true + noConditions : + - "a => b" + - "c => d" \ No newline at end of file diff --git a/config/testdata/router_multi_config.yml b/config/testdata/router_multi_config.yml new file mode 100644 index 0000000000000000000000000000000000000000..42bb4cbe70a43dc82ad4a4849059ad4344e6fd85 --- /dev/null +++ b/config/testdata/router_multi_config.yml @@ -0,0 +1,16 @@ +# dubbo router yaml configure file +routerRules: + - scope: application + key: mock-app + priority: 1 + force: true + conditions : + - "a => b" + - "c => d" + - scope: application + key: mock-app2 + priority: 1 + force: true + conditions : + - "a => b" + - "c => d" \ No newline at end of file diff --git a/protocol/invocation.go b/protocol/invocation.go index ba5949794c0120874ebdf31cfb1fd9c7d8ac08e4..296ec0540c1eed69c30e1b1477be038a4a9cc00e 100644 --- a/protocol/invocation.go +++ b/protocol/invocation.go @@ -41,6 +41,8 @@ type Invocation interface { Attributes() map[string]interface{} // AttributeByKey gets attribute by key , if nil then return default value AttributeByKey(string, interface{}) interface{} + // SetAttachments sets attribute by @key and @value. + SetAttachments(key string, value string) // Invoker gets the invoker in current context. Invoker() Invoker } diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 8f2ac1023b8ad34938b9996b480e3bbc4adbaaea..e8ee51beb70b5a08ec60b213c5342ef52972c59f 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -243,6 +243,8 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen var zkListener *RegistryConfigurationListener dataListener := r.dataListener + ttl := r.GetParam(constant.REGISTRY_TTL_KEY, constant.DEFAULT_REG_TTL) + conf.SetParam(constant.REGISTRY_TTL_KEY, ttl) dataListener.mutex.Lock() defer dataListener.mutex.Unlock() if r.dataListener.subscribed[conf.ServiceKey()] != nil { diff --git a/remoting/kubernetes/listener_test.go b/remoting/kubernetes/listener_test.go index 1f398485b2f16defddf44ce1a08a7ecfd9760dd1..0b05b6e6e09501dfd25164e865a3eb6eef91dc9f 100644 --- a/remoting/kubernetes/listener_test.go +++ b/remoting/kubernetes/listener_test.go @@ -19,6 +19,7 @@ package kubernetes import ( "testing" + "time" ) import ( @@ -87,6 +88,7 @@ func TestListener(t *testing.T) { listener := NewEventListener(c) dataListener := &mockDataListener{client: c, changedData: changedData, rc: make(chan remoting.Event)} listener.ListenServiceEvent("/dubbo", dataListener) + time.Sleep(1e9) for _, tc := range tests { diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 9a4874db24696d90e4fcc7d9d987f5888f1be599..4f50c18ab61ba6faf373dfd0f831c14ae7ab6d5d 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -37,6 +37,10 @@ import ( "github.com/apache/dubbo-go/remoting" ) +var ( + defaultTTL = 15 * time.Minute +) + // nolint type ZkEventListener struct { client *ZookeeperClient @@ -197,10 +201,20 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen var ( failTimes int + ttl time.Duration event chan struct{} zkEvent zk.Event ) event = make(chan struct{}, 4) + ttl = defaultTTL + if conf != nil { + timeout, err := time.ParseDuration(conf.GetParam(constant.REGISTRY_TTL_KEY, constant.DEFAULT_REG_TTL)) + if err == nil { + ttl = timeout + } else { + logger.Warnf("wrong configuration for registry ttl, error:=%+v, using default value %v instead", err, defaultTTL) + } + } defer close(event) for { // get current children for a zkPath @@ -302,18 +316,29 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen }(dubboPath, listener) } } - select { - case zkEvent = <-childEventCh: - logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", - zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err) - if zkEvent.Type != zk.EventNodeChildrenChanged { - continue + // Periodically update provider information + ticker := time.NewTicker(ttl) + WATCH: + for { + select { + case <-ticker.C: + l.handleZkNodeEvent(zkEvent.Path, children, listener) + case zkEvent = <-childEventCh: + logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", + zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err) + ticker.Stop() + if zkEvent.Type != zk.EventNodeChildrenChanged { + break WATCH + } + l.handleZkNodeEvent(zkEvent.Path, children, listener) + break WATCH + case <-l.client.Done(): + logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) + ticker.Stop() + return } - l.handleZkNodeEvent(zkEvent.Path, children, listener) - case <-l.client.Done(): - logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) - return } + } }