From 5d1797b19a7a277debcf5da0eaad35528645b729 Mon Sep 17 00:00:00 2001
From: Ooo0oO0o0oO <907709476@qq.com>
Date: Sun, 19 Jan 2020 11:24:40 +0800
Subject: [PATCH] supplement rpc stats

---
 cluster/loadbalance/least_active.go      |   2 +-
 filter/filter_impl/active_filter.go      |  18 +++-
 filter/filter_impl/active_filter_test.go |  68 ++++++++++++
 protocol/rpc_status.go                   |  90 ++++++++++++++--
 protocol/rpc_status_test.go              | 126 +++++++++++++++++++++++
 5 files changed, 295 insertions(+), 9 deletions(-)
 create mode 100644 filter/filter_impl/active_filter_test.go
 create mode 100644 protocol/rpc_status_test.go

diff --git a/cluster/loadbalance/least_active.go b/cluster/loadbalance/least_active.go
index aa69f3cc2..a1e851669 100644
--- a/cluster/loadbalance/least_active.go
+++ b/cluster/loadbalance/least_active.go
@@ -63,7 +63,7 @@ func (lb *leastActiveLoadBalance) Select(invokers []protocol.Invoker, invocation
 	for i := 0; i < count; i++ {
 		invoker := invokers[i]
 		// Active number
-		active := protocol.GetStatus(invoker.GetUrl(), invocation.MethodName()).GetActive()
+		active := protocol.GetMethodStatus(invoker.GetUrl(), invocation.MethodName()).GetActive()
 		// current weight (maybe in warmUp)
 		weight := GetWeight(invoker, invocation)
 		// There are smaller active services
diff --git a/filter/filter_impl/active_filter.go b/filter/filter_impl/active_filter.go
index b12f77632..d50679a2b 100644
--- a/filter/filter_impl/active_filter.go
+++ b/filter/filter_impl/active_filter.go
@@ -17,14 +17,22 @@
 
 package filter_impl
 
+import (
+	"strconv"
+)
+
 import (
 	"github.com/apache/dubbo-go/common/extension"
 	"github.com/apache/dubbo-go/common/logger"
 	"github.com/apache/dubbo-go/filter"
 	"github.com/apache/dubbo-go/protocol"
+	invocation2 "github.com/apache/dubbo-go/protocol/invocation"
 )
 
-const active = "active"
+const (
+	active                  = "active"
+	dubbo_invoke_start_time = "dubbo_invoke_start_time"
+)
 
 func init() {
 	extension.SetFilter(active, GetActiveFilter)
@@ -36,13 +44,19 @@ type ActiveFilter struct {
 func (ef *ActiveFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
 	logger.Infof("invoking active filter. %v,%v", invocation.MethodName(), len(invocation.Arguments()))
 
+	invocation.(*invocation2.RPCInvocation).SetAttachments(dubbo_invoke_start_time, strconv.FormatInt(protocol.CurrentTimeMillis(), 10))
 	protocol.BeginCount(invoker.GetUrl(), invocation.MethodName())
 	return invoker.Invoke(invocation)
 }
 
 func (ef *ActiveFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
 
-	protocol.EndCount(invoker.GetUrl(), invocation.MethodName())
+	startTime, err := strconv.ParseInt(invocation.(*invocation2.RPCInvocation).AttachmentsByKey(dubbo_invoke_start_time, "0"), 10, 64)
+	if err != nil {
+		panic("as")
+	}
+	elapsed := protocol.CurrentTimeMillis() - startTime
+	protocol.EndCount(invoker.GetUrl(), invocation.MethodName(), elapsed, result.Error() == nil)
 	return result
 }
 
diff --git a/filter/filter_impl/active_filter_test.go b/filter/filter_impl/active_filter_test.go
new file mode 100644
index 000000000..b42c47314
--- /dev/null
+++ b/filter/filter_impl/active_filter_test.go
@@ -0,0 +1,68 @@
+package filter_impl
+
+import (
+	"context"
+	"errors"
+	"strconv"
+	"testing"
+)
+
+import (
+	"github.com/golang/mock/gomock"
+	"github.com/stretchr/testify/assert"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/protocol/mock"
+)
+
+func TestActiveFilter_Invoke(t *testing.T) {
+	invoc := invocation.NewRPCInvocation("test", []interface{}{"OK"}, make(map[string]string, 0))
+	url, _ := common.NewURL(context.TODO(), "dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
+	filter := ActiveFilter{}
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+	invoker := mock.NewMockInvoker(ctrl)
+	invoker.EXPECT().Invoke(gomock.Any()).Return(nil)
+	invoker.EXPECT().GetUrl().Return(url).Times(1)
+	filter.Invoke(invoker, invoc)
+	assert.True(t, invoc.AttachmentsByKey(dubbo_invoke_start_time, "") != "")
+
+}
+
+func TestActiveFilter_OnResponse(t *testing.T) {
+	c := protocol.CurrentTimeMillis()
+	elapsed := 100
+	invoc := invocation.NewRPCInvocation("test", []interface{}{"OK"}, map[string]string{
+		dubbo_invoke_start_time: strconv.FormatInt(c-int64(elapsed), 10),
+	})
+	url, _ := common.NewURL(context.TODO(), "dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
+	filter := ActiveFilter{}
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+	invoker := mock.NewMockInvoker(ctrl)
+	invoker.EXPECT().GetUrl().Return(url).Times(1)
+	result := &protocol.RPCResult{
+		Err: errors.New("test"),
+	}
+	filter.OnResponse(result, invoker, invoc)
+	methodStatus := protocol.GetMethodStatus(url, "test")
+	urlStatus := protocol.GetUrlStatus(url)
+
+	assert.Equal(t, int32(1), methodStatus.GetTotal())
+	assert.Equal(t, int32(1), urlStatus.GetTotal())
+	assert.Equal(t, int32(-1), methodStatus.GetActive())
+	assert.Equal(t, int32(-1), urlStatus.GetActive())
+	assert.Equal(t, int32(1), methodStatus.GetFailed())
+	assert.Equal(t, int32(1), urlStatus.GetFailed())
+	assert.Equal(t, int32(1), methodStatus.GetSuccessiveRequestFailureCount())
+	assert.Equal(t, int32(1), urlStatus.GetSuccessiveRequestFailureCount())
+	assert.True(t, methodStatus.GetFailedElapsed() >= int64(elapsed))
+	assert.True(t, urlStatus.GetFailedElapsed() >= int64(elapsed))
+	assert.True(t, urlStatus.GetLastRequestFailedTimestamp() != int64(0))
+	assert.True(t, methodStatus.GetLastRequestFailedTimestamp() != int64(0))
+
+}
diff --git a/protocol/rpc_status.go b/protocol/rpc_status.go
index 3a8bfbc87..cbc81f116 100644
--- a/protocol/rpc_status.go
+++ b/protocol/rpc_status.go
@@ -20,6 +20,7 @@ package protocol
 import (
 	"sync"
 	"sync/atomic"
+	"time"
 )
 
 import (
@@ -28,17 +29,68 @@ import (
 
 var (
 	methodStatistics sync.Map // url -> { methodName : RpcStatus}
+	serviceStatistic sync.Map // url -> RpcStatus
 )
 
 type RpcStatus struct {
-	active int32
+	active                        int32
+	failed                        int32
+	total                         int32
+	totalElapsed                  int64
+	failedElapsed                 int64
+	maxElapsed                    int64
+	failedMaxElapsed              int64
+	succeededMaxElapsed           int64
+	successiveRequestFailureCount int32
+	lastRequestFailedTimestamp    int64
 }
 
 func (rpc *RpcStatus) GetActive() int32 {
 	return atomic.LoadInt32(&rpc.active)
 }
 
-func GetStatus(url common.URL, methodName string) *RpcStatus {
+func (rpc *RpcStatus) GetFailed() int32 {
+	return atomic.LoadInt32(&rpc.failed)
+}
+
+func (rpc *RpcStatus) GetTotal() int32 {
+	return atomic.LoadInt32(&rpc.total)
+}
+
+func (rpc *RpcStatus) GetTotalElapsed() int64 {
+	return atomic.LoadInt64(&rpc.totalElapsed)
+}
+
+func (rpc *RpcStatus) GetFailedElapsed() int64 {
+	return atomic.LoadInt64(&rpc.failedElapsed)
+}
+
+func (rpc *RpcStatus) GetMaxElapsed() int64 {
+	return atomic.LoadInt64(&rpc.maxElapsed)
+}
+
+func (rpc *RpcStatus) GetFailedMaxElapsed() int64 {
+	return atomic.LoadInt64(&rpc.failedMaxElapsed)
+}
+
+func (rpc *RpcStatus) GetSucceededMaxElapsed() int64 {
+	return atomic.LoadInt64(&rpc.succeededMaxElapsed)
+}
+
+func (rpc *RpcStatus) GetLastRequestFailedTimestamp() int64 {
+	return atomic.LoadInt64(&rpc.lastRequestFailedTimestamp)
+}
+
+func (rpc *RpcStatus) GetSuccessiveRequestFailureCount() int32 {
+	return atomic.LoadInt32(&rpc.successiveRequestFailureCount)
+}
+
+func GetUrlStatus(url common.URL) *RpcStatus {
+	rpcStatus, _ := serviceStatistic.LoadOrStore(url.Key(), &RpcStatus{})
+	return rpcStatus.(*RpcStatus)
+}
+
+func GetMethodStatus(url common.URL, methodName string) *RpcStatus {
 	identifier := url.Key()
 	methodMap, found := methodStatistics.Load(identifier)
 	if !found {
@@ -58,11 +110,13 @@ func GetStatus(url common.URL, methodName string) *RpcStatus {
 }
 
 func BeginCount(url common.URL, methodName string) {
-	beginCount0(GetStatus(url, methodName))
+	beginCount0(GetUrlStatus(url))
+	beginCount0(GetMethodStatus(url, methodName))
 }
 
-func EndCount(url common.URL, methodName string) {
-	endCount0(GetStatus(url, methodName))
+func EndCount(url common.URL, methodName string, elapsed int64, succeeded bool) {
+	endCount0(GetUrlStatus(url), elapsed, succeeded)
+	endCount0(GetMethodStatus(url, methodName), elapsed, succeeded)
 }
 
 // private methods
@@ -70,6 +124,30 @@ func beginCount0(rpcStatus *RpcStatus) {
 	atomic.AddInt32(&rpcStatus.active, 1)
 }
 
-func endCount0(rpcStatus *RpcStatus) {
+func endCount0(rpcStatus *RpcStatus, elapsed int64, succeeded bool) {
 	atomic.AddInt32(&rpcStatus.active, -1)
+	atomic.AddInt32(&rpcStatus.total, 1)
+	atomic.AddInt64(&rpcStatus.totalElapsed, elapsed)
+
+	if rpcStatus.maxElapsed < elapsed {
+		atomic.StoreInt64(&rpcStatus.maxElapsed, elapsed)
+	}
+	if succeeded {
+		if rpcStatus.succeededMaxElapsed < elapsed {
+			atomic.StoreInt64(&rpcStatus.succeededMaxElapsed, elapsed)
+		}
+		atomic.StoreInt32(&rpcStatus.successiveRequestFailureCount, 0)
+	} else {
+		atomic.StoreInt64(&rpcStatus.lastRequestFailedTimestamp, time.Now().Unix())
+		atomic.AddInt32(&rpcStatus.successiveRequestFailureCount, 1)
+		atomic.AddInt32(&rpcStatus.failed, 1)
+		atomic.AddInt64(&rpcStatus.failedElapsed, elapsed)
+		if rpcStatus.failedMaxElapsed < elapsed {
+			atomic.StoreInt64(&rpcStatus.failedMaxElapsed, elapsed)
+		}
+	}
+}
+
+func CurrentTimeMillis() int64 {
+	return time.Now().UnixNano() / int64(time.Millisecond)
 }
diff --git a/protocol/rpc_status_test.go b/protocol/rpc_status_test.go
new file mode 100644
index 000000000..bce7fd5c5
--- /dev/null
+++ b/protocol/rpc_status_test.go
@@ -0,0 +1,126 @@
+package protocol
+
+import (
+	"context"
+	"strconv"
+	"testing"
+)
+
+import (
+	"github.com/stretchr/testify/assert"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+)
+
+func TestBeginCount(t *testing.T) {
+	url, _ := common.NewURL(context.TODO(), "dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
+	BeginCount(url, "test")
+	urlStatus := GetUrlStatus(url)
+	methodStatus := GetMethodStatus(url, "test")
+	methodStatus1 := GetMethodStatus(url, "test1")
+	assert.Equal(t, int32(1), methodStatus.active)
+	assert.Equal(t, int32(1), urlStatus.active)
+	assert.Equal(t, int32(0), methodStatus1.active)
+
+}
+
+func TestEndCount(t *testing.T) {
+	url, _ := common.NewURL(context.TODO(), "dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
+	EndCount(url, "test", 100, true)
+	urlStatus := GetUrlStatus(url)
+	methodStatus := GetMethodStatus(url, "test")
+	assert.Equal(t, int32(-1), methodStatus.active)
+	assert.Equal(t, int32(-1), urlStatus.active)
+	assert.Equal(t, int32(1), methodStatus.total)
+	assert.Equal(t, int32(1), urlStatus.total)
+}
+
+func TestGetMethodStatus(t *testing.T) {
+	url, _ := common.NewURL(context.TODO(), "dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
+	status := GetMethodStatus(url, "test")
+	assert.NotNil(t, status)
+	assert.Equal(t, int32(0), status.total)
+}
+
+func TestGetUrlStatus(t *testing.T) {
+	url, _ := common.NewURL(context.TODO(), "dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
+	status := GetUrlStatus(url)
+	assert.NotNil(t, status)
+	assert.Equal(t, int32(0), status.total)
+}
+
+func Test_beginCount0(t *testing.T) {
+	url, _ := common.NewURL(context.TODO(), "dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
+	status := GetUrlStatus(url)
+	beginCount0(status)
+	assert.Equal(t, int32(1), status.active)
+}
+
+func Test_All(t *testing.T) {
+	url, _ := common.NewURL(context.TODO(), "dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
+	request(url, "test", 100, false, true)
+	urlStatus := GetUrlStatus(url)
+	methodStatus := GetMethodStatus(url, "test")
+	assert.Equal(t, int32(1), methodStatus.total)
+	assert.Equal(t, int32(1), urlStatus.total)
+	assert.Equal(t, int32(0), methodStatus.active)
+	assert.Equal(t, int32(0), urlStatus.active)
+	assert.Equal(t, int32(0), methodStatus.failed)
+	assert.Equal(t, int32(0), urlStatus.failed)
+	assert.Equal(t, int32(0), methodStatus.successiveRequestFailureCount)
+	assert.Equal(t, int32(0), urlStatus.successiveRequestFailureCount)
+	assert.Equal(t, int64(100), methodStatus.totalElapsed)
+	assert.Equal(t, int64(100), urlStatus.totalElapsed)
+	request(url, "test", 100, false, false)
+	request(url, "test", 100, false, false)
+	request(url, "test", 100, false, false)
+	request(url, "test", 100, false, false)
+	request(url, "test", 100, false, false)
+	assert.Equal(t, int32(6), methodStatus.total)
+	assert.Equal(t, int32(6), urlStatus.total)
+	assert.Equal(t, int32(5), methodStatus.failed)
+	assert.Equal(t, int32(5), urlStatus.failed)
+	assert.Equal(t, int32(5), methodStatus.successiveRequestFailureCount)
+	assert.Equal(t, int32(5), urlStatus.successiveRequestFailureCount)
+	assert.Equal(t, int64(600), methodStatus.totalElapsed)
+	assert.Equal(t, int64(600), urlStatus.totalElapsed)
+	assert.Equal(t, int64(500), methodStatus.failedElapsed)
+	assert.Equal(t, int64(500), urlStatus.failedElapsed)
+
+	request(url, "test", 100, false, true)
+	assert.Equal(t, int32(0), methodStatus.successiveRequestFailureCount)
+	assert.Equal(t, int32(0), urlStatus.successiveRequestFailureCount)
+
+	request(url, "test", 200, false, false)
+	request(url, "test", 200, false, false)
+	assert.Equal(t, int32(2), methodStatus.successiveRequestFailureCount)
+	assert.Equal(t, int32(2), urlStatus.successiveRequestFailureCount)
+	assert.Equal(t, int64(200), methodStatus.maxElapsed)
+	assert.Equal(t, int64(200), urlStatus.maxElapsed)
+
+	request(url, "test1", 200, false, false)
+	request(url, "test1", 200, false, false)
+	request(url, "test1", 200, false, false)
+	assert.Equal(t, int32(5), urlStatus.successiveRequestFailureCount)
+	methodStatus1 := GetMethodStatus(url, "test1")
+	assert.Equal(t, int32(2), methodStatus.successiveRequestFailureCount)
+	assert.Equal(t, int32(3), methodStatus1.successiveRequestFailureCount)
+
+}
+
+func request(url common.URL, method string, elapsed int64, active, succeeded bool) {
+	BeginCount(url, method)
+	if !active {
+		EndCount(url, method, elapsed, succeeded)
+	}
+}
+
+func TestCurrentTimeMillis(t *testing.T) {
+	c := CurrentTimeMillis()
+	assert.NotNil(t, c)
+	str := strconv.FormatInt(c, 10)
+	i, _ := strconv.ParseInt(str, 10, 64)
+	assert.Equal(t, c, i)
+}
-- 
GitLab