diff --git a/cluster/cluster_impl/failback_cluster_invoker.go b/cluster/cluster_impl/failback_cluster_invoker.go index 47e626a0ea5e5eb2b8165103f90252f21466674e..46b0ff634e56c45223a5aeb5566b9b1401518960 100644 --- a/cluster/cluster_impl/failback_cluster_invoker.go +++ b/cluster/cluster_impl/failback_cluster_invoker.go @@ -74,9 +74,9 @@ func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker { func (invoker *failbackClusterInvoker) process(ctx context.Context) { invoker.ticker = time.NewTicker(time.Second * 1) - for { - select { - case <-invoker.ticker.C: + for range invoker.ticker.C { + // check each timeout task and re-run + for { value, err := invoker.taskList.Peek() if err == queue.ErrDisposed { return @@ -91,25 +91,26 @@ func (invoker *failbackClusterInvoker) process(ctx context.Context) { } // ignore return. the get must success. - if _, err = invoker.taskList.Get(1); err != nil { + _, err = invoker.taskList.Get(1) + if err != nil { logger.Warnf("get task found err: %v\n", err) break } - go invoker.tryTimerTaskProc(ctx, retryTask) - } - } -} -func (invoker *failbackClusterInvoker) tryTimerTaskProc(ctx context.Context, retryTask *retryTimerTask) { - invoked := make([]protocol.Invoker, 0) - invoked = append(invoked, retryTask.lastInvoker) + go func(retryTask *retryTimerTask) { + invoked := make([]protocol.Invoker, 0) + invoked = append(invoked, retryTask.lastInvoker) - retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked) - var result protocol.Result - result = retryInvoker.Invoke(ctx, retryTask.invocation) - if result.Error() != nil { - retryTask.lastInvoker = retryInvoker - invoker.checkRetry(retryTask, result.Error()) + retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked) + var result protocol.Result + result = retryInvoker.Invoke(ctx, retryTask.invocation) + if result.Error() != nil { + retryTask.lastInvoker = retryInvoker + invoker.checkRetry(retryTask, result.Error()) + } + }(retryTask) + + } } }