diff --git a/README.md b/README.md index da87a936ee53bc10dd68b156f0dc855f12c9d7e3..6ff358d9d603cb25833a36afb774cc46595e63f5 100644 --- a/README.md +++ b/README.md @@ -34,13 +34,12 @@ Finished List: - Dynamic Configure Center & Service Management Configurator: Zookeeper - Cluster Strategy: Failover/[Failfast](https://github.com/apache/dubbo-go/pull/140)/[Failsafe/Failback](https://github.com/apache/dubbo-go/pull/136)/[Available](https://github.com/apache/dubbo-go/pull/155)/[Broadcast](https://github.com/apache/dubbo-go/pull/158)/[Forking](https://github.com/apache/dubbo-go/pull/161) - Load Balance: Random/[RoundRobin](https://github.com/apache/dubbo-go/pull/66)/[LeastActive](https://github.com/apache/dubbo-go/pull/65) -- Filter: Echo Health Check/[Circuit break and service downgrade](https://github.com/apache/dubbo-go/pull/133)/[TokenFilter](https://github.com/apache/dubbo-go/pull/202)/[AccessLogFilter](https://github.com/apache/dubbo-go/pull/214)/[TpsLimitFilter](https://github.com/apache/dubbo-go/pull/237) +- Filter: Echo Health Check/[Circuit break and service downgrade](https://github.com/apache/dubbo-go/pull/133)/[TokenFilter](https://github.com/apache/dubbo-go/pull/202)/[AccessLogFilter](https://github.com/apache/dubbo-go/pull/214)/[TpsLimitFilter](https://github.com/apache/dubbo-go/pull/237)/[ExecuteLimitFilter](https://github.com/apache/dubbo-go/pull/246) - Other feature: [generic invoke](https://github.com/apache/dubbo-go/pull/122)/start check/connecting certain provider/multi-protocols/multi-registries/multi-versions/service group Working List: - Load Balance: ConsistentHash -- Filter: CountFilter/ExecuteLimitFilter - Registry: k8s - Configure Center: apollo - Metadata Center (dubbo v2.7.x) diff --git a/README_CN.md b/README_CN.md index adf87c2805a05bf1e3a6592188625eff7ee7b1fa..7a7b061960e8dcc49f1da0ba6490ca8bbbf09e22 100644 --- a/README_CN.md +++ b/README_CN.md @@ -33,14 +33,13 @@ Apache License, Version 2.0 - 鍔ㄦ€侀厤缃腑蹇冧笌鏈嶅姟娌荤悊閰嶇疆鍣紙config center锛�: Zookeeper - 闆嗙兢绛栫暐: Failover/[Failfast](https://github.com/apache/dubbo-go/pull/140)/[Failsafe/Failback](https://github.com/apache/dubbo-go/pull/136)/[Available](https://github.com/apache/dubbo-go/pull/155)/[Broadcast](https://github.com/apache/dubbo-go/pull/158)/[Forking](https://github.com/apache/dubbo-go/pull/161) - 璐熻浇鍧囪 绛栫暐: Random/[RoundRobin](https://github.com/apache/dubbo-go/pull/66)/[LeastActive](https://github.com/apache/dubbo-go/pull/65) -- 杩囨护鍣�: Echo Health Check/[鏈嶅姟鐔旀柇&闄嶇骇](https://github.com/apache/dubbo-go/pull/133)/[TokenFilter](https://github.com/apache/dubbo-go/pull/202)/[AccessLogFilter](https://github.com/apache/dubbo-go/pull/214)/[TpsLimitFilter](https://github.com/apache/dubbo-go/pull/237) +- 杩囨护鍣�: Echo Health Check/[鏈嶅姟鐔旀柇&闄嶇骇](https://github.com/apache/dubbo-go/pull/133)/[TokenFilter](https://github.com/apache/dubbo-go/pull/202)/[AccessLogFilter](https://github.com/apache/dubbo-go/pull/214)/[TpsLimitFilter](https://github.com/apache/dubbo-go/pull/237)[ExecuteLimitFilter](https://github.com/apache/dubbo-go/pull/246) - 鍏朵粬鍔熻兘鏀寔: [娉涘寲璋冪敤](https://github.com/apache/dubbo-go/pull/122)/鍚姩鏃舵鏌�/鏈嶅姟鐩磋繛/澶氭湇鍔″崗璁�/澶氭敞鍐屼腑蹇�/澶氭湇鍔$増鏈�/鏈嶅姟鍒嗙粍 寮€鍙戜腑鍒楄〃: - 闆嗙兢绛栫暐: Forking - 璐熻浇鍧囪 绛栫暐: ConsistentHash -- 杩囨护鍣�: CountFilter/ExecuteLimitFilter - 娉ㄥ唽涓績: k8s - 閰嶇疆涓績: apollo - 鍏冩暟鎹腑蹇� (dubbo v2.7.x) diff --git a/common/constant/default.go b/common/constant/default.go index 9b69226318d92824de8eccd06ccb1dc88470c2ee..cb6d68af0561d44f4306f16973a89759c9a9ac37 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -46,7 +46,7 @@ const ( const ( DEFAULT_KEY = "default" PREFIX_DEFAULT_KEY = "default." - DEFAULT_SERVICE_FILTERS = "echo,token,accesslog,tps" + DEFAULT_SERVICE_FILTERS = "echo,token,accesslog,tps,execute" DEFAULT_REFERENCE_FILTERS = "" GENERIC_REFERENCE_FILTERS = "generic" GENERIC = "$invoke" diff --git a/common/constant/key.go b/common/constant/key.go index dc24c2b5dbe3baae5333641cd99d746332ed705a..ff371d08c669df07f24a3e81e0be0a29a6b17ddd 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -48,26 +48,29 @@ const ( ) const ( - TIMESTAMP_KEY = "timestamp" - REMOTE_TIMESTAMP_KEY = "remote.timestamp" - CLUSTER_KEY = "cluster" - LOADBALANCE_KEY = "loadbalance" - WEIGHT_KEY = "weight" - WARMUP_KEY = "warmup" - RETRIES_KEY = "retries" - BEAN_NAME = "bean.name" - FAIL_BACK_TASKS_KEY = "failbacktasks" - FORKS_KEY = "forks" - DEFAULT_FORKS = 2 - DEFAULT_TIMEOUT = 1000 - ACCESS_LOG_KEY = "accesslog" - TPS_LIMITER_KEY = "tps.limiter" - TPS_REJECTED_EXECUTION_HANDLER_KEY = "tps.limit.rejected.handler" - TPS_LIMIT_RATE_KEY = "tps.limit.rate" - DEFAULT_TPS_LIMIT_RATE = "-1" - TPS_LIMIT_INTERVAL_KEY = "tps.limit.interval" - DEFAULT_TPS_LIMIT_INTERVAL = "60000" - TPS_LIMIT_STRATEGY_KEY = "tps.limit.strategy" + TIMESTAMP_KEY = "timestamp" + REMOTE_TIMESTAMP_KEY = "remote.timestamp" + CLUSTER_KEY = "cluster" + LOADBALANCE_KEY = "loadbalance" + WEIGHT_KEY = "weight" + WARMUP_KEY = "warmup" + RETRIES_KEY = "retries" + BEAN_NAME = "bean.name" + FAIL_BACK_TASKS_KEY = "failbacktasks" + FORKS_KEY = "forks" + DEFAULT_FORKS = 2 + DEFAULT_TIMEOUT = 1000 + ACCESS_LOG_KEY = "accesslog" + TPS_LIMITER_KEY = "tps.limiter" + TPS_REJECTED_EXECUTION_HANDLER_KEY = "tps.limit.rejected.handler" + TPS_LIMIT_RATE_KEY = "tps.limit.rate" + DEFAULT_TPS_LIMIT_RATE = "-1" + TPS_LIMIT_INTERVAL_KEY = "tps.limit.interval" + DEFAULT_TPS_LIMIT_INTERVAL = "60000" + TPS_LIMIT_STRATEGY_KEY = "tps.limit.strategy" + EXECUTE_LIMIT_KEY = "execute.limit" + DEFAULT_EXECUTE_LIMIT = "-1" + EXECUTE_REJECTED_EXECUTION_HANDLER_KEY = "execute.limit.rejected.handler" ) const ( diff --git a/common/extension/filter.go b/common/extension/filter.go index d6c7f6f694cc9888bd63c1d123bc57ee86f15e1f..e2a66c7449448a2229c53eabb478f2c96a429bc7 100644 --- a/common/extension/filter.go +++ b/common/extension/filter.go @@ -19,10 +19,12 @@ package extension import ( "github.com/apache/dubbo-go/filter" + "github.com/apache/dubbo-go/filter/common" ) var ( - filters = make(map[string]func() filter.Filter) + filters = make(map[string]func() filter.Filter) + rejectedExecutionHandler = make(map[string]func() common.RejectedExecutionHandler) ) func SetFilter(name string, v func() filter.Filter) { @@ -35,3 +37,16 @@ func GetFilter(name string) filter.Filter { } return filters[name]() } + +func SetRejectedExecutionHandler(name string, creator func() common.RejectedExecutionHandler) { + rejectedExecutionHandler[name] = creator +} + +func GetRejectedExecutionHandler(name string) common.RejectedExecutionHandler { + creator, ok := rejectedExecutionHandler[name] + if !ok { + panic("RejectedExecutionHandler for " + name + " is not existing, make sure you have import the package " + + "and you have register it by invoking extension.SetRejectedExecutionHandler.") + } + return creator() +} diff --git a/common/extension/tps_limit.go b/common/extension/tps_limit.go index 7d5cb8e9f6465a7a6aab98fa2ed484c4715c31c3..65891c79336224f59b66f8312693c6b5151a7e28 100644 --- a/common/extension/tps_limit.go +++ b/common/extension/tps_limit.go @@ -22,9 +22,8 @@ import ( ) var ( - tpsLimitStrategy = make(map[string]func(rate int, interval int) tps.TpsLimitStrategy) - tpsLimiter = make(map[string]func() tps.TpsLimiter) - tpsRejectedExecutionHandler = make(map[string]func() tps.RejectedExecutionHandler) + tpsLimitStrategy = make(map[string]func(rate int, interval int) tps.TpsLimitStrategy) + tpsLimiter = make(map[string]func() tps.TpsLimiter) ) func SetTpsLimiter(name string, creator func() tps.TpsLimiter) { @@ -52,16 +51,3 @@ func GetTpsLimitStrategyCreator(name string) func(rate int, interval int) tps.Tp } return creator } - -func SetTpsRejectedExecutionHandler(name string, creator func() tps.RejectedExecutionHandler) { - tpsRejectedExecutionHandler[name] = creator -} - -func GetTpsRejectedExecutionHandler(name string) tps.RejectedExecutionHandler { - creator, ok := tpsRejectedExecutionHandler[name] - if !ok { - panic("TpsRejectedExecutionHandler for " + name + " is not existing, make sure you have import the package " + - "and you have register it by invoking extension.SetTpsRejectedExecutionHandler.") - } - return creator() -} diff --git a/config/method_config.go b/config/method_config.go index 431a30c1ddc1406c5537810fc47b23dcb0f5838a..e3f0b1b01b5c6d753da216ecf906aee3bf305944 100644 --- a/config/method_config.go +++ b/config/method_config.go @@ -25,15 +25,17 @@ import ( ) type MethodConfig struct { - InterfaceId string - InterfaceName string - Name string `yaml:"name" json:"name,omitempty" property:"name"` - Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` - Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` - Weight int64 `yaml:"weight" json:"weight,omitempty" property:"weight"` - TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"` - TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"` - TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"` + InterfaceId string + InterfaceName string + Name string `yaml:"name" json:"name,omitempty" property:"name"` + Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` + Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` + Weight int64 `yaml:"weight" json:"weight,omitempty" property:"weight"` + TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"` + TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"` + TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"` + ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"` + ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"` } func (c *MethodConfig) Prefix() string { diff --git a/config/service_config.go b/config/service_config.go index fb65567a4b4163730a82fb99e7c309d7b35d3cf0..c17846322e20120bfdf00f1afe24bd20efe7510b 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -43,32 +43,35 @@ import ( ) type ServiceConfig struct { - context context.Context - id string - Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` - Protocol string `default:"dubbo" required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` // multi protocol support, split by ',' - InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"` - Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"` - Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"` - Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` - Group string `yaml:"group" json:"group,omitempty" property:"group"` - Version string `yaml:"version" json:"version,omitempty" property:"version" ` - Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` - Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"` - Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` - Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` - Token string `yaml:"token" json:"token,omitempty" property:"token"` - AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"` - TpsLimiter string `yaml:"tps.limiter" json:"tps.limiter,omitempty" property:"tps.limiter"` - TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"` - TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"` - TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"` - TpsLimitRejectedHandler string `yaml:"tps.limit.rejected.handler" json:"tps.limit.rejected.handler,omitempty" property:"tps.limit.rejected.handler"` - unexported *atomic.Bool - exported *atomic.Bool - rpcService common.RPCService - cacheProtocol protocol.Protocol - cacheMutex sync.Mutex + context context.Context + id string + Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` + Protocol string `default:"dubbo" required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` // multi protocol support, split by ',' + InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"` + Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"` + Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"` + Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` + Group string `yaml:"group" json:"group,omitempty" property:"group"` + Version string `yaml:"version" json:"version,omitempty" property:"version" ` + Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` + Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"` + Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` + Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` + Token string `yaml:"token" json:"token,omitempty" property:"token"` + AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"` + TpsLimiter string `yaml:"tps.limiter" json:"tps.limiter,omitempty" property:"tps.limiter"` + TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"` + TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"` + TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"` + TpsLimitRejectedHandler string `yaml:"tps.limit.rejected.handler" json:"tps.limit.rejected.handler,omitempty" property:"tps.limit.rejected.handler"` + ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"` + ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"` + + unexported *atomic.Bool + exported *atomic.Bool + rpcService common.RPCService + cacheProtocol protocol.Protocol + cacheMutex sync.Mutex } func (c *ServiceConfig) Prefix() string { @@ -203,6 +206,10 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values { urlMap.Set(constant.TPS_LIMITER_KEY, srvconfig.TpsLimiter) urlMap.Set(constant.TPS_REJECTED_EXECUTION_HANDLER_KEY, srvconfig.TpsLimitRejectedHandler) + // execute limit filter + urlMap.Set(constant.EXECUTE_LIMIT_KEY, srvconfig.ExecuteLimit) + urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, srvconfig.ExecuteLimitRejectedHandler) + for _, v := range srvconfig.Methods { prefix := "methods." + v.Name + "." urlMap.Set(prefix+constant.LOADBALANCE_KEY, v.Loadbalance) @@ -212,6 +219,10 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values { urlMap.Set(prefix+constant.TPS_LIMIT_STRATEGY_KEY, v.TpsLimitStrategy) urlMap.Set(prefix+constant.TPS_LIMIT_INTERVAL_KEY, v.TpsLimitInterval) urlMap.Set(prefix+constant.TPS_LIMIT_RATE_KEY, v.TpsLimitRate) + + urlMap.Set(constant.EXECUTE_LIMIT_KEY, v.ExecuteLimit) + urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, v.ExecuteLimitRejectedHandler) + } return urlMap diff --git a/config/testdata/provider_config.yml b/config/testdata/provider_config.yml index 71e45b9c0e17d0a0f21b4afaeabd1a553469c6dc..080feb7dcd1cccd06ae436b2854b2531177d23e3 100644 --- a/config/testdata/provider_config.yml +++ b/config/testdata/provider_config.yml @@ -38,6 +38,11 @@ services: tps.limit.strategy: "slidingWindow" # the name of RejectedExecutionHandler tps.limit.rejected.handler: "default" + # the concurrent request limitation of this service + # if the value < 0, it will not be limited. + execute.limit: "200" + # the name of RejectedExecutionHandler + execute.limit.rejected.handler: "default" protocol : "dubbo" # equivalent to interface of dubbo.xml interface : "com.ikurento.user.UserProvider" @@ -50,6 +55,11 @@ services: - name: "GetUser" retries: 1 loadbalance: "random" + # the concurrent request limitation of this method + # if the value < 0, it will not be limited. + execute.limit: "200" + # the name of RejectedExecutionHandler + execute.limit.rejected.handler: "default" protocols: "dubbo": diff --git a/filter/impl/tps/impl/rejected_execution_handler_mock.go b/filter/common/impl/rejected_execution_handler_mock.go similarity index 100% rename from filter/impl/tps/impl/rejected_execution_handler_mock.go rename to filter/common/impl/rejected_execution_handler_mock.go diff --git a/filter/impl/tps/impl/rejected_execution_handler_only_log.go b/filter/common/impl/rejected_execution_handler_only_log.go similarity index 82% rename from filter/impl/tps/impl/rejected_execution_handler_only_log.go rename to filter/common/impl/rejected_execution_handler_only_log.go index 62d2fc81cbcf0a220613ecaef9a100abbe1dff76..8943433af1ba908fc834740163df78e3b2b6443a 100644 --- a/filter/impl/tps/impl/rejected_execution_handler_only_log.go +++ b/filter/common/impl/rejected_execution_handler_only_log.go @@ -26,15 +26,15 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" - "github.com/apache/dubbo-go/filter/impl/tps" + filterCommon "github.com/apache/dubbo-go/filter/common" "github.com/apache/dubbo-go/protocol" ) const HandlerName = "log" func init() { - extension.SetTpsRejectedExecutionHandler(HandlerName, GetOnlyLogRejectedExecutionHandler) - extension.SetTpsRejectedExecutionHandler(constant.DEFAULT_KEY, GetOnlyLogRejectedExecutionHandler) + extension.SetRejectedExecutionHandler(HandlerName, GetOnlyLogRejectedExecutionHandler) + extension.SetRejectedExecutionHandler(constant.DEFAULT_KEY, GetOnlyLogRejectedExecutionHandler) } var onlyLogHandlerInstance *OnlyLogRejectedExecutionHandler @@ -57,11 +57,11 @@ type OnlyLogRejectedExecutionHandler struct { } func (handler *OnlyLogRejectedExecutionHandler) RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result { - logger.Errorf("The invocation was rejected due to over rate limitation. url: %s", url.String()) + logger.Errorf("The invocation was rejected. url: %s", url.String()) return &protocol.RPCResult{} } -func GetOnlyLogRejectedExecutionHandler() tps.RejectedExecutionHandler { +func GetOnlyLogRejectedExecutionHandler() filterCommon.RejectedExecutionHandler { onlyLogHandlerOnce.Do(func() { onlyLogHandlerInstance = &OnlyLogRejectedExecutionHandler{} }) diff --git a/filter/impl/tps/impl/rejected_execution_handler_only_log_test.go b/filter/common/impl/rejected_execution_handler_only_log_test.go similarity index 100% rename from filter/impl/tps/impl/rejected_execution_handler_only_log_test.go rename to filter/common/impl/rejected_execution_handler_only_log_test.go diff --git a/filter/impl/tps/rejected_execution_handler.go b/filter/common/rejected_execution_handler.go similarity index 70% rename from filter/impl/tps/rejected_execution_handler.go rename to filter/common/rejected_execution_handler.go index 827908974e5bd4f9679d0c4c66c70a8259887d5c..b993b8444c14c13ce9a8861c113dc02ca5fd335a 100644 --- a/filter/impl/tps/rejected_execution_handler.go +++ b/filter/common/rejected_execution_handler.go @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package tps +package common import ( "github.com/apache/dubbo-go/common" @@ -22,17 +22,11 @@ import ( ) /** - * This implementation only logs the invocation info. - * it always return en error inside the result. - * "UserProvider": - * registry: "hangzhouzk" - * protocol : "dubbo" - * interface : "com.ikurento.user.UserProvider" - * ... # other configuration - * tps.limiter: "method-service" # the name of limiter - * tps.limit.rejected.handler: "name of handler" - * methods: - * - name: "GetUser" + * If the invocation cannot pass any validation in filter, like ExecuteLimitFilter and TpsLimitFilter, + * the implementation will be used. + * The common case is that sometimes you want to return the default value when the request was rejected. + * Or you want to be warned if any request was rejected. + * In such situation, implement this interface and register it by invoking extension.SetRejectedExecutionHandler. */ type RejectedExecutionHandler interface { RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result diff --git a/filter/impl/execute_limit_filter.go b/filter/impl/execute_limit_filter.go new file mode 100644 index 0000000000000000000000000000000000000000..156af1b140283dd76c4867ca26e9b42ce8eb25c0 --- /dev/null +++ b/filter/impl/execute_limit_filter.go @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "strconv" + "sync" + "sync/atomic" +) + +import ( + "github.com/modern-go/concurrent" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/filter" + _ "github.com/apache/dubbo-go/filter/common/impl" + "github.com/apache/dubbo-go/protocol" +) + +const name = "execute" + +func init() { + extension.SetFilter(name, GetExecuteLimitFilter) +} + +/** + * The filter will limit the number of in-progress request and it's thread-safe. + * example: + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * execute.limit: 200 # the name of MethodServiceTpsLimiterImpl. if the value < 0, invocation will be ignored. + * execute.limit.rejected.handle: "default" # the name of rejected handler + * methods: + * - name: "GetUser" + * execute.limit: 20, # in this case, this configuration in service-level will be ignored. + * - name: "UpdateUser" + * execute.limit: -1, # If the rate<0, the method will be ignored + * - name: "DeleteUser" + * execute.limit.rejected.handle: "customHandler" # Using the custom handler to do something when the request was rejected. + * - name: "AddUser" + * From the example, the configuration in service-level is 200, and the configuration of method GetUser is 20. + * it means that, the GetUser will be counted separately. + * The configuration of method UpdateUser is -1, so the invocation for it will not be counted. + * So the method DeleteUser and method AddUser will be limited by service-level configuration. + * Sometimes we want to do something, like log the request or return default value when the request is over limitation. + * Then you can implement the RejectedExecutionHandler interface and register it by invoking SetRejectedExecutionHandler. + */ +type ExecuteLimitFilter struct { + executeState *concurrent.Map +} + +type ExecuteState struct { + concurrentCount int64 +} + +func (ef *ExecuteLimitFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + methodConfigPrefix := "methods." + invocation.MethodName() + "." + url := invoker.GetUrl() + limitTarget := url.ServiceKey() + limitRateConfig := constant.DEFAULT_EXECUTE_LIMIT + + methodLevelConfig := url.GetParam(methodConfigPrefix+constant.EXECUTE_LIMIT_KEY, "") + if len(methodLevelConfig) > 0 { + // we have the method-level configuration + limitTarget = limitTarget + "#" + invocation.MethodName() + limitRateConfig = methodLevelConfig + } else { + limitRateConfig = url.GetParam(constant.EXECUTE_LIMIT_KEY, constant.DEFAULT_EXECUTE_LIMIT) + } + + limitRate, err := strconv.ParseInt(limitRateConfig, 0, 0) + if err != nil { + logger.Errorf("The configuration of execute.limit is invalid: %s", limitRateConfig) + return &protocol.RPCResult{} + } + + if limitRate < 0 { + return invoker.Invoke(invocation) + } + + state, _ := ef.executeState.LoadOrStore(limitTarget, &ExecuteState{ + concurrentCount: 0, + }) + + concurrentCount := state.(*ExecuteState).increase() + defer state.(*ExecuteState).decrease() + if concurrentCount > limitRate { + logger.Errorf("The invocation was rejected due to over the execute limitation, url: %s ", url.String()) + rejectedHandlerConfig := url.GetParam(methodConfigPrefix+constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, + url.GetParam(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, constant.DEFAULT_KEY)) + return extension.GetRejectedExecutionHandler(rejectedHandlerConfig).RejectedExecution(url, invocation) + } + + return invoker.Invoke(invocation) +} + +func (ef *ExecuteLimitFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + return result +} + +func (state *ExecuteState) increase() int64 { + return atomic.AddInt64(&state.concurrentCount, 1) +} + +func (state *ExecuteState) decrease() { + atomic.AddInt64(&state.concurrentCount, -1) +} + +var executeLimitOnce sync.Once +var executeLimitFilter *ExecuteLimitFilter + +func GetExecuteLimitFilter() filter.Filter { + executeLimitOnce.Do(func() { + executeLimitFilter = &ExecuteLimitFilter{ + executeState: concurrent.NewMap(), + } + }) + return executeLimitFilter +} diff --git a/filter/impl/execute_limit_filter_test.go b/filter/impl/execute_limit_filter_test.go new file mode 100644 index 0000000000000000000000000000000000000000..5d729c0e6a1205902856eccfa6aa96b0bee0e790 --- /dev/null +++ b/filter/impl/execute_limit_filter_test.go @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "net/url" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" +) + +func TestExecuteLimitFilter_Invoke_Ignored(t *testing.T) { + methodName := "hello" + invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]string, 0)) + + invokeUrl := common.NewURLWithOptions( + common.WithParams(url.Values{}), + common.WithParamsValue(constant.INTERFACE_KEY, methodName)) + + limitFilter := GetExecuteLimitFilter() + + result := limitFilter.Invoke(protocol.NewBaseInvoker(*invokeUrl), invoc) + assert.NotNil(t, result) + assert.Nil(t, result.Error()) +} + +func TestExecuteLimitFilter_Invoke_Configure_Error(t *testing.T) { + methodName := "hello1" + invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]string, 0)) + + invokeUrl := common.NewURLWithOptions( + common.WithParams(url.Values{}), + common.WithParamsValue(constant.INTERFACE_KEY, methodName), + common.WithParamsValue(constant.EXECUTE_LIMIT_KEY, "13a"), + ) + + limitFilter := GetExecuteLimitFilter() + + result := limitFilter.Invoke(protocol.NewBaseInvoker(*invokeUrl), invoc) + assert.NotNil(t, result) + assert.Nil(t, result.Error()) +} + +func TestExecuteLimitFilter_Invoke(t *testing.T) { + methodName := "hello1" + invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]string, 0)) + + invokeUrl := common.NewURLWithOptions( + common.WithParams(url.Values{}), + common.WithParamsValue(constant.INTERFACE_KEY, methodName), + common.WithParamsValue(constant.EXECUTE_LIMIT_KEY, "20"), + ) + + limitFilter := GetExecuteLimitFilter() + + result := limitFilter.Invoke(protocol.NewBaseInvoker(*invokeUrl), invoc) + assert.NotNil(t, result) + assert.Nil(t, result.Error()) +} diff --git a/filter/impl/tps_limit_filter.go b/filter/impl/tps_limit_filter.go index 1e3222c0e86f5e8c108420e91a0816067ed8443e..3cb7381c8616abd61fe2ac306b59694a92715dda 100644 --- a/filter/impl/tps_limit_filter.go +++ b/filter/impl/tps_limit_filter.go @@ -20,7 +20,9 @@ package impl import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/filter" + _ "github.com/apache/dubbo-go/filter/common/impl" _ "github.com/apache/dubbo-go/filter/impl/tps/impl" "github.com/apache/dubbo-go/protocol" ) @@ -58,7 +60,8 @@ func (t TpsLimitFilter) Invoke(invoker protocol.Invoker, invocation protocol.Inv if allow { return invoker.Invoke(invocation) } - return extension.GetTpsRejectedExecutionHandler(rejectedExeHandler).RejectedExecution(url, invocation) + logger.Errorf("The invocation was rejected due to over the tps limitation, url: %s ", url.String()) + return extension.GetRejectedExecutionHandler(rejectedExeHandler).RejectedExecution(url, invocation) } return invoker.Invoke(invocation) } diff --git a/filter/impl/tps_limit_filter_test.go b/filter/impl/tps_limit_filter_test.go index 5f5557b07e386e93c641f9d81e3dd9fa1253c41c..debdbd00dec97ed67d789bfc45103993c014ab4a 100644 --- a/filter/impl/tps_limit_filter_test.go +++ b/filter/impl/tps_limit_filter_test.go @@ -31,6 +31,8 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" + filterCommon "github.com/apache/dubbo-go/filter/common" + filterCommonImpl "github.com/apache/dubbo-go/filter/common/impl" "github.com/apache/dubbo-go/filter/impl/tps" "github.com/apache/dubbo-go/filter/impl/tps/impl" "github.com/apache/dubbo-go/protocol" @@ -82,10 +84,10 @@ func TestGenericFilter_Invoke_With_Default_TpsLimiter_Not_Allow(t *testing.T) { }) mockResult := &protocol.RPCResult{} - mockRejectedHandler := impl.NewMockRejectedExecutionHandler(ctrl) + mockRejectedHandler := filterCommonImpl.NewMockRejectedExecutionHandler(ctrl) mockRejectedHandler.EXPECT().RejectedExecution(gomock.Any(), gomock.Any()).Return(mockResult).Times(1) - extension.SetTpsRejectedExecutionHandler(constant.DEFAULT_KEY, func() tps.RejectedExecutionHandler { + extension.SetRejectedExecutionHandler(constant.DEFAULT_KEY, func() filterCommon.RejectedExecutionHandler { return mockRejectedHandler })