diff --git a/examples/dubbo/with-hystrix-go-client/app/client.go b/examples/dubbo/with-hystrix-go-client/app/client.go index 7918e2df0fbb2c488569791beefa6d872ca22564..886fa892001e2cf9e53f0d7c9b248135264a35c0 100644 --- a/examples/dubbo/with-hystrix-go-client/app/client.go +++ b/examples/dubbo/with-hystrix-go-client/app/client.go @@ -56,70 +56,53 @@ func main() { hessian.RegisterJavaEnum(Gender(MAN)) hessian.RegisterJavaEnum(Gender(WOMAN)) hessian.RegisterPOJO(&User{}) + getUserChan := make(chan string, 32) + getErrChan := make(chan string, 32) + getUser1Chan := make(chan string, 32) config.Load() - - println("\n\n\necho") - res, err := userProvider.Echo(context.TODO(), "OK") - if err != nil { - panic(err) - } - println("res: %v\n", res) - - time.Sleep(3e9) - - println("\n\n\nstart to test dubbo") - user := &User{} - err = userProvider.GetUser(context.TODO(), []interface{}{"A003"}, user) - if err != nil { - panic(err) + logger.Debugf("[Start to test GetUser]") + for i := 0; i < 32; i++ { + go func() { + user := &User{} + err := userProvider.GetUser(context.TODO(), []interface{}{"A003"}, user) + getUserChan <- fmt.Sprintf("Result: %s ; Error: %v", user.Name, err) + }() } - println("response result: %v", user) - - println("\n\n\nstart to test dubbo - GetUser0") - ret, err := userProvider.GetUser0("A003", "Moorse") - if err != nil { - panic(err) - } - println("response result: %v", ret) - - println("\n\n\nstart to test dubbo - GetUsers") - ret1, err := userProvider.GetUsers([]interface{}{[]interface{}{"A002", "A003"}}) - if err != nil { - panic(err) + time.Sleep(time.Second * 4) + + logger.Debugf("[Start to test GetErr, without error whitelist]") + for i := 0; i < 32; i++ { + go func() { + user := &User{} + err := userProvider.GetErr(context.TODO(), []interface{}{"A003"}, user) + getErrChan <- fmt.Sprintf("Result: %s ; Error: %v", user.Name, err) + }() } - println("response result: %v", ret1) - - println("\n\n\nstart to test dubbo - getUser") - user = &User{} - var i int32 = 1 - err = userProvider.GetUser2(context.TODO(), []interface{}{i}, user) - if err != nil { - panic(err) + time.Sleep(time.Second * 4) + + logger.Debugf("[Start to test illegal method GetUser1, with error whitelist]") + for i := 0; i < 32; i++ { + go func() { + user := &User{} + err := userProvider.GetUser1(context.TODO(), []interface{}{"A003"}, user) + getUser1Chan <- fmt.Sprintf("Result: %s ; Error: %v", user.Name, err) + }() } - println("response result: %v", user) - - println("\n\n\nstart to test dubbo - GetUser3") - err = userProvider.GetUser3() - if err != nil { - panic(err) + time.Sleep(time.Second * 4) + for i := 1; i < 32; i++ { + resGot := <-getUserChan + logger.Infof("[GetUser] %v", resGot) } - println("succ!") - - println("\n\n\nstart to test dubbo - getErr") - user = &User{} - err = userProvider.GetErr(context.TODO(), []interface{}{"A003"}, user) - if err != nil { - println("getErr - error: %v", err) + for i := 1; i < 32; i++ { + resGot := <-getErrChan + logger.Infof("[GetErr] %v", resGot) } - - println("\n\n\nstart to test dubbo illegal method") - err = userProvider.GetUser1(context.TODO(), []interface{}{"A003"}, user) - if err != nil { - panic(err) + for i := 1; i < 32; i++ { + resGot := <-getUser1Chan + logger.Infof("[GetUser1] %v", resGot) } - - initSignal() + //initSignal() } func initSignal() { @@ -145,7 +128,3 @@ func initSignal() { } } } - -func println(format string, args ...interface{}) { - fmt.Printf("\033[32;40m"+format+"\033[0m\n", args...) -} diff --git a/examples/dubbo/with-hystrix-go-client/profiles/dev/client.yml b/examples/dubbo/with-hystrix-go-client/profiles/dev/client.yml index fd18422f211026151b7f44de151af408a1039841..5b5003fb2cabde05e7d99e738afd584a635355bd 100644 --- a/examples/dubbo/with-hystrix-go-client/profiles/dev/client.yml +++ b/examples/dubbo/with-hystrix-go-client/profiles/dev/client.yml @@ -35,6 +35,7 @@ references: "UserProvider": # 可以指定多个registry,使用逗号隔开;不指定默认向所有注册中心注册 registry: "hangzhouzk" + filter: "example_fallback,hystrix_consumer" protocol : "dubbo" interface : "com.ikurento.user.UserProvider" cluster: "failover" @@ -71,26 +72,27 @@ filter_conf: configs: "Default": timeout : 1000 - max_concurrent_requests : 10 + max_concurrent_requests : 25 sleep_window : 5000 error_percent_threshold : 50 request_volume_threshold: 20 "userp": - timeout: 1200 - max_concurrent_requests: 8 + timeout: 2000 + max_concurrent_requests: 512 sleep_window: 4000 - error_percent_threshold: 45 - request_volume_threshold: 15 + error_percent_threshold: 35 + request_volume_threshold: 6 "userp_m": timeout : 1200 - max_concurrent_requests : 12 + max_concurrent_requests : 512 sleep_window : 6000 error_percent_threshold : 60 - request_volume_threshold: 30 - fallback: "exampleFallback" + request_volume_threshold: 16 + error_whitelist: [".*exception.*"] default: "Default" services: "com.ikurento.user.UserProvider": service_config: "userp" methods: "GetUser": "userp_m" + "GetUser1": "userp_m" \ No newline at end of file diff --git a/examples/dubbo/with-hystrix-go-client/profiles/release/client.yml b/examples/dubbo/with-hystrix-go-client/profiles/release/client.yml index e6fd3f06af92eb0fb113a6729bcbaeae5eafc47e..9263f2d01ff5288144876eb14c08060bdb165974 100644 --- a/examples/dubbo/with-hystrix-go-client/profiles/release/client.yml +++ b/examples/dubbo/with-hystrix-go-client/profiles/release/client.yml @@ -34,6 +34,7 @@ references: "UserProvider": # 可以指定多个registry,使用逗号隔开;不指定默认向所有注册中心注册 registry: "hangzhouzk" + filter: "example_fallback,hystrix_consumer" protocol : "dubbo" # version: "2.0" # group: "as" diff --git a/examples/dubbo/with-hystrix-go-client/profiles/test/client.yml b/examples/dubbo/with-hystrix-go-client/profiles/test/client.yml index 8e3b1c5ebbb4ec90856bf1d781de2d633bd133db..048703b6bb130fbd26f5be88650e950f70355a24 100644 --- a/examples/dubbo/with-hystrix-go-client/profiles/test/client.yml +++ b/examples/dubbo/with-hystrix-go-client/profiles/test/client.yml @@ -34,6 +34,7 @@ references: "UserProvider": # 可以指定多个registry,使用逗号隔开;不指定默认向所有注册中心注册 registry: "hangzhouzk" + filter: "example_fallback,hystrix_consumer" protocol : "dubbo" # version: "2.0" # group: "as" @@ -72,26 +73,27 @@ filter_conf: configs: "Default": timeout : 1000 - max_concurrent_requests : 10 + max_concurrent_requests : 25 sleep_window : 5000 error_percent_threshold : 50 request_volume_threshold: 20 "userp": - timeout: 1200 - max_concurrent_requests: 8 + timeout: 2000 + max_concurrent_requests: 512 sleep_window: 4000 - error_percent_threshold: 45 - request_volume_threshold: 15 + error_percent_threshold: 35 + request_volume_threshold: 6 "userp_m": timeout : 1200 - max_concurrent_requests : 12 + max_concurrent_requests : 512 sleep_window : 6000 error_percent_threshold : 60 - request_volume_threshold: 30 - fallback: "exampleFallback" + request_volume_threshold: 16 + error_whitelist: [".*exception.*"] default: "Default" services: "com.ikurento.user.UserProvider": service_config: "userp" methods: "GetUser": "userp_m" + "GetUser1": "userp_m" \ No newline at end of file diff --git a/filter/impl/hystrix_filter.go b/filter/impl/hystrix_filter.go index 48b4b57a75b03f2662eaaf069a1aa1c8a6bf33b6..7a98e537fb7bbc54c35d8da5bd88217c6373f644 100644 --- a/filter/impl/hystrix_filter.go +++ b/filter/impl/hystrix_filter.go @@ -46,40 +46,39 @@ var ( confConsumer = &HystrixFilterConfig{} confProvider = &HystrixFilterConfig{} configLoadMutex = sync.RWMutex{} - //Timeout - //MaxConcurrentRequests - //RequestVolumeThreshold - //SleepWindow - //ErrorPercentThreshold ) +//The filter in the server end of dubbo-go can't get the invoke result for now, +//this filter ONLY works in CLIENT end (consumer side) temporarily +//Only after the callService logic is integrated into the filter chain of server end can this filter be used, +//which will be done soon func init() { extension.SetFilter(HYSTRIX_CONSUMER, GetHystrixFilterConsumer) extension.SetFilter(HYSTRIX_PROVIDER, GetHystrixFilterProvider) } type HystrixFilterError struct { - err error - circuitBreakerOpen bool + err error + failByHystrix bool } func (hfError *HystrixFilterError) Error() string { return hfError.err.Error() } -func (hfError *HystrixFilterError) CbOpen() bool { - return hfError.circuitBreakerOpen +func (hfError *HystrixFilterError) FailByHystrix() bool { + return hfError.failByHystrix } -func NewHystrixFilterError(err error, cbOpen bool) error { +func NewHystrixFilterError(err error, failByHystrix bool) error { return &HystrixFilterError{ - err: err, - circuitBreakerOpen: cbOpen, + err: err, + failByHystrix: failByHystrix, } } type HystrixFilter struct { COrP bool //true for consumer - res []*regexp.Regexp + res map[string][]*regexp.Regexp ifNewMap sync.Map } @@ -96,7 +95,10 @@ func (hf *HystrixFilter) Invoke(invoker protocol.Invoker, invocation protocol.In if err != nil { logger.Warnf("[Hystrix Filter]Errors occurred parsing error omit regexp: %s, %v", ptn, err) } else { - hf.res = append(hf.res, reg) + if hf.res == nil { + hf.res = make(map[string][]*regexp.Regexp) + } + hf.res[invocation.MethodName()] = append(hf.res[invocation.MethodName()], reg) } } hystrix.ConfigureCommand(cmdName, hystrix.CommandConfig{ @@ -109,7 +111,7 @@ func (hf *HystrixFilter) Invoke(invoker protocol.Invoker, invocation protocol.In configLoadMutex.Unlock() } configLoadMutex.RLock() - cb, _, err := hystrix.GetCircuit(cmdName) + _, _, err := hystrix.GetCircuit(cmdName) configLoadMutex.RUnlock() if err != nil { logger.Errorf("[Hystrix Filter]Errors occurred getting circuit for %s , will invoke without hystrix, error is: ", cmdName, err) @@ -121,21 +123,22 @@ func (hf *HystrixFilter) Invoke(invoker protocol.Invoker, invocation protocol.In result = invoker.Invoke(invocation) err := result.Error() if err != nil { - result.SetError(NewHystrixFilterError(err, cb.IsOpen())) - for _, reg := range hf.res { + result.SetError(NewHystrixFilterError(err, false)) + for _, reg := range hf.res[invocation.MethodName()] { if reg.MatchString(err.Error()) { - logger.Debugf("[Hystrix Filter]Error in invocation but omitted in circuit breaker: %v", err) + logger.Debugf("[Hystrix Filter]Error in invocation but omitted in circuit breaker: %v; %s", err, cmdName) return nil } } } return err }, func(err error) error { - //Return error and circuit breaker's status, so that it can be handled by previous filters. - logger.Debugf("[Hystrix Filter]Enter fallback, error is: %v, circuit breaker open: %v", err, cb.IsOpen()) + //Return error and if it is caused by hystrix logic, so that it can be handled by previous filters. + _, ok := err.(hystrix.CircuitError) + logger.Debugf("[Hystrix Filter]Hystrix health check counted, error is: %v, failed by hystrix: %v; %s", err, ok, cmdName) result = &protocol.RPCResult{} result.SetResult(nil) - result.SetError(NewHystrixFilterError(err, cb.IsOpen())) + result.SetError(NewHystrixFilterError(err, ok)) return err }) return result @@ -249,9 +252,17 @@ type CommandConfigWithError struct { RequestVolumeThreshold int `yaml:"request_volume_threshold"` SleepWindow int `yaml:"sleep_window"` ErrorPercentThreshold int `yaml:"error_percent_threshold"` - Error []string `yaml:"error_omit"` + Error []string `yaml:"error_whitelist"` } +//Config: +//- Timeout: how long to wait for command to complete, in milliseconds +//- MaxConcurrentRequests: how many commands of the same type can run at the same time +//- RequestVolumeThreshold: the minimum number of requests needed before a circuit can be tripped due to health +//- SleepWindow: how long, in milliseconds, to wait after a circuit opens before testing for recovery +//- ErrorPercentThreshold: it causes circuits to open once the rolling measure of errors exceeds this percent of requests +//See hystrix doc + type HystrixFilterConfig struct { Configs map[string]*CommandConfigWithError Default string diff --git a/filter/impl/hystrix_filter_test.go b/filter/impl/hystrix_filter_test.go index 83a21b63328a85060f0ddb0b75fc2816ba64f719..f3cb198eec8fee9b17052dde3b9c1d896e0cbeb2 100644 --- a/filter/impl/hystrix_filter_test.go +++ b/filter/impl/hystrix_filter_test.go @@ -106,10 +106,10 @@ func TestGetConfig_3(t *testing.T) { configGot := getConfig("Mock.Service", "GetMock", true) assert.NotNil(t, configGot) assert.Equal(t, 1000, configGot.Timeout) - assert.Equal(t, 10, configGot.MaxConcurrentRequests) + assert.Equal(t, 500, configGot.MaxConcurrentRequests) assert.Equal(t, 5000, configGot.SleepWindow) assert.Equal(t, 50, configGot.ErrorPercentThreshold) - assert.Equal(t, 20, configGot.RequestVolumeThreshold) + assert.Equal(t, 18, configGot.RequestVolumeThreshold) } type testMockSuccessInvoker struct { @@ -160,7 +160,7 @@ func TestHystricFilter_Invoke_CircuitBreak(t *testing.T) { time.Sleep(time.Second * 6) var lastRest bool for i := 0; i < 50; i++ { - lastRest = (<-resChan).Error().(*HystrixFilterError).CbOpen() + lastRest = (<-resChan).Error().(*HystrixFilterError).FailByHystrix() } //Normally the last result should be true, which means the circuit has been opened assert.True(t, lastRest) @@ -170,7 +170,7 @@ func TestHystricFilter_Invoke_CircuitBreak(t *testing.T) { func TestHystricFilter_Invoke_CircuitBreak_Omit_Exception(t *testing.T) { reg, _ := regexp.Compile(".*exception.*") regs := []*regexp.Regexp{reg} - hf := &HystrixFilter{res: regs, COrP: true} + hf := &HystrixFilter{res: map[string][]*regexp.Regexp{"": regs}, COrP: true} resChan := make(chan protocol.Result, 50) for i := 0; i < 50; i++ { go func() { @@ -181,7 +181,7 @@ func TestHystricFilter_Invoke_CircuitBreak_Omit_Exception(t *testing.T) { time.Sleep(time.Second * 6) var lastRest bool for i := 0; i < 50; i++ { - lastRest = (<-resChan).Error().(*HystrixFilterError).CbOpen() + lastRest = (<-resChan).Error().(*HystrixFilterError).FailByHystrix() } assert.False(t, lastRest)