diff --git a/examples/dubbo/with-hystrix-go-client/app/example_hystrix_fallback.go b/examples/dubbo/with-hystrix-go-client/app/example_hystrix_fallback.go deleted file mode 100644 index 418f6eae137bf64ca03b4ef89be2b8895c1057f3..0000000000000000000000000000000000000000 --- a/examples/dubbo/with-hystrix-go-client/app/example_hystrix_fallback.go +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package main - -import ( - "github.com/afex/hystrix-go/hystrix" -) -import ( - "github.com/apache/dubbo-go/filter/impl" - "github.com/apache/dubbo-go/protocol" -) - -const EXAMPLE_FALLBACK_NAME = "exampleFallback" - -type ExampleHystrixFallback struct { -} - -//Example for hystrix fallback function -//The 4 parameters: -//The error that caused the fallback; -//The invoker that the filter receives; -//The invocation that should be invoked; -//The copy of circuit breaker for this invocation, for getting its status -func (f *ExampleHystrixFallback) FallbackFunc(err error, invoker protocol.Invoker, invocation protocol.Invocation, cb hystrix.CircuitBreaker) protocol.Result { - result := &protocol.RPCResult{} - if cb.IsOpen() { - result.SetError(nil) - result.SetResult("Mock OK") - } else { - result.SetError(err) - } - return result -} - -//Add the fallback function to the map -//The name MUST be the same as in your config file -func init() { - impl.SetHystrixFallback(EXAMPLE_FALLBACK_NAME, &ExampleHystrixFallback{}) -} diff --git a/filter/impl/default_hystrix_fallback.go b/filter/impl/default_hystrix_fallback.go deleted file mode 100644 index 8afbeca35ae4cb5bcffd403d4bf8c73b795d7f83..0000000000000000000000000000000000000000 --- a/filter/impl/default_hystrix_fallback.go +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package impl - -import ( - "github.com/afex/hystrix-go/hystrix" -) - -import ( - "github.com/apache/dubbo-go/protocol" -) - -type DefaultHystrixFallback struct { -} - -func (d *DefaultHystrixFallback) FallbackFunc(err error, invoker protocol.Invoker, invocation protocol.Invocation, cb hystrix.CircuitBreaker) protocol.Result { - //By default, return nil value and the error occurred - res := &protocol.RPCResult{} - res.SetError(err) - return res -} diff --git a/filter/impl/hystrix_filter.go b/filter/impl/hystrix_filter.go index d827aebb0404eecac5e4cebaaf54f5086e584d34..48b4b57a75b03f2662eaaf069a1aa1c8a6bf33b6 100644 --- a/filter/impl/hystrix_filter.go +++ b/filter/impl/hystrix_filter.go @@ -18,6 +18,8 @@ package impl import ( "fmt" + "regexp" + "sync" ) import ( "github.com/afex/hystrix-go/hystrix" @@ -33,17 +35,17 @@ import ( ) const ( - HYSTRIX = "hystrix" + HYSTRIX_CONSUMER = "hystrix_consumer" + HYSTRIX_PROVIDER = "hystrix_provider" + HYSTRIX = "hystrix" ) -type HystrixFallback interface { - FallbackFunc(err error, invoker protocol.Invoker, invocation protocol.Invocation, cb hystrix.CircuitBreaker) protocol.Result -} - var ( - isConfigLoaded = false - fallback = make(map[string]HystrixFallback) - conf = &HystrixFilterConfig{} + isConsumerConfigLoaded = false + isProviderConfigLoaded = false + confConsumer = &HystrixFilterConfig{} + confProvider = &HystrixFilterConfig{} + configLoadMutex = sync.RWMutex{} //Timeout //MaxConcurrentRequests //RequestVolumeThreshold @@ -52,59 +54,89 @@ var ( ) func init() { - extension.SetFilter(HYSTRIX, GetHystrixFilter) + extension.SetFilter(HYSTRIX_CONSUMER, GetHystrixFilterConsumer) + extension.SetFilter(HYSTRIX_PROVIDER, GetHystrixFilterProvider) +} + +type HystrixFilterError struct { + err error + circuitBreakerOpen bool +} + +func (hfError *HystrixFilterError) Error() string { + return hfError.err.Error() +} + +func (hfError *HystrixFilterError) CbOpen() bool { + return hfError.circuitBreakerOpen +} +func NewHystrixFilterError(err error, cbOpen bool) error { + return &HystrixFilterError{ + err: err, + circuitBreakerOpen: cbOpen, + } } type HystrixFilter struct { - fallback HystrixFallback + COrP bool //true for consumer + res []*regexp.Regexp + ifNewMap sync.Map } func (hf *HystrixFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { cmdName := fmt.Sprintf("%s&method=%s", invoker.GetUrl().Key(), invocation.MethodName()) - cb, ifNew, err := hystrix.GetCircuit(cmdName) + // Do the configuration if the circuit breaker is created for the first time + if _, load := hf.ifNewMap.LoadOrStore(cmdName, true); !load { + configLoadMutex.Lock() + filterConf := getConfig(invoker.GetUrl().Service(), invocation.MethodName(), hf.COrP) + for _, ptn := range filterConf.Error { + reg, err := regexp.Compile(ptn) + if err != nil { + logger.Warnf("[Hystrix Filter]Errors occurred parsing error omit regexp: %s, %v", ptn, err) + } else { + hf.res = append(hf.res, reg) + } + } + hystrix.ConfigureCommand(cmdName, hystrix.CommandConfig{ + Timeout: filterConf.Timeout, + MaxConcurrentRequests: filterConf.MaxConcurrentRequests, + SleepWindow: filterConf.SleepWindow, + ErrorPercentThreshold: filterConf.ErrorPercentThreshold, + RequestVolumeThreshold: filterConf.RequestVolumeThreshold, + }) + configLoadMutex.Unlock() + } + configLoadMutex.RLock() + cb, _, err := hystrix.GetCircuit(cmdName) + configLoadMutex.RUnlock() if err != nil { logger.Errorf("[Hystrix Filter]Errors occurred getting circuit for %s , will invoke without hystrix, error is: ", cmdName, err) return invoker.Invoke(invocation) } - - // Do the configuration if the circuit breaker is created for the first time - - if ifNew || hf.fallback == nil { - filterConf := getConfig(invoker.GetUrl().Service(), invocation.MethodName()) - if ifNew { - hystrix.ConfigureCommand(cmdName, hystrix.CommandConfig{ - Timeout: filterConf.Timeout, - MaxConcurrentRequests: filterConf.MaxConcurrentRequests, - SleepWindow: filterConf.SleepWindow, - ErrorPercentThreshold: filterConf.ErrorPercentThreshold, - RequestVolumeThreshold: filterConf.RequestVolumeThreshold, - }) - } - if hf.fallback == nil { - hf.fallback = getHystrixFallback(filterConf.Fallback) - } - } - logger.Infof("[Hystrix Filter]Using hystrix filter: %s", cmdName) var result protocol.Result _ = hystrix.Do(cmdName, func() error { result = invoker.Invoke(invocation) - return result.Error() - }, func(err error) error { - //failure logic - logger.Debugf("[Hystrix Filter]Invoke failed, error is: %v, circuit breaker open: %v", err, cb.IsOpen()) - result = hf.fallback.FallbackFunc(err, invoker, invocation, *cb) - - //If user try to return nil in the customized fallback func, it will cause panic - //So check here - if result == nil { - result = &protocol.RPCResult{} + err := result.Error() + if err != nil { + result.SetError(NewHystrixFilterError(err, cb.IsOpen())) + for _, reg := range hf.res { + if reg.MatchString(err.Error()) { + logger.Debugf("[Hystrix Filter]Error in invocation but omitted in circuit breaker: %v", err) + return nil + } + } } - return nil - //failure logic - + return err + }, func(err error) error { + //Return error and circuit breaker's status, so that it can be handled by previous filters. + logger.Debugf("[Hystrix Filter]Enter fallback, error is: %v, circuit breaker open: %v", err, cb.IsOpen()) + result = &protocol.RPCResult{} + result.SetResult(nil) + result.SetError(NewHystrixFilterError(err, cb.IsOpen())) + return err }) return result } @@ -112,21 +144,38 @@ func (hf *HystrixFilter) Invoke(invoker protocol.Invoker, invocation protocol.In func (hf *HystrixFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result } -func GetHystrixFilter() filter.Filter { +func GetHystrixFilterConsumer() filter.Filter { //When first called, load the config in - if !isConfigLoaded { - if err := initHystrixConfig(); err != nil { - logger.Warnf("[Hystrix Filter]Config load failed, error is: %v , will use default", err) + if !isConsumerConfigLoaded { + if err := initHystrixConfigConsumer(); err != nil { + logger.Warnf("[Hystrix Filter]Config load failed for consumer, error is: %v , will use default", err) + } + isConsumerConfigLoaded = true + } + + return &HystrixFilter{COrP: true} +} + +func GetHystrixFilterProvider() filter.Filter { + if !isProviderConfigLoaded { + if err := initHystrixConfigProvider(); err != nil { + logger.Warnf("[Hystrix Filter]Config load failed for provider, error is: %v , will use default", err) } - isConfigLoaded = true + isProviderConfigLoaded = true } - return &HystrixFilter{} + return &HystrixFilter{COrP: false} } -func getConfig(service string, method string) CommandConfigWithFallback { +func getConfig(service string, method string, cOrP bool) CommandConfigWithError { //Find method level config + var conf *HystrixFilterConfig + if cOrP { + conf = confConsumer + } else { + conf = confProvider + } getConf := conf.Configs[conf.Services[service].Methods[method]] if getConf != nil { logger.Infof("[Hystrix Filter]Found method-level config for %s - %s", service, method) @@ -144,13 +193,13 @@ func getConfig(service string, method string) CommandConfigWithFallback { logger.Infof("[Hystrix Filter]Found global default config for %s - %s", service, method) return *getConf } - getConf = &CommandConfigWithFallback{} + getConf = &CommandConfigWithError{} logger.Infof("[Hystrix Filter]No config found for %s - %s, using default", service, method) return *getConf } -func initHystrixConfig() error { +func initHystrixConfigConsumer() error { if config.GetConsumerConfig().FilterConf == nil { return perrors.Errorf("no config for hystrix") } @@ -162,44 +211,49 @@ func initHystrixConfig() error { if err != nil { return err } - err = yaml.Unmarshal(hystrixConfByte, conf) + err = yaml.Unmarshal(hystrixConfByte, confConsumer) if err != nil { return err } return nil } - -//For sake of dynamic config -func RefreshHystrix() error { - conf = &HystrixFilterConfig{} - hystrix.Flush() - return initHystrixConfig() -} - -func SetHystrixFallback(name string, fallbackImpl HystrixFallback) { - fallback[name] = fallbackImpl -} - -func getHystrixFallback(name string) HystrixFallback { - fallbackImpl := fallback[name] - if fallbackImpl == nil { - logger.Warnf("[Hystrix Filter]Fallback func not found: %s", name) - fallbackImpl = &DefaultHystrixFallback{} +func initHystrixConfigProvider() error { + if config.GetProviderConfig().FilterConf == nil { + return perrors.Errorf("no config for hystrix") + } + filterConfig := config.GetConsumerConfig().FilterConf.(map[interface{}]interface{})[HYSTRIX] + if filterConfig == nil { + return perrors.Errorf("no config for hystrix") + } + hystrixConfByte, err := yaml.Marshal(filterConfig) + if err != nil { + return err } - return fallbackImpl + err = yaml.Unmarshal(hystrixConfByte, confProvider) + if err != nil { + return err + } + return nil } -type CommandConfigWithFallback struct { - Timeout int `yaml:"timeout"` - MaxConcurrentRequests int `yaml:"max_concurrent_requests"` - RequestVolumeThreshold int `yaml:"request_volume_threshold"` - SleepWindow int `yaml:"sleep_window"` - ErrorPercentThreshold int `yaml:"error_percent_threshold"` - Fallback string `yaml:"fallback"` +//For sake of dynamic config +//func RefreshHystrix() error { +// conf = &HystrixFilterConfig{} +// hystrix.Flush() +// return initHystrixConfig() +//} + +type CommandConfigWithError struct { + Timeout int `yaml:"timeout"` + MaxConcurrentRequests int `yaml:"max_concurrent_requests"` + RequestVolumeThreshold int `yaml:"request_volume_threshold"` + SleepWindow int `yaml:"sleep_window"` + ErrorPercentThreshold int `yaml:"error_percent_threshold"` + Error []string `yaml:"error_omit"` } type HystrixFilterConfig struct { - Configs map[string]*CommandConfigWithFallback + Configs map[string]*CommandConfigWithError Default string Services map[string]ServiceHystrixConfig } diff --git a/filter/impl/hystrix_filter_test.go b/filter/impl/hystrix_filter_test.go index 8b22912d1942f3dbd58d65dcfcd99eb3c2f21bd6..83a21b63328a85060f0ddb0b75fc2816ba64f719 100644 --- a/filter/impl/hystrix_filter_test.go +++ b/filter/impl/hystrix_filter_test.go @@ -17,17 +17,16 @@ package impl import ( + "regexp" "testing" "time" ) import ( - "github.com/afex/hystrix-go/hystrix" "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) import ( "github.com/apache/dubbo-go/protocol" - "github.com/apache/dubbo-go/protocol/dubbo" "github.com/apache/dubbo-go/protocol/invocation" ) @@ -37,36 +36,38 @@ func init() { func mockInitHystrixConfig() { //Mock config - conf = &HystrixFilterConfig{ - make(map[string]*CommandConfigWithFallback), + confConsumer = &HystrixFilterConfig{ + make(map[string]*CommandConfigWithError), "Default", make(map[string]ServiceHystrixConfig), } - conf.Configs["Default"] = &CommandConfigWithFallback{ + confConsumer.Configs["Default"] = &CommandConfigWithError{ 1000, - 10, - 20, + 500, + 18, 5000, 50, - "", + nil, } - conf.Configs["userp"] = &CommandConfigWithFallback{ + confConsumer.Configs["userp"] = &CommandConfigWithError{ 2000, 8, 15, 4000, 45, - "", + nil, } - conf.Configs["userp_m"] = &CommandConfigWithFallback{ + confConsumer.Configs["userp_m"] = &CommandConfigWithError{ 1200, 12, 5, 6000, 60, - "exampleFallback", + []string{ + "exception", + }, } - conf.Services["com.ikurento.user.UserProvider"] = ServiceHystrixConfig{ + confConsumer.Services["com.ikurento.user.UserProvider"] = ServiceHystrixConfig{ "userp", map[string]string{ "GetUser": "userp_m", @@ -76,66 +77,39 @@ func mockInitHystrixConfig() { } func TestGetHystrixFilter(t *testing.T) { - filterGot := GetHystrixFilter() + filterGot := GetHystrixFilterConsumer() assert.NotNil(t, filterGot) } -type MockFallback struct { -} - -func (m *MockFallback) FallbackFunc(err error, invoker protocol.Invoker, invocation protocol.Invocation, cb hystrix.CircuitBreaker) protocol.Result { - return &protocol.RPCResult{Rest: "MOCK"} -} - func TestGetConfig_1(t *testing.T) { - configGot := getConfig("com.ikurento.user.UserProvider", "GetUser") + configGot := getConfig("com.ikurento.user.UserProvider", "GetUser", true) assert.NotNil(t, configGot) assert.Equal(t, 1200, configGot.Timeout) assert.Equal(t, 12, configGot.MaxConcurrentRequests) assert.Equal(t, 6000, configGot.SleepWindow) assert.Equal(t, 60, configGot.ErrorPercentThreshold) assert.Equal(t, 5, configGot.RequestVolumeThreshold) - assert.Equal(t, "exampleFallback", configGot.Fallback) } func TestGetConfig_2(t *testing.T) { - configGot := getConfig("com.ikurento.user.UserProvider", "GetUser0") + configGot := getConfig("com.ikurento.user.UserProvider", "GetUser0", true) assert.NotNil(t, configGot) assert.Equal(t, 2000, configGot.Timeout) assert.Equal(t, 8, configGot.MaxConcurrentRequests) assert.Equal(t, 4000, configGot.SleepWindow) assert.Equal(t, 45, configGot.ErrorPercentThreshold) assert.Equal(t, 15, configGot.RequestVolumeThreshold) - assert.Equal(t, "", configGot.Fallback) } func TestGetConfig_3(t *testing.T) { //This should use default - configGot := getConfig("Mock.Service", "GetMock") + configGot := getConfig("Mock.Service", "GetMock", true) assert.NotNil(t, configGot) assert.Equal(t, 1000, configGot.Timeout) assert.Equal(t, 10, configGot.MaxConcurrentRequests) assert.Equal(t, 5000, configGot.SleepWindow) assert.Equal(t, 50, configGot.ErrorPercentThreshold) assert.Equal(t, 20, configGot.RequestVolumeThreshold) - assert.Equal(t, "", configGot.Fallback) -} - -func TestGetHystrixFallback(t *testing.T) { - fallback["mock"] = &MockFallback{} - fallbackGot := getHystrixFallback("mock") - assert.NotNil(t, fallbackGot) - fallbackGot = getHystrixFallback("notExist") - assert.IsType(t, &DefaultHystrixFallback{}, fallbackGot) -} - -func TestDefaultHystrixFallback_FallbackFunc(t *testing.T) { - cb, _, _ := hystrix.GetCircuit("newCB") - defaultFallback := &DefaultHystrixFallback{} - result := defaultFallback.FallbackFunc(errors.Errorf("error"), &dubbo.DubboInvoker{}, nil, *cb) - assert.NotNil(t, result) - assert.Error(t, result.Error()) - } type testMockSuccessInvoker struct { @@ -155,12 +129,12 @@ type testMockFailInvoker struct { func (iv *testMockFailInvoker) Invoke(invocation protocol.Invocation) protocol.Result { return &protocol.RPCResult{ - Err: errors.Errorf("Error"), + Err: errors.Errorf("exception"), } } func TestHystrixFilter_Invoke_Success(t *testing.T) { - hf := &HystrixFilter{&DefaultHystrixFallback{}} + hf := &HystrixFilter{} result := hf.Invoke(&testMockSuccessInvoker{}, &invocation.RPCInvocation{}) assert.NotNil(t, result) assert.NoError(t, result.Error()) @@ -168,31 +142,14 @@ func TestHystrixFilter_Invoke_Success(t *testing.T) { } func TestHystrixFilter_Invoke_Fail(t *testing.T) { - hf := &HystrixFilter{&DefaultHystrixFallback{}} + hf := &HystrixFilter{} result := hf.Invoke(&testMockFailInvoker{}, &invocation.RPCInvocation{}) assert.NotNil(t, result) assert.Error(t, result.Error()) } -type testHystrixFallback struct { -} - -func (d *testHystrixFallback) FallbackFunc(err error, invoker protocol.Invoker, invocation protocol.Invocation, cb hystrix.CircuitBreaker) protocol.Result { - if cb.IsOpen() { - return &protocol.RPCResult{ - //For the request is blocked due to the circuit breaker is open - Rest: true, - } - } else { - return &protocol.RPCResult{ - //Circuit breaker not open - Rest: false, - } - } -} - func TestHystricFilter_Invoke_CircuitBreak(t *testing.T) { - hf := &HystrixFilter{&testHystrixFallback{}} + hf := &HystrixFilter{COrP: true} resChan := make(chan protocol.Result, 50) for i := 0; i < 50; i++ { go func() { @@ -203,9 +160,30 @@ func TestHystricFilter_Invoke_CircuitBreak(t *testing.T) { time.Sleep(time.Second * 6) var lastRest bool for i := 0; i < 50; i++ { - lastRest = (<-resChan).Result().(bool) + lastRest = (<-resChan).Error().(*HystrixFilterError).CbOpen() } //Normally the last result should be true, which means the circuit has been opened assert.True(t, lastRest) } + +func TestHystricFilter_Invoke_CircuitBreak_Omit_Exception(t *testing.T) { + reg, _ := regexp.Compile(".*exception.*") + regs := []*regexp.Regexp{reg} + hf := &HystrixFilter{res: regs, COrP: true} + resChan := make(chan protocol.Result, 50) + for i := 0; i < 50; i++ { + go func() { + result := hf.Invoke(&testMockFailInvoker{}, &invocation.RPCInvocation{}) + resChan <- result + }() + } + time.Sleep(time.Second * 6) + var lastRest bool + for i := 0; i < 50; i++ { + lastRest = (<-resChan).Error().(*HystrixFilterError).CbOpen() + } + + assert.False(t, lastRest) + +}