Skip to content
Snippets Groups Projects
Commit c4df1e68 authored by flycash's avatar flycash
Browse files

Add comments for ServiceMethodLimiter

parent 30fafdb3
No related branches found
No related tags found
No related merge requests found
...@@ -115,7 +115,12 @@ type MethodServiceTpsLimiterImpl struct { ...@@ -115,7 +115,12 @@ type MethodServiceTpsLimiterImpl struct {
tpsState *concurrent.Map tpsState *concurrent.Map
} }
// IsAllowable ... // IsAllowable based on method-level and service-level.
// The method-level has high priority which means that if there is any rate limit configuration for the method,
// the service-level rate limit strategy will be ignored.
// The key point is how to keep thread-safe
// This implementation use concurrent map + loadOrStore to make implementation thread-safe
// You can image that even multiple threads create limiter, but only one could store the limiter into tpsState
func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocation protocol.Invocation) bool { func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocation protocol.Invocation) bool {
methodConfigPrefix := "methods." + invocation.MethodName() + "." methodConfigPrefix := "methods." + invocation.MethodName() + "."
...@@ -123,23 +128,30 @@ func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocatio ...@@ -123,23 +128,30 @@ func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocatio
methodLimitRateConfig := url.GetParam(methodConfigPrefix+constant.TPS_LIMIT_RATE_KEY, "") methodLimitRateConfig := url.GetParam(methodConfigPrefix+constant.TPS_LIMIT_RATE_KEY, "")
methodIntervalConfig := url.GetParam(methodConfigPrefix+constant.TPS_LIMIT_INTERVAL_KEY, "") methodIntervalConfig := url.GetParam(methodConfigPrefix+constant.TPS_LIMIT_INTERVAL_KEY, "")
// service-level tps limit
limitTarget := url.ServiceKey() limitTarget := url.ServiceKey()
// method-level tps limit // method-level tps limit
if len(methodIntervalConfig) > 0 || len(methodLimitRateConfig) > 0 { if len(methodIntervalConfig) > 0 || len(methodLimitRateConfig) > 0 {
// it means that if the method-level rate limit exist, we will use method-level rate limit strategy
limitTarget = limitTarget + "#" + invocation.MethodName() limitTarget = limitTarget + "#" + invocation.MethodName()
} }
// looking up the limiter from 'cache'
limitState, found := limiter.tpsState.Load(limitTarget) limitState, found := limiter.tpsState.Load(limitTarget)
if found { if found {
// the limiter has been cached, we return its result
return limitState.(filter.TpsLimitStrategy).IsAllowable() return limitState.(filter.TpsLimitStrategy).IsAllowable()
} }
// we could not find the limiter, and try to create one.
limitRate := getLimitConfig(methodLimitRateConfig, url, invocation, limitRate := getLimitConfig(methodLimitRateConfig, url, invocation,
constant.TPS_LIMIT_RATE_KEY, constant.TPS_LIMIT_RATE_KEY,
constant.DEFAULT_TPS_LIMIT_RATE) constant.DEFAULT_TPS_LIMIT_RATE)
if limitRate < 0 { if limitRate < 0 {
// the limitTarget is not necessary to be limited.
return true return true
} }
...@@ -150,13 +162,20 @@ func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocatio ...@@ -150,13 +162,20 @@ func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocatio
panic(fmt.Sprintf("The interval must be positive, please check your configuration! url: %s", url.String())) panic(fmt.Sprintf("The interval must be positive, please check your configuration! url: %s", url.String()))
} }
// find the strategy config and then create one
limitStrategyConfig := url.GetParam(methodConfigPrefix+constant.TPS_LIMIT_STRATEGY_KEY, limitStrategyConfig := url.GetParam(methodConfigPrefix+constant.TPS_LIMIT_STRATEGY_KEY,
url.GetParam(constant.TPS_LIMIT_STRATEGY_KEY, constant.DEFAULT_KEY)) url.GetParam(constant.TPS_LIMIT_STRATEGY_KEY, constant.DEFAULT_KEY))
limitStateCreator := extension.GetTpsLimitStrategyCreator(limitStrategyConfig) limitStateCreator := extension.GetTpsLimitStrategyCreator(limitStrategyConfig)
// we using loadOrStore to ensure thread-safe
limitState, _ = limiter.tpsState.LoadOrStore(limitTarget, limitStateCreator.Create(int(limitRate), int(limitInterval))) limitState, _ = limiter.tpsState.LoadOrStore(limitTarget, limitStateCreator.Create(int(limitRate), int(limitInterval)))
return limitState.(filter.TpsLimitStrategy).IsAllowable() return limitState.(filter.TpsLimitStrategy).IsAllowable()
} }
// getLimitConfig will try to fetch the configuration from url.
// If we can convert the methodLevelConfig to int64, return;
// Or, we will try to look up server-level configuration and then convert it to int64
func getLimitConfig(methodLevelConfig string, func getLimitConfig(methodLevelConfig string,
url common.URL, url common.URL,
invocation protocol.Invocation, invocation protocol.Invocation,
...@@ -172,6 +191,8 @@ func getLimitConfig(methodLevelConfig string, ...@@ -172,6 +191,8 @@ func getLimitConfig(methodLevelConfig string,
return result return result
} }
// actually there is no method-level configuration, so we use the service-level configuration
result, err := strconv.ParseInt(url.GetParam(configKey, defaultVal), 0, 0) result, err := strconv.ParseInt(url.GetParam(configKey, defaultVal), 0, 0)
if err != nil { if err != nil {
...@@ -183,7 +204,7 @@ func getLimitConfig(methodLevelConfig string, ...@@ -183,7 +204,7 @@ func getLimitConfig(methodLevelConfig string,
var methodServiceTpsLimiterInstance *MethodServiceTpsLimiterImpl var methodServiceTpsLimiterInstance *MethodServiceTpsLimiterImpl
var methodServiceTpsLimiterOnce sync.Once var methodServiceTpsLimiterOnce sync.Once
// GetMethodServiceTpsLimiter ... // GetMethodServiceTpsLimiter will return an MethodServiceTpsLimiterImpl instance.
func GetMethodServiceTpsLimiter() filter.TpsLimiter { func GetMethodServiceTpsLimiter() filter.TpsLimiter {
methodServiceTpsLimiterOnce.Do(func() { methodServiceTpsLimiterOnce.Do(func() {
methodServiceTpsLimiterInstance = &MethodServiceTpsLimiterImpl{ methodServiceTpsLimiterInstance = &MethodServiceTpsLimiterImpl{
......
...@@ -34,5 +34,6 @@ import ( ...@@ -34,5 +34,6 @@ import (
* tps.limiter: "the name of limiter", * tps.limiter: "the name of limiter",
*/ */
type TpsLimiter interface { type TpsLimiter interface {
// IsAllowable will check whether this invocation should be enabled for further process
IsAllowable(common.URL, protocol.Invocation) bool IsAllowable(common.URL, protocol.Invocation) bool
} }
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment