From 1d5643a8f5bf37d3fdaf6306bf043adc5156ea40 Mon Sep 17 00:00:00 2001 From: louyuting <1849491904@qq.com> Date: Sat, 12 Sep 2020 00:21:55 +0800 Subject: [PATCH] refine sentinel-go filter --- filter/filter_impl/sentinel_filter.go | 159 +++++++++++++-------- filter/filter_impl/sentinel_filter_test.go | 64 +++++++-- 2 files changed, 153 insertions(+), 70 deletions(-) diff --git a/filter/filter_impl/sentinel_filter.go b/filter/filter_impl/sentinel_filter.go index 0472f63aa..e832ef097 100644 --- a/filter/filter_impl/sentinel_filter.go +++ b/filter/filter_impl/sentinel_filter.go @@ -18,27 +18,82 @@ package filter_impl import ( - "bytes" "context" "fmt" -) - -import ( sentinel "github.com/alibaba/sentinel-golang/api" "github.com/alibaba/sentinel-golang/core/base" -) - -import ( + "github.com/alibaba/sentinel-golang/logging" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/filter" "github.com/apache/dubbo-go/protocol" + "strings" ) func init() { extension.SetFilter(SentinelProviderFilterName, GetSentinelProviderFilter) extension.SetFilter(SentinelConsumerFilterName, GetSentinelConsumerFilter) + + if err := sentinel.InitDefault(); err != nil { + logger.Errorf("[Sentinel Filter] fail to initialize Sentinel") + } + if err := logging.ResetGlobalLogger(DubboGoLogger{logger: logger.GetLogger()}); err != nil { + logger.Errorf("[Sentinel Filter] fail to ingest dubbo logger into sentinel") + } +} + +type DubboGoLogger struct { + logger logger.Logger +} + +func (d DubboGoLogger) Debug(v ...interface{}) { + d.logger.Debug(v...) +} + +func (d DubboGoLogger) Debugf(format string, v ...interface{}) { + d.logger.Debugf(format, v...) +} + +func (d DubboGoLogger) Info(v ...interface{}) { + d.logger.Info(v...) +} + +func (d DubboGoLogger) Infof(format string, v ...interface{}) { + d.logger.Infof(format, v...) +} + +func (d DubboGoLogger) Warn(v ...interface{}) { + d.logger.Warn(v...) +} + +func (d DubboGoLogger) Warnf(format string, v ...interface{}) { + d.logger.Warnf(format, v...) +} + +func (d DubboGoLogger) Error(v ...interface{}) { + d.logger.Error(v...) +} + +func (d DubboGoLogger) Errorf(format string, v ...interface{}) { + d.logger.Errorf(format, v...) +} + +func (d DubboGoLogger) Fatal(v ...interface{}) { + d.logger.Error(v...) +} + +func (d DubboGoLogger) Fatalf(format string, v ...interface{}) { + d.logger.Errorf(format, v...) +} + +func (d DubboGoLogger) Panic(v ...interface{}) { + d.logger.Error(v...) +} + +func (d DubboGoLogger) Panicf(format string, v ...interface{}) { + d.logger.Errorf(format, v...) } func GetSentinelConsumerFilter() filter.Filter { @@ -49,16 +104,24 @@ func GetSentinelProviderFilter() filter.Filter { return &SentinelProviderFilter{} } +func sentinelExit(ctx context.Context, result protocol.Result) { + if methodEntry := ctx.Value(MethodEntryKey); methodEntry != nil { + e := methodEntry.(*base.SentinelEntry) + sentinel.TraceError(e, result.Error()) + e.Exit() + } + if interfaceEntry := ctx.Value(InterfaceEntryKey); interfaceEntry != nil { + e := interfaceEntry.(*base.SentinelEntry) + sentinel.TraceError(e, result.Error()) + e.Exit() + } +} + type SentinelProviderFilter struct{} func (d *SentinelProviderFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { - methodResourceName := getResourceName(invoker, invocation, getProviderPrefix()) - interfaceResourceName := "" - if getInterfaceGroupAndVersionEnabled() { - interfaceResourceName = getColonSeparatedKey(invoker.GetUrl()) - } else { - interfaceResourceName = invoker.GetUrl().Service() - } + interfaceResourceName, methodResourceName := getResourceName(invoker, invocation, getProviderPrefix()) + var ( interfaceEntry *base.SentinelEntry methodEntry *base.SentinelEntry @@ -71,8 +134,10 @@ func (d *SentinelProviderFilter) Invoke(ctx context.Context, invoker protocol.In } ctx = context.WithValue(ctx, InterfaceEntryKey, interfaceEntry) - methodEntry, b = sentinel.Entry(methodResourceName, sentinel.WithResourceType(base.ResTypeRPC), - sentinel.WithTrafficType(base.Inbound), sentinel.WithArgs(invocation.Arguments()...)) + methodEntry, b = sentinel.Entry(methodResourceName, + sentinel.WithResourceType(base.ResTypeRPC), + sentinel.WithTrafficType(base.Inbound), + sentinel.WithArgs(invocation.Arguments()...)) if b != nil { // method blocked return sentinelDubboProviderFallback(ctx, invoker, invocation, b) @@ -82,29 +147,14 @@ func (d *SentinelProviderFilter) Invoke(ctx context.Context, invoker protocol.In } func (d *SentinelProviderFilter) OnResponse(ctx context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result { - if methodEntry := ctx.Value(MethodEntryKey); methodEntry != nil { - e := methodEntry.(*base.SentinelEntry) - sentinel.TraceError(e, result.Error()) - e.Exit() - } - if interfaceEntry := ctx.Value(InterfaceEntryKey); interfaceEntry != nil { - e := interfaceEntry.(*base.SentinelEntry) - sentinel.TraceError(e, result.Error()) - e.Exit() - } + sentinelExit(ctx, result) return result } type SentinelConsumerFilter struct{} func (d *SentinelConsumerFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { - methodResourceName := getResourceName(invoker, invocation, getConsumerPrefix()) - interfaceResourceName := "" - if getInterfaceGroupAndVersionEnabled() { - interfaceResourceName = getColonSeparatedKey(invoker.GetUrl()) - } else { - interfaceResourceName = invoker.GetUrl().Service() - } + interfaceResourceName, methodResourceName := getResourceName(invoker, invocation, getConsumerPrefix()) var ( interfaceEntry *base.SentinelEntry methodEntry *base.SentinelEntry @@ -130,16 +180,7 @@ func (d *SentinelConsumerFilter) Invoke(ctx context.Context, invoker protocol.In } func (d *SentinelConsumerFilter) OnResponse(ctx context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result { - if methodEntry := ctx.Value(MethodEntryKey); methodEntry != nil { - e := methodEntry.(*base.SentinelEntry) - sentinel.TraceError(e, result.Error()) - e.Exit() - } - if interfaceEntry := ctx.Value(InterfaceEntryKey); interfaceEntry != nil { - e := interfaceEntry.(*base.SentinelEntry) - sentinel.TraceError(e, result.Error()) - e.Exit() - } + sentinelExit(ctx, result) return result } @@ -176,34 +217,30 @@ const ( InterfaceEntryKey = "$$sentinelInterfaceEntry" ) -// Currently, a ConcurrentHashMap mechanism is missing. -// All values are filled with default values first. +func getResourceName(invoker protocol.Invoker, invocation protocol.Invocation, prefix string) (interfaceResourceName, methodResourceName string) { + var sb strings.Builder -func getResourceName(invoker protocol.Invoker, invocation protocol.Invocation, prefix string) string { - var ( - buf bytes.Buffer - interfaceResource string - ) - buf.WriteString(prefix) + sb.WriteString(prefix) if getInterfaceGroupAndVersionEnabled() { - interfaceResource = getColonSeparatedKey(invoker.GetUrl()) + interfaceResourceName = getColonSeparatedKey(invoker.GetUrl()) } else { - interfaceResource = invoker.GetUrl().Service() + interfaceResourceName = invoker.GetUrl().Service() } - buf.WriteString(interfaceResource) - buf.WriteString(":") - buf.WriteString(invocation.MethodName()) - buf.WriteString("(") + sb.WriteString(interfaceResourceName) + sb.WriteString(":") + sb.WriteString(invocation.MethodName()) + sb.WriteString("(") isFirst := true for _, v := range invocation.ParameterTypes() { if !isFirst { - buf.WriteString(",") + sb.WriteString(",") } - buf.WriteString(v.Name()) + sb.WriteString(v.Name()) isFirst = false } - buf.WriteString(")") - return buf.String() + sb.WriteString(")") + methodResourceName = sb.String() + return } func getConsumerPrefix() string { diff --git a/filter/filter_impl/sentinel_filter_test.go b/filter/filter_impl/sentinel_filter_test.go index b8e4d87cb..2bf11d0be 100644 --- a/filter/filter_impl/sentinel_filter_test.go +++ b/filter/filter_impl/sentinel_filter_test.go @@ -19,19 +19,64 @@ package filter_impl import ( "context" - "testing" -) - -import ( - "github.com/stretchr/testify/assert" -) - -import ( + "github.com/alibaba/sentinel-golang/core/flow" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" + "github.com/stretchr/testify/assert" + "sync" + "sync/atomic" + "testing" ) +func TestSentinelFilter_QPS(t *testing.T) { + url, err := common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" + + "version=1.0.0&group=myGroup&" + + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" + + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" + + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" + + "side=provider&timeout=3000×tamp=1556509797245&bean.name=UserProvider") + assert.NoError(t, err) + mockInvoker := protocol.NewBaseInvoker(url) + interfaceResourceName, _ := getResourceName(mockInvoker, + invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]interface{})), "prefix_") + mockInvocation := invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]interface{})) + + _, err = flow.LoadRules([]*flow.Rule{ + { + Resource: interfaceResourceName, + MetricType: flow.QPS, + TokenCalculateStrategy: flow.Direct, + ControlBehavior: flow.Reject, + Count: 100, + RelationStrategy: flow.CurrentResource, + }, + }) + assert.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(10) + f := GetSentinelProviderFilter() + pass := int64(0) + block := int64(0) + for i := 0; i < 10; i++ { + go func() { + for j := 0; j < 30; j++ { + result := f.Invoke(context.TODO(), mockInvoker, mockInvocation) + if result.Error() == nil { + atomic.AddInt64(&pass, 1) + } else { + atomic.AddInt64(&block, 1) + } + } + wg.Done() + }() + } + wg.Wait() + assert.True(t, atomic.LoadInt64(&pass) == 100) + assert.True(t, atomic.LoadInt64(&block) == 200) +} + func TestConsumerFilter_Invoke(t *testing.T) { f := GetSentinelConsumerFilter() url, err := common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" + @@ -69,7 +114,8 @@ func TestGetResourceName(t *testing.T) { "side=provider&timeout=3000×tamp=1556509797245&bean.name=UserProvider") assert.NoError(t, err) mockInvoker := protocol.NewBaseInvoker(url) - methodResourceName := getResourceName(mockInvoker, + interfaceResourceName, methodResourceName := getResourceName(mockInvoker, invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]interface{})), "prefix_") + assert.Equal(t, "com.ikurento.user.UserProvider:myGroup:1.0.0", interfaceResourceName) assert.Equal(t, "prefix_com.ikurento.user.UserProvider:myGroup:1.0.0:hello()", methodResourceName) } -- GitLab