Skip to content
Snippets Groups Projects
Commit 317f3c1b authored by yì jí's avatar yì jí Committed by GitHub
Browse files

Merge pull request #65 from zonghaishang/loadbalance_leastactive

NewLeastActiveLoadBalance
parents ce992ef8 89c7425b
No related branches found
No related tags found
No related merge requests found
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// @author yiji@apache.org
package loadbalance
import (
"math/rand"
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)
const (
LeastActive = "leastactive"
)
func init() {
extension.SetLoadbalance(LeastActive, NewLeastActiveLoadBalance)
}
type leastActiveLoadBalance struct {
}
func NewLeastActiveLoadBalance() cluster.LoadBalance {
return &leastActiveLoadBalance{}
}
func (lb *leastActiveLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
count := len(invokers)
if count == 0 {
return nil
}
if count == 1 {
return invokers[0]
}
var (
leastActive int32 = -1 // The least active value of all invokers
totalWeight int64 = 0 // The number of invokers having the same least active value (LEAST_ACTIVE)
firstWeight int64 = 0 // Initial value, used for comparision
leastIndexes = make([]int, count) // The index of invokers having the same least active value (LEAST_ACTIVE)
leastCount = 0 // The number of invokers having the same least active value (LEAST_ACTIVE)
sameWeight = true // Every invoker has the same weight value?
)
for i := 0; i < count; i++ {
invoker := invokers[i]
// Active number
active := protocol.GetStatus(invoker.GetUrl(), invocation.MethodName()).GetActive()
// current weight (maybe in warmUp)
weight := GetWeight(invoker, invocation)
// There are smaller active services
if leastActive == -1 || active < leastActive {
leastActive = active
leastIndexes[0] = i
leastCount = 1 // next available leastIndex offset
totalWeight = weight
firstWeight = weight
sameWeight = true
} else if active == leastActive {
leastIndexes[leastCount] = i
totalWeight += weight
leastCount++
if sameWeight && (i > 0) && weight != firstWeight {
sameWeight = false
}
}
}
if leastCount == 1 {
return invokers[0]
}
if !sameWeight && totalWeight > 0 {
offsetWeight := rand.Int63n(totalWeight) + 1
for i := 0; i < leastCount; i++ {
leastIndex := leastIndexes[i]
offsetWeight -= GetWeight(invokers[i], invocation)
if offsetWeight <= 0 {
return invokers[leastIndex]
}
}
}
index := leastIndexes[rand.Intn(leastCount)]
return invokers[index]
}
package loadbalance
import (
"context"
"fmt"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
func TestLeastActiveSelect(t *testing.T) {
loadBalance := NewLeastActiveLoadBalance()
var invokers []protocol.Invoker
url, _ := common.NewURL(context.TODO(), "dubbo://192.168.1.0:20000/org.apache.demo.HelloService")
invokers = append(invokers, protocol.NewBaseInvoker(url))
i := loadBalance.Select(invokers, &invocation.RPCInvocation{})
assert.True(t, i.GetUrl().URLEqual(url))
for i := 1; i < 10; i++ {
url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService", i))
invokers = append(invokers, protocol.NewBaseInvoker(url))
}
loadBalance.Select(invokers, &invocation.RPCInvocation{})
}
func TestLeastActiveByWeight(t *testing.T) {
loadBalance := NewLeastActiveLoadBalance()
var invokers []protocol.Invoker
loop := 3
for i := 1; i <= loop; i++ {
url, _ := common.NewURL(context.TODO(), fmt.Sprintf("test%v://192.168.1.%v:20000/org.apache.demo.HelloService?weight=%v", i, i, i))
invokers = append(invokers, protocol.NewBaseInvoker(url))
}
inv := new(invocation.RPCInvocation)
inv.SetMethod("test")
protocol.BeginCount(invokers[2].GetUrl(), inv.MethodName())
loop = 10000
var (
firstCount int
secondCount int
)
for i := 1; i <= loop; i++ {
invoker := loadBalance.Select(invokers, inv)
if invoker.GetUrl().Protocol == "test1" {
firstCount++
} else if invoker.GetUrl().Protocol == "test2" {
secondCount++
}
}
assert.Equal(t, firstCount+secondCount, loop)
}
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// @author yiji@apache.org
package impl
import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/protocol"
)
const active = "active"
func init() {
extension.SetFilter(active, GetActiveFilter)
}
type ActiveFilter struct {
}
func (ef *ActiveFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking active filter. %v,%v", invocation.MethodName(), len(invocation.Arguments()))
protocol.BeginCount(invoker.GetUrl(), invocation.MethodName())
return invoker.Invoke(invocation)
}
func (ef *ActiveFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
protocol.EndCount(invoker.GetUrl(), invocation.MethodName())
return result
}
func GetActiveFilter() filter.Filter {
return &ActiveFilter{}
}
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// @author yiji@apache.org
package protocol
import (
"sync"
"sync/atomic"
)
import (
"github.com/apache/dubbo-go/common"
)
var (
methodStatistics sync.Map // url -> { methodName : RpcStatus}
)
type RpcStatus struct {
active int32
}
func (rpc *RpcStatus) GetActive() int32 {
return atomic.LoadInt32(&rpc.active)
}
func GetStatus(url common.URL, methodName string) *RpcStatus {
identifier := url.Key()
methodMap, found := methodStatistics.Load(identifier)
if !found {
methodMap = &sync.Map{}
methodStatistics.Store(identifier, methodMap)
}
methodActive := methodMap.(*sync.Map)
rpcStatus, found := methodActive.Load(methodName)
if !found {
rpcStatus = &RpcStatus{}
methodActive.Store(methodName, rpcStatus)
}
status := rpcStatus.(*RpcStatus)
return status
}
func BeginCount(url common.URL, methodName string) {
beginCount0(GetStatus(url, methodName))
}
func EndCount(url common.URL, methodName string) {
endCount0(GetStatus(url, methodName))
}
// private methods
func beginCount0(rpcStatus *RpcStatus) {
atomic.AddInt32(&rpcStatus.active, 1)
}
func endCount0(rpcStatus *RpcStatus) {
atomic.AddInt32(&rpcStatus.active, -1)
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment