diff --git a/cluster/router/.gitkeep b/cluster/router/.gitkeep deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/cluster/router/conncheck/conn_check_route.go b/cluster/router/conncheck/conn_check_route.go index 0826821b28967f9f2718c3df9e30e1179d8c436b..8d94db409bbf896eb27ecdf74c813e34dfd298aa 100644 --- a/cluster/router/conncheck/conn_check_route.go +++ b/cluster/router/conncheck/conn_check_route.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package healthcheck +package conncheck import ( "github.com/RoaringBitmap/roaring" @@ -82,11 +82,12 @@ func (r *ConnCheckRouter) ShouldPool() bool { return true } +// Name get name of ConnCheckerRouter func (r *ConnCheckRouter) Name() string { return name } -// Priority +// Priority get Router priority level func (r *ConnCheckRouter) Priority() int64 { return 0 } diff --git a/cluster/router/conncheck/conn_check_route_test.go b/cluster/router/conncheck/conn_check_route_test.go index 4b9cacc17c59457d07e17131d08ded129cd898b5..9c78fd3899488df0ae82cfa0c2dbac71c51b1fa6 100644 --- a/cluster/router/conncheck/conn_check_route_test.go +++ b/cluster/router/conncheck/conn_check_route_test.go @@ -15,11 +15,10 @@ * limitations under the License. */ -package healthcheck +package conncheck import ( "fmt" - "math" "testing" "time" ) @@ -33,109 +32,54 @@ import ( "github.com/apache/dubbo-go/cluster/router/chain" "github.com/apache/dubbo-go/cluster/router/utils" "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" ) const ( - healthCheckRoute1010IP = "192.168.10.10" - healthCheckRoute1011IP = "192.168.10.11" - healthCheckRoute1012IP = "192.168.10.12" - healthCheckRouteMethodNameTest = "test" - healthCheck1001URL = "dubbo://192.168.10.1/com.ikurento.user.UserProvider" - healthCheckRouteUrlFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider" + connCheckRoute1010IP = "192.168.10.10" + connCheckRoute1011IP = "192.168.10.11" + connCheckRoute1012IP = "192.168.10.12" + connCheckRouteMethodNameTest = "test" + connCheck1001URL = "dubbo://192.168.10.1/com.ikurento.user.UserProvider" + connCheckRouteUrlFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider" ) -func TestHealthCheckRouterRoute(t *testing.T) { +func TestConnCheckRouterRoute(t *testing.T) { defer protocol.CleanAllStatus() - consumerURL, _ := common.NewURL(healthCheck1001URL) - consumerURL.SetParam(CONN_ROUTE_ENABLED_KEY, "true") - url1, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1010IP)) - url2, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1011IP)) - url3, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1012IP)) + consumerURL, _ := common.NewURL(connCheck1001URL) + url1, _ := common.NewURL(fmt.Sprintf(connCheckRouteUrlFormat, connCheckRoute1010IP)) + url2, _ := common.NewURL(fmt.Sprintf(connCheckRouteUrlFormat, connCheckRoute1011IP)) + url3, _ := common.NewURL(fmt.Sprintf(connCheckRouteUrlFormat, connCheckRoute1012IP)) hcr, _ := NewConnCheckRouter(consumerURL) var invokers []protocol.Invoker invoker1 := NewMockInvoker(url1) invoker2 := NewMockInvoker(url2) invoker3 := NewMockInvoker(url3) + protocol.SetInvokerUnhealthyStatus(invoker1) + protocol.SetInvokerUnhealthyStatus(invoker2) + invokers = append(invokers, invoker1, invoker2, invoker3) - inv := invocation.NewRPCInvocation(healthCheckRouteMethodNameTest, nil, nil) - res := hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv) + inv := invocation.NewRPCInvocation(connCheckRouteMethodNameTest, nil, nil) + res := hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*ConnCheckRouter), invokers), consumerURL, inv) + + // now invoker3 is healthy + assert.True(t, len(res.ToArray()) == 1) + invoker2 = NewMockInvoker(url2) + + // check blacklist remove + protocol.RemoveInvokerUnhealthyStatus(invoker1) + res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*ConnCheckRouter), invokers), consumerURL, inv) + // now invoker3 invoker1 is healthy + assert.True(t, len(res.ToArray()) == 2) + + // check recover + protocol.TryRefreshBlackList() + time.Sleep(time.Second) + res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*ConnCheckRouter), invokers), consumerURL, inv) // now all invokers are healthy - assert.True(t, len(res.ToArray()) == len(invokers)) - - for i := 0; i < 10; i++ { - request(url1, healthCheckRouteMethodNameTest, 0, false, false) - } - res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv) - // invokers1 is unhealthy now - assert.True(t, len(res.ToArray()) == 2 && !res.Contains(0)) - - for i := 0; i < 10; i++ { - request(url1, healthCheckRouteMethodNameTest, 0, false, false) - request(url2, healthCheckRouteMethodNameTest, 0, false, false) - } - - res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv) - // only invokers3 is healthy now - assert.True(t, len(res.ToArray()) == 1 && !res.Contains(0) && !res.Contains(1)) - - for i := 0; i < 10; i++ { - request(url1, healthCheckRouteMethodNameTest, 0, false, false) - request(url2, healthCheckRouteMethodNameTest, 0, false, false) - request(url3, healthCheckRouteMethodNameTest, 0, false, false) - } - - res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv) - // now all invokers are unhealthy, so downgraded to all - assert.True(t, len(res.ToArray()) == 3) - - // reset the invoker1 successive failed count, so invoker1 go to healthy - request(url1, healthCheckRouteMethodNameTest, 0, false, true) - res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv) - assert.True(t, res.Contains(0)) - - for i := 0; i < 6; i++ { - request(url1, healthCheckRouteMethodNameTest, 0, false, false) - } - // now all invokers are unhealthy, so downgraded to all again - res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv) assert.True(t, len(res.ToArray()) == 3) - time.Sleep(time.Second * 2) - // invoker1 go to healthy again after 2s - res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv) - assert.True(t, res.Contains(0)) - -} - -func TestNewHealthCheckRouter(t *testing.T) { - defer protocol.CleanAllStatus() - url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) - hcr, _ := NewHealthCheckRouter(url) - h := hcr.(*HealthCheckRouter) - assert.Nil(t, h.checker) - - url.SetParam(HEALTH_ROUTE_ENABLED_KEY, "true") - hcr, _ = NewHealthCheckRouter(url) - h = hcr.(*HealthCheckRouter) - assert.NotNil(t, h.checker) - - dhc := h.checker.(*DefaultHealthChecker) - assert.Equal(t, dhc.outStandingRequestConutLimit, int32(math.MaxInt32)) - assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(constant.DEFAULT_SUCCESSIVE_FAILED_THRESHOLD)) - assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(constant.DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)) - - url.SetParam(constant.CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, "500") - url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") - url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") - hcr, _ = NewHealthCheckRouter(url) - h = hcr.(*HealthCheckRouter) - dhc = h.checker.(*DefaultHealthChecker) - assert.Equal(t, dhc.outStandingRequestConutLimit, int32(1000)) - assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(10)) - assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(500)) } func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) router.Cache { diff --git a/cluster/router/conncheck/conn_health_check.go b/cluster/router/conncheck/conn_health_check.go index 54d39f931ba83470eac4f841f2e1fb363ee09ca6..9f05b0695f2b7a569ab669baac7540ca7c896861 100644 --- a/cluster/router/conncheck/conn_health_check.go +++ b/cluster/router/conncheck/conn_health_check.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package healthcheck +package conncheck import ( "github.com/apache/dubbo-go/cluster/router" diff --git a/cluster/router/conncheck/conn_health_check_test.go b/cluster/router/conncheck/conn_health_check_test.go index 39827c5f050a1a5ac9524345b59e37ec23efb90f..7544014972819d24b30fa035524cb033a2f4846c 100644 --- a/cluster/router/conncheck/conn_health_check_test.go +++ b/cluster/router/conncheck/conn_health_check_test.go @@ -15,11 +15,10 @@ * limitations under the License. */ -package healthcheck +package conncheck import ( "fmt" - "math" "testing" ) @@ -29,136 +28,26 @@ import ( import ( "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" ) const ( - healthCheckDubbo1010IP = "192.168.10.10" - healthCheckDubbo1011IP = "192.168.10.11" - healthCheckMethodTest = "test" - healthCheckDubboUrlFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider" + connCheckDubbo1010IP = "192.168.10.10" + connCheckDubboUrlFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider" ) -func TestDefaultHealthCheckerIsHealthy(t *testing.T) { +func TestDefaultConnCheckerIsHealthy(t *testing.T) { defer protocol.CleanAllStatus() - url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) - hc := NewDefaultHealthChecker(url).(*DefaultHealthChecker) + url, _ := common.NewURL(fmt.Sprintf(connCheckDubboUrlFormat, connCheckDubbo1010IP)) + cc := NewDefaultConnChecker(url).(*DefaultConnChecker) invoker := NewMockInvoker(url) - healthy := hc.IsHealthy(invoker) + healthy := cc.IsConnHealthy(invoker) assert.True(t, healthy) - url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") - url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "100") - // fake the outgoing request - for i := 0; i < 11; i++ { - request(url, healthCheckMethodTest, 0, true, false) - } - hc = NewDefaultHealthChecker(url).(*DefaultHealthChecker) - healthy = hc.IsHealthy(invoker) - // the outgoing request is more than OUTSTANDING_REQUEST_COUNT_LIMIT, go to unhealthy - assert.False(t, hc.IsHealthy(invoker)) - - // successive failed count is more than constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, go to unhealthy - for i := 0; i < 11; i++ { - request(url, healthCheckMethodTest, 0, false, false) - } - url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") - url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") - hc = NewDefaultHealthChecker(url).(*DefaultHealthChecker) - healthy = hc.IsHealthy(invoker) - assert.False(t, hc.IsHealthy(invoker)) - - // reset successive failed count and go to healthy - request(url, healthCheckMethodTest, 0, false, true) - healthy = hc.IsHealthy(invoker) - assert.True(t, hc.IsHealthy(invoker)) -} - -func TestDefaultHealthCheckerGetCircuitBreakerSleepWindowTime(t *testing.T) { - defer protocol.CleanAllStatus() - url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) - defaultHc := NewDefaultHealthChecker(url).(*DefaultHealthChecker) - // Increase the number of failed requests - for i := 0; i < 100; i++ { - request(url, healthCheckMethodTest, 1, false, false) - } - sleepWindowTime := defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url)) - assert.True(t, sleepWindowTime == constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS) - - // Adjust the threshold size to 1000 - url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "1000") - sleepWindowTime = NewDefaultHealthChecker(url).(*DefaultHealthChecker).getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url)) - assert.True(t, sleepWindowTime == 0) - - url1, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1011IP)) - sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1)) - assert.True(t, sleepWindowTime == 0) - request(url1, healthCheckMethodTest, 1, false, false) - request(url1, healthCheckMethodTest, 1, false, false) - request(url1, healthCheckMethodTest, 1, false, false) - request(url1, healthCheckMethodTest, 1, false, false) - request(url1, healthCheckMethodTest, 1, false, false) - request(url1, healthCheckMethodTest, 1, false, false) - sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1)) - assert.True(t, sleepWindowTime > 0 && sleepWindowTime < constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS) -} - -func TestDefaultHealthCheckerGetCircuitBreakerTimeout(t *testing.T) { - defer protocol.CleanAllStatus() - url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) - defaultHc := NewDefaultHealthChecker(url).(*DefaultHealthChecker) - timeout := defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url)) - assert.True(t, timeout == 0) - url1, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1011IP)) - request(url1, healthCheckMethodTest, 1, false, false) - request(url1, healthCheckMethodTest, 1, false, false) - request(url1, healthCheckMethodTest, 1, false, false) - request(url1, healthCheckMethodTest, 1, false, false) - request(url1, healthCheckMethodTest, 1, false, false) - request(url1, healthCheckMethodTest, 1, false, false) - timeout = defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url1)) - // timeout must after the current time - assert.True(t, timeout > protocol.CurrentTimeMillis()) - -} - -func TestDefaultHealthCheckerIsCircuitBreakerTripped(t *testing.T) { - defer protocol.CleanAllStatus() - url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) - defaultHc := NewDefaultHealthChecker(url).(*DefaultHealthChecker) - status := protocol.GetURLStatus(url) - tripped := defaultHc.isCircuitBreakerTripped(status) - assert.False(t, tripped) - // Increase the number of failed requests - for i := 0; i < 100; i++ { - request(url, healthCheckMethodTest, 1, false, false) - } - tripped = defaultHc.isCircuitBreakerTripped(protocol.GetURLStatus(url)) - assert.True(t, tripped) - -} - -func TestNewDefaultHealthChecker(t *testing.T) { - defer protocol.CleanAllStatus() - url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) - defaultHc := NewDefaultHealthChecker(url).(*DefaultHealthChecker) - assert.NotNil(t, defaultHc) - assert.Equal(t, defaultHc.outStandingRequestConutLimit, int32(math.MaxInt32)) - assert.Equal(t, defaultHc.requestSuccessiveFailureThreshold, int32(constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)) - - url1, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) - url1.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") - url1.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") - nondefaultHc := NewDefaultHealthChecker(url1).(*DefaultHealthChecker) - assert.NotNil(t, nondefaultHc) - assert.Equal(t, nondefaultHc.outStandingRequestConutLimit, int32(10)) - assert.Equal(t, nondefaultHc.requestSuccessiveFailureThreshold, int32(10)) -} - -func request(url *common.URL, method string, elapsed int64, active, succeeded bool) { - protocol.BeginCount(url, method) - if !active { - protocol.EndCount(url, method, elapsed, succeeded) - } + invoker = NewMockInvoker(url) + cc = NewDefaultConnChecker(url).(*DefaultConnChecker) + healthy = cc.IsConnHealthy(invoker) + // add to black list + protocol.SetInvokerUnhealthyStatus(invoker) + assert.False(t, cc.IsConnHealthy(invoker)) } diff --git a/cluster/router/conncheck/factory.go b/cluster/router/conncheck/factory.go index 05d6a3815ed73e43b84c2f4c672f95d9fdbcaf4a..12498d18d85ea72b31a36c882ee46b5bc6769c01 100644 --- a/cluster/router/conncheck/factory.go +++ b/cluster/router/conncheck/factory.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package healthcheck +package conncheck import ( "github.com/apache/dubbo-go/cluster/router" @@ -28,16 +28,17 @@ func init() { extension.SetRouterFactory(constant.ConnCheckRouterName, newConnCheckRouteFactory) } -// ConnCheckRouteFactory +// ConnCheckRouteFactory is the factory to create conn check router, it aims at filter ip with unhealthy status +// the unhealthy status is storied in protocol/rpc_status.go with sync.Map type ConnCheckRouteFactory struct { } -// newHealthCheckRouteFactory construct a new HealthCheckRouteFactory +// newConnCheckRouteFactory construct a new ConnCheckRouteFactory func newConnCheckRouteFactory() router.PriorityRouterFactory { return &ConnCheckRouteFactory{} } -// NewPriorityRouter construct a new NewHealthCheckRouter via url +// NewPriorityRouter construct a new NewConnCheckRouter via url func (f *ConnCheckRouteFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) { return NewConnCheckRouter(url) } diff --git a/cluster/router/conncheck/factory_test.go b/cluster/router/conncheck/factory_test.go index 9e1870ae98b21690ab5f0b9c64d25e1fee5502c4..02f8fb472ed808b7db4d7903f94ac96064aedbb8 100644 --- a/cluster/router/conncheck/factory_test.go +++ b/cluster/router/conncheck/factory_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package healthcheck +package conncheck import ( "context" diff --git a/cluster/router/healthcheck/health_check_route_test.go b/cluster/router/healthcheck/health_check_route_test.go index 0730f105b7e010311576fc30adf94c8ad69b9614..f088be531ade713d30d29f51cdb2ff1f2ae55584 100644 --- a/cluster/router/healthcheck/health_check_route_test.go +++ b/cluster/router/healthcheck/health_check_route_test.go @@ -50,7 +50,7 @@ const ( func TestHealthCheckRouterRoute(t *testing.T) { defer protocol.CleanAllStatus() consumerURL, _ := common.NewURL(healthCheck1001URL) - consumerURL.SetParam(HEALTH_ROUTE_ENABLED_KEY, "true") + consumerURL.SetParam(constant.HEALTH_ROUTE_ENABLED_KEY, "true") url1, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1010IP)) url2, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1011IP)) url3, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1012IP)) @@ -117,7 +117,7 @@ func TestNewHealthCheckRouter(t *testing.T) { h := hcr.(*HealthCheckRouter) assert.Nil(t, h.checker) - url.SetParam(HEALTH_ROUTE_ENABLED_KEY, "true") + url.SetParam(constant.HEALTH_ROUTE_ENABLED_KEY, "true") hcr, _ = NewHealthCheckRouter(url) h = hcr.(*HealthCheckRouter) assert.NotNil(t, h.checker) diff --git a/protocol/rpc_status.go b/protocol/rpc_status.go index d4988cd938ace89464e28a47ade796c54fed83b2..54435f156c2318ec4ca616f7e488e3fbb8e38c48 100644 --- a/protocol/rpc_status.go +++ b/protocol/rpc_status.go @@ -18,8 +18,6 @@ package protocol import ( - "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/common/logger" "sync" "sync/atomic" "time" @@ -27,6 +25,8 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" ) var (