/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package cluster_impl import ( "context" "fmt" "strconv" ) import ( perrors "github.com/pkg/errors" ) import ( "github.com/apache/dubbo-go/cluster" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" ) type failoverClusterInvoker struct { baseClusterInvoker } func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker { return &failoverClusterInvoker{ baseClusterInvoker: newBaseClusterInvoker(directory), } } // nolint func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { var ( result protocol.Result invoked []protocol.Invoker providers []string ivk protocol.Invoker ) invokers := invoker.directory.List(invocation) if err := invoker.checkInvokers(invokers, invocation); err != nil { return &protocol.RPCResult{Err: err} } methodName := invocation.MethodName() retries := getRetries(invokers, methodName) loadBalance := getLoadBalance(invokers[0], invocation) for i := 0; i <= retries; i++ { //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. if i > 0 { if err := invoker.checkWhetherDestroyed(); err != nil { return &protocol.RPCResult{Err: err} } invokers = invoker.directory.List(invocation) if err := invoker.checkInvokers(invokers, invocation); err != nil { return &protocol.RPCResult{Err: err} } } ivk = invoker.doSelect(loadBalance, invocation, invokers, invoked) if ivk == nil { continue } invoked = append(invoked, ivk) //DO INVOKE result = ivk.Invoke(ctx, invocation) if result.Error() != nil { providers = append(providers, ivk.GetUrl().Key()) continue } return result } ip := common.GetLocalIp() invokerSvc := invoker.GetUrl().Service() invokerUrl := invoker.directory.GetUrl() if ivk == nil { logger.Errorf("Failed to invoke the method %s of the service %s .No provider is available.", methodName, invokerSvc) return &protocol.RPCResult{ Err: perrors.Errorf("Failed to invoke the method %s of the service %s .No provider is available because can't connect server.", methodName, invokerSvc), } } return &protocol.RPCResult{ Err: perrors.Wrap(result.Error(), fmt.Sprintf("Failed to invoke the method %v in the service %v. "+ "Tried %v times of the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. "+ "Last error is %+v.", methodName, invokerSvc, retries, providers, len(providers), len(invokers), invokerUrl, ip, constant.Version, result.Error().Error()), )} } func getRetries(invokers []protocol.Invoker, methodName string) int { if len(invokers) <= 0 { return constant.DEFAULT_RETRIES_INT } url := invokers[0].GetUrl() //get reties retriesConfig := url.GetParam(constant.RETRIES_KEY, constant.DEFAULT_RETRIES) //Get the service method loadbalance config if have if v := url.GetMethodParam(methodName, constant.RETRIES_KEY, ""); len(v) != 0 { retriesConfig = v } retries, err := strconv.Atoi(retriesConfig) if err != nil || retries < 0 { logger.Error("Your retries config is invalid,pls do a check. And will use the default retries configuration instead.") retries = constant.DEFAULT_RETRIES_INT } if retries > len(invokers) { retries = len(invokers) } return retries }