Skip to content
Snippets Groups Projects
Commit dc0d1c77 authored by Xin.Zh's avatar Xin.Zh Committed by GitHub
Browse files

Merge pull request #389 from CodingSinger/HEALTH_CHECK

add healthcheck router
parents 12ba9977 b18d86e4
No related branches found
No related tags found
No related merge requests found
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package router
import (
"github.com/apache/dubbo-go/protocol"
)
// HealthChecker is used to determine whether the invoker is healthy or not
type HealthChecker interface {
// IsHealthy evaluates the healthy state on the given Invoker
IsHealthy(invoker protocol.Invoker) bool
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package healthcheck
import (
"math"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)
func init() {
extension.SethealthChecker(constant.DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker)
}
// DefaultHealthChecker is the default implementation of HealthChecker, which determines the health status of
// the invoker based on the number of successive bad request and the current active request.
type DefaultHealthChecker struct {
// the limit of outstanding request
outStandingRequestConutLimit int32
// the threshold of successive-failure-request
requestSuccessiveFailureThreshold int32
// value of circuit-tripped timeout factor
circuitTrippedTimeoutFactor int32
}
// IsHealthy evaluates the healthy state on the given Invoker based on the number of successive bad request
// and the current active request
func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool {
urlStatus := protocol.GetURLStatus(invoker.GetUrl())
if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.GetOutStandingRequestConutLimit() {
logger.Debugf("Invoker [%s] is currently in circuitbreaker tripped state", invoker.GetUrl().Key())
return false
}
return true
}
// isCircuitBreakerTripped determine whether the invoker is in the tripped state by the number of successive bad request
func (c *DefaultHealthChecker) isCircuitBreakerTripped(status *protocol.RPCStatus) bool {
circuitBreakerTimeout := c.getCircuitBreakerTimeout(status)
currentTime := protocol.CurrentTimeMillis()
if circuitBreakerTimeout <= 0 {
return false
}
return circuitBreakerTimeout > currentTime
}
// getCircuitBreakerTimeout get the timestamp recovered from tripped state, the unit is millisecond
func (c *DefaultHealthChecker) getCircuitBreakerTimeout(status *protocol.RPCStatus) int64 {
sleepWindow := c.getCircuitBreakerSleepWindowTime(status)
if sleepWindow <= 0 {
return 0
}
return status.GetLastRequestFailedTimestamp() + sleepWindow
}
// getCircuitBreakerSleepWindowTime get the sleep window time of invoker, the unit is millisecond
func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol.RPCStatus) int64 {
successiveFailureCount := status.GetSuccessiveRequestFailureCount()
diff := successiveFailureCount - c.GetRequestSuccessiveFailureThreshold()
if diff < 0 {
return 0
} else if diff > constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF {
diff = constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF
}
sleepWindow := (1 << diff) * c.GetCircuitTrippedTimeoutFactor()
if sleepWindow > constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS {
sleepWindow = constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS
}
return int64(sleepWindow)
}
// GetOutStandingRequestConutLimit return the requestSuccessiveFailureThreshold bound to this DefaultHealthChecker
func (c *DefaultHealthChecker) GetRequestSuccessiveFailureThreshold() int32 {
return c.requestSuccessiveFailureThreshold
}
// GetOutStandingRequestConutLimit return the circuitTrippedTimeoutFactor bound to this DefaultHealthChecker
func (c *DefaultHealthChecker) GetCircuitTrippedTimeoutFactor() int32 {
return c.circuitTrippedTimeoutFactor
}
// GetOutStandingRequestConutLimit return the outStandingRequestConutLimit bound to this DefaultHealthChecker
func (c *DefaultHealthChecker) GetOutStandingRequestConutLimit() int32 {
return c.outStandingRequestConutLimit
}
// NewDefaultHealthChecker constructs a new DefaultHealthChecker based on the url
func NewDefaultHealthChecker(url *common.URL) router.HealthChecker {
return &DefaultHealthChecker{
outStandingRequestConutLimit: int32(url.GetParamInt(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32)),
requestSuccessiveFailureThreshold: int32(url.GetParamInt(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)),
circuitTrippedTimeoutFactor: int32(url.GetParamInt(constant.CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, constant.DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)),
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package healthcheck
import (
"math"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
)
func TestDefaultHealthChecker_IsHealthy(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
hc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
invoker := NewMockInvoker(url, 1)
healthy := hc.IsHealthy(invoker)
assert.True(t, healthy)
url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10")
url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "100")
// fake the outgoing request
for i := 0; i < 11; i++ {
request(url, "test", 0, true, false)
}
hc = NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
healthy = hc.IsHealthy(invoker)
// the outgoing request is more than OUTSTANDING_REQUEST_COUNT_LIMIT, go to unhealthy
assert.False(t, hc.IsHealthy(invoker))
// successive failed count is more than constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, go to unhealthy
for i := 0; i < 11; i++ {
request(url, "test", 0, false, false)
}
url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10")
url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000")
hc = NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
healthy = hc.IsHealthy(invoker)
assert.False(t, hc.IsHealthy(invoker))
// reset successive failed count and go to healthy
request(url, "test", 0, false, true)
healthy = hc.IsHealthy(invoker)
assert.True(t, hc.IsHealthy(invoker))
}
func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
// Increase the number of failed requests
for i := 0; i < 100; i++ {
request(url, "test", 1, false, false)
}
sleepWindowTime := defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url))
assert.True(t, sleepWindowTime == constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS)
// Adjust the threshold size to 1000
url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "1000")
sleepWindowTime = NewDefaultHealthChecker(&url).(*DefaultHealthChecker).getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url))
assert.True(t, sleepWindowTime == 0)
url1, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider")
sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1))
assert.True(t, sleepWindowTime == 0)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1))
assert.True(t, sleepWindowTime > 0 && sleepWindowTime < constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS)
}
func TestDefaultHealthChecker_getCircuitBreakerTimeout(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
timeout := defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url))
assert.True(t, timeout == 0)
url1, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider")
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
timeout = defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url1))
// timeout must after the current time
assert.True(t, timeout > protocol.CurrentTimeMillis())
}
func TestDefaultHealthChecker_isCircuitBreakerTripped(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
status := protocol.GetURLStatus(url)
tripped := defaultHc.isCircuitBreakerTripped(status)
assert.False(t, tripped)
// Increase the number of failed requests
for i := 0; i < 100; i++ {
request(url, "test", 1, false, false)
}
tripped = defaultHc.isCircuitBreakerTripped(protocol.GetURLStatus(url))
assert.True(t, tripped)
}
func TestNewDefaultHealthChecker(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
assert.NotNil(t, defaultHc)
assert.Equal(t, defaultHc.outStandingRequestConutLimit, int32(math.MaxInt32))
assert.Equal(t, defaultHc.requestSuccessiveFailureThreshold, int32(constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF))
url1, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
url1.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10")
url1.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10")
nondefaultHc := NewDefaultHealthChecker(&url1).(*DefaultHealthChecker)
assert.NotNil(t, nondefaultHc)
assert.Equal(t, nondefaultHc.outStandingRequestConutLimit, int32(10))
assert.Equal(t, nondefaultHc.requestSuccessiveFailureThreshold, int32(10))
}
func request(url common.URL, method string, elapsed int64, active, succeeded bool) {
protocol.BeginCount(url, method)
if !active {
protocol.EndCount(url, method, elapsed, succeeded)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package healthcheck
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
)
func init() {
extension.SetRouterFactory(constant.HealthCheckRouterName, newHealthCheckRouteFactory)
}
// HealthCheckRouteFactory
type HealthCheckRouteFactory struct {
}
// newHealthCheckRouteFactory construct a new HealthCheckRouteFactory
func newHealthCheckRouteFactory() router.RouterFactory {
return &HealthCheckRouteFactory{}
}
// NewRouter construct a new NewHealthCheckRouter via url
func (f *HealthCheckRouteFactory) NewRouter(url *common.URL) (router.Router, error) {
return NewHealthCheckRouter(url)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package healthcheck
import (
"context"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
)
type MockInvoker struct {
url common.URL
}
func NewMockInvoker(url common.URL, successCount int) *MockInvoker {
return &MockInvoker{
url: url,
}
}
func (bi *MockInvoker) GetUrl() common.URL {
return bi.url
}
func (bi *MockInvoker) IsAvailable() bool {
return true
}
func (bi *MockInvoker) IsDestroyed() bool {
return true
}
func (bi *MockInvoker) Invoke(_ context.Context, _ protocol.Invocation) protocol.Result {
return nil
}
func (bi *MockInvoker) Destroy() {
}
func TestHealthCheckRouteFactory(t *testing.T) {
factory := newHealthCheckRouteFactory()
assert.NotNil(t, factory)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package healthcheck
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)
const (
HEALTH_ROUTE_ENABLED_KEY = "health.route.enabled"
)
// HealthCheckRouter provides a health-first routing mechanism through HealthChecker
type HealthCheckRouter struct {
url *common.URL
enabled bool
checker router.HealthChecker
}
// NewHealthCheckRouter construct an HealthCheckRouter via url
func NewHealthCheckRouter(url *common.URL) (router.Router, error) {
r := &HealthCheckRouter{
url: url,
enabled: url.GetParamBool(HEALTH_ROUTE_ENABLED_KEY, false),
}
if r.enabled {
checkerName := url.GetParam(constant.HEALTH_CHECKER, constant.DEFAULT_HEALTH_CHECKER)
r.checker = extension.GetHealthChecker(checkerName, url)
}
return r, nil
}
// Route gets a list of healthy invoker
func (r *HealthCheckRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
if !r.enabled {
return invokers
}
healthyInvokers := make([]protocol.Invoker, 0, len(invokers))
// Add healthy invoker to the list
for _, invoker := range invokers {
if r.checker.IsHealthy(invoker) {
healthyInvokers = append(healthyInvokers, invoker)
}
}
// If all Invoke are considered unhealthy, downgrade to all inovker
if len(healthyInvokers) == 0 {
logger.Warnf(" Now all invokers are unhealthy, so downgraded to all! Service: [%s]", url.ServiceKey())
return invokers
}
return healthyInvokers
}
// Priority
func (r *HealthCheckRouter) Priority() int64 {
return 0
}
// URL Return URL in router
func (r *HealthCheckRouter) URL() common.URL {
return *r.url
}
// HealthyChecker returns the HealthChecker bound to this HealthCheckRouter
func (r *HealthCheckRouter) HealthyChecker() router.HealthChecker {
return r.checker
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package healthcheck
import (
"math"
"testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
func TestHealthCheckRouter_Route(t *testing.T) {
defer protocol.CleanAllStatus()
consumerURL, _ := common.NewURL("dubbo://192.168.10.1/com.ikurento.user.UserProvider")
consumerURL.SetParam(HEALTH_ROUTE_ENABLED_KEY, "true")
url1, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
url2, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider")
url3, _ := common.NewURL("dubbo://192.168.10.12:20000/com.ikurento.user.UserProvider")
hcr, _ := NewHealthCheckRouter(&consumerURL)
var invokers []protocol.Invoker
invoker1 := NewMockInvoker(url1, 1)
invoker2 := NewMockInvoker(url2, 1)
invoker3 := NewMockInvoker(url3, 1)
invokers = append(invokers, invoker1, invoker2, invoker3)
inv := invocation.NewRPCInvocation("test", nil, nil)
res := hcr.Route(invokers, &consumerURL, inv)
// now all invokers are healthy
assert.True(t, len(res) == len(invokers))
for i := 0; i < 10; i++ {
request(url1, "test", 0, false, false)
}
res = hcr.Route(invokers, &consumerURL, inv)
// invokers1 is unhealthy now
assert.True(t, len(res) == 2 && !contains(res, invoker1))
for i := 0; i < 10; i++ {
request(url1, "test", 0, false, false)
request(url2, "test", 0, false, false)
}
res = hcr.Route(invokers, &consumerURL, inv)
// only invokers3 is healthy now
assert.True(t, len(res) == 1 && !contains(res, invoker1) && !contains(res, invoker2))
for i := 0; i < 10; i++ {
request(url1, "test", 0, false, false)
request(url2, "test", 0, false, false)
request(url3, "test", 0, false, false)
}
res = hcr.Route(invokers, &consumerURL, inv)
// now all invokers are unhealthy, so downgraded to all
assert.True(t, len(res) == 3)
// reset the invoker1 successive failed count, so invoker1 go to healthy
request(url1, "test", 0, false, true)
res = hcr.Route(invokers, &consumerURL, inv)
assert.True(t, contains(res, invoker1))
for i := 0; i < 6; i++ {
request(url1, "test", 0, false, false)
}
// now all invokers are unhealthy, so downgraded to all again
res = hcr.Route(invokers, &consumerURL, inv)
assert.True(t, len(res) == 3)
time.Sleep(time.Second * 2)
// invoker1 go to healthy again after 2s
res = hcr.Route(invokers, &consumerURL, inv)
assert.True(t, contains(res, invoker1))
}
func contains(invokers []protocol.Invoker, invoker protocol.Invoker) bool {
for _, e := range invokers {
if e == invoker {
return true
}
}
return false
}
func TestNewHealthCheckRouter(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
hcr, _ := NewHealthCheckRouter(&url)
h := hcr.(*HealthCheckRouter)
assert.Nil(t, h.checker)
url.SetParam(HEALTH_ROUTE_ENABLED_KEY, "true")
hcr, _ = NewHealthCheckRouter(&url)
h = hcr.(*HealthCheckRouter)
assert.NotNil(t, h.checker)
dhc := h.checker.(*DefaultHealthChecker)
assert.Equal(t, dhc.outStandingRequestConutLimit, int32(math.MaxInt32))
assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(constant.DEFAULT_SUCCESSIVE_FAILED_THRESHOLD))
assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(constant.DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR))
url.SetParam(constant.CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, "500")
url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10")
url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000")
hcr, _ = NewHealthCheckRouter(&url)
h = hcr.(*HealthCheckRouter)
dhc = h.checker.(*DefaultHealthChecker)
assert.Equal(t, dhc.outStandingRequestConutLimit, int32(1000))
assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(10))
assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(500))
}
......@@ -159,6 +159,8 @@ const (
ConditionAppRouterName = "app"
// ListenableRouterName Specify listenable router name
ListenableRouterName = "listenable"
// HealthCheckRouterName Specify the name of HealthCheckRouter
HealthCheckRouterName = "health_check"
// ConditionRouterRuleSuffix Specify condition router suffix
ConditionRouterRuleSuffix = ".condition-router"
......@@ -188,3 +190,25 @@ const (
ACCESS_KEY_ID_KEY = "accessKeyId"
SECRET_ACCESS_KEY_KEY = "secretAccessKey"
)
// HealthCheck Router
const (
// The key of HealthCheck SPI
HEALTH_CHECKER = "health.checker"
// The name of the default implementation of HealthChecker
DEFAULT_HEALTH_CHECKER = "default"
// The key of oustanding-request-limit
OUTSTANDING_REQUEST_COUNT_LIMIT_KEY = "outstanding.request.limit"
// The key of successive-failed-request's threshold
SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY = "successive.failed.threshold"
// The key of circuit-tripped timeout factor
CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY = "circuit.tripped.timeout.factor"
// The default threshold of successive-failed-request if not specfied
DEFAULT_SUCCESSIVE_FAILED_THRESHOLD = 5
// The default maximum diff between successive-failed-request's threshold and actual successive-failed-request's count
DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF = 5
// The default factor of circuit-tripped timeout if not specfied
DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR = 1000
// The default time window of circuit-tripped in millisecond if not specfied
MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS = 30000
)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
)
var (
healthCheckers = make(map[string]func(url *common.URL) router.HealthChecker)
)
// SethealthChecker set the HealthChecker with name
func SethealthChecker(name string, fcn func(url *common.URL) router.HealthChecker) {
healthCheckers[name] = fcn
}
// GetHealthChecker get the HealthChecker with name
func GetHealthChecker(name string, url *common.URL) router.HealthChecker {
if healthCheckers[name] == nil {
panic("healthCheckers for " + name + " is not existing, make sure you have import the package.")
}
return healthCheckers[name](url)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
)
func TestGetHealthChecker(t *testing.T) {
SethealthChecker("mock", newMockhealthCheck)
checker := GetHealthChecker("mock", common.NewURLWithOptions())
assert.NotNil(t, checker)
}
type mockHealthChecker struct {
}
func (m mockHealthChecker) IsHealthy(invoker protocol.Invoker) bool {
return true
}
func newMockhealthCheck(url *common.URL) router.HealthChecker {
return &mockHealthChecker{}
}
......@@ -153,7 +153,7 @@ func endCount0(rpcStatus *RPCStatus, elapsed int64, succeeded bool) {
}
atomic.StoreInt32(&rpcStatus.successiveRequestFailureCount, 0)
} else {
atomic.StoreInt64(&rpcStatus.lastRequestFailedTimestamp, time.Now().Unix())
atomic.StoreInt64(&rpcStatus.lastRequestFailedTimestamp, CurrentTimeMillis())
atomic.AddInt32(&rpcStatus.successiveRequestFailureCount, 1)
atomic.AddInt32(&rpcStatus.failed, 1)
atomic.AddInt64(&rpcStatus.failedElapsed, elapsed)
......@@ -167,3 +167,17 @@ func endCount0(rpcStatus *RPCStatus, elapsed int64, succeeded bool) {
func CurrentTimeMillis() int64 {
return time.Now().UnixNano() / int64(time.Millisecond)
}
// Destroy is used to clean all status
func CleanAllStatus() {
delete1 := func(key interface{}, value interface{}) bool {
methodStatistics.Delete(key)
return true
}
methodStatistics.Range(delete1)
delete2 := func(key interface{}, value interface{}) bool {
serviceStatistic.Delete(key)
return true
}
serviceStatistic.Range(delete2)
}
......@@ -14,7 +14,7 @@ import (
)
func TestBeginCount(t *testing.T) {
defer destroy()
defer CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
BeginCount(url, "test")
......@@ -28,7 +28,7 @@ func TestBeginCount(t *testing.T) {
}
func TestEndCount(t *testing.T) {
defer destroy()
defer CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
EndCount(url, "test", 100, true)
......@@ -41,7 +41,7 @@ func TestEndCount(t *testing.T) {
}
func TestGetMethodStatus(t *testing.T) {
defer destroy()
defer CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
status := GetMethodStatus(url, "test")
......@@ -50,7 +50,7 @@ func TestGetMethodStatus(t *testing.T) {
}
func TestGetUrlStatus(t *testing.T) {
defer destroy()
defer CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
status := GetURLStatus(url)
......@@ -59,7 +59,7 @@ func TestGetUrlStatus(t *testing.T) {
}
func Test_beginCount0(t *testing.T) {
defer destroy()
defer CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
status := GetURLStatus(url)
......@@ -68,7 +68,7 @@ func Test_beginCount0(t *testing.T) {
}
func Test_All(t *testing.T) {
defer destroy()
defer CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
request(url, "test", 100, false, true)
......@@ -129,23 +129,10 @@ func request(url common.URL, method string, elapsed int64, active, succeeded boo
}
func TestCurrentTimeMillis(t *testing.T) {
defer destroy()
defer CleanAllStatus()
c := CurrentTimeMillis()
assert.NotNil(t, c)
str := strconv.FormatInt(c, 10)
i, _ := strconv.ParseInt(str, 10, 64)
assert.Equal(t, c, i)
}
func destroy() {
delete1 := func(key interface{}, value interface{}) bool {
methodStatistics.Delete(key)
return true
}
methodStatistics.Range(delete1)
delete2 := func(key interface{}, value interface{}) bool {
serviceStatistic.Delete(key)
return true
}
serviceStatistic.Range(delete2)
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment