diff --git a/cluster/cluster_impl/base_cluster_invoker.go b/cluster/cluster_impl/base_cluster_invoker.go index 85ab4ceb16d2d379214377fa5eb0ba739a32b655..a2d65291f401d5ad9d53349c03c0cd31b4428d6c 100644 --- a/cluster/cluster_impl/base_cluster_invoker.go +++ b/cluster/cluster_impl/base_cluster_invoker.go @@ -124,7 +124,7 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc logger.Errorf("the invokers of %s is nil. ", invocation.Invoker().GetUrl().ServiceKey()) return nil } - refreshBlackList() + go protocol.TryRefreshBlackList() if len(invokers) == 1 { if invokers[0].IsAvailable() { return invokers[0] @@ -139,28 +139,28 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc //judge to if the selectedInvoker is invoked and available if (!selectedInvoker.IsAvailable() && invoker.availablecheck) || isInvoked(selectedInvoker, invoked) { protocol.SetInvokerUnhealthyStatus(selectedInvoker) - + otherInvokers := getOtherInvokers(invokers, selectedInvoker) // do reselect - var reslectInvokers []protocol.Invoker - - for _, invoker := range invokers { - if !invoker.IsAvailable() { + for i := 0; i < 3; i++ { + if len(otherInvokers) == 0 { + // no other ivk to reselect, return to fallback + logger.Errorf("all %d invokers is unavailable for %s.", len(invokers), selectedInvoker.GetUrl().String()) + return nil + } + reselectedInvoker := lb.Select(otherInvokers, invocation) + if isInvoked(reselectedInvoker, invoked) { + otherInvokers = getOtherInvokers(otherInvokers, reselectedInvoker) + continue + } + if !reselectedInvoker.IsAvailable() { logger.Infof("the invoker of %s is not available, maybe some network error happened or the server is shutdown.", invoker.GetUrl().Ip) - protocol.SetInvokerUnhealthyStatus(invoker) + protocol.SetInvokerUnhealthyStatus(reselectedInvoker) + otherInvokers = getOtherInvokers(otherInvokers, reselectedInvoker) continue } - - if !isInvoked(invoker, invoked) { - reslectInvokers = append(reslectInvokers, invoker) - } - } - - if len(reslectInvokers) > 0 { - selectedInvoker = lb.Select(reslectInvokers, invocation) - } else { - logger.Errorf("all %d invokers is unavailable for %s.", len(invokers), selectedInvoker.GetUrl().String()) - return nil + selectedInvoker = reselectedInvoker + break } } return selectedInvoker @@ -203,12 +203,12 @@ func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cl return extension.GetLoadbalance(lb) } -func refreshBlackList() { - ivks := protocol.GetBlackListInvokers(constant.DEFAULT_BLACK_LIST_RECOVER_BLOCK) - logger.Debug("blackList len = ", len(ivks)) - for i, _ := range ivks { - if ivks[i].(protocol.Invoker).IsAvailable() { - protocol.RemoveInvokerUnhealthyStatus(ivks[i]) +func getOtherInvokers(invokers []protocol.Invoker, invoker protocol.Invoker) []protocol.Invoker { + otherInvokers := make([]protocol.Invoker, 0) + for _, i := range invokers { + if i != invoker { + otherInvokers = append(otherInvokers, i) } } + return otherInvokers } diff --git a/cluster/router/conn_checker.go b/cluster/router/conn_checker.go new file mode 100644 index 0000000000000000000000000000000000000000..fda0ef868b32dfcd82980394cec0bce99615ecbb --- /dev/null +++ b/cluster/router/conn_checker.go @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package router + +import ( + "github.com/apache/dubbo-go/protocol" +) + +// ConnChecker is used to determine whether the invoker is healthy or not +type ConnChecker interface { + // IsConnHealthy evaluates the healthy state on the given Invoker + IsConnHealthy(invoker protocol.Invoker) bool +} diff --git a/cluster/router/conncheck/conn_check_route.go b/cluster/router/conncheck/conn_check_route.go new file mode 100644 index 0000000000000000000000000000000000000000..0826821b28967f9f2718c3df9e30e1179d8c436b --- /dev/null +++ b/cluster/router/conncheck/conn_check_route.go @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package healthcheck + +import ( + "github.com/RoaringBitmap/roaring" +) + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/cluster/router/utils" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +const ( + connHealthy = "conn-healthy" + name = "conn-check-router" +) + +// ConnCheckRouter provides a health-first routing mechanism through ConnChecker +type ConnCheckRouter struct { + url *common.URL + checker router.ConnChecker +} + +// NewConnCheckRouter construct an NewConnCheckRouter via url +func NewConnCheckRouter(url *common.URL) (router.PriorityRouter, error) { + r := &ConnCheckRouter{ + url: url, + } + checkerName := url.GetParam(constant.HEALTH_CHECKER, constant.DEFAULT_HEALTH_CHECKER) + r.checker = extension.GetConnChecker(checkerName, url) + return r, nil +} + +// Route gets a list of healthy invoker +func (r *ConnCheckRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap { + addrPool := cache.FindAddrPool(r) + // Add healthy invoker to the list + healthyInvokers := utils.JoinIfNotEqual(addrPool[connHealthy], invokers) + // If all invokers are considered unhealthy, downgrade to all invoker + if healthyInvokers.IsEmpty() { + logger.Warnf(" Now all invokers are unhealthy, so downgraded to all! Service: [%s]", url.ServiceKey()) + return invokers + } + return healthyInvokers +} + +// Pool separates healthy invokers from others. +func (r *ConnCheckRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) { + rb := make(router.AddrPool, 8) + rb[connHealthy] = roaring.NewBitmap() + for i, invoker := range invokers { + if r.checker.IsConnHealthy(invoker) { + rb[connHealthy].Add(uint32(i)) + } + } + return rb, nil +} + +// ShouldPool will always return true to make sure healthy check constantly. +func (r *ConnCheckRouter) ShouldPool() bool { + return true +} + +func (r *ConnCheckRouter) Name() string { + return name +} + +// Priority +func (r *ConnCheckRouter) Priority() int64 { + return 0 +} + +// URL Return URL in router +func (r *ConnCheckRouter) URL() *common.URL { + return r.url +} + +// ConnChecker returns the HealthChecker bound to this HealthCheckRouter +func (r *ConnCheckRouter) ConnChecker() router.ConnChecker { + return r.checker +} diff --git a/cluster/router/conncheck/conn_check_route_test.go b/cluster/router/conncheck/conn_check_route_test.go new file mode 100644 index 0000000000000000000000000000000000000000..4b9cacc17c59457d07e17131d08ded129cd898b5 --- /dev/null +++ b/cluster/router/conncheck/conn_check_route_test.go @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package healthcheck + +import ( + "fmt" + "math" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/cluster/router/chain" + "github.com/apache/dubbo-go/cluster/router/utils" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" +) + +const ( + healthCheckRoute1010IP = "192.168.10.10" + healthCheckRoute1011IP = "192.168.10.11" + healthCheckRoute1012IP = "192.168.10.12" + healthCheckRouteMethodNameTest = "test" + healthCheck1001URL = "dubbo://192.168.10.1/com.ikurento.user.UserProvider" + healthCheckRouteUrlFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider" +) + +func TestHealthCheckRouterRoute(t *testing.T) { + defer protocol.CleanAllStatus() + consumerURL, _ := common.NewURL(healthCheck1001URL) + consumerURL.SetParam(CONN_ROUTE_ENABLED_KEY, "true") + url1, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1010IP)) + url2, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1011IP)) + url3, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1012IP)) + hcr, _ := NewConnCheckRouter(consumerURL) + + var invokers []protocol.Invoker + invoker1 := NewMockInvoker(url1) + invoker2 := NewMockInvoker(url2) + invoker3 := NewMockInvoker(url3) + invokers = append(invokers, invoker1, invoker2, invoker3) + inv := invocation.NewRPCInvocation(healthCheckRouteMethodNameTest, nil, nil) + res := hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv) + // now all invokers are healthy + assert.True(t, len(res.ToArray()) == len(invokers)) + + for i := 0; i < 10; i++ { + request(url1, healthCheckRouteMethodNameTest, 0, false, false) + } + res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv) + // invokers1 is unhealthy now + assert.True(t, len(res.ToArray()) == 2 && !res.Contains(0)) + + for i := 0; i < 10; i++ { + request(url1, healthCheckRouteMethodNameTest, 0, false, false) + request(url2, healthCheckRouteMethodNameTest, 0, false, false) + } + + res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv) + // only invokers3 is healthy now + assert.True(t, len(res.ToArray()) == 1 && !res.Contains(0) && !res.Contains(1)) + + for i := 0; i < 10; i++ { + request(url1, healthCheckRouteMethodNameTest, 0, false, false) + request(url2, healthCheckRouteMethodNameTest, 0, false, false) + request(url3, healthCheckRouteMethodNameTest, 0, false, false) + } + + res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv) + // now all invokers are unhealthy, so downgraded to all + assert.True(t, len(res.ToArray()) == 3) + + // reset the invoker1 successive failed count, so invoker1 go to healthy + request(url1, healthCheckRouteMethodNameTest, 0, false, true) + res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv) + assert.True(t, res.Contains(0)) + + for i := 0; i < 6; i++ { + request(url1, healthCheckRouteMethodNameTest, 0, false, false) + } + // now all invokers are unhealthy, so downgraded to all again + res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv) + assert.True(t, len(res.ToArray()) == 3) + time.Sleep(time.Second * 2) + // invoker1 go to healthy again after 2s + res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv) + assert.True(t, res.Contains(0)) + +} + +func TestNewHealthCheckRouter(t *testing.T) { + defer protocol.CleanAllStatus() + url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) + hcr, _ := NewHealthCheckRouter(url) + h := hcr.(*HealthCheckRouter) + assert.Nil(t, h.checker) + + url.SetParam(HEALTH_ROUTE_ENABLED_KEY, "true") + hcr, _ = NewHealthCheckRouter(url) + h = hcr.(*HealthCheckRouter) + assert.NotNil(t, h.checker) + + dhc := h.checker.(*DefaultHealthChecker) + assert.Equal(t, dhc.outStandingRequestConutLimit, int32(math.MaxInt32)) + assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(constant.DEFAULT_SUCCESSIVE_FAILED_THRESHOLD)) + assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(constant.DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)) + + url.SetParam(constant.CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, "500") + url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") + url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") + hcr, _ = NewHealthCheckRouter(url) + h = hcr.(*HealthCheckRouter) + dhc = h.checker.(*DefaultHealthChecker) + assert.Equal(t, dhc.outStandingRequestConutLimit, int32(1000)) + assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(10)) + assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(500)) +} + +func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) router.Cache { + pool, info := r.Pool(addrs) + cache := chain.BuildCache(addrs) + cache.SetAddrMeta(r.Name(), info) + cache.SetAddrPool(r.Name(), pool) + return cache +} diff --git a/cluster/router/conncheck/conn_health_check.go b/cluster/router/conncheck/conn_health_check.go new file mode 100644 index 0000000000000000000000000000000000000000..54d39f931ba83470eac4f841f2e1fb363ee09ca6 --- /dev/null +++ b/cluster/router/conncheck/conn_health_check.go @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package healthcheck + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +func init() { + extension.SetConnChecker(constant.DEFAULT_CONN_CHECKER, NewDefaultConnChecker) +} + +// DefaultConnChecker is the default implementation of ConnChecker, which determines the health status of invoker conn +type DefaultConnChecker struct { +} + +// IsConnHealthy evaluates the healthy state on the given Invoker based on the number of successive bad request +// and the current active request +func (c *DefaultConnChecker) IsConnHealthy(invoker protocol.Invoker) bool { + return protocol.GetInvokerHealthyStatus(invoker) +} + +// NewDefaultConnChecker constructs a new DefaultConnChecker based on the url +func NewDefaultConnChecker(url *common.URL) router.ConnChecker { + return &DefaultConnChecker{} +} diff --git a/cluster/router/conncheck/conn_health_check_test.go b/cluster/router/conncheck/conn_health_check_test.go new file mode 100644 index 0000000000000000000000000000000000000000..39827c5f050a1a5ac9524345b59e37ec23efb90f --- /dev/null +++ b/cluster/router/conncheck/conn_health_check_test.go @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package healthcheck + +import ( + "fmt" + "math" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/protocol" +) + +const ( + healthCheckDubbo1010IP = "192.168.10.10" + healthCheckDubbo1011IP = "192.168.10.11" + healthCheckMethodTest = "test" + healthCheckDubboUrlFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider" +) + +func TestDefaultHealthCheckerIsHealthy(t *testing.T) { + defer protocol.CleanAllStatus() + url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) + hc := NewDefaultHealthChecker(url).(*DefaultHealthChecker) + invoker := NewMockInvoker(url) + healthy := hc.IsHealthy(invoker) + assert.True(t, healthy) + + url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") + url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "100") + // fake the outgoing request + for i := 0; i < 11; i++ { + request(url, healthCheckMethodTest, 0, true, false) + } + hc = NewDefaultHealthChecker(url).(*DefaultHealthChecker) + healthy = hc.IsHealthy(invoker) + // the outgoing request is more than OUTSTANDING_REQUEST_COUNT_LIMIT, go to unhealthy + assert.False(t, hc.IsHealthy(invoker)) + + // successive failed count is more than constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, go to unhealthy + for i := 0; i < 11; i++ { + request(url, healthCheckMethodTest, 0, false, false) + } + url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") + url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") + hc = NewDefaultHealthChecker(url).(*DefaultHealthChecker) + healthy = hc.IsHealthy(invoker) + assert.False(t, hc.IsHealthy(invoker)) + + // reset successive failed count and go to healthy + request(url, healthCheckMethodTest, 0, false, true) + healthy = hc.IsHealthy(invoker) + assert.True(t, hc.IsHealthy(invoker)) +} + +func TestDefaultHealthCheckerGetCircuitBreakerSleepWindowTime(t *testing.T) { + defer protocol.CleanAllStatus() + url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) + defaultHc := NewDefaultHealthChecker(url).(*DefaultHealthChecker) + // Increase the number of failed requests + for i := 0; i < 100; i++ { + request(url, healthCheckMethodTest, 1, false, false) + } + sleepWindowTime := defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url)) + assert.True(t, sleepWindowTime == constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS) + + // Adjust the threshold size to 1000 + url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "1000") + sleepWindowTime = NewDefaultHealthChecker(url).(*DefaultHealthChecker).getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url)) + assert.True(t, sleepWindowTime == 0) + + url1, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1011IP)) + sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1)) + assert.True(t, sleepWindowTime == 0) + request(url1, healthCheckMethodTest, 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) + sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1)) + assert.True(t, sleepWindowTime > 0 && sleepWindowTime < constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS) +} + +func TestDefaultHealthCheckerGetCircuitBreakerTimeout(t *testing.T) { + defer protocol.CleanAllStatus() + url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) + defaultHc := NewDefaultHealthChecker(url).(*DefaultHealthChecker) + timeout := defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url)) + assert.True(t, timeout == 0) + url1, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1011IP)) + request(url1, healthCheckMethodTest, 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) + timeout = defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url1)) + // timeout must after the current time + assert.True(t, timeout > protocol.CurrentTimeMillis()) + +} + +func TestDefaultHealthCheckerIsCircuitBreakerTripped(t *testing.T) { + defer protocol.CleanAllStatus() + url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) + defaultHc := NewDefaultHealthChecker(url).(*DefaultHealthChecker) + status := protocol.GetURLStatus(url) + tripped := defaultHc.isCircuitBreakerTripped(status) + assert.False(t, tripped) + // Increase the number of failed requests + for i := 0; i < 100; i++ { + request(url, healthCheckMethodTest, 1, false, false) + } + tripped = defaultHc.isCircuitBreakerTripped(protocol.GetURLStatus(url)) + assert.True(t, tripped) + +} + +func TestNewDefaultHealthChecker(t *testing.T) { + defer protocol.CleanAllStatus() + url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) + defaultHc := NewDefaultHealthChecker(url).(*DefaultHealthChecker) + assert.NotNil(t, defaultHc) + assert.Equal(t, defaultHc.outStandingRequestConutLimit, int32(math.MaxInt32)) + assert.Equal(t, defaultHc.requestSuccessiveFailureThreshold, int32(constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)) + + url1, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) + url1.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") + url1.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") + nondefaultHc := NewDefaultHealthChecker(url1).(*DefaultHealthChecker) + assert.NotNil(t, nondefaultHc) + assert.Equal(t, nondefaultHc.outStandingRequestConutLimit, int32(10)) + assert.Equal(t, nondefaultHc.requestSuccessiveFailureThreshold, int32(10)) +} + +func request(url *common.URL, method string, elapsed int64, active, succeeded bool) { + protocol.BeginCount(url, method) + if !active { + protocol.EndCount(url, method, elapsed, succeeded) + } +} diff --git a/cluster/router/conncheck/factory.go b/cluster/router/conncheck/factory.go new file mode 100644 index 0000000000000000000000000000000000000000..05d6a3815ed73e43b84c2f4c672f95d9fdbcaf4a --- /dev/null +++ b/cluster/router/conncheck/factory.go @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package healthcheck + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" +) + +func init() { + extension.SetRouterFactory(constant.ConnCheckRouterName, newConnCheckRouteFactory) +} + +// ConnCheckRouteFactory +type ConnCheckRouteFactory struct { +} + +// newHealthCheckRouteFactory construct a new HealthCheckRouteFactory +func newConnCheckRouteFactory() router.PriorityRouterFactory { + return &ConnCheckRouteFactory{} +} + +// NewPriorityRouter construct a new NewHealthCheckRouter via url +func (f *ConnCheckRouteFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) { + return NewConnCheckRouter(url) +} diff --git a/cluster/router/conncheck/factory_test.go b/cluster/router/conncheck/factory_test.go new file mode 100644 index 0000000000000000000000000000000000000000..9e1870ae98b21690ab5f0b9c64d25e1fee5502c4 --- /dev/null +++ b/cluster/router/conncheck/factory_test.go @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package healthcheck + +import ( + "context" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) + +// nolint +type MockInvoker struct { + url *common.URL +} + +// nolint +func NewMockInvoker(url *common.URL) *MockInvoker { + return &MockInvoker{ + url: url, + } +} + +// nolint +func (bi *MockInvoker) GetUrl() *common.URL { + return bi.url +} + +// nolint +func (bi *MockInvoker) IsAvailable() bool { + return true +} + +// nolint +func (bi *MockInvoker) IsDestroyed() bool { + return true +} + +// nolint +func (bi *MockInvoker) Invoke(_ context.Context, _ protocol.Invocation) protocol.Result { + return nil +} + +// nolint +func (bi *MockInvoker) Destroy() { +} + +// nolint +func TestHealthCheckRouteFactory(t *testing.T) { + factory := newConnCheckRouteFactory() + assert.NotNil(t, factory) +} diff --git a/cluster/router/healthcheck/default_health_check.go b/cluster/router/healthcheck/default_health_check.go index f1ae9c20a222b463898db4498764caf99d95e3a4..eb15e6f6422d1bdc0ae4d0f4a76db50161db1870 100644 --- a/cluster/router/healthcheck/default_health_check.go +++ b/cluster/router/healthcheck/default_health_check.go @@ -48,10 +48,6 @@ type DefaultHealthChecker struct { // IsHealthy evaluates the healthy state on the given Invoker based on the number of successive bad request // and the current active request func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool { - if !protocol.GetInvokerHealthyStatus(invoker) { - return false - } - urlStatus := protocol.GetURLStatus(invoker.GetUrl()) if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.GetOutStandingRequestCountLimit() { logger.Debugf("Invoker [%s] is currently in circuitbreaker tripped state", invoker.GetUrl().Key()) diff --git a/common/constant/key.go b/common/constant/key.go index 786c6ed016e60c103900c355ebf00704246a94dd..5dedbc2c21b29b23aaccdccc7049364a50c4bfd5 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -89,7 +89,7 @@ const ( RETRY_PERIOD_KEY = "retry.period" RETRY_TIMES_KEY = "retry.times" CYCLE_REPORT_KEY = "cycle.report" - DEFAULT_BLACK_LIST_RECOVER_BLOCK = 10 + DEFAULT_BLACK_LIST_RECOVER_BLOCK = 16 ) const ( @@ -211,6 +211,8 @@ const ( ListenableRouterName = "listenable" // HealthCheckRouterName Specify the name of HealthCheckRouter HealthCheckRouterName = "health_check" + // ConnCheckRouterName Specify the name of ConnCheckRouter + ConnCheckRouterName = "conn_check" // TagRouterName Specify the name of TagRouter TagRouterName = "tag" // TagRouterRuleSuffix Specify tag router suffix @@ -296,6 +298,8 @@ const ( HEALTH_CHECKER = "health.checker" // The name of the default implementation of HealthChecker DEFAULT_HEALTH_CHECKER = "default" + // The name of the default implementation of C + DEFAULT_CONN_CHECKER = "default" // The key of oustanding-request-limit OUTSTANDING_REQUEST_COUNT_LIMIT_KEY = "outstanding.request.limit" // The key of successive-failed-request's threshold diff --git a/common/extension/conn_checker.go b/common/extension/conn_checker.go new file mode 100644 index 0000000000000000000000000000000000000000..99add91f8000b754b1724aa9d87d17a65de83d8e --- /dev/null +++ b/common/extension/conn_checker.go @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" +) + +var ( + connCheckers = make(map[string]func(url *common.URL) router.ConnChecker) +) + +// SetHealthChecker sets the HealthChecker with @name +func SetConnChecker(name string, fcn func(_ *common.URL) router.ConnChecker) { + connCheckers[name] = fcn +} + +// GetHealthChecker gets the HealthChecker with @name +func GetConnChecker(name string, url *common.URL) router.ConnChecker { + if connCheckers[name] == nil { + panic("connCheckers for " + name + " is not existing, make sure you have import the package.") + } + return connCheckers[name](url) +} diff --git a/go.mod b/go.mod index 5f954a3f8926dcf564d3d6a3a16be2c021a78382..282877bb5dc4d352cd57b6027449c07a07411281 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/coreos/etcd v3.3.25+incompatible github.com/creasty/defaults v1.5.1 github.com/dubbogo/go-zookeeper v1.0.2 - github.com/dubbogo/gost v1.10.0 + github.com/dubbogo/gost v1.10.1 github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect github.com/emicklei/go-restful/v3 v3.0.0 github.com/frankban/quicktest v1.4.1 // indirect diff --git a/go.sum b/go.sum index a30f2bd5e539d44c95218c38bd8f86c26ec6307e..ba6e62fcbbbc859e5d3043c45da1985731c4e087 100644 --- a/go.sum +++ b/go.sum @@ -217,6 +217,8 @@ github.com/dubbogo/gost v1.9.2 h1:lTo5WETmqDKSW4d+Fr3Emiz1rKsVaQCPWRypJPAAfcw= github.com/dubbogo/gost v1.9.2/go.mod h1:QNM5RaeRdNWehUu8S0hUP5Qa8QUfGf6KH1JhqOVFvEI= github.com/dubbogo/gost v1.10.0 h1:LGZlrNAICZzPXAXU/t3gwnV4gxW2CcXvbjfx28G5164= github.com/dubbogo/gost v1.10.0/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI= +github.com/dubbogo/gost v1.10.1 h1:39kF9Cd5JOiMpmwG6dX1/aLWNFqFv9gHp8HrhzMmjLY= +github.com/dubbogo/gost v1.10.1/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI= github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= diff --git a/protocol/rpc_status.go b/protocol/rpc_status.go index e0fc7134f15350893b99c72a3f17da051ebf6628..2e2aecffe7dc7a9579ea3268baf372c04e84a6f2 100644 --- a/protocol/rpc_status.go +++ b/protocol/rpc_status.go @@ -18,6 +18,8 @@ package protocol import ( + "fmt" + "github.com/apache/dubbo-go/common/constant" "sync" "sync/atomic" "time" @@ -28,9 +30,10 @@ import ( ) var ( - methodStatistics sync.Map // url -> { methodName : RPCStatus} - serviceStatistic sync.Map // url -> RPCStatus - invokerBlackList sync.Map // store unhealthy url blackList + methodStatistics sync.Map // url -> { methodName : RPCStatus} + serviceStatistic sync.Map // url -> RPCStatus + invokerBlackList sync.Map // store unhealthy url blackList + blackListRefreshing int32 // store if the refresing method is processing ) // RPCStatus is URL statistics. @@ -218,3 +221,29 @@ func GetBlackListInvokers(blockSize int) []Invoker { func RemoveUrlKeyUnhealthyStatus(key string) { invokerBlackList.Delete(key) } + +func TryRefreshBlackList() { + if atomic.CompareAndSwapInt32(&blackListRefreshing, 0, 1) { + wg := sync.WaitGroup{} + defer func() { + atomic.CompareAndSwapInt32(&blackListRefreshing, 1, 0) + }() + + ivks := GetBlackListInvokers(constant.DEFAULT_BLACK_LIST_RECOVER_BLOCK) + fmt.Println("blackList len = ", len(ivks)) + + for i := 0; i < 3; i++ { + wg.Add(1) + go func(ivks []Invoker, i int) { + for j, _ := range ivks { + if j%3-i == 0 { + if ivks[j].(Invoker).IsAvailable() { + RemoveInvokerUnhealthyStatus(ivks[i]) + } + } + } + }(ivks, i) + } + wg.Wait() + } +}