diff --git a/cluster/router/healthcheck/default_health_check.go b/cluster/router/healthcheck/default_health_check.go index 5be102760325e02ed72e771e2cc0cbb7be8e3819..fc9879798e938f499507017d372168758aaeab2c 100644 --- a/cluster/router/healthcheck/default_health_check.go +++ b/cluster/router/healthcheck/default_health_check.go @@ -24,25 +24,14 @@ import ( import ( "github.com/apache/dubbo-go/cluster/router" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" ) -const ( - HEALTH_CHECKER = "health.checker" - DEFAULT_HEALTH_CHECKER = "default" - OUTSTANDING_REQUEST_COUNT_LIMIT_KEY = "outstanding.request.limit" - SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY = "successive.failed.threshold" - DEFAULT_SUCCESSIVE_FAILED_THRESHOLD = 5 - CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY = "circuit.tripped.timeout.factor" - DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF = 5 - DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR = 1000 - MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS = 30000 -) - func init() { - extension.SethealthChecker(DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker) + extension.SethealthChecker(constant.DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker) } // DefaultHealthChecker is the default implementation of HealthChecker, which determines the health status of @@ -60,7 +49,7 @@ type DefaultHealthChecker struct { // and the current active request func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool { urlStatus := protocol.GetURLStatus(invoker.GetUrl()) - if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.outStandingRequestConutLimit { + if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.GetOutStandingRequestConutLimit() { logger.Debugf("Invoker [%s] is currently in circuitbreaker tripped state", invoker.GetUrl().Key()) return false } @@ -90,24 +79,39 @@ func (c *DefaultHealthChecker) getCircuitBreakerTimeout(status *protocol.RPCStat func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol.RPCStatus) int64 { successiveFailureCount := status.GetSuccessiveRequestFailureCount() - diff := successiveFailureCount - c.requestSuccessiveFailureThreshold + diff := successiveFailureCount - c.GetRequestSuccessiveFailureThreshold() if diff < 0 { return 0 - } else if diff > DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF { - diff = DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF + } else if diff > constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF { + diff = constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF } - sleepWindow := (1 << diff) * DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR - if sleepWindow > MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS { - sleepWindow = MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS + sleepWindow := (1 << diff) * c.GetCircuitTrippedTimeoutFactor() + if sleepWindow > constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS { + sleepWindow = constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS } return int64(sleepWindow) } +// GetOutStandingRequestConutLimit return the requestSuccessiveFailureThreshold bound to this DefaultHealthChecker +func (c *DefaultHealthChecker) GetRequestSuccessiveFailureThreshold() int32 { + return c.requestSuccessiveFailureThreshold +} + +// GetOutStandingRequestConutLimit return the circuitTrippedTimeoutFactor bound to this DefaultHealthChecker +func (c *DefaultHealthChecker) GetCircuitTrippedTimeoutFactor() int32 { + return c.circuitTrippedTimeoutFactor +} + +// GetOutStandingRequestConutLimit return the outStandingRequestConutLimit bound to this DefaultHealthChecker +func (c *DefaultHealthChecker) GetOutStandingRequestConutLimit() int32 { + return c.outStandingRequestConutLimit +} + // NewDefaultHealthChecker constructs a new DefaultHealthChecker based on the url func NewDefaultHealthChecker(url *common.URL) router.HealthChecker { return &DefaultHealthChecker{ - outStandingRequestConutLimit: int32(url.GetParamInt(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32)), - requestSuccessiveFailureThreshold: int32(url.GetParamInt(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)), - circuitTrippedTimeoutFactor: int32(url.GetParamInt(CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)), + outStandingRequestConutLimit: int32(url.GetParamInt(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32)), + requestSuccessiveFailureThreshold: int32(url.GetParamInt(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)), + circuitTrippedTimeoutFactor: int32(url.GetParamInt(constant.CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, constant.DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)), } } diff --git a/cluster/router/healthcheck/default_health_check_test.go b/cluster/router/healthcheck/default_health_check_test.go index 39408c8a4f374b102064ef2da8490ab5a8277428..74aa3940743a012f907cfe3d8811a618f07ff800 100644 --- a/cluster/router/healthcheck/default_health_check_test.go +++ b/cluster/router/healthcheck/default_health_check_test.go @@ -28,6 +28,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" ) @@ -40,8 +41,8 @@ func TestDefaultHealthChecker_IsHealthy(t *testing.T) { healthy := hc.IsHealthy(invoker) assert.True(t, healthy) - url.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") - url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "100") + url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") + url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "100") // fake the outgoing request for i := 0; i < 11; i++ { request(url, "test", 0, true, false) @@ -51,12 +52,12 @@ func TestDefaultHealthChecker_IsHealthy(t *testing.T) { // the outgoing request is more than OUTSTANDING_REQUEST_COUNT_LIMIT, go to unhealthy assert.False(t, hc.IsHealthy(invoker)) - // successive failed count is more than SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, go to unhealthy + // successive failed count is more than constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, go to unhealthy for i := 0; i < 11; i++ { request(url, "test", 0, false, false) } - url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") - url.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") + url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") + url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") hc = NewDefaultHealthChecker(&url).(*DefaultHealthChecker) healthy = hc.IsHealthy(invoker) assert.False(t, hc.IsHealthy(invoker)) @@ -76,10 +77,10 @@ func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) { request(url, "test", 1, false, false) } sleepWindowTime := defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url)) - assert.True(t, sleepWindowTime == MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS) + assert.True(t, sleepWindowTime == constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS) // Adjust the threshold size to 1000 - url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "1000") + url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "1000") sleepWindowTime = NewDefaultHealthChecker(&url).(*DefaultHealthChecker).getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url)) assert.True(t, sleepWindowTime == 0) @@ -93,7 +94,7 @@ func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) { request(url1, "test", 1, false, false) request(url1, "test", 1, false, false) sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1)) - assert.True(t, sleepWindowTime > 0 && sleepWindowTime < MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS) + assert.True(t, sleepWindowTime > 0 && sleepWindowTime < constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS) } func TestDefaultHealthChecker_getCircuitBreakerTimeout(t *testing.T) { @@ -137,11 +138,11 @@ func TestNewDefaultHealthChecker(t *testing.T) { defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) assert.NotNil(t, defaultHc) assert.Equal(t, defaultHc.outStandingRequestConutLimit, int32(math.MaxInt32)) - assert.Equal(t, defaultHc.requestSuccessiveFailureThreshold, int32(DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)) + assert.Equal(t, defaultHc.requestSuccessiveFailureThreshold, int32(constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)) url1, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") - url1.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") - url1.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") + url1.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") + url1.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") nondefaultHc := NewDefaultHealthChecker(&url1).(*DefaultHealthChecker) assert.NotNil(t, nondefaultHc) assert.Equal(t, nondefaultHc.outStandingRequestConutLimit, int32(10)) diff --git a/cluster/router/healthcheck/health_check_route.go b/cluster/router/healthcheck/health_check_route.go index 7e66f9d0c20342bf7b1bcbe72c0376fe37ea6da0..481a85acb0d353ae9767b0c8a2d40be61d094a52 100644 --- a/cluster/router/healthcheck/health_check_route.go +++ b/cluster/router/healthcheck/health_check_route.go @@ -20,6 +20,7 @@ package healthcheck import ( "github.com/apache/dubbo-go/cluster/router" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" @@ -42,7 +43,7 @@ func NewHealthCheckRouter(url *common.URL) (router.Router, error) { r.url = url r.enabled = url.GetParamBool(HEALTH_ROUTE_ENABLED_KEY, false) if r.enabled { - checkerName := url.GetParam(HEALTH_CHECKER, DEFAULT_HEALTH_CHECKER) + checkerName := url.GetParam(constant.HEALTH_CHECKER, constant.DEFAULT_HEALTH_CHECKER) r.checker = extension.GetHealthChecker(checkerName, url) } return r, nil diff --git a/cluster/router/healthcheck/health_check_route_test.go b/cluster/router/healthcheck/health_check_route_test.go index c48fdc7d6a5924c47a55d51deb05034bc6d49456..759ef93dbeb8d91a82eefd59060afbe8a10a4440 100644 --- a/cluster/router/healthcheck/health_check_route_test.go +++ b/cluster/router/healthcheck/health_check_route_test.go @@ -29,6 +29,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" ) @@ -119,12 +120,12 @@ func TestNewHealthCheckRouter(t *testing.T) { dhc := h.checker.(*DefaultHealthChecker) assert.Equal(t, dhc.outStandingRequestConutLimit, int32(math.MaxInt32)) - assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(DEFAULT_SUCCESSIVE_FAILED_THRESHOLD)) - assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)) + assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(constant.DEFAULT_SUCCESSIVE_FAILED_THRESHOLD)) + assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(constant.DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)) - url.SetParam(CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, "500") - url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") - url.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") + url.SetParam(constant.CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, "500") + url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") + url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") hcr, _ = NewHealthCheckRouter(&url) h = hcr.(*HealthCheckRouter) dhc = h.checker.(*DefaultHealthChecker) diff --git a/common/constant/key.go b/common/constant/key.go index 5690e53ed9fbd2bb7ffef4e0c36cdff76843e42b..0ecd4f752e193588f0e3b18208e78825957a4066 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -190,3 +190,16 @@ const ( ACCESS_KEY_ID_KEY = "accessKeyId" SECRET_ACCESS_KEY_KEY = "secretAccessKey" ) + +// HealthCheck Router +const ( + HEALTH_CHECKER = "health.checker" + DEFAULT_HEALTH_CHECKER = "default" + OUTSTANDING_REQUEST_COUNT_LIMIT_KEY = "outstanding.request.limit" + SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY = "successive.failed.threshold" + DEFAULT_SUCCESSIVE_FAILED_THRESHOLD = 5 + CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY = "circuit.tripped.timeout.factor" + DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF = 5 + DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR = 1000 + MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS = 30000 +)