Skip to content
Snippets Groups Projects
Commit ab228d95 authored by Ming Deng's avatar Ming Deng
Browse files

Feature: ExecuteLimit Support

parent 196481a6
No related branches found
No related tags found
No related merge requests found
Showing
with 344 additions and 93 deletions
......@@ -46,7 +46,7 @@ const (
const (
DEFAULT_KEY = "default"
PREFIX_DEFAULT_KEY = "default."
DEFAULT_SERVICE_FILTERS = "echo,token,accesslog,tps"
DEFAULT_SERVICE_FILTERS = "echo,token,accesslog,tps,execute"
DEFAULT_REFERENCE_FILTERS = ""
GENERIC_REFERENCE_FILTERS = "generic"
GENERIC = "$invoke"
......
......@@ -48,26 +48,29 @@ const (
)
const (
TIMESTAMP_KEY = "timestamp"
REMOTE_TIMESTAMP_KEY = "remote.timestamp"
CLUSTER_KEY = "cluster"
LOADBALANCE_KEY = "loadbalance"
WEIGHT_KEY = "weight"
WARMUP_KEY = "warmup"
RETRIES_KEY = "retries"
BEAN_NAME = "bean.name"
FAIL_BACK_TASKS_KEY = "failbacktasks"
FORKS_KEY = "forks"
DEFAULT_FORKS = 2
DEFAULT_TIMEOUT = 1000
ACCESS_LOG_KEY = "accesslog"
TPS_LIMITER_KEY = "tps.limiter"
TPS_REJECTED_EXECUTION_HANDLER_KEY = "tps.limit.rejected.handler"
TPS_LIMIT_RATE_KEY = "tps.limit.rate"
DEFAULT_TPS_LIMIT_RATE = "-1"
TPS_LIMIT_INTERVAL_KEY = "tps.limit.interval"
DEFAULT_TPS_LIMIT_INTERVAL = "60000"
TPS_LIMIT_STRATEGY_KEY = "tps.limit.strategy"
TIMESTAMP_KEY = "timestamp"
REMOTE_TIMESTAMP_KEY = "remote.timestamp"
CLUSTER_KEY = "cluster"
LOADBALANCE_KEY = "loadbalance"
WEIGHT_KEY = "weight"
WARMUP_KEY = "warmup"
RETRIES_KEY = "retries"
BEAN_NAME = "bean.name"
FAIL_BACK_TASKS_KEY = "failbacktasks"
FORKS_KEY = "forks"
DEFAULT_FORKS = 2
DEFAULT_TIMEOUT = 1000
ACCESS_LOG_KEY = "accesslog"
TPS_LIMITER_KEY = "tps.limiter"
TPS_REJECTED_EXECUTION_HANDLER_KEY = "tps.limit.rejected.handler"
TPS_LIMIT_RATE_KEY = "tps.limit.rate"
DEFAULT_TPS_LIMIT_RATE = "-1"
TPS_LIMIT_INTERVAL_KEY = "tps.limit.interval"
DEFAULT_TPS_LIMIT_INTERVAL = "60000"
TPS_LIMIT_STRATEGY_KEY = "tps.limit.strategy"
EXECUTE_LIMIT_KEY = "execute.limit"
DEFAULT_EXECUTE_LIMIT = "-1"
EXECUTE_REJECTED_EXECUTION_HANDLER_KEY = "execute.limit.rejected.handler"
)
const (
......
......@@ -19,10 +19,12 @@ package extension
import (
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/filter/common"
)
var (
filters = make(map[string]func() filter.Filter)
filters = make(map[string]func() filter.Filter)
rejectedExecutionHandler = make(map[string]func() common.RejectedExecutionHandler)
)
func SetFilter(name string, v func() filter.Filter) {
......@@ -35,3 +37,16 @@ func GetFilter(name string) filter.Filter {
}
return filters[name]()
}
func SetRejectedExecutionHandler(name string, creator func() common.RejectedExecutionHandler) {
rejectedExecutionHandler[name] = creator
}
func GetRejectedExecutionHandler(name string) common.RejectedExecutionHandler {
creator, ok := rejectedExecutionHandler[name]
if !ok {
panic("RejectedExecutionHandler for " + name + " is not existing, make sure you have import the package " +
"and you have register it by invoking extension.SetRejectedExecutionHandler.")
}
return creator()
}
......@@ -22,9 +22,8 @@ import (
)
var (
tpsLimitStrategy = make(map[string]func(rate int, interval int) tps.TpsLimitStrategy)
tpsLimiter = make(map[string]func() tps.TpsLimiter)
tpsRejectedExecutionHandler = make(map[string]func() tps.RejectedExecutionHandler)
tpsLimitStrategy = make(map[string]func(rate int, interval int) tps.TpsLimitStrategy)
tpsLimiter = make(map[string]func() tps.TpsLimiter)
)
func SetTpsLimiter(name string, creator func() tps.TpsLimiter) {
......@@ -52,16 +51,3 @@ func GetTpsLimitStrategyCreator(name string) func(rate int, interval int) tps.Tp
}
return creator
}
func SetTpsRejectedExecutionHandler(name string, creator func() tps.RejectedExecutionHandler) {
tpsRejectedExecutionHandler[name] = creator
}
func GetTpsRejectedExecutionHandler(name string) tps.RejectedExecutionHandler {
creator, ok := tpsRejectedExecutionHandler[name]
if !ok {
panic("TpsRejectedExecutionHandler for " + name + " is not existing, make sure you have import the package " +
"and you have register it by invoking extension.SetTpsRejectedExecutionHandler.")
}
return creator()
}
......@@ -25,15 +25,17 @@ import (
)
type MethodConfig struct {
InterfaceId string
InterfaceName string
Name string `yaml:"name" json:"name,omitempty" property:"name"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Weight int64 `yaml:"weight" json:"weight,omitempty" property:"weight"`
TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"`
TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"`
TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"`
InterfaceId string
InterfaceName string
Name string `yaml:"name" json:"name,omitempty" property:"name"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Weight int64 `yaml:"weight" json:"weight,omitempty" property:"weight"`
TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"`
TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"`
TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"`
ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"`
ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"`
}
func (c *MethodConfig) Prefix() string {
......
......@@ -43,32 +43,35 @@ import (
)
type ServiceConfig struct {
context context.Context
id string
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `default:"dubbo" required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` // multi protocol support, split by ','
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version" `
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
Token string `yaml:"token" json:"token,omitempty" property:"token"`
AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"`
TpsLimiter string `yaml:"tps.limiter" json:"tps.limiter,omitempty" property:"tps.limiter"`
TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"`
TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"`
TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"`
TpsLimitRejectedHandler string `yaml:"tps.limit.rejected.handler" json:"tps.limit.rejected.handler,omitempty" property:"tps.limit.rejected.handler"`
unexported *atomic.Bool
exported *atomic.Bool
rpcService common.RPCService
cacheProtocol protocol.Protocol
cacheMutex sync.Mutex
context context.Context
id string
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `default:"dubbo" required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` // multi protocol support, split by ','
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version" `
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
Token string `yaml:"token" json:"token,omitempty" property:"token"`
AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"`
TpsLimiter string `yaml:"tps.limiter" json:"tps.limiter,omitempty" property:"tps.limiter"`
TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"`
TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"`
TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"`
TpsLimitRejectedHandler string `yaml:"tps.limit.rejected.handler" json:"tps.limit.rejected.handler,omitempty" property:"tps.limit.rejected.handler"`
ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"`
ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"`
unexported *atomic.Bool
exported *atomic.Bool
rpcService common.RPCService
cacheProtocol protocol.Protocol
cacheMutex sync.Mutex
}
func (c *ServiceConfig) Prefix() string {
......@@ -203,6 +206,10 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(constant.TPS_LIMITER_KEY, srvconfig.TpsLimiter)
urlMap.Set(constant.TPS_REJECTED_EXECUTION_HANDLER_KEY, srvconfig.TpsLimitRejectedHandler)
// execute limit filter
urlMap.Set(constant.EXECUTE_LIMIT_KEY, srvconfig.ExecuteLimit)
urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, srvconfig.ExecuteLimitRejectedHandler)
for _, v := range srvconfig.Methods {
prefix := "methods." + v.Name + "."
urlMap.Set(prefix+constant.LOADBALANCE_KEY, v.Loadbalance)
......@@ -212,6 +219,10 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(prefix+constant.TPS_LIMIT_STRATEGY_KEY, v.TpsLimitStrategy)
urlMap.Set(prefix+constant.TPS_LIMIT_INTERVAL_KEY, v.TpsLimitInterval)
urlMap.Set(prefix+constant.TPS_LIMIT_RATE_KEY, v.TpsLimitRate)
urlMap.Set(constant.EXECUTE_LIMIT_KEY, v.ExecuteLimit)
urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, v.ExecuteLimitRejectedHandler)
}
return urlMap
......
......@@ -29,6 +29,7 @@ services:
"UserProvider":
registry: "hangzhouzk,shanghaizk"
filter: ""
# the name of limiter
tps.limiter: "default"
# the time unit of interval is ms
......@@ -38,6 +39,13 @@ services:
tps.limit.strategy: "slidingWindow"
# the name of RejectedExecutionHandler
tps.limit.rejected.handler: "default"
# the concurrent request limitation of this service
# if the value < 0, it will not be limited.
execute.limit: "200"
# the name of RejectedExecutionHandler
execute.limit.rejected.handler: "default"
protocol : "dubbo"
# equivalent to interface of dubbo.xml
interface : "com.ikurento.user.UserProvider"
......@@ -50,6 +58,11 @@ services:
- name: "GetUser"
retries: 1
loadbalance: "random"
# the concurrent request limitation of this method
# if the value < 0, it will not be limited.
execute.limit: "200"
# the name of RejectedExecutionHandler
execute.limit.rejected.handler: "default"
protocols:
"dubbo":
......
......@@ -19,6 +19,8 @@ package impl
import (
"sync"
common2 "github.com/apache/dubbo-go/filter/common"
)
import (
......@@ -26,15 +28,14 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/filter/impl/tps"
"github.com/apache/dubbo-go/protocol"
)
const HandlerName = "log"
func init() {
extension.SetTpsRejectedExecutionHandler(HandlerName, GetOnlyLogRejectedExecutionHandler)
extension.SetTpsRejectedExecutionHandler(constant.DEFAULT_KEY, GetOnlyLogRejectedExecutionHandler)
extension.SetRejectedExecutionHandler(HandlerName, GetOnlyLogRejectedExecutionHandler)
extension.SetRejectedExecutionHandler(constant.DEFAULT_KEY, GetOnlyLogRejectedExecutionHandler)
}
var onlyLogHandlerInstance *OnlyLogRejectedExecutionHandler
......@@ -57,11 +58,11 @@ type OnlyLogRejectedExecutionHandler struct {
}
func (handler *OnlyLogRejectedExecutionHandler) RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result {
logger.Errorf("The invocation was rejected due to over rate limitation. url: %s", url.String())
logger.Errorf("The invocation was rejected. url: %s", url.String())
return &protocol.RPCResult{}
}
func GetOnlyLogRejectedExecutionHandler() tps.RejectedExecutionHandler {
func GetOnlyLogRejectedExecutionHandler() common2.RejectedExecutionHandler {
onlyLogHandlerOnce.Do(func() {
onlyLogHandlerInstance = &OnlyLogRejectedExecutionHandler{}
})
......
......@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package tps
package common
import (
"github.com/apache/dubbo-go/common"
......@@ -22,17 +22,8 @@ import (
)
/**
* This implementation only logs the invocation info.
* it always return en error inside the result.
* "UserProvider":
* registry: "hangzhouzk"
* protocol : "dubbo"
* interface : "com.ikurento.user.UserProvider"
* ... # other configuration
* tps.limiter: "method-service" # the name of limiter
* tps.limit.rejected.handler: "name of handler"
* methods:
* - name: "GetUser"
* If the invocation cannot pass any validation in filter, like ExecuteLimit and TpsLimit,
* the implementation will be used.
*/
type RejectedExecutionHandler interface {
RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package impl
import (
"strconv"
"sync"
"sync/atomic"
)
import (
"github.com/modern-go/concurrent"
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/filter"
_ "github.com/apache/dubbo-go/filter/common/impl"
"github.com/apache/dubbo-go/protocol"
)
const name = "execute"
func init() {
extension.SetFilter(name, GetExecuteLimitFilter)
}
/**
* The filter will limit the number of in-progress request and it's thread-safe.
* example:
* "UserProvider":
* registry: "hangzhouzk"
* protocol : "dubbo"
* interface : "com.ikurento.user.UserProvider"
* ... # other configuration
* execute.limit: 200 # the name of MethodServiceTpsLimiterImpl. if the value < 0, invocation will be ignored.
* execute.limit.rejected.handle: "default" # the name of rejected handler
* methods:
* - name: "GetUser"
* execute.limit: 20, # in this case, this configuration in service-level will be ignored.
* - name: "UpdateUser"
* execute.limit: -1, # If the rate<0, the method will be ignored
* - name: "DeleteUser"
* execute.limit.rejected.handle: "customHandler"
* - name: "AddUser"
* From the example, the configuration in service-level is 200, and the configuration of method GetUser is 20.
* it means that, the GetUser will be counted separately.
* The configuration of method UpdateUser is -1, so the invocation for it will not be counted.
* So the method DeleteUser and method AddUser will be limited by service-level configuration.
*/
type ExecuteLimitFilter struct {
executeState *concurrent.Map
}
type ExecuteState struct {
concurrentCount int64
}
func (ef *ExecuteLimitFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
methodConfigPrefix := "methods." + invocation.MethodName() + "."
url := invoker.GetUrl()
limitTarget := url.ServiceKey()
limitRateConfig := constant.DEFAULT_EXECUTE_LIMIT
methodLevelConfig := url.GetParam(methodConfigPrefix+constant.EXECUTE_LIMIT_KEY, "")
if len(methodLevelConfig) > 0 {
// we have the method-level configuration
limitTarget = limitTarget + "#" + invocation.MethodName()
limitRateConfig = methodLevelConfig
} else {
limitRateConfig = url.GetParam(constant.EXECUTE_LIMIT_KEY, constant.DEFAULT_EXECUTE_LIMIT)
}
limitRate, err := strconv.ParseInt(limitRateConfig, 0, 0)
if err != nil {
logger.Error("The configuration of execute.limit is invalid: %s", limitRateConfig)
return &protocol.RPCResult{}
}
if limitRate < 0 {
return invoker.Invoke(invocation)
}
state, loaded := ef.executeState.LoadOrStore(limitTarget, &ExecuteState{
concurrentCount: 0,
})
concurrentCount := state.(*ExecuteState).increase()
defer state.(*ExecuteState).decrease()
logger.Debugf("The execution count is %d, loaded: %t, target: %s", concurrentCount, loaded, limitTarget)
if concurrentCount > limitRate {
logger.Errorf("The invocation was rejected due to over the execute limitation, url: %s ", url.String())
rejectedHandlerConfig := url.GetParam(methodConfigPrefix+constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY,
url.GetParam(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, constant.DEFAULT_KEY))
return extension.GetRejectedExecutionHandler(rejectedHandlerConfig).RejectedExecution(url, invocation)
}
return invoker.Invoke(invocation)
}
func (ef *ExecuteLimitFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return result
}
func (state *ExecuteState) increase() int64 {
return atomic.AddInt64(&state.concurrentCount, 1)
}
func (state *ExecuteState) decrease() {
atomic.AddInt64(&state.concurrentCount, -1)
}
var executeLimitOnce sync.Once
var executeLimitFilter *ExecuteLimitFilter
func GetExecuteLimitFilter() filter.Filter {
executeLimitOnce.Do(func() {
executeLimitFilter = &ExecuteLimitFilter{
executeState: concurrent.NewMap(),
}
})
return executeLimitFilter
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package impl
import (
"net/url"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
func TestExecuteLimitFilter_Invoke_Ignored(t *testing.T) {
methodName := "hello"
invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]string, 0))
invokeUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
common.WithParamsValue(constant.INTERFACE_KEY, methodName))
limitFilter := GetExecuteLimitFilter()
result := limitFilter.Invoke(protocol.NewBaseInvoker(*invokeUrl), invoc)
assert.NotNil(t, result)
assert.Nil(t, result.Error())
}
func TestExecuteLimitFilter_Invoke_Configure_Error(t *testing.T) {
methodName := "hello1"
invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]string, 0))
invokeUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
common.WithParamsValue(constant.INTERFACE_KEY, methodName),
common.WithParamsValue(constant.EXECUTE_LIMIT_KEY, "13a"),
)
limitFilter := GetExecuteLimitFilter()
result := limitFilter.Invoke(protocol.NewBaseInvoker(*invokeUrl), invoc)
assert.NotNil(t, result)
assert.NotNil(t, result.Error())
}
func TestExecuteLimitFilter_Invoke(t *testing.T) {
methodName := "hello1"
invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]string, 0))
invokeUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
common.WithParamsValue(constant.INTERFACE_KEY, methodName),
common.WithParamsValue(constant.EXECUTE_LIMIT_KEY, "20"),
)
limitFilter := GetExecuteLimitFilter()
result := limitFilter.Invoke(protocol.NewBaseInvoker(*invokeUrl), invoc)
assert.NotNil(t, result)
assert.Nil(t, result.Error())
}
......@@ -20,7 +20,9 @@ package impl
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/filter"
_ "github.com/apache/dubbo-go/filter/common/impl"
_ "github.com/apache/dubbo-go/filter/impl/tps/impl"
"github.com/apache/dubbo-go/protocol"
)
......@@ -58,7 +60,8 @@ func (t TpsLimitFilter) Invoke(invoker protocol.Invoker, invocation protocol.Inv
if allow {
return invoker.Invoke(invocation)
}
return extension.GetTpsRejectedExecutionHandler(rejectedExeHandler).RejectedExecution(url, invocation)
logger.Errorf("The invocation was rejected due to over the tps limitation, url: %s ", url.String())
return extension.GetRejectedExecutionHandler(rejectedExeHandler).RejectedExecution(url, invocation)
}
return invoker.Invoke(invocation)
}
......
......@@ -20,6 +20,9 @@ package impl
import (
"net/url"
"testing"
common2 "github.com/apache/dubbo-go/filter/common"
impl2 "github.com/apache/dubbo-go/filter/common/impl"
)
import (
......@@ -82,10 +85,10 @@ func TestGenericFilter_Invoke_With_Default_TpsLimiter_Not_Allow(t *testing.T) {
})
mockResult := &protocol.RPCResult{}
mockRejectedHandler := impl.NewMockRejectedExecutionHandler(ctrl)
mockRejectedHandler := impl2.NewMockRejectedExecutionHandler(ctrl)
mockRejectedHandler.EXPECT().RejectedExecution(gomock.Any(), gomock.Any()).Return(mockResult).Times(1)
extension.SetTpsRejectedExecutionHandler(constant.DEFAULT_KEY, func() tps.RejectedExecutionHandler {
extension.SetRejectedExecutionHandler(constant.DEFAULT_KEY, func() common2.RejectedExecutionHandler {
return mockRejectedHandler
})
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment