diff --git a/cluster/router/healthcheck/default_health_check.go b/cluster/router/healthcheck/default_health_check.go index 44d5f0e95c2f633df610b9adfd857b05d8868569..e580dc1040833b2b5979312c165d7b15376bba15 100644 --- a/cluster/router/healthcheck/default_health_check.go +++ b/cluster/router/healthcheck/default_health_check.go @@ -38,7 +38,7 @@ const ( CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY = "circuit.tripped.timeout.factor" DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF = 5 DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR = 1000 - MAX_CIRCUIT_TRIPPED_TIMEOUT = 30000 + MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS = 30000 ) func init() { @@ -48,19 +48,19 @@ func init() { // DefaultHealthChecker is the default implementation of HealthChecker, which determines the health status of // the invoker based on the number of successive bad request and the current active request. type DefaultHealthChecker struct { - // OutStandingRequestConutLimit - OutStandingRequestConutLimit int32 - // RequestSuccessiveFailureThreshold - RequestSuccessiveFailureThreshold int32 - // RequestSuccessiveFailureThreshold - CircuitTrippedTimeoutFactor int32 + // outStandingRequestConutLimit + outStandingRequestConutLimit int32 + // requestSuccessiveFailureThreshold + requestSuccessiveFailureThreshold int32 + // requestSuccessiveFailureThreshold + circuitTrippedTimeoutFactor int32 } // IsHealthy evaluates the healthy state on the given Invoker based on the number of successive bad request // and the current active request func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool { urlStatus := protocol.GetURLStatus(invoker.GetUrl()) - if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.OutStandingRequestConutLimit { + if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.outStandingRequestConutLimit { logger.Debugf("Invoker [%s] is currently in circuitbreaker tripped state", invoker.GetUrl().Key()) return false } @@ -77,7 +77,7 @@ func (c *DefaultHealthChecker) isCircuitBreakerTripped(status *protocol.RPCStatu return circuitBreakerTimeout > currentTime } -// getCircuitBreakerTimeout get the timestamp recovered from tripped state +// getCircuitBreakerTimeout get the timestamp recovered from tripped state, the unit is millisecond func (c *DefaultHealthChecker) getCircuitBreakerTimeout(status *protocol.RPCStatus) int64 { sleepWindow := c.getCircuitBreakerSleepWindowTime(status) if sleepWindow <= 0 { @@ -86,19 +86,19 @@ func (c *DefaultHealthChecker) getCircuitBreakerTimeout(status *protocol.RPCStat return status.GetLastRequestFailedTimestamp() + sleepWindow } -// getCircuitBreakerSleepWindowTime get the sleep window time of invoker +// getCircuitBreakerSleepWindowTime get the sleep window time of invoker, the unit is millisecond func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol.RPCStatus) int64 { successiveFailureCount := status.GetSuccessiveRequestFailureCount() - diff := successiveFailureCount - c.RequestSuccessiveFailureThreshold + diff := successiveFailureCount - c.requestSuccessiveFailureThreshold if diff < 0 { return 0 } else if diff > DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF { diff = DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF } sleepWindow := (1 << diff) * DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR - if sleepWindow > MAX_CIRCUIT_TRIPPED_TIMEOUT { - sleepWindow = MAX_CIRCUIT_TRIPPED_TIMEOUT + if sleepWindow > MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS { + sleepWindow = MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS } return int64(sleepWindow) } @@ -106,8 +106,10 @@ func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol // NewDefaultHealthChecker constructs a new DefaultHealthChecker based on the url func NewDefaultHealthChecker(url *common.URL) router.HealthChecker { return &DefaultHealthChecker{ - OutStandingRequestConutLimit: int32(url.GetParamInt(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32)), - RequestSuccessiveFailureThreshold: int32(url.GetParamInt(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)), - CircuitTrippedTimeoutFactor: int32(url.GetParamInt(CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)), + outStandingRequestConutLimit: int32(url.GetParamInt(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32)), + requestSuccessiveFailureThreshold: int32(url.GetParamInt(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)), + circuitTrippedTimeoutFactor: int32(url.GetParamInt(CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)), } } + + diff --git a/cluster/router/healthcheck/default_health_check_test.go b/cluster/router/healthcheck/default_health_check_test.go index 5ab2bfb47d4569ea19f2376725c440f427c19cd2..39408c8a4f374b102064ef2da8490ab5a8277428 100644 --- a/cluster/router/healthcheck/default_health_check_test.go +++ b/cluster/router/healthcheck/default_health_check_test.go @@ -76,7 +76,7 @@ func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) { request(url, "test", 1, false, false) } sleepWindowTime := defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url)) - assert.True(t, sleepWindowTime == MAX_CIRCUIT_TRIPPED_TIMEOUT) + assert.True(t, sleepWindowTime == MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS) // Adjust the threshold size to 1000 url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "1000") @@ -93,7 +93,7 @@ func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) { request(url1, "test", 1, false, false) request(url1, "test", 1, false, false) sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1)) - assert.True(t, sleepWindowTime > 0 && sleepWindowTime < MAX_CIRCUIT_TRIPPED_TIMEOUT) + assert.True(t, sleepWindowTime > 0 && sleepWindowTime < MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS) } func TestDefaultHealthChecker_getCircuitBreakerTimeout(t *testing.T) { @@ -136,16 +136,16 @@ func TestNewDefaultHealthChecker(t *testing.T) { url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) assert.NotNil(t, defaultHc) - assert.Equal(t, defaultHc.OutStandingRequestConutLimit, int32(math.MaxInt32)) - assert.Equal(t, defaultHc.RequestSuccessiveFailureThreshold, int32(DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)) + assert.Equal(t, defaultHc.outStandingRequestConutLimit, int32(math.MaxInt32)) + assert.Equal(t, defaultHc.requestSuccessiveFailureThreshold, int32(DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)) url1, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") url1.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") url1.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") nondefaultHc := NewDefaultHealthChecker(&url1).(*DefaultHealthChecker) assert.NotNil(t, nondefaultHc) - assert.Equal(t, nondefaultHc.OutStandingRequestConutLimit, int32(10)) - assert.Equal(t, nondefaultHc.RequestSuccessiveFailureThreshold, int32(10)) + assert.Equal(t, nondefaultHc.outStandingRequestConutLimit, int32(10)) + assert.Equal(t, nondefaultHc.requestSuccessiveFailureThreshold, int32(10)) } func request(url common.URL, method string, elapsed int64, active, succeeded bool) { diff --git a/cluster/router/healthcheck/factory.go b/cluster/router/healthcheck/factory.go index 337013c9a4805898f137bf5d4eff07c501ec8d3e..32d84d145ceb2aa05f5a75de352e52d13dd9d6b3 100644 --- a/cluster/router/healthcheck/factory.go +++ b/cluster/router/healthcheck/factory.go @@ -28,7 +28,6 @@ func init() { extension.SetRouterFactory(constant.HealthCheckRouterName, newHealthCheckRouteFactory) } - // HealthCheckRouteFactory type HealthCheckRouteFactory struct { } diff --git a/cluster/router/healthcheck/factory_test.go b/cluster/router/healthcheck/factory_test.go index 824a44271550523601aab1a571658220791b06a4..a9d94da7c37f0e0c9640de1386998a85823e80a6 100644 --- a/cluster/router/healthcheck/factory_test.go +++ b/cluster/router/healthcheck/factory_test.go @@ -30,13 +30,14 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/protocol" ) + type MockInvoker struct { - url common.URL + url common.URL } func NewMockInvoker(url common.URL, successCount int) *MockInvoker { return &MockInvoker{ - url: url, + url: url, } } @@ -51,8 +52,6 @@ func (bi *MockInvoker) IsDestroyed() bool { return true } - - func (bi *MockInvoker) Invoke(_ context.Context, _ protocol.Invocation) protocol.Result { return nil } @@ -60,7 +59,6 @@ func (bi *MockInvoker) Invoke(_ context.Context, _ protocol.Invocation) protocol func (bi *MockInvoker) Destroy() { } - func TestHealthCheckRouteFactory(t *testing.T) { factory := newHealthCheckRouteFactory() assert.NotNil(t, factory) diff --git a/cluster/router/healthcheck/health_check_route_test.go b/cluster/router/healthcheck/health_check_route_test.go index 6ad5ce6b92ef5dfd2b7ac18982dfa708645c1e92..c48fdc7d6a5924c47a55d51deb05034bc6d49456 100644 --- a/cluster/router/healthcheck/health_check_route_test.go +++ b/cluster/router/healthcheck/health_check_route_test.go @@ -118,9 +118,9 @@ func TestNewHealthCheckRouter(t *testing.T) { assert.NotNil(t, h.checker) dhc := h.checker.(*DefaultHealthChecker) - assert.Equal(t, dhc.OutStandingRequestConutLimit, int32(math.MaxInt32)) - assert.Equal(t, dhc.RequestSuccessiveFailureThreshold, int32(DEFAULT_SUCCESSIVE_FAILED_THRESHOLD)) - assert.Equal(t, dhc.CircuitTrippedTimeoutFactor, int32(DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)) + assert.Equal(t, dhc.outStandingRequestConutLimit, int32(math.MaxInt32)) + assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(DEFAULT_SUCCESSIVE_FAILED_THRESHOLD)) + assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)) url.SetParam(CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, "500") url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") @@ -128,7 +128,7 @@ func TestNewHealthCheckRouter(t *testing.T) { hcr, _ = NewHealthCheckRouter(&url) h = hcr.(*HealthCheckRouter) dhc = h.checker.(*DefaultHealthChecker) - assert.Equal(t, dhc.OutStandingRequestConutLimit, int32(1000)) - assert.Equal(t, dhc.RequestSuccessiveFailureThreshold, int32(10)) - assert.Equal(t, dhc.CircuitTrippedTimeoutFactor, int32(500)) + assert.Equal(t, dhc.outStandingRequestConutLimit, int32(1000)) + assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(10)) + assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(500)) } diff --git a/common/constant/key.go b/common/constant/key.go index 3ba8eed8df17472832c49f8fe147707f59144d92..648dc48d8f73f3e8aecc7b3ac1abded93545641b 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -159,6 +159,7 @@ const ( ConditionAppRouterName = "app" // ListenableRouterName Specify listenable router name ListenableRouterName = "listenable" + // HealthCheckRouterName Specify the name of HealthCheckRouter HealthCheckRouterName = "health_check" // ConditionRouterRuleSuffix Specify condition router suffix