diff --git a/common/extension/tps_limit.go b/common/extension/tps_limit.go index 34272018316bc9312e5af54a1a7d18a9d8bf46e7..70c71cbc4b34240c873eaa1d22651665dcb24e6e 100644 --- a/common/extension/tps_limit.go +++ b/common/extension/tps_limit.go @@ -20,8 +20,8 @@ package extension import "github.com/apache/dubbo-go/filter" var ( - tpsLimitStrategy = make(map[string]func(rate int, interval int) filter.TpsLimitStrategy) - tpsLimiter = make(map[string]func() filter.TpsLimiter) + tpsLimitStrategy = make(map[string]func(rate int, interval int) filter.TpsLimitStrategy) + tpsLimiter = make(map[string]func() filter.TpsLimiter) tpsRejectedExecutionHandler = make(map[string]func() filter.RejectedExecutionHandler) ) @@ -51,7 +51,7 @@ func GetTpsLimitStrategyCreator(name string) func(rate int, interval int) filter return creator } -func SetTpsRejectedExecutionHandler(name string, creator func() filter.RejectedExecutionHandler) { +func SetTpsRejectedExecutionHandler(name string, creator func() filter.RejectedExecutionHandler) { tpsRejectedExecutionHandler[name] = creator } @@ -62,4 +62,4 @@ func GetTpsRejectedExecutionHandler(name string) filter.RejectedExecutionHandler "and you have register it by invoking extension.SetTpsRejectedExecutionHandler.") } return creator() -} \ No newline at end of file +} diff --git a/config/service_config.go b/config/service_config.go index 784257b8c7262372e99e14fbe244d995d8136a67..fb65567a4b4163730a82fb99e7c309d7b35d3cf0 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -209,9 +209,9 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values { urlMap.Set(prefix+constant.RETRIES_KEY, v.Retries) urlMap.Set(prefix+constant.WEIGHT_KEY, strconv.FormatInt(v.Weight, 10)) - urlMap.Set(prefix+constant.TPS_LIMIT_STRATEGY_KEY, srvconfig.TpsLimitStrategy) - urlMap.Set(prefix+constant.TPS_LIMIT_INTERVAL_KEY, srvconfig.TpsLimitInterval) - urlMap.Set(prefix+constant.TPS_LIMIT_RATE_KEY, srvconfig.TpsLimitRate) + urlMap.Set(prefix+constant.TPS_LIMIT_STRATEGY_KEY, v.TpsLimitStrategy) + urlMap.Set(prefix+constant.TPS_LIMIT_INTERVAL_KEY, v.TpsLimitInterval) + urlMap.Set(prefix+constant.TPS_LIMIT_RATE_KEY, v.TpsLimitRate) } return urlMap diff --git a/filter/impl/only_log_rejected_execution_handler.go b/filter/impl/rejected_execution_handler_only_log_impl.go similarity index 89% rename from filter/impl/only_log_rejected_execution_handler.go rename to filter/impl/rejected_execution_handler_only_log_impl.go index 8ce4a445937d47e1dc34da8f4013da8e5c8f9b6c..1ea44efb4cc129be35b84c57f3b7b87ac91f1a7b 100644 --- a/filter/impl/only_log_rejected_execution_handler.go +++ b/filter/impl/rejected_execution_handler_only_log_impl.go @@ -19,9 +19,13 @@ package impl import ( "sync" +) +import ( "github.com/prometheus/common/log" +) +import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" @@ -29,7 +33,7 @@ import ( "github.com/apache/dubbo-go/protocol" ) -const HandlerName = "log" +const HandlerName = "log" func init() { extension.SetTpsRejectedExecutionHandler(HandlerName, GetOnlyRejectedExecutionHandler) @@ -41,15 +45,14 @@ var onlyLogHandlerOnce sync.Once /** * This implementation only logs the invocation info. - * it always return nil. + * it always return en error inside the result. */ type OnlyLogRejectedExecutionHandler struct { - } func (handler *OnlyLogRejectedExecutionHandler) RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result { - log.Warnf("The invocation was rejected. url: %s", url.String()) - return nil + log.Errorf("The invocation was rejected due to over rate limitation. url: %s", url.String()) + return &protocol.RPCResult{} } func GetOnlyRejectedExecutionHandler() filter.RejectedExecutionHandler { @@ -58,4 +61,3 @@ func GetOnlyRejectedExecutionHandler() filter.RejectedExecutionHandler { }) return onlyLogHandlerInstance } - diff --git a/filter/impl/tps_limit_filter.go b/filter/impl/tps_limit_filter.go index ae900a33a34cd99cee918f9aa331a34a52a608ad..fc3a5a8a3e3674d4e0d38600d99c0e68d7ebd70b 100644 --- a/filter/impl/tps_limit_filter.go +++ b/filter/impl/tps_limit_filter.go @@ -46,10 +46,9 @@ func init() { * if the value of 'tps.limiter' is nil or empty string, the tps filter will do nothing */ type TpsLimitFilter struct { - } -func (t TpsLimitFilter) Invoke(invoker protocol.Invoker,invocation protocol.Invocation) protocol.Result { +func (t TpsLimitFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { url := invoker.GetUrl() tpsLimiter := url.GetParam(constant.TPS_LIMITER_KEY, "") rejectedExeHandler := url.GetParam(constant.TPS_REJECTED_EXECUTION_HANDLER_KEY, constant.DEFAULT_KEY) diff --git a/filter/impl/tps_limit_filter_test.go b/filter/impl/tps_limit_filter_test.go index 236f42c95c45929f9a47d0e57b7d54879e43693a..45ce2a5e6d5c90444ea2aff061e5d08fd0da7fd6 100644 --- a/filter/impl/tps_limit_filter_test.go +++ b/filter/impl/tps_limit_filter_test.go @@ -20,10 +20,14 @@ package impl import ( "net/url" "testing" +) +import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" +) +import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" @@ -67,7 +71,7 @@ func TestGenericFilter_Invoke_With_Default_TpsLimiter(t *testing.T) { assert.Nil(t, result.Result()) } -func TestGenericFilter_Invoke_With_Default_TpsLimiter_Not_Allow(t *testing.T) { +func TestGenericFilter_Invoke_With_Default_TpsLimiter_Not_Allow(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockLimiter := filter.NewMockTpsLimiter(ctrl) diff --git a/filter/impl/tps_limit_fix_window_strategy_impl.go b/filter/impl/tps_limit_fix_window_strategy_impl.go new file mode 100644 index 0000000000000000000000000000000000000000..04e79b9b4dbd8ef3ab5dd4959a9311996cd8cf2a --- /dev/null +++ b/filter/impl/tps_limit_fix_window_strategy_impl.go @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "sync/atomic" + "time" +) +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/filter" +) + +const ( + FixedWindowKey = "fixedWindow" +) + +func init() { + extension.SetTpsLimitStrategy(FixedWindowKey, NewFixedWindowTpsLimitStrategyImpl) + extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, NewFixedWindowTpsLimitStrategyImpl) +} + +/** + * It's the same as default implementation in Java + * It's not a thread-safe implementation. + * It you want to use the thread-safe implementation, please use ThreadSafeFixedWindowTpsLimitStrategyImpl + * This is the default implementation. + * + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "method-service" # the name of limiter + * tps.limit.strategy: "default" or "fixedWindow" # service-level + * methods: + * - name: "GetUser" + * tps.interval: 3000 + * tps.limit.strategy: "default" or "fixedWindow" # method-level + */ +type FixedWindowTpsLimitStrategyImpl struct { + rate int32 + interval int64 + count int32 + timestamp int64 +} + +func (impl *FixedWindowTpsLimitStrategyImpl) IsAllowable() bool { + + current := time.Now().UnixNano() + if impl.timestamp+impl.interval < current { + // it's a new window + // if a lot of threads come here, the count will be set to 0 several times. + // so the return statement will be wrong. + impl.timestamp = current + impl.count = 0 + } + // this operation is thread-safe, but count + 1 may be overflow + return atomic.AddInt32(&impl.count, 1) <= impl.rate +} + +func NewFixedWindowTpsLimitStrategyImpl(rate int, interval int) filter.TpsLimitStrategy { + return &FixedWindowTpsLimitStrategyImpl{ + rate: int32(rate), + interval: int64(interval * 1000), // convert to ns + count: 0, + timestamp: time.Now().UnixNano(), + } +} diff --git a/filter/impl/tps_limit_fix_window_strategy_impl_test.go b/filter/impl/tps_limit_fix_window_strategy_impl_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a78d0ec0709c470e26ba4194a5c7a5548cd2178b --- /dev/null +++ b/filter/impl/tps_limit_fix_window_strategy_impl_test.go @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "testing" + "time" +) + +import ( + "github.com/coreos/etcd/pkg/testutil" +) + +func TestFixedWindowTpsLimitStrategyImpl_IsAllowable(t *testing.T) { + strategy := NewFixedWindowTpsLimitStrategyImpl(2, 60000) + testutil.AssertTrue(t, strategy.IsAllowable()) + testutil.AssertTrue(t, strategy.IsAllowable()) + testutil.AssertFalse(t, strategy.IsAllowable()) + + strategy = NewFixedWindowTpsLimitStrategyImpl(2, 2000) + testutil.AssertTrue(t, strategy.IsAllowable()) + testutil.AssertTrue(t, strategy.IsAllowable()) + testutil.AssertFalse(t, strategy.IsAllowable()) + time.Sleep(time.Duration(2100 * 1000)) + testutil.AssertTrue(t, strategy.IsAllowable()) + testutil.AssertTrue(t, strategy.IsAllowable()) + testutil.AssertFalse(t, strategy.IsAllowable()) +} diff --git a/filter/impl/tps_limit_sliding_window_strategy_impl.go b/filter/impl/tps_limit_sliding_window_strategy_impl.go new file mode 100644 index 0000000000000000000000000000000000000000..726ac197a8957f05cf88ac499179a9995c6c5cd1 --- /dev/null +++ b/filter/impl/tps_limit_sliding_window_strategy_impl.go @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "container/list" + "sync" + "time" +) + +import ( + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/filter" +) + +func init() { + extension.SetTpsLimitStrategy("slidingWindow", NewSlidingWindowTpsLimitStrategyImpl) +} + +/** + * it's thread-safe. + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "method-service" # the name of limiter + * tps.limit.strategy: "slidingWindow" # service-level + * methods: + * - name: "GetUser" + * tps.interval: 3000 + * tps.limit.strategy: "slidingWindow" # method-level + */ +type SlidingWindowTpsLimitStrategyImpl struct { + rate int + interval int64 + mutex *sync.Mutex + queue *list.List +} + +func (impl *SlidingWindowTpsLimitStrategyImpl) IsAllowable() bool { + impl.mutex.Lock() + defer impl.mutex.Unlock() + // quick path + size := impl.queue.Len() + current := time.Now().UnixNano() + if size < impl.rate { + impl.queue.PushBack(current) + return true + } + + // slow path + boundary := current - impl.interval + + timestamp := impl.queue.Front() + // remove the element that out of the window + for timestamp != nil && timestamp.Value.(int64) < boundary { + impl.queue.Remove(timestamp) + timestamp = impl.queue.Front() + } + if impl.queue.Len() < impl.rate { + impl.queue.PushBack(current) + return true + } + return false +} + +func NewSlidingWindowTpsLimitStrategyImpl(rate int, interval int) filter.TpsLimitStrategy { + return &SlidingWindowTpsLimitStrategyImpl{ + rate: rate, + interval: int64(interval * 1000), + mutex: &sync.Mutex{}, + queue: list.New(), + } +} diff --git a/filter/impl/tps_limit_sliding_window_strategy_impl_test.go b/filter/impl/tps_limit_sliding_window_strategy_impl_test.go new file mode 100644 index 0000000000000000000000000000000000000000..26a9400cdf924bdbc791a4f7f201f4b6503bdbba --- /dev/null +++ b/filter/impl/tps_limit_sliding_window_strategy_impl_test.go @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "testing" + "time" +) + +import ( + "go.etcd.io/etcd/pkg/testutil" +) + +func TestSlidingWindowTpsLimitStrategyImpl_IsAllowable(t *testing.T) { + strategy := NewSlidingWindowTpsLimitStrategyImpl(2, 60000) + testutil.AssertTrue(t, strategy.IsAllowable()) + testutil.AssertTrue(t, strategy.IsAllowable()) + testutil.AssertFalse(t, strategy.IsAllowable()) + time.Sleep(time.Duration(2100 * 1000)) + testutil.AssertFalse(t, strategy.IsAllowable()) + + strategy = NewSlidingWindowTpsLimitStrategyImpl(2, 2000) + testutil.AssertTrue(t, strategy.IsAllowable()) + testutil.AssertTrue(t, strategy.IsAllowable()) + testutil.AssertFalse(t, strategy.IsAllowable()) + time.Sleep(time.Duration(2100 * 1000)) + testutil.AssertTrue(t, strategy.IsAllowable()) + testutil.AssertTrue(t, strategy.IsAllowable()) + testutil.AssertFalse(t, strategy.IsAllowable()) +} diff --git a/filter/impl/tps_limit_thread_safe_fix_window_strategy_impl.go b/filter/impl/tps_limit_thread_safe_fix_window_strategy_impl.go new file mode 100644 index 0000000000000000000000000000000000000000..e7d592c0bf6a9fe504b8a79185c718b4c1e1fd0e --- /dev/null +++ b/filter/impl/tps_limit_thread_safe_fix_window_strategy_impl.go @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "sync" +) + +import ( + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/filter" +) + +func init() { + extension.SetTpsLimitStrategy("threadSafeFixedWindow", NewThreadSafeFixedWindowTpsLimitStrategyImpl) +} + +/** + * it's the thread-safe implementation. + * Also, it's a thread-safe decorator of FixedWindowTpsLimitStrategyImpl + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "method-service" # the name of limiter + * tps.limit.strategy: "threadSafeFixedWindow" # service-level + * methods: + * - name: "GetUser" + * tps.interval: 3000 + * tps.limit.strategy: "threadSafeFixedWindow" # method-level + */ +type ThreadSafeFixedWindowTpsLimitStrategyImpl struct { + mutex *sync.Mutex + fixedWindow *FixedWindowTpsLimitStrategyImpl +} + +func (impl *ThreadSafeFixedWindowTpsLimitStrategyImpl) IsAllowable() bool { + impl.mutex.Lock() + defer impl.mutex.Unlock() + return impl.fixedWindow.IsAllowable() +} + +func NewThreadSafeFixedWindowTpsLimitStrategyImpl(rate int, interval int) filter.TpsLimitStrategy { + fixedWindowStrategy := NewFixedWindowTpsLimitStrategyImpl(rate, interval).(*FixedWindowTpsLimitStrategyImpl) + return &ThreadSafeFixedWindowTpsLimitStrategyImpl{ + fixedWindow: fixedWindowStrategy, + mutex: &sync.Mutex{}, + } +} diff --git a/filter/impl/tps_limit_thread_safe_fix_window_strategy_impl_test.go b/filter/impl/tps_limit_thread_safe_fix_window_strategy_impl_test.go new file mode 100644 index 0000000000000000000000000000000000000000..e41dd62f2c4b1c381ce9dc38f6434ced1d987893 --- /dev/null +++ b/filter/impl/tps_limit_thread_safe_fix_window_strategy_impl_test.go @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "testing" + "time" +) + +import ( + "github.com/coreos/etcd/pkg/testutil" +) + +func TestThreadSafeFixedWindowTpsLimitStrategyImpl_IsAllowable(t *testing.T) { + strategy := NewThreadSafeFixedWindowTpsLimitStrategyImpl(2, 60000) + testutil.AssertTrue(t, strategy.IsAllowable()) + testutil.AssertTrue(t, strategy.IsAllowable()) + testutil.AssertFalse(t, strategy.IsAllowable()) + + strategy = NewThreadSafeFixedWindowTpsLimitStrategyImpl(2, 2000) + testutil.AssertTrue(t, strategy.IsAllowable()) + testutil.AssertTrue(t, strategy.IsAllowable()) + testutil.AssertFalse(t, strategy.IsAllowable()) + time.Sleep(time.Duration(2100 * 1000)) + testutil.AssertTrue(t, strategy.IsAllowable()) + testutil.AssertTrue(t, strategy.IsAllowable()) + testutil.AssertFalse(t, strategy.IsAllowable()) +} diff --git a/filter/impl/tps_limiter_method_service_impl.go b/filter/impl/tps_limiter_method_service_impl.go index 2a30c30b143c61062f66ac61e2079a3d8713f0dd..23cd64f5c4bbd1ad27d5116b4e23eea6cbbfef54 100644 --- a/filter/impl/tps_limiter_method_service_impl.go +++ b/filter/impl/tps_limiter_method_service_impl.go @@ -20,9 +20,13 @@ import ( "fmt" "strconv" "sync" +) +import ( "github.com/modern-go/concurrent" +) +import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" @@ -45,15 +49,63 @@ func init() { * protocol : "dubbo" * interface : "com.ikurento.user.UserProvider" * ... # other configuration - * tps.limiter: "method-service" # the name of MethodServiceTpsLimiterImpl. It's the default limiter too. + * tps.limiter: "method-service" or "default" # the name of MethodServiceTpsLimiterImpl. It's the default limiter too. * tps.limit.interval: 5000 # interval, the time unit is ms * tps.limit.rate: 300 # the max value in the interval. <0 means that the service will not be limited. * methods: * - name: "GetUser" * tps.interval: 3000 - * tps.rate: 20, # in this case, this configuration in service-level will be ignored. + * tps.limit.rate: 20, # in this case, this configuration in service-level will be ignored. + * - name: "UpdateUser" + * tps.limit.rate: -1, # If the rate<0, the method will be ignored + * + * + * More examples: + * case1: + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "method-service" or "default" # the name of MethodServiceTpsLimiterImpl. It's the default limiter too. + * tps.limit.interval: 5000 # interval, the time unit is ms + * tps.limit.rate: 300 # the max value in the interval. <0 means that the service will not be limited. + * methods: + * - name: "GetUser" + * - name: "UpdateUser" + * tps.limit.rate: -1, + * in this case, the method UpdateUser will be ignored, + * which means that only GetUser will be limited by service-level configuration. + * + * case2: + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "method-service" or "default" # the name of MethodServiceTpsLimiterImpl. It's the default limiter too. + * tps.limit.interval: 5000 # interval, the time unit is ms + * tps.limit.rate: 300 # the max value in the interval. <0 means that the service will not be limited. + * methods: + * - name: "GetUser" + * - name: "UpdateUser" + * tps.limit.rate: 30, + * In this case, the GetUser will be limited by service-level configuration(300 times in 5000ms), + * but UpdateUser will be limited by its configuration (30 times in 60000ms) + * + * case3: + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "method-service" or "default" # the name of MethodServiceTpsLimiterImpl. It's the default limiter too. + * methods: + * - name: "GetUser" * - name: "UpdateUser" - * tps.rate: -1, # If the rate<0, the method will be ignored + * tps.limit.rate: 70, + * tps.limit.interval: 40000 + * In this case, only UpdateUser will be limited by its configuration (70 times in 40000ms) */ type MethodServiceTpsLimiterImpl struct { tpsState *concurrent.Map @@ -89,7 +141,7 @@ func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocatio limitInterval := getLimitConfig(methodIntervalConfig, url, invocation, constant.TPS_LIMIT_INTERVAL_KEY, constant.DEFAULT_TPS_LIMIT_INTERVAL) - if limitInterval <= 0{ + if limitInterval <= 0 { panic(fmt.Sprintf("The interval must be positive, please check your configuration! url: %s", url.String())) } diff --git a/filter/impl/tps_limiter_method_service_impl_test.go b/filter/impl/tps_limiter_method_service_impl_test.go new file mode 100644 index 0000000000000000000000000000000000000000..9194a7488c55cb11d0d2a1e31fc7b1835f3bf6c1 --- /dev/null +++ b/filter/impl/tps_limiter_method_service_impl_test.go @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "net/url" + "testing" +) +import ( + "github.com/coreos/etcd/pkg/testutil" + "github.com/golang/mock/gomock" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/filter" + "github.com/apache/dubbo-go/protocol/invocation" +) + +const methodName = "hello" + +var methodConfigPrefix = "methods." + methodName + "." +var invoc = invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]string, 0)) + +func TestMethodServiceTpsLimiterImpl_IsAllowable_Only_Service_Level(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invokeUrl := common.NewURLWithOptions( + common.WithParams(url.Values{}), + common.WithParamsValue(constant.TPS_LIMIT_RATE_KEY, "20")) + + mockStrategyImpl := filter.NewMockTpsLimitStrategy(ctrl) + mockStrategyImpl.EXPECT().IsAllowable().Return(true).Times(1) + extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, func(rate int, interval int) filter.TpsLimitStrategy { + testutil.AssertEqual(t, 20, rate) + testutil.AssertEqual(t, 60000, interval) + return mockStrategyImpl + }) + + limiter := GetMethodServiceTpsLimiter() + result := limiter.IsAllowable(*invokeUrl, invoc) + testutil.AssertTrue(t, result) +} + +func TestMethodServiceTpsLimiterImpl_IsAllowable_No_Config(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invokeUrl := common.NewURLWithOptions( + common.WithParams(url.Values{}), + common.WithParamsValue(constant.TPS_LIMIT_RATE_KEY, "")) + + limiter := GetMethodServiceTpsLimiter() + result := limiter.IsAllowable(*invokeUrl, invoc) + testutil.AssertTrue(t, result) +} + +func TestMethodServiceTpsLimiterImpl_IsAllowable_Method_Level_Override(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invokeUrl := common.NewURLWithOptions( + common.WithParams(url.Values{}), + common.WithParamsValue(constant.TPS_LIMIT_RATE_KEY, "20"), + common.WithParamsValue(constant.TPS_LIMIT_INTERVAL_KEY, "3000"), + common.WithParamsValue(constant.TPS_LIMIT_STRATEGY_KEY, "invalid"), + common.WithParamsValue(methodConfigPrefix+constant.TPS_LIMIT_RATE_KEY, "40"), + common.WithParamsValue(methodConfigPrefix+constant.TPS_LIMIT_INTERVAL_KEY, "7000"), + common.WithParamsValue(methodConfigPrefix+constant.TPS_LIMIT_STRATEGY_KEY, "default"), + ) + + mockStrategyImpl := filter.NewMockTpsLimitStrategy(ctrl) + mockStrategyImpl.EXPECT().IsAllowable().Return(true).Times(1) + extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, func(rate int, interval int) filter.TpsLimitStrategy { + testutil.AssertEqual(t, 40, rate) + testutil.AssertEqual(t, 7000, interval) + return mockStrategyImpl + }) + + limiter := GetMethodServiceTpsLimiter() + result := limiter.IsAllowable(*invokeUrl, invoc) + testutil.AssertTrue(t, result) +} + +func TestMethodServiceTpsLimiterImpl_IsAllowable_Both_Method_And_Service(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invokeUrl := common.NewURLWithOptions( + common.WithParams(url.Values{}), + common.WithParamsValue(constant.TPS_LIMIT_RATE_KEY, "20"), + common.WithParamsValue(constant.TPS_LIMIT_INTERVAL_KEY, "3000"), + common.WithParamsValue(methodConfigPrefix+constant.TPS_LIMIT_RATE_KEY, "40"), + ) + + mockStrategyImpl := filter.NewMockTpsLimitStrategy(ctrl) + mockStrategyImpl.EXPECT().IsAllowable().Return(true).Times(1) + extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, func(rate int, interval int) filter.TpsLimitStrategy { + testutil.AssertEqual(t, 40, rate) + testutil.AssertEqual(t, 3000, interval) + return mockStrategyImpl + }) + + limiter := GetMethodServiceTpsLimiter() + result := limiter.IsAllowable(*invokeUrl, invoc) + testutil.AssertTrue(t, result) +} diff --git a/filter/tps_limit_strategy.go b/filter/tps_limit_strategy.go index da5144054108118c9c87948d119a834ce32200a0..602d20f79caeb6bf8da60de6c08dee5f358c6b8c 100644 --- a/filter/tps_limit_strategy.go +++ b/filter/tps_limit_strategy.go @@ -16,10 +16,10 @@ */ package filter + /* * please register your implementation by invoking SetTpsLimitStrategy */ type TpsLimitStrategy interface { IsAllowable() bool } - diff --git a/filter/tps_limit_strategy_mock.go b/filter/tps_limit_strategy_mock.go new file mode 100644 index 0000000000000000000000000000000000000000..4adf5848d121c6d222f893cf094d6d9401c0e5c9 --- /dev/null +++ b/filter/tps_limit_strategy_mock.go @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: tps_limit_strategy.go + +// Package filter is a generated GoMock package. +package filter + +import ( + gomock "github.com/golang/mock/gomock" + reflect "reflect" +) + +// MockTpsLimitStrategy is a mock of TpsLimitStrategy interface +type MockTpsLimitStrategy struct { + ctrl *gomock.Controller + recorder *MockTpsLimitStrategyMockRecorder +} + +// MockTpsLimitStrategyMockRecorder is the mock recorder for MockTpsLimitStrategy +type MockTpsLimitStrategyMockRecorder struct { + mock *MockTpsLimitStrategy +} + +// NewMockTpsLimitStrategy creates a new mock instance +func NewMockTpsLimitStrategy(ctrl *gomock.Controller) *MockTpsLimitStrategy { + mock := &MockTpsLimitStrategy{ctrl: ctrl} + mock.recorder = &MockTpsLimitStrategyMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockTpsLimitStrategy) EXPECT() *MockTpsLimitStrategyMockRecorder { + return m.recorder +} + +// IsAllowable mocks base method +func (m *MockTpsLimitStrategy) IsAllowable() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsAllowable") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsAllowable indicates an expected call of IsAllowable +func (mr *MockTpsLimitStrategyMockRecorder) IsAllowable() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsAllowable", reflect.TypeOf((*MockTpsLimitStrategy)(nil).IsAllowable)) +} diff --git a/filter/tps_limiter.go b/filter/tps_limiter.go index ca945cc70079286595bab4eee4665f304174f0b1..1d2b2341ac7d9b12f75d373909b0baa58bc7295f 100644 --- a/filter/tps_limiter.go +++ b/filter/tps_limiter.go @@ -21,6 +21,7 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/protocol" ) + /* * please register your implementation by invoking SetTpsLimiter * The usage, for example: @@ -34,6 +35,3 @@ import ( type TpsLimiter interface { IsAllowable(common.URL, protocol.Invocation) bool } - - - diff --git a/go.mod b/go.mod index 8171e8e429b243dffaafe2b24915ac90af742d80..0354a5056a6b021e82bf77edcbfe4edcc2e456da 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ require ( github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect github.com/magiconair/properties v1.8.1 + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v1.1.0 // indirect