diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 05ea49350bf256b228b2869a81e34772892c7ff3..1ed0afb80c4fe8ad9335eb25e9867c7f512e18be 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -81,6 +81,10 @@ func (ddNode *ddNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Con // TODO: add error handling } + if msMsg == nil { + return []Msg{}, ctx + } + ddNode.ddMsg = &ddMsg{ collectionRecords: make(map[UniqueID][]*metaOperateRecord), partitionRecords: make(map[UniqueID][]*metaOperateRecord), diff --git a/internal/datanode/flow_graph_filter_dm_node.go b/internal/datanode/flow_graph_filter_dm_node.go index a85b9c5a66ae86c2ef50e8307bfe74807f22c4ea..8c3f065e0b3b45f608f64e65f148b5b0c02ad29d 100644 --- a/internal/datanode/flow_graph_filter_dm_node.go +++ b/internal/datanode/flow_graph_filter_dm_node.go @@ -40,6 +40,10 @@ func (fdmNode *filterDmNode) Operate(ctx context.Context, in []Msg) ([]Msg, cont // TODO: add error handling } + if msgStreamMsg == nil || ddMsg == nil { + return []Msg{}, ctx + } + fdmNode.ddMsg = ddMsg var iMsg = insertMsg{ diff --git a/internal/datanode/flow_graph_gc_node.go b/internal/datanode/flow_graph_gc_node.go index 396b752c21c246d70a435f59db374557caa63821..982869b2eaa307089cbc39aa288d87fa5385cebf 100644 --- a/internal/datanode/flow_graph_gc_node.go +++ b/internal/datanode/flow_graph_gc_node.go @@ -30,6 +30,10 @@ func (gcNode *gcNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Con // TODO: add error handling } + if gcMsg == nil { + return []Msg{}, ctx + } + // drop collections for _, collectionID := range gcMsg.gcRecord.collections { err := gcNode.replica.removeCollection(collectionID) diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index d8485008d4a7b4c6e7fbb55fc690edb8707a0d47..e9674343ec6a48a88a04ea58fe0272ad8549c508 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -100,6 +100,10 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, c // TODO: add error handling } + if iMsg == nil { + return []Msg{}, ctx + } + // Updating segment statistics uniqueSeg := make(map[UniqueID]int64) for _, msg := range iMsg.insertMessages { diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 35f56a39c529f63d3822f89f00a78716b1a040f6..4b9ac5a87902d0f06a29537c1c7b28fb9b9500c3 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -13,7 +13,6 @@ import ( otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" - "github.com/uber/jaeger-client-go/config" "go.uber.org/zap" "google.golang.org/grpc" @@ -29,6 +28,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/types" "github.com/zilliztech/milvus-distributed/internal/util/funcutil" + "github.com/zilliztech/milvus-distributed/internal/util/trace" ) type Server struct { @@ -143,18 +143,15 @@ func (s *Server) init() error { Params.LoadFromEnv() Params.LoadFromArgs() + dn.Params.Init() + dn.Params.Port = Params.Port + dn.Params.IP = Params.IP + log.Debug("DataNode port", zap.Int("port", Params.Port)) - // TODO - cfg := &config.Configuration{ - ServiceName: fmt.Sprintf("data_node ip: %s, port: %d", Params.IP, Params.Port), - Sampler: &config.SamplerConfig{ - Type: "const", - Param: 1, - }, - } - tracer, closer, err := cfg.NewTracer() + + tracer, closer, err := trace.InitTracing(fmt.Sprintf("data_node ip: %s, port: %d", Params.IP, Params.Port)) if err != nil { - panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) + log.Error("data_node", zap.String("init trace err", err.Error())) } opentracing.SetGlobalTracer(tracer) s.closer = closer @@ -210,10 +207,6 @@ func (s *Server) init() error { panic(err) } - dn.Params.Init() - dn.Params.Port = Params.Port - dn.Params.IP = Params.IP - s.datanode.NodeID = dn.Params.NodeID s.datanode.UpdateStateCode(internalpb.StateCode_Initializing) diff --git a/internal/distributed/dataservice/service.go b/internal/distributed/dataservice/service.go index d3c7aad1354bdc143dc60115837bff5c11441b3a..d5219c06b86747a28c8e2cebe9c3654d91ee07b4 100644 --- a/internal/distributed/dataservice/service.go +++ b/internal/distributed/dataservice/service.go @@ -2,7 +2,6 @@ package grpcdataserviceclient import ( "context" - "fmt" "io" "math" "net" @@ -18,13 +17,13 @@ import ( otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" - "github.com/uber/jaeger-client-go/config" "github.com/zilliztech/milvus-distributed/internal/dataservice" msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client" "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/types" "github.com/zilliztech/milvus-distributed/internal/util/funcutil" + "github.com/zilliztech/milvus-distributed/internal/util/trace" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" @@ -56,21 +55,6 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) grpcErrChan: make(chan error), } - // TODO - cfg := &config.Configuration{ - ServiceName: "data_service", - Sampler: &config.SamplerConfig{ - Type: "const", - Param: 1, - }, - } - tracer, closer, err := cfg.NewTracer() - if err != nil { - panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) - } - opentracing.SetGlobalTracer(tracer) - s.closer = closer - s.dataService, err = dataservice.CreateServer(s.ctx, factory) if err != nil { return nil, err @@ -82,6 +66,13 @@ func (s *Server) init() error { Params.Init() Params.LoadFromEnv() + tracer, closer, err := trace.InitTracing("data_service") + if err != nil { + log.Error("data_service", zap.String("init trace err", err.Error())) + } + opentracing.SetGlobalTracer(tracer) + s.closer = closer + s.wg.Add(1) go s.startGrpcLoop(Params.Port) // wait for grpc server loop start diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 45e5523591a3fadb1e38d6f4bc1829cd024dbaae..b517fa7ca894df832b16a472c0837d572036f6da 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -2,6 +2,8 @@ package grpcindexnode import ( "context" + "fmt" + "io" "math" "net" "strconv" @@ -20,6 +22,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/types" "github.com/zilliztech/milvus-distributed/internal/util/funcutil" + "github.com/zilliztech/milvus-distributed/internal/util/trace" "google.golang.org/grpc" ) @@ -33,6 +36,8 @@ type Server struct { loopCtx context.Context loopCancel func() loopWg sync.WaitGroup + + closer io.Closer } func (s *Server) Run() error { @@ -87,6 +92,18 @@ func (s *Server) init() error { Params.LoadFromEnv() Params.LoadFromArgs() + indexnode.Params.Init() + indexnode.Params.Port = Params.Port + indexnode.Params.IP = Params.IP + indexnode.Params.Address = Params.Address + + tracer, closer, err := trace.InitTracing(fmt.Sprintf("index_node_%d", indexnode.Params.NodeID)) + if err != nil { + log.Error("index_node", zap.String("init trace err", err.Error())) + } + opentracing.SetGlobalTracer(tracer) + s.closer = closer + Params.Address = Params.IP + ":" + strconv.FormatInt(int64(Params.Port), 10) defer func() { @@ -114,11 +131,6 @@ func (s *Server) init() error { } s.indexnode.SetIndexServiceClient(s.indexServiceClient) - indexnode.Params.Init() - indexnode.Params.Port = Params.Port - indexnode.Params.IP = Params.IP - indexnode.Params.Address = Params.Address - s.indexnode.UpdateStateCode(internalpb.StateCode_Initializing) err = s.indexnode.Init() @@ -137,6 +149,9 @@ func (s *Server) start() error { } func (s *Server) Stop() error { + if err := s.closer.Close(); err != nil { + return err + } s.loopCancel() if s.indexnode != nil { s.indexnode.Stop() diff --git a/internal/distributed/indexservice/service.go b/internal/distributed/indexservice/service.go index 17cf78412136140c2b2d8f8777d513329207d3af..57598d9362ce473232c6df1e9f4b4819a9cd8251 100644 --- a/internal/distributed/indexservice/service.go +++ b/internal/distributed/indexservice/service.go @@ -2,7 +2,6 @@ package grpcindexservice import ( "context" - "fmt" "io" "math" "net" @@ -13,7 +12,6 @@ import ( otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" - "github.com/uber/jaeger-client-go/config" "github.com/zilliztech/milvus-distributed/internal/indexservice" "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -21,6 +19,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/util/funcutil" + "github.com/zilliztech/milvus-distributed/internal/util/trace" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" "google.golang.org/grpc" ) @@ -57,6 +56,13 @@ func (s *Server) init() error { Params.Init() indexservice.Params.Init() + tracer, closer, err := trace.InitTracing("index_service") + if err != nil { + log.Error("index_service", zap.String("init trace err", err.Error())) + } + opentracing.SetGlobalTracer(tracer) + s.closer = closer + s.loopWg.Add(1) go s.startGrpcLoop(Params.ServicePort) // wait for grpc indexservice loop start @@ -131,7 +137,6 @@ func (s *Server) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFil func (s *Server) NotifyBuildIndex(ctx context.Context, nty *indexpb.NotifyBuildIndexRequest) (*commonpb.Status, error) { return s.indexservice.NotifyBuildIndex(ctx, nty) } - func (s *Server) startGrpcLoop(grpcPort int) { defer s.loopWg.Done() @@ -178,19 +183,5 @@ func NewServer(ctx context.Context) (*Server, error) { grpcErrChan: make(chan error), } - cfg := &config.Configuration{ - ServiceName: "index_service", - Sampler: &config.SamplerConfig{ - Type: "const", - Param: 1, - }, - } - tracer, closer, err := cfg.NewTracer() - if err != nil { - panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) - } - opentracing.SetGlobalTracer(tracer) - s.closer = closer - return s, nil } diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index c453323094e21286fb988f23875e95e73ccf4dbe..a1cd188ab6f90a8082bdfaa2880e46d1ce338510 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -2,7 +2,6 @@ package grpcmasterservice import ( "context" - "fmt" "io" "math" "net" @@ -12,7 +11,6 @@ import ( otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" - "github.com/uber/jaeger-client-go/config" "go.uber.org/zap" "google.golang.org/grpc" @@ -24,6 +22,7 @@ import ( cms "github.com/zilliztech/milvus-distributed/internal/masterservice" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/types" + "github.com/zilliztech/milvus-distributed/internal/util/trace" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" @@ -70,21 +69,7 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) connectQueryService: true, } - //TODO - cfg := &config.Configuration{ - ServiceName: "master_service", - Sampler: &config.SamplerConfig{ - Type: "const", - Param: 1, - }, - } - tracer, closer, err := cfg.NewTracer() - if err != nil { - panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) - } - opentracing.SetGlobalTracer(tracer) - s.closer = closer - + var err error s.masterService, err = cms.NewCore(s.ctx, factory) if err != nil { return nil, err @@ -105,11 +90,22 @@ func (s *Server) Run() error { func (s *Server) init() error { Params.Init() + + cms.Params.Init() + log.Debug("grpc init done ...") + ctx := context.Background() + tracer, closer, err := trace.InitTracing("master_service") + if err != nil { + log.Error("master_service", zap.String("init trace err", err.Error())) + } + opentracing.SetGlobalTracer(tracer) + s.closer = closer + log.Debug("init params done") - err := s.startGrpc() + err = s.startGrpc() if err != nil { return err } @@ -177,9 +173,6 @@ func (s *Server) init() error { panic(err) } } - cms.Params.Init() - log.Debug("grpc init done ...") - if err := s.masterService.Init(); err != nil { return err } @@ -235,8 +228,10 @@ func (s *Server) start() error { } func (s *Server) Stop() error { - if err := s.closer.Close(); err != nil { - return err + if s.closer != nil { + if err := s.closer.Close(); err != nil { + return err + } } if s.proxyService != nil { _ = s.proxyService.Stop() diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go index 6d452d2dfcf5c085bb2582095010ace27c44390c..61098d35f1455a9d3bd4360ddf497661611f47b6 100644 --- a/internal/distributed/proxynode/service.go +++ b/internal/distributed/proxynode/service.go @@ -21,7 +21,6 @@ import ( grpcqueryserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client" "github.com/opentracing/opentracing-go" - "github.com/uber/jaeger-client-go/config" "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -30,6 +29,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/proxypb" "github.com/zilliztech/milvus-distributed/internal/proxynode" "github.com/zilliztech/milvus-distributed/internal/util/funcutil" + "github.com/zilliztech/milvus-distributed/internal/util/trace" ) const ( @@ -126,28 +126,27 @@ func (s *Server) init() error { } Params.LoadFromEnv() Params.LoadFromArgs() - Params.Address = Params.IP + ":" + strconv.FormatInt(int64(Params.Port), 10) - log.Debug("proxynode", zap.String("proxy host", Params.IP)) - log.Debug("proxynode", zap.Int("proxy port", Params.Port)) - log.Debug("proxynode", zap.String("proxy address", Params.Address)) + proxynode.Params.Init() + log.Debug("init params done ...") + proxynode.Params.NetworkPort = Params.Port + proxynode.Params.IP = Params.IP + proxynode.Params.NetworkAddress = Params.Address + // for purpose of ID Allocator + proxynode.Params.MasterAddress = Params.MasterAddress - // TODO - cfg := &config.Configuration{ - ServiceName: fmt.Sprintf("proxy_node ip: %s, port: %d", Params.IP, Params.Port), - Sampler: &config.SamplerConfig{ - Type: "const", - Param: 1, - }, - } - tracer, closer, err := cfg.NewTracer() + tracer, closer, err := trace.InitTracing(fmt.Sprintf("proxy_node ip: %s, port: %d", Params.IP, Params.Port)) if err != nil { - panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) + log.Error("proxy_node", zap.String("init trace err", err.Error())) } opentracing.SetGlobalTracer(tracer) s.closer = closer + log.Debug("proxynode", zap.String("proxy host", Params.IP)) + log.Debug("proxynode", zap.Int("proxy port", Params.Port)) + log.Debug("proxynode", zap.String("proxy address", Params.Address)) + defer func() { if err != nil { err2 := s.Stop() @@ -226,14 +225,6 @@ func (s *Server) init() error { s.proxynode.SetQueryServiceClient(s.queryServiceClient) log.Debug("set query service client ...") - proxynode.Params.Init() - log.Debug("init params done ...") - proxynode.Params.NetworkPort = Params.Port - proxynode.Params.IP = Params.IP - proxynode.Params.NetworkAddress = Params.Address - // for purpose of ID Allocator - proxynode.Params.MasterAddress = Params.MasterAddress - s.proxynode.UpdateStateCode(internalpb.StateCode_Initializing) log.Debug("proxynode", zap.Any("state of proxynode", internalpb.StateCode_Initializing)) diff --git a/internal/distributed/proxyservice/service.go b/internal/distributed/proxyservice/service.go index ae27d6e9585e40296a9f4fa02e50d5fa08b13232..1b75d6b659cb412876dd4a69bb52e3478f581671 100644 --- a/internal/distributed/proxyservice/service.go +++ b/internal/distributed/proxyservice/service.go @@ -2,7 +2,6 @@ package grpcproxyservice import ( "context" - "fmt" "io" "math" "net" @@ -13,7 +12,6 @@ import ( otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" - "github.com/uber/jaeger-client-go/config" "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -22,6 +20,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/proxypb" "github.com/zilliztech/milvus-distributed/internal/proxyservice" "github.com/zilliztech/milvus-distributed/internal/util/funcutil" + "github.com/zilliztech/milvus-distributed/internal/util/trace" "google.golang.org/grpc" ) @@ -49,20 +48,6 @@ func NewServer(ctx1 context.Context, factory msgstream.Factory) (*Server, error) grpcErrChan: make(chan error), } - // TODO - cfg := &config.Configuration{ - ServiceName: "proxy_service", - Sampler: &config.SamplerConfig{ - Type: "const", - Param: 1, - }, - } - server.tracer, server.closer, err = cfg.NewTracer() - if err != nil { - panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) - } - opentracing.SetGlobalTracer(server.tracer) - server.proxyservice, err = proxyservice.NewProxyService(server.ctx, factory) if err != nil { return nil, err @@ -88,6 +73,13 @@ func (s *Server) init() error { proxyservice.Params.Init() log.Debug("init params done") + tracer, closer, err := trace.InitTracing("proxy_service") + if err != nil { + log.Error("proxy_service", zap.String("init trace err", err.Error())) + } + opentracing.SetGlobalTracer(tracer) + s.closer = closer + s.wg.Add(1) go s.startGrpcLoop(Params.ServicePort) // wait for grpc server loop start diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index ad98185730c72618939b6178336fb3fff8e01e13..bf022c83b1f5de68043e06ee446050635a790338 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -16,7 +16,6 @@ import ( otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" - "github.com/uber/jaeger-client-go/config" "go.uber.org/zap" "google.golang.org/grpc" @@ -32,6 +31,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/querypb" qn "github.com/zilliztech/milvus-distributed/internal/querynode" "github.com/zilliztech/milvus-distributed/internal/util/funcutil" + "github.com/zilliztech/milvus-distributed/internal/util/trace" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) @@ -72,17 +72,14 @@ func (s *Server) init() error { Params.LoadFromEnv() Params.LoadFromArgs() - // TODO - cfg := &config.Configuration{ - ServiceName: fmt.Sprintf("query_node ip: %s, port: %d", Params.QueryNodeIP, Params.QueryNodePort), - Sampler: &config.SamplerConfig{ - Type: "const", - Param: 1, - }, - } - tracer, closer, err := cfg.NewTracer() + qn.Params.Init() + qn.Params.QueryNodeIP = Params.QueryNodeIP + qn.Params.QueryNodePort = int64(Params.QueryNodePort) + qn.Params.QueryNodeID = Params.QueryNodeID + + tracer, closer, err := trace.InitTracing(fmt.Sprintf("query_node ip: %s, port: %d", Params.QueryNodeIP, Params.QueryNodePort)) if err != nil { - panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) + log.Error("query_node", zap.String("init trace err", err.Error())) } opentracing.SetGlobalTracer(tracer) s.closer = closer @@ -189,11 +186,6 @@ func (s *Server) init() error { panic(err) } - qn.Params.Init() - qn.Params.QueryNodeIP = Params.QueryNodeIP - qn.Params.QueryNodePort = int64(Params.QueryNodePort) - qn.Params.QueryNodeID = Params.QueryNodeID - s.querynode.UpdateStateCode(internalpb.StateCode_Initializing) if err := s.querynode.Init(); err != nil { diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go index 0d205424d851f7d8df078165eb03db42ac592490..51bb5812bc1477edefa48122ae5da9645d6c9c6d 100644 --- a/internal/distributed/queryservice/service.go +++ b/internal/distributed/queryservice/service.go @@ -2,6 +2,7 @@ package grpcqueryservice import ( "context" + "io" "math" "net" "strconv" @@ -17,6 +18,7 @@ import ( qs "github.com/zilliztech/milvus-distributed/internal/queryservice" "github.com/zilliztech/milvus-distributed/internal/types" "github.com/zilliztech/milvus-distributed/internal/util/funcutil" + "github.com/zilliztech/milvus-distributed/internal/util/trace" "go.uber.org/zap" "google.golang.org/grpc" @@ -40,6 +42,8 @@ type Server struct { dataService *dsc.Client masterService *msc.GrpcClient + + closer io.Closer } func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) { @@ -75,6 +79,14 @@ func (s *Server) Run() error { func (s *Server) init() error { ctx := context.Background() Params.Init() + qs.Params.Init() + + tracer, closer, err := trace.InitTracing("query_service") + if err != nil { + log.Error("query_service", zap.String("init trace err", err.Error())) + } + opentracing.SetGlobalTracer(tracer) + s.closer = closer s.wg.Add(1) go s.startGrpcLoop(Params.Port) @@ -129,7 +141,6 @@ func (s *Server) init() error { panic(err) } - qs.Params.Init() s.queryservice.UpdateStateCode(internalpb.StateCode_Initializing) if err := s.queryservice.Init(); err != nil { @@ -174,7 +185,11 @@ func (s *Server) start() error { } func (s *Server) Stop() error { - err := s.queryservice.Stop() + err := s.closer.Close() + if err != nil { + return err + } + err = s.queryservice.Stop() s.loopCancel() if s.grpcServer != nil { s.grpcServer.GracefulStop() diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 624d49243bc6cdda90a7325bfb145ff6c5e5ae0b..beeb2884990b8b552dbd42393c343bc840e81a16 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -3,15 +3,12 @@ package indexnode import ( "context" "errors" - "fmt" "io" "math/rand" "time" "go.uber.org/zap" - "github.com/opentracing/opentracing-go" - "github.com/uber/jaeger-client-go/config" "github.com/zilliztech/milvus-distributed/internal/kv" miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio" "github.com/zilliztech/milvus-distributed/internal/log" @@ -95,21 +92,6 @@ func (i *IndexNode) Init() error { return err } - // TODO - cfg := &config.Configuration{ - ServiceName: fmt.Sprintf("index_node_%d", Params.NodeID), - Sampler: &config.SamplerConfig{ - Type: "const", - Param: 1, - }, - } - tracer, closer, err := cfg.NewTracer() - if err != nil { - panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) - } - opentracing.SetGlobalTracer(tracer) - i.closer = closer - option := &miniokv.Option{ Address: Params.MinIOAddress, AccessKeyID: Params.MinIOAccessKeyID, @@ -140,9 +122,6 @@ func (i *IndexNode) Start() error { // Close closes the server. func (i *IndexNode) Stop() error { - if err := i.closer.Close(); err != nil { - return err - } i.loopCancel() if i.sched != nil { i.sched.Close() diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go index 0e676e714bf17e08ffea68d5990538a72adacd0b..eb3572ce8dac6e8f8b148ec0e07bd32205a0fe06 100644 --- a/internal/querynode/flow_graph_service_time_node.go +++ b/internal/querynode/flow_graph_service_time_node.go @@ -44,9 +44,9 @@ func (stNode *serviceTimeNode) Operate(ctx context.Context, in []Msg) ([]Msg, co ts := stNode.replica.getTSafe(stNode.collectionID) if ts != nil { ts.set(serviceTimeMsg.timeRange.timestampMax) - log.Debug("update tSafe:", - zap.Int64("tSafe", int64(serviceTimeMsg.timeRange.timestampMax)), - zap.Int64("collectionID", stNode.collectionID)) + //log.Debug("update tSafe:", + // zap.Int64("tSafe", int64(serviceTimeMsg.timeRange.timestampMax)), + // zap.Int64("collectionID", stNode.collectionID)) } if err := stNode.sendTimeTick(serviceTimeMsg.timeRange.timestampMax); err != nil { diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 868ca8c1b280075bf893931ff70a62d40a19a982..531ae9c709272ca8d615b7c6298e75fe1d13b6ba 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "io" "math/rand" "sort" "strconv" @@ -12,8 +11,6 @@ import ( "sync/atomic" "time" - "github.com/opentracing/opentracing-go" - "github.com/uber/jaeger-client-go/config" "go.uber.org/zap" nodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/querynode/client" @@ -51,8 +48,6 @@ type QueryService struct { enableGrpc bool msFactory msgstream.Factory - - closer io.Closer } func (qs *QueryService) Init() error { @@ -65,9 +60,6 @@ func (qs *QueryService) Start() error { } func (qs *QueryService) Stop() error { - if err := qs.closer.Close(); err != nil { - return err - } qs.loopCancel() qs.UpdateStateCode(internalpb.StateCode_Abnormal) return nil @@ -677,20 +669,6 @@ func NewQueryService(ctx context.Context, factory msgstream.Factory) (*QueryServ msFactory: factory, } - cfg := &config.Configuration{ - ServiceName: "query_service", - Sampler: &config.SamplerConfig{ - Type: "const", - Param: 1, - }, - } - tracer, closer, err := cfg.NewTracer() - if err != nil { - panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) - } - opentracing.SetGlobalTracer(tracer) - service.closer = closer - service.UpdateStateCode(internalpb.StateCode_Abnormal) return service, nil } diff --git a/internal/util/flowgraph/node.go b/internal/util/flowgraph/node.go index e3ac1ec58443736f28c45f5a333490539b98d7aa..ec5192605d8f6ce7d7f5f32ba2e05cbfc1ebecff 100644 --- a/internal/util/flowgraph/node.go +++ b/internal/util/flowgraph/node.go @@ -54,7 +54,7 @@ func (nodeCtx *nodeCtx) Start(ctx context.Context, wg *sync.WaitGroup) { select { case <-ctx.Done(): wg.Done() - fmt.Println(nodeCtx.node.Name(), "closed") + //fmt.Println(nodeCtx.node.Name(), "closed") return default: // inputs from inputsMessages for Operate @@ -64,7 +64,7 @@ func (nodeCtx *nodeCtx) Start(ctx context.Context, wg *sync.WaitGroup) { var res []Msg var sp opentracing.Span if !nodeCtx.node.IsInputNode() { - msgCtx = nodeCtx.collectInputMessages() + msgCtx = nodeCtx.collectInputMessages(ctx) inputs = nodeCtx.inputMessages } n := nodeCtx.node @@ -108,7 +108,7 @@ func (nodeCtx *nodeCtx) ReceiveMsg(ctx context.Context, wg *sync.WaitGroup, msg wg.Done() } -func (nodeCtx *nodeCtx) collectInputMessages() context.Context { +func (nodeCtx *nodeCtx) collectInputMessages(exitCtx context.Context) context.Context { var opts []opentracing.StartSpanOption inputsNum := len(nodeCtx.inputChannels) @@ -119,17 +119,21 @@ func (nodeCtx *nodeCtx) collectInputMessages() context.Context { // and move them to inputMessages. for i := 0; i < inputsNum; i++ { channel := nodeCtx.inputChannels[i] - msgWithCtx, ok := <-channel - if !ok { - // TODO: add status - log.Println("input channel closed") + select { + case <-exitCtx.Done(): return nil - } - nodeCtx.inputMessages[i] = msgWithCtx.msg - if msgWithCtx.ctx != nil { - sp, _ := trace.StartSpanFromContext(msgWithCtx.ctx) - opts = append(opts, opentracing.ChildOf(sp.Context())) - sp.Finish() + case msgWithCtx, ok := <-channel: + if !ok { + // TODO: add status + log.Println("input channel closed") + return nil + } + nodeCtx.inputMessages[i] = msgWithCtx.msg + if msgWithCtx.ctx != nil { + sp, _ := trace.StartSpanFromContext(msgWithCtx.ctx) + opts = append(opts, opentracing.ChildOf(sp.Context())) + sp.Finish() + } } } @@ -157,12 +161,16 @@ func (nodeCtx *nodeCtx) collectInputMessages() context.Context { for nodeCtx.inputMessages[i].TimeTick() != latestTime { fmt.Println("try to align timestamp, t1 =", latestTime, ", t2 =", nodeCtx.inputMessages[i].TimeTick()) channel := nodeCtx.inputChannels[i] - msg, ok := <-channel - if !ok { - log.Println("input channel closed") + select { + case <-exitCtx.Done(): return + case msg, ok := <-channel: + if !ok { + log.Println("input channel closed") + return + } + nodeCtx.inputMessages[i] = msg.msg } - nodeCtx.inputMessages[i] = msg.msg } } sign <- struct{}{} diff --git a/internal/util/trace/util.go b/internal/util/trace/util.go index 7caf39d880872d4a0c2824c35606f2344d5c4a69..4783263aaa541f5c6697c1c6e86a3213d888c158 100644 --- a/internal/util/trace/util.go +++ b/internal/util/trace/util.go @@ -2,6 +2,7 @@ package trace import ( "context" + "io" "runtime" "strings" @@ -11,10 +12,30 @@ import ( "github.com/opentracing/opentracing-go/ext" "github.com/opentracing/opentracing-go/log" "github.com/uber/jaeger-client-go" + "github.com/uber/jaeger-client-go/config" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" ) +func InitTracing(serviceName string) (opentracing.Tracer, io.Closer, error) { + if true { + cfg, err := config.FromEnv() + if err != nil { + return nil, nil, errors.New("trace from env error") + } + cfg.ServiceName = serviceName + return cfg.NewTracer() + } + cfg := &config.Configuration{ + ServiceName: serviceName, + Sampler: &config.SamplerConfig{ + Type: "const", + Param: 1, + }, + } + return cfg.NewTracer() +} + func StartSpanFromContext(ctx context.Context, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) { if ctx == nil { return noopSpan(), ctx diff --git a/internal/util/trace/util_test.go b/internal/util/trace/util_test.go index b69830e43b245092588f6bb2e1a0430832d97311..8d4d195f784aceb03d78094bae460a5add12a89f 100644 --- a/internal/util/trace/util_test.go +++ b/internal/util/trace/util_test.go @@ -3,32 +3,14 @@ package trace import ( "context" "fmt" - "io" "testing" "errors" "github.com/opentracing/opentracing-go" oplog "github.com/opentracing/opentracing-go/log" - "github.com/uber/jaeger-client-go/config" ) -func InitTracing() io.Closer { - cfg := &config.Configuration{ - ServiceName: "test", - Sampler: &config.SamplerConfig{ - Type: "const", - Param: 1, - }, - } - tracer, closer, err := cfg.NewTracer() - if err != nil { - panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) - } - opentracing.SetGlobalTracer(tracer) - return closer -} - type simpleStruct struct { name string value string @@ -36,7 +18,8 @@ type simpleStruct struct { func TestTracing(t *testing.T) { //Already Init in each framework, this can be ignored in debug - closer := InitTracing() + tracer, closer, _ := InitTracing("test") + opentracing.SetGlobalTracer(tracer) defer closer.Close() // context normally can be propagated through func params