From f77342c5abdbb90dab241b2469263c27c375d234 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8C=A0=E5=B0=98?= <xianbin.wxb@koubei.com> Date: Thu, 27 Jun 2019 16:36:10 +0800 Subject: [PATCH] failback and failsafe develop --- cluster/cluster_impl/failback_cluster.go | 40 ++++ .../cluster_impl/failback_cluster_invoker.go | 206 ++++++++++++++++++ cluster/cluster_impl/failsafe_cluster.go | 40 ++++ .../cluster_impl/failsafe_cluster_invoker.go | 77 +++++++ common/constant/default.go | 12 +- common/constant/key.go | 1 + 6 files changed, 371 insertions(+), 5 deletions(-) create mode 100644 cluster/cluster_impl/failback_cluster.go create mode 100644 cluster/cluster_impl/failback_cluster_invoker.go create mode 100644 cluster/cluster_impl/failsafe_cluster.go create mode 100644 cluster/cluster_impl/failsafe_cluster_invoker.go diff --git a/cluster/cluster_impl/failback_cluster.go b/cluster/cluster_impl/failback_cluster.go new file mode 100644 index 000000000..de22c78e9 --- /dev/null +++ b/cluster/cluster_impl/failback_cluster.go @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package cluster_impl
+
+import (
+	"github.com/apache/dubbo-go/cluster"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/protocol"
+)
+
+// failback is the name under which this cluster strategy is registered.
+const failback = "failback"
+
+// failbackCluster is the stateless factory that produces failback cluster
+// invokers.
+type failbackCluster struct{}
+
+// init registers NewFailbackCluster with the extension registry under the
+// failback name.
+func init() {
+	extension.SetCluster(failback, NewFailbackCluster)
+}
+
+// NewFailbackCluster returns a new failback cluster.
+func NewFailbackCluster() cluster.Cluster {
+	return new(failbackCluster)
+}
+
+// Join wraps directory in a failback cluster invoker and returns it.
+func (c *failbackCluster) Join(directory cluster.Directory) protocol.Invoker {
+	invoker := newFailbackClusterInvoker(directory)
+	return invoker
+}
diff --git a/cluster/cluster_impl/failback_cluster_invoker.go b/cluster/cluster_impl/failback_cluster_invoker.go
new file mode 100644
index 000000000..07792de0f
--- /dev/null
+++ b/cluster/cluster_impl/failback_cluster_invoker.go
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cluster_impl
+
+import (
+	"container/list"
+	perrors "github.com/pkg/errors"
+	"sync"
+	"time"
+)
+
+import (
+	"github.com/apache/dubbo-go/cluster"
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/protocol"
+)
+
+// failbackClusterInvoker is the invoker produced by the failback cluster: a
+// failed call is answered with an empty result and queued so that it can be
+// retried in the background.
+type failbackClusterInvoker struct {
+	baseClusterInvoker
+}
+
+// Retry state. It is package-level, so it is shared by every failback invoker
+// created in the process.
+var (
+	// retries is the retry threshold; newFailbackClusterInvoker overwrites it
+	// on every construction.
+	retries int64
+	// failbackTasks is loaded from the URL but not consulted anywhere in this
+	// file.
+	// NOTE(review): presumably meant as an upper bound for taskList - confirm.
+	failbackTasks int64
+	// ticker paces the background retries; it stays nil until the first
+	// failure is queued.
+	ticker *time.Ticker
+	// once guards the lazy creation of ticker and taskList.
+	once sync.Once
+	// lock protects the contents of taskList and the ticker pointer.
+	lock sync.Mutex
+	// taskList holds the calls waiting to be retried.
+	taskList *Queue
+)
+
+// newFailbackClusterInvoker builds a failback invoker on top of directory and
+// loads the retry threshold and the task limit from the invoker URL, falling
+// back to the defaults when a configured value is not positive.
+func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
+	invoker := &failbackClusterInvoker{
+		baseClusterInvoker: newBaseClusterInvoker(directory),
+	}
+	retriesConfig := invoker.GetUrl().GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES)
+	if retriesConfig <= 0 {
+		retriesConfig = constant.DEFAULT_FAILBACK_TIMES
+	}
+	failbackTasksConfig := invoker.GetUrl().GetParamInt(constant.FAIL_BACK_TASKS_KEY, constant.DEFAULT_FAILBACK_TASKS)
+	if failbackTasksConfig <= 0 {
+		failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS
+	}
+	retries = retriesConfig
+	failbackTasks = failbackTasksConfig
+	return invoker
+}
+
+// Invoke selects one invoker through the configured load balance and calls it.
+// When checkInvokers rejects the invoker list or the call fails, an empty
+// RPCResult is returned to the caller; a failed call is additionally queued
+// for background retry.
+func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
+	invokers := invoker.directory.List(invocation)
+	err := invoker.checkInvokers(invokers, invocation)
+
+	if err != nil {
+		// No retry task is queued on this path.
+		// NOTE(review): perrors.Errorf only builds an error value; the result
+		// is dropped here, so this message is never reported. Confirm whether
+		// a logger call was intended.
+		perrors.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.",
+			invocation.MethodName(), invoker.GetUrl().Service(), err)
+		return &protocol.RPCResult{}
+	}
+	url := invokers[0].GetUrl()
+
+	methodName := invocation.MethodName()
+	// Service-level load balance first ...
+	lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
+
+	// ... overridden by the method-level setting when one is present.
+	if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" {
+		lb = v
+	}
+	loadbalance := extension.GetLoadbalance(lb)
+
+	// Nothing has been invoked yet, so the exclusion list is empty.
+	ivk := invoker.doSelect(loadbalance, invocation, invokers, []protocol.Invoker{})
+	result := ivk.Invoke(invocation)
+
+	if result.Error() != nil {
+		// Queue the call for background retry and hide the failure.
+		addFailed(loadbalance, invocation, invokers, invoker)
+		// NOTE(review): same as above - the error built here is discarded.
+		perrors.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.",
+			methodName, invoker.GetUrl().Service(), result.Error().Error())
+		return &protocol.RPCResult{}
+	}
+
+	return result
+}
+
+// Destroy destroys the underlying directory exactly once and stops the retry
+// ticker if it was ever created.
+func (invoker *failbackClusterInvoker) Destroy() {
+	// The CAS makes sure the directory is destroyed only once.
+	if invoker.destroyed.CAS(false, true) {
+		invoker.directory.Destroy()
+	}
+	// The ticker is created lazily on the first failure; calling Stop on the
+	// nil pointer would panic when no call has failed before Destroy.
+	// NOTE(review): the ticker is shared by all failback invokers, so stopping
+	// it here also halts retries queued by other invokers, and goroutines
+	// waiting on ticker.C are never released - confirm this is acceptable.
+	lock.Lock()
+	defer lock.Unlock()
+	if ticker != nil {
+		ticker.Stop()
+	}
+}
+
+// addFailed queues the failed invocation and starts a goroutine that, on each
+// tick it receives, takes the oldest queued task and retries it. The goroutine
+// ends once a task it handled is finished (the call succeeded or the retry
+// threshold was exceeded).
+func addFailed(balance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker,
+	invoker *failbackClusterInvoker) {
+	initSingleTickerTaskInstance()
+	// A new task has consumed no retries yet. Seeding the counter with the
+	// threshold itself made the first failed retry exceed it immediately.
+	taskList.push(newRetryTimerTask(balance, invocation, invokers, invoker, 0, 5))
+
+	go func() {
+		for range ticker.C {
+			value := taskList.pop()
+			if value == nil {
+				return
+			}
+			// The queue stores *retryTimerTask; asserting the value type
+			// instead of the pointer type panics at run time.
+			retryTask, ok := value.(*retryTimerTask)
+			if !ok {
+				return
+			}
+
+			// The queue is shared and FIFO, so the popped task is not
+			// necessarily the one queued by this call: use the task's own
+			// invoker and invocation rather than the captured ones.
+			owner := retryTask.lastInvoker
+			invoked := []protocol.Invoker{owner}
+			retryInvoker := owner.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers,
+				invoked)
+			result := retryInvoker.Invoke(retryTask.invocation)
+			if result.Error() == nil {
+				return
+			}
+
+			// NOTE(review): the errors built below are discarded, see Invoke.
+			perrors.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.",
+				retryTask.invocation.MethodName(), owner.GetUrl().Service(), result.Error().Error())
+			retryTask.retries++
+			if retryTask.retries > retries {
+				perrors.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v",
+					retries, retryTask.invocation)
+				return
+			}
+			// Queue the task again and keep looping so that it is picked up
+			// on a later tick instead of being stranded in the queue.
+			taskList.push(retryTask)
+		}
+	}()
+}
+
+// initSingleTickerTaskInstance creates the shared ticker and task queue on
+// first use.
+func initSingleTickerTaskInstance() {
+	once.Do(func() {
+		newTickerTask()
+	})
+}
+
+// newTickerTask creates the one-second ticker and an empty task queue. The
+// assignment happens under lock because Destroy reads ticker under the same
+// lock.
+func newTickerTask() {
+	lock.Lock()
+	defer lock.Unlock()
+	ticker = time.NewTicker(time.Second * 1)
+	taskList = newQueue()
+}
+
+// retryTimerTask carries everything needed to repeat a failed invocation.
+type retryTimerTask struct {
+	loadbalance cluster.LoadBalance
+	invocation  protocol.Invocation
+	invokers    []protocol.Invoker
+	// lastInvoker is the cluster invoker that queued the task.
+	lastInvoker *failbackClusterInvoker
+	// retries counts the retries that have already failed.
+	retries int64
+	// tick is stored but not read anywhere in this file.
+	tick int64
+}
+
+// newRetryTimerTask returns a task populated with the given values.
+func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker,
+	lastInvoker *failbackClusterInvoker, retries int64, tick int64) *retryTimerTask {
+	return &retryTimerTask{
+		loadbalance: loadbalance,
+		invocation:  invocation,
+		invokers:    invokers,
+		lastInvoker: lastInvoker,
+		retries:     retries,
+		tick:        tick,
+	}
+}
+
+// Queue is a FIFO queue backed by a doubly linked list. Its methods take the
+// package-level lock.
+type Queue struct {
+	data *list.List
+}
+
+// newQueue returns an empty queue.
+func newQueue() *Queue {
+	q := new(Queue)
+	q.data = list.New()
+	return q
+}
+
+// push adds v as the newest element.
+func (q *Queue) push(v interface{}) {
+	lock.Lock()
+	defer lock.Unlock()
+	q.data.PushFront(v)
+}
+
+// pop removes and returns the oldest element, or nil when the queue is empty.
+func (q *Queue) pop() interface{} {
+	lock.Lock()
+	defer lock.Unlock()
+	iter := q.data.Back()
+	if iter == nil {
+		// Back returns nil on an empty list; reading iter.Value would panic.
+		return nil
+	}
+	return q.data.Remove(iter)
+}
diff --git a/cluster/cluster_impl/failsafe_cluster.go b/cluster/cluster_impl/failsafe_cluster.go
new file mode 100644
index 000000000..3ff97d25e
--- /dev/null
+++ b/cluster/cluster_impl/failsafe_cluster.go
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cluster_impl
+
+import (
+	"github.com/apache/dubbo-go/cluster"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/protocol"
+)
+
+// failsafe is the name under which this cluster strategy is registered.
+const failsafe = "failsafe"
+
+// failsafeCluster is the stateless factory that produces failsafe cluster
+// invokers.
+type failsafeCluster struct{}
+
+// init registers NewFailsafeCluster with the extension registry under the
+// failsafe name.
+func init() {
+	extension.SetCluster(failsafe, NewFailsafeCluster)
+}
+
+// NewFailsafeCluster returns a new failsafe cluster.
+func NewFailsafeCluster() cluster.Cluster {
+	return new(failsafeCluster)
+}
+
+// Join wraps directory in a failsafe cluster invoker and returns it.
+func (c *failsafeCluster) Join(directory cluster.Directory) protocol.Invoker {
+	invoker := newFailsafeClusterInvoker(directory)
+	return invoker
+}
diff --git a/cluster/cluster_impl/failsafe_cluster_invoker.go b/cluster/cluster_impl/failsafe_cluster_invoker.go
new file mode 100644
index 000000000..a006e918a
--- /dev/null
+++ b/cluster/cluster_impl/failsafe_cluster_invoker.go
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cluster_impl
+
+import (
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/cluster"
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/protocol"
+)
+
+// failsafeClusterInvoker is the invoker produced by the failsafe cluster: a
+// failed call is answered with an empty result instead of an error.
+type failsafeClusterInvoker struct {
+	baseClusterInvoker
+}
+
+// newFailsafeClusterInvoker builds a failsafe invoker on top of directory.
+func newFailsafeClusterInvoker(directory cluster.Directory) protocol.Invoker {
+	base := newBaseClusterInvoker(directory)
+	return &failsafeClusterInvoker{baseClusterInvoker: base}
+}
+
+// Invoke selects one invoker through the configured load balance and calls it.
+// When checkInvokers rejects the invoker list or the call reports an error, an
+// empty RPCResult is returned in place of the failure.
+func (invoker *failsafeClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
+	candidates := invoker.directory.List(invocation)
+	if checkErr := invoker.checkInvokers(candidates, invocation); checkErr != nil {
+		return &protocol.RPCResult{}
+	}
+
+	serviceURL := candidates[0].GetUrl()
+	method := invocation.MethodName()
+
+	// Service-level load balance, overridden by the method-level setting when
+	// one is present.
+	lbName := serviceURL.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
+	if methodLB := serviceURL.GetMethodParam(method, constant.LOADBALANCE_KEY, ""); methodLB != "" {
+		lbName = methodLB
+	}
+
+	// Nothing has been invoked yet, so the exclusion list is empty.
+	selected := invoker.doSelect(extension.GetLoadbalance(lbName), invocation, candidates, []protocol.Invoker{})
+	result := selected.Invoke(invocation)
+	if callErr := result.Error(); callErr != nil {
+		// NOTE(review): perrors.Errorf only builds an error value; the result
+		// is dropped here, so this message is never reported. Confirm whether
+		// a logger call was intended.
+		perrors.Errorf("Failsafe ignore exception: %v.", callErr.Error())
+		return &protocol.RPCResult{}
+	}
+	return result
+}
diff --git
a/common/constant/default.go b/common/constant/default.go index 05461ca6e..0ee0aa247 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -26,11 +26,13 @@ const ( ) const ( - DEFAULT_LOADBALANCE = "random" - DEFAULT_RETRIES = 2 - DEFAULT_PROTOCOL = "dubbo" - DEFAULT_REG_TIMEOUT = "10s" - DEFAULT_CLUSTER = "failover" + DEFAULT_LOADBALANCE = "random" + DEFAULT_RETRIES = 2 + DEFAULT_PROTOCOL = "dubbo" + DEFAULT_REG_TIMEOUT = "10s" + DEFAULT_CLUSTER = "failover" + DEFAULT_FAILBACK_TIMES = 3 + DEFAULT_FAILBACK_TASKS = 100 ) const ( diff --git a/common/constant/key.go b/common/constant/key.go index 82df44c3e..104380ae4 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -45,6 +45,7 @@ const ( WARMUP_KEY = "warmup" RETRIES_KEY = "retries" BEAN_NAME = "bean.name" + FAIL_BACK_TASKS_KEY = "failbacktasks" ) const ( -- GitLab