From eca1e2547049be7a788bcce9d0c672f9f69821e0 Mon Sep 17 00:00:00 2001 From: neza2017 <yefu.chen@zilliz.com> Date: Fri, 26 Mar 2021 15:13:33 +0800 Subject: [PATCH] Add msg id at master service Signed-off-by: neza2017 <yefu.chen@zilliz.com> --- .../Regression/PythonRegression.groovy | 8 +- cmd/distributed/roles/roles.go | 10 ++- internal/distributed/datanode/service.go | 14 ++-- internal/distributed/dataservice/service.go | 12 ++- internal/distributed/indexnode/service.go | 6 +- internal/distributed/indexservice/service.go | 12 ++- internal/distributed/masterservice/server.go | 8 +- internal/distributed/proxynode/service.go | 12 +-- internal/distributed/proxyservice/service.go | 12 ++- internal/distributed/querynode/service.go | 14 ++-- internal/distributed/queryservice/service.go | 15 ++-- internal/masterservice/master_service.go | 76 ++++++++++--------- internal/querynode/search_service.go | 4 +- internal/util/trace/util.go | 34 ++++++--- internal/util/trace/util_test.go | 4 +- 15 files changed, 119 insertions(+), 122 deletions(-) diff --git a/.jenkins/modules/Regression/PythonRegression.groovy b/.jenkins/modules/Regression/PythonRegression.groovy index dad8a9bae..3f2f4ed86 100644 --- a/.jenkins/modules/Regression/PythonRegression.groovy +++ b/.jenkins/modules/Regression/PythonRegression.groovy @@ -71,16 +71,16 @@ timeout(time: "${regressionTimeout}", unit: 'MINUTES') { } def etcdLabels = "app.kubernetes.io/instance=${env.HELM_RELEASE_NAME},app.kubernetes.io/name=etcd" def minioLables = "release=${env.HELM_RELEASE_NAME},app=minio" - def pulsarLabels = "app.kubernetes.io/instance=${env.HELM_RELEASE_NAME},component=pulsar" + def pulsarLabels = "release=${env.HELM_RELEASE_NAME},app=pulsar" sh "mkdir -p ${env.DEV_TEST_ARTIFACTS_PATH}" sh "kubectl cp -n ${env.HELM_RELEASE_NAMESPACE} \$(kubectl get pod -n ${env.HELM_RELEASE_NAMESPACE} -l ${milvusLabels} -o jsonpath='{range.items[0]}{.metadata.name}'):logs ${env.DEV_TEST_ARTIFACTS_PATH}" sh "kubectl logs -n ${env.HELM_RELEASE_NAMESPACE} \$(kubectl get pod -n ${env.HELM_RELEASE_NAMESPACE} -l ${etcdLabels} -o jsonpath='{range.items[*]}{.metadata.name} ') > ${env.DEV_TEST_ARTIFACTS_PATH}/etcd-${REGRESSION_SERVICE_TYPE}.log" sh "kubectl logs -n ${env.HELM_RELEASE_NAMESPACE} \$(kubectl get pod -n ${env.HELM_RELEASE_NAMESPACE} -l ${minioLables} -o jsonpath='{range.items[*]}{.metadata.name} ') > ${env.DEV_TEST_ARTIFACTS_PATH}/minio-${REGRESSION_SERVICE_TYPE}.log" - if ("${REGRESSION_SERVICE_TYPE}" == "distributed") { - sh "kubectl logs -n ${env.HELM_RELEASE_NAMESPACE} \$(kubectl get pod -n ${env.HELM_RELEASE_NAMESPACE} -l ${pulsarLabels} -o jsonpath='{range.items[*]}{.metadata.name} ') > ${env.DEV_TEST_ARTIFACTS_PATH}/pulsar-${REGRESSION_SERVICE_TYPE}.log" - } + // if ("${REGRESSION_SERVICE_TYPE}" == "distributed") { + // sh "kubectl logs -n ${env.HELM_RELEASE_NAMESPACE} \$(kubectl get pod -n ${env.HELM_RELEASE_NAMESPACE} -l ${pulsarLabels} -o jsonpath='{range.items[*]}{.metadata.name} ') > ${env.DEV_TEST_ARTIFACTS_PATH}/pulsar-${REGRESSION_SERVICE_TYPE}.log" + // } archiveArtifacts artifacts: "${env.DEV_TEST_ARTIFACTS_PATH}/**", allowEmptyArchive: true } } diff --git a/cmd/distributed/roles/roles.go b/cmd/distributed/roles/roles.go index 590d73b07..4fd01be6f 100644 --- a/cmd/distributed/roles/roles.go +++ b/cmd/distributed/roles/roles.go @@ -8,14 +8,14 @@ import ( "strings" "syscall" - "github.com/zilliztech/milvus-distributed/internal/logutil" - "github.com/zilliztech/milvus-distributed/cmd/distributed/components" ds "github.com/zilliztech/milvus-distributed/internal/dataservice" + "github.com/zilliztech/milvus-distributed/internal/logutil" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "github.com/zilliztech/milvus-distributed/internal/msgstream/rmqms" "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/server/rocksmq" + "github.com/zilliztech/milvus-distributed/internal/util/trace" ) func newMsgFactory(localMsg bool) msgstream.Factory { @@ -57,6 +57,12 @@ func (mr *MilvusRoles) EnvValue(env string) bool { } func (mr *MilvusRoles) Run(localMsg bool) { + + closer := trace.InitTracing("singleNode") + if closer != nil { + defer closer.Close() + } + if !mr.HasAnyRole() { return } diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 4b9ac5a87..115761dae 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -118,8 +118,10 @@ func (s *Server) Run() error { } func (s *Server) Stop() error { - if err := s.closer.Close(); err != nil { - return err + if s.closer != nil { + if err := s.closer.Close(); err != nil { + return err + } } s.cancel() if s.grpcServer != nil { @@ -149,17 +151,13 @@ func (s *Server) init() error { log.Debug("DataNode port", zap.Int("port", Params.Port)) - tracer, closer, err := trace.InitTracing(fmt.Sprintf("data_node ip: %s, port: %d", Params.IP, Params.Port)) - if err != nil { - log.Error("data_node", zap.String("init trace err", err.Error())) - } - opentracing.SetGlobalTracer(tracer) + closer := trace.InitTracing(fmt.Sprintf("data_node ip: %s, port: %d", Params.IP, Params.Port)) s.closer = closer s.wg.Add(1) go s.startGrpcLoop(Params.Port) // wait for grpc server loop start - err = <-s.grpcErrChan + err := <-s.grpcErrChan if err != nil { return err } diff --git a/internal/distributed/dataservice/service.go b/internal/distributed/dataservice/service.go index d5219c06b..cccf5d6b5 100644 --- a/internal/distributed/dataservice/service.go +++ b/internal/distributed/dataservice/service.go @@ -66,11 +66,7 @@ func (s *Server) init() error { Params.Init() Params.LoadFromEnv() - tracer, closer, err := trace.InitTracing("data_service") - if err != nil { - log.Error("data_service", zap.String("init trace err", err.Error())) - } - opentracing.SetGlobalTracer(tracer) + closer := trace.InitTracing("data_service") s.closer = closer s.wg.Add(1) @@ -147,8 +143,10 @@ func (s *Server) start() error { func (s *Server) Stop() error { var err error - if err = s.closer.Close(); err != nil { - return err + if s.closer != nil { + if err = s.closer.Close(); err != nil { + return err + } } s.cancel() diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index b517fa7ca..593e06988 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -97,11 +97,7 @@ func (s *Server) init() error { indexnode.Params.IP = Params.IP indexnode.Params.Address = Params.Address - tracer, closer, err := trace.InitTracing(fmt.Sprintf("index_node_%d", indexnode.Params.NodeID)) - if err != nil { - log.Error("index_node", zap.String("init trace err", err.Error())) - } - opentracing.SetGlobalTracer(tracer) + closer := trace.InitTracing(fmt.Sprintf("index_node_%d", indexnode.Params.NodeID)) s.closer = closer Params.Address = Params.IP + ":" + strconv.FormatInt(int64(Params.Port), 10) diff --git a/internal/distributed/indexservice/service.go b/internal/distributed/indexservice/service.go index 57598d936..4a0aa2876 100644 --- a/internal/distributed/indexservice/service.go +++ b/internal/distributed/indexservice/service.go @@ -56,11 +56,7 @@ func (s *Server) init() error { Params.Init() indexservice.Params.Init() - tracer, closer, err := trace.InitTracing("index_service") - if err != nil { - log.Error("index_service", zap.String("init trace err", err.Error())) - } - opentracing.SetGlobalTracer(tracer) + closer := trace.InitTracing("index_service") s.closer = closer s.loopWg.Add(1) @@ -86,8 +82,10 @@ func (s *Server) start() error { } func (s *Server) Stop() error { - if err := s.closer.Close(); err != nil { - return err + if s.closer != nil { + if err := s.closer.Close(); err != nil { + return err + } } if s.indexservice != nil { s.indexservice.Stop() diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index a1cd188ab..92c694857 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -96,16 +96,12 @@ func (s *Server) init() error { ctx := context.Background() - tracer, closer, err := trace.InitTracing("master_service") - if err != nil { - log.Error("master_service", zap.String("init trace err", err.Error())) - } - opentracing.SetGlobalTracer(tracer) + closer := trace.InitTracing("master_service") s.closer = closer log.Debug("init params done") - err = s.startGrpc() + err := s.startGrpc() if err != nil { return err } diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go index 61098d35f..b1df7e549 100644 --- a/internal/distributed/proxynode/service.go +++ b/internal/distributed/proxynode/service.go @@ -136,11 +136,7 @@ func (s *Server) init() error { // for purpose of ID Allocator proxynode.Params.MasterAddress = Params.MasterAddress - tracer, closer, err := trace.InitTracing(fmt.Sprintf("proxy_node ip: %s, port: %d", Params.IP, Params.Port)) - if err != nil { - log.Error("proxy_node", zap.String("init trace err", err.Error())) - } - opentracing.SetGlobalTracer(tracer) + closer := trace.InitTracing(fmt.Sprintf("proxy_node ip: %s, port: %d", Params.IP, Params.Port)) s.closer = closer log.Debug("proxynode", zap.String("proxy host", Params.IP)) @@ -243,7 +239,11 @@ func (s *Server) start() error { func (s *Server) Stop() error { var err error - s.closer.Close() + if s.closer != nil { + if err = s.closer.Close(); err != nil { + return err + } + } if s.grpcServer != nil { s.grpcServer.GracefulStop() diff --git a/internal/distributed/proxyservice/service.go b/internal/distributed/proxyservice/service.go index 1b75d6b65..4ba943cec 100644 --- a/internal/distributed/proxyservice/service.go +++ b/internal/distributed/proxyservice/service.go @@ -73,11 +73,7 @@ func (s *Server) init() error { proxyservice.Params.Init() log.Debug("init params done") - tracer, closer, err := trace.InitTracing("proxy_service") - if err != nil { - log.Error("proxy_service", zap.String("init trace err", err.Error())) - } - opentracing.SetGlobalTracer(tracer) + closer := trace.InitTracing("proxy_service") s.closer = closer s.wg.Add(1) @@ -137,8 +133,10 @@ func (s *Server) start() error { } func (s *Server) Stop() error { - if err := s.closer.Close(); err != nil { - return err + if s.closer != nil { + if err := s.closer.Close(); err != nil { + return err + } } s.cancel() s.closer.Close() diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index bf022c83b..8d557a6e0 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -77,18 +77,14 @@ func (s *Server) init() error { qn.Params.QueryNodePort = int64(Params.QueryNodePort) qn.Params.QueryNodeID = Params.QueryNodeID - tracer, closer, err := trace.InitTracing(fmt.Sprintf("query_node ip: %s, port: %d", Params.QueryNodeIP, Params.QueryNodePort)) - if err != nil { - log.Error("query_node", zap.String("init trace err", err.Error())) - } - opentracing.SetGlobalTracer(tracer) + closer := trace.InitTracing(fmt.Sprintf("query_node ip: %s, port: %d", Params.QueryNodeIP, Params.QueryNodePort)) s.closer = closer log.Debug("QueryNode", zap.Int("port", Params.QueryNodePort)) s.wg.Add(1) go s.startGrpcLoop(Params.QueryNodePort) // wait for grpc server loop start - err = <-s.grpcErrChan + err := <-s.grpcErrChan if err != nil { return err } @@ -257,8 +253,10 @@ func (s *Server) Run() error { } func (s *Server) Stop() error { - if err := s.closer.Close(); err != nil { - return err + if s.closer != nil { + if err := s.closer.Close(); err != nil { + return err + } } s.cancel() diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go index 51bb5812b..626e82343 100644 --- a/internal/distributed/queryservice/service.go +++ b/internal/distributed/queryservice/service.go @@ -81,11 +81,7 @@ func (s *Server) init() error { Params.Init() qs.Params.Init() - tracer, closer, err := trace.InitTracing("query_service") - if err != nil { - log.Error("query_service", zap.String("init trace err", err.Error())) - } - opentracing.SetGlobalTracer(tracer) + closer := trace.InitTracing("query_service") s.closer = closer s.wg.Add(1) @@ -185,11 +181,12 @@ func (s *Server) start() error { } func (s *Server) Stop() error { - err := s.closer.Close() - if err != nil { - return err + if s.closer != nil { + if err := s.closer.Close(); err != nil { + return err + } } - err = s.queryservice.Stop() + err := s.queryservice.Stop() s.loopCancel() if s.grpcServer != nil { s.grpcServer.GracefulStop() diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 44eb52987..270bc753d 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -879,7 +879,7 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), }, nil } - log.Debug("CreateCollection ", zap.String("name", in.CollectionName)) + log.Debug("CreateCollection ", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) t := &CreateCollectionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -891,13 +891,13 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { - log.Debug("CreateCollection failed", zap.String("name", in.CollectionName)) + log.Debug("CreateCollection failed", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: "Create collection failed: " + err.Error(), }, nil } - log.Debug("CreateCollection Success", zap.String("name", in.CollectionName)) + log.Debug("CreateCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", @@ -912,7 +912,7 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), }, nil } - log.Debug("DropCollection", zap.String("name", in.CollectionName)) + log.Debug("DropCollection", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) t := &DropCollectionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -924,13 +924,13 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { - log.Debug("DropCollection Failed", zap.String("name", in.CollectionName)) + log.Debug("DropCollection Failed", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: "Drop collection failed: " + err.Error(), }, nil } - log.Debug("DropCollection Success", zap.String("name", in.CollectionName)) + log.Debug("DropCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", @@ -948,7 +948,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ Value: false, }, nil } - log.Debug("HasCollection", zap.String("name", in.CollectionName)) + log.Debug("HasCollection", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) t := &HasCollectionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -961,7 +961,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { - log.Debug("HasCollection Failed", zap.String("name", in.CollectionName)) + log.Debug("HasCollection Failed", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) return &milvuspb.BoolResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -970,7 +970,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ Value: false, }, nil } - log.Debug("HasCollection Success", zap.String("name", in.CollectionName)) + log.Debug("HasCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) return &milvuspb.BoolResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -992,7 +992,7 @@ func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeColl CollectionID: 0, }, nil } - log.Debug("DescribeCollection", zap.String("name", in.CollectionName)) + log.Debug("DescribeCollection", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) t := &DescribeCollectionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -1005,7 +1005,7 @@ func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeColl c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { - log.Debug("DescribeCollection Failed", zap.String("name", in.CollectionName), zap.Error(err)) + log.Debug("DescribeCollection Failed", zap.String("name", in.CollectionName), zap.Error(err), zap.Int64("msgID", in.Base.MsgID)) return &milvuspb.DescribeCollectionResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -1014,7 +1014,7 @@ func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeColl Schema: nil, }, nil } - log.Debug("DescribeCollection Success", zap.String("name", in.CollectionName)) + log.Debug("DescribeCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) t.Rsp.Status = &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", @@ -1033,7 +1033,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections CollectionNames: nil, }, nil } - log.Debug("ShowCollections", zap.String("dbname", in.DbName)) + log.Debug("ShowCollections", zap.String("dbname", in.DbName), zap.Int64("msgID", in.Base.MsgID)) t := &ShowCollectionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -1048,7 +1048,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { - log.Debug("ShowCollections failed", zap.String("dbname", in.DbName)) + log.Debug("ShowCollections failed", zap.String("dbname", in.DbName), zap.Int64("msgID", in.Base.MsgID)) return &milvuspb.ShowCollectionsResponse{ CollectionNames: nil, Status: &commonpb.Status{ @@ -1057,7 +1057,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections }, }, nil } - log.Debug("ShowCollections Success", zap.String("dbname", in.DbName), zap.Strings("collection names", t.Rsp.CollectionNames)) + log.Debug("ShowCollections Success", zap.String("dbname", in.DbName), zap.Strings("collection names", t.Rsp.CollectionNames), zap.Int64("msgID", in.Base.MsgID)) t.Rsp.Status = &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", @@ -1073,7 +1073,7 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), }, nil } - log.Debug("CreatePartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName)) + log.Debug("CreatePartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) t := &CreatePartitionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -1085,13 +1085,13 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { - log.Debug("CreatePartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName)) + log.Debug("CreatePartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: "create partition failed: " + err.Error(), }, nil } - log.Debug("CreatePartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName)) + log.Debug("CreatePartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", @@ -1106,7 +1106,7 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), }, nil } - log.Debug("DropPartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName)) + log.Debug("DropPartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) t := &DropPartitionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -1118,13 +1118,13 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { - log.Debug("DropPartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName)) + log.Debug("DropPartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: "DropPartition failed: " + err.Error(), }, nil } - log.Debug("DropPartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName)) + log.Debug("DropPartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", @@ -1142,7 +1142,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques Value: false, }, nil } - log.Debug("HasPartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName)) + log.Debug("HasPartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) t := &HasPartitionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -1155,7 +1155,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { - log.Debug("HasPartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName)) + log.Debug("HasPartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) return &milvuspb.BoolResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -1164,7 +1164,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques Value: false, }, nil } - log.Debug("HasPartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName)) + log.Debug("HasPartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) return &milvuspb.BoolResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -1232,7 +1232,7 @@ func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), }, nil } - log.Debug("CreateIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName)) + log.Debug("CreateIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID)) t := &CreateIndexReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -1244,13 +1244,13 @@ func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { - log.Debug("CreateIndex Failed", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName)) + log.Debug("CreateIndex Failed", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: "CreateIndex failed, error = " + err.Error(), }, nil } - log.Debug("CreateIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName)) + log.Debug("CreateIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", @@ -1268,7 +1268,7 @@ func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequ IndexDescriptions: nil, }, nil } - log.Debug("DescribeIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName)) + log.Debug("DescribeIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID)) t := &DescribeIndexReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -1284,6 +1284,7 @@ func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequ c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { + log.Debug("DescribeIndex Failed", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID)) return &milvuspb.DescribeIndexResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -1296,7 +1297,7 @@ func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequ for _, i := range t.Rsp.IndexDescriptions { idxNames = append(idxNames, i.IndexName) } - log.Debug("DescribeIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Strings("index names", idxNames)) + log.Debug("DescribeIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Strings("index names", idxNames), zap.Int64("msgID", in.Base.MsgID)) if len(t.Rsp.IndexDescriptions) == 0 { t.Rsp.Status = &commonpb.Status{ ErrorCode: commonpb.ErrorCode_IndexNotExist, @@ -1319,7 +1320,7 @@ func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*c Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), }, nil } - log.Debug("DropIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName)) + log.Debug("DropIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName), zap.Int64("msgID", in.Base.MsgID)) t := &DropIndexReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -1331,13 +1332,13 @@ func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*c c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { - log.Debug("DropIndex Failed", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName)) + log.Debug("DropIndex Failed", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName), zap.Int64("msgID", in.Base.MsgID)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: "DropIndex failed, error = " + err.Error(), }, nil } - log.Debug("DropIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName)) + log.Debug("DropIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName), zap.Int64("msgID", in.Base.MsgID)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", @@ -1355,7 +1356,7 @@ func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegment IndexID: 0, }, nil } - log.Debug("DescribeSegment", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID)) + log.Debug("DescribeSegment", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID), zap.Int64("msgID", in.Base.MsgID)) t := &DescribeSegmentReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -1371,7 +1372,7 @@ func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegment c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { - log.Debug("DescribeSegment Failed", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID)) + log.Debug("DescribeSegment Failed", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID), zap.Int64("msgID", in.Base.MsgID)) return &milvuspb.DescribeSegmentResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -1380,7 +1381,7 @@ func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegment IndexID: 0, }, nil } - log.Debug("DescribeSegment Success", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID)) + log.Debug("DescribeSegment Success", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID), zap.Int64("msgID", in.Base.MsgID)) t.Rsp.Status = &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", @@ -1399,7 +1400,7 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques SegmentIDs: nil, }, nil } - log.Debug("ShowSegments", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID)) + log.Debug("ShowSegments", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID), zap.Int64("msgID", in.Base.MsgID)) t := &ShowSegmentReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -1415,6 +1416,7 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { + log.Debug("ShowSegments Failed", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID), zap.Int64("msgID", in.Base.MsgID)) return &milvuspb.ShowSegmentsResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -1423,7 +1425,7 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques SegmentIDs: nil, }, nil } - log.Debug("ShowSegments Success", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID), zap.Int64s("segments ids", t.Rsp.SegmentIDs)) + log.Debug("ShowSegments Success", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID), zap.Int64s("segments ids", t.Rsp.SegmentIDs), zap.Int64("msgID", in.Base.MsgID)) t.Rsp.Status = &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go index 1393efcb7..be0375704 100644 --- a/internal/querynode/search_service.go +++ b/internal/querynode/search_service.go @@ -88,8 +88,8 @@ func (s *searchService) consumeSearch() { if !ok { continue } - sp, ctx := trace.StartSpanFromContext(sm.BaseMsg.Ctx) - sm.BaseMsg.Ctx = ctx + sp, ctx := trace.StartSpanFromContext(sm.TraceCtx()) + sm.SetTraceCtx(ctx) err := s.collectionCheck(sm.CollectionID) if err != nil { s.emptySearchCollection.emptySearch(sm) diff --git a/internal/util/trace/util.go b/internal/util/trace/util.go index 4783263aa..4d0241ee2 100644 --- a/internal/util/trace/util.go +++ b/internal/util/trace/util.go @@ -17,23 +17,35 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" ) -func InitTracing(serviceName string) (opentracing.Tracer, io.Closer, error) { +func InitTracing(serviceName string) io.Closer { + if opentracing.IsGlobalTracerRegistered() { + return nil + } + var cfg *config.Configuration + var err error if true { - cfg, err := config.FromEnv() + cfg, err = config.FromEnv() if err != nil { - return nil, nil, errors.New("trace from env error") + log.Error(err) + return nil } cfg.ServiceName = serviceName - return cfg.NewTracer() + } else { + cfg = &config.Configuration{ + ServiceName: serviceName, + Sampler: &config.SamplerConfig{ + Type: "const", + Param: 1, + }, + } } - cfg := &config.Configuration{ - ServiceName: serviceName, - Sampler: &config.SamplerConfig{ - Type: "const", - Param: 1, - }, + tracer, closer, err := cfg.NewTracer() + if err != nil { + log.Error(err) + return nil } - return cfg.NewTracer() + opentracing.SetGlobalTracer(tracer) + return closer } func StartSpanFromContext(ctx context.Context, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) { diff --git a/internal/util/trace/util_test.go b/internal/util/trace/util_test.go index 8d4d195f7..1ff692a89 100644 --- a/internal/util/trace/util_test.go +++ b/internal/util/trace/util_test.go @@ -7,7 +7,6 @@ import ( "errors" - "github.com/opentracing/opentracing-go" oplog "github.com/opentracing/opentracing-go/log" ) @@ -18,8 +17,7 @@ type simpleStruct struct { func TestTracing(t *testing.T) { //Already Init in each framework, this can be ignored in debug - tracer, closer, _ := InitTracing("test") - opentracing.SetGlobalTracer(tracer) + closer := InitTracing("test") defer closer.Close() // context normally can be propagated through func params -- GitLab