Skip to content
Snippets Groups Projects
failback_cluster_invoker.go 5.72 KiB
Newer Older
匠尘's avatar
匠尘 committed
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cluster_impl

import (
	"container/list"
	perrors "github.com/pkg/errors"
	"sync"
	"time"
)

import (
	"github.com/apache/dubbo-go/cluster"
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/common/extension"
	"github.com/apache/dubbo-go/protocol"
)

type failbackClusterInvoker struct {
	baseClusterInvoker
}

var (
	retries       int64
	failbackTasks int64
	ticker        *time.Ticker
	once          sync.Once
	lock          sync.Mutex
	taskList      *Queue
)

func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
	invoker := &failbackClusterInvoker{
		baseClusterInvoker: newBaseClusterInvoker(directory),
	}
	retriesConfig := invoker.GetUrl().GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES)
	if retriesConfig <= 0 {
		retriesConfig = constant.DEFAULT_FAILBACK_TIMES
	}
	failbackTasksConfig := invoker.GetUrl().GetParamInt(constant.FAIL_BACK_TASKS_KEY, constant.DEFAULT_FAILBACK_TASKS)
	if failbackTasksConfig <= 0 {
		failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS
	}
	retries = retriesConfig
	failbackTasks = failbackTasksConfig
	return invoker
}

func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {

	invokers := invoker.directory.List(invocation)
	err := invoker.checkInvokers(invokers, invocation)

	if err != nil {
		// add retry ticker task
		perrors.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.",
			invocation.MethodName(), invoker.GetUrl().Service(), err)
		return &protocol.RPCResult{}
	}
	url := invokers[0].GetUrl()

	methodName := invocation.MethodName()
	//Get the service loadbalance config
	lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)

	//Get the service method loadbalance config if have
	if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" {
		lb = v
	}
	loadbalance := extension.GetLoadbalance(lb)

	invoked := []protocol.Invoker{}
	var result protocol.Result

	ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)
	invoked = append(invoked, ivk)
	//DO INVOKE
	result = ivk.Invoke(invocation)

	if result.Error() != nil {
		// add retry ticker task
		addFailed(loadbalance, invocation, invokers, invoker)
		perrors.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.",
			methodName, invoker.GetUrl().Service(), result.Error().Error())
		// ignore
		return &protocol.RPCResult{}
	}

	return result
}

func (invoker *failbackClusterInvoker) Destroy() {
	//this is must atom operation
	if invoker.destroyed.CAS(false, true) {
		invoker.directory.Destroy()
	}
	// stop ticker
	ticker.Stop()
}

func addFailed(balance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker,
	invoker *failbackClusterInvoker) {
	initSingleTickerTaskInstance()
	// init one retryTimerTask
	timerTask := newRetryTimerTask(balance, invocation, invokers, invoker, retries, 5)
	taskList.push(timerTask)
	// process ticker task
	go func() {
		<-ticker.C
		value := taskList.pop()
		if value == nil {
			return
		}

		retryTask := value.(retryTimerTask)
		invoked := []protocol.Invoker{}
		invoked = append(invoked, retryTask.lastInvoker)
		retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers,
			invoked)
		var result protocol.Result
		result = retryInvoker.Invoke(retryTask.invocation)
		if result.Error() != nil {
			perrors.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.",
				invocation.MethodName(), invoker.GetUrl().Service(), result.Error().Error())
			retryTask.retries++
			if retryTask.retries > retries {
				perrors.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v",
					retries, invocation)
			} else {
				taskList.push(retryTask)
			}
		}
	}()
}

func initSingleTickerTaskInstance() {
	once.Do(func() {
		newTickerTask()
	})
}

func newTickerTask() {
	ticker = time.NewTicker(time.Second * 1)
	taskList = newQueue()
}

type retryTimerTask struct {
	loadbalance cluster.LoadBalance
	invocation  protocol.Invocation
	invokers    []protocol.Invoker
	lastInvoker *failbackClusterInvoker
	retries     int64
	tick        int64
}

func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker,
	lastInvoker *failbackClusterInvoker, retries int64, tick int64) *retryTimerTask {
	return &retryTimerTask{
		loadbalance: loadbalance,
		invocation:  invocation,
		invokers:    invokers,
		lastInvoker: lastInvoker,
		retries:     retries,
		tick:        tick,
	}
}

type Queue struct {
	data *list.List
}

func newQueue() *Queue {
	q := new(Queue)
	q.data = list.New()
	return q
}

func (q *Queue) push(v interface{}) {
	defer lock.Unlock()
	lock.Lock()
	q.data.PushFront(v)
}

func (q *Queue) pop() interface{} {
	defer lock.Unlock()
	lock.Lock()
	iter := q.data.Back()
	v := iter.Value
	q.data.Remove(iter)
	return v
}