diff --git a/common/constant/default.go b/common/constant/default.go index 4363e3efd55f5960b1ee55b5bef23e6b10c2c1c2..9b69226318d92824de8eccd06ccb1dc88470c2ee 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -46,7 +46,7 @@ const ( const ( DEFAULT_KEY = "default" PREFIX_DEFAULT_KEY = "default." - DEFAULT_SERVICE_FILTERS = "echo,token,accesslog" + DEFAULT_SERVICE_FILTERS = "echo,token,accesslog,tps" DEFAULT_REFERENCE_FILTERS = "" GENERIC_REFERENCE_FILTERS = "generic" GENERIC = "$invoke" diff --git a/common/constant/key.go b/common/constant/key.go index 3006f44732626bfcfd843466f91efd5a88a56f09..dc24c2b5dbe3baae5333641cd99d746332ed705a 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -48,19 +48,26 @@ const ( ) const ( - TIMESTAMP_KEY = "timestamp" - REMOTE_TIMESTAMP_KEY = "remote.timestamp" - CLUSTER_KEY = "cluster" - LOADBALANCE_KEY = "loadbalance" - WEIGHT_KEY = "weight" - WARMUP_KEY = "warmup" - RETRIES_KEY = "retries" - BEAN_NAME = "bean.name" - FAIL_BACK_TASKS_KEY = "failbacktasks" - FORKS_KEY = "forks" - DEFAULT_FORKS = 2 - DEFAULT_TIMEOUT = 1000 - ACCESS_LOG_KEY = "accesslog" + TIMESTAMP_KEY = "timestamp" + REMOTE_TIMESTAMP_KEY = "remote.timestamp" + CLUSTER_KEY = "cluster" + LOADBALANCE_KEY = "loadbalance" + WEIGHT_KEY = "weight" + WARMUP_KEY = "warmup" + RETRIES_KEY = "retries" + BEAN_NAME = "bean.name" + FAIL_BACK_TASKS_KEY = "failbacktasks" + FORKS_KEY = "forks" + DEFAULT_FORKS = 2 + DEFAULT_TIMEOUT = 1000 + ACCESS_LOG_KEY = "accesslog" + TPS_LIMITER_KEY = "tps.limiter" + TPS_REJECTED_EXECUTION_HANDLER_KEY = "tps.limit.rejected.handler" + TPS_LIMIT_RATE_KEY = "tps.limit.rate" + DEFAULT_TPS_LIMIT_RATE = "-1" + TPS_LIMIT_INTERVAL_KEY = "tps.limit.interval" + DEFAULT_TPS_LIMIT_INTERVAL = "60000" + TPS_LIMIT_STRATEGY_KEY = "tps.limit.strategy" ) const ( diff --git a/common/extension/tps_limit.go b/common/extension/tps_limit.go new file mode 100644 index 0000000000000000000000000000000000000000..34272018316bc9312e5af54a1a7d18a9d8bf46e7 --- /dev/null +++ b/common/extension/tps_limit.go @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import "github.com/apache/dubbo-go/filter" + +var ( + tpsLimitStrategy = make(map[string]func(rate int, interval int) filter.TpsLimitStrategy) + tpsLimiter = make(map[string]func() filter.TpsLimiter) + tpsRejectedExecutionHandler = make(map[string]func() filter.RejectedExecutionHandler) +) + +func SetTpsLimiter(name string, creator func() filter.TpsLimiter) { + tpsLimiter[name] = creator +} + +func GetTpsLimiter(name string) filter.TpsLimiter { + var creator = tpsLimiter[name] + if creator == nil { + panic("TpsLimiter for " + name + " is not existing, make sure you have import the package " + + "and you have register it by invoking extension.SetTpsLimiter.") + } + return creator() +} + +func SetTpsLimitStrategy(name string, creator func(rate int, interval int) filter.TpsLimitStrategy) { + tpsLimitStrategy[name] = creator +} + +func GetTpsLimitStrategyCreator(name string) func(rate int, interval int) filter.TpsLimitStrategy { + var creator = tpsLimitStrategy[name] + if creator == nil { + panic("TpsLimitStrategy for " + name + " is not existing, make sure you have import the package " + + "and you have register it by invoking extension.SetTpsLimitStrategy.") + } + return creator +} + +func SetTpsRejectedExecutionHandler(name string, creator func() filter.RejectedExecutionHandler) { + tpsRejectedExecutionHandler[name] = creator +} + +func GetTpsRejectedExecutionHandler(name string) filter.RejectedExecutionHandler { + var creator = tpsRejectedExecutionHandler[name] + if creator() == nil { + panic("TpsRejectedExecutionHandler for " + name + " is not existing, make sure you have import the package " + + "and you have register it by invoking extension.SetTpsRejectedExecutionHandler.") + } + return creator() +} \ No newline at end of file diff --git a/filter/impl/access_log_filter.go b/filter/impl/access_log_filter.go index 75c0582937c69b1c3368cd38fd015cd5b0d68315..89fa34952f99057f1d8bb35794a57f9905f5f169 100644 --- a/filter/impl/access_log_filter.go +++ b/filter/impl/access_log_filter.go @@ -132,7 +132,7 @@ func (ef *AccessLogFilter) writeLogToFile(data AccessLogData) { logFile, err := ef.openLogFile(accessLog) if err != nil { - logger.Warnf("Can not open the access log file: %s, %v", accessLog, err) + logger.Warnf("Can not open the access log file: %s, %v", accessLog, err) return } logger.Debugf("Append log to %s", accessLog) diff --git a/filter/impl/only_log_rejected_execution_handler.go b/filter/impl/only_log_rejected_execution_handler.go new file mode 100644 index 0000000000000000000000000000000000000000..8ce4a445937d47e1dc34da8f4013da8e5c8f9b6c --- /dev/null +++ b/filter/impl/only_log_rejected_execution_handler.go @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "sync" + + "github.com/prometheus/common/log" + + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/filter" + "github.com/apache/dubbo-go/protocol" +) + +const HandlerName = "log" + +func init() { + extension.SetTpsRejectedExecutionHandler(HandlerName, GetOnlyRejectedExecutionHandler) + extension.SetTpsRejectedExecutionHandler(constant.DEFAULT_KEY, GetOnlyRejectedExecutionHandler) +} + +var onlyLogHandlerInstance *OnlyLogRejectedExecutionHandler +var onlyLogHandlerOnce sync.Once + +/** + * This implementation only logs the invocation info. + * it always return nil. + */ +type OnlyLogRejectedExecutionHandler struct { + +} + +func (handler *OnlyLogRejectedExecutionHandler) RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result { + log.Warnf("The invocation was rejected. url: %s", url.String()) + return nil +} + +func GetOnlyRejectedExecutionHandler() filter.RejectedExecutionHandler { + onlyLogHandlerOnce.Do(func() { + onlyLogHandlerInstance = &OnlyLogRejectedExecutionHandler{} + }) + return onlyLogHandlerInstance +} + diff --git a/filter/impl/tps_limit_filter.go b/filter/impl/tps_limit_filter.go index 6ada1aa61a1de2f14687b5dc478e557f09193879..ae900a33a34cd99cee918f9aa331a34a52a608ad 100644 --- a/filter/impl/tps_limit_filter.go +++ b/filter/impl/tps_limit_filter.go @@ -18,6 +18,7 @@ package impl import ( + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/filter" "github.com/apache/dubbo-go/protocol" @@ -28,19 +29,37 @@ const ( ) func init() { - extension.SetFilter(TpsLimitFilterKey, GetAccessLogFilter) + extension.SetFilter(TpsLimitFilterKey, GetTpsLimitFilter) } /** - * + * if you wish to use the TpsLimiter, please add the configuration into your service provider configuration: + * for example: + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "method-service", # it should be the name of limiter. if the value is 'default', + * # the MethodServiceTpsLimiterImpl will be used. + * tps.limit.rejected.handler: "default", # optional, or the name of the implementation + * if the value of 'tps.limiter' is nil or empty string, the tps filter will do nothing */ type TpsLimitFilter struct { } func (t TpsLimitFilter) Invoke(invoker protocol.Invoker,invocation protocol.Invocation) protocol.Result { - invoker.GetUrl() - invoker.IsAvailable() + url := invoker.GetUrl() + tpsLimiter := url.GetParam(constant.TPS_LIMITER_KEY, "") + rejectedExeHandler := url.GetParam(constant.TPS_REJECTED_EXECUTION_HANDLER_KEY, constant.DEFAULT_KEY) + if len(tpsLimiter) > 0 { + allow := extension.GetTpsLimiter(tpsLimiter).IsAllowable(invoker.GetUrl(), invocation) + if allow { + return invoker.Invoke(invocation) + } + return extension.GetTpsRejectedExecutionHandler(rejectedExeHandler).RejectedExecution(url, invocation) + } return invoker.Invoke(invocation) } @@ -49,6 +68,5 @@ func (t TpsLimitFilter) OnResponse(result protocol.Result, invoker protocol.Invo } func GetTpsLimitFilter() filter.Filter { - var tpsLimitFilter = TpsLimitFilter{} - return tpsLimitFilter + return &TpsLimitFilter{} } diff --git a/filter/impl/tps_limit_filter_test.go b/filter/impl/tps_limit_filter_test.go index 2ac8beceba9a76b27ddef677f63288b709073730..236f42c95c45929f9a47d0e57b7d54879e43693a 100644 --- a/filter/impl/tps_limit_filter_test.go +++ b/filter/impl/tps_limit_filter_test.go @@ -16,3 +16,82 @@ */ package impl + +import ( + "net/url" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/filter" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" +) + +func TestTpsLimitFilter_Invoke_With_No_TpsLimiter(t *testing.T) { + tpsFilter := GetTpsLimitFilter() + invokeUrl := common.NewURLWithOptions( + common.WithParams(url.Values{}), + common.WithParamsValue(constant.TPS_LIMITER_KEY, "")) + attch := make(map[string]string, 0) + + result := tpsFilter.Invoke(protocol.NewBaseInvoker(*invokeUrl), + invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) + +} + +func TestGenericFilter_Invoke_With_Default_TpsLimiter(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockLimiter := filter.NewMockTpsLimiter(ctrl) + mockLimiter.EXPECT().IsAllowable(gomock.Any(), gomock.Any()).Return(true).Times(1) + extension.SetTpsLimiter(constant.DEFAULT_KEY, func() filter.TpsLimiter { + return mockLimiter + }) + + tpsFilter := GetTpsLimitFilter() + invokeUrl := common.NewURLWithOptions( + common.WithParams(url.Values{}), + common.WithParamsValue(constant.TPS_LIMITER_KEY, constant.DEFAULT_KEY)) + attch := make(map[string]string, 0) + + result := tpsFilter.Invoke(protocol.NewBaseInvoker(*invokeUrl), + invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) +} + +func TestGenericFilter_Invoke_With_Default_TpsLimiter_Not_Allow(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockLimiter := filter.NewMockTpsLimiter(ctrl) + mockLimiter.EXPECT().IsAllowable(gomock.Any(), gomock.Any()).Return(false).Times(1) + extension.SetTpsLimiter(constant.DEFAULT_KEY, func() filter.TpsLimiter { + return mockLimiter + }) + + mockResult := &protocol.RPCResult{} + mockRejectedHandler := filter.NewMockRejectedExecutionHandler(ctrl) + mockRejectedHandler.EXPECT().RejectedExecution(gomock.Any(), gomock.Any()).Return(mockResult).Times(1) + + extension.SetTpsRejectedExecutionHandler(constant.DEFAULT_KEY, func() filter.RejectedExecutionHandler { + return mockRejectedHandler + }) + + tpsFilter := GetTpsLimitFilter() + invokeUrl := common.NewURLWithOptions( + common.WithParams(url.Values{}), + common.WithParamsValue(constant.TPS_LIMITER_KEY, constant.DEFAULT_KEY)) + attch := make(map[string]string, 0) + + result := tpsFilter.Invoke(protocol.NewBaseInvoker(*invokeUrl), + invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) +} diff --git a/filter/impl/tps_limiter_method_service_impl.go b/filter/impl/tps_limiter_method_service_impl.go new file mode 100644 index 0000000000000000000000000000000000000000..8ea32c2ae41283d3ba0c5cbb41c0f6d7b3f767af --- /dev/null +++ b/filter/impl/tps_limiter_method_service_impl.go @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package impl + +import ( + "fmt" + "strconv" + "sync" + + "github.com/modern-go/concurrent" + + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/filter" + "github.com/apache/dubbo-go/protocol" +) + +const name = "method-service" + +func init() { + extension.SetTpsLimiter(constant.DEFAULT_KEY, GetMethodServiceTpsLimiter) + extension.SetTpsLimiter(name, GetMethodServiceTpsLimiter) +} + +/** + * This implementation allows developer to config both method-level and service-level tps limiter. + * for example: + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "method-service" # the name of MethodServiceTpsLimiterImpl. It's the default limiter too. + * tps.interval: 5000 # interval, the time unit is ms + * tps.rate: 300 # the max value in the interval. <0 means that the service will not be limited. + * methods: + * - name: "GetUser" + * tps.interval: 3000 + * tps.rate: 20, # in this case, this configuration in service-level will be ignored. + * - name: "UpdateUser" + * tps.rate: -1, # If the rate<0, the method will be ignored + */ +type MethodServiceTpsLimiterImpl struct { + tpsState *concurrent.Map +} + +func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocation protocol.Invocation) bool { + + serviceLimitRate, err:= strconv.ParseInt(url.GetParam(constant.TPS_LIMIT_RATE_KEY, + constant.DEFAULT_TPS_LIMIT_RATE), 0, 0) + + if err != nil { + panic(fmt.Sprintf("Can not parse the %s for url %s, please check your configuration!", + constant.TPS_LIMIT_RATE_KEY, url.String())) + } + methodLimitRateConfig := invocation.AttachmentsByKey(constant.TPS_LIMIT_RATE_KEY, "") + + // both method-level and service-level don't have the configuration of tps limit + if serviceLimitRate < 0 && len(methodLimitRateConfig) <= 0 { + return true + } + + limitRate := serviceLimitRate + // the method has tps limit configuration + if len(methodLimitRateConfig) >0 { + limitRate, err = strconv.ParseInt(methodLimitRateConfig, 0, 0) + if err != nil { + panic(fmt.Sprintf("Can not parse the %s for invocation %s # %s, please check your configuration!", + constant.TPS_LIMIT_RATE_KEY, url.ServiceKey(), invocation.MethodName())) + } + } + + // 1. the serviceLimitRate < 0 and methodRateConfig is empty string + // 2. the methodLimitRate < 0 + if limitRate < 0{ + return true + } + + serviceInterval, err := strconv.ParseInt(url.GetParam(constant.TPS_LIMIT_INTERVAL_KEY, + constant.DEFAULT_TPS_LIMIT_INTERVAL), 0, 0) + + if err != nil || serviceInterval <= 0{ + panic(fmt.Sprintf("The %s must be positive, please check your configuration!", + constant.TPS_LIMIT_INTERVAL_KEY)) + } + limitInterval := serviceInterval + methodIntervalConfig := invocation.AttachmentsByKey(constant.TPS_LIMIT_INTERVAL_KEY, "") + // there is the interval configuration of method-level + if len(methodIntervalConfig) > 0 { + limitInterval, err = strconv.ParseInt(methodIntervalConfig, 0, 0) + if err != nil || limitInterval <= 0{ + panic(fmt.Sprintf("The %s for invocation %s # %s must be positive, please check your configuration!", + constant.TPS_LIMIT_INTERVAL_KEY, url.ServiceKey(), invocation.MethodName())) + } + } + + limitTarget := url.ServiceKey() + + // method-level tps limit + if len(methodIntervalConfig) > 0 || len(methodLimitRateConfig) >0 { + limitTarget = limitTarget + "#" + invocation.MethodName() + } + + limitStrategyConfig := invocation.AttachmentsByKey(constant.TPS_LIMIT_STRATEGY_KEY, + url.GetParam(constant.TPS_LIMIT_STRATEGY_KEY, constant.DEFAULT_KEY)) + limitStateCreator := extension.GetTpsLimitStrategyCreator(limitStrategyConfig) + limitState, _ := limiter.tpsState.LoadOrStore(limitTarget, limitStateCreator(int(limitRate), int(limitInterval))) + return limitState.(filter.TpsLimitStrategy).IsAllowable() +} + +var methodServiceTpsLimiterInstance *MethodServiceTpsLimiterImpl +var methodServiceTpsLimiterOnce sync.Once + +func GetMethodServiceTpsLimiter() filter.TpsLimiter { + methodServiceTpsLimiterOnce.Do(func() { + methodServiceTpsLimiterInstance = &MethodServiceTpsLimiterImpl{ + tpsState: concurrent.NewMap(), + } + }) + return methodServiceTpsLimiterInstance +} diff --git a/filter/impl/tps_limiter.go b/filter/rejected_execution_handler.go similarity index 86% rename from filter/impl/tps_limiter.go rename to filter/rejected_execution_handler.go index ee24c8c1b28825e6270dc5f5bce043838691af0b..1773edcbd6d415c57dd1b04e69860831fe831efd 100644 --- a/filter/impl/tps_limiter.go +++ b/filter/rejected_execution_handler.go @@ -14,14 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package impl +package filter import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/protocol" ) -type TpsLimiter interface { - IsAllowable(common.URL, protocol.Invocation) -} \ No newline at end of file +type RejectedExecutionHandler interface { + RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result +} diff --git a/filter/rejected_execution_handler_mock.go b/filter/rejected_execution_handler_mock.go new file mode 100644 index 0000000000000000000000000000000000000000..20c3e575bb3e0e02164a7d4ef01e3db64c6a72c9 --- /dev/null +++ b/filter/rejected_execution_handler_mock.go @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: rejected_execution_handler.go + +// Package filter is a generated GoMock package. +package filter + +import ( + common "github.com/apache/dubbo-go/common" + protocol "github.com/apache/dubbo-go/protocol" + gomock "github.com/golang/mock/gomock" + reflect "reflect" +) + +// MockRejectedExecutionHandler is a mock of RejectedExecutionHandler interface +type MockRejectedExecutionHandler struct { + ctrl *gomock.Controller + recorder *MockRejectedExecutionHandlerMockRecorder +} + +// MockRejectedExecutionHandlerMockRecorder is the mock recorder for MockRejectedExecutionHandler +type MockRejectedExecutionHandlerMockRecorder struct { + mock *MockRejectedExecutionHandler +} + +// NewMockRejectedExecutionHandler creates a new mock instance +func NewMockRejectedExecutionHandler(ctrl *gomock.Controller) *MockRejectedExecutionHandler { + mock := &MockRejectedExecutionHandler{ctrl: ctrl} + mock.recorder = &MockRejectedExecutionHandlerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockRejectedExecutionHandler) EXPECT() *MockRejectedExecutionHandlerMockRecorder { + return m.recorder +} + +// RejectedExecution mocks base method +func (m *MockRejectedExecutionHandler) RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RejectedExecution", url, invocation) + ret0, _ := ret[0].(protocol.Result) + return ret0 +} + +// RejectedExecution indicates an expected call of RejectedExecution +func (mr *MockRejectedExecutionHandlerMockRecorder) RejectedExecution(url, invocation interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RejectedExecution", reflect.TypeOf((*MockRejectedExecutionHandler)(nil).RejectedExecution), url, invocation) +} diff --git a/filter/impl/tps_limiter_fixed_window_impl.go b/filter/tps_limit_strategy.go similarity index 71% rename from filter/impl/tps_limiter_fixed_window_impl.go rename to filter/tps_limit_strategy.go index 6f4a3360f9e23271295d1b4a885b756fe3ded561..da5144054108118c9c87948d119a834ce32200a0 100644 --- a/filter/impl/tps_limiter_fixed_window_impl.go +++ b/filter/tps_limit_strategy.go @@ -15,21 +15,11 @@ * limitations under the License. */ -package impl - -import ( - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/protocol" -) - -type FixedWindowTpsLimiterImpl struct { - -} - -func (limiter FixedWindowTpsLimiterImpl) IsAllowable(common.URL, protocol.Invocation) { - panic("implement me") +package filter +/* + * please register your implementation by invoking SetTpsLimitStrategy + */ +type TpsLimitStrategy interface { + IsAllowable() bool } -func NewInstance() TpsLimiter { - return FixedWindowTpsLimiterImpl{} -} diff --git a/filter/tps_limiter.go b/filter/tps_limiter.go new file mode 100644 index 0000000000000000000000000000000000000000..ca945cc70079286595bab4eee4665f304174f0b1 --- /dev/null +++ b/filter/tps_limiter.go @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package filter + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) +/* + * please register your implementation by invoking SetTpsLimiter + * The usage, for example: + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "the name of limiter", + */ +type TpsLimiter interface { + IsAllowable(common.URL, protocol.Invocation) bool +} + + + diff --git a/filter/tps_limiter_mock.go b/filter/tps_limiter_mock.go new file mode 100644 index 0000000000000000000000000000000000000000..5fbf7176147a1168164762657799f6c903d196a7 --- /dev/null +++ b/filter/tps_limiter_mock.go @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: tps_limiter.go + +// Package filter is a generated GoMock package. +package filter + +import ( + common "github.com/apache/dubbo-go/common" + protocol "github.com/apache/dubbo-go/protocol" + gomock "github.com/golang/mock/gomock" + reflect "reflect" +) + +// MockTpsLimiter is a mock of TpsLimiter interface +type MockTpsLimiter struct { + ctrl *gomock.Controller + recorder *MockTpsLimiterMockRecorder +} + +// MockTpsLimiterMockRecorder is the mock recorder for MockTpsLimiter +type MockTpsLimiterMockRecorder struct { + mock *MockTpsLimiter +} + +// NewMockTpsLimiter creates a new mock instance +func NewMockTpsLimiter(ctrl *gomock.Controller) *MockTpsLimiter { + mock := &MockTpsLimiter{ctrl: ctrl} + mock.recorder = &MockTpsLimiterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockTpsLimiter) EXPECT() *MockTpsLimiterMockRecorder { + return m.recorder +} + +// IsAllowable mocks base method +func (m *MockTpsLimiter) IsAllowable(arg0 common.URL, arg1 protocol.Invocation) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsAllowable", arg0, arg1) + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsAllowable indicates an expected call of IsAllowable +func (mr *MockTpsLimiterMockRecorder) IsAllowable(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsAllowable", reflect.TypeOf((*MockTpsLimiter)(nil).IsAllowable), arg0, arg1) +} diff --git a/go.mod b/go.mod index be1c80bd170fe15a05eef55e013607265c282b2a..8171e8e429b243dffaafe2b24915ac90af742d80 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,8 @@ module github.com/apache/dubbo-go require ( github.com/Workiva/go-datastructures v1.0.50 github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 + github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect + github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e // indirect github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190923055845-e3dd5d131df5 github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect @@ -34,11 +36,12 @@ require ( github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v1.1.0 // indirect + github.com/prometheus/common v0.6.0 github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec github.com/satori/go.uuid v1.2.0 github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 // indirect github.com/soheilhy/cmux v0.1.4 // indirect - github.com/stretchr/testify v1.3.0 + github.com/stretchr/testify v1.4.0 github.com/tebeka/strftime v0.1.3 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 // indirect diff --git a/go.sum b/go.sum index 19b148133b736a6891ad2774db2f010a0b5691ca..3ae72390a6115b032f08924a8141bee04efbb3a8 100644 --- a/go.sum +++ b/go.sum @@ -31,7 +31,11 @@ github.com/abdullin/seq v0.0.0-20160510034733-d5467c17e7af/go.mod h1:5Jv4cbFiHJM github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 h1:rFw4nCn9iMW+Vajsk51NtYIcwSTkXr+JGrMd36kTDJw= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= +github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e h1:MSuLXx/mveDbpDNhVrcWTMeV4lbYWKcyO4rH+jAxmX0= github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ= github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= @@ -446,6 +450,8 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/tebeka/strftime v0.1.3 h1:5HQXOqWKYRFfNyBMNVc9z5+QzuBtIXy03psIhtdJYto= github.com/tebeka/strftime v0.1.3/go.mod h1:7wJm3dZlpr4l/oVK0t1HYIc4rMzQ2XJlOMIUJUJH6XQ= github.com/tent/http-link-go v0.0.0-20130702225549-ac974c61c2f9/go.mod h1:RHkNRtSLfOK7qBTHaeSX1D6BNpI3qw7NTxsmNr4RvN8= @@ -534,6 +540,7 @@ google.golang.org/grpc v1.19.1/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi google.golang.org/grpc v1.22.1 h1:/7cs52RnTJmD43s3uxzlq2U7nqVTd/37viQwMrMNlOM= google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= +gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d h1:TxyelI5cVkbREznMhfzycHdkp5cLA7DpE+GKjSslYhM= gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUyUwEgHQXw849cJrilpS5NeIjOWESAw=