From b7bf26b48622f89cf9223809eae07da89a2d4e2a Mon Sep 17 00:00:00 2001 From: godchen <qingxiang.chen@zilliz.com> Date: Thu, 17 Jun 2021 14:17:56 +0800 Subject: [PATCH] Update grpc trace (#5817) * Update grpc trace Signed-off-by: godchen <qingxiang.chen@zilliz.com> * go fmt Signed-off-by: godchen <qingxiang.chen@zilliz.com> * remove useless code in mod Signed-off-by: godchen <qingxiang.chen@zilliz.com> --- go.mod | 2 +- go.sum | 2 - .../distributed/datanode/client/client.go | 15 +++---- internal/distributed/datanode/service.go | 9 ++-- .../distributed/dataservice/client/client.go | 10 ++--- internal/distributed/dataservice/service.go | 9 ++-- .../distributed/indexnode/client/client.go | 13 +++--- internal/distributed/indexnode/service.go | 9 ++-- .../distributed/indexservice/client/client.go | 10 ++--- internal/distributed/indexservice/service.go | 9 ++-- .../masterservice/client/client.go | 10 ++--- internal/distributed/masterservice/server.go | 9 ++-- .../distributed/proxynode/client/client.go | 10 ++--- internal/distributed/proxynode/service.go | 10 ++--- .../distributed/querynode/client/client.go | 13 +++--- internal/distributed/querynode/service.go | 9 ++-- .../distributed/queryservice/client/client.go | 10 ++--- internal/distributed/queryservice/service.go | 9 ++-- internal/util/trace/interceptor_suite.go | 44 +++++++++++++++++++ 19 files changed, 122 insertions(+), 90 deletions(-) create mode 100644 internal/util/trace/interceptor_suite.go diff --git a/go.mod b/go.mod index 5b0988e46..351747220 100644 --- a/go.mod +++ b/go.mod @@ -18,12 +18,12 @@ require ( github.com/golang/mock v1.3.1 github.com/golang/protobuf v1.4.3 github.com/google/btree v1.0.0 + github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/jarcoal/httpmock v1.0.8 github.com/klauspost/compress v1.10.11 // indirect github.com/minio/minio-go/v7 v7.0.10 github.com/mitchellh/mapstructure v1.1.2 - github.com/opentracing-contrib/go-grpc v0.0.0-20200813121455-4a6760c71486 github.com/opentracing/opentracing-go v1.2.0 github.com/pierrec/lz4 v2.5.2+incompatible // indirect github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index d85fd9c4b..51bedc26c 100644 --- a/go.sum +++ b/go.sum @@ -277,8 +277,6 @@ github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9k github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/opentracing-contrib/go-grpc v0.0.0-20200813121455-4a6760c71486 h1:K35HCWaOTJIPW6cDHK4yj3QfRY/NhE0pBbfoc0M2NMQ= -github.com/opentracing-contrib/go-grpc v0.0.0-20200813121455-4a6760c71486/go.mod h1:DYR5Eij8rJl8h7gblRrOZ8g0kW1umSpKqYIBTgeDtLo= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index 085e84d56..766de5868 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -18,13 +18,13 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/retry" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/util/trace" "go.uber.org/zap" "google.golang.org/grpc" @@ -69,17 +69,16 @@ func (c *Client) Init() error { } func (c *Client) connect() error { - tracer := opentracing.GlobalTracer() - var err error connectGrpcFunc := func() error { - log.Debug("DataNode connect ", zap.String("address", c.addr)) ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout) defer cancelFunc() + opts := trace.GetInterceptorOpts() + log.Debug("DataNode connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - otgrpc.OpenTracingClientInterceptor(tracer)), + grpc_opentracing.UnaryClientInterceptor(opts...)), grpc.WithStreamInterceptor( - otgrpc.OpenTracingStreamClientInterceptor(tracer))) + grpc_opentracing.StreamClientInterceptor(opts...))) if err != nil { return err } @@ -87,7 +86,7 @@ func (c *Client) connect() error { return nil } - err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc) + err := retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc) if err != nil { log.Debug("DataNodeClient try connect failed", zap.Error(err)) return err diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 4b6138e31..02a37131b 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -22,8 +22,6 @@ import ( "sync" "time" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "go.uber.org/zap" "google.golang.org/grpc" @@ -31,6 +29,7 @@ import ( dsc "github.com/milvus-io/milvus/internal/distributed/dataservice/client" msc "github.com/milvus-io/milvus/internal/distributed/masterservice/client" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -93,14 +92,14 @@ func (s *Server) startGrpc() error { func (s *Server) startGrpcLoop(listener net.Listener) { defer s.wg.Done() - tracer := opentracing.GlobalTracer() + opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32), grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor( - otgrpc.OpenTracingStreamServerInterceptor(tracer))) + grpc_opentracing.StreamServerInterceptor(opts...))) datapb.RegisterDataNodeServer(s.grpcServer, s) ctx, cancel := context.WithCancel(s.ctx) diff --git a/internal/distributed/dataservice/client/client.go b/internal/distributed/dataservice/client/client.go index 608d2e3b2..237827b22 100644 --- a/internal/distributed/dataservice/client/client.go +++ b/internal/distributed/dataservice/client/client.go @@ -16,13 +16,13 @@ import ( "fmt" "time" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/typeutil" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "go.uber.org/zap" "google.golang.org/grpc" @@ -80,7 +80,6 @@ func (c *Client) Init() error { } func (c *Client) connect() error { - tracer := opentracing.GlobalTracer() var err error getDataServiceAddressFn := func() error { c.addr, err = getDataServiceAddress(c.sess) @@ -97,12 +96,13 @@ func (c *Client) connect() error { connectGrpcFunc := func() error { ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout) defer cancelFunc() + opts := trace.GetInterceptorOpts() log.Debug("DataServiceClient try reconnect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - otgrpc.OpenTracingClientInterceptor(tracer)), + grpc_opentracing.UnaryClientInterceptor(opts...)), grpc.WithStreamInterceptor( - otgrpc.OpenTracingStreamClientInterceptor(tracer))) + grpc_opentracing.StreamClientInterceptor(opts...))) if err != nil { return err } diff --git a/internal/distributed/dataservice/service.go b/internal/distributed/dataservice/service.go index f91355c98..ee9ee7d8a 100644 --- a/internal/distributed/dataservice/service.go +++ b/internal/distributed/dataservice/service.go @@ -25,6 +25,7 @@ import ( "google.golang.org/grpc" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/milvus-io/milvus/internal/dataservice" "github.com/milvus-io/milvus/internal/log" @@ -32,8 +33,6 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/trace" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -126,14 +125,14 @@ func (s *Server) startGrpcLoop(grpcPort int) { ctx, cancel := context.WithCancel(s.ctx) defer cancel() - tracer := opentracing.GlobalTracer() + opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32), grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor( - otgrpc.OpenTracingStreamServerInterceptor(tracer))) + grpc_opentracing.StreamServerInterceptor(opts...))) //grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor)) datapb.RegisterDataServiceServer(s.grpcServer, s) grpc_prometheus.Register(s.grpcServer) diff --git a/internal/distributed/indexnode/client/client.go b/internal/distributed/indexnode/client/client.go index 40ca0a23b..7610907e5 100644 --- a/internal/distributed/indexnode/client/client.go +++ b/internal/distributed/indexnode/client/client.go @@ -16,11 +16,11 @@ import ( "fmt" "time" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/util/retry" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" + "github.com/milvus-io/milvus/internal/util/trace" "go.uber.org/zap" "google.golang.org/grpc" @@ -64,17 +64,16 @@ func (c *Client) Init() error { } func (c *Client) connect() error { - tracer := opentracing.GlobalTracer() - var err error connectGrpcFunc := func() error { ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout) defer cancelFunc() + opts := trace.GetInterceptorOpts() log.Debug("IndexNodeClient try connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - otgrpc.OpenTracingClientInterceptor(tracer)), + grpc_opentracing.UnaryClientInterceptor(opts...)), grpc.WithStreamInterceptor( - otgrpc.OpenTracingStreamClientInterceptor(tracer))) + grpc_opentracing.StreamClientInterceptor(opts...))) if err != nil { return err } @@ -82,7 +81,7 @@ func (c *Client) connect() error { return nil } - err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc) + err := retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc) if err != nil { log.Debug("IndexNodeClient try connect failed", zap.Error(err)) return err diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 90448a397..95a96c759 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -23,6 +23,7 @@ import ( "go.uber.org/zap" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" grpcindexserviceclient "github.com/milvus-io/milvus/internal/distributed/indexservice/client" "github.com/milvus-io/milvus/internal/indexnode" "github.com/milvus-io/milvus/internal/log" @@ -33,8 +34,6 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/trace" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "google.golang.org/grpc" ) @@ -79,14 +78,14 @@ func (s *Server) startGrpcLoop(grpcPort int) { ctx, cancel := context.WithCancel(s.loopCtx) defer cancel() - tracer := opentracing.GlobalTracer() + opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32), grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor( - otgrpc.OpenTracingStreamServerInterceptor(tracer))) + grpc_opentracing.StreamServerInterceptor(opts...))) indexpb.RegisterIndexNodeServer(s.grpcServer, s) go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) if err := s.grpcServer.Serve(lis); err != nil { diff --git a/internal/distributed/indexservice/client/client.go b/internal/distributed/indexservice/client/client.go index 149ba2a43..978395698 100644 --- a/internal/distributed/indexservice/client/client.go +++ b/internal/distributed/indexservice/client/client.go @@ -18,12 +18,12 @@ import ( "google.golang.org/grpc" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/typeutil" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -84,7 +84,6 @@ func (c *Client) Init() error { } func (c *Client) connect() error { - tracer := opentracing.GlobalTracer() var err error getIndexServiceaddrFn := func() error { c.addr, err = getIndexServiceaddr(c.sess) @@ -102,12 +101,13 @@ func (c *Client) connect() error { connectGrpcFunc := func() error { ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout) defer cancelFunc() + opts := trace.GetInterceptorOpts() log.Debug("IndexServiceClient try connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - otgrpc.OpenTracingClientInterceptor(tracer)), + grpc_opentracing.UnaryClientInterceptor(opts...)), grpc.WithStreamInterceptor( - otgrpc.OpenTracingStreamClientInterceptor(tracer))) + grpc_opentracing.StreamClientInterceptor(opts...))) if err != nil { return err } diff --git a/internal/distributed/indexservice/service.go b/internal/distributed/indexservice/service.go index d4037dcda..e93d6cf25 100644 --- a/internal/distributed/indexservice/service.go +++ b/internal/distributed/indexservice/service.go @@ -21,6 +21,7 @@ import ( "go.uber.org/zap" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/indexservice" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -30,8 +31,6 @@ import ( "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/typeutil" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "google.golang.org/grpc" ) @@ -164,14 +163,14 @@ func (s *Server) startGrpcLoop(grpcPort int) { ctx, cancel := context.WithCancel(s.loopCtx) defer cancel() - tracer := opentracing.GlobalTracer() + opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32), grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor( - otgrpc.OpenTracingStreamServerInterceptor(tracer))) + grpc_opentracing.StreamServerInterceptor(opts...))) indexpb.RegisterIndexServiceServer(s.grpcServer, s) go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) diff --git a/internal/distributed/masterservice/client/client.go b/internal/distributed/masterservice/client/client.go index 7a305e6ea..bab3b6d20 100644 --- a/internal/distributed/masterservice/client/client.go +++ b/internal/distributed/masterservice/client/client.go @@ -17,6 +17,7 @@ import ( "fmt" "time" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -24,9 +25,8 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/typeutil" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -87,7 +87,6 @@ func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string, tim } func (c *GrpcClient) connect() error { - tracer := opentracing.GlobalTracer() var err error getMasterServiceAddrFn := func() error { ch := make(chan struct{}, 1) @@ -118,12 +117,13 @@ func (c *GrpcClient) connect() error { var conn *grpc.ClientConn var err error ch := make(chan struct{}, 1) + opts := trace.GetInterceptorOpts() go func() { conn, err = grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - otgrpc.OpenTracingClientInterceptor(tracer)), + grpc_opentracing.UnaryClientInterceptor(opts...)), grpc.WithStreamInterceptor( - otgrpc.OpenTracingStreamClientInterceptor(tracer))) + grpc_opentracing.StreamClientInterceptor(opts...))) ch <- struct{}{} }() select { diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index edbbc6a7d..b1a7e82c4 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -20,8 +20,6 @@ import ( "sync" "time" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "go.uber.org/zap" "google.golang.org/grpc" @@ -35,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/trace" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/masterpb" @@ -223,14 +222,14 @@ func (s *Server) startGrpcLoop(grpcPort int) { ctx, cancel := context.WithCancel(s.ctx) defer cancel() - tracer := opentracing.GlobalTracer() + opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32), grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor( - otgrpc.OpenTracingStreamServerInterceptor(tracer))) + grpc_opentracing.StreamServerInterceptor(opts...))) masterpb.RegisterMasterServiceServer(s.grpcServer, s) go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) diff --git a/internal/distributed/proxynode/client/client.go b/internal/distributed/proxynode/client/client.go index d13dfcc80..da79f24a0 100644 --- a/internal/distributed/proxynode/client/client.go +++ b/internal/distributed/proxynode/client/client.go @@ -15,14 +15,14 @@ import ( "context" "time" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/util/retry" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" + "github.com/milvus-io/milvus/internal/util/trace" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -58,16 +58,16 @@ func (c *Client) Init() error { } func (c *Client) connect() error { - tracer := opentracing.GlobalTracer() connectGrpcFunc := func() error { ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout) defer cancelFunc() + opts := trace.GetInterceptorOpts() log.Debug("ProxyNodeClient try connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - otgrpc.OpenTracingClientInterceptor(tracer)), + grpc_opentracing.UnaryClientInterceptor(opts...)), grpc.WithStreamInterceptor( - otgrpc.OpenTracingStreamClientInterceptor(tracer))) + grpc_opentracing.StreamClientInterceptor(opts...))) if err != nil { return err } diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go index 93abb7528..1b5a62410 100644 --- a/internal/distributed/proxynode/service.go +++ b/internal/distributed/proxynode/service.go @@ -28,8 +28,8 @@ import ( grpcindexserviceclient "github.com/milvus-io/milvus/internal/distributed/indexservice/client" grpcmasterserviceclient "github.com/milvus-io/milvus/internal/distributed/masterservice/client" grpcqueryserviceclient "github.com/milvus-io/milvus/internal/distributed/queryservice/client" - otgrpc "github.com/opentracing-contrib/go-grpc" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -93,15 +93,15 @@ func (s *Server) startGrpcLoop(grpcPort int) { ctx, cancel := context.WithCancel(s.ctx) defer cancel() - tracer := opentracing.GlobalTracer() + opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32), + grpc.MaxRecvMsgSize(GRPCMaxMagSize), grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor( - otgrpc.OpenTracingStreamServerInterceptor(tracer)), - grpc.MaxRecvMsgSize(GRPCMaxMagSize)) + grpc_opentracing.StreamServerInterceptor(opts...))) proxypb.RegisterProxyNodeServiceServer(s.grpcServer, s) milvuspb.RegisterMilvusServiceServer(s.grpcServer, s) diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index 5f1b584bc..d1ac09c20 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -18,14 +18,14 @@ import ( "google.golang.org/grpc" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/retry" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" + "github.com/milvus-io/milvus/internal/util/trace" "go.uber.org/zap" ) @@ -64,17 +64,16 @@ func (c *Client) Init() error { } func (c *Client) connect() error { - tracer := opentracing.GlobalTracer() - var err error connectGrpcFunc := func() error { ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout) defer cancelFunc() + opts := trace.GetInterceptorOpts() log.Debug("QueryNodeClient try connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - otgrpc.OpenTracingClientInterceptor(tracer)), + grpc_opentracing.UnaryClientInterceptor(opts...)), grpc.WithStreamInterceptor( - otgrpc.OpenTracingStreamClientInterceptor(tracer))) + grpc_opentracing.StreamClientInterceptor(opts...))) if err != nil { return err } @@ -82,7 +81,7 @@ func (c *Client) connect() error { return nil } - err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc) + err := retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc) if err != nil { log.Debug("QueryNodeClient try connect failed", zap.Error(err)) return err diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index 900a3405c..407968e32 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -25,11 +25,10 @@ import ( "github.com/milvus-io/milvus/internal/types" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "go.uber.org/zap" "google.golang.org/grpc" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" dsc "github.com/milvus-io/milvus/internal/distributed/dataservice/client" isc "github.com/milvus-io/milvus/internal/distributed/indexservice/client" msc "github.com/milvus-io/milvus/internal/distributed/masterservice/client" @@ -248,14 +247,14 @@ func (s *Server) startGrpcLoop(grpcPort int) { return } - tracer := opentracing.GlobalTracer() + opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32), grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor( - otgrpc.OpenTracingStreamServerInterceptor(tracer))) + grpc_opentracing.StreamServerInterceptor(opts...))) querypb.RegisterQueryNodeServer(s.grpcServer, s) ctx, cancel := context.WithCancel(s.ctx) diff --git a/internal/distributed/queryservice/client/client.go b/internal/distributed/queryservice/client/client.go index 6d5fdff6c..3826a97da 100644 --- a/internal/distributed/queryservice/client/client.go +++ b/internal/distributed/queryservice/client/client.go @@ -16,11 +16,11 @@ import ( "fmt" "time" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/typeutil" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "go.uber.org/zap" "google.golang.org/grpc" @@ -83,7 +83,6 @@ func (c *Client) Init() error { } func (c *Client) connect() error { - tracer := opentracing.GlobalTracer() var err error getQueryServiceAddressFn := func() error { c.addr, err = getQueryServiceAddress(c.sess) @@ -100,12 +99,13 @@ func (c *Client) connect() error { connectGrpcFunc := func() error { ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout) defer cancelFunc() + opts := trace.GetInterceptorOpts() log.Debug("QueryServiceClient try reconnect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - otgrpc.OpenTracingClientInterceptor(tracer)), + grpc_opentracing.UnaryClientInterceptor(opts...)), grpc.WithStreamInterceptor( - otgrpc.OpenTracingStreamClientInterceptor(tracer))) + grpc_opentracing.StreamClientInterceptor(opts...))) if err != nil { return err } diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go index 355101a24..56efe1164 100644 --- a/internal/distributed/queryservice/service.go +++ b/internal/distributed/queryservice/service.go @@ -20,6 +20,7 @@ import ( "sync" "time" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" dsc "github.com/milvus-io/milvus/internal/distributed/dataservice/client" msc "github.com/milvus-io/milvus/internal/distributed/masterservice/client" "github.com/milvus-io/milvus/internal/log" @@ -28,8 +29,6 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/trace" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "go.uber.org/zap" "google.golang.org/grpc" @@ -182,14 +181,14 @@ func (s *Server) startGrpcLoop(grpcPort int) { ctx, cancel := context.WithCancel(s.loopCtx) defer cancel() - tracer := opentracing.GlobalTracer() + opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32), grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor( - otgrpc.OpenTracingStreamServerInterceptor(tracer))) + grpc_opentracing.StreamServerInterceptor(opts...))) querypb.RegisterQueryServiceServer(s.grpcServer, s) go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) diff --git a/internal/util/trace/interceptor_suite.go b/internal/util/trace/interceptor_suite.go new file mode 100644 index 000000000..26bdb8a79 --- /dev/null +++ b/internal/util/trace/interceptor_suite.go @@ -0,0 +1,44 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package trace + +import ( + "context" + + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" + "github.com/opentracing/opentracing-go" + "google.golang.org/grpc" +) + +type InterceptorSuite struct { + ClientOpts []grpc.DialOption + ServerOpts []grpc.ServerOption +} + +var ( + filterFunc = func(ctx context.Context, fullMethodName string) bool { + if fullMethodName == `/milvus.proto.master.MasterService/UpdateChannelTimeTick` || + fullMethodName == `/milvus.proto.master.MasterService/AllocTimestamp` { + return false + } + return true + } +) + +func GetInterceptorOpts() []grpc_opentracing.Option { + tracer := opentracing.GlobalTracer() + opts := []grpc_opentracing.Option{ + grpc_opentracing.WithTracer(tracer), + grpc_opentracing.WithFilterFunc(filterFunc), + } + return opts +} -- GitLab