Skip to content
Snippets Groups Projects
failback_cluster_invoker.go 6.47 KiB
Newer Older
匠尘's avatar
匠尘 committed
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cluster_impl

import (
	"context"
	"strconv"
	"sync"
	"time"
)

import (
	"github.com/Workiva/go-datastructures/queue"
)

import (
	"github.com/apache/dubbo-go/cluster"
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/common/extension"
	"github.com/apache/dubbo-go/common/logger"
	"github.com/apache/dubbo-go/protocol"
)

/**
 * When fails, record failure requests and schedule for retry on a regular interval.
 * Especially useful for services of notification.
 *
 * <a href="http://en.wikipedia.org/wiki/Failback">Failback</a>
 */
匠尘's avatar
匠尘 committed
type failbackClusterInvoker struct {
	baseClusterInvoker

	once          sync.Once
	ticker        *time.Ticker
	maxRetries    int64
	failbackTasks int64
	taskList      *queue.Queue
}
匠尘's avatar
匠尘 committed

func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
	invoker := &failbackClusterInvoker{
		baseClusterInvoker: newBaseClusterInvoker(directory),
	}
LaurenceLiZhixin's avatar
LaurenceLiZhixin committed
	retriesConfig := invoker.GetURL().GetParam(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES)
	retries, err := strconv.Atoi(retriesConfig)
	if err != nil || retries < 0 {
vito.he's avatar
vito.he committed
		logger.Error("Your retries config is invalid,pls do a check. And will use the default fail back times configuration instead.")
		retries = constant.DEFAULT_FAILBACK_TIMES_INT
匠尘's avatar
匠尘 committed
	}
LaurenceLiZhixin's avatar
LaurenceLiZhixin committed
	failbackTasksConfig := invoker.GetURL().GetParamInt(constant.FAIL_BACK_TASKS_KEY, constant.DEFAULT_FAILBACK_TASKS)
匠尘's avatar
匠尘 committed
	if failbackTasksConfig <= 0 {
		failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS
	}
	invoker.maxRetries = int64(retries)
	invoker.failbackTasks = failbackTasksConfig
匠尘's avatar
匠尘 committed
	return invoker
}

haohongfan's avatar
haohongfan committed
func (invoker *failbackClusterInvoker) tryTimerTaskProc(ctx context.Context, retryTask *retryTimerTask) {
	invoked := make([]protocol.Invoker, 0)
	invoked = append(invoked, retryTask.lastInvoker)

	retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
AlexStocks's avatar
AlexStocks committed
	result := retryInvoker.Invoke(ctx, retryTask.invocation)
haohongfan's avatar
haohongfan committed
	if result.Error() != nil {
		retryTask.lastInvoker = retryInvoker
		invoker.checkRetry(retryTask, result.Error())
	}
}

flycash's avatar
flycash committed
// process is the background retry loop. It creates a one-second ticker and,
// on every tick, drains the head of taskList for as long as the head task was
// last attempted at least five seconds ago, running each such task through
// tryTimerTaskProc in its own goroutine.
//
// The function returns only when Peek reports queue.ErrDisposed.
//
// NOTE(review): time.Ticker.Stop does not close the ticker channel, so after
// Destroy stops the ticker this loop appears to stay blocked on it - confirm
// whether the queue should also be disposed on shutdown.
func (invoker *failbackClusterInvoker) process(ctx context.Context) {
	invoker.ticker = time.NewTicker(time.Second * 1)
	for range invoker.ticker.C {
	drain:
		// Re-run every task at the head of the queue whose wait time is over.
		for {
			head, err := invoker.taskList.Peek()
			switch err {
			case queue.ErrDisposed:
				return
			case queue.ErrEmptyQueue:
				break drain
			}

			task := head.(*retryTimerTask)
			if time.Since(task.lastT).Seconds() < 5 {
				// The head is not due yet; wait for the next tick.
				break drain
			}

			// The task was just peeked, so the get is expected to succeed.
			if _, err = invoker.taskList.Get(1); err != nil {
				logger.Warnf("get task found err: %v\n", err)
				break drain
			}
			go invoker.tryTimerTaskProc(ctx, task)
		}
	}
}

func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err error) {
	logger.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.\n",
LaurenceLiZhixin's avatar
LaurenceLiZhixin committed
		retryTask.invocation.MethodName(), invoker.GetURL().Service(), err.Error())
	retryTask.retries++
	retryTask.lastT = time.Now()
	if retryTask.retries > invoker.maxRetries {
		logger.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v.\n",
			retryTask.retries, retryTask.invocation)
AlexStocks's avatar
AlexStocks committed
		return
	}

	if err := invoker.taskList.Put(retryTask); err != nil {
		logger.Errorf("invoker.taskList.Put(retryTask:%#v) = error:%v", retryTask, err)
flycash's avatar
flycash committed
func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
匠尘's avatar
匠尘 committed
	invokers := invoker.directory.List(invocation)
	if err := invoker.checkInvokers(invokers, invocation); err != nil {
		logger.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
LaurenceLiZhixin's avatar
LaurenceLiZhixin committed
			invocation.MethodName(), invoker.GetURL().Service(), err)
匠尘's avatar
匠尘 committed
		return &protocol.RPCResult{}
	}
georgehao's avatar
georgehao committed
	// Get the service loadbalance config
LaurenceLiZhixin's avatar
LaurenceLiZhixin committed
	url := invokers[0].GetURL()
匠尘's avatar
匠尘 committed
	lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
georgehao's avatar
georgehao committed
	// Get the service method loadbalance config if have
	methodName := invocation.MethodName()
匠尘's avatar
匠尘 committed
	if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" {
		lb = v
	}

	loadBalance := extension.GetLoadbalance(lb)
AlexStocks's avatar
AlexStocks committed
	invoked := make([]protocol.Invoker, 0, len(invokers))
	ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked)
georgehao's avatar
georgehao committed
	// DO INVOKE
	result := ivk.Invoke(ctx, invocation)
匠尘's avatar
匠尘 committed
	if result.Error() != nil {
		invoker.once.Do(func() {
			invoker.taskList = queue.New(invoker.failbackTasks)
flycash's avatar
flycash committed
			go invoker.process(ctx)
		})

		taskLen := invoker.taskList.Len()
		if taskLen >= invoker.failbackTasks {
			logger.Warnf("tasklist is too full > %d.\n", taskLen)
			return &protocol.RPCResult{}
		}

		timerTask := newRetryTimerTask(loadBalance, invocation, invokers, ivk)
		invoker.taskList.Put(timerTask)

		logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
			methodName, url.Service(), result.Error().Error())
匠尘's avatar
匠尘 committed
		// ignore
randy's avatar
randy committed
		return &protocol.RPCResult{}
匠尘's avatar
匠尘 committed
	}
	return result
}

func (invoker *failbackClusterInvoker) Destroy() {
	invoker.baseClusterInvoker.Destroy()
	if invoker.ticker != nil {
		invoker.ticker.Stop()
	}
匠尘's avatar
匠尘 committed
}

type retryTimerTask struct {
	loadbalance cluster.LoadBalance
	invocation  protocol.Invocation
	invokers    []protocol.Invoker
匠尘's avatar
匠尘 committed
	retries     int64
匠尘's avatar
匠尘 committed
}

func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker,
	lastInvoker protocol.Invoker) *retryTimerTask {
匠尘's avatar
匠尘 committed
	return &retryTimerTask{
		loadbalance: loadbalance,
		invocation:  invocation,
		invokers:    invokers,
		lastInvoker: lastInvoker,