diff --git a/.gitignore b/.gitignore index 0b3673bf316a409b5caadf09f3fa58d42ad8f968..f189060bb351db399efe009a676ac3984b634215 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,4 @@ vendor/ logs/ .vscode/ +coverage.txt diff --git a/cluster/cluster_impl/failback_cluster.go b/cluster/cluster_impl/failback_cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..de22c78e947d0b8124add721ab7ff42efebcdbe4 --- /dev/null +++ b/cluster/cluster_impl/failback_cluster.go @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type failbackCluster struct{} + +const failback = "failback" + +func init() { + extension.SetCluster(failback, NewFailbackCluster) +} + +func NewFailbackCluster() cluster.Cluster { + return &failbackCluster{} +} + +func (cluster *failbackCluster) Join(directory cluster.Directory) protocol.Invoker { + return newFailbackClusterInvoker(directory) +} diff --git a/cluster/cluster_impl/failback_cluster_invoker.go b/cluster/cluster_impl/failback_cluster_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..d7d01dd5c58662940b297cdb9b8a36a78e897d8f --- /dev/null +++ b/cluster/cluster_impl/failback_cluster_invoker.go @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "sync" + "time" +) + +import ( + "github.com/Workiva/go-datastructures/queue" +) + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +/** + * When fails, record failure requests and schedule for retry on a regular interval. + * Especially useful for services of notification. + * + * <a href="http://en.wikipedia.org/wiki/Failback">Failback</a> + */ +type failbackClusterInvoker struct { + baseClusterInvoker + + once sync.Once + ticker *time.Ticker + maxRetries int64 + failbackTasks int64 + taskList *queue.Queue +} + +func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker { + invoker := &failbackClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } + retriesConfig := invoker.GetUrl().GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES) + if retriesConfig <= 0 { + retriesConfig = constant.DEFAULT_FAILBACK_TIMES + } + failbackTasksConfig := invoker.GetUrl().GetParamInt(constant.FAIL_BACK_TASKS_KEY, constant.DEFAULT_FAILBACK_TASKS) + if failbackTasksConfig <= 0 { + failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS + } + invoker.maxRetries = retriesConfig + invoker.failbackTasks = failbackTasksConfig + return invoker +} + +func (invoker *failbackClusterInvoker) process() { + invoker.ticker = time.NewTicker(time.Second * 1) + for range invoker.ticker.C { + // check each timeout task and re-run + for { + value, err := invoker.taskList.Peek() + if err == queue.ErrDisposed { + return + } + if err == queue.ErrEmptyQueue { + break + } + + retryTask := value.(*retryTimerTask) + if time.Since(retryTask.lastT).Seconds() < 5 { + break + } + + // ignore return. the get must success. + _, err = invoker.taskList.Get(1) + if err != nil { + logger.Warnf("get task found err: %v\n", err) + break + } + + go func(retryTask *retryTimerTask) { + invoked := make([]protocol.Invoker, 0) + invoked = append(invoked, retryTask.lastInvoker) + + retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked) + var result protocol.Result + result = retryInvoker.Invoke(retryTask.invocation) + if result.Error() != nil { + retryTask.lastInvoker = retryInvoker + invoker.checkRetry(retryTask, result.Error()) + } + }(retryTask) + + } + } +} + +func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err error) { + logger.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.\n", + retryTask.invocation.MethodName(), invoker.GetUrl().Service(), err.Error()) + retryTask.retries++ + retryTask.lastT = time.Now() + if retryTask.retries > invoker.maxRetries { + logger.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v.\n", + retryTask.retries, retryTask.invocation) + } else { + invoker.taskList.Put(retryTask) + } +} + +func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + invokers := invoker.directory.List(invocation) + err := invoker.checkInvokers(invokers, invocation) + if err != nil { + logger.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n", + invocation.MethodName(), invoker.GetUrl().Service(), err) + return &protocol.RPCResult{} + } + url := invokers[0].GetUrl() + + methodName := invocation.MethodName() + //Get the service loadbalance config + lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE) + + //Get the service method loadbalance config if have + if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" { + lb = v + } + loadbalance := extension.GetLoadbalance(lb) + + invoked := make([]protocol.Invoker, 0) + var result protocol.Result + + ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked) + invoked = append(invoked, ivk) + //DO INVOKE + result = ivk.Invoke(invocation) + if result.Error() != nil { + invoker.once.Do(func() { + invoker.taskList = queue.New(invoker.failbackTasks) + go invoker.process() + }) + + taskLen := invoker.taskList.Len() + if taskLen >= invoker.failbackTasks { + logger.Warnf("tasklist is too full > %d.\n", taskLen) + return &protocol.RPCResult{} + } + + timerTask := newRetryTimerTask(loadbalance, invocation, invokers, ivk) + invoker.taskList.Put(timerTask) + + logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n", + methodName, url.Service(), result.Error().Error()) + // ignore + return &protocol.RPCResult{} + } + + return result +} + +func (invoker *failbackClusterInvoker) Destroy() { + invoker.baseClusterInvoker.Destroy() + + // stop ticker + invoker.ticker.Stop() + + _ = invoker.taskList.Dispose() +} + +type retryTimerTask struct { + loadbalance cluster.LoadBalance + invocation protocol.Invocation + invokers []protocol.Invoker + lastInvoker protocol.Invoker + retries int64 + lastT time.Time +} + +func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, + lastInvoker protocol.Invoker) *retryTimerTask { + return &retryTimerTask{ + loadbalance: loadbalance, + invocation: invocation, + invokers: invokers, + lastInvoker: lastInvoker, + lastT: time.Now(), + } +} diff --git a/cluster/cluster_impl/failback_cluster_test.go b/cluster/cluster_impl/failback_cluster_test.go new file mode 100644 index 0000000000000000000000000000000000000000..2cdd14e5636a8d7db09f9c47653e5de745193863 --- /dev/null +++ b/cluster/cluster_impl/failback_cluster_test.go @@ -0,0 +1,242 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "context" + "sync" + "testing" + "time" +) + +import ( + "github.com/golang/mock/gomock" + perrors "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/cluster/loadbalance" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/mock" +) + +var ( + failbackUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") +) + +// registerFailback register failbackCluster to cluster extension. +func registerFailback(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { + extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + failbackCluster := NewFailbackCluster() + + invokers := []protocol.Invoker{} + invokers = append(invokers, invoker) + + invoker.EXPECT().GetUrl().Return(failbackUrl) + + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := failbackCluster.Join(staticDir) + return clusterInvoker +} + +// success firstly, failback should return origin invoke result. +func Test_FailbackSuceess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + + invoker.EXPECT().GetUrl().Return(failbackUrl).Times(1) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Equal(t, mockResult, result) +} + +// failed firstly, success later after one retry. +func Test_FailbackRetryOneSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + + invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() + + // failed at first + mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")} + invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult) + + // success second + var wg sync.WaitGroup + wg.Add(1) + now := time.Now() + mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { + delta := time.Since(now).Nanoseconds() / int64(time.Second) + assert.True(t, delta >= 5) + wg.Done() + return mockSuccResult + }) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) + assert.Equal(t, 0, len(result.Attachments())) + + // ensure the retry task has been executed + assert.Equal(t, int64(1), clusterInvoker.taskList.Len()) + // wait until the retry task is executed, the taskList will be empty. + wg.Wait() + assert.Equal(t, int64(0), clusterInvoker.taskList.Len()) + + invoker.EXPECT().Destroy().Return() + clusterInvoker.Destroy() + + assert.Equal(t, int64(0), clusterInvoker.taskList.Len()) +} + +// failed firstly, and failed again after ech retry time. +func Test_FailbackRetryFailed(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + + invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() + + mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")} + invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult) + + // + var wg sync.WaitGroup + retries := 2 + wg.Add(retries) + now := time.Now() + + // add retry call that eventually failed. + for i := 0; i < retries; i++ { + j := i + 1 + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { + delta := time.Since(now).Nanoseconds() / int64(time.Second) + assert.True(t, delta >= int64(5*j)) + wg.Done() + return mockFailedResult + }) + } + + // first call should failed. + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) + assert.Equal(t, 0, len(result.Attachments())) + + wg.Wait() + time.Sleep(time.Second) + assert.Equal(t, int64(1), clusterInvoker.taskList.Len()) + + invoker.EXPECT().Destroy().Return() + clusterInvoker.Destroy() + // after destory, the tasklist will be empty + assert.Equal(t, int64(0), clusterInvoker.taskList.Len()) +} + +// add 10 tasks but all failed firstly, and failed again with one retry. +func Test_FailbackRetryFailed10Times(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + clusterInvoker.maxRetries = 10 + + invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() + + // 10 task should failed firstly. + mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")} + invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).Times(10) + + // 10 task should retry and failed. + var wg sync.WaitGroup + wg.Add(10) + now := time.Now() + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { + delta := time.Since(now).Nanoseconds() / int64(time.Second) + assert.True(t, delta >= 5) + wg.Done() + return mockFailedResult + }).Times(10) + + for i := 0; i < 10; i++ { + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) + assert.Equal(t, 0, len(result.Attachments())) + } + + wg.Wait() + time.Sleep(time.Second) // in order to ensure checkRetry have done + assert.Equal(t, int64(10), clusterInvoker.taskList.Len()) + + invoker.EXPECT().Destroy().Return() + clusterInvoker.Destroy() + + assert.Equal(t, int64(0), clusterInvoker.taskList.Len()) +} + +func Test_FailbackOutOfLimit(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + clusterInvoker.failbackTasks = 1 + + invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() + + mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")} + invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).Times(11) + + // reached limit + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) + assert.Equal(t, 0, len(result.Attachments())) + + // all will be out of limit + for i := 0; i < 10; i++ { + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) + assert.Equal(t, 0, len(result.Attachments())) + + assert.Equal(t, int64(1), clusterInvoker.taskList.Len()) + } + + invoker.EXPECT().Destroy().Return() + clusterInvoker.Destroy() +} diff --git a/cluster/cluster_impl/failsafe_cluster.go b/cluster/cluster_impl/failsafe_cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..3ff97d25eae80980a90a03e71865bb8f9a63defe --- /dev/null +++ b/cluster/cluster_impl/failsafe_cluster.go @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type failsafeCluster struct{} + +const failsafe = "failsafe" + +func init() { + extension.SetCluster(failsafe, NewFailsafeCluster) +} + +func NewFailsafeCluster() cluster.Cluster { + return &failsafeCluster{} +} + +func (cluster *failsafeCluster) Join(directory cluster.Directory) protocol.Invoker { + return newFailsafeClusterInvoker(directory) +} diff --git a/cluster/cluster_impl/failsafe_cluster_invoker.go b/cluster/cluster_impl/failsafe_cluster_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..0a1da1bc49bfe01ffc20be2f4941ef752d026378 --- /dev/null +++ b/cluster/cluster_impl/failsafe_cluster_invoker.go @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +/** + * When invoke fails, log the error message and ignore this error by returning an empty Result. + * Usually used to write audit logs and other operations + * + * <a href="http://en.wikipedia.org/wiki/Fail-safe">Fail-safe</a> + * + */ +type failsafeClusterInvoker struct { + baseClusterInvoker +} + +func newFailsafeClusterInvoker(directory cluster.Directory) protocol.Invoker { + return &failsafeClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } +} + +func (invoker *failsafeClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + invokers := invoker.directory.List(invocation) + + err := invoker.checkInvokers(invokers, invocation) + if err != nil { + return &protocol.RPCResult{} + } + + url := invokers[0].GetUrl() + methodName := invocation.MethodName() + //Get the service loadbalance config + lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE) + //Get the service method loadbalance config if have + if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" { + lb = v + } + loadbalance := extension.GetLoadbalance(lb) + + invoked := make([]protocol.Invoker, 0) + var result protocol.Result + + ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked) + invoked = append(invoked, ivk) + //DO INVOKE + result = ivk.Invoke(invocation) + if result.Error() != nil { + // ignore + logger.Errorf("Failsafe ignore exception: %v.\n", result.Error().Error()) + return &protocol.RPCResult{} + } + return result +} diff --git a/cluster/cluster_impl/failsafe_cluster_test.go b/cluster/cluster_impl/failsafe_cluster_test.go new file mode 100644 index 0000000000000000000000000000000000000000..9ee9d9fee31b0cb24d877ab3dc0e24fb552f5f11 --- /dev/null +++ b/cluster/cluster_impl/failsafe_cluster_test.go @@ -0,0 +1,95 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "context" + "testing" +) + +import ( + "github.com/golang/mock/gomock" + perrors "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/cluster/loadbalance" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/mock" +) + +var ( + failsafeUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") +) + +// register_failsafe register failsafeCluster to cluster extension. +func register_failsafe(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { + extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + failsafeCluster := NewFailsafeCluster() + + invokers := []protocol.Invoker{} + invokers = append(invokers, invoker) + + invoker.EXPECT().GetUrl().Return(failbackUrl) + + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := failsafeCluster.Join(staticDir) + return clusterInvoker +} + +func Test_FailSafeInvokeSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := register_failsafe(t, invoker) + + invoker.EXPECT().GetUrl().Return(failsafeUrl) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + + assert.NoError(t, result.Error()) + res := result.Result().(rest) + assert.True(t, res.success) +} + +func Test_FailSafeInvokeFail(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := register_failsafe(t, invoker) + + invoker.EXPECT().GetUrl().Return(failsafeUrl) + + mockResult := &protocol.RPCResult{Err: perrors.New("error")} + + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + + assert.NoError(t, result.Error()) + assert.Nil(t, result.Result()) +} diff --git a/common/constant/default.go b/common/constant/default.go index 05461ca6e7360e08d716d6b78d20ad4411df99e8..0ee0aa24786bc4a0fac6178876154de4ac0784ed 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -26,11 +26,13 @@ const ( ) const ( - DEFAULT_LOADBALANCE = "random" - DEFAULT_RETRIES = 2 - DEFAULT_PROTOCOL = "dubbo" - DEFAULT_REG_TIMEOUT = "10s" - DEFAULT_CLUSTER = "failover" + DEFAULT_LOADBALANCE = "random" + DEFAULT_RETRIES = 2 + DEFAULT_PROTOCOL = "dubbo" + DEFAULT_REG_TIMEOUT = "10s" + DEFAULT_CLUSTER = "failover" + DEFAULT_FAILBACK_TIMES = 3 + DEFAULT_FAILBACK_TASKS = 100 ) const ( diff --git a/common/constant/key.go b/common/constant/key.go index 82df44c3e10b6f19d2fba2c86fb7b5086904ab41..104380ae4dd99a55cf64fdeec76d9defc4f634b3 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -45,6 +45,7 @@ const ( WARMUP_KEY = "warmup" RETRIES_KEY = "retries" BEAN_NAME = "bean.name" + FAIL_BACK_TASKS_KEY = "failbacktasks" ) const ( diff --git a/go.mod b/go.mod index 28a3fc7c4202212a7664193b710044cefb406955..fb7e313c718ec5d755c4d2160b3d07f2f3d8474f 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,7 @@ module github.com/apache/dubbo-go require ( + github.com/Workiva/go-datastructures v1.0.50 github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 github.com/davecgh/go-spew v1.1.1 // indirect github.com/dubbogo/getty v1.2.0 diff --git a/go.sum b/go.sum index e2947876e1ec1d8b45c160bb9fff356798898e21..bce2b9a98a1be18eed0d7e8a81cb8cea6f950c1d 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/Workiva/go-datastructures v1.0.50 h1:slDmfW6KCHcC7U+LP3DDBbm4fqTwZGn1beOFPfGaLvo= +github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA= github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 h1:7zJlM+8bpCAUhv03TZnXkT4MLlLWng1s7An8CLuN73E= github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8/go.mod h1:LWnndnrFXZmJLAzoyNAPNHSIJ1KOHVkTSsHgC3YYWlo= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= @@ -41,6 +43,7 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190425150028-36563e24a262 h1:qsl9y/CJx34tuA7QCPNp86JNJe4spst6Ff8MjvPUdPg= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=