diff --git a/go.mod b/go.mod index 5b0988e4611ec812ec935e64882bdbeede655fe2..351747220dbf9292ccc05b36361a9cf742b9e23e 100644 --- a/go.mod +++ b/go.mod @@ -18,12 +18,12 @@ require ( github.com/golang/mock v1.3.1 github.com/golang/protobuf v1.4.3 github.com/google/btree v1.0.0 + github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/jarcoal/httpmock v1.0.8 github.com/klauspost/compress v1.10.11 // indirect github.com/minio/minio-go/v7 v7.0.10 github.com/mitchellh/mapstructure v1.1.2 - github.com/opentracing-contrib/go-grpc v0.0.0-20200813121455-4a6760c71486 github.com/opentracing/opentracing-go v1.2.0 github.com/pierrec/lz4 v2.5.2+incompatible // indirect github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index d85fd9c4b372ab0b80304104a14ce1baae3d7c06..51bedc26c7eedaf31ad3768bb92adc05cdfde58b 100644 --- a/go.sum +++ b/go.sum @@ -277,8 +277,6 @@ github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9k github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/opentracing-contrib/go-grpc v0.0.0-20200813121455-4a6760c71486 h1:K35HCWaOTJIPW6cDHK4yj3QfRY/NhE0pBbfoc0M2NMQ= -github.com/opentracing-contrib/go-grpc v0.0.0-20200813121455-4a6760c71486/go.mod h1:DYR5Eij8rJl8h7gblRrOZ8g0kW1umSpKqYIBTgeDtLo= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index 085e84d5661bbdb1463d258048f2988aa3361a4b..766de5868171ea7132a4232e8a98b0e7f65d8ff6 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -18,13 +18,13 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/retry" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/util/trace" "go.uber.org/zap" "google.golang.org/grpc" @@ -69,17 +69,16 @@ func (c *Client) Init() error { } func (c *Client) connect() error { - tracer := opentracing.GlobalTracer() - var err error connectGrpcFunc := func() error { - log.Debug("DataNode connect ", zap.String("address", c.addr)) ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout) defer cancelFunc() + opts := trace.GetInterceptorOpts() + log.Debug("DataNode connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - otgrpc.OpenTracingClientInterceptor(tracer)), + grpc_opentracing.UnaryClientInterceptor(opts...)), grpc.WithStreamInterceptor( - otgrpc.OpenTracingStreamClientInterceptor(tracer))) + grpc_opentracing.StreamClientInterceptor(opts...))) if err != nil { return err } @@ -87,7 +86,7 @@ func (c *Client) connect() error { return nil } - err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc) + err := retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc) if err != nil { log.Debug("DataNodeClient try connect failed", zap.Error(err)) return err diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 4b6138e31d2397e60881f5307d28b7a37454ca89..02a37131bb88be79d0c2ef3153366ec449e1f9bf 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -22,8 +22,6 @@ import ( "sync" "time" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "go.uber.org/zap" "google.golang.org/grpc" @@ -31,6 +29,7 @@ import ( dsc "github.com/milvus-io/milvus/internal/distributed/dataservice/client" msc "github.com/milvus-io/milvus/internal/distributed/masterservice/client" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -93,14 +92,14 @@ func (s *Server) startGrpc() error { func (s *Server) startGrpcLoop(listener net.Listener) { defer s.wg.Done() - tracer := opentracing.GlobalTracer() + opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32), grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor( - otgrpc.OpenTracingStreamServerInterceptor(tracer))) + grpc_opentracing.StreamServerInterceptor(opts...))) datapb.RegisterDataNodeServer(s.grpcServer, s) ctx, cancel := context.WithCancel(s.ctx) diff --git a/internal/distributed/dataservice/client/client.go b/internal/distributed/dataservice/client/client.go index 608d2e3b2bf503fefb9920031a669ae80a0ff31a..237827b224726295c3a474d3e5459ab04500dc6a 100644 --- a/internal/distributed/dataservice/client/client.go +++ b/internal/distributed/dataservice/client/client.go @@ -16,13 +16,13 @@ import ( "fmt" "time" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/typeutil" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "go.uber.org/zap" "google.golang.org/grpc" @@ -80,7 +80,6 @@ func (c *Client) Init() error { } func (c *Client) connect() error { - tracer := opentracing.GlobalTracer() var err error getDataServiceAddressFn := func() error { c.addr, err = getDataServiceAddress(c.sess) @@ -97,12 +96,13 @@ func (c *Client) connect() error { connectGrpcFunc := func() error { ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout) defer cancelFunc() + opts := trace.GetInterceptorOpts() log.Debug("DataServiceClient try reconnect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - otgrpc.OpenTracingClientInterceptor(tracer)), + grpc_opentracing.UnaryClientInterceptor(opts...)), grpc.WithStreamInterceptor( - otgrpc.OpenTracingStreamClientInterceptor(tracer))) + grpc_opentracing.StreamClientInterceptor(opts...))) if err != nil { return err } diff --git a/internal/distributed/dataservice/service.go b/internal/distributed/dataservice/service.go index f91355c98fd9ca4a3e6751502b5feab1f91f62bd..ee9ee7d8a6a42a86c1d46812e718c2b19ac08e27 100644 --- a/internal/distributed/dataservice/service.go +++ b/internal/distributed/dataservice/service.go @@ -25,6 +25,7 @@ import ( "google.golang.org/grpc" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/milvus-io/milvus/internal/dataservice" "github.com/milvus-io/milvus/internal/log" @@ -32,8 +33,6 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/trace" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -126,14 +125,14 @@ func (s *Server) startGrpcLoop(grpcPort int) { ctx, cancel := context.WithCancel(s.ctx) defer cancel() - tracer := opentracing.GlobalTracer() + opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32), grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor( - otgrpc.OpenTracingStreamServerInterceptor(tracer))) + grpc_opentracing.StreamServerInterceptor(opts...))) //grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor)) datapb.RegisterDataServiceServer(s.grpcServer, s) grpc_prometheus.Register(s.grpcServer) diff --git a/internal/distributed/indexnode/client/client.go b/internal/distributed/indexnode/client/client.go index 40ca0a23b8ae49ebbc5df94d5f6b866166bce186..7610907e5e8f37649bd78cae86423d4402b81253 100644 --- a/internal/distributed/indexnode/client/client.go +++ b/internal/distributed/indexnode/client/client.go @@ -16,11 +16,11 @@ import ( "fmt" "time" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/util/retry" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" + "github.com/milvus-io/milvus/internal/util/trace" "go.uber.org/zap" "google.golang.org/grpc" @@ -64,17 +64,16 @@ func (c *Client) Init() error { } func (c *Client) connect() error { - tracer := opentracing.GlobalTracer() - var err error connectGrpcFunc := func() error { ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout) defer cancelFunc() + opts := trace.GetInterceptorOpts() log.Debug("IndexNodeClient try connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - otgrpc.OpenTracingClientInterceptor(tracer)), + grpc_opentracing.UnaryClientInterceptor(opts...)), grpc.WithStreamInterceptor( - otgrpc.OpenTracingStreamClientInterceptor(tracer))) + grpc_opentracing.StreamClientInterceptor(opts...))) if err != nil { return err } @@ -82,7 +81,7 @@ func (c *Client) connect() error { return nil } - err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc) + err := retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc) if err != nil { log.Debug("IndexNodeClient try connect failed", zap.Error(err)) return err diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 90448a397f46330e47b54e127ca30d8a49144bf1..95a96c759d3992eee7498e4c8aeb57d85d33f85f 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -23,6 +23,7 @@ import ( "go.uber.org/zap" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" grpcindexserviceclient "github.com/milvus-io/milvus/internal/distributed/indexservice/client" "github.com/milvus-io/milvus/internal/indexnode" "github.com/milvus-io/milvus/internal/log" @@ -33,8 +34,6 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/trace" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "google.golang.org/grpc" ) @@ -79,14 +78,14 @@ func (s *Server) startGrpcLoop(grpcPort int) { ctx, cancel := context.WithCancel(s.loopCtx) defer cancel() - tracer := opentracing.GlobalTracer() + opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32), grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor( - otgrpc.OpenTracingStreamServerInterceptor(tracer))) + grpc_opentracing.StreamServerInterceptor(opts...))) indexpb.RegisterIndexNodeServer(s.grpcServer, s) go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) if err := s.grpcServer.Serve(lis); err != nil { diff --git a/internal/distributed/indexservice/client/client.go b/internal/distributed/indexservice/client/client.go index 149ba2a4360c753a22855cb9a8bfb1a6c6cba50c..978395698b0ce37544b16f471e7bc44879659707 100644 --- a/internal/distributed/indexservice/client/client.go +++ b/internal/distributed/indexservice/client/client.go @@ -18,12 +18,12 @@ import ( "google.golang.org/grpc" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/typeutil" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -84,7 +84,6 @@ func (c *Client) Init() error { } func (c *Client) connect() error { - tracer := opentracing.GlobalTracer() var err error getIndexServiceaddrFn := func() error { c.addr, err = getIndexServiceaddr(c.sess) @@ -102,12 +101,13 @@ func (c *Client) connect() error { connectGrpcFunc := func() error { ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout) defer cancelFunc() + opts := trace.GetInterceptorOpts() log.Debug("IndexServiceClient try connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - otgrpc.OpenTracingClientInterceptor(tracer)), + grpc_opentracing.UnaryClientInterceptor(opts...)), grpc.WithStreamInterceptor( - otgrpc.OpenTracingStreamClientInterceptor(tracer))) + grpc_opentracing.StreamClientInterceptor(opts...))) if err != nil { return err } diff --git a/internal/distributed/indexservice/service.go b/internal/distributed/indexservice/service.go index d4037dcdabc6500d4eee1ba431cf4725d61d0d3c..e93d6cf25ba4d0cba644f4f027e93b9edac031ec 100644 --- a/internal/distributed/indexservice/service.go +++ b/internal/distributed/indexservice/service.go @@ -21,6 +21,7 @@ import ( "go.uber.org/zap" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/indexservice" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -30,8 +31,6 @@ import ( "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/typeutil" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "google.golang.org/grpc" ) @@ -164,14 +163,14 @@ func (s *Server) startGrpcLoop(grpcPort int) { ctx, cancel := context.WithCancel(s.loopCtx) defer cancel() - tracer := opentracing.GlobalTracer() + opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32), grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor( - otgrpc.OpenTracingStreamServerInterceptor(tracer))) + grpc_opentracing.StreamServerInterceptor(opts...))) indexpb.RegisterIndexServiceServer(s.grpcServer, s) go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) diff --git a/internal/distributed/masterservice/client/client.go b/internal/distributed/masterservice/client/client.go index 7a305e6ea3cb3268a27fd78367d1f3d4e82948b4..bab3b6d20b063b854606e0ec5c715bb03850f269 100644 --- a/internal/distributed/masterservice/client/client.go +++ b/internal/distributed/masterservice/client/client.go @@ -17,6 +17,7 @@ import ( "fmt" "time" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -24,9 +25,8 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/typeutil" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -87,7 +87,6 @@ func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string, tim } func (c *GrpcClient) connect() error { - tracer := opentracing.GlobalTracer() var err error getMasterServiceAddrFn := func() error { ch := make(chan struct{}, 1) @@ -118,12 +117,13 @@ func (c *GrpcClient) connect() error { var conn *grpc.ClientConn var err error ch := make(chan struct{}, 1) + opts := trace.GetInterceptorOpts() go func() { conn, err = grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - otgrpc.OpenTracingClientInterceptor(tracer)), + grpc_opentracing.UnaryClientInterceptor(opts...)), grpc.WithStreamInterceptor( - otgrpc.OpenTracingStreamClientInterceptor(tracer))) + grpc_opentracing.StreamClientInterceptor(opts...))) ch <- struct{}{} }() select { diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index edbbc6a7d5d1333dfab6b0596b78e784829d447d..b1a7e82c44421f5ae68aafcca423560a2393a7d7 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -20,8 +20,6 @@ import ( "sync" "time" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "go.uber.org/zap" "google.golang.org/grpc" @@ -35,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/trace" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/masterpb" @@ -223,14 +222,14 @@ func (s *Server) startGrpcLoop(grpcPort int) { ctx, cancel := context.WithCancel(s.ctx) defer cancel() - tracer := opentracing.GlobalTracer() + opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32), grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor( - otgrpc.OpenTracingStreamServerInterceptor(tracer))) + grpc_opentracing.StreamServerInterceptor(opts...))) masterpb.RegisterMasterServiceServer(s.grpcServer, s) go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) diff --git a/internal/distributed/proxynode/client/client.go b/internal/distributed/proxynode/client/client.go index d13dfcc803ee7144d860d51183269589f275e3da..da79f24a064719c1040266025611596ba9944e1f 100644 --- a/internal/distributed/proxynode/client/client.go +++ b/internal/distributed/proxynode/client/client.go @@ -15,14 +15,14 @@ import ( "context" "time" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/util/retry" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" + "github.com/milvus-io/milvus/internal/util/trace" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -58,16 +58,16 @@ func (c *Client) Init() error { } func (c *Client) connect() error { - tracer := opentracing.GlobalTracer() connectGrpcFunc := func() error { ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout) defer cancelFunc() + opts := trace.GetInterceptorOpts() log.Debug("ProxyNodeClient try connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - otgrpc.OpenTracingClientInterceptor(tracer)), + grpc_opentracing.UnaryClientInterceptor(opts...)), grpc.WithStreamInterceptor( - otgrpc.OpenTracingStreamClientInterceptor(tracer))) + grpc_opentracing.StreamClientInterceptor(opts...))) if err != nil { return err } diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go index 93abb752826d8677a1c69cef3178154e80268321..1b5a624101a4c18b959e45c361cedc5e69e0d26a 100644 --- a/internal/distributed/proxynode/service.go +++ b/internal/distributed/proxynode/service.go @@ -28,8 +28,8 @@ import ( grpcindexserviceclient "github.com/milvus-io/milvus/internal/distributed/indexservice/client" grpcmasterserviceclient "github.com/milvus-io/milvus/internal/distributed/masterservice/client" grpcqueryserviceclient "github.com/milvus-io/milvus/internal/distributed/queryservice/client" - otgrpc "github.com/opentracing-contrib/go-grpc" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -93,15 +93,15 @@ func (s *Server) startGrpcLoop(grpcPort int) { ctx, cancel := context.WithCancel(s.ctx) defer cancel() - tracer := opentracing.GlobalTracer() + opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32), + grpc.MaxRecvMsgSize(GRPCMaxMagSize), grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor( - otgrpc.OpenTracingStreamServerInterceptor(tracer)), - grpc.MaxRecvMsgSize(GRPCMaxMagSize)) + grpc_opentracing.StreamServerInterceptor(opts...))) proxypb.RegisterProxyNodeServiceServer(s.grpcServer, s) milvuspb.RegisterMilvusServiceServer(s.grpcServer, s) diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index 5f1b584bcab805d03dd7eb7493f8e62c7068a6c0..d1ac09c20ec1f7a5e33b7a74532bea11786e11bf 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -18,14 +18,14 @@ import ( "google.golang.org/grpc" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/retry" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" + "github.com/milvus-io/milvus/internal/util/trace" "go.uber.org/zap" ) @@ -64,17 +64,16 @@ func (c *Client) Init() error { } func (c *Client) connect() error { - tracer := opentracing.GlobalTracer() - var err error connectGrpcFunc := func() error { ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout) defer cancelFunc() + opts := trace.GetInterceptorOpts() log.Debug("QueryNodeClient try connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - otgrpc.OpenTracingClientInterceptor(tracer)), + grpc_opentracing.UnaryClientInterceptor(opts...)), grpc.WithStreamInterceptor( - otgrpc.OpenTracingStreamClientInterceptor(tracer))) + grpc_opentracing.StreamClientInterceptor(opts...))) if err != nil { return err } @@ -82,7 +81,7 @@ func (c *Client) connect() error { return nil } - err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc) + err := retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc) if err != nil { log.Debug("QueryNodeClient try connect failed", zap.Error(err)) return err diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index 900a3405c2f546d8ed94d4d41659cdecd2a41c04..407968e3293eaba4b21ae9b53e601219c88078a9 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -25,11 +25,10 @@ import ( "github.com/milvus-io/milvus/internal/types" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "go.uber.org/zap" "google.golang.org/grpc" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" dsc "github.com/milvus-io/milvus/internal/distributed/dataservice/client" isc "github.com/milvus-io/milvus/internal/distributed/indexservice/client" msc "github.com/milvus-io/milvus/internal/distributed/masterservice/client" @@ -248,14 +247,14 @@ func (s *Server) startGrpcLoop(grpcPort int) { return } - tracer := opentracing.GlobalTracer() + opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32), grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor( - otgrpc.OpenTracingStreamServerInterceptor(tracer))) + grpc_opentracing.StreamServerInterceptor(opts...))) querypb.RegisterQueryNodeServer(s.grpcServer, s) ctx, cancel := context.WithCancel(s.ctx) diff --git a/internal/distributed/queryservice/client/client.go b/internal/distributed/queryservice/client/client.go index 6d5fdff6cf8e483d5bdd0049585755bf6babd920..3826a97da42f4083315c46d8349d629ece2ca97b 100644 --- a/internal/distributed/queryservice/client/client.go +++ b/internal/distributed/queryservice/client/client.go @@ -16,11 +16,11 @@ import ( "fmt" "time" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/typeutil" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "go.uber.org/zap" "google.golang.org/grpc" @@ -83,7 +83,6 @@ func (c *Client) Init() error { } func (c *Client) connect() error { - tracer := opentracing.GlobalTracer() var err error getQueryServiceAddressFn := func() error { c.addr, err = getQueryServiceAddress(c.sess) @@ -100,12 +99,13 @@ func (c *Client) connect() error { connectGrpcFunc := func() error { ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout) defer cancelFunc() + opts := trace.GetInterceptorOpts() log.Debug("QueryServiceClient try reconnect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - otgrpc.OpenTracingClientInterceptor(tracer)), + grpc_opentracing.UnaryClientInterceptor(opts...)), grpc.WithStreamInterceptor( - otgrpc.OpenTracingStreamClientInterceptor(tracer))) + grpc_opentracing.StreamClientInterceptor(opts...))) if err != nil { return err } diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go index 355101a24c5996a4872456d9056dad1b6f9b8145..56efe1164099d39c5ee8fbe346dbc841b4277660 100644 --- a/internal/distributed/queryservice/service.go +++ b/internal/distributed/queryservice/service.go @@ -20,6 +20,7 @@ import ( "sync" "time" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" dsc "github.com/milvus-io/milvus/internal/distributed/dataservice/client" msc "github.com/milvus-io/milvus/internal/distributed/masterservice/client" "github.com/milvus-io/milvus/internal/log" @@ -28,8 +29,6 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/trace" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "go.uber.org/zap" "google.golang.org/grpc" @@ -182,14 +181,14 @@ func (s *Server) startGrpcLoop(grpcPort int) { ctx, cancel := context.WithCancel(s.loopCtx) defer cancel() - tracer := opentracing.GlobalTracer() + opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32), grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor( - otgrpc.OpenTracingStreamServerInterceptor(tracer))) + grpc_opentracing.StreamServerInterceptor(opts...))) querypb.RegisterQueryServiceServer(s.grpcServer, s) go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) diff --git a/internal/util/trace/interceptor_suite.go b/internal/util/trace/interceptor_suite.go new file mode 100644 index 0000000000000000000000000000000000000000..26bdb8a79014270b9d178390c6de3eda04227835 --- /dev/null +++ b/internal/util/trace/interceptor_suite.go @@ -0,0 +1,44 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package trace + +import ( + "context" + + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" + "github.com/opentracing/opentracing-go" + "google.golang.org/grpc" +) + +type InterceptorSuite struct { + ClientOpts []grpc.DialOption + ServerOpts []grpc.ServerOption +} + +var ( + filterFunc = func(ctx context.Context, fullMethodName string) bool { + if fullMethodName == `/milvus.proto.master.MasterService/UpdateChannelTimeTick` || + fullMethodName == `/milvus.proto.master.MasterService/AllocTimestamp` { + return false + } + return true + } +) + +func GetInterceptorOpts() []grpc_opentracing.Option { + tracer := opentracing.GlobalTracer() + opts := []grpc_opentracing.Option{ + grpc_opentracing.WithTracer(tracer), + grpc_opentracing.WithFilterFunc(filterFunc), + } + return opts +}