diff --git a/cluster/cluster_impl/failback_cluster.go b/cluster/cluster_impl/failback_cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..de22c78e947d0b8124add721ab7ff42efebcdbe4 --- /dev/null +++ b/cluster/cluster_impl/failback_cluster.go @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type failbackCluster struct{} + +const failback = "failback" + +func init() { + extension.SetCluster(failback, NewFailbackCluster) +} + +func NewFailbackCluster() cluster.Cluster { + return &failbackCluster{} +} + +func (cluster *failbackCluster) Join(directory cluster.Directory) protocol.Invoker { + return newFailbackClusterInvoker(directory) +} diff --git a/cluster/cluster_impl/failback_cluster_invoker.go b/cluster/cluster_impl/failback_cluster_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..07792de0fd75333ac689e14e4e208b7386392309 --- /dev/null +++ b/cluster/cluster_impl/failback_cluster_invoker.go @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "container/list" + perrors "github.com/pkg/errors" + "sync" + "time" +) + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type failbackClusterInvoker struct { + baseClusterInvoker +} + +var ( + retries int64 + failbackTasks int64 + ticker *time.Ticker + once sync.Once + lock sync.Mutex + taskList *Queue +) + +func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker { + invoker := &failbackClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } + retriesConfig := invoker.GetUrl().GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES) + if retriesConfig <= 0 { + retriesConfig = constant.DEFAULT_FAILBACK_TIMES + } + failbackTasksConfig := invoker.GetUrl().GetParamInt(constant.FAIL_BACK_TASKS_KEY, constant.DEFAULT_FAILBACK_TASKS) + if failbackTasksConfig <= 0 { + failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS + } + retries = retriesConfig + failbackTasks = failbackTasksConfig + return invoker +} + +func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + + invokers := invoker.directory.List(invocation) + err := invoker.checkInvokers(invokers, invocation) + + if err != nil { + // add retry ticker task + perrors.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.", + invocation.MethodName(), invoker.GetUrl().Service(), err) + return &protocol.RPCResult{} + } + url := invokers[0].GetUrl() + + methodName := invocation.MethodName() + //Get the service loadbalance config + lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE) + + //Get the service method loadbalance config if have + if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" { + lb = v + } + loadbalance := extension.GetLoadbalance(lb) + + invoked := []protocol.Invoker{} + var result protocol.Result + + ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked) + invoked = append(invoked, ivk) + //DO INVOKE + result = ivk.Invoke(invocation) + + if result.Error() != nil { + // add retry ticker task + addFailed(loadbalance, invocation, invokers, invoker) + perrors.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.", + methodName, invoker.GetUrl().Service(), result.Error().Error()) + // ignore + return &protocol.RPCResult{} + } + + return result +} + +func (invoker *failbackClusterInvoker) Destroy() { + //this is must atom operation + if invoker.destroyed.CAS(false, true) { + invoker.directory.Destroy() + } + // stop ticker + ticker.Stop() +} + +func addFailed(balance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, + invoker *failbackClusterInvoker) { + initSingleTickerTaskInstance() + // init one retryTimerTask + timerTask := newRetryTimerTask(balance, invocation, invokers, invoker, retries, 5) + taskList.push(timerTask) + // process ticker task + go func() { + <-ticker.C + value := taskList.pop() + if value == nil { + return + } + + retryTask := value.(retryTimerTask) + invoked := []protocol.Invoker{} + invoked = append(invoked, retryTask.lastInvoker) + retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, + invoked) + var result protocol.Result + result = retryInvoker.Invoke(retryTask.invocation) + if result.Error() != nil { + perrors.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.", + invocation.MethodName(), invoker.GetUrl().Service(), result.Error().Error()) + retryTask.retries++ + if retryTask.retries > retries { + perrors.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v", + retries, invocation) + } else { + taskList.push(retryTask) + } + } + }() +} + +func initSingleTickerTaskInstance() { + once.Do(func() { + newTickerTask() + }) +} + +func newTickerTask() { + ticker = time.NewTicker(time.Second * 1) + taskList = newQueue() +} + +type retryTimerTask struct { + loadbalance cluster.LoadBalance + invocation protocol.Invocation + invokers []protocol.Invoker + lastInvoker *failbackClusterInvoker + retries int64 + tick int64 +} + +func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, + lastInvoker *failbackClusterInvoker, retries int64, tick int64) *retryTimerTask { + return &retryTimerTask{ + loadbalance: loadbalance, + invocation: invocation, + invokers: invokers, + lastInvoker: lastInvoker, + retries: retries, + tick: tick, + } +} + +type Queue struct { + data *list.List +} + +func newQueue() *Queue { + q := new(Queue) + q.data = list.New() + return q +} + +func (q *Queue) push(v interface{}) { + defer lock.Unlock() + lock.Lock() + q.data.PushFront(v) +} + +func (q *Queue) pop() interface{} { + defer lock.Unlock() + lock.Lock() + iter := q.data.Back() + v := iter.Value + q.data.Remove(iter) + return v +} diff --git a/cluster/cluster_impl/failsafe_cluster.go b/cluster/cluster_impl/failsafe_cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..3ff97d25eae80980a90a03e71865bb8f9a63defe --- /dev/null +++ b/cluster/cluster_impl/failsafe_cluster.go @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type failsafeCluster struct{} + +const failsafe = "failsafe" + +func init() { + extension.SetCluster(failsafe, NewFailsafeCluster) +} + +func NewFailsafeCluster() cluster.Cluster { + return &failsafeCluster{} +} + +func (cluster *failsafeCluster) Join(directory cluster.Directory) protocol.Invoker { + return newFailsafeClusterInvoker(directory) +} diff --git a/cluster/cluster_impl/failsafe_cluster_invoker.go b/cluster/cluster_impl/failsafe_cluster_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..a006e918a2740381858a6f998529f40852ea443f --- /dev/null +++ b/cluster/cluster_impl/failsafe_cluster_invoker.go @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type failsafeClusterInvoker struct { + baseClusterInvoker +} + +func newFailsafeClusterInvoker(directory cluster.Directory) protocol.Invoker { + return &failsafeClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } +} + +func (invoker *failsafeClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + + invokers := invoker.directory.List(invocation) + err := invoker.checkInvokers(invokers, invocation) + + if err != nil { + return &protocol.RPCResult{} + } + + url := invokers[0].GetUrl() + + methodName := invocation.MethodName() + //Get the service loadbalance config + lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE) + + //Get the service method loadbalance config if have + if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" { + lb = v + } + loadbalance := extension.GetLoadbalance(lb) + + invoked := []protocol.Invoker{} + + var result protocol.Result + + ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked) + invoked = append(invoked, ivk) + //DO INVOKE + result = ivk.Invoke(invocation) + if result.Error() != nil { + // ignore + perrors.Errorf("Failsafe ignore exception: %v.", result.Error().Error()) + return &protocol.RPCResult{} + } + return result + +} diff --git a/common/constant/default.go b/common/constant/default.go index 05461ca6e7360e08d716d6b78d20ad4411df99e8..0ee0aa24786bc4a0fac6178876154de4ac0784ed 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -26,11 +26,13 @@ const ( ) const ( - DEFAULT_LOADBALANCE = "random" - DEFAULT_RETRIES = 2 - DEFAULT_PROTOCOL = "dubbo" - DEFAULT_REG_TIMEOUT = "10s" - DEFAULT_CLUSTER = "failover" + DEFAULT_LOADBALANCE = "random" + DEFAULT_RETRIES = 2 + DEFAULT_PROTOCOL = "dubbo" + DEFAULT_REG_TIMEOUT = "10s" + DEFAULT_CLUSTER = "failover" + DEFAULT_FAILBACK_TIMES = 3 + DEFAULT_FAILBACK_TASKS = 100 ) const ( diff --git a/common/constant/key.go b/common/constant/key.go index 82df44c3e10b6f19d2fba2c86fb7b5086904ab41..104380ae4dd99a55cf64fdeec76d9defc4f634b3 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -45,6 +45,7 @@ const ( WARMUP_KEY = "warmup" RETRIES_KEY = "retries" BEAN_NAME = "bean.name" + FAIL_BACK_TASKS_KEY = "failbacktasks" ) const (