Skip to content
Snippets Groups Projects
Commit c135d03d authored by zhangshen023's avatar zhangshen023
Browse files

Merge remote-tracking branch 'origin/develop' into develop

parents d5dfaf97 d542f620
No related branches found
No related tags found
No related merge requests found
Showing
with 906 additions and 67 deletions
# Release Notes
---
## 1.5.1
### New Features
- [Add dynamic tag router](https://github.com/apache/dubbo-go/pull/703)
- [Add TLS support](https://github.com/apache/dubbo-go/pull/685)
- [Add Nearest first for multiple registry](https://github.com/apache/dubbo-go/pull/659)
- [Add application and service level router](https://github.com/apache/dubbo-go/pull/662)
- [Add dynamic tag router](https://github.com/apache/dubbo-go/pull/665)
### Enhancement
- [Avoid init the log twice](https://github.com/apache/dubbo-go/pull/719)
- [Correct words and format codes](https://github.com/apache/dubbo-go/pull/704)
- [Change log stack level from warn to error](https://github.com/apache/dubbo-go/pull/702)
- [Optimize remotes configuration](https://github.com/apache/dubbo-go/pull/687)
### Bugfixes
- [Fix register service instance after provider config load](https://github.com/apache/dubbo-go/pull/694)
- [Fix call subscribe function asynchronously](https://github.com/apache/dubbo-go/pull/721)
- [Fix tag router rule copy](https://github.com/apache/dubbo-go/pull/721)
- [Fix nacos unit test failed](https://github.com/apache/dubbo-go/pull/705)
- [Fix can not inovke nacos destroy when graceful shutdown](https://github.com/apache/dubbo-go/pull/689)
- [Fix zk lost event](https://github.com/apache/dubbo-go/pull/692)
- [Fix k8s ut bug](https://github.com/apache/dubbo-go/pull/693)
Milestone: [https://github.com/apache/dubbo-go/milestone/2?closed=1](https://github.com/apache/dubbo-go/milestone/2?closed=1)
Project: [https://github.com/apache/dubbo-go/projects/8](https://github.com/apache/dubbo-go/projects/8)
## 1.5.0
### New Features
......
......@@ -16,6 +16,8 @@ Apache License, Version 2.0
## Release note ##
[v1.5.1 - Aug 23, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.5.1)
[v1.5.0 - July 24, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.5.0)
[v1.4.0 - Mar 17, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.4.0)
......@@ -34,7 +36,7 @@ Both extension module and layered project architecture is according to Apache Du
![dubbo go extend](./doc/pic/arch/dubbo-go-ext.png)
If you wanna know more about dubbo-go, please visit this reference [Project Architeture design](https://github.com/apache/dubbo-go/wiki/dubbo-go-V1.0-design)
If you wanna know more about dubbo-go, please visit this reference [Project Architecture design](https://github.com/apache/dubbo-go/wiki/dubbo-go-V1.0-design)
## Feature list ##
......
......@@ -36,5 +36,8 @@ xcopy /f "%zkJar%" "cluster/router/chain/zookeeper-4unittest/contrib/fatjar/"
md cluster\router\condition\zookeeper-4unittest\contrib\fatjar
xcopy /f "%zkJar%" "cluster/router/condition/zookeeper-4unittest/contrib/fatjar/"
mkdir -p cluster/router/tag/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} cluster/router/tag/zookeeper-4unittest/contrib/fatjar
md metadata\report\zookeeper\zookeeper-4unittest\contrib\fatjar
xcopy /f "%zkJar%" "metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar/"
\ No newline at end of file
......@@ -36,5 +36,8 @@ cp ${zkJar} cluster/router/chain/zookeeper-4unittest/contrib/fatjar
mkdir -p cluster/router/condition/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} cluster/router/condition/zookeeper-4unittest/contrib/fatjar
mkdir -p cluster/router/tag/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} cluster/router/tag/zookeeper-4unittest/contrib/fatjar
mkdir -p metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar
\ No newline at end of file
......@@ -17,6 +17,10 @@
package cluster_impl
import (
"context"
)
import (
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
......@@ -36,6 +40,7 @@ type baseClusterInvoker struct {
availablecheck bool
destroyed *atomic.Bool
stickyInvoker protocol.Invoker
interceptor cluster.ClusterInterceptor
}
func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker {
......@@ -146,6 +151,20 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc
return selectedInvoker
}
func (invoker *baseClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
if invoker.interceptor != nil {
invoker.interceptor.BeforeInvoker(ctx, invocation)
result := invoker.interceptor.DoInvoke(ctx, invocation)
invoker.interceptor.AfterInvoker(ctx, invocation)
return result
}
return nil
}
func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) bool {
for _, i := range invoked {
if i == selectedInvoker {
......
......@@ -19,16 +19,15 @@ package cluster_impl
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)
type failoverCluster struct{}
const name = "failover"
func init() {
extension.SetCluster(name, NewFailoverCluster)
extension.SetCluster(constant.FAILOVER_CLUSTER_NAME, NewFailoverCluster)
}
// NewFailoverCluster returns a failover cluster instance
......
......@@ -19,22 +19,26 @@ package cluster_impl
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)
type registryAwareCluster struct{}
type zoneAwareCluster struct{}
func init() {
extension.SetCluster("registryAware", NewRegistryAwareCluster)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, NewZoneAwareCluster)
}
// NewRegistryAwareCluster returns a registry aware cluster instance
func NewRegistryAwareCluster() cluster.Cluster {
return &registryAwareCluster{}
// NewZoneAwareCluster returns a zoneaware cluster instance.
//
// More than one registry for subscription.
// Usually it is used for choose between registries.
func NewZoneAwareCluster() cluster.Cluster {
return &zoneAwareCluster{}
}
// nolint
func (cluster *registryAwareCluster) Join(directory cluster.Directory) protocol.Invoker {
return newRegistryAwareClusterInvoker(directory)
// Join returns a zoneAwareClusterInvoker instance
func (cluster *zoneAwareCluster) Join(directory cluster.Directory) protocol.Invoker {
return newZoneAwareClusterInvoker(directory)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cluster_impl
import (
"context"
"fmt"
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
)
// When there're more than one registry for subscription.
//
// This extension provides a strategy to decide how to distribute traffics among them:
// 1. registry marked as 'preferred=true' has the highest priority.
// 2. check the zone the current request belongs, pick the registry that has the same zone first.
// 3. Evenly balance traffic between all registries based on each registry's weight.
// 4. Pick anyone that's available.
type zoneAwareClusterInvoker struct {
baseClusterInvoker
}
func newZoneAwareClusterInvoker(directory cluster.Directory) protocol.Invoker {
invoke := &zoneAwareClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
// add self to interceptor
invoke.interceptor = invoke
return invoke
}
// nolint
func (invoker *zoneAwareClusterInvoker) DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
return &protocol.RPCResult{Err: err}
}
// First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'preferred' key.
for _, invoker := range invokers {
key := constant.REGISTRY_KEY + "." + constant.PREFERRED_KEY
if invoker.IsAvailable() && matchParam("true", key, "false", invoker) {
return invoker.Invoke(ctx, invocation)
}
}
// providers in the registry with the same zone
key := constant.REGISTRY_KEY + "." + constant.ZONE_KEY
zone := invocation.AttachmentsByKey(key, "")
if "" != zone {
for _, invoker := range invokers {
if invoker.IsAvailable() && matchParam(zone, key, "", invoker) {
return invoker.Invoke(ctx, invocation)
}
}
force := invocation.AttachmentsByKey(constant.REGISTRY_KEY+"."+constant.ZONE_FORCE_KEY, "")
if "true" == force {
return &protocol.RPCResult{
Err: fmt.Errorf("no registry instance in zone or "+
"no available providers in the registry, zone: %v, "+
" registries: %v", zone, invoker.GetUrl()),
}
}
}
// load balance among all registries, with registry weight count in.
loadBalance := getLoadBalance(invokers[0], invocation)
ivk := invoker.doSelect(loadBalance, invocation, invokers, nil)
if ivk != nil && ivk.IsAvailable() {
return ivk.Invoke(ctx, invocation)
}
// If none of the invokers has a preferred signal or is picked by the loadBalancer, pick the first one available.
for _, invoker := range invokers {
if invoker.IsAvailable() {
return invoker.Invoke(ctx, invocation)
}
}
return &protocol.RPCResult{
Err: fmt.Errorf("no provider available in %v", invokers),
}
}
func (invoker *zoneAwareClusterInvoker) BeforeInvoker(ctx context.Context, invocation protocol.Invocation) {
key := constant.REGISTRY_KEY + "." + constant.ZONE_FORCE_KEY
force := ctx.Value(key)
if force != nil {
switch value := force.(type) {
case bool:
if value {
invocation.SetAttachments(key, "true")
}
case string:
if "true" == value {
invocation.SetAttachments(key, "true")
}
default:
// ignore
}
}
}
func (invoker *zoneAwareClusterInvoker) AfterInvoker(ctx context.Context, invocation protocol.Invocation) {
}
func matchParam(target, key, def string, invoker protocol.Invoker) bool {
return target == invoker.GetUrl().GetParam(key, def)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cluster_impl
import (
"context"
"fmt"
"testing"
)
import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/protocol/mock"
)
func TestZoneWareInvokerWithPreferredSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
// In Go versions 1.14+, if you pass a *testing.T
// into gomock.NewController(t) you no longer need to call ctrl.Finish().
//defer ctrl.Finish()
mockResult := &protocol.RPCResult{
Attrs: map[string]string{constant.PREFERRED_KEY: "true"},
Rest: rest{tried: 0, success: true}}
var invokers []protocol.Invoker
for i := 0; i < 2; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
invoker := mock.NewMockInvoker(ctrl)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(url).AnyTimes()
if 0 == i {
url.SetParam(constant.REGISTRY_KEY+"."+constant.PREFERRED_KEY, "true")
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation) protocol.Result {
return mockResult
})
} else {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation) protocol.Result {
return &protocol.RPCResult{}
})
}
invokers = append(invokers, invoker)
}
zoneAwareCluster := NewZoneAwareCluster()
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := zoneAwareCluster.Join(staticDir)
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
}
func TestZoneWareInvokerWithWeightSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
// In Go versions 1.14+, if you pass a *testing.T
// into gomock.NewController(t) you no longer need to call ctrl.Finish().
//defer ctrl.Finish()
w1 := "50"
w2 := "200"
var invokers []protocol.Invoker
for i := 0; i < 2; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
invoker := mock.NewMockInvoker(ctrl)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(url).AnyTimes()
url.SetParam(constant.REGISTRY_KEY+"."+constant.REGISTRY_LABEL_KEY, "true")
if 1 == i {
url.SetParam(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, w1)
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation) protocol.Result {
return &protocol.RPCResult{
Attrs: map[string]string{constant.WEIGHT_KEY: w1},
Rest: rest{tried: 0, success: true}}
}).MaxTimes(100)
} else {
url.SetParam(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, w2)
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation) protocol.Result {
return &protocol.RPCResult{
Attrs: map[string]string{constant.WEIGHT_KEY: w2},
Rest: rest{tried: 0, success: true}}
}).MaxTimes(100)
}
invokers = append(invokers, invoker)
}
zoneAwareCluster := NewZoneAwareCluster()
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := zoneAwareCluster.Join(staticDir)
var w2Count, w1Count int
loop := 50
for i := 0; i < loop; i++ {
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
if w2 == result.Attachment(constant.WEIGHT_KEY, "0") {
w2Count++
}
if w1 == result.Attachment(constant.WEIGHT_KEY, "0") {
w1Count++
}
assert.NoError(t, result.Error())
}
t.Logf("loop count : %d, w1 height : %s | count : %d, w2 height : %s | count : %d", loop,
w1, w1Count, w2, w2Count)
}
func TestZoneWareInvokerWithZoneSuccess(t *testing.T) {
var zoneArray = []string{"hangzhou", "shanghai"}
ctrl := gomock.NewController(t)
// In Go versions 1.14+, if you pass a *testing.T
// into gomock.NewController(t) you no longer need to call ctrl.Finish().
//defer ctrl.Finish()
var invokers []protocol.Invoker
for i := 0; i < 2; i++ {
zoneValue := zoneArray[i]
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
url.SetParam(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, zoneValue)
invoker := mock.NewMockInvoker(ctrl)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(url).AnyTimes()
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation) protocol.Result {
return &protocol.RPCResult{
Attrs: map[string]string{constant.ZONE_KEY: zoneValue},
Rest: rest{tried: 0, success: true}}
})
invokers = append(invokers, invoker)
}
zoneAwareCluster := NewZoneAwareCluster()
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := zoneAwareCluster.Join(staticDir)
inv := &invocation.RPCInvocation{}
// zone hangzhou
hz := zoneArray[0]
inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, hz)
result := clusterInvoker.Invoke(context.Background(), inv)
assert.Equal(t, hz, result.Attachment(constant.ZONE_KEY, ""))
}
func TestZoneWareInvokerWithZoneForceFail(t *testing.T) {
ctrl := gomock.NewController(t)
// In Go versions 1.14+, if you pass a *testing.T
// into gomock.NewController(t) you no longer need to call ctrl.Finish().
//defer ctrl.Finish()
var invokers []protocol.Invoker
for i := 0; i < 2; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
invoker := mock.NewMockInvoker(ctrl)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(url).AnyTimes()
invokers = append(invokers, invoker)
}
zoneAwareCluster := NewZoneAwareCluster()
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := zoneAwareCluster.Join(staticDir)
inv := &invocation.RPCInvocation{}
// zone hangzhou
inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, "hangzhou")
// zone force
inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_FORCE_KEY, "true")
result := clusterInvoker.Invoke(context.Background(), inv)
assert.NotNil(t, result.Error())
}
......@@ -15,42 +15,25 @@
* limitations under the License.
*/
package cluster_impl
package cluster
import (
"context"
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
)
type registryAwareClusterInvoker struct {
baseClusterInvoker
}
func newRegistryAwareClusterInvoker(directory cluster.Directory) protocol.Invoker {
return &registryAwareClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
}
// ClusterInterceptor
// Extension - ClusterInterceptor
type ClusterInterceptor interface {
// Before DoInvoke method
BeforeInvoker(ctx context.Context, invocation protocol.Invocation)
// nolint
func (invoker *registryAwareClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
//First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'default' key.
for _, invoker := range invokers {
if invoker.IsAvailable() && invoker.GetUrl().GetParam(constant.REGISTRY_DEFAULT_KEY, "false") == "true" {
return invoker.Invoke(ctx, invocation)
}
}
// After DoInvoke method
AfterInvoker(ctx context.Context, invocation protocol.Invocation)
//If none of the invokers has a local signal, pick the first one available.
for _, invoker := range invokers {
if invoker.IsAvailable() {
return invoker.Invoke(ctx, invocation)
}
}
return nil
// Corresponding cluster invoke
DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result
}
......@@ -85,15 +85,21 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) {
for _, url := range urls {
routerKey := url.GetParam(constant.ROUTER_KEY, "")
if len(routerKey) > 0 {
factory := extension.GetRouterFactory(url.Protocol)
r, err := factory.NewPriorityRouter(url)
if err != nil {
logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err)
return
if len(routerKey) == 0 {
continue
}
if url.Protocol == constant.CONDITION_ROUTE_PROTOCOL {
if !dir.isProperRouter(url) {
continue
}
routers = append(routers, r)
}
factory := extension.GetRouterFactory(url.Protocol)
r, err := factory.NewPriorityRouter(url)
if err != nil {
logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err)
return
}
routers = append(routers, r)
}
logger.Infof("Init file condition router success, size: %v", len(routers))
......@@ -104,6 +110,25 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) {
rc.AddRouters(routers)
}
func (dir *BaseDirectory) isProperRouter(url *common.URL) bool {
app := url.GetParam(constant.APPLICATION_KEY, "")
dirApp := dir.GetUrl().GetParam(constant.APPLICATION_KEY, "")
if len(dirApp) == 0 && dir.GetUrl().SubURL != nil {
dirApp = dir.GetUrl().SubURL.GetParam(constant.APPLICATION_KEY, "")
}
serviceKey := dir.GetUrl().ServiceKey()
if len(serviceKey) == 0 {
serviceKey = dir.GetUrl().SubURL.ServiceKey()
}
if len(app) > 0 && app == dirApp {
return true
}
if url.ServiceKey() == serviceKey {
return true
}
return false
}
// Destroy Destroy
func (dir *BaseDirectory) Destroy(doDestroy func()) {
if dir.destroyed.CAS(false, true) {
......
......@@ -37,7 +37,7 @@ import (
var (
url, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
anyUrl, _ = common.NewURL(fmt.Sprintf("condition://%s/com.foo.BarService", constant.ANYHOST_VALUE))
anyURL, _ = common.NewURL(fmt.Sprintf("condition://%s/com.foo.BarService", constant.ANYHOST_VALUE))
)
func TestNewBaseDirectory(t *testing.T) {
......@@ -48,13 +48,17 @@ func TestNewBaseDirectory(t *testing.T) {
}
func TestBuildRouterChain(t *testing.T) {
directory := NewBaseDirectory(&url)
regURL := url
regURL.AddParam(constant.INTERFACE_KEY, "mock-app")
directory := NewBaseDirectory(&regURL)
assert.NotNil(t, directory)
localIP, _ := gxnet.GetLocalIP()
rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP))
routeURL := getRouteUrl(rule)
routeURL := getRouteURL(rule, anyURL)
routeURL.AddParam(constant.INTERFACE_KEY, "mock-app")
routerURLs := make([]*common.URL, 0)
routerURLs = append(routerURLs, routeURL)
directory.SetRouters(routerURLs)
......@@ -63,9 +67,53 @@ func TestBuildRouterChain(t *testing.T) {
assert.NotNil(t, chain)
}
func getRouteUrl(rule string) *common.URL {
anyUrl.AddParam("rule", rule)
anyUrl.AddParam("force", "true")
anyUrl.AddParam(constant.ROUTER_KEY, "router")
return &url
func getRouteURL(rule string, u common.URL) *common.URL {
ru := u
ru.AddParam("rule", rule)
ru.AddParam("force", "true")
ru.AddParam(constant.ROUTER_KEY, "router")
return &ru
}
func TestIsProperRouter(t *testing.T) {
regURL := url
regURL.AddParam(constant.APPLICATION_KEY, "mock-app")
d := NewBaseDirectory(&regURL)
localIP, _ := gxnet.GetLocalIP()
rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP))
routeURL := getRouteURL(rule, anyURL)
routeURL.AddParam(constant.APPLICATION_KEY, "mock-app")
rst := d.isProperRouter(routeURL)
assert.True(t, rst)
regURL.AddParam(constant.APPLICATION_KEY, "")
regURL.AddParam(constant.INTERFACE_KEY, "com.foo.BarService")
d = NewBaseDirectory(&regURL)
routeURL = getRouteURL(rule, anyURL)
routeURL.AddParam(constant.INTERFACE_KEY, "com.foo.BarService")
rst = d.isProperRouter(routeURL)
assert.True(t, rst)
regURL.AddParam(constant.APPLICATION_KEY, "")
regURL.AddParam(constant.INTERFACE_KEY, "")
d = NewBaseDirectory(&regURL)
routeURL = getRouteURL(rule, anyURL)
rst = d.isProperRouter(routeURL)
assert.True(t, rst)
regURL.SetParam(constant.APPLICATION_KEY, "")
regURL.SetParam(constant.INTERFACE_KEY, "")
d = NewBaseDirectory(&regURL)
routeURL = getRouteURL(rule, anyURL)
routeURL.AddParam(constant.APPLICATION_KEY, "mock-service")
rst = d.isProperRouter(routeURL)
assert.False(t, rst)
regURL.SetParam(constant.APPLICATION_KEY, "")
regURL.SetParam(constant.INTERFACE_KEY, "")
d = NewBaseDirectory(&regURL)
routeURL = getRouteURL(rule, anyURL)
routeURL.AddParam(constant.INTERFACE_KEY, "mock-service")
rst = d.isProperRouter(routeURL)
assert.False(t, rst)
}
......@@ -39,10 +39,13 @@ func NewStaticDirectory(invokers []protocol.Invoker) *staticDirectory {
if len(invokers) > 0 {
url = invokers[0].GetUrl()
}
return &staticDirectory{
dir := &staticDirectory{
BaseDirectory: NewBaseDirectory(&url),
invokers: invokers,
}
dir.routerChain.SetInvokers(invokers)
return dir
}
//for-loop invokers ,if all invokers is available ,then it means directory is available
......@@ -69,7 +72,7 @@ func (dir *staticDirectory) List(invocation protocol.Invocation) []protocol.Invo
return invokers
}
dirUrl := dir.GetUrl()
return routerChain.Route(invokers, &dirUrl, invocation)
return routerChain.Route(&dirUrl, invocation)
}
// Destroy Destroy
......@@ -92,6 +95,7 @@ func (dir *staticDirectory) BuildRouterChain(invokers []protocol.Invoker) error
if e != nil {
return e
}
routerChain.SetInvokers(dir.invokers)
dir.SetRouterChain(routerChain)
return nil
}
......@@ -42,7 +42,7 @@ const (
ConsistentHash = "consistenthash"
// HashNodes hash nodes
HashNodes = "hash.nodes"
// HashArguments key of hash arguments in url
// HashArguments key of hash arguments in url
HashArguments = "hash.arguments"
)
......
......@@ -28,23 +28,35 @@ import (
// GetWeight gets weight for load balance strategy
func GetWeight(invoker protocol.Invoker, invocation protocol.Invocation) int64 {
var weight int64
url := invoker.GetUrl()
weight := url.GetMethodParamInt64(invocation.MethodName(), constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT)
// Multiple registry scenario, load balance among multiple registries.
isRegIvk := url.GetParamBool(constant.REGISTRY_KEY+"."+constant.REGISTRY_LABEL_KEY, false)
if isRegIvk {
weight = url.GetParamInt(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT)
} else {
weight = url.GetMethodParamInt64(invocation.MethodName(), constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT)
if weight > 0 {
//get service register time an do warm up time
now := time.Now().Unix()
timestamp := url.GetParamInt(constant.REMOTE_TIMESTAMP_KEY, now)
if uptime := now - timestamp; uptime > 0 {
warmup := url.GetParamInt(constant.WARMUP_KEY, constant.DEFAULT_WARMUP)
if uptime < warmup {
if ww := float64(uptime) / float64(warmup) / float64(weight); ww < 1 {
weight = 1
} else if int64(ww) <= weight {
weight = int64(ww)
if weight > 0 {
//get service register time an do warm up time
now := time.Now().Unix()
timestamp := url.GetParamInt(constant.REMOTE_TIMESTAMP_KEY, now)
if uptime := now - timestamp; uptime > 0 {
warmup := url.GetParamInt(constant.WARMUP_KEY, constant.DEFAULT_WARMUP)
if uptime < warmup {
if ww := float64(uptime) / float64(warmup) / float64(weight); ww < 1 {
weight = 1
} else if int64(ww) <= weight {
weight = int64(ww)
}
}
}
}
}
if weight < 0 {
weight = 0
}
return weight
}
......@@ -15,57 +15,18 @@
* limitations under the License.
*/
package cluster_impl
package router
import (
"context"
"fmt"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
func TestRegAwareInvokeSuccess(t *testing.T) {
regAwareCluster := NewRegistryAwareCluster()
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
invokers = append(invokers, NewMockInvoker(url, 1))
}
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := regAwareCluster.Join(staticDir)
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.NoError(t, result.Error())
count = 0
}
func TestDestroy(t *testing.T) {
regAwareCluster := NewRegistryAwareCluster()
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
invokers = append(invokers, NewMockInvoker(url, 1))
}
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := regAwareCluster.Join(staticDir)
assert.Equal(t, true, clusterInvoker.IsAvailable())
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.NoError(t, result.Error())
count = 0
clusterInvoker.Destroy()
assert.Equal(t, false, clusterInvoker.IsAvailable())
// Chain
type Chain interface {
Route(*common.URL, protocol.Invocation) []protocol.Invoker
// Refresh invokers
SetInvokers([]protocol.Invoker)
// AddRouters Add routers
AddRouters([]PriorityRouter)
}
......@@ -18,9 +18,10 @@
package chain
import (
"math"
"sort"
"sync"
"sync/atomic"
"time"
)
import (
......@@ -30,11 +31,18 @@ import (
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)
const (
timeInterval = 5 * time.Second
timeThreshold = 2 * time.Second
countThreshold = 5
)
// RouterChain Router chain
type RouterChain struct {
// Full list of addresses from registry, classified by method name.
......@@ -48,20 +56,37 @@ type RouterChain struct {
mutex sync.RWMutex
url common.URL
// The times of address notification since last update for address cache
count int64
// The timestamp of last update for address cache
last time.Time
// Channel for notify to update the address cache
notify chan struct{}
// Address cache
cache atomic.Value
}
// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
func (c *RouterChain) Route(invoker []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
finalInvokers := invoker
l := len(c.routers)
rs := make([]router.PriorityRouter, l, int(math.Ceil(float64(l)*1.2)))
c.mutex.RLock()
copy(rs, c.routers)
c.mutex.RUnlock()
func (c *RouterChain) Route(url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
cache := c.loadCache()
if cache == nil {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.invokers
}
bitmap := cache.bitmap
for _, r := range c.copyRouters() {
bitmap = r.Route(bitmap, cache, url, invocation)
}
for _, r := range rs {
finalInvokers = r.Route(finalInvokers, url, invocation)
indexes := bitmap.ToArray()
finalInvokers := make([]protocol.Invoker, len(indexes))
for i, index := range indexes {
finalInvokers[i] = cache.invokers[index]
}
return finalInvokers
}
......@@ -79,6 +104,116 @@ func (c *RouterChain) AddRouters(routers []router.PriorityRouter) {
c.routers = newRouters
}
// SetInvokers receives updated invokers from registry center. If the times of notification exceeds countThreshold and
// time interval exceeds timeThreshold since last cache update, then notify to update the cache.
func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
c.mutex.Lock()
c.invokers = invokers
c.mutex.Unlock()
c.count++
now := time.Now()
if c.count >= countThreshold && now.Sub(c.last) >= timeThreshold {
c.last = now
c.count = 0
go func() {
c.notify <- struct{}{}
}()
}
}
// loop listens on events to update the address cache when it's necessary, either when it receives notification
// from address update, or when timeInterval exceeds.
func (c *RouterChain) loop() {
for {
ticker := time.NewTicker(timeInterval)
select {
case <-ticker.C:
c.buildCache()
case <-c.notify:
c.buildCache()
}
}
}
// copyRouters make a snapshot copy from RouterChain's router list.
func (c *RouterChain) copyRouters() []router.PriorityRouter {
c.mutex.RLock()
defer c.mutex.RUnlock()
ret := make([]router.PriorityRouter, 0, len(c.routers))
ret = append(ret, c.routers...)
return ret
}
// copyInvokers copies a snapshot of the received invokers.
func (c *RouterChain) copyInvokers() []protocol.Invoker {
c.mutex.RLock()
defer c.mutex.RUnlock()
if c.invokers == nil || len(c.invokers) == 0 {
return nil
}
ret := make([]protocol.Invoker, 0, len(c.invokers))
ret = append(ret, c.invokers...)
return ret
}
// loadCache loads cache from sync.Value to guarantee the visibility
func (c *RouterChain) loadCache() *InvokerCache {
v := c.cache.Load()
if v == nil {
return nil
}
return v.(*InvokerCache)
}
// copyInvokerIfNecessary compares chain's invokers copy and cache's invokers copy, to avoid copy as much as possible
func (c *RouterChain) copyInvokerIfNecessary(cache *InvokerCache) []protocol.Invoker {
var invokers []protocol.Invoker
if cache != nil {
invokers = cache.invokers
}
c.mutex.RLock()
defer c.mutex.RUnlock()
if isInvokersChanged(invokers, c.invokers) {
invokers = c.copyInvokers()
}
return invokers
}
// buildCache builds address cache with the new invokers for all poolable routers.
func (c *RouterChain) buildCache() {
origin := c.loadCache()
invokers := c.copyInvokerIfNecessary(origin)
if invokers == nil || len(invokers) == 0 {
return
}
var (
mutex sync.Mutex
wg sync.WaitGroup
)
cache := BuildCache(invokers)
for _, r := range c.copyRouters() {
if p, ok := r.(router.Poolable); ok {
wg.Add(1)
go func(p router.Poolable) {
defer wg.Done()
pool, info := poolRouter(p, origin, invokers)
mutex.Lock()
defer mutex.Unlock()
cache.pools[p.Name()] = pool
cache.metadatas[p.Name()] = info
}(p)
}
}
wg.Wait()
c.cache.Store(cache)
}
// URL Return URL in RouterChain
func (c *RouterChain) URL() common.URL {
return c.url
......@@ -109,14 +244,62 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
chain := &RouterChain{
builtinRouters: routers,
routers: newRouters,
last: time.Now(),
notify: make(chan struct{}),
}
if url != nil {
chain.url = *url
}
go chain.loop()
return chain, nil
}
// poolRouter calls poolable router's Pool() to create new address pool and address metadata if necessary.
// If the corresponding cache entry exists, and the poolable router answers no need to re-pool (possibly because its
// rule doesn't change), and the address list doesn't change, then the existing data will be re-used.
func poolRouter(p router.Poolable, origin *InvokerCache, invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
name := p.Name()
if isCacheMiss(origin, name) || p.ShouldPool() || &(origin.invokers) != &invokers {
logger.Debugf("build address cache for router %q", name)
return p.Pool(invokers)
}
logger.Debugf("reuse existing address cache for router %q", name)
return origin.pools[name], origin.metadatas[name]
}
// isCacheMiss checks if the corresponding cache entry for a poolable router has already existed.
// False returns when the cache is nil, or cache's pool is nil, or cache's invokers snapshot is nil, or the entry
// doesn't exist.
func isCacheMiss(cache *InvokerCache, key string) bool {
if cache == nil || cache.pools == nil || cache.invokers == nil || cache.pools[key] == nil {
return true
}
return false
}
// isInvokersChanged compares new invokers on the right changes, compared with the old invokers on the left.
func isInvokersChanged(left []protocol.Invoker, right []protocol.Invoker) bool {
if len(right) != len(left) {
return true
}
for _, r := range right {
found := false
for _, l := range left {
if common.IsEquals(l.GetUrl(), r.GetUrl(), constant.TIMESTAMP_KEY, constant.REMOTE_TIMESTAMP_KEY) {
found = true
break
}
}
if !found {
return true
}
}
return false
}
// sortRouter Sort router instance by priority with stable algorithm
func sortRouter(routers []router.PriorityRouter) {
sort.Stable(byPriority(routers))
......
......@@ -66,7 +66,9 @@ func TestNewRouterChain(t *testing.T) {
err = z.Create(path)
assert.NoError(t, err)
testyml := `enabled: true
testyml := `scope: application
key: mock-app
enabled: true
force: true
runtime: false
conditions:
......@@ -93,7 +95,7 @@ conditions:
assert.NotNil(t, appRouter)
assert.NotNil(t, appRouter.RouterRule())
rule := appRouter.RouterRule()
assert.Equal(t, "", rule.Scope)
assert.Equal(t, "application", rule.Scope)
assert.True(t, rule.Force)
assert.True(t, rule.Enabled)
assert.True(t, rule.Valid)
......@@ -101,7 +103,7 @@ conditions:
assert.Equal(t, testyml, rule.RawRule)
assert.Equal(t, false, rule.Runtime)
assert.Equal(t, false, rule.Dynamic)
assert.Equal(t, "", rule.Key)
assert.Equal(t, "mock-app", rule.Key)
}
func TestNewRouterChainURLNil(t *testing.T) {
......@@ -116,7 +118,9 @@ func TestRouterChainAddRouters(t *testing.T) {
err = z.Create(path)
assert.NoError(t, err)
testyml := `enabled: true
testyml := `scope: application
key: mock-app
enabled: true
force: true
runtime: false
conditions:
......@@ -168,10 +172,12 @@ func TestRouterChainRoute(t *testing.T) {
invokers := []protocol.Invoker{}
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))
chain.SetInvokers(invokers)
chain.buildCache()
targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP))
inv := &invocation.RPCInvocation{}
finalInvokers := chain.Route(invokers, &targetURL, inv)
finalInvokers := chain.Route(&targetURL, inv)
assert.Equal(t, 1, len(finalInvokers))
}
......@@ -182,7 +188,9 @@ func TestRouterChainRouteAppRouter(t *testing.T) {
err = z.Create(path)
assert.NoError(t, err)
testyml := `enabled: true
testyml := `scope: application
key: mock-app
enabled: true
force: true
runtime: false
conditions:
......@@ -205,10 +213,12 @@ conditions:
invokers := []protocol.Invoker{}
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))
chain.SetInvokers(invokers)
chain.buildCache()
targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP))
inv := &invocation.RPCInvocation{}
finalInvokers := chain.Route(invokers, &targetURL, inv)
finalInvokers := chain.Route(&targetURL, inv)
assert.Equal(t, 0, len(finalInvokers))
}
......@@ -232,10 +242,12 @@ func TestRouterChainRouteNoRoute(t *testing.T) {
invokers := []protocol.Invoker{}
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))
chain.SetInvokers(invokers)
chain.buildCache()
targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP))
inv := &invocation.RPCInvocation{}
finalInvokers := chain.Route(invokers, &targetURL, inv)
finalInvokers := chain.Route(&targetURL, inv)
assert.Equal(t, 0, len(finalInvokers))
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package chain
import (
"github.com/RoaringBitmap/roaring"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/protocol"
)
// Cache caches all addresses relevant info for a snapshot of received invokers. It keeps a snapshot of the received
// address list, and also keeps address pools and address metadata from routers based on the same address snapshot, if
// the router implements Poolable.
type InvokerCache struct {
// The snapshot of invokers
invokers []protocol.Invoker
// The bitmap representation for invokers snapshot
bitmap *roaring.Bitmap
// Address pool from routers which implement Poolable
pools map[string]router.AddrPool
// Address metadata from routers which implement Poolable
metadatas map[string]router.AddrMetadata
}
// BuildCache builds address cache from the given invokers.
func BuildCache(invokers []protocol.Invoker) *InvokerCache {
return &InvokerCache{
invokers: invokers,
bitmap: utils.ToBitmap(invokers),
pools: make(map[string]router.AddrPool, 8),
metadatas: make(map[string]router.AddrMetadata, 8),
}
}
// GetInvokers get invokers snapshot.
func (c *InvokerCache) GetInvokers() []protocol.Invoker {
return c.invokers
}
// FindAddrPool finds address pool for a poolable router.
func (c *InvokerCache) FindAddrPool(p router.Poolable) router.AddrPool {
return c.pools[p.Name()]
}
// FindAddrMeta finds address metadata for a poolable router.
func (c *InvokerCache) FindAddrMeta(p router.Poolable) router.AddrMetadata {
return c.metadatas[p.Name()]
}
// SetAddrPool sets address pool for a poolable router, for unit test only
func (c *InvokerCache) SetAddrPool(name string, pool router.AddrPool) {
c.pools[name] = pool
}
// SetAddrMeta sets address metadata for a poolable router, for unit test only
func (c *InvokerCache) SetAddrMeta(name string, meta router.AddrMetadata) {
c.metadatas[name] = meta
}
......@@ -24,7 +24,6 @@ import (
)
import (
_ "github.com/apache/dubbo-go/config_center/zookeeper"
"github.com/stretchr/testify/assert"
)
......@@ -34,6 +33,7 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
_ "github.com/apache/dubbo-go/config_center/zookeeper"
"github.com/apache/dubbo-go/remoting"
"github.com/apache/dubbo-go/remoting/zookeeper"
)
......@@ -52,7 +52,9 @@ var (
func TestNewAppRouter(t *testing.T) {
testYML := `enabled: true
testYML := `scope: application
key: mock-app
enabled: true
force: true
runtime: false
conditions:
......@@ -83,7 +85,7 @@ conditions:
assert.NotNil(t, appRouter)
assert.NotNil(t, appRouter.RouterRule())
rule := appRouter.RouterRule()
assert.Equal(t, "", rule.Scope)
assert.Equal(t, "application", rule.Scope)
assert.True(t, rule.Force)
assert.True(t, rule.Enabled)
assert.True(t, rule.Valid)
......@@ -91,13 +93,15 @@ conditions:
assert.Equal(t, testYML, rule.RawRule)
assert.Equal(t, false, rule.Runtime)
assert.Equal(t, false, rule.Dynamic)
assert.Equal(t, "", rule.Key)
assert.Equal(t, "mock-app", rule.Key)
assert.Equal(t, 0, rule.Priority)
}
func TestGenerateConditions(t *testing.T) {
testYML := `enabled: true
testYML := `scope: application
key: mock-app
enabled: true
force: true
runtime: false
conditions:
......@@ -135,7 +139,9 @@ conditions:
func TestProcess(t *testing.T) {
testYML := `enabled: true
testYML := `scope: application
key: mock-app
enabled: true
force: true
runtime: false
conditions:
......@@ -165,7 +171,8 @@ conditions:
assert.Equal(t, 1, len(appRouter.conditionRouters))
testNewYML := `
testNewYML := `scope: application
key: mock-app
enabled: true
force: true
runtime: false
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment