Skip to content
Snippets Groups Projects
Commit c8779cc1 authored by CodingSinger's avatar CodingSinger
Browse files

mv constant val to constant package

parent 6b98d435
No related branches found
No related tags found
No related merge requests found
......@@ -24,25 +24,14 @@ import (
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)
const (
HEALTH_CHECKER = "health.checker"
DEFAULT_HEALTH_CHECKER = "default"
OUTSTANDING_REQUEST_COUNT_LIMIT_KEY = "outstanding.request.limit"
SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY = "successive.failed.threshold"
DEFAULT_SUCCESSIVE_FAILED_THRESHOLD = 5
CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY = "circuit.tripped.timeout.factor"
DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF = 5
DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR = 1000
MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS = 30000
)
func init() {
extension.SethealthChecker(DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker)
extension.SethealthChecker(constant.DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker)
}
// DefaultHealthChecker is the default implementation of HealthChecker, which determines the health status of
......@@ -60,7 +49,7 @@ type DefaultHealthChecker struct {
// and the current active request
func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool {
urlStatus := protocol.GetURLStatus(invoker.GetUrl())
if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.outStandingRequestConutLimit {
if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.GetOutStandingRequestConutLimit() {
logger.Debugf("Invoker [%s] is currently in circuitbreaker tripped state", invoker.GetUrl().Key())
return false
}
......@@ -90,24 +79,39 @@ func (c *DefaultHealthChecker) getCircuitBreakerTimeout(status *protocol.RPCStat
func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol.RPCStatus) int64 {
successiveFailureCount := status.GetSuccessiveRequestFailureCount()
diff := successiveFailureCount - c.requestSuccessiveFailureThreshold
diff := successiveFailureCount - c.GetRequestSuccessiveFailureThreshold()
if diff < 0 {
return 0
} else if diff > DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF {
diff = DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF
} else if diff > constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF {
diff = constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF
}
sleepWindow := (1 << diff) * DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR
if sleepWindow > MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS {
sleepWindow = MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS
sleepWindow := (1 << diff) * c.GetCircuitTrippedTimeoutFactor()
if sleepWindow > constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS {
sleepWindow = constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS
}
return int64(sleepWindow)
}
// GetOutStandingRequestConutLimit return the requestSuccessiveFailureThreshold bound to this DefaultHealthChecker
func (c *DefaultHealthChecker) GetRequestSuccessiveFailureThreshold() int32 {
return c.requestSuccessiveFailureThreshold
}
// GetOutStandingRequestConutLimit return the circuitTrippedTimeoutFactor bound to this DefaultHealthChecker
func (c *DefaultHealthChecker) GetCircuitTrippedTimeoutFactor() int32 {
return c.circuitTrippedTimeoutFactor
}
// GetOutStandingRequestConutLimit return the outStandingRequestConutLimit bound to this DefaultHealthChecker
func (c *DefaultHealthChecker) GetOutStandingRequestConutLimit() int32 {
return c.outStandingRequestConutLimit
}
// NewDefaultHealthChecker constructs a new DefaultHealthChecker based on the url
func NewDefaultHealthChecker(url *common.URL) router.HealthChecker {
return &DefaultHealthChecker{
outStandingRequestConutLimit: int32(url.GetParamInt(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32)),
requestSuccessiveFailureThreshold: int32(url.GetParamInt(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)),
circuitTrippedTimeoutFactor: int32(url.GetParamInt(CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)),
outStandingRequestConutLimit: int32(url.GetParamInt(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32)),
requestSuccessiveFailureThreshold: int32(url.GetParamInt(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)),
circuitTrippedTimeoutFactor: int32(url.GetParamInt(constant.CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, constant.DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)),
}
}
......@@ -28,6 +28,7 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
)
......@@ -40,8 +41,8 @@ func TestDefaultHealthChecker_IsHealthy(t *testing.T) {
healthy := hc.IsHealthy(invoker)
assert.True(t, healthy)
url.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10")
url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "100")
url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10")
url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "100")
// fake the outgoing request
for i := 0; i < 11; i++ {
request(url, "test", 0, true, false)
......@@ -51,12 +52,12 @@ func TestDefaultHealthChecker_IsHealthy(t *testing.T) {
// the outgoing request is more than OUTSTANDING_REQUEST_COUNT_LIMIT, go to unhealthy
assert.False(t, hc.IsHealthy(invoker))
// successive failed count is more than SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, go to unhealthy
// successive failed count is more than constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, go to unhealthy
for i := 0; i < 11; i++ {
request(url, "test", 0, false, false)
}
url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10")
url.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000")
url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10")
url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000")
hc = NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
healthy = hc.IsHealthy(invoker)
assert.False(t, hc.IsHealthy(invoker))
......@@ -76,10 +77,10 @@ func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) {
request(url, "test", 1, false, false)
}
sleepWindowTime := defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url))
assert.True(t, sleepWindowTime == MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS)
assert.True(t, sleepWindowTime == constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS)
// Adjust the threshold size to 1000
url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "1000")
url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "1000")
sleepWindowTime = NewDefaultHealthChecker(&url).(*DefaultHealthChecker).getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url))
assert.True(t, sleepWindowTime == 0)
......@@ -93,7 +94,7 @@ func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) {
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1))
assert.True(t, sleepWindowTime > 0 && sleepWindowTime < MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS)
assert.True(t, sleepWindowTime > 0 && sleepWindowTime < constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS)
}
func TestDefaultHealthChecker_getCircuitBreakerTimeout(t *testing.T) {
......@@ -137,11 +138,11 @@ func TestNewDefaultHealthChecker(t *testing.T) {
defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
assert.NotNil(t, defaultHc)
assert.Equal(t, defaultHc.outStandingRequestConutLimit, int32(math.MaxInt32))
assert.Equal(t, defaultHc.requestSuccessiveFailureThreshold, int32(DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF))
assert.Equal(t, defaultHc.requestSuccessiveFailureThreshold, int32(constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF))
url1, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
url1.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10")
url1.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10")
url1.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10")
url1.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10")
nondefaultHc := NewDefaultHealthChecker(&url1).(*DefaultHealthChecker)
assert.NotNil(t, nondefaultHc)
assert.Equal(t, nondefaultHc.outStandingRequestConutLimit, int32(10))
......
......@@ -20,6 +20,7 @@ package healthcheck
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
......@@ -42,7 +43,7 @@ func NewHealthCheckRouter(url *common.URL) (router.Router, error) {
r.url = url
r.enabled = url.GetParamBool(HEALTH_ROUTE_ENABLED_KEY, false)
if r.enabled {
checkerName := url.GetParam(HEALTH_CHECKER, DEFAULT_HEALTH_CHECKER)
checkerName := url.GetParam(constant.HEALTH_CHECKER, constant.DEFAULT_HEALTH_CHECKER)
r.checker = extension.GetHealthChecker(checkerName, url)
}
return r, nil
......
......@@ -29,6 +29,7 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
......@@ -119,12 +120,12 @@ func TestNewHealthCheckRouter(t *testing.T) {
dhc := h.checker.(*DefaultHealthChecker)
assert.Equal(t, dhc.outStandingRequestConutLimit, int32(math.MaxInt32))
assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(DEFAULT_SUCCESSIVE_FAILED_THRESHOLD))
assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR))
assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(constant.DEFAULT_SUCCESSIVE_FAILED_THRESHOLD))
assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(constant.DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR))
url.SetParam(CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, "500")
url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10")
url.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000")
url.SetParam(constant.CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, "500")
url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10")
url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000")
hcr, _ = NewHealthCheckRouter(&url)
h = hcr.(*HealthCheckRouter)
dhc = h.checker.(*DefaultHealthChecker)
......
......@@ -190,3 +190,16 @@ const (
ACCESS_KEY_ID_KEY = "accessKeyId"
SECRET_ACCESS_KEY_KEY = "secretAccessKey"
)
// HealthCheck Router
const (
HEALTH_CHECKER = "health.checker"
DEFAULT_HEALTH_CHECKER = "default"
OUTSTANDING_REQUEST_COUNT_LIMIT_KEY = "outstanding.request.limit"
SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY = "successive.failed.threshold"
DEFAULT_SUCCESSIVE_FAILED_THRESHOLD = 5
CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY = "circuit.tripped.timeout.factor"
DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF = 5
DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR = 1000
MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS = 30000
)
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment