Skip to content
Snippets Groups Projects
Commit 4769e666 authored by fangyincheng's avatar fangyincheng
Browse files

Mrg:merge upstream

parents 1d51d40b 9abe968e
No related branches found
No related tags found
No related merge requests found
Showing
with 1140 additions and 50 deletions
......@@ -17,9 +17,10 @@ coverage.txt
target/
classes
# Gopkg.lock
# go mod, go test
vendor/
coverage.txt
logs/
.vscode/
coverage.txt
......@@ -10,6 +10,10 @@ install: true
script:
- go fmt ./... && [[ -z `git status -s` ]]
- mkdir -p remoting/zookeeper/zookeeper-4unittest/contrib/fatjar config_center/zookeeper/zookeeper-4unittest/contrib/fatjar registry/zookeeper/zookeeper-4unittest/contrib/fatjar
- wget -P "remoting/zookeeper/zookeeper-4unittest/contrib/fatjar" https://github.com/dubbogo/resources/raw/master/zookeeper-4unitest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar
- cp remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar config_center/zookeeper/zookeeper-4unittest/contrib/fatjar/
- cp remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar registry/zookeeper/zookeeper-4unittest/contrib/fatjar/
- go mod vendor && go test ./... -coverprofile=coverage.txt -covermode=atomic
after_success:
......
......@@ -176,8 +176,6 @@
END OF TERMS AND CONDITIONS
Copyright (c) 2016 ~ 2018 Alex Stocks.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
......
NOTICE 0 → 100644
Apache Dubbo Go
Copyright 2018-2019 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
......@@ -6,6 +6,10 @@
---
Apache Dubbo Go Implementation.
![Apache Dubbo-go](./dubbogo.png "Apache Dubbo-go")
Apache/Dubbo-go image, licensed under [Creative Commons 3.0 Attributions license](https://creativecommons.org/licenses/by/3.0/).
## License
Apache License, Version 2.0
......@@ -29,18 +33,18 @@ Finished List:
- Role: Consumer, Provider
- Transport: HTTP, TCP
- Codec: JsonRPC v2, Hessian v2
- Registry: ZooKeeper
- Registry: ZooKeeper/[etcd v3](https://github.com/apache/dubbo-go/pull/148)/[nacos](https://github.com/apache/dubbo-go/pull/151)/[consul](https://github.com/apache/dubbo-go/pull/121)
- Configure Center: Zookeeper
- Cluster Strategy: Failover
- Load Balance: Random, RoundRobin, LeastActive
- Filter: Echo Health Check
- Cluster Strategy: Failover/[Failfast](https://github.com/apache/dubbo-go/pull/140)/[Failsafe/Failback](https://github.com/apache/dubbo-go/pull/136)/[Available](https://github.com/apache/dubbo-go/pull/155)/[Broadcast](https://github.com/apache/dubbo-go/pull/158)/[Forking](https://github.com/apache/dubbo-go/pull/161)
- Load Balance: Random/[RoundRobin](https://github.com/apache/dubbo-go/pull/66)/[LeastActive](https://github.com/apache/dubbo-go/pull/65)
- Filter: Echo Health Check/[Circuit break and service downgrade](https://github.com/apache/dubbo-go/pull/133)
- Other feature: [generic invoke](https://github.com/apache/dubbo-go/pull/122)/start check/connecting certain provider/multi-protocols/multi-registries/multi-versions/service group
Working List:
- Cluster Strategy: Failfast/Failsafe/Failback/Forking
- Load Balance: ConsistentHash
- Filter: TokenFilter/AccessLogFilter/CountFilter/ActiveLimitFilter/ExecuteLimitFilter/GenericFilter/TpsLimitFilter
- Registry: etcd/k8s/consul/nacos
- Filter: TokenFilter/AccessLogFilter/CountFilter/ExecuteLimitFilter/TpsLimitFilter
- Registry: k8s
- Configure Center: apollo
- Dynamic Configuration Center & Metadata Center (dubbo v2.7.x)
- Metrics: Promethus(dubbo v2.7.x)
......@@ -55,7 +59,7 @@ You can know more about dubbo-go by its [roadmap](https://github.com/apache/dubb
## Document
Move [here](https://dubbogo.github.io/dubbo-go-website/)
TODO
## Quick Start
......
......@@ -5,6 +5,9 @@
---
Apache Dubbo Go 语言实现
![Apache Dubbo-go](./dubbogo.png "Apache Dubbo-go")
Apache/Dubbo-go image, licensed under [Creative Commons 3.0 Attributions license](https://creativecommons.org/licenses/by/3.0/).
## 证书 ##
......@@ -26,29 +29,29 @@ Apache License, Version 2.0
实现列表:
- Role: Consumer, Provider
- Transport: HTTP, TCP
- Codec: JsonRPC v2, Hessian v2
- Registry: ZooKeeper
- Routing: Rule(dubbo v2.6.x)
- Configure Center: Zookeeper
- Cluster Strategy: Failover
- Load Balance: Random, RoundRobin, LeastActive
- Filter: Echo Health Check
- 角色端: Consumer, Provider
- 传输协议: HTTP, TCP
- 序列化协议: JsonRPC v2, Hessian v2
- 注册中心: ZooKeeper/[etcd v3](https://github.com/apache/dubbo-go/pull/148)/[nacos](https://github.com/apache/dubbo-go/pull/151)/[consul](https://github.com/apache/dubbo-go/pull/121)
- 配置中心: Zookeeper
- 集群策略: Failover/[Failfast](https://github.com/apache/dubbo-go/pull/140)/[Failsafe/Failback](https://github.com/apache/dubbo-go/pull/136)/[Available](https://github.com/apache/dubbo-go/pull/155)/[Broadcast](https://github.com/apache/dubbo-go/pull/158)/[Forking](https://github.com/apache/dubbo-go/pull/161)
- 负载均衡策略: Random/[RoundRobin](https://github.com/apache/dubbo-go/pull/66)/[LeastActive](https://github.com/apache/dubbo-go/pull/65)
- 过滤器: Echo Health Check/[服务熔断&降级](https://github.com/apache/dubbo-go/pull/133)
- 其他功能支持: [泛化调用](https://github.com/apache/dubbo-go/pull/122)/启动时检查/服务直连/多服务协议/多注册中心/多服务版本/服务分组
开发中列表:
- Cluster Strategy: Failfast/Failsafe/Failback/Forking
- Load Balance: ConsistentHash
- Filter: TokenFilter/AccessLogFilter/CountFilter/ActiveLimitFilter/ExecuteLimitFilter/GenericFilter/TpsLimitFilter
- Registry: etcd/k8s/consul/nacos
- Configure Center: apollo
- Dynamic Configuration Center & Metadata Center (dubbo v2.7.x)
- 集群策略: Forking
- 负载均衡策略: ConsistentHash
- 过滤器: TokenFilter/AccessLogFilter/CountFilter/ExecuteLimitFilter/TpsLimitFilter
- 注册中心: k8s
- 配置中心: apollo
- 动态配置中心 & 元数据中心 (dubbo v2.7.x)
- Metrics: Promethus(dubbo v2.7.x)
任务列表:
- Registry: kubernetes
- 注册中心: kubernetes
- Routing: istio
- tracing (dubbo ecosystem)
......@@ -56,7 +59,7 @@ Apache License, Version 2.0
## 文档
移步[这里](https://dubbogo.github.io/dubbo-go-website/)
TODO
## 快速开始 ##
......
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster_impl
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)
type availableCluster struct{}
const available = "available"
func init() {
extension.SetCluster(available, NewAvailableCluster)
}
func NewAvailableCluster() cluster.Cluster {
return &availableCluster{}
}
func (cluser *availableCluster) Join(directory cluster.Directory) protocol.Invoker {
return NewAvailableClusterInvoker(directory)
}
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster_impl
import (
"fmt"
)
import (
"github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/protocol"
)
type availableClusterInvoker struct {
baseClusterInvoker
}
func NewAvailableClusterInvoker(directory cluster.Directory) protocol.Invoker {
return &availableClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
}
func (invoker *availableClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
return &protocol.RPCResult{Err: err}
}
err = invoker.checkWhetherDestroyed()
if err != nil {
return &protocol.RPCResult{Err: err}
}
for _, ivk := range invokers {
if ivk.IsAvailable() {
return ivk.Invoke(invocation)
}
}
return &protocol.RPCResult{Err: errors.New(fmt.Sprintf("no provider available in %v", invokers))}
}
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster_impl
import (
"context"
"strings"
"testing"
)
import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/protocol/mock"
)
var (
availableUrl, _ = common.NewURL(context.Background(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
)
func registerAvailable(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
availableCluster := NewAvailableCluster()
invokers := []protocol.Invoker{}
invokers = append(invokers, invoker)
invoker.EXPECT().GetUrl().Return(availableUrl)
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := availableCluster.Join(staticDir)
return clusterInvoker
}
func TestAvailableClusterInvokerSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerAvailable(t, invoker)
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().IsAvailable().Return(true)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
}
func TestAvailableClusterInvokerNoAvail(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerAvailable(t, invoker)
invoker.EXPECT().IsAvailable().Return(false)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
assert.NotNil(t, result.Error())
assert.True(t, strings.Contains(result.Error().Error(), "no provider available"))
assert.Nil(t, result.Result())
}
......@@ -25,9 +25,10 @@ import (
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/version"
)
type baseClusterInvoker struct {
......@@ -65,7 +66,7 @@ func (invoker *baseClusterInvoker) checkInvokers(invokers []protocol.Invoker, in
ip, _ := utils.GetLocalIP()
return perrors.Errorf("Failed to invoke the method %v. No provider available for the service %v from "+
"registry %v on the consumer %v using the dubbo version %v .Please check if the providers have been started and registered.",
invocation.MethodName(), invoker.directory.GetUrl().SubURL.Key(), invoker.directory.GetUrl().String(), ip, version.Version)
invocation.MethodName(), invoker.directory.GetUrl().SubURL.Key(), invoker.directory.GetUrl().String(), ip, constant.Version)
}
return nil
......@@ -76,13 +77,13 @@ func (invoker *baseClusterInvoker) checkWhetherDestroyed() error {
if invoker.destroyed.Load() {
ip, _ := utils.GetLocalIP()
return perrors.Errorf("Rpc cluster invoker for %v on consumer %v use dubbo version %v is now destroyed! can not invoke any more. ",
invoker.directory.GetUrl().Service(), ip, version.Version)
invoker.directory.GetUrl().Service(), ip, constant.Version)
}
return nil
}
func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
//todo:ticky connect 粘纸连接
//todo:sticky connect
if len(invokers) == 1 {
return invokers[0]
}
......@@ -115,12 +116,24 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p
}
func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) bool {
if len(invoked) > 0 {
for _, i := range invoked {
if i == selectedInvoker {
return true
}
for _, i := range invoked {
if i == selectedInvoker {
return true
}
}
return false
}
func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cluster.LoadBalance {
url := invoker.GetUrl()
methodName := invocation.MethodName()
//Get the service loadbalance config
lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
//Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); len(v) > 0 {
lb = v
}
return extension.GetLoadbalance(lb)
}
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster_impl
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)
type broadcastCluster struct{}
const broadcast = "broadcast"
func init() {
extension.SetCluster(broadcast, NewBroadcastCluster)
}
func NewBroadcastCluster() cluster.Cluster {
return &broadcastCluster{}
}
func (cluster *broadcastCluster) Join(directory cluster.Directory) protocol.Invoker {
return newBroadcastClusterInvoker(directory)
}
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster_impl
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)
type broadcastClusterInvoker struct {
baseClusterInvoker
}
func newBroadcastClusterInvoker(directory cluster.Directory) protocol.Invoker {
return &broadcastClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
}
func (invoker *broadcastClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
return &protocol.RPCResult{Err: err}
}
err = invoker.checkWhetherDestroyed()
if err != nil {
return &protocol.RPCResult{Err: err}
}
var result protocol.Result
for _, ivk := range invokers {
result = ivk.Invoke(invocation)
if result.Error() != nil {
logger.Warnf("broadcast invoker invoke err: %v when use invoker: %v\n", result.Error(), ivk)
err = result.Error()
}
}
if err != nil {
return &protocol.RPCResult{Err: err}
}
return result
}
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster_impl
import (
"context"
"errors"
"testing"
)
import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/protocol/mock"
)
var (
broadcastUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
)
func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
invokers := []protocol.Invoker{}
for i, ivk := range mockInvokers {
invokers = append(invokers, ivk)
if i == 0 {
ivk.EXPECT().GetUrl().Return(broadcastUrl)
}
}
staticDir := directory.NewStaticDirectory(invokers)
broadcastCluster := NewBroadcastCluster()
clusterInvoker := broadcastCluster.Join(staticDir)
return clusterInvoker
}
func Test_BroadcastInvokeSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invokers := make([]*mock.MockInvoker, 0)
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
for i := 0; i < 3; i++ {
invoker := mock.NewMockInvoker(ctrl)
invokers = append(invokers, invoker)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
}
clusterInvoker := registerBroadcast(t, invokers...)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
}
func Test_BroadcastInvokeFailed(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invokers := make([]*mock.MockInvoker, 0)
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
mockFailedResult := &protocol.RPCResult{Err: errors.New("just failed")}
for i := 0; i < 10; i++ {
invoker := mock.NewMockInvoker(ctrl)
invokers = append(invokers, invoker)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
}
{
invoker := mock.NewMockInvoker(ctrl)
invokers = append(invokers, invoker)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult)
}
for i := 0; i < 10; i++ {
invoker := mock.NewMockInvoker(ctrl)
invokers = append(invokers, invoker)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
}
clusterInvoker := registerBroadcast(t, invokers...)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
assert.Equal(t, mockFailedResult.Err, result.Error())
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cluster_impl
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)
type failbackCluster struct{}
const failback = "failback"
func init() {
extension.SetCluster(failback, NewFailbackCluster)
}
func NewFailbackCluster() cluster.Cluster {
return &failbackCluster{}
}
func (cluster *failbackCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailbackClusterInvoker(directory)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cluster_impl
import (
"sync"
"time"
)
import (
"github.com/Workiva/go-datastructures/queue"
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)
/**
* When fails, record failure requests and schedule for retry on a regular interval.
* Especially useful for services of notification.
*
* <a href="http://en.wikipedia.org/wiki/Failback">Failback</a>
*/
type failbackClusterInvoker struct {
baseClusterInvoker
once sync.Once
ticker *time.Ticker
maxRetries int64
failbackTasks int64
taskList *queue.Queue
}
func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
invoker := &failbackClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
retriesConfig := invoker.GetUrl().GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES)
if retriesConfig <= 0 {
retriesConfig = constant.DEFAULT_FAILBACK_TIMES
}
failbackTasksConfig := invoker.GetUrl().GetParamInt(constant.FAIL_BACK_TASKS_KEY, constant.DEFAULT_FAILBACK_TASKS)
if failbackTasksConfig <= 0 {
failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS
}
invoker.maxRetries = retriesConfig
invoker.failbackTasks = failbackTasksConfig
return invoker
}
func (invoker *failbackClusterInvoker) process() {
invoker.ticker = time.NewTicker(time.Second * 1)
for range invoker.ticker.C {
// check each timeout task and re-run
for {
value, err := invoker.taskList.Peek()
if err == queue.ErrDisposed {
return
}
if err == queue.ErrEmptyQueue {
break
}
retryTask := value.(*retryTimerTask)
if time.Since(retryTask.lastT).Seconds() < 5 {
break
}
// ignore return. the get must success.
_, err = invoker.taskList.Get(1)
if err != nil {
logger.Warnf("get task found err: %v\n", err)
break
}
go func(retryTask *retryTimerTask) {
invoked := make([]protocol.Invoker, 0)
invoked = append(invoked, retryTask.lastInvoker)
retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
var result protocol.Result
result = retryInvoker.Invoke(retryTask.invocation)
if result.Error() != nil {
retryTask.lastInvoker = retryInvoker
invoker.checkRetry(retryTask, result.Error())
}
}(retryTask)
}
}
}
func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err error) {
logger.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.\n",
retryTask.invocation.MethodName(), invoker.GetUrl().Service(), err.Error())
retryTask.retries++
retryTask.lastT = time.Now()
if retryTask.retries > invoker.maxRetries {
logger.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v.\n",
retryTask.retries, retryTask.invocation)
} else {
invoker.taskList.Put(retryTask)
}
}
func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
logger.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
invocation.MethodName(), invoker.GetUrl().Service(), err)
return &protocol.RPCResult{}
}
url := invokers[0].GetUrl()
methodName := invocation.MethodName()
//Get the service loadbalance config
lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
//Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" {
lb = v
}
loadbalance := extension.GetLoadbalance(lb)
invoked := make([]protocol.Invoker, 0, len(invokers))
var result protocol.Result
ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)
//DO INVOKE
result = ivk.Invoke(invocation)
if result.Error() != nil {
invoker.once.Do(func() {
invoker.taskList = queue.New(invoker.failbackTasks)
go invoker.process()
})
taskLen := invoker.taskList.Len()
if taskLen >= invoker.failbackTasks {
logger.Warnf("tasklist is too full > %d.\n", taskLen)
return &protocol.RPCResult{}
}
timerTask := newRetryTimerTask(loadbalance, invocation, invokers, ivk)
invoker.taskList.Put(timerTask)
logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
methodName, url.Service(), result.Error().Error())
// ignore
return &protocol.RPCResult{}
}
return result
}
func (invoker *failbackClusterInvoker) Destroy() {
invoker.baseClusterInvoker.Destroy()
// stop ticker
if invoker.ticker != nil {
invoker.ticker.Stop()
}
_ = invoker.taskList.Dispose()
}
type retryTimerTask struct {
loadbalance cluster.LoadBalance
invocation protocol.Invocation
invokers []protocol.Invoker
lastInvoker protocol.Invoker
retries int64
lastT time.Time
}
func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker,
lastInvoker protocol.Invoker) *retryTimerTask {
return &retryTimerTask{
loadbalance: loadbalance,
invocation: invocation,
invokers: invokers,
lastInvoker: lastInvoker,
lastT: time.Now(),
}
}
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster_impl
import (
"context"
"sync"
"testing"
"time"
)
import (
"github.com/golang/mock/gomock"
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/protocol/mock"
)
var (
failbackUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
)
// registerFailback register failbackCluster to cluster extension.
func registerFailback(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failbackCluster := NewFailbackCluster()
invokers := []protocol.Invoker{}
invokers = append(invokers, invoker)
invoker.EXPECT().GetUrl().Return(failbackUrl)
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := failbackCluster.Join(staticDir)
return clusterInvoker
}
// success firstly, failback should return origin invoke result.
func Test_FailbackSuceess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
invoker.EXPECT().GetUrl().Return(failbackUrl).Times(1)
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
}
// failed firstly, success later after one retry.
func Test_FailbackRetryOneSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
// failed at first
mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult)
// success second
var wg sync.WaitGroup
wg.Add(1)
now := time.Now()
mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= 5)
wg.Done()
return mockSuccResult
})
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
assert.Nil(t, result.Error())
assert.Nil(t, result.Result())
assert.Equal(t, 0, len(result.Attachments()))
// ensure the retry task has been executed
assert.Equal(t, int64(1), clusterInvoker.taskList.Len())
// wait until the retry task is executed, the taskList will be empty.
wg.Wait()
assert.Equal(t, int64(0), clusterInvoker.taskList.Len())
invoker.EXPECT().Destroy().Return()
clusterInvoker.Destroy()
assert.Equal(t, int64(0), clusterInvoker.taskList.Len())
}
// failed firstly, and failed again after ech retry time.
func Test_FailbackRetryFailed(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult)
//
var wg sync.WaitGroup
retries := 2
wg.Add(retries)
now := time.Now()
// add retry call that eventually failed.
for i := 0; i < retries; i++ {
j := i + 1
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= int64(5*j))
wg.Done()
return mockFailedResult
})
}
// first call should failed.
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
assert.Nil(t, result.Error())
assert.Nil(t, result.Result())
assert.Equal(t, 0, len(result.Attachments()))
wg.Wait()
time.Sleep(time.Second)
assert.Equal(t, int64(1), clusterInvoker.taskList.Len())
invoker.EXPECT().Destroy().Return()
clusterInvoker.Destroy()
// after destroy, the taskList will be empty
assert.Equal(t, int64(0), clusterInvoker.taskList.Len())
}
// add 10 tasks but all failed firstly, and failed again with one retry.
func Test_FailbackRetryFailed10Times(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
clusterInvoker.maxRetries = 10
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
// 10 task should failed firstly.
mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).Times(10)
// 10 task should retry and failed.
var wg sync.WaitGroup
wg.Add(10)
now := time.Now()
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= 5)
wg.Done()
return mockFailedResult
}).Times(10)
for i := 0; i < 10; i++ {
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
assert.Nil(t, result.Error())
assert.Nil(t, result.Result())
assert.Equal(t, 0, len(result.Attachments()))
}
wg.Wait()
time.Sleep(time.Second) // in order to ensure checkRetry have done
assert.Equal(t, int64(10), clusterInvoker.taskList.Len())
invoker.EXPECT().Destroy().Return()
clusterInvoker.Destroy()
assert.Equal(t, int64(0), clusterInvoker.taskList.Len())
}
func Test_FailbackOutOfLimit(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
clusterInvoker.failbackTasks = 1
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).Times(11)
// reached limit
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
assert.Nil(t, result.Error())
assert.Nil(t, result.Result())
assert.Equal(t, 0, len(result.Attachments()))
// all will be out of limit
for i := 0; i < 10; i++ {
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
assert.Nil(t, result.Error())
assert.Nil(t, result.Result())
assert.Equal(t, 0, len(result.Attachments()))
assert.Equal(t, int64(1), clusterInvoker.taskList.Len())
}
invoker.EXPECT().Destroy().Return()
clusterInvoker.Destroy()
}
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster_impl
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)
type failfastCluster struct{}
const failfast = "failfast"
func init() {
extension.SetCluster(failfast, NewFailFastCluster)
}
func NewFailFastCluster() cluster.Cluster {
return &failfastCluster{}
}
func (cluster *failfastCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailFastClusterInvoker(directory)
}
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster_impl
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/protocol"
)
type failfastClusterInvoker struct {
baseClusterInvoker
}
func newFailFastClusterInvoker(directory cluster.Directory) protocol.Invoker {
return &failfastClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
}
func (invoker *failfastClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
return &protocol.RPCResult{Err: err}
}
loadbalance := getLoadBalance(invokers[0], invocation)
err = invoker.checkWhetherDestroyed()
if err != nil {
return &protocol.RPCResult{Err: err}
}
ivk := invoker.doSelect(loadbalance, invocation, invokers, nil)
return ivk.Invoke(invocation)
}
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster_impl
import (
"context"
"testing"
)
import (
"github.com/golang/mock/gomock"
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/protocol/mock"
)
var (
failfastUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
)
// registerFailfast register failfastCluster to cluster extension.
func registerFailfast(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failfastCluster := NewFailFastCluster()
invokers := []protocol.Invoker{}
invokers = append(invokers, invoker)
invoker.EXPECT().GetUrl().Return(failfastUrl)
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := failfastCluster.Join(staticDir)
return clusterInvoker
}
func Test_FailfastInvokeSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailfast(t, invoker)
invoker.EXPECT().GetUrl().Return(failfastUrl)
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
assert.NoError(t, result.Error())
res := result.Result().(rest)
assert.True(t, res.success)
assert.Equal(t, 0, res.tried)
}
func Test_FailfastInvokeFail(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailfast(t, invoker)
invoker.EXPECT().GetUrl().Return(failfastUrl)
mockResult := &protocol.RPCResult{Err: perrors.New("error")}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
assert.NotNil(t, result.Error())
assert.Equal(t, "error", result.Error().Error())
assert.Nil(t, result.Result())
}
......@@ -24,10 +24,8 @@ import (
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/version"
)
type failoverClusterInvoker struct {
......@@ -48,17 +46,11 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr
if err != nil {
return &protocol.RPCResult{Err: err}
}
url := invokers[0].GetUrl()
methodName := invocation.MethodName()
//Get the service loadbalance config
lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
loadbalance := getLoadBalance(invokers[0], invocation)
//Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" {
lb = v
}
loadbalance := extension.GetLoadbalance(lb)
methodName := invocation.MethodName()
url := invokers[0].GetUrl()
//get reties
retries := url.GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_RETRIES)
......@@ -98,6 +90,6 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr
ip, _ := utils.GetLocalIP()
return &protocol.RPCResult{Err: perrors.Errorf("Failed to invoke the method %v in the service %v. Tried %v times of "+
"the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.",
methodName, invoker.GetUrl().Service(), retries, providers, len(providers), len(invokers), invoker.directory.GetUrl(), ip, version.Version, result.Error().Error(),
methodName, invoker.GetUrl().Service(), retries, providers, len(providers), len(invokers), invoker.directory.GetUrl(), ip, constant.Version, result.Error().Error(),
)}
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment