Skip to content
Snippets Groups Projects
failover_cluster_invoker.go 4.17 KiB
Newer Older
AlexStocks's avatar
AlexStocks committed
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
fangyincheng's avatar
fangyincheng committed

vito.he's avatar
vito.he committed
package cluster_impl
vito.he's avatar
vito.he committed

import (
flycash's avatar
flycash committed
	"context"
cvictory's avatar
cvictory committed
	"fmt"
vito.he's avatar
vito.he committed
)
vito.he's avatar
vito.he committed
import (
	perrors "github.com/pkg/errors"
)

vito.he's avatar
vito.he committed
import (
cvictory's avatar
cvictory committed
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/constant"
vito.he's avatar
vito.he committed
	"github.com/apache/dubbo-go/common/logger"
vito.he's avatar
vito.he committed
)

type failoverClusterInvoker struct {
	baseClusterInvoker
}

func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {
vito.he's avatar
vito.he committed
	return &failoverClusterInvoker{
vito.he's avatar
vito.he committed
		baseClusterInvoker: newBaseClusterInvoker(directory),
flycash's avatar
flycash committed
func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
haohongfan's avatar
haohongfan committed
	var (
		result    protocol.Result
		invoked   []protocol.Invoker
		providers []string
haohongfan's avatar
haohongfan committed
	)
vito.he's avatar
vito.he committed
	invokers := invoker.directory.List(invocation)
haohongfan's avatar
haohongfan committed
	if err := invoker.checkInvokers(invokers, invocation); err != nil {
		return &protocol.RPCResult{Err: err}
	}

	methodName := invocation.MethodName()
haohongfan's avatar
haohongfan committed
	retries := getRetries(invokers, methodName)
	loadBalance := getLoadBalance(invokers[0], invocation)
	for i := 0; i <= retries; i++ {
		//Reselect before retry to avoid a change of candidate `invokers`.
		//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
		if i > 0 {
haohongfan's avatar
haohongfan committed
			if err := invoker.checkWhetherDestroyed(); err != nil {
				return &protocol.RPCResult{Err: err}
			}
			invokers = invoker.directory.List(invocation)
haohongfan's avatar
haohongfan committed
			if err := invoker.checkInvokers(invokers, invocation); err != nil {
				return &protocol.RPCResult{Err: err}
			}
		}
		ivk = invoker.doSelect(loadBalance, invocation, invokers, invoked)
		if ivk == nil {
			continue
		}
		invoked = append(invoked, ivk)
		//DO INVOKE
flycash's avatar
flycash committed
		result = ivk.Invoke(ctx, invocation)
		if result.Error() != nil {
			providers = append(providers, ivk.GetUrl().Key())
			continue
		}
haohongfan's avatar
haohongfan committed
		return result
	ip := common.GetLocalIp()
haohongfan's avatar
haohongfan committed
	invokerSvc := invoker.GetUrl().Service()
	invokerUrl := invoker.directory.GetUrl()
	if ivk == nil {
		logger.Errorf("Failed to invoke the method %s of the service %s .No provider is available.", methodName, invokerSvc)
		return &protocol.RPCResult{
			Err: perrors.Errorf("Failed to invoke the method %s of the service %s .No provider is available because can't connect server.",
				methodName, invokerSvc),
		}
	}

haohongfan's avatar
haohongfan committed
	return &protocol.RPCResult{
cvictory's avatar
cvictory committed
		Err: perrors.Wrap(result.Error(), fmt.Sprintf("Failed to invoke the method %v in the service %v. "+
			"Tried %v times of the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. "+
			"Last error is %+v.", methodName, invokerSvc, retries, providers, len(providers), len(invokers),
			invokerUrl, ip, constant.Version, result.Error().Error()),
haohongfan's avatar
haohongfan committed
		)}
}

func getRetries(invokers []protocol.Invoker, methodName string) int {
	if len(invokers) <= 0 {
		return constant.DEFAULT_RETRIES_INT
	}

	url := invokers[0].GetUrl()
	//get reties
	retriesConfig := url.GetParam(constant.RETRIES_KEY, constant.DEFAULT_RETRIES)
	//Get the service method loadbalance config if have
	if v := url.GetMethodParam(methodName, constant.RETRIES_KEY, ""); len(v) != 0 {
		retriesConfig = v
	}

	retries, err := strconv.Atoi(retriesConfig)
	if err != nil || retries < 0 {
		logger.Error("Your retries config is invalid,pls do a check. And will use the default retries configuration instead.")
		retries = constant.DEFAULT_RETRIES_INT
	}

	if retries > len(invokers) {
		retries = len(invokers)
	}
	return retries