Skip to content
Snippets Groups Projects
forking_cluster_invoker.go 3.01 KiB
Newer Older
xujianhai666's avatar
xujianhai666 committed
/*
邹毅贤's avatar
邹毅贤 committed
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
xujianhai666's avatar
xujianhai666 committed

package cluster_impl

import (
flycash's avatar
flycash committed
	"context"
xujianhai666's avatar
xujianhai666 committed
	"fmt"
	"time"
)

import (
	"github.com/Workiva/go-datastructures/queue"
)

import (
	"github.com/apache/dubbo-go/cluster"
	"github.com/apache/dubbo-go/common/constant"
xujianhai666's avatar
xujianhai666 committed
	"github.com/apache/dubbo-go/common/logger"
xujianhai666's avatar
xujianhai666 committed
	"github.com/apache/dubbo-go/protocol"
)

type forkingClusterInvoker struct {
	baseClusterInvoker
}

func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker {
	return &forkingClusterInvoker{
		baseClusterInvoker: newBaseClusterInvoker(directory),
	}
}

flycash's avatar
flycash committed
func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
haohongfan's avatar
haohongfan committed
	if err := invoker.checkWhetherDestroyed(); err != nil {
xujianhai666's avatar
xujianhai666 committed
		return &protocol.RPCResult{Err: err}
	}

	invokers := invoker.directory.List(invocation)
haohongfan's avatar
haohongfan committed
	if err := invoker.checkInvokers(invokers, invocation); err != nil {
xujianhai666's avatar
xujianhai666 committed
		return &protocol.RPCResult{Err: err}
	}

	var selected []protocol.Invoker
	forks := invoker.GetUrl().GetParamByIntValue(constant.FORKS_KEY, constant.DEFAULT_FORKS)
xujianhai666's avatar
xujianhai666 committed
	timeouts := invoker.GetUrl().GetParamInt(constant.TIMEOUT_KEY, constant.DEFAULT_TIMEOUT)
	if forks < 0 || forks > len(invokers) {
		selected = invokers
	} else {
haohongfan's avatar
haohongfan committed
		loadBalance := getLoadBalance(invokers[0], invocation)
xujianhai666's avatar
xujianhai666 committed
		for i := 0; i < forks; i++ {
haohongfan's avatar
haohongfan committed
			if ivk := invoker.doSelect(loadBalance, invocation, invokers, selected); ivk != nil {
xujianhai666's avatar
xujianhai666 committed
				selected = append(selected, ivk)
			}
		}
	}

	resultQ := queue.New(1)
	for _, ivk := range selected {
		go func(k protocol.Invoker) {
flycash's avatar
flycash committed
			result := k.Invoke(ctx, invocation)
haohongfan's avatar
haohongfan committed
			if err := resultQ.Put(result); err != nil {
xujianhai666's avatar
xujianhai666 committed
				logger.Errorf("resultQ put failed with exception: %v.\n", err)
			}
xujianhai666's avatar
xujianhai666 committed
		}(ivk)
	}

	rsps, err := resultQ.Poll(1, time.Millisecond*time.Duration(timeouts))
	if err != nil {
		return &protocol.RPCResult{
AlexStocks's avatar
AlexStocks committed
			Err: fmt.Errorf("failed to forking invoke provider %v, "+
				"but no luck to perform the invocation. Last error is: %v", selected, err),
		}
xujianhai666's avatar
xujianhai666 committed
	}
	if len(rsps) == 0 {
AlexStocks's avatar
AlexStocks committed
		return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but no resp", selected)}
xujianhai666's avatar
xujianhai666 committed
	result, ok := rsps[0].(protocol.Result)
	if !ok {
AlexStocks's avatar
AlexStocks committed
		return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but not legal resp", selected)}
xujianhai666's avatar
xujianhai666 committed
	}
	return result
}