Skip to content
Snippets Groups Projects
Commit 15189662 authored by 李志信's avatar 李志信
Browse files

fix: add test and fix fmt

parent f7073cf6
Branches
Tags
No related merge requests found
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package healthcheck package conncheck
import ( import (
"github.com/RoaringBitmap/roaring" "github.com/RoaringBitmap/roaring"
...@@ -82,11 +82,12 @@ func (r *ConnCheckRouter) ShouldPool() bool { ...@@ -82,11 +82,12 @@ func (r *ConnCheckRouter) ShouldPool() bool {
return true return true
} }
// Name get name of ConnCheckerRouter
func (r *ConnCheckRouter) Name() string { func (r *ConnCheckRouter) Name() string {
return name return name
} }
// Priority // Priority get Router priority level
func (r *ConnCheckRouter) Priority() int64 { func (r *ConnCheckRouter) Priority() int64 {
return 0 return 0
} }
......
...@@ -15,11 +15,10 @@ ...@@ -15,11 +15,10 @@
* limitations under the License. * limitations under the License.
*/ */
package healthcheck package conncheck
import ( import (
"fmt" "fmt"
"math"
"testing" "testing"
"time" "time"
) )
...@@ -33,109 +32,54 @@ import ( ...@@ -33,109 +32,54 @@ import (
"github.com/apache/dubbo-go/cluster/router/chain" "github.com/apache/dubbo-go/cluster/router/chain"
"github.com/apache/dubbo-go/cluster/router/utils" "github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation" "github.com/apache/dubbo-go/protocol/invocation"
) )
const ( const (
healthCheckRoute1010IP = "192.168.10.10" connCheckRoute1010IP = "192.168.10.10"
healthCheckRoute1011IP = "192.168.10.11" connCheckRoute1011IP = "192.168.10.11"
healthCheckRoute1012IP = "192.168.10.12" connCheckRoute1012IP = "192.168.10.12"
healthCheckRouteMethodNameTest = "test" connCheckRouteMethodNameTest = "test"
healthCheck1001URL = "dubbo://192.168.10.1/com.ikurento.user.UserProvider" connCheck1001URL = "dubbo://192.168.10.1/com.ikurento.user.UserProvider"
healthCheckRouteUrlFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider" connCheckRouteUrlFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider"
) )
func TestHealthCheckRouterRoute(t *testing.T) { func TestConnCheckRouterRoute(t *testing.T) {
defer protocol.CleanAllStatus() defer protocol.CleanAllStatus()
consumerURL, _ := common.NewURL(healthCheck1001URL) consumerURL, _ := common.NewURL(connCheck1001URL)
consumerURL.SetParam(CONN_ROUTE_ENABLED_KEY, "true") url1, _ := common.NewURL(fmt.Sprintf(connCheckRouteUrlFormat, connCheckRoute1010IP))
url1, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1010IP)) url2, _ := common.NewURL(fmt.Sprintf(connCheckRouteUrlFormat, connCheckRoute1011IP))
url2, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1011IP)) url3, _ := common.NewURL(fmt.Sprintf(connCheckRouteUrlFormat, connCheckRoute1012IP))
url3, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1012IP))
hcr, _ := NewConnCheckRouter(consumerURL) hcr, _ := NewConnCheckRouter(consumerURL)
var invokers []protocol.Invoker var invokers []protocol.Invoker
invoker1 := NewMockInvoker(url1) invoker1 := NewMockInvoker(url1)
invoker2 := NewMockInvoker(url2) invoker2 := NewMockInvoker(url2)
invoker3 := NewMockInvoker(url3) invoker3 := NewMockInvoker(url3)
protocol.SetInvokerUnhealthyStatus(invoker1)
protocol.SetInvokerUnhealthyStatus(invoker2)
invokers = append(invokers, invoker1, invoker2, invoker3) invokers = append(invokers, invoker1, invoker2, invoker3)
inv := invocation.NewRPCInvocation(healthCheckRouteMethodNameTest, nil, nil) inv := invocation.NewRPCInvocation(connCheckRouteMethodNameTest, nil, nil)
res := hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv) res := hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*ConnCheckRouter), invokers), consumerURL, inv)
// now invoker3 is healthy
assert.True(t, len(res.ToArray()) == 1)
invoker2 = NewMockInvoker(url2)
// check blacklist remove
protocol.RemoveInvokerUnhealthyStatus(invoker1)
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*ConnCheckRouter), invokers), consumerURL, inv)
// now invoker3 invoker1 is healthy
assert.True(t, len(res.ToArray()) == 2)
// check recover
protocol.TryRefreshBlackList()
time.Sleep(time.Second)
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*ConnCheckRouter), invokers), consumerURL, inv)
// now all invokers are healthy // now all invokers are healthy
assert.True(t, len(res.ToArray()) == len(invokers))
for i := 0; i < 10; i++ {
request(url1, healthCheckRouteMethodNameTest, 0, false, false)
}
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv)
// invokers1 is unhealthy now
assert.True(t, len(res.ToArray()) == 2 && !res.Contains(0))
for i := 0; i < 10; i++ {
request(url1, healthCheckRouteMethodNameTest, 0, false, false)
request(url2, healthCheckRouteMethodNameTest, 0, false, false)
}
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv)
// only invokers3 is healthy now
assert.True(t, len(res.ToArray()) == 1 && !res.Contains(0) && !res.Contains(1))
for i := 0; i < 10; i++ {
request(url1, healthCheckRouteMethodNameTest, 0, false, false)
request(url2, healthCheckRouteMethodNameTest, 0, false, false)
request(url3, healthCheckRouteMethodNameTest, 0, false, false)
}
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv)
// now all invokers are unhealthy, so downgraded to all
assert.True(t, len(res.ToArray()) == 3)
// reset the invoker1 successive failed count, so invoker1 go to healthy
request(url1, healthCheckRouteMethodNameTest, 0, false, true)
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv)
assert.True(t, res.Contains(0))
for i := 0; i < 6; i++ {
request(url1, healthCheckRouteMethodNameTest, 0, false, false)
}
// now all invokers are unhealthy, so downgraded to all again
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv)
assert.True(t, len(res.ToArray()) == 3) assert.True(t, len(res.ToArray()) == 3)
time.Sleep(time.Second * 2)
// invoker1 go to healthy again after 2s
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), consumerURL, inv)
assert.True(t, res.Contains(0))
}
func TestNewHealthCheckRouter(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP))
hcr, _ := NewHealthCheckRouter(url)
h := hcr.(*HealthCheckRouter)
assert.Nil(t, h.checker)
url.SetParam(HEALTH_ROUTE_ENABLED_KEY, "true")
hcr, _ = NewHealthCheckRouter(url)
h = hcr.(*HealthCheckRouter)
assert.NotNil(t, h.checker)
dhc := h.checker.(*DefaultHealthChecker)
assert.Equal(t, dhc.outStandingRequestConutLimit, int32(math.MaxInt32))
assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(constant.DEFAULT_SUCCESSIVE_FAILED_THRESHOLD))
assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(constant.DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR))
url.SetParam(constant.CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, "500")
url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10")
url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000")
hcr, _ = NewHealthCheckRouter(url)
h = hcr.(*HealthCheckRouter)
dhc = h.checker.(*DefaultHealthChecker)
assert.Equal(t, dhc.outStandingRequestConutLimit, int32(1000))
assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(10))
assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(500))
} }
func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) router.Cache { func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) router.Cache {
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package healthcheck package conncheck
import ( import (
"github.com/apache/dubbo-go/cluster/router" "github.com/apache/dubbo-go/cluster/router"
......
...@@ -15,11 +15,10 @@ ...@@ -15,11 +15,10 @@
* limitations under the License. * limitations under the License.
*/ */
package healthcheck package conncheck
import ( import (
"fmt" "fmt"
"math"
"testing" "testing"
) )
...@@ -29,136 +28,26 @@ import ( ...@@ -29,136 +28,26 @@ import (
import ( import (
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
) )
const ( const (
healthCheckDubbo1010IP = "192.168.10.10" connCheckDubbo1010IP = "192.168.10.10"
healthCheckDubbo1011IP = "192.168.10.11" connCheckDubboUrlFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider"
healthCheckMethodTest = "test"
healthCheckDubboUrlFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider"
) )
func TestDefaultHealthCheckerIsHealthy(t *testing.T) { func TestDefaultConnCheckerIsHealthy(t *testing.T) {
defer protocol.CleanAllStatus() defer protocol.CleanAllStatus()
url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) url, _ := common.NewURL(fmt.Sprintf(connCheckDubboUrlFormat, connCheckDubbo1010IP))
hc := NewDefaultHealthChecker(url).(*DefaultHealthChecker) cc := NewDefaultConnChecker(url).(*DefaultConnChecker)
invoker := NewMockInvoker(url) invoker := NewMockInvoker(url)
healthy := hc.IsHealthy(invoker) healthy := cc.IsConnHealthy(invoker)
assert.True(t, healthy) assert.True(t, healthy)
url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") invoker = NewMockInvoker(url)
url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "100") cc = NewDefaultConnChecker(url).(*DefaultConnChecker)
// fake the outgoing request healthy = cc.IsConnHealthy(invoker)
for i := 0; i < 11; i++ { // add to black list
request(url, healthCheckMethodTest, 0, true, false) protocol.SetInvokerUnhealthyStatus(invoker)
} assert.False(t, cc.IsConnHealthy(invoker))
hc = NewDefaultHealthChecker(url).(*DefaultHealthChecker)
healthy = hc.IsHealthy(invoker)
// the outgoing request is more than OUTSTANDING_REQUEST_COUNT_LIMIT, go to unhealthy
assert.False(t, hc.IsHealthy(invoker))
// successive failed count is more than constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, go to unhealthy
for i := 0; i < 11; i++ {
request(url, healthCheckMethodTest, 0, false, false)
}
url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10")
url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000")
hc = NewDefaultHealthChecker(url).(*DefaultHealthChecker)
healthy = hc.IsHealthy(invoker)
assert.False(t, hc.IsHealthy(invoker))
// reset successive failed count and go to healthy
request(url, healthCheckMethodTest, 0, false, true)
healthy = hc.IsHealthy(invoker)
assert.True(t, hc.IsHealthy(invoker))
}
func TestDefaultHealthCheckerGetCircuitBreakerSleepWindowTime(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP))
defaultHc := NewDefaultHealthChecker(url).(*DefaultHealthChecker)
// Increase the number of failed requests
for i := 0; i < 100; i++ {
request(url, healthCheckMethodTest, 1, false, false)
}
sleepWindowTime := defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url))
assert.True(t, sleepWindowTime == constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS)
// Adjust the threshold size to 1000
url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "1000")
sleepWindowTime = NewDefaultHealthChecker(url).(*DefaultHealthChecker).getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url))
assert.True(t, sleepWindowTime == 0)
url1, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1011IP))
sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1))
assert.True(t, sleepWindowTime == 0)
request(url1, healthCheckMethodTest, 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1))
assert.True(t, sleepWindowTime > 0 && sleepWindowTime < constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS)
}
func TestDefaultHealthCheckerGetCircuitBreakerTimeout(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP))
defaultHc := NewDefaultHealthChecker(url).(*DefaultHealthChecker)
timeout := defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url))
assert.True(t, timeout == 0)
url1, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1011IP))
request(url1, healthCheckMethodTest, 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
timeout = defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url1))
// timeout must after the current time
assert.True(t, timeout > protocol.CurrentTimeMillis())
}
func TestDefaultHealthCheckerIsCircuitBreakerTripped(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP))
defaultHc := NewDefaultHealthChecker(url).(*DefaultHealthChecker)
status := protocol.GetURLStatus(url)
tripped := defaultHc.isCircuitBreakerTripped(status)
assert.False(t, tripped)
// Increase the number of failed requests
for i := 0; i < 100; i++ {
request(url, healthCheckMethodTest, 1, false, false)
}
tripped = defaultHc.isCircuitBreakerTripped(protocol.GetURLStatus(url))
assert.True(t, tripped)
}
func TestNewDefaultHealthChecker(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP))
defaultHc := NewDefaultHealthChecker(url).(*DefaultHealthChecker)
assert.NotNil(t, defaultHc)
assert.Equal(t, defaultHc.outStandingRequestConutLimit, int32(math.MaxInt32))
assert.Equal(t, defaultHc.requestSuccessiveFailureThreshold, int32(constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF))
url1, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP))
url1.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10")
url1.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10")
nondefaultHc := NewDefaultHealthChecker(url1).(*DefaultHealthChecker)
assert.NotNil(t, nondefaultHc)
assert.Equal(t, nondefaultHc.outStandingRequestConutLimit, int32(10))
assert.Equal(t, nondefaultHc.requestSuccessiveFailureThreshold, int32(10))
}
func request(url *common.URL, method string, elapsed int64, active, succeeded bool) {
protocol.BeginCount(url, method)
if !active {
protocol.EndCount(url, method, elapsed, succeeded)
}
} }
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package healthcheck package conncheck
import ( import (
"github.com/apache/dubbo-go/cluster/router" "github.com/apache/dubbo-go/cluster/router"
...@@ -28,16 +28,17 @@ func init() { ...@@ -28,16 +28,17 @@ func init() {
extension.SetRouterFactory(constant.ConnCheckRouterName, newConnCheckRouteFactory) extension.SetRouterFactory(constant.ConnCheckRouterName, newConnCheckRouteFactory)
} }
// ConnCheckRouteFactory // ConnCheckRouteFactory is the factory to create conn check router, it aims at filter ip with unhealthy status
// the unhealthy status is storied in protocol/rpc_status.go with sync.Map
type ConnCheckRouteFactory struct { type ConnCheckRouteFactory struct {
} }
// newHealthCheckRouteFactory construct a new HealthCheckRouteFactory // newConnCheckRouteFactory construct a new ConnCheckRouteFactory
func newConnCheckRouteFactory() router.PriorityRouterFactory { func newConnCheckRouteFactory() router.PriorityRouterFactory {
return &ConnCheckRouteFactory{} return &ConnCheckRouteFactory{}
} }
// NewPriorityRouter construct a new NewHealthCheckRouter via url // NewPriorityRouter construct a new NewConnCheckRouter via url
func (f *ConnCheckRouteFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) { func (f *ConnCheckRouteFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) {
return NewConnCheckRouter(url) return NewConnCheckRouter(url)
} }
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package healthcheck package conncheck
import ( import (
"context" "context"
......
...@@ -50,7 +50,7 @@ const ( ...@@ -50,7 +50,7 @@ const (
func TestHealthCheckRouterRoute(t *testing.T) { func TestHealthCheckRouterRoute(t *testing.T) {
defer protocol.CleanAllStatus() defer protocol.CleanAllStatus()
consumerURL, _ := common.NewURL(healthCheck1001URL) consumerURL, _ := common.NewURL(healthCheck1001URL)
consumerURL.SetParam(HEALTH_ROUTE_ENABLED_KEY, "true") consumerURL.SetParam(constant.HEALTH_ROUTE_ENABLED_KEY, "true")
url1, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1010IP)) url1, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1010IP))
url2, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1011IP)) url2, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1011IP))
url3, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1012IP)) url3, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1012IP))
...@@ -117,7 +117,7 @@ func TestNewHealthCheckRouter(t *testing.T) { ...@@ -117,7 +117,7 @@ func TestNewHealthCheckRouter(t *testing.T) {
h := hcr.(*HealthCheckRouter) h := hcr.(*HealthCheckRouter)
assert.Nil(t, h.checker) assert.Nil(t, h.checker)
url.SetParam(HEALTH_ROUTE_ENABLED_KEY, "true") url.SetParam(constant.HEALTH_ROUTE_ENABLED_KEY, "true")
hcr, _ = NewHealthCheckRouter(url) hcr, _ = NewHealthCheckRouter(url)
h = hcr.(*HealthCheckRouter) h = hcr.(*HealthCheckRouter)
assert.NotNil(t, h.checker) assert.NotNil(t, h.checker)
......
...@@ -18,8 +18,6 @@ ...@@ -18,8 +18,6 @@
package protocol package protocol
import ( import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
...@@ -27,6 +25,8 @@ import ( ...@@ -27,6 +25,8 @@ import (
import ( import (
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
) )
var ( var (
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment