Skip to content
Snippets Groups Projects
Commit 34902904 authored by Ming Deng's avatar Ming Deng
Browse files

Fix review: move the tps_limit to filter/impl/tps/

parent 27420aa9
No related branches found
No related tags found
No related merge requests found
Showing
with 118 additions and 78 deletions
......@@ -18,20 +18,20 @@
package extension
import (
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/filter/impl/tps/intf"
)
var (
tpsLimitStrategy = make(map[string]func(rate int, interval int) filter.TpsLimitStrategy)
tpsLimiter = make(map[string]func() filter.TpsLimiter)
tpsRejectedExecutionHandler = make(map[string]func() filter.RejectedExecutionHandler)
tpsLimitStrategy = make(map[string]func(rate int, interval int) intf.TpsLimitStrategy)
tpsLimiter = make(map[string]func() intf.TpsLimiter)
tpsRejectedExecutionHandler = make(map[string]func() intf.RejectedExecutionHandler)
)
func SetTpsLimiter(name string, creator func() filter.TpsLimiter) {
func SetTpsLimiter(name string, creator func() intf.TpsLimiter) {
tpsLimiter[name] = creator
}
func GetTpsLimiter(name string) filter.TpsLimiter {
func GetTpsLimiter(name string) intf.TpsLimiter {
creator, ok := tpsLimiter[name]
if !ok {
panic("TpsLimiter for " + name + " is not existing, make sure you have import the package " +
......@@ -40,11 +40,11 @@ func GetTpsLimiter(name string) filter.TpsLimiter {
return creator()
}
func SetTpsLimitStrategy(name string, creator func(rate int, interval int) filter.TpsLimitStrategy) {
func SetTpsLimitStrategy(name string, creator func(rate int, interval int) intf.TpsLimitStrategy) {
tpsLimitStrategy[name] = creator
}
func GetTpsLimitStrategyCreator(name string) func(rate int, interval int) filter.TpsLimitStrategy {
func GetTpsLimitStrategyCreator(name string) func(rate int, interval int) intf.TpsLimitStrategy {
creator, ok := tpsLimitStrategy[name]
if !ok {
panic("TpsLimitStrategy for " + name + " is not existing, make sure you have import the package " +
......@@ -53,11 +53,11 @@ func GetTpsLimitStrategyCreator(name string) func(rate int, interval int) filter
return creator
}
func SetTpsRejectedExecutionHandler(name string, creator func() filter.RejectedExecutionHandler) {
func SetTpsRejectedExecutionHandler(name string, creator func() intf.RejectedExecutionHandler) {
tpsRejectedExecutionHandler[name] = creator
}
func GetTpsRejectedExecutionHandler(name string) filter.RejectedExecutionHandler {
func GetTpsRejectedExecutionHandler(name string) intf.RejectedExecutionHandler {
creator, ok := tpsRejectedExecutionHandler[name]
if !ok {
panic("TpsRejectedExecutionHandler for " + name + " is not existing, make sure you have import the package " +
......
......@@ -29,6 +29,15 @@ services:
"UserProvider":
registry: "hangzhouzk,shanghaizk"
filter: ""
# the name of limiter
tps.limiter: "default"
# the time unit of interval is ms
tps.limit.interval: 60000
tps.limit.rate: 200
# the name of strategy
tps.limit.strategy: "slidingWindow"
# the name of RejectedExecutionHandler
tps.limit.rejected.handler: "default"
protocol : "dubbo"
# equivalent to interface of dubbo.xml
interface : "com.ikurento.user.UserProvider"
......
......@@ -14,13 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package filter
package intf
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
)
/**
* This implementation only logs the invocation info.
* it always return en error inside the result.
* "UserProvider":
* registry: "hangzhouzk"
* protocol : "dubbo"
* interface : "com.ikurento.user.UserProvider"
* ... # other configuration
* tps.limiter: "method-service" # the name of limiter
* tps.limit.rejected.handler: "name of handler"
* methods:
* - name: "GetUser"
*/
type RejectedExecutionHandler interface {
RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result
}
......@@ -15,10 +15,21 @@
* limitations under the License.
*/
package filter
package intf
/*
* please register your implementation by invoking SetTpsLimitStrategy
* "UserProvider":
* registry: "hangzhouzk"
* protocol : "dubbo"
* interface : "com.ikurento.user.UserProvider"
* ... # other configuration
* tps.limiter: "method-service" # the name of limiter
* tps.limit.strategy: "name of implementation" # service-level
* methods:
* - name: "GetUser"
* tps.interval: 3000
* tps.limit.strategy: "name of implementation" # method-level
*/
type TpsLimitStrategy interface {
IsAllowable() bool
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package filter
package intf
import (
"github.com/apache/dubbo-go/common"
......
......@@ -18,7 +18,7 @@
// Source: rejected_execution_handler.go
// Package filter is a generated GoMock package.
package filter
package tps
import (
common "github.com/apache/dubbo-go/common"
......
......@@ -15,21 +15,18 @@
* limitations under the License.
*/
package impl
package tps
import (
"sync"
)
import (
"github.com/prometheus/common/log"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/filter/impl/tps/intf"
"github.com/apache/dubbo-go/protocol"
)
......@@ -46,16 +43,25 @@ var onlyLogHandlerOnce sync.Once
/**
* This implementation only logs the invocation info.
* it always return en error inside the result.
* "UserProvider":
* registry: "hangzhouzk"
* protocol : "dubbo"
* interface : "com.ikurento.user.UserProvider"
* ... # other configuration
* tps.limiter: "method-service" # the name of limiter
* tps.limit.rejected.handler: "default" or "log"
* methods:
* - name: "GetUser"
*/
type OnlyLogRejectedExecutionHandler struct {
}
func (handler *OnlyLogRejectedExecutionHandler) RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result {
log.Errorf("The invocation was rejected due to over rate limitation. url: %s", url.String())
logger.Errorf("The invocation was rejected due to over rate limitation. url: %s", url.String())
return &protocol.RPCResult{}
}
func GetOnlyLogRejectedExecutionHandler() filter.RejectedExecutionHandler {
func GetOnlyLogRejectedExecutionHandler() intf.RejectedExecutionHandler {
onlyLogHandlerOnce.Do(func() {
onlyLogHandlerInstance = &OnlyLogRejectedExecutionHandler{}
})
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package impl
package tps
import (
"net/url"
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package impl
package tps
import (
"sync/atomic"
......@@ -24,7 +24,7 @@ import (
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/filter/impl/tps/intf"
)
const (
......@@ -75,7 +75,7 @@ func (impl *FixedWindowTpsLimitStrategyImpl) IsAllowable() bool {
return atomic.AddInt32(&impl.count, 1) <= impl.rate
}
func NewFixedWindowTpsLimitStrategyImpl(rate int, interval int) filter.TpsLimitStrategy {
func NewFixedWindowTpsLimitStrategyImpl(rate int, interval int) intf.TpsLimitStrategy {
return &FixedWindowTpsLimitStrategyImpl{
rate: int32(rate),
interval: int64(interval * 1000), // convert to ns
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package impl
package tps
import (
"testing"
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package impl
package tps
import (
"container/list"
......@@ -25,7 +25,7 @@ import (
import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/filter/impl/tps/intf"
)
func init() {
......@@ -80,7 +80,7 @@ func (impl *SlidingWindowTpsLimitStrategyImpl) IsAllowable() bool {
return false
}
func NewSlidingWindowTpsLimitStrategyImpl(rate int, interval int) filter.TpsLimitStrategy {
func NewSlidingWindowTpsLimitStrategyImpl(rate int, interval int) intf.TpsLimitStrategy {
return &SlidingWindowTpsLimitStrategyImpl{
rate: rate,
interval: int64(interval * 1000),
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package impl
package tps
import (
"testing"
......@@ -23,23 +23,23 @@ import (
)
import (
"go.etcd.io/etcd/pkg/testutil"
"github.com/stretchr/testify/assert"
)
func TestSlidingWindowTpsLimitStrategyImpl_IsAllowable(t *testing.T) {
strategy := NewSlidingWindowTpsLimitStrategyImpl(2, 60000)
testutil.AssertTrue(t, strategy.IsAllowable())
testutil.AssertTrue(t, strategy.IsAllowable())
testutil.AssertFalse(t, strategy.IsAllowable())
assert.True(t, strategy.IsAllowable())
assert.True(t, strategy.IsAllowable())
assert.False(t, strategy.IsAllowable())
time.Sleep(time.Duration(2100 * 1000))
testutil.AssertFalse(t, strategy.IsAllowable())
assert.False(t, strategy.IsAllowable())
strategy = NewSlidingWindowTpsLimitStrategyImpl(2, 2000)
testutil.AssertTrue(t, strategy.IsAllowable())
testutil.AssertTrue(t, strategy.IsAllowable())
testutil.AssertFalse(t, strategy.IsAllowable())
assert.True(t, strategy.IsAllowable())
assert.True(t, strategy.IsAllowable())
assert.False(t, strategy.IsAllowable())
time.Sleep(time.Duration(2100 * 1000))
testutil.AssertTrue(t, strategy.IsAllowable())
testutil.AssertTrue(t, strategy.IsAllowable())
testutil.AssertFalse(t, strategy.IsAllowable())
assert.True(t, strategy.IsAllowable())
assert.True(t, strategy.IsAllowable())
assert.False(t, strategy.IsAllowable())
}
......@@ -18,7 +18,7 @@
// Source: tps_limit_strategy.go
// Package filter is a generated GoMock package.
package filter
package tps
import (
gomock "github.com/golang/mock/gomock"
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package impl
package tps
import (
"sync"
......@@ -23,7 +23,7 @@ import (
import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/filter/impl/tps/intf"
)
func init() {
......@@ -56,7 +56,7 @@ func (impl *ThreadSafeFixedWindowTpsLimitStrategyImpl) IsAllowable() bool {
return impl.fixedWindow.IsAllowable()
}
func NewThreadSafeFixedWindowTpsLimitStrategyImpl(rate int, interval int) filter.TpsLimitStrategy {
func NewThreadSafeFixedWindowTpsLimitStrategyImpl(rate int, interval int) intf.TpsLimitStrategy {
fixedWindowStrategy := NewFixedWindowTpsLimitStrategyImpl(rate, interval).(*FixedWindowTpsLimitStrategyImpl)
return &ThreadSafeFixedWindowTpsLimitStrategyImpl{
fixedWindow: fixedWindowStrategy,
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package impl
package tps
import (
"testing"
......@@ -23,21 +23,21 @@ import (
)
import (
"github.com/coreos/etcd/pkg/testutil"
"github.com/stretchr/testify/assert"
)
func TestThreadSafeFixedWindowTpsLimitStrategyImpl_IsAllowable(t *testing.T) {
strategy := NewThreadSafeFixedWindowTpsLimitStrategyImpl(2, 60000)
testutil.AssertTrue(t, strategy.IsAllowable())
testutil.AssertTrue(t, strategy.IsAllowable())
testutil.AssertFalse(t, strategy.IsAllowable())
assert.True(t, strategy.IsAllowable())
assert.True(t, strategy.IsAllowable())
assert.False(t, strategy.IsAllowable())
strategy = NewThreadSafeFixedWindowTpsLimitStrategyImpl(2, 2000)
testutil.AssertTrue(t, strategy.IsAllowable())
testutil.AssertTrue(t, strategy.IsAllowable())
testutil.AssertFalse(t, strategy.IsAllowable())
assert.True(t, strategy.IsAllowable())
assert.True(t, strategy.IsAllowable())
assert.False(t, strategy.IsAllowable())
time.Sleep(time.Duration(2100 * 1000))
testutil.AssertTrue(t, strategy.IsAllowable())
testutil.AssertTrue(t, strategy.IsAllowable())
testutil.AssertFalse(t, strategy.IsAllowable())
assert.True(t, strategy.IsAllowable())
assert.True(t, strategy.IsAllowable())
assert.False(t, strategy.IsAllowable())
}
......@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package impl
package tps
import (
"fmt"
......@@ -30,7 +30,7 @@ import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/filter/impl/tps/intf"
"github.com/apache/dubbo-go/protocol"
)
......@@ -127,7 +127,7 @@ func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocatio
limitState, found := limiter.tpsState.Load(limitTarget)
if found {
return limitState.(filter.TpsLimitStrategy).IsAllowable()
return limitState.(intf.TpsLimitStrategy).IsAllowable()
}
limitRate := getLimitConfig(methodLimitRateConfig, url, invocation,
......@@ -149,7 +149,7 @@ func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocatio
url.GetParam(constant.TPS_LIMIT_STRATEGY_KEY, constant.DEFAULT_KEY))
limitStateCreator := extension.GetTpsLimitStrategyCreator(limitStrategyConfig)
limitState, _ = limiter.tpsState.LoadOrStore(limitTarget, limitStateCreator(int(limitRate), int(limitInterval)))
return limitState.(filter.TpsLimitStrategy).IsAllowable()
return limitState.(intf.TpsLimitStrategy).IsAllowable()
}
func getLimitConfig(methodLevelConfig string,
......@@ -178,7 +178,7 @@ func getLimitConfig(methodLevelConfig string,
var methodServiceTpsLimiterInstance *MethodServiceTpsLimiterImpl
var methodServiceTpsLimiterOnce sync.Once
func GetMethodServiceTpsLimiter() filter.TpsLimiter {
func GetMethodServiceTpsLimiter() intf.TpsLimiter {
methodServiceTpsLimiterOnce.Do(func() {
methodServiceTpsLimiterInstance = &MethodServiceTpsLimiterImpl{
tpsState: concurrent.NewMap(),
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package impl
package tps
import (
"net/url"
......@@ -30,7 +30,7 @@ import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/filter/impl/tps/intf"
"github.com/apache/dubbo-go/protocol/invocation"
)
......@@ -46,9 +46,9 @@ func TestMethodServiceTpsLimiterImpl_IsAllowable_Only_Service_Level(t *testing.T
common.WithParamsValue(constant.INTERFACE_KEY, methodName),
common.WithParamsValue(constant.TPS_LIMIT_RATE_KEY, "20"))
mockStrategyImpl := filter.NewMockTpsLimitStrategy(ctrl)
mockStrategyImpl := NewMockTpsLimitStrategy(ctrl)
mockStrategyImpl.EXPECT().IsAllowable().Return(true).Times(1)
extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, func(rate int, interval int) filter.TpsLimitStrategy {
extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, func(rate int, interval int) intf.TpsLimitStrategy {
assert.Equal(t, 20, rate)
assert.Equal(t, 60000, interval)
return mockStrategyImpl
......@@ -93,9 +93,9 @@ func TestMethodServiceTpsLimiterImpl_IsAllowable_Method_Level_Override(t *testin
common.WithParamsValue(methodConfigPrefix+constant.TPS_LIMIT_STRATEGY_KEY, "default"),
)
mockStrategyImpl := filter.NewMockTpsLimitStrategy(ctrl)
mockStrategyImpl := NewMockTpsLimitStrategy(ctrl)
mockStrategyImpl.EXPECT().IsAllowable().Return(true).Times(1)
extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, func(rate int, interval int) filter.TpsLimitStrategy {
extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, func(rate int, interval int) intf.TpsLimitStrategy {
assert.Equal(t, 40, rate)
assert.Equal(t, 7000, interval)
return mockStrategyImpl
......@@ -121,9 +121,9 @@ func TestMethodServiceTpsLimiterImpl_IsAllowable_Both_Method_And_Service(t *test
common.WithParamsValue(methodConfigPrefix+constant.TPS_LIMIT_RATE_KEY, "40"),
)
mockStrategyImpl := filter.NewMockTpsLimitStrategy(ctrl)
mockStrategyImpl := NewMockTpsLimitStrategy(ctrl)
mockStrategyImpl.EXPECT().IsAllowable().Return(true).Times(1)
extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, func(rate int, interval int) filter.TpsLimitStrategy {
extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, func(rate int, interval int) intf.TpsLimitStrategy {
assert.Equal(t, 40, rate)
assert.Equal(t, 3000, interval)
return mockStrategyImpl
......
......@@ -18,7 +18,7 @@
// Source: tps_limiter.go
// Package filter is a generated GoMock package.
package filter
package tps
import (
common "github.com/apache/dubbo-go/common"
......
......@@ -31,7 +31,8 @@ import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/filter/impl/tps"
"github.com/apache/dubbo-go/filter/impl/tps/intf"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
......@@ -53,9 +54,9 @@ func TestTpsLimitFilter_Invoke_With_No_TpsLimiter(t *testing.T) {
func TestGenericFilter_Invoke_With_Default_TpsLimiter(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockLimiter := filter.NewMockTpsLimiter(ctrl)
mockLimiter := tps.NewMockTpsLimiter(ctrl)
mockLimiter.EXPECT().IsAllowable(gomock.Any(), gomock.Any()).Return(true).Times(1)
extension.SetTpsLimiter(constant.DEFAULT_KEY, func() filter.TpsLimiter {
extension.SetTpsLimiter(constant.DEFAULT_KEY, func() intf.TpsLimiter {
return mockLimiter
})
......@@ -74,17 +75,17 @@ func TestGenericFilter_Invoke_With_Default_TpsLimiter(t *testing.T) {
func TestGenericFilter_Invoke_With_Default_TpsLimiter_Not_Allow(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockLimiter := filter.NewMockTpsLimiter(ctrl)
mockLimiter := tps.NewMockTpsLimiter(ctrl)
mockLimiter.EXPECT().IsAllowable(gomock.Any(), gomock.Any()).Return(false).Times(1)
extension.SetTpsLimiter(constant.DEFAULT_KEY, func() filter.TpsLimiter {
extension.SetTpsLimiter(constant.DEFAULT_KEY, func() intf.TpsLimiter {
return mockLimiter
})
mockResult := &protocol.RPCResult{}
mockRejectedHandler := filter.NewMockRejectedExecutionHandler(ctrl)
mockRejectedHandler := tps.NewMockRejectedExecutionHandler(ctrl)
mockRejectedHandler.EXPECT().RejectedExecution(gomock.Any(), gomock.Any()).Return(mockResult).Times(1)
extension.SetTpsRejectedExecutionHandler(constant.DEFAULT_KEY, func() filter.RejectedExecutionHandler {
extension.SetTpsRejectedExecutionHandler(constant.DEFAULT_KEY, func() intf.RejectedExecutionHandler {
return mockRejectedHandler
})
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment