diff --git a/README.md b/README.md
index a1c09fc3ca4414188f42e8e5433b33b90d959865..7df7e7973a7e3e808f4b8feb4f26b98c0f21e9ef 100644
--- a/README.md
+++ b/README.md
@@ -34,7 +34,7 @@ Both extension module and layered project architecture is according to Apache Du

-If you wanna know more about dubbo-go, please visit this reference [Project Architeture design](https://github.com/apache/dubbo-go/wiki/dubbo-go-V1.0-design)
+If you wanna know more about dubbo-go, please visit this reference [Project Architecture design](https://github.com/apache/dubbo-go/wiki/dubbo-go-V1.0-design)
## Feature list ##
diff --git a/cluster/cluster_impl/base_cluster_invoker.go b/cluster/cluster_impl/base_cluster_invoker.go
index bbdfa715d7cdc461689e60a5a41171ad5c9770e1..ced5b15cb9a1c2292ca866f6f7478ce2b23a30b9 100644
--- a/cluster/cluster_impl/base_cluster_invoker.go
+++ b/cluster/cluster_impl/base_cluster_invoker.go
@@ -17,6 +17,10 @@
package cluster_impl
+import (
+ "context"
+)
+
import (
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
@@ -36,6 +40,7 @@ type baseClusterInvoker struct {
availablecheck bool
destroyed *atomic.Bool
stickyInvoker protocol.Invoker
+ interceptor cluster.ClusterInterceptor
}
func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker {
@@ -146,6 +151,20 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc
return selectedInvoker
}
+func (invoker *baseClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
+ if invoker.interceptor != nil {
+ invoker.interceptor.BeforeInvoker(ctx, invocation)
+
+ result := invoker.interceptor.DoInvoke(ctx, invocation)
+
+ invoker.interceptor.AfterInvoker(ctx, invocation)
+
+ return result
+ }
+
+ return nil
+}
+
func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) bool {
for _, i := range invoked {
if i == selectedInvoker {
diff --git a/cluster/cluster_impl/failover_cluster.go b/cluster/cluster_impl/failover_cluster.go
index 254cc097e3e7f0cc70dfcff01212562bc6febe97..4c09fd16d3eb5ebaf7833443ee0f8a980c81b822 100644
--- a/cluster/cluster_impl/failover_cluster.go
+++ b/cluster/cluster_impl/failover_cluster.go
@@ -19,16 +19,15 @@ package cluster_impl
import (
"github.com/apache/dubbo-go/cluster"
+ "github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)
type failoverCluster struct{}
-const name = "failover"
-
func init() {
- extension.SetCluster(name, NewFailoverCluster)
+ extension.SetCluster(constant.FAILOVER_CLUSTER_NAME, NewFailoverCluster)
}
// NewFailoverCluster returns a failover cluster instance
diff --git a/cluster/cluster_impl/registry_aware_cluster_invoker.go b/cluster/cluster_impl/registry_aware_cluster_invoker.go
deleted file mode 100644
index 7840da5218090a995298e718a4bbce9d7f5480b4..0000000000000000000000000000000000000000
--- a/cluster/cluster_impl/registry_aware_cluster_invoker.go
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package cluster_impl
-
-import (
- "context"
-)
-import (
- "github.com/apache/dubbo-go/cluster"
- "github.com/apache/dubbo-go/common/constant"
- "github.com/apache/dubbo-go/protocol"
-)
-
-type registryAwareClusterInvoker struct {
- baseClusterInvoker
-}
-
-func newRegistryAwareClusterInvoker(directory cluster.Directory) protocol.Invoker {
- return ®istryAwareClusterInvoker{
- baseClusterInvoker: newBaseClusterInvoker(directory),
- }
-}
-
-// nolint
-func (invoker *registryAwareClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
- invokers := invoker.directory.List(invocation)
- //First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'default' key.
- for _, invoker := range invokers {
- if invoker.IsAvailable() && invoker.GetUrl().GetParam(constant.REGISTRY_DEFAULT_KEY, "false") == "true" {
- return invoker.Invoke(ctx, invocation)
- }
- }
-
- //If none of the invokers has a local signal, pick the first one available.
- for _, invoker := range invokers {
- if invoker.IsAvailable() {
- return invoker.Invoke(ctx, invocation)
- }
- }
- return nil
-}
diff --git a/cluster/cluster_impl/registry_aware_cluster_test.go b/cluster/cluster_impl/registry_aware_cluster_test.go
deleted file mode 100644
index 74584b44800fce3342956f4237a63ffbbabf5544..0000000000000000000000000000000000000000
--- a/cluster/cluster_impl/registry_aware_cluster_test.go
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package cluster_impl
-
-import (
- "context"
- "fmt"
- "testing"
-)
-import (
- "github.com/stretchr/testify/assert"
-)
-
-import (
- "github.com/apache/dubbo-go/cluster/directory"
- "github.com/apache/dubbo-go/common"
- "github.com/apache/dubbo-go/protocol"
- "github.com/apache/dubbo-go/protocol/invocation"
-)
-
-func TestRegAwareInvokeSuccess(t *testing.T) {
-
- regAwareCluster := NewRegistryAwareCluster()
-
- invokers := []protocol.Invoker{}
- for i := 0; i < 10; i++ {
- url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
- invokers = append(invokers, NewMockInvoker(url, 1))
- }
-
- staticDir := directory.NewStaticDirectory(invokers)
- clusterInvoker := regAwareCluster.Join(staticDir)
- result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
- assert.NoError(t, result.Error())
- count = 0
-}
-
-func TestDestroy(t *testing.T) {
- regAwareCluster := NewRegistryAwareCluster()
-
- invokers := []protocol.Invoker{}
- for i := 0; i < 10; i++ {
- url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
- invokers = append(invokers, NewMockInvoker(url, 1))
- }
-
- staticDir := directory.NewStaticDirectory(invokers)
- clusterInvoker := regAwareCluster.Join(staticDir)
- assert.Equal(t, true, clusterInvoker.IsAvailable())
- result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
- assert.NoError(t, result.Error())
- count = 0
- clusterInvoker.Destroy()
- assert.Equal(t, false, clusterInvoker.IsAvailable())
-
-}
diff --git a/cluster/cluster_impl/registry_aware_cluster.go b/cluster/cluster_impl/zone_aware_cluster.go
similarity index 63%
rename from cluster/cluster_impl/registry_aware_cluster.go
rename to cluster/cluster_impl/zone_aware_cluster.go
index f4c08973713e6b3736b4ca1ed0d6e5275cf61d01..7439db2d374f73571f3c8b95d8faca6cb98d4cd0 100644
--- a/cluster/cluster_impl/registry_aware_cluster.go
+++ b/cluster/cluster_impl/zone_aware_cluster.go
@@ -19,22 +19,26 @@ package cluster_impl
import (
"github.com/apache/dubbo-go/cluster"
+ "github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)
-type registryAwareCluster struct{}
+type zoneAwareCluster struct{}
func init() {
- extension.SetCluster("registryAware", NewRegistryAwareCluster)
+ extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, NewZoneAwareCluster)
}
-// NewRegistryAwareCluster returns a registry aware cluster instance
-func NewRegistryAwareCluster() cluster.Cluster {
- return ®istryAwareCluster{}
+// NewZoneAwareCluster returns a zoneaware cluster instance.
+//
+// More than one registry for subscription.
+// Usually it is used for choose between registries.
+func NewZoneAwareCluster() cluster.Cluster {
+ return &zoneAwareCluster{}
}
-// nolint
-func (cluster *registryAwareCluster) Join(directory cluster.Directory) protocol.Invoker {
- return newRegistryAwareClusterInvoker(directory)
+// Join returns a zoneAwareClusterInvoker instance
+func (cluster *zoneAwareCluster) Join(directory cluster.Directory) protocol.Invoker {
+ return newZoneAwareClusterInvoker(directory)
}
diff --git a/cluster/cluster_impl/zone_aware_cluster_invoker.go b/cluster/cluster_impl/zone_aware_cluster_invoker.go
new file mode 100644
index 0000000000000000000000000000000000000000..0f52b0442c235d570e4cc9d8491aa80a9df5842d
--- /dev/null
+++ b/cluster/cluster_impl/zone_aware_cluster_invoker.go
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cluster_impl
+
+import (
+ "context"
+ "fmt"
+)
+
+import (
+ "github.com/apache/dubbo-go/cluster"
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/protocol"
+)
+
+// When there're more than one registry for subscription.
+//
+// This extension provides a strategy to decide how to distribute traffics among them:
+// 1. registry marked as 'preferred=true' has the highest priority.
+// 2. check the zone the current request belongs, pick the registry that has the same zone first.
+// 3. Evenly balance traffic between all registries based on each registry's weight.
+// 4. Pick anyone that's available.
+type zoneAwareClusterInvoker struct {
+ baseClusterInvoker
+}
+
+func newZoneAwareClusterInvoker(directory cluster.Directory) protocol.Invoker {
+ invoke := &zoneAwareClusterInvoker{
+ baseClusterInvoker: newBaseClusterInvoker(directory),
+ }
+ // add self to interceptor
+ invoke.interceptor = invoke
+ return invoke
+}
+
+// nolint
+func (invoker *zoneAwareClusterInvoker) DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
+ invokers := invoker.directory.List(invocation)
+
+ err := invoker.checkInvokers(invokers, invocation)
+ if err != nil {
+ return &protocol.RPCResult{Err: err}
+ }
+
+ // First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'preferred' key.
+ for _, invoker := range invokers {
+ key := constant.REGISTRY_KEY + "." + constant.PREFERRED_KEY
+ if invoker.IsAvailable() && matchParam("true", key, "false", invoker) {
+ return invoker.Invoke(ctx, invocation)
+ }
+ }
+
+ // providers in the registry with the same zone
+ key := constant.REGISTRY_KEY + "." + constant.ZONE_KEY
+ zone := invocation.AttachmentsByKey(key, "")
+ if "" != zone {
+ for _, invoker := range invokers {
+ if invoker.IsAvailable() && matchParam(zone, key, "", invoker) {
+ return invoker.Invoke(ctx, invocation)
+ }
+ }
+
+ force := invocation.AttachmentsByKey(constant.REGISTRY_KEY+"."+constant.ZONE_FORCE_KEY, "")
+ if "true" == force {
+ return &protocol.RPCResult{
+ Err: fmt.Errorf("no registry instance in zone or "+
+ "no available providers in the registry, zone: %v, "+
+ " registries: %v", zone, invoker.GetUrl()),
+ }
+ }
+ }
+
+ // load balance among all registries, with registry weight count in.
+ loadBalance := getLoadBalance(invokers[0], invocation)
+ ivk := invoker.doSelect(loadBalance, invocation, invokers, nil)
+ if ivk != nil && ivk.IsAvailable() {
+ return ivk.Invoke(ctx, invocation)
+ }
+
+ // If none of the invokers has a preferred signal or is picked by the loadBalancer, pick the first one available.
+ for _, invoker := range invokers {
+ if invoker.IsAvailable() {
+ return invoker.Invoke(ctx, invocation)
+ }
+ }
+
+ return &protocol.RPCResult{
+ Err: fmt.Errorf("no provider available in %v", invokers),
+ }
+}
+
+func (invoker *zoneAwareClusterInvoker) BeforeInvoker(ctx context.Context, invocation protocol.Invocation) {
+ key := constant.REGISTRY_KEY + "." + constant.ZONE_FORCE_KEY
+ force := ctx.Value(key)
+
+ if force != nil {
+ switch value := force.(type) {
+ case bool:
+ if value {
+ invocation.SetAttachments(key, "true")
+ }
+ case string:
+ if "true" == value {
+ invocation.SetAttachments(key, "true")
+ }
+ default:
+ // ignore
+ }
+ }
+}
+
+func (invoker *zoneAwareClusterInvoker) AfterInvoker(ctx context.Context, invocation protocol.Invocation) {
+
+}
+
+func matchParam(target, key, def string, invoker protocol.Invoker) bool {
+ return target == invoker.GetUrl().GetParam(key, def)
+}
diff --git a/cluster/cluster_impl/zone_aware_cluster_invoker_test.go b/cluster/cluster_impl/zone_aware_cluster_invoker_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..cd201a42c759354ca536ea3e9e77116d89ea8b4b
--- /dev/null
+++ b/cluster/cluster_impl/zone_aware_cluster_invoker_test.go
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cluster_impl
+
+import (
+ "context"
+ "fmt"
+ "testing"
+)
+
+import (
+ "github.com/golang/mock/gomock"
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "github.com/apache/dubbo-go/cluster/directory"
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/protocol"
+ "github.com/apache/dubbo-go/protocol/invocation"
+ "github.com/apache/dubbo-go/protocol/mock"
+)
+
+func TestZoneWareInvokerWithPreferredSuccess(t *testing.T) {
+ ctrl := gomock.NewController(t)
+ // In Go versions 1.14+, if you pass a *testing.T
+ // into gomock.NewController(t) you no longer need to call ctrl.Finish().
+ //defer ctrl.Finish()
+
+ mockResult := &protocol.RPCResult{
+ Attrs: map[string]string{constant.PREFERRED_KEY: "true"},
+ Rest: rest{tried: 0, success: true}}
+
+ var invokers []protocol.Invoker
+ for i := 0; i < 2; i++ {
+ url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
+ invoker := mock.NewMockInvoker(ctrl)
+ invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
+ invoker.EXPECT().GetUrl().Return(url).AnyTimes()
+ if 0 == i {
+ url.SetParam(constant.REGISTRY_KEY+"."+constant.PREFERRED_KEY, "true")
+ invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
+ func(invocation protocol.Invocation) protocol.Result {
+ return mockResult
+ })
+ } else {
+ invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
+ func(invocation protocol.Invocation) protocol.Result {
+ return &protocol.RPCResult{}
+ })
+ }
+
+ invokers = append(invokers, invoker)
+ }
+
+ zoneAwareCluster := NewZoneAwareCluster()
+ staticDir := directory.NewStaticDirectory(invokers)
+ clusterInvoker := zoneAwareCluster.Join(staticDir)
+
+ result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
+
+ assert.Equal(t, mockResult, result)
+}
+
+func TestZoneWareInvokerWithWeightSuccess(t *testing.T) {
+ ctrl := gomock.NewController(t)
+ // In Go versions 1.14+, if you pass a *testing.T
+ // into gomock.NewController(t) you no longer need to call ctrl.Finish().
+ //defer ctrl.Finish()
+
+ w1 := "50"
+ w2 := "200"
+
+ var invokers []protocol.Invoker
+ for i := 0; i < 2; i++ {
+ url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
+ invoker := mock.NewMockInvoker(ctrl)
+ invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
+ invoker.EXPECT().GetUrl().Return(url).AnyTimes()
+ url.SetParam(constant.REGISTRY_KEY+"."+constant.REGISTRY_LABEL_KEY, "true")
+ if 1 == i {
+ url.SetParam(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, w1)
+ invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
+ func(invocation protocol.Invocation) protocol.Result {
+ return &protocol.RPCResult{
+ Attrs: map[string]string{constant.WEIGHT_KEY: w1},
+ Rest: rest{tried: 0, success: true}}
+ }).MaxTimes(100)
+ } else {
+ url.SetParam(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, w2)
+ invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
+ func(invocation protocol.Invocation) protocol.Result {
+ return &protocol.RPCResult{
+ Attrs: map[string]string{constant.WEIGHT_KEY: w2},
+ Rest: rest{tried: 0, success: true}}
+ }).MaxTimes(100)
+ }
+ invokers = append(invokers, invoker)
+ }
+
+ zoneAwareCluster := NewZoneAwareCluster()
+ staticDir := directory.NewStaticDirectory(invokers)
+ clusterInvoker := zoneAwareCluster.Join(staticDir)
+
+ var w2Count, w1Count int
+ loop := 50
+ for i := 0; i < loop; i++ {
+ result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
+ if w2 == result.Attachment(constant.WEIGHT_KEY, "0") {
+ w2Count++
+ }
+ if w1 == result.Attachment(constant.WEIGHT_KEY, "0") {
+ w1Count++
+ }
+ assert.NoError(t, result.Error())
+ }
+ t.Logf("loop count : %d, w1 height : %s | count : %d, w2 height : %s | count : %d", loop,
+ w1, w1Count, w2, w2Count)
+}
+
+func TestZoneWareInvokerWithZoneSuccess(t *testing.T) {
+ var zoneArray = []string{"hangzhou", "shanghai"}
+
+ ctrl := gomock.NewController(t)
+ // In Go versions 1.14+, if you pass a *testing.T
+ // into gomock.NewController(t) you no longer need to call ctrl.Finish().
+ //defer ctrl.Finish()
+
+ var invokers []protocol.Invoker
+ for i := 0; i < 2; i++ {
+ zoneValue := zoneArray[i]
+ url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
+ url.SetParam(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, zoneValue)
+
+ invoker := mock.NewMockInvoker(ctrl)
+ invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
+ invoker.EXPECT().GetUrl().Return(url).AnyTimes()
+ invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
+ func(invocation protocol.Invocation) protocol.Result {
+ return &protocol.RPCResult{
+ Attrs: map[string]string{constant.ZONE_KEY: zoneValue},
+ Rest: rest{tried: 0, success: true}}
+ })
+ invokers = append(invokers, invoker)
+ }
+
+ zoneAwareCluster := NewZoneAwareCluster()
+ staticDir := directory.NewStaticDirectory(invokers)
+ clusterInvoker := zoneAwareCluster.Join(staticDir)
+
+ inv := &invocation.RPCInvocation{}
+ // zone hangzhou
+ hz := zoneArray[0]
+ inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, hz)
+
+ result := clusterInvoker.Invoke(context.Background(), inv)
+
+ assert.Equal(t, hz, result.Attachment(constant.ZONE_KEY, ""))
+}
+
+func TestZoneWareInvokerWithZoneForceFail(t *testing.T) {
+ ctrl := gomock.NewController(t)
+ // In Go versions 1.14+, if you pass a *testing.T
+ // into gomock.NewController(t) you no longer need to call ctrl.Finish().
+ //defer ctrl.Finish()
+
+ var invokers []protocol.Invoker
+ for i := 0; i < 2; i++ {
+ url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
+
+ invoker := mock.NewMockInvoker(ctrl)
+ invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
+ invoker.EXPECT().GetUrl().Return(url).AnyTimes()
+ invokers = append(invokers, invoker)
+ }
+
+ zoneAwareCluster := NewZoneAwareCluster()
+ staticDir := directory.NewStaticDirectory(invokers)
+ clusterInvoker := zoneAwareCluster.Join(staticDir)
+
+ inv := &invocation.RPCInvocation{}
+ // zone hangzhou
+ inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, "hangzhou")
+ // zone force
+ inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_FORCE_KEY, "true")
+
+ result := clusterInvoker.Invoke(context.Background(), inv)
+
+ assert.NotNil(t, result.Error())
+}
diff --git a/cluster/cluster_interceptor.go b/cluster/cluster_interceptor.go
new file mode 100644
index 0000000000000000000000000000000000000000..a627e81365781e83932668872cd94a2cdec3b71e
--- /dev/null
+++ b/cluster/cluster_interceptor.go
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cluster
+
+import (
+ "context"
+)
+
+import (
+ "github.com/apache/dubbo-go/protocol"
+)
+
+// ClusterInterceptor
+// Extension - ClusterInterceptor
+type ClusterInterceptor interface {
+ // Before DoInvoke method
+ BeforeInvoker(ctx context.Context, invocation protocol.Invocation)
+
+ // After DoInvoke method
+ AfterInvoker(ctx context.Context, invocation protocol.Invocation)
+
+ // Corresponding cluster invoke
+ DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result
+}
diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go
index 8a7e0c0e8359c69faf0e504adeb1778c38a08e04..651c90ff6862bd75e604b0572bb15e583a00065e 100644
--- a/cluster/directory/base_directory.go
+++ b/cluster/directory/base_directory.go
@@ -85,15 +85,21 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) {
for _, url := range urls {
routerKey := url.GetParam(constant.ROUTER_KEY, "")
- if len(routerKey) > 0 {
- factory := extension.GetRouterFactory(url.Protocol)
- r, err := factory.NewPriorityRouter(url)
- if err != nil {
- logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err)
- return
+ if len(routerKey) == 0 {
+ continue
+ }
+ if url.Protocol == constant.CONDITION_ROUTE_PROTOCOL {
+ if !dir.isProperRouter(url) {
+ continue
}
- routers = append(routers, r)
}
+ factory := extension.GetRouterFactory(url.Protocol)
+ r, err := factory.NewPriorityRouter(url)
+ if err != nil {
+ logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err)
+ return
+ }
+ routers = append(routers, r)
}
logger.Infof("Init file condition router success, size: %v", len(routers))
@@ -104,6 +110,21 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) {
rc.AddRouters(routers)
}
+func (dir *BaseDirectory) isProperRouter(url *common.URL) bool {
+ app := url.GetParam(constant.APPLICATION_KEY, "")
+ serviceKey := dir.GetUrl().ServiceKey()
+ if serviceKey == "" {
+ serviceKey = dir.GetUrl().SubURL.ServiceKey()
+ }
+ if len(app) > 0 && app == dir.GetUrl().GetParam(constant.APPLICATION_KEY, "") {
+ return true
+ }
+ if url.ServiceKey() == serviceKey {
+ return true
+ }
+ return false
+}
+
// Destroy Destroy
func (dir *BaseDirectory) Destroy(doDestroy func()) {
if dir.destroyed.CAS(false, true) {
diff --git a/cluster/directory/base_directory_test.go b/cluster/directory/base_directory_test.go
index 8b60163b79b7120829e51f69238474a127133fb4..a2b62dfa008e6cd17b1200d93cd235da17d03905 100644
--- a/cluster/directory/base_directory_test.go
+++ b/cluster/directory/base_directory_test.go
@@ -37,7 +37,7 @@ import (
var (
url, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
- anyUrl, _ = common.NewURL(fmt.Sprintf("condition://%s/com.foo.BarService", constant.ANYHOST_VALUE))
+ anyURL, _ = common.NewURL(fmt.Sprintf("condition://%s/com.foo.BarService", constant.ANYHOST_VALUE))
)
func TestNewBaseDirectory(t *testing.T) {
@@ -48,13 +48,17 @@ func TestNewBaseDirectory(t *testing.T) {
}
func TestBuildRouterChain(t *testing.T) {
- directory := NewBaseDirectory(&url)
+
+ regURL := url
+ regURL.AddParam(constant.INTERFACE_KEY, "mock-app")
+ directory := NewBaseDirectory(®URL)
assert.NotNil(t, directory)
localIP, _ := gxnet.GetLocalIP()
rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP))
- routeURL := getRouteUrl(rule)
+ routeURL := getRouteURL(rule, anyURL)
+ routeURL.AddParam(constant.INTERFACE_KEY, "mock-app")
routerURLs := make([]*common.URL, 0)
routerURLs = append(routerURLs, routeURL)
directory.SetRouters(routerURLs)
@@ -63,9 +67,53 @@ func TestBuildRouterChain(t *testing.T) {
assert.NotNil(t, chain)
}
-func getRouteUrl(rule string) *common.URL {
- anyUrl.AddParam("rule", rule)
- anyUrl.AddParam("force", "true")
- anyUrl.AddParam(constant.ROUTER_KEY, "router")
- return &url
+func getRouteURL(rule string, u common.URL) *common.URL {
+ ru := u
+ ru.AddParam("rule", rule)
+ ru.AddParam("force", "true")
+ ru.AddParam(constant.ROUTER_KEY, "router")
+ return &ru
+}
+
+func TestIsProperRouter(t *testing.T) {
+ regURL := url
+ regURL.AddParam(constant.APPLICATION_KEY, "mock-app")
+ d := NewBaseDirectory(®URL)
+ localIP, _ := gxnet.GetLocalIP()
+ rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP))
+ routeURL := getRouteURL(rule, anyURL)
+ routeURL.AddParam(constant.APPLICATION_KEY, "mock-app")
+ rst := d.isProperRouter(routeURL)
+ assert.True(t, rst)
+
+ regURL.AddParam(constant.APPLICATION_KEY, "")
+ regURL.AddParam(constant.INTERFACE_KEY, "com.foo.BarService")
+ d = NewBaseDirectory(®URL)
+ routeURL = getRouteURL(rule, anyURL)
+ routeURL.AddParam(constant.INTERFACE_KEY, "com.foo.BarService")
+ rst = d.isProperRouter(routeURL)
+ assert.True(t, rst)
+
+ regURL.AddParam(constant.APPLICATION_KEY, "")
+ regURL.AddParam(constant.INTERFACE_KEY, "")
+ d = NewBaseDirectory(®URL)
+ routeURL = getRouteURL(rule, anyURL)
+ rst = d.isProperRouter(routeURL)
+ assert.True(t, rst)
+
+ regURL.SetParam(constant.APPLICATION_KEY, "")
+ regURL.SetParam(constant.INTERFACE_KEY, "")
+ d = NewBaseDirectory(®URL)
+ routeURL = getRouteURL(rule, anyURL)
+ routeURL.AddParam(constant.APPLICATION_KEY, "mock-service")
+ rst = d.isProperRouter(routeURL)
+ assert.False(t, rst)
+
+ regURL.SetParam(constant.APPLICATION_KEY, "")
+ regURL.SetParam(constant.INTERFACE_KEY, "")
+ d = NewBaseDirectory(®URL)
+ routeURL = getRouteURL(rule, anyURL)
+ routeURL.AddParam(constant.INTERFACE_KEY, "mock-service")
+ rst = d.isProperRouter(routeURL)
+ assert.False(t, rst)
}
diff --git a/cluster/loadbalance/consistent_hash.go b/cluster/loadbalance/consistent_hash.go
index 85eb96417c5c1f1946962a87f6b889c2ed09c26c..27ce2369f9de6dfe76bd35581ea26f0e0c24e480 100644
--- a/cluster/loadbalance/consistent_hash.go
+++ b/cluster/loadbalance/consistent_hash.go
@@ -42,7 +42,7 @@ const (
ConsistentHash = "consistenthash"
// HashNodes hash nodes
HashNodes = "hash.nodes"
- // HashArguments key of hash arguments in url
+ // HashArguments key of hash arguments in url
HashArguments = "hash.arguments"
)
diff --git a/cluster/loadbalance/util.go b/cluster/loadbalance/util.go
index b6c013852bf55ce7eb67e4fa18802a938141d283..684ffe11a72058e188fdbcc5f8aa56fe16073619 100644
--- a/cluster/loadbalance/util.go
+++ b/cluster/loadbalance/util.go
@@ -28,23 +28,35 @@ import (
// GetWeight gets weight for load balance strategy
func GetWeight(invoker protocol.Invoker, invocation protocol.Invocation) int64 {
+ var weight int64
url := invoker.GetUrl()
- weight := url.GetMethodParamInt64(invocation.MethodName(), constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT)
+ // Multiple registry scenario, load balance among multiple registries.
+ isRegIvk := url.GetParamBool(constant.REGISTRY_KEY+"."+constant.REGISTRY_LABEL_KEY, false)
+ if isRegIvk {
+ weight = url.GetParamInt(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT)
+ } else {
+ weight = url.GetMethodParamInt64(invocation.MethodName(), constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT)
- if weight > 0 {
- //get service register time an do warm up time
- now := time.Now().Unix()
- timestamp := url.GetParamInt(constant.REMOTE_TIMESTAMP_KEY, now)
- if uptime := now - timestamp; uptime > 0 {
- warmup := url.GetParamInt(constant.WARMUP_KEY, constant.DEFAULT_WARMUP)
- if uptime < warmup {
- if ww := float64(uptime) / float64(warmup) / float64(weight); ww < 1 {
- weight = 1
- } else if int64(ww) <= weight {
- weight = int64(ww)
+ if weight > 0 {
+ //get service register time an do warm up time
+ now := time.Now().Unix()
+ timestamp := url.GetParamInt(constant.REMOTE_TIMESTAMP_KEY, now)
+ if uptime := now - timestamp; uptime > 0 {
+ warmup := url.GetParamInt(constant.WARMUP_KEY, constant.DEFAULT_WARMUP)
+ if uptime < warmup {
+ if ww := float64(uptime) / float64(warmup) / float64(weight); ww < 1 {
+ weight = 1
+ } else if int64(ww) <= weight {
+ weight = int64(ww)
+ }
}
}
}
}
+
+ if weight < 0 {
+ weight = 0
+ }
+
return weight
}
diff --git a/cluster/router/chain/chain_test.go b/cluster/router/chain/chain_test.go
index 17a7fa1ae7b2b6f6be44d5c8ca1732f5c64a526b..dec03894ebc73e315c2bb161911bdc67235e1ebb 100644
--- a/cluster/router/chain/chain_test.go
+++ b/cluster/router/chain/chain_test.go
@@ -66,7 +66,9 @@ func TestNewRouterChain(t *testing.T) {
err = z.Create(path)
assert.NoError(t, err)
- testyml := `enabled: true
+ testyml := `scope: application
+key: mock-app
+enabled: true
force: true
runtime: false
conditions:
@@ -93,7 +95,7 @@ conditions:
assert.NotNil(t, appRouter)
assert.NotNil(t, appRouter.RouterRule())
rule := appRouter.RouterRule()
- assert.Equal(t, "", rule.Scope)
+ assert.Equal(t, "application", rule.Scope)
assert.True(t, rule.Force)
assert.True(t, rule.Enabled)
assert.True(t, rule.Valid)
@@ -101,7 +103,7 @@ conditions:
assert.Equal(t, testyml, rule.RawRule)
assert.Equal(t, false, rule.Runtime)
assert.Equal(t, false, rule.Dynamic)
- assert.Equal(t, "", rule.Key)
+ assert.Equal(t, "mock-app", rule.Key)
}
func TestNewRouterChainURLNil(t *testing.T) {
@@ -116,7 +118,9 @@ func TestRouterChainAddRouters(t *testing.T) {
err = z.Create(path)
assert.NoError(t, err)
- testyml := `enabled: true
+ testyml := `scope: application
+key: mock-app
+enabled: true
force: true
runtime: false
conditions:
@@ -184,7 +188,9 @@ func TestRouterChainRouteAppRouter(t *testing.T) {
err = z.Create(path)
assert.NoError(t, err)
- testyml := `enabled: true
+ testyml := `scope: application
+key: mock-app
+enabled: true
force: true
runtime: false
conditions:
diff --git a/cluster/router/condition/app_router_test.go b/cluster/router/condition/app_router_test.go
index ea186049642d8dbe42ff11997e0c154ff298ce6c..cce96b12c95a691e828d91ba3d0629ddb6421954 100644
--- a/cluster/router/condition/app_router_test.go
+++ b/cluster/router/condition/app_router_test.go
@@ -52,7 +52,9 @@ var (
func TestNewAppRouter(t *testing.T) {
- testYML := `enabled: true
+ testYML := `scope: application
+key: mock-app
+enabled: true
force: true
runtime: false
conditions:
@@ -83,7 +85,7 @@ conditions:
assert.NotNil(t, appRouter)
assert.NotNil(t, appRouter.RouterRule())
rule := appRouter.RouterRule()
- assert.Equal(t, "", rule.Scope)
+ assert.Equal(t, "application", rule.Scope)
assert.True(t, rule.Force)
assert.True(t, rule.Enabled)
assert.True(t, rule.Valid)
@@ -91,13 +93,15 @@ conditions:
assert.Equal(t, testYML, rule.RawRule)
assert.Equal(t, false, rule.Runtime)
assert.Equal(t, false, rule.Dynamic)
- assert.Equal(t, "", rule.Key)
+ assert.Equal(t, "mock-app", rule.Key)
assert.Equal(t, 0, rule.Priority)
}
func TestGenerateConditions(t *testing.T) {
- testYML := `enabled: true
+ testYML := `scope: application
+key: mock-app
+enabled: true
force: true
runtime: false
conditions:
@@ -135,7 +139,9 @@ conditions:
func TestProcess(t *testing.T) {
- testYML := `enabled: true
+ testYML := `scope: application
+key: mock-app
+enabled: true
force: true
runtime: false
conditions:
@@ -165,7 +171,8 @@ conditions:
assert.Equal(t, 1, len(appRouter.conditionRouters))
- testNewYML := `
+ testNewYML := `scope: application
+key: mock-app
enabled: true
force: true
runtime: false
diff --git a/cluster/router/condition/file.go b/cluster/router/condition/file.go
index eabdf1c263446140b359b3e791238b020cecb50c..996db7443faff88d3baa1a2363f98fa62623d121 100644
--- a/cluster/router/condition/file.go
+++ b/cluster/router/condition/file.go
@@ -20,6 +20,7 @@ package condition
import (
"encoding/base64"
"net/url"
+ "regexp"
"strconv"
"strings"
"sync"
@@ -71,11 +72,53 @@ func (f *FileConditionRouter) URL() common.URL {
common.WithParamsValue(constant.RouterPriority, strconv.Itoa(routerRule.Priority)),
common.WithParamsValue(constant.RULE_KEY, base64.URLEncoding.EncodeToString([]byte(rule))),
common.WithParamsValue(constant.ROUTER_KEY, constant.CONDITION_ROUTE_PROTOCOL),
- common.WithParamsValue(constant.CATEGORY_KEY, constant.ROUTERS_CATEGORY))
+ common.WithParamsValue(constant.CATEGORY_KEY, constant.ROUTERS_CATEGORY),
+ )
+ if routerRule.Scope == constant.RouterApplicationScope {
+ f.url.AddParam(constant.APPLICATION_KEY, routerRule.Key)
+ return
+ }
+ grp, srv, ver, e := parseServiceRouterKey(routerRule.Key)
+ if e != nil {
+ return
+ }
+ if len(grp) > 0 {
+ f.url.AddParam(constant.GROUP_KEY, grp)
+ }
+ if len(ver) > 0 {
+ f.url.AddParam(constant.VERSION_KEY, ver)
+ }
+ if len(srv) > 0 {
+ f.url.AddParam(constant.INTERFACE_KEY, srv)
+ }
})
return f.url
}
+// The input value must follow [{group}/]{service}[:{version}] pattern
+// the returning strings are representing group, service, version respectively.
+// input: mock-group/mock-service:1.0.0 ==> "mock-group", "mock-service", "1.0.0"
+// input: mock-group/mock-service ==> "mock-group", "mock-service", ""
+// input: mock-service:1.0.0 ==> "", "mock-service", "1.0.0"
+// For more samples, please refer to unit test.
+func parseServiceRouterKey(key string) (string, string, string, error) {
+ if len(strings.TrimSpace(key)) == 0 {
+ return "", "", "", nil
+ }
+ reg := regexp.MustCompile(`(.*/{1})?([^:/]+)(:{1}[^:]*)?`)
+ strs := reg.FindAllStringSubmatch(key, -1)
+ if strs == nil || len(strs) > 1 {
+ return "", "", "", perrors.Errorf("Invalid key, service key must follow [{group}/]{service}[:{version}] pattern")
+ }
+ if len(strs[0]) != 4 {
+ return "", "", "", perrors.Errorf("Parse service router key failed")
+ }
+ grp := strings.TrimSpace(strings.TrimRight(strs[0][1], "/"))
+ srv := strings.TrimSpace(strs[0][2])
+ ver := strings.TrimSpace(strings.TrimLeft(strs[0][3], ":"))
+ return grp, srv, ver, nil
+}
+
func parseCondition(conditions []string) string {
var when, then string
for _, condition := range conditions {
diff --git a/cluster/router/condition/file_test.go b/cluster/router/condition/file_test.go
index 3092b12ff88dcacc9108c7cdd46ba1ac9f74eb2b..bd19a0d18c6692af181ffef77c5cd3f9fc16d67d 100644
--- a/cluster/router/condition/file_test.go
+++ b/cluster/router/condition/file_test.go
@@ -26,7 +26,9 @@ import (
)
func TestLoadYmlConfig(t *testing.T) {
- router, e := NewFileConditionRouter([]byte(`priority: 1
+ router, e := NewFileConditionRouter([]byte(`scope: application
+key: mock-app
+priority: 1
force: true
conditions :
- "a => b"
@@ -47,12 +49,78 @@ func TestParseCondition(t *testing.T) {
}
func TestFileRouterURL(t *testing.T) {
- router, e := NewFileConditionRouter([]byte(`priority: 1
+ router, e := NewFileConditionRouter([]byte(`scope: application
+key: mock-app
+priority: 1
force: true
conditions :
- "a => b"
- "c => d"`))
assert.Nil(t, e)
assert.NotNil(t, router)
- assert.Equal(t, "condition://0.0.0.0:?category=routers&force=true&priority=1&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D", router.URL().String())
+ assert.Equal(t, "condition://0.0.0.0:?application=mock-app&category=routers&force=true&priority=1&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D", router.URL().String())
+
+ router, e = NewFileConditionRouter([]byte(`scope: service
+key: mock-service
+priority: 1
+force: true
+conditions :
+ - "a => b"
+ - "c => d"`))
+ assert.Nil(t, e)
+ assert.NotNil(t, router)
+ assert.Equal(t, "condition://0.0.0.0:?category=routers&force=true&interface=mock-service&priority=1&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D", router.URL().String())
+
+ router, e = NewFileConditionRouter([]byte(`scope: service
+key: grp1/mock-service:v1
+priority: 1
+force: true
+conditions :
+ - "a => b"
+ - "c => d"`))
+ assert.Nil(t, e)
+ assert.NotNil(t, router)
+ assert.Equal(t, "condition://0.0.0.0:?category=routers&force=true&group=grp1&interface=mock-service&priority=1&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D&version=v1", router.URL().String())
+}
+
+func TestParseServiceRouterKey(t *testing.T) {
+ testString := " mock-group / mock-service:1.0.0"
+ grp, srv, ver, err := parseServiceRouterKey(testString)
+ assert.Equal(t, "mock-group", grp)
+ assert.Equal(t, "mock-service", srv)
+ assert.Equal(t, "1.0.0", ver)
+
+ testString = "mock-group/mock-service"
+ grp, srv, ver, err = parseServiceRouterKey(testString)
+ assert.Equal(t, "mock-group", grp)
+ assert.Equal(t, "mock-service", srv)
+ assert.Equal(t, "", ver)
+
+ testString = "mock-service:1.0.0"
+ grp, srv, ver, err = parseServiceRouterKey(testString)
+ assert.Equal(t, "", grp)
+ assert.Equal(t, "mock-service", srv)
+ assert.Equal(t, "1.0.0", ver)
+
+ testString = "mock-service"
+ grp, srv, ver, err = parseServiceRouterKey(testString)
+ assert.Equal(t, "", grp)
+ assert.Equal(t, "mock-service", srv)
+ assert.Equal(t, "", ver)
+
+ testString = "/mock-service:"
+ grp, srv, ver, err = parseServiceRouterKey(testString)
+ assert.Equal(t, "", grp)
+ assert.Equal(t, "mock-service", srv)
+ assert.Equal(t, "", ver)
+
+ testString = "grp:mock-service:123"
+ grp, srv, ver, err = parseServiceRouterKey(testString)
+ assert.Error(t, err)
+
+ testString = ""
+ grp, srv, ver, err = parseServiceRouterKey(testString)
+ assert.Equal(t, "", grp)
+ assert.Equal(t, "", srv)
+ assert.Equal(t, "", ver)
}
diff --git a/cluster/router/condition/router.go b/cluster/router/condition/router.go
index 40a251573f5e73d40032972313565d98b288b1b1..751b5a7111655577566c561614d39093485130cd 100644
--- a/cluster/router/condition/router.go
+++ b/cluster/router/condition/router.go
@@ -117,7 +117,13 @@ func NewConditionRouter(url *common.URL) (*ConditionRouter, error) {
}
router.url = url
- router.priority = url.GetParamInt(constant.RouterPriority, 0)
+ var defaultPriority int64 = 0
+ if url.GetParam(constant.APPLICATION_KEY, "") != "" {
+ defaultPriority = 150
+ } else if url.GetParam(constant.INTERFACE_KEY, "") != "" {
+ defaultPriority = 140
+ }
+ router.priority = url.GetParamInt(constant.RouterPriority, defaultPriority)
router.Force = url.GetParamBool(constant.RouterForce, false)
router.enabled = url.GetParamBool(constant.RouterEnabled, true)
@@ -288,7 +294,7 @@ func matchCondition(pairs map[string]MatchPair, url *common.URL, param *common.U
return result
}
-// MatchPair Match key pair , condition process
+// MatchPair Match key pair, condition process
type MatchPair struct {
Matches *gxset.HashSet
Mismatches *gxset.HashSet
@@ -314,7 +320,7 @@ func (pair MatchPair) isMatch(value string, param *common.URL) bool {
return true
}
if !pair.Mismatches.Empty() && !pair.Matches.Empty() {
- //when both mismatches and matches contain the same value, then using mismatches first
+ // when both mismatches and matches contain the same value, then using mismatches first
for mismatch := range pair.Mismatches.Items {
if isMatchGlobalPattern(mismatch.(string), value, param) {
return false
diff --git a/cluster/router/condition/router_rule.go b/cluster/router/condition/router_rule.go
index ce397d6cc0f51519123dd427709e8dba42d72a20..9f2bf2d41a941398698f110f77a7a3051a0c9c11 100644
--- a/cluster/router/condition/router_rule.go
+++ b/cluster/router/condition/router_rule.go
@@ -28,12 +28,13 @@ import (
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/yaml"
)
// RouterRule RouterRule config read from config file or config center
type RouterRule struct {
- router.BaseRouterRule `yaml:",inline""`
+ router.BaseRouterRule `yaml:",inline"`
Conditions []string
}
@@ -57,7 +58,7 @@ func getRule(rawRule string) (*RouterRule, error) {
return r, err
}
r.RawRule = rawRule
- if len(r.Conditions) != 0 {
+ if len(r.Conditions) > 0 && len(r.Key) > 0 && (r.Scope == constant.RouterApplicationScope || r.Scope == constant.RouterServiceScope) {
r.Valid = true
}
diff --git a/cluster/router/condition/router_rule_test.go b/cluster/router/condition/router_rule_test.go
index 675acaec912b413d8fa3d1a25463b1fd4813a7f5..369b14f08a6a650b36fe63a2d890c04fe7b892a5 100644
--- a/cluster/router/condition/router_rule_test.go
+++ b/cluster/router/condition/router_rule_test.go
@@ -32,6 +32,7 @@ import (
func TestGetRule(t *testing.T) {
testyml := `
scope: application
+key: test-provider
runtime: true
force: false
conditions:
@@ -50,10 +51,31 @@ conditions:
assert.True(t, rule.Runtime)
assert.Equal(t, false, rule.Force)
assert.Equal(t, testyml, rule.RawRule)
- assert.True(t, true, rule.Valid)
+ assert.True(t, rule.Valid)
assert.Equal(t, false, rule.Enabled)
assert.Equal(t, false, rule.Dynamic)
- assert.Equal(t, "", rule.Key)
+ assert.Equal(t, "test-provider", rule.Key)
+
+ testyml = `
+key: test-provider
+runtime: true
+force: false
+conditions:
+ - >
+ method!=sayHello =>`
+ rule, e = getRule(testyml)
+ assert.Nil(t, e)
+ assert.False(t, rule.Valid)
+
+ testyml = `
+scope: noApplication
+key: test-provider
+conditions:
+ - >
+ method!=sayHello =>`
+ rule, e = getRule(testyml)
+ assert.Nil(t, e)
+ assert.False(t, rule.Valid)
}
func TestIsMatchGlobPattern(t *testing.T) {
diff --git a/cluster/router/condition/router_test.go b/cluster/router/condition/router_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..c27a1d9552a331d7e67af0eb3d444c480758891e
--- /dev/null
+++ b/cluster/router/condition/router_test.go
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package condition
+
+import (
+ "testing"
+)
+
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/dubbogo/gost/container/set"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestParseRule(t *testing.T) {
+ testString := ``
+ matchPair, err := parseRule(testString)
+ assert.Nil(t, err)
+ assert.EqualValues(t, matchPair, make(map[string]MatchPair, 16))
+
+ testString = `method!=sayHello&application=sayGoodBye`
+ matchPair, err = parseRule(testString)
+ assert.Nil(t, err)
+ assert.EqualValues(t, matchPair["method"].Mismatches, gxset.NewSet("sayHello"))
+ assert.EqualValues(t, matchPair["application"].Matches, gxset.NewSet("sayGoodBye"))
+
+ testString = `noRule`
+ matchPair, err = parseRule(testString)
+ assert.Nil(t, err)
+ assert.EqualValues(t, matchPair["noRule"].Mismatches, gxset.NewSet())
+ assert.EqualValues(t, matchPair["noRule"].Matches, gxset.NewSet())
+
+ testString = `method!=sayHello,sayGoodBye`
+ matchPair, err = parseRule(testString)
+ assert.Nil(t, err)
+ assert.EqualValues(t, matchPair["method"].Mismatches, gxset.NewSet(`sayHello`, `sayGoodBye`))
+
+ testString = `method!=sayHello,sayGoodDay=sayGoodBye`
+ matchPair, err = parseRule(testString)
+ assert.Nil(t, err)
+ assert.EqualValues(t, matchPair["method"].Mismatches, gxset.NewSet(`sayHello`, `sayGoodDay`))
+ assert.EqualValues(t, matchPair["method"].Matches, gxset.NewSet(`sayGoodBye`))
+}
+
+func TestNewConditionRouter(t *testing.T) {
+ url, _ := common.NewURL(`condition://0.0.0.0:?application=mock-app&category=routers&force=true&priority=1&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D`)
+ router, err := NewConditionRouter(&url)
+ assert.Nil(t, err)
+ assert.Equal(t, true, router.Enabled())
+ assert.Equal(t, true, router.Force)
+ assert.Equal(t, int64(1), router.Priority())
+ whenRule, _ := parseRule("a & c")
+ thenRule, _ := parseRule("b & d")
+ assert.EqualValues(t, router.WhenCondition, whenRule)
+ assert.EqualValues(t, router.ThenCondition, thenRule)
+
+ router, err = NewConditionRouter(nil)
+ assert.Error(t, err)
+
+ url, _ = common.NewURL(`condition://0.0.0.0:?application=mock-app&category=routers&force=true&priority=1&router=condition&rule=YSAmT4gYiAmIGQ%3D`)
+ router, err = NewConditionRouter(&url)
+ assert.Error(t, err)
+
+ url, _ = common.NewURL(`condition://0.0.0.0:?application=mock-app&category=routers&force=true&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D`)
+ router, err = NewConditionRouter(&url)
+ assert.Nil(t, err)
+ assert.Equal(t, int64(150), router.Priority())
+
+ url, _ = common.NewURL(`condition://0.0.0.0:?category=routers&force=true&interface=mock-service&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D`)
+ router, err = NewConditionRouter(&url)
+ assert.Nil(t, err)
+ assert.Equal(t, int64(140), router.Priority())
+}
diff --git a/cluster/router/healthcheck/default_health_check.go b/cluster/router/healthcheck/default_health_check.go
index a26f86ddac45aa6e999cd4453aa296d0786a02ba..c693b86ecdab7b32936185fe4bd614bd0f83fbeb 100644
--- a/cluster/router/healthcheck/default_health_check.go
+++ b/cluster/router/healthcheck/default_health_check.go
@@ -49,7 +49,7 @@ type DefaultHealthChecker struct {
// and the current active request
func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool {
urlStatus := protocol.GetURLStatus(invoker.GetUrl())
- if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.GetOutStandingRequestConutLimit() {
+ if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.GetOutStandingRequestCountLimit() {
logger.Debugf("Invoker [%s] is currently in circuitbreaker tripped state", invoker.GetUrl().Key())
return false
}
@@ -92,18 +92,18 @@ func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol
return int64(sleepWindow)
}
-// GetOutStandingRequestConutLimit return the requestSuccessiveFailureThreshold bound to this DefaultHealthChecker
+// GetRequestSuccessiveFailureThreshold return the requestSuccessiveFailureThreshold bound to this DefaultHealthChecker
func (c *DefaultHealthChecker) GetRequestSuccessiveFailureThreshold() int32 {
return c.requestSuccessiveFailureThreshold
}
-// GetOutStandingRequestConutLimit return the circuitTrippedTimeoutFactor bound to this DefaultHealthChecker
+// GetCircuitTrippedTimeoutFactor return the circuitTrippedTimeoutFactor bound to this DefaultHealthChecker
func (c *DefaultHealthChecker) GetCircuitTrippedTimeoutFactor() int32 {
return c.circuitTrippedTimeoutFactor
}
-// GetOutStandingRequestConutLimit return the outStandingRequestConutLimit bound to this DefaultHealthChecker
-func (c *DefaultHealthChecker) GetOutStandingRequestConutLimit() int32 {
+// GetOutStandingRequestCountLimit return the outStandingRequestConutLimit bound to this DefaultHealthChecker
+func (c *DefaultHealthChecker) GetOutStandingRequestCountLimit() int32 {
return c.outStandingRequestConutLimit
}
diff --git a/cluster/router/router.go b/cluster/router/router.go
index 7291224edb2eb9a7d92871eaeb86b07283cb39c1..66603c1d4d0efedad3489712ecea91b43254fffd 100644
--- a/cluster/router/router.go
+++ b/cluster/router/router.go
@@ -54,10 +54,7 @@ type PriorityRouter interface {
// NotifyRouter notify router use the invoker list. Invoker list may change from time to time. This method gives the router a
// chance to prepare before {@link Router#route(List, URL, Invocation)} gets called.
type NotifyRouter interface {
- router
+ PriorityRouter
// Notify notify whenever addresses in registry change
Notify([]protocol.Invoker)
- // Priority Return Priority in router
- // 0 to ^int(0) is better
- Priority() int64
}
diff --git a/common/constant/cluster.go b/common/constant/cluster.go
new file mode 100644
index 0000000000000000000000000000000000000000..6894f3595ea8dfdc83f0ce372bb7f22a47e3e434
--- /dev/null
+++ b/common/constant/cluster.go
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package constant
+
+// nolint
+const (
+ FAILOVER_CLUSTER_NAME = "failover"
+ ZONEAWARE_CLUSTER_NAME = "zoneAware"
+)
diff --git a/common/constant/key.go b/common/constant/key.go
index 72072ddb155edeff1ca05fbfe6ae132f0e5576e9..211768122ff4ffeaacd17f13af90eba21ab74c7b 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -97,6 +97,10 @@ const (
ROLE_KEY = "registry.role"
REGISTRY_DEFAULT_KEY = "registry.default"
REGISTRY_TIMEOUT_KEY = "registry.timeout"
+ REGISTRY_LABEL_KEY = "label"
+ PREFERRED_KEY = "preferred"
+ ZONE_KEY = "zone"
+ ZONE_FORCE_KEY = "zone.force"
REGISTRY_TTL_KEY = "registry.ttl"
)
@@ -200,7 +204,14 @@ const (
RouterEnabled = "enabled"
// Priority Priority key in router module
RouterPriority = "priority"
-
+ // RouterScope Scope key in router module
+ RouterScope = "scope"
+ // RouterApplicationScope Scope key in router module
+ RouterApplicationScope = "application"
+ // RouterServiceScope Scope key in router module
+ RouterServiceScope = "service"
+ // RouterRuleKey defines the key of the router, service's/application's name
+ RouterRuleKey = "key"
// ForceUseTag is the tag in attachment
ForceUseTag = "dubbo.force.tag"
Tagkey = "dubbo.tag"
diff --git a/common/logger/log.yml b/common/logger/log.yml
index 59fa4279ad85272c4c49d532beaf23b74d00f58a..21f97bcbc40218ab23a6fc022b148d321cbe23cc 100644
--- a/common/logger/log.yml
+++ b/common/logger/log.yml
@@ -1,6 +1,5 @@
-
level: "debug"
-development: true
+development: false
disableCaller: false
disableStacktrace: false
sampling:
diff --git a/common/url.go b/common/url.go
index 807d0ed5eff4ecb70d3adeb8524b841d0ec92a58..ec6dce9175596e4f1774614f8f0cb978d181f300 100644
--- a/common/url.go
+++ b/common/url.go
@@ -381,7 +381,7 @@ func (c URL) Service() string {
if service != "" {
return service
} else if c.SubURL != nil {
- service = c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/"))
+ service = c.SubURL.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/"))
if service != "" { // if url.path is "" then return suburl's path, special for registry url
return service
}
diff --git a/common/yaml/yaml.go b/common/yaml/yaml.go
index 5edda1b3c7751e8171528d121148b6c3c60fe128..d7e1ca4e898ce64f316b2abf8cb9e3324eb31e32 100644
--- a/common/yaml/yaml.go
+++ b/common/yaml/yaml.go
@@ -27,7 +27,7 @@ import (
"gopkg.in/yaml.v2"
)
-// loadYMLConfig Load yml config byte from file
+// LoadYMLConfig Load yml config byte from file
func LoadYMLConfig(confProFile string) ([]byte, error) {
if len(confProFile) == 0 {
return nil, perrors.Errorf("application configure(provider) file name is nil")
@@ -40,7 +40,7 @@ func LoadYMLConfig(confProFile string) ([]byte, error) {
return ioutil.ReadFile(confProFile)
}
-// unmarshalYMLConfig Load yml config byte from file, then unmarshal to object
+// UnmarshalYMLConfig Load yml config byte from file, then unmarshal to object
func UnmarshalYMLConfig(confProFile string, out interface{}) ([]byte, error) {
confFileStream, err := LoadYMLConfig(confProFile)
if err != nil {
@@ -49,6 +49,12 @@ func UnmarshalYMLConfig(confProFile string, out interface{}) ([]byte, error) {
return confFileStream, yaml.Unmarshal(confFileStream, out)
}
+//UnmarshalYML unmarshals decodes the first document found within the in byte slice and assigns decoded values into the out value.
func UnmarshalYML(data []byte, out interface{}) error {
return yaml.Unmarshal(data, out)
}
+
+// MarshalYML serializes the value provided into a YAML document.
+func MarshalYML(in interface{}) ([]byte, error) {
+ return yaml.Marshal(in)
+}
diff --git a/config/config_loader_test.go b/config/config_loader_test.go
index 01d2ca812a278ee8fb80feb584673e2ebe470a01..a219b9f465c71186e8cd482a7bbca32a03cdb396 100644
--- a/config/config_loader_test.go
+++ b/config/config_loader_test.go
@@ -72,7 +72,7 @@ func TestLoad(t *testing.T) {
SetProviderService(ms)
extension.SetProtocol("registry", GetProtocol)
- extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster)
+ extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory)
Load()
@@ -101,7 +101,7 @@ func TestLoadWithSingleReg(t *testing.T) {
SetProviderService(ms)
extension.SetProtocol("registry", GetProtocol)
- extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster)
+ extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory)
Load()
@@ -130,7 +130,7 @@ func TestWithNoRegLoad(t *testing.T) {
SetProviderService(ms)
extension.SetProtocol("registry", GetProtocol)
- extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster)
+ extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory)
Load()
diff --git a/config/reference_config.go b/config/reference_config.go
index e9a895d57a90d9fd2d5d08dcd8706e5b2d058174..bbc875192c7a87354ccc81e28ea05bbc3bb71149 100644
--- a/config/reference_config.go
+++ b/config/reference_config.go
@@ -147,13 +147,26 @@ func (c *ReferenceConfig) Refer(_ interface{}) {
}
// TODO(decouple from directory, config should not depend on directory module)
+ var hitClu string
if regUrl != nil {
- cluster := extension.GetCluster("registryAware")
- c.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
+ // for multi-subscription scenario, use 'zone-aware' policy by default
+ hitClu = constant.ZONEAWARE_CLUSTER_NAME
} else {
- cluster := extension.GetCluster(c.Cluster)
- c.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
+ // not a registry url, must be direct invoke.
+ hitClu = constant.FAILOVER_CLUSTER_NAME
+ if len(invokers) > 0 {
+ u := invokers[0].GetUrl()
+ if nil != &u {
+ hitClu = u.GetParam(constant.CLUSTER_KEY, constant.ZONEAWARE_CLUSTER_NAME)
+ }
+ }
}
+
+ cluster := extension.GetCluster(hitClu)
+ // If 'zone-aware' policy select, the invoker wrap sequence would be:
+ // ZoneAwareClusterInvoker(StaticDirectory) ->
+ // FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
+ c.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
}
// create proxy
diff --git a/config/reference_config_test.go b/config/reference_config_test.go
index 45cdb2dfaca32f918bc067cf489a3c6fc4820dbc..e45780159615c8896627b003fd45c35af86d3f02 100644
--- a/config/reference_config_test.go
+++ b/config/reference_config_test.go
@@ -187,10 +187,10 @@ func doInitConsumerWithSingleRegistry() {
}
}
-func TestReferMultireg(t *testing.T) {
+func TestReferMultiReg(t *testing.T) {
doInitConsumer()
extension.SetProtocol("registry", GetProtocol)
- extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster)
+ extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
for _, reference := range consumerConfig.References {
reference.Refer(nil)
@@ -203,7 +203,7 @@ func TestReferMultireg(t *testing.T) {
func TestRefer(t *testing.T) {
doInitConsumer()
extension.SetProtocol("registry", GetProtocol)
- extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster)
+ extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
for _, reference := range consumerConfig.References {
reference.Refer(nil)
@@ -217,7 +217,7 @@ func TestRefer(t *testing.T) {
func TestReferAsync(t *testing.T) {
doInitConsumerAsync()
extension.SetProtocol("registry", GetProtocol)
- extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster)
+ extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
for _, reference := range consumerConfig.References {
reference.Refer(nil)
@@ -275,7 +275,7 @@ func TestReferMultiP2PWithReg(t *testing.T) {
func TestImplement(t *testing.T) {
doInitConsumer()
extension.SetProtocol("registry", GetProtocol)
- extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster)
+ extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
for _, reference := range consumerConfig.References {
reference.Refer(nil)
reference.Implement(&MockService{})
diff --git a/config/registry_config.go b/config/registry_config.go
index 703606b836906a8dbe9964e74da0edbe76991f48..89566c428ed14f460c0f214358c9fa05d529ddb6 100644
--- a/config/registry_config.go
+++ b/config/registry_config.go
@@ -41,11 +41,20 @@ type RegistryConfig struct {
Group string `yaml:"group" json:"group,omitempty" property:"group"`
TTL string `yaml:"ttl" default:"10m" json:"ttl,omitempty" property:"ttl"` // unit: minute
// for registry
- Address string `yaml:"address" json:"address,omitempty" property:"address"`
- Username string `yaml:"username" json:"username,omitempty" property:"username"`
- Password string `yaml:"password" json:"password,omitempty" property:"password"`
- Simplified bool `yaml:"simplified" json:"simplified,omitempty" property:"simplified"`
- Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
+ Address string `yaml:"address" json:"address,omitempty" property:"address"`
+ Username string `yaml:"username" json:"username,omitempty" property:"username"`
+ Password string `yaml:"password" json:"password,omitempty" property:"password"`
+ Simplified bool `yaml:"simplified" json:"simplified,omitempty" property:"simplified"`
+ // Always use this registry first if set to true, useful when subscribe to multiple registries
+ Preferred bool `yaml:"preferred" json:"preferred,omitempty" property:"preferred"`
+ // The region where the registry belongs, usually used to isolate traffics
+ Zone string `yaml:"zone" json:"zone,omitempty" property:"zone"`
+ //// Force must user the region, property zone is specified.
+ //ZoneForce bool `yaml:"zoneForce" json:"zoneForce,omitempty" property:"zoneForce"`
+ // Affects traffic distribution among registries,
+ // useful when subscribe to multiple registries Take effect only when no preferred registry is specified.
+ Weight int64 `yaml:"weight" json:"params,omitempty" property:"weight"`
+ Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
}
// UnmarshalYAML unmarshals the RegistryConfig by @unmarshal function
@@ -119,6 +128,12 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values {
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(int(roleType)))
urlMap.Set(constant.REGISTRY_KEY, c.Protocol)
urlMap.Set(constant.REGISTRY_TIMEOUT_KEY, c.TimeoutStr)
+ // multi registry invoker weight label for load balance
+ urlMap.Set(constant.REGISTRY_KEY+"."+constant.REGISTRY_LABEL_KEY, strconv.FormatBool(true))
+ urlMap.Set(constant.REGISTRY_KEY+"."+constant.PREFERRED_KEY, strconv.FormatBool(c.Preferred))
+ urlMap.Set(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, c.Zone)
+ //urlMap.Set(constant.REGISTRY_KEY+"."+constant.ZONE_FORCE_KEY, strconv.FormatBool(c.ZoneForce))
+ urlMap.Set(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, strconv.FormatInt(c.Weight, 10))
urlMap.Set(constant.REGISTRY_TTL_KEY, c.TTL)
for k, v := range c.Params {
urlMap.Set(k, v)
diff --git a/config/router_config.go b/config/router_config.go
index 16a2bec918d0f9a2de2174324e78ca21e853dabf..ed42577ed3cce2e5a1ab0da290f0d5450553d8fb 100644
--- a/config/router_config.go
+++ b/config/router_config.go
@@ -23,6 +23,7 @@ import (
)
import (
+ "github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/yaml"
@@ -32,16 +33,37 @@ var (
routerURLSet = gxset.NewSet()
)
+// LocalRouterRules defines the local router config structure
+type LocalRouterRules struct {
+ RouterRules []interface{} `yaml:"routerRules"`
+}
+
// RouterInit Load config file to init router config
func RouterInit(confRouterFile string) error {
- fileRouterFactories := extension.GetFileRouterFactories()
bytes, err := yaml.LoadYMLConfig(confRouterFile)
if err != nil {
return perrors.Errorf("ioutil.ReadFile(file:%s) = error:%v", confRouterFile, perrors.WithStack(err))
}
- logger.Warnf("get fileRouterFactories len{%+v})", len(fileRouterFactories))
- for k, factory := range fileRouterFactories {
- r, e := factory.NewFileRouter(bytes)
+ routerRules := &LocalRouterRules{}
+ err = yaml.UnmarshalYML(bytes, routerRules)
+ if err != nil {
+ return perrors.Errorf("Load router file %s failed due to error: %v", confRouterFile, perrors.WithStack(err))
+ }
+ if len(routerRules.RouterRules) == 0 {
+ return perrors.Errorf("No router configurations in file %s", confRouterFile)
+ }
+ fileRouterFactories := extension.GetFileRouterFactories()
+ for _, v := range routerRules.RouterRules {
+ content, _ := yaml.MarshalYML(v)
+ err = initRouterConfig(content, fileRouterFactories)
+ }
+ return err
+}
+
+func initRouterConfig(content []byte, factories map[string]router.FilePriorityRouterFactory) error {
+ logger.Warnf("get fileRouterFactories len{%+v})", len(factories))
+ for k, factory := range factories {
+ r, e := factory.NewFileRouter(content)
if e == nil {
url := r.URL()
routerURLSet.Add(&url)
@@ -52,6 +74,7 @@ func RouterInit(confRouterFile string) error {
return perrors.Errorf("no file router exists for parse %s , implement router.FIleRouterFactory please.", confRouterFile)
}
+// GetRouterURLSet exposes the routerURLSet
func GetRouterURLSet() *gxset.HashSet {
return routerURLSet
}
diff --git a/config/router_config_test.go b/config/router_config_test.go
index 72e51c1c82562b03736fd0afef79b78d83d6f4f3..13af7056d5280ef4cca3c0f9ede9397407df7478 100644
--- a/config/router_config_test.go
+++ b/config/router_config_test.go
@@ -23,6 +23,7 @@ import (
)
import (
+ "github.com/dubbogo/gost/container/set"
"github.com/stretchr/testify/assert"
)
@@ -31,6 +32,7 @@ import (
)
const testYML = "testdata/router_config.yml"
+const testMultiRouterYML = "testdata/router_multi_config.yml"
const errorTestYML = "testdata/router_config_error.yml"
func TestString(t *testing.T) {
@@ -63,4 +65,10 @@ func TestRouterInit(t *testing.T) {
assert.NoError(t, errPro)
assert.Equal(t, 1, routerURLSet.Size())
+
+ routerURLSet = gxset.NewSet()
+ errPro = RouterInit(testMultiRouterYML)
+ assert.NoError(t, errPro)
+
+ assert.Equal(t, 2, routerURLSet.Size())
}
diff --git a/config/service_config_test.go b/config/service_config_test.go
index 0f7e404f6e336b8ad254e4007825ecbfcaf9d78b..d2bbda0c49ee18dfeb8e6475203f09d4c2fbb3b9 100644
--- a/config/service_config_test.go
+++ b/config/service_config_test.go
@@ -153,7 +153,7 @@ func TestExport(t *testing.T) {
providerConfig = nil
}
-func TestgetRandomPort(t *testing.T) {
+func TestGetRandomPort(t *testing.T) {
protocolConfigs := make([]*ProtocolConfig, 0, 3)
ip, err := gxnet.GetLocalIP()
diff --git a/config/testdata/router_config.yml b/config/testdata/router_config.yml
index f6b91f5da7d95256e6279924b884bfd450c45a08..1845650d93c152585c4d4caf60ab7aa3d5b04887 100644
--- a/config/testdata/router_config.yml
+++ b/config/testdata/router_config.yml
@@ -1,6 +1,9 @@
# dubbo router yaml configure file
-priority: 1
-force: true
-conditions :
- - "a => b"
- - "c => d"
\ No newline at end of file
+routerRules:
+ - scope: application
+ key: mock-app
+ priority: 1
+ force: true
+ conditions :
+ - "a => b"
+ - "c => d"
\ No newline at end of file
diff --git a/config/testdata/router_config_error.yml b/config/testdata/router_config_error.yml
index 37894ac96474281d10131b53d8e644f10a18b14e..74e89cc52ec772a52c3c424a94276075fb099f8c 100644
--- a/config/testdata/router_config_error.yml
+++ b/config/testdata/router_config_error.yml
@@ -1,6 +1,7 @@
# dubbo router yaml configure file
-priority: 1
-force: true
-noConditions :
- - "a => b"
- - "c => d"
\ No newline at end of file
+routerRules:
+ - priority: 1
+ force: true
+ noConditions :
+ - "a => b"
+ - "c => d"
\ No newline at end of file
diff --git a/config/testdata/router_multi_config.yml b/config/testdata/router_multi_config.yml
new file mode 100644
index 0000000000000000000000000000000000000000..42bb4cbe70a43dc82ad4a4849059ad4344e6fd85
--- /dev/null
+++ b/config/testdata/router_multi_config.yml
@@ -0,0 +1,16 @@
+# dubbo router yaml configure file
+routerRules:
+ - scope: application
+ key: mock-app
+ priority: 1
+ force: true
+ conditions :
+ - "a => b"
+ - "c => d"
+ - scope: application
+ key: mock-app2
+ priority: 1
+ force: true
+ conditions :
+ - "a => b"
+ - "c => d"
\ No newline at end of file
diff --git a/go.mod b/go.mod
index c19627378261221b8a9565730ed62f7cc367e5e2..197f2a3012896affd499ab95dd5abd9263d4010f 100644
--- a/go.mod
+++ b/go.mod
@@ -67,3 +67,5 @@ require (
)
go 1.13
+
+replace launchpad.net/gocheck => github.com/go-check/check v0.0.0-20140225173054-eb6ee6f84d0a
diff --git a/go.sum b/go.sum
index aa6ecc86e25276eae2747aa5291f718713d164bd..6df5fc5248feb0281deeb3bc57e83453ae52ea7a 100644
--- a/go.sum
+++ b/go.sum
@@ -184,6 +184,7 @@ github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 h1:Mn26/9ZMNWSw9C9ER
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32/go.mod h1:GIjDIg/heH5DOkXY3YJ/wNhfHsQHoXGjl8G8amsYQ1I=
github.com/go-asn1-ber/asn1-ber v1.3.1 h1:gvPdv/Hr++TRFCl0UbPFHC54P9N9jgsRPnmnr419Uck=
github.com/go-asn1-ber/asn1-ber v1.3.1/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0=
+github.com/go-check/check v0.0.0-20140225173054-eb6ee6f84d0a/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
github.com/go-co-op/gocron v0.1.1 h1:OfDmkqkCguFtFMsm6Eaayci3DADLa8pXvdmOlPU/JcU=
github.com/go-co-op/gocron v0.1.1/go.mod h1:Y9PWlYqDChf2Nbgg7kfS+ZsXHDTZbMZYPEQ0MILqH+M=
github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w=
@@ -898,7 +899,6 @@ k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a h1:UcxjrRMyNx/i/y8G7kPvLy
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a/go.mod h1:1TqjTSzOxsLGIKfj0lK8EeCP7K1iUG65v09OM0/WG5E=
k8s.io/utils v0.0.0-20190801114015-581e00157fb1 h1:+ySTxfHnfzZb9ys375PXNlLhkJPLKgHajBU0N62BDvE=
k8s.io/utils v0.0.0-20190801114015-581e00157fb1/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
-launchpad.net/gocheck v0.0.0-20140225173054-000000000087/go.mod h1:hj7XX3B/0A+80Vse0e+BUHsHMTEhd0O4cpUHr/e/BUM=
sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e/go.mod h1:wWxsB5ozmmv/SG7nM11ayaAW51xMvak/t1r0CSlcokI=
sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs=
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go
index 6d1b771bf4108d17372e0ceb5ca818323278afd2..253427f3019401a76e862e65203adf868e0e45af 100644
--- a/protocol/dubbo/client.go
+++ b/protocol/dubbo/client.go
@@ -193,7 +193,7 @@ type Response struct {
atta map[string]string
}
-// NewResponse create a new Response.
+// nolint
func NewResponse(reply interface{}, atta map[string]string) *Response {
return &Response{
reply: reply,
diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go
index 8ba725e3cebd8261bc0adeea0e6fb8ef53f383a0..9781c70115e3d8b9e41c3418ae7b859608651ee8 100644
--- a/protocol/dubbo/codec.go
+++ b/protocol/dubbo/codec.go
@@ -82,7 +82,7 @@ func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
return bytes.NewBuffer(pkg), nil
}
-// Unmarshal decode hessian package.
+// Unmarshal decodes hessian package.
func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error {
// fix issue https://github.com/apache/dubbo-go/issues/380
bufLen := buf.Len()
diff --git a/protocol/grpc/server.go b/protocol/grpc/server.go
index 4017b65dd5c35ef19795b390e40d7afff6699306..2b7b1adddf573e3b84db32a11cfc286ff22a0276 100644
--- a/protocol/grpc/server.go
+++ b/protocol/grpc/server.go
@@ -69,7 +69,7 @@ func (s *Server) Start(url common.URL) {
panic(err)
}
- // if global trace instance was set , then server tracer instance can be get. If not , will return Nooptracer
+ // if global trace instance was set, then server tracer instance can be get. If not , will return Nooptracer
tracer := opentracing.GlobalTracer()
server := grpc.NewServer(
grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)))
diff --git a/protocol/invocation.go b/protocol/invocation.go
index ba5949794c0120874ebdf31cfb1fd9c7d8ac08e4..296ec0540c1eed69c30e1b1477be038a4a9cc00e 100644
--- a/protocol/invocation.go
+++ b/protocol/invocation.go
@@ -41,6 +41,8 @@ type Invocation interface {
Attributes() map[string]interface{}
// AttributeByKey gets attribute by key , if nil then return default value
AttributeByKey(string, interface{}) interface{}
+ // SetAttachments sets attribute by @key and @value.
+ SetAttachments(key string, value string)
// Invoker gets the invoker in current context.
Invoker() Invoker
}
diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go
index 953d50ca46d3e5613655720affdab9bfd7ae9a67..c72e105d3594195457a7d7abcccc8badb85c678b 100644
--- a/protocol/invocation/rpcinvocation.go
+++ b/protocol/invocation/rpcinvocation.go
@@ -27,7 +27,7 @@ import (
)
// ///////////////////////////
-// Invocation Impletment of RPC
+// Invocation Implement of RPC
// ///////////////////////////
// todo: is it necessary to separate fields of consumer(provider) from RPCInvocation
@@ -103,7 +103,7 @@ func (r *RPCInvocation) Attachments() map[string]string {
return r.attachments
}
-// AttachmentsByKey gets RPC attachment by key , if nil then return default value.
+// AttachmentsByKey gets RPC attachment by key, if nil then return default value.
func (r *RPCInvocation) AttachmentsByKey(key string, defaultValue string) string {
r.lock.RLock()
defer r.lock.RUnlock()
diff --git a/protocol/result.go b/protocol/result.go
index 2e7a6e492a60888ec9d9f420c77e6b77aee6aa70..2a33be612fd1f319c8c46cbd480865d5564b189d 100644
--- a/protocol/result.go
+++ b/protocol/result.go
@@ -38,7 +38,7 @@ type Result interface {
}
/////////////////////////////
-// Result Impletment of RPC
+// Result Implement of RPC
/////////////////////////////
// RPCResult is default RPC result.
diff --git a/registry/etcdv3/service_discovery.go b/registry/etcdv3/service_discovery.go
index f381ba70d64a4bbefcb09b48a11e52eec951f5ec..dceaa99df8061c6f46baa52eb6f5cebe4477f120 100644
--- a/registry/etcdv3/service_discovery.go
+++ b/registry/etcdv3/service_discovery.go
@@ -71,7 +71,7 @@ func (e *etcdV3ServiceDiscovery) String() string {
return e.descriptor
}
-// Destory service discovery
+// Destroy service discovery
func (e *etcdV3ServiceDiscovery) Destroy() error {
if e.client != nil {
e.client.Close()
diff --git a/registry/nacos/listener.go b/registry/nacos/listener.go
index 36f733df5a32f57e3410a2f31f9ab4b0af735d49..669900736214178799294f1ba1f97841bb6b81a4 100644
--- a/registry/nacos/listener.go
+++ b/registry/nacos/listener.go
@@ -132,10 +132,10 @@ func (nl *nacosListener) Callback(services []model.SubscribeService, err error)
instance := generateInstance(services[i])
newInstanceMap[host] = instance
if old, ok := nl.instanceMap[host]; !ok {
- //instance is not exsit in cache,add it to cache
+ // instance is not exist in cache, add it to cache
addInstances = append(addInstances, instance)
} else {
- //instance is not different from cache,update it to cache
+ // instance is not different from cache, update it to cache
if !reflect.DeepEqual(old, instance) {
updateInstances = append(updateInstances, instance)
}
@@ -144,7 +144,7 @@ func (nl *nacosListener) Callback(services []model.SubscribeService, err error)
for host, inst := range nl.instanceMap {
if _, ok := newInstanceMap[host]; !ok {
- //cache instance is not exsit in new instance list, remove it from cache
+ // cache instance is not exist in new instance list, remove it from cache
delInstances = append(delInstances, inst)
}
}
diff --git a/registry/nacos/service_discovery_test.go b/registry/nacos/service_discovery_test.go
index 24412999c590020852eca676dfaca55eafe13117..119be0b3aad3a828470c8c72c775abaada9512c2 100644
--- a/registry/nacos/service_discovery_test.go
+++ b/registry/nacos/service_discovery_test.go
@@ -139,9 +139,7 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) {
err = serviceDiscovery.Update(instance)
assert.Nil(t, err)
- //sometimes nacos may be failed to push update of instance,
- //so it need 10s to pull, we sleep 10 second to make sure instance has been update
- time.Sleep(11 * time.Second)
+ time.Sleep(5 * time.Second)
pageMap := serviceDiscovery.GetRequestInstances([]string{serviceName}, 0, 1)
assert.Equal(t, 1, len(pageMap))
@@ -158,7 +156,7 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) {
assert.Nil(t, err)
// test AddListener
- err = serviceDiscovery.AddListener(®istry.ServiceInstancesChangedListener{})
+ err = serviceDiscovery.AddListener(®istry.ServiceInstancesChangedListener{ServiceName: serviceName})
assert.Nil(t, err)
}
diff --git a/registry/registry.go b/registry/registry.go
index bb09ead7ef2af6707345086f8695b35286d76a10..5e77eab186680671f27b44bbe2e6a6b964a28721 100644
--- a/registry/registry.go
+++ b/registry/registry.go
@@ -46,7 +46,7 @@ type Registry interface {
//Deprecated!
//subscribe(event.URL) (Listener, error)
- //Will relace mode1 in dubbogo version v1.1.0
+ //Will replace mode1 in dubbogo version v1.1.0
//mode2 : callback mode, subscribe with notify(notify listener).
Subscribe(*common.URL, NotifyListener) error
diff --git a/remoting/kubernetes/registry_controller.go b/remoting/kubernetes/registry_controller.go
index f93a00a6f2df6022d0436f56e8c719f108be66f3..20be0d72ec1ce4c379f44d3218ffbd0cfd3d2a63 100644
--- a/remoting/kubernetes/registry_controller.go
+++ b/remoting/kubernetes/registry_controller.go
@@ -567,7 +567,7 @@ func (c *dubboRegistryController) addAnnotationForCurrentPod(k string, v string)
c.lock.Lock()
defer c.lock.Unlock()
- // 1. accord old pod && (k, v) assemble new pod dubbo annotion v
+ // 1. accord old pod && (k, v) assemble new pod dubbo annotation v
// 2. get patch data
// 3. PATCH the pod
currentPod, err := c.readCurrentPod()
diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go
index 3db743ed584898baaca09a7b3ec871e6253459d7..fbd90762eb34f361a38486ef2d8f5f10699a96f7 100644
--- a/remoting/zookeeper/client.go
+++ b/remoting/zookeeper/client.go
@@ -190,7 +190,7 @@ func NewZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*
return z, nil
}
-// WithTestCluster sets test cluser for zk client
+// WithTestCluster sets test cluster for zk client
func WithTestCluster(ts *zk.TestCluster) Option {
return func(opt *Options) {
opt.ts = ts
diff --git a/test/integrate/dubbo/go-client/log.yml b/test/integrate/dubbo/go-client/log.yml
index 59fa4279ad85272c4c49d532beaf23b74d00f58a..21f97bcbc40218ab23a6fc022b148d321cbe23cc 100644
--- a/test/integrate/dubbo/go-client/log.yml
+++ b/test/integrate/dubbo/go-client/log.yml
@@ -1,6 +1,5 @@
-
level: "debug"
-development: true
+development: false
disableCaller: false
disableStacktrace: false
sampling:
diff --git a/test/integrate/dubbo/go-server/log.yml b/test/integrate/dubbo/go-server/log.yml
index 59fa4279ad85272c4c49d532beaf23b74d00f58a..21f97bcbc40218ab23a6fc022b148d321cbe23cc 100644
--- a/test/integrate/dubbo/go-server/log.yml
+++ b/test/integrate/dubbo/go-server/log.yml
@@ -1,6 +1,5 @@
-
level: "debug"
-development: true
+development: false
disableCaller: false
disableStacktrace: false
sampling: