diff --git a/cluster/router/health_checker.go b/cluster/router/health_checker.go new file mode 100644 index 0000000000000000000000000000000000000000..d9e3087a272dd500cdd1dc9dc6680d436891f88b --- /dev/null +++ b/cluster/router/health_checker.go @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package router + +import ( + "github.com/apache/dubbo-go/protocol" +) + +// HealthChecker is used to determine whether the invoker is healthy or not +type HealthChecker interface { + // IsHealthy evaluates the healthy state on the given Invoker + IsHealthy(invoker protocol.Invoker) bool +} diff --git a/cluster/router/healthcheck/default_health_check.go b/cluster/router/healthcheck/default_health_check.go new file mode 100644 index 0000000000000000000000000000000000000000..a26f86ddac45aa6e999cd4453aa296d0786a02ba --- /dev/null +++ b/cluster/router/healthcheck/default_health_check.go @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package healthcheck + +import ( + "math" +) + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +func init() { + extension.SethealthChecker(constant.DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker) +} + +// DefaultHealthChecker is the default implementation of HealthChecker, which determines the health status of +// the invoker based on the number of successive bad request and the current active request. +type DefaultHealthChecker struct { + // the limit of outstanding request + outStandingRequestConutLimit int32 + // the threshold of successive-failure-request + requestSuccessiveFailureThreshold int32 + // value of circuit-tripped timeout factor + circuitTrippedTimeoutFactor int32 +} + +// IsHealthy evaluates the healthy state on the given Invoker based on the number of successive bad request +// and the current active request +func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool { + urlStatus := protocol.GetURLStatus(invoker.GetUrl()) + if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.GetOutStandingRequestConutLimit() { + logger.Debugf("Invoker [%s] is currently in circuitbreaker tripped state", invoker.GetUrl().Key()) + return false + } + return true +} + +// isCircuitBreakerTripped determine whether the invoker is in the tripped state by the number of successive bad request +func (c *DefaultHealthChecker) isCircuitBreakerTripped(status *protocol.RPCStatus) bool { + circuitBreakerTimeout := c.getCircuitBreakerTimeout(status) + currentTime := protocol.CurrentTimeMillis() + if circuitBreakerTimeout <= 0 { + return false + } + return circuitBreakerTimeout > currentTime +} + +// getCircuitBreakerTimeout get the timestamp recovered from tripped state, the unit is millisecond +func (c *DefaultHealthChecker) getCircuitBreakerTimeout(status *protocol.RPCStatus) int64 { + sleepWindow := c.getCircuitBreakerSleepWindowTime(status) + if sleepWindow <= 0 { + return 0 + } + return status.GetLastRequestFailedTimestamp() + sleepWindow +} + +// getCircuitBreakerSleepWindowTime get the sleep window time of invoker, the unit is millisecond +func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol.RPCStatus) int64 { + + successiveFailureCount := status.GetSuccessiveRequestFailureCount() + diff := successiveFailureCount - c.GetRequestSuccessiveFailureThreshold() + if diff < 0 { + return 0 + } else if diff > constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF { + diff = constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF + } + sleepWindow := (1 << diff) * c.GetCircuitTrippedTimeoutFactor() + if sleepWindow > constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS { + sleepWindow = constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS + } + return int64(sleepWindow) +} + +// GetOutStandingRequestConutLimit return the requestSuccessiveFailureThreshold bound to this DefaultHealthChecker +func (c *DefaultHealthChecker) GetRequestSuccessiveFailureThreshold() int32 { + return c.requestSuccessiveFailureThreshold +} + +// GetOutStandingRequestConutLimit return the circuitTrippedTimeoutFactor bound to this DefaultHealthChecker +func (c *DefaultHealthChecker) GetCircuitTrippedTimeoutFactor() int32 { + return c.circuitTrippedTimeoutFactor +} + +// GetOutStandingRequestConutLimit return the outStandingRequestConutLimit bound to this DefaultHealthChecker +func (c *DefaultHealthChecker) GetOutStandingRequestConutLimit() int32 { + return c.outStandingRequestConutLimit +} + +// NewDefaultHealthChecker constructs a new DefaultHealthChecker based on the url +func NewDefaultHealthChecker(url *common.URL) router.HealthChecker { + return &DefaultHealthChecker{ + outStandingRequestConutLimit: int32(url.GetParamInt(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32)), + requestSuccessiveFailureThreshold: int32(url.GetParamInt(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)), + circuitTrippedTimeoutFactor: int32(url.GetParamInt(constant.CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, constant.DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)), + } +} diff --git a/cluster/router/healthcheck/default_health_check_test.go b/cluster/router/healthcheck/default_health_check_test.go new file mode 100644 index 0000000000000000000000000000000000000000..74aa3940743a012f907cfe3d8811a618f07ff800 --- /dev/null +++ b/cluster/router/healthcheck/default_health_check_test.go @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package healthcheck + +import ( + "math" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/protocol" +) + +func TestDefaultHealthChecker_IsHealthy(t *testing.T) { + + defer protocol.CleanAllStatus() + url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + hc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + invoker := NewMockInvoker(url, 1) + healthy := hc.IsHealthy(invoker) + assert.True(t, healthy) + + url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") + url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "100") + // fake the outgoing request + for i := 0; i < 11; i++ { + request(url, "test", 0, true, false) + } + hc = NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + healthy = hc.IsHealthy(invoker) + // the outgoing request is more than OUTSTANDING_REQUEST_COUNT_LIMIT, go to unhealthy + assert.False(t, hc.IsHealthy(invoker)) + + // successive failed count is more than constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, go to unhealthy + for i := 0; i < 11; i++ { + request(url, "test", 0, false, false) + } + url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") + url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") + hc = NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + healthy = hc.IsHealthy(invoker) + assert.False(t, hc.IsHealthy(invoker)) + + // reset successive failed count and go to healthy + request(url, "test", 0, false, true) + healthy = hc.IsHealthy(invoker) + assert.True(t, hc.IsHealthy(invoker)) +} + +func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) { + defer protocol.CleanAllStatus() + url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + // Increase the number of failed requests + for i := 0; i < 100; i++ { + request(url, "test", 1, false, false) + } + sleepWindowTime := defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url)) + assert.True(t, sleepWindowTime == constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS) + + // Adjust the threshold size to 1000 + url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "1000") + sleepWindowTime = NewDefaultHealthChecker(&url).(*DefaultHealthChecker).getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url)) + assert.True(t, sleepWindowTime == 0) + + url1, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider") + sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1)) + assert.True(t, sleepWindowTime == 0) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1)) + assert.True(t, sleepWindowTime > 0 && sleepWindowTime < constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS) +} + +func TestDefaultHealthChecker_getCircuitBreakerTimeout(t *testing.T) { + defer protocol.CleanAllStatus() + url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + timeout := defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url)) + assert.True(t, timeout == 0) + url1, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider") + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + timeout = defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url1)) + // timeout must after the current time + assert.True(t, timeout > protocol.CurrentTimeMillis()) + +} + +func TestDefaultHealthChecker_isCircuitBreakerTripped(t *testing.T) { + defer protocol.CleanAllStatus() + url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + status := protocol.GetURLStatus(url) + tripped := defaultHc.isCircuitBreakerTripped(status) + assert.False(t, tripped) + // Increase the number of failed requests + for i := 0; i < 100; i++ { + request(url, "test", 1, false, false) + } + tripped = defaultHc.isCircuitBreakerTripped(protocol.GetURLStatus(url)) + assert.True(t, tripped) + +} + +func TestNewDefaultHealthChecker(t *testing.T) { + defer protocol.CleanAllStatus() + url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + assert.NotNil(t, defaultHc) + assert.Equal(t, defaultHc.outStandingRequestConutLimit, int32(math.MaxInt32)) + assert.Equal(t, defaultHc.requestSuccessiveFailureThreshold, int32(constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)) + + url1, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + url1.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") + url1.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") + nondefaultHc := NewDefaultHealthChecker(&url1).(*DefaultHealthChecker) + assert.NotNil(t, nondefaultHc) + assert.Equal(t, nondefaultHc.outStandingRequestConutLimit, int32(10)) + assert.Equal(t, nondefaultHc.requestSuccessiveFailureThreshold, int32(10)) +} + +func request(url common.URL, method string, elapsed int64, active, succeeded bool) { + protocol.BeginCount(url, method) + if !active { + protocol.EndCount(url, method, elapsed, succeeded) + } +} diff --git a/cluster/router/healthcheck/factory.go b/cluster/router/healthcheck/factory.go new file mode 100644 index 0000000000000000000000000000000000000000..32d84d145ceb2aa05f5a75de352e52d13dd9d6b3 --- /dev/null +++ b/cluster/router/healthcheck/factory.go @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package healthcheck + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" +) + +func init() { + extension.SetRouterFactory(constant.HealthCheckRouterName, newHealthCheckRouteFactory) +} + +// HealthCheckRouteFactory +type HealthCheckRouteFactory struct { +} + +// newHealthCheckRouteFactory construct a new HealthCheckRouteFactory +func newHealthCheckRouteFactory() router.RouterFactory { + return &HealthCheckRouteFactory{} +} + +// NewRouter construct a new NewHealthCheckRouter via url +func (f *HealthCheckRouteFactory) NewRouter(url *common.URL) (router.Router, error) { + return NewHealthCheckRouter(url) +} diff --git a/cluster/router/healthcheck/factory_test.go b/cluster/router/healthcheck/factory_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a9d94da7c37f0e0c9640de1386998a85823e80a6 --- /dev/null +++ b/cluster/router/healthcheck/factory_test.go @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package healthcheck + +import ( + "context" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) + +type MockInvoker struct { + url common.URL +} + +func NewMockInvoker(url common.URL, successCount int) *MockInvoker { + return &MockInvoker{ + url: url, + } +} + +func (bi *MockInvoker) GetUrl() common.URL { + return bi.url +} +func (bi *MockInvoker) IsAvailable() bool { + return true +} + +func (bi *MockInvoker) IsDestroyed() bool { + return true +} + +func (bi *MockInvoker) Invoke(_ context.Context, _ protocol.Invocation) protocol.Result { + return nil +} + +func (bi *MockInvoker) Destroy() { +} + +func TestHealthCheckRouteFactory(t *testing.T) { + factory := newHealthCheckRouteFactory() + assert.NotNil(t, factory) +} diff --git a/cluster/router/healthcheck/health_check_route.go b/cluster/router/healthcheck/health_check_route.go new file mode 100644 index 0000000000000000000000000000000000000000..1ddc9ccb173881a87bc5351711326f02ab2da3f6 --- /dev/null +++ b/cluster/router/healthcheck/health_check_route.go @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package healthcheck + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +const ( + HEALTH_ROUTE_ENABLED_KEY = "health.route.enabled" +) + +// HealthCheckRouter provides a health-first routing mechanism through HealthChecker +type HealthCheckRouter struct { + url *common.URL + enabled bool + checker router.HealthChecker +} + +// NewHealthCheckRouter construct an HealthCheckRouter via url +func NewHealthCheckRouter(url *common.URL) (router.Router, error) { + r := &HealthCheckRouter{ + url: url, + enabled: url.GetParamBool(HEALTH_ROUTE_ENABLED_KEY, false), + } + if r.enabled { + checkerName := url.GetParam(constant.HEALTH_CHECKER, constant.DEFAULT_HEALTH_CHECKER) + r.checker = extension.GetHealthChecker(checkerName, url) + } + return r, nil +} + +// Route gets a list of healthy invoker +func (r *HealthCheckRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { + if !r.enabled { + return invokers + } + healthyInvokers := make([]protocol.Invoker, 0, len(invokers)) + // Add healthy invoker to the list + for _, invoker := range invokers { + if r.checker.IsHealthy(invoker) { + healthyInvokers = append(healthyInvokers, invoker) + } + } + // If all Invoke are considered unhealthy, downgrade to all inovker + if len(healthyInvokers) == 0 { + logger.Warnf(" Now all invokers are unhealthy, so downgraded to all! Service: [%s]", url.ServiceKey()) + return invokers + } + return healthyInvokers +} + +// Priority +func (r *HealthCheckRouter) Priority() int64 { + return 0 +} + +// URL Return URL in router +func (r *HealthCheckRouter) URL() common.URL { + return *r.url +} + +// HealthyChecker returns the HealthChecker bound to this HealthCheckRouter +func (r *HealthCheckRouter) HealthyChecker() router.HealthChecker { + return r.checker +} diff --git a/cluster/router/healthcheck/health_check_route_test.go b/cluster/router/healthcheck/health_check_route_test.go new file mode 100644 index 0000000000000000000000000000000000000000..759ef93dbeb8d91a82eefd59060afbe8a10a4440 --- /dev/null +++ b/cluster/router/healthcheck/health_check_route_test.go @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package healthcheck + +import ( + "math" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" +) + +func TestHealthCheckRouter_Route(t *testing.T) { + defer protocol.CleanAllStatus() + consumerURL, _ := common.NewURL("dubbo://192.168.10.1/com.ikurento.user.UserProvider") + consumerURL.SetParam(HEALTH_ROUTE_ENABLED_KEY, "true") + url1, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + url2, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider") + url3, _ := common.NewURL("dubbo://192.168.10.12:20000/com.ikurento.user.UserProvider") + hcr, _ := NewHealthCheckRouter(&consumerURL) + + var invokers []protocol.Invoker + invoker1 := NewMockInvoker(url1, 1) + invoker2 := NewMockInvoker(url2, 1) + invoker3 := NewMockInvoker(url3, 1) + invokers = append(invokers, invoker1, invoker2, invoker3) + inv := invocation.NewRPCInvocation("test", nil, nil) + res := hcr.Route(invokers, &consumerURL, inv) + // now all invokers are healthy + assert.True(t, len(res) == len(invokers)) + + for i := 0; i < 10; i++ { + request(url1, "test", 0, false, false) + } + res = hcr.Route(invokers, &consumerURL, inv) + // invokers1 is unhealthy now + assert.True(t, len(res) == 2 && !contains(res, invoker1)) + + for i := 0; i < 10; i++ { + request(url1, "test", 0, false, false) + request(url2, "test", 0, false, false) + } + + res = hcr.Route(invokers, &consumerURL, inv) + // only invokers3 is healthy now + assert.True(t, len(res) == 1 && !contains(res, invoker1) && !contains(res, invoker2)) + + for i := 0; i < 10; i++ { + request(url1, "test", 0, false, false) + request(url2, "test", 0, false, false) + request(url3, "test", 0, false, false) + } + + res = hcr.Route(invokers, &consumerURL, inv) + // now all invokers are unhealthy, so downgraded to all + assert.True(t, len(res) == 3) + + // reset the invoker1 successive failed count, so invoker1 go to healthy + request(url1, "test", 0, false, true) + res = hcr.Route(invokers, &consumerURL, inv) + assert.True(t, contains(res, invoker1)) + + for i := 0; i < 6; i++ { + request(url1, "test", 0, false, false) + } + // now all invokers are unhealthy, so downgraded to all again + res = hcr.Route(invokers, &consumerURL, inv) + assert.True(t, len(res) == 3) + time.Sleep(time.Second * 2) + // invoker1 go to healthy again after 2s + res = hcr.Route(invokers, &consumerURL, inv) + assert.True(t, contains(res, invoker1)) + +} + +func contains(invokers []protocol.Invoker, invoker protocol.Invoker) bool { + for _, e := range invokers { + if e == invoker { + return true + } + } + return false +} + +func TestNewHealthCheckRouter(t *testing.T) { + defer protocol.CleanAllStatus() + url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + hcr, _ := NewHealthCheckRouter(&url) + h := hcr.(*HealthCheckRouter) + assert.Nil(t, h.checker) + + url.SetParam(HEALTH_ROUTE_ENABLED_KEY, "true") + hcr, _ = NewHealthCheckRouter(&url) + h = hcr.(*HealthCheckRouter) + assert.NotNil(t, h.checker) + + dhc := h.checker.(*DefaultHealthChecker) + assert.Equal(t, dhc.outStandingRequestConutLimit, int32(math.MaxInt32)) + assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(constant.DEFAULT_SUCCESSIVE_FAILED_THRESHOLD)) + assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(constant.DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)) + + url.SetParam(constant.CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, "500") + url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") + url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") + hcr, _ = NewHealthCheckRouter(&url) + h = hcr.(*HealthCheckRouter) + dhc = h.checker.(*DefaultHealthChecker) + assert.Equal(t, dhc.outStandingRequestConutLimit, int32(1000)) + assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(10)) + assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(500)) +} diff --git a/common/constant/key.go b/common/constant/key.go index 4536d945c333a57b59464a2d37aef78974a4b394..c8a03b3be9f0179bb5317640d38abef8d9cc2b3a 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -159,6 +159,8 @@ const ( ConditionAppRouterName = "app" // ListenableRouterName Specify listenable router name ListenableRouterName = "listenable" + // HealthCheckRouterName Specify the name of HealthCheckRouter + HealthCheckRouterName = "health_check" // ConditionRouterRuleSuffix Specify condition router suffix ConditionRouterRuleSuffix = ".condition-router" @@ -188,3 +190,25 @@ const ( ACCESS_KEY_ID_KEY = "accessKeyId" SECRET_ACCESS_KEY_KEY = "secretAccessKey" ) + +// HealthCheck Router +const ( + // The key of HealthCheck SPI + HEALTH_CHECKER = "health.checker" + // The name of the default implementation of HealthChecker + DEFAULT_HEALTH_CHECKER = "default" + // The key of oustanding-request-limit + OUTSTANDING_REQUEST_COUNT_LIMIT_KEY = "outstanding.request.limit" + // The key of successive-failed-request's threshold + SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY = "successive.failed.threshold" + // The key of circuit-tripped timeout factor + CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY = "circuit.tripped.timeout.factor" + // The default threshold of successive-failed-request if not specfied + DEFAULT_SUCCESSIVE_FAILED_THRESHOLD = 5 + // The default maximum diff between successive-failed-request's threshold and actual successive-failed-request's count + DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF = 5 + // The default factor of circuit-tripped timeout if not specfied + DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR = 1000 + // The default time window of circuit-tripped in millisecond if not specfied + MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS = 30000 +) diff --git a/common/extension/health_checker.go b/common/extension/health_checker.go new file mode 100644 index 0000000000000000000000000000000000000000..365c5d0910812efb00eb94408bb226115b037c02 --- /dev/null +++ b/common/extension/health_checker.go @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" +) + +var ( + healthCheckers = make(map[string]func(url *common.URL) router.HealthChecker) +) + +// SethealthChecker set the HealthChecker with name +func SethealthChecker(name string, fcn func(url *common.URL) router.HealthChecker) { + healthCheckers[name] = fcn +} + +// GetHealthChecker get the HealthChecker with name +func GetHealthChecker(name string, url *common.URL) router.HealthChecker { + if healthCheckers[name] == nil { + panic("healthCheckers for " + name + " is not existing, make sure you have import the package.") + } + return healthCheckers[name](url) +} diff --git a/common/extension/health_checker_test.go b/common/extension/health_checker_test.go new file mode 100644 index 0000000000000000000000000000000000000000..ec934e6e9cedc5acbef350f17b87b0b2e37bc844 --- /dev/null +++ b/common/extension/health_checker_test.go @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) + +func TestGetHealthChecker(t *testing.T) { + SethealthChecker("mock", newMockhealthCheck) + checker := GetHealthChecker("mock", common.NewURLWithOptions()) + assert.NotNil(t, checker) +} + +type mockHealthChecker struct { +} + +func (m mockHealthChecker) IsHealthy(invoker protocol.Invoker) bool { + return true +} + +func newMockhealthCheck(url *common.URL) router.HealthChecker { + return &mockHealthChecker{} +} diff --git a/protocol/rpc_status.go b/protocol/rpc_status.go index 639fd559aa16689a249d035895fc037dc3bc3f8b..13be47c98ece1cc006250ad49ab2b9a8c3b1f625 100644 --- a/protocol/rpc_status.go +++ b/protocol/rpc_status.go @@ -153,7 +153,7 @@ func endCount0(rpcStatus *RPCStatus, elapsed int64, succeeded bool) { } atomic.StoreInt32(&rpcStatus.successiveRequestFailureCount, 0) } else { - atomic.StoreInt64(&rpcStatus.lastRequestFailedTimestamp, time.Now().Unix()) + atomic.StoreInt64(&rpcStatus.lastRequestFailedTimestamp, CurrentTimeMillis()) atomic.AddInt32(&rpcStatus.successiveRequestFailureCount, 1) atomic.AddInt32(&rpcStatus.failed, 1) atomic.AddInt64(&rpcStatus.failedElapsed, elapsed) @@ -167,3 +167,17 @@ func endCount0(rpcStatus *RPCStatus, elapsed int64, succeeded bool) { func CurrentTimeMillis() int64 { return time.Now().UnixNano() / int64(time.Millisecond) } + +// Destroy is used to clean all status +func CleanAllStatus() { + delete1 := func(key interface{}, value interface{}) bool { + methodStatistics.Delete(key) + return true + } + methodStatistics.Range(delete1) + delete2 := func(key interface{}, value interface{}) bool { + serviceStatistic.Delete(key) + return true + } + serviceStatistic.Range(delete2) +} diff --git a/protocol/rpc_status_test.go b/protocol/rpc_status_test.go index ffdb3b535667f32e96c3af2be84851655abf5954..5a07f44eab0db60ba65a155d6c6190ab4ce2d716 100644 --- a/protocol/rpc_status_test.go +++ b/protocol/rpc_status_test.go @@ -14,7 +14,7 @@ import ( ) func TestBeginCount(t *testing.T) { - defer destroy() + defer CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") BeginCount(url, "test") @@ -28,7 +28,7 @@ func TestBeginCount(t *testing.T) { } func TestEndCount(t *testing.T) { - defer destroy() + defer CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") EndCount(url, "test", 100, true) @@ -41,7 +41,7 @@ func TestEndCount(t *testing.T) { } func TestGetMethodStatus(t *testing.T) { - defer destroy() + defer CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") status := GetMethodStatus(url, "test") @@ -50,7 +50,7 @@ func TestGetMethodStatus(t *testing.T) { } func TestGetUrlStatus(t *testing.T) { - defer destroy() + defer CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") status := GetURLStatus(url) @@ -59,7 +59,7 @@ func TestGetUrlStatus(t *testing.T) { } func Test_beginCount0(t *testing.T) { - defer destroy() + defer CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") status := GetURLStatus(url) @@ -68,7 +68,7 @@ func Test_beginCount0(t *testing.T) { } func Test_All(t *testing.T) { - defer destroy() + defer CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") request(url, "test", 100, false, true) @@ -129,23 +129,10 @@ func request(url common.URL, method string, elapsed int64, active, succeeded boo } func TestCurrentTimeMillis(t *testing.T) { - defer destroy() + defer CleanAllStatus() c := CurrentTimeMillis() assert.NotNil(t, c) str := strconv.FormatInt(c, 10) i, _ := strconv.ParseInt(str, 10, 64) assert.Equal(t, c, i) } - -func destroy() { - delete1 := func(key interface{}, value interface{}) bool { - methodStatistics.Delete(key) - return true - } - methodStatistics.Range(delete1) - delete2 := func(key interface{}, value interface{}) bool { - serviceStatistic.Delete(key) - return true - } - serviceStatistic.Range(delete2) -}