From 83409cab07027fbe94e1b33fcd454263c6a2b000 Mon Sep 17 00:00:00 2001
From: zonghaishang <yiji@apache.org>
Date: Mon, 27 May 2019 14:20:53 +0800
Subject: [PATCH] register NewLeastActiveLoadBalance

---
 cluster/loadbalance/least_active.go | 100 ++++++++++++++++++++++++++++
 filter/RpcStatus.go                 |  68 +++++++++++++++++++
 filter/impl/active_filter.go        |  50 ++++++++++++++
 3 files changed, 218 insertions(+)
 create mode 100644 cluster/loadbalance/least_active.go
 create mode 100644 filter/RpcStatus.go
 create mode 100644 filter/impl/active_filter.go

diff --git a/cluster/loadbalance/least_active.go b/cluster/loadbalance/least_active.go
new file mode 100644
index 000000000..588793578
--- /dev/null
+++ b/cluster/loadbalance/least_active.go
@@ -0,0 +1,100 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// @author yiji@apache.org
+package loadbalance
+
+import (
+	"github.com/dubbo/go-for-apache-dubbo/cluster"
+	"github.com/dubbo/go-for-apache-dubbo/common/extension"
+	"github.com/dubbo/go-for-apache-dubbo/filter"
+	"github.com/dubbo/go-for-apache-dubbo/protocol"
+	"math/rand"
+)
+
+const (
+	leastActive = "leastactive"
+)
+
+func init() {
+	extension.SetLoadbalance(leastActive, NewLeastActiveLoadBalance)
+}
+
+type leastActiveLoadBalance struct {
+}
+
+func NewLeastActiveLoadBalance() cluster.LoadBalance {
+	return &leastActiveLoadBalance{}
+}
+
+func (lb *leastActiveLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
+
+	count := len(invokers)
+	if invokers == nil || count == 0 {
+		return nil
+	}
+	if count == 1 {
+		return invokers[0]
+	}
+
+	var (
+		leastActive  int32 = -1                 // The least active value of all invokers
+		totalWeight  int64 = 0                  // The number of invokers having the same least active value (leastActive)
+		firstWeight  int64 = 0                  // Initial value, used for comparision
+		leastIndexes       = make([]int, count) // The index of invokers having the same least active value (leastActive)
+		leastCount         = 0                  // The number of invokers having the same least active value (leastActive)
+		sameWeight         = true               // Every invoker has the same weight value?
+	)
+
+	for i := 0; i < count; i++ {
+		invoker := invokers[i]
+		// Active number
+		active := filter.GetStatus(invoker.GetUrl(), invocation.MethodName()).GetActive()
+		// current weight (maybe in warmUp)
+		weight := GetWeight(invoker, invocation)
+		// There are smaller active services
+		if leastActive == -1 || active < leastActive {
+			leastActive = active
+			leastIndexes[0] = i
+			leastCount = 1 // next available leastIndex offset
+			totalWeight = weight
+			firstWeight = weight
+			sameWeight = true
+		} else if active == leastActive {
+			leastIndexes[leastCount] = i
+			totalWeight += weight
+			leastCount++
+
+			if sameWeight && (i > 0) && weight != firstWeight {
+				sameWeight = false
+			}
+		}
+	}
+
+	if leastCount == 1 {
+		return invokers[0]
+	}
+
+	if !sameWeight && totalWeight > 0 {
+		offsetWeight := rand.Int63n(totalWeight) + 1
+		for i := 0; i < leastCount; i++ {
+			leastIndex := leastIndexes[i]
+			offsetWeight -= GetWeight(invokers[i], invocation)
+			if offsetWeight <= 0 {
+				return invokers[leastIndex]
+			}
+		}
+	}
+
+	index := leastIndexes[rand.Intn(leastCount)]
+	return invokers[index]
+}
diff --git a/filter/RpcStatus.go b/filter/RpcStatus.go
new file mode 100644
index 000000000..2148594da
--- /dev/null
+++ b/filter/RpcStatus.go
@@ -0,0 +1,68 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// @author yiji@apache.org
+package filter
+
+import (
+	"github.com/dubbo/go-for-apache-dubbo/common"
+	"sync"
+	"sync/atomic"
+)
+
+var (
+	methodStatistics = sync.Map{} // url -> { methodName : RpcStatus}
+)
+
+type RpcStatus struct {
+	active int32
+}
+
+func (rpc *RpcStatus) GetActive() int32 {
+	return atomic.LoadInt32(&rpc.active)
+}
+
+func GetStatus(url common.URL, methodName string) *RpcStatus {
+	identity := url.Key()
+	methodMap, found := methodStatistics.Load(identity)
+	if !found {
+		methodMap = sync.Map{}
+		methodStatistics.Store(identity, methodMap)
+	}
+
+	methodActive := methodMap.(sync.Map)
+	rpcStatus, found := methodActive.Load(methodName)
+	if !found {
+		rpcStatus = RpcStatus{}
+		methodActive.Store(methodName, rpcStatus)
+	}
+
+	status := rpcStatus.(RpcStatus)
+	return &status
+}
+
+func BeginCount(url common.URL, methodName string) {
+	beginCount0(GetStatus(url, methodName))
+}
+
+func EndCount(url common.URL, methodName string) {
+	endCount0(GetStatus(url, methodName))
+}
+
+// private methods
+func beginCount0(rpcStatus *RpcStatus) {
+	atomic.AddInt32(&rpcStatus.active, 1)
+}
+
+func endCount0(rpcStatus *RpcStatus) {
+	atomic.AddInt32(&rpcStatus.active, -1)
+}
diff --git a/filter/impl/active_filter.go b/filter/impl/active_filter.go
new file mode 100644
index 000000000..eaaaeabd2
--- /dev/null
+++ b/filter/impl/active_filter.go
@@ -0,0 +1,50 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// @author yiji@apache.org
+package impl
+
+import (
+	"github.com/dubbo/go-for-apache-dubbo/common/logger"
+)
+
+import (
+	"github.com/dubbo/go-for-apache-dubbo/common/extension"
+	"github.com/dubbo/go-for-apache-dubbo/filter"
+	"github.com/dubbo/go-for-apache-dubbo/protocol"
+)
+
+const active = "active"
+
+func init() {
+	extension.SetFilter(active, GetActiveFilter)
+}
+
+type ActiveFilter struct {
+}
+
+func (ef *ActiveFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+	logger.Infof("invoking active filter. %v,%v", invocation.MethodName(), len(invocation.Arguments()))
+
+	filter.BeginCount(invoker.GetUrl(), invocation.MethodName())
+	return invoker.Invoke(invocation)
+}
+
+func (ef *ActiveFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+
+	filter.EndCount(invoker.GetUrl(), invocation.MethodName())
+	return result
+}
+
+func GetActiveFilter() filter.Filter {
+	return &ActiveFilter{}
+}
-- 
GitLab