diff --git a/cmd/distributed/components/index_node.go b/cmd/distributed/components/index_node.go index 11ec3da4bfb1e0a548b74179898e821c9c3f3bdd..957e0d1317c0ce748bfbfc042821459737ebb3d0 100644 --- a/cmd/distributed/components/index_node.go +++ b/cmd/distributed/components/index_node.go @@ -2,37 +2,17 @@ package components import ( "context" - "fmt" - "io" - "github.com/opentracing/opentracing-go" - "github.com/uber/jaeger-client-go/config" grpcindexnode "github.com/zilliztech/milvus-distributed/internal/distributed/indexnode" ) type IndexNode struct { svr *grpcindexnode.Server - - tracer opentracing.Tracer - closer io.Closer } func NewIndexNode(ctx context.Context) (*IndexNode, error) { var err error n := &IndexNode{} - cfg := &config.Configuration{ - ServiceName: "indexnode", - Sampler: &config.SamplerConfig{ - Type: "const", - Param: 1, - }, - } - n.tracer, n.closer, err = cfg.NewTracer() - if err != nil { - panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) - } - opentracing.SetGlobalTracer(n.tracer) - svr, err := grpcindexnode.NewServer(ctx) if err != nil { return nil, err diff --git a/cmd/distributed/components/index_service.go b/cmd/distributed/components/index_service.go index 07db95c121ce0ba835d072fd1847b78cf37691b4..2b89996a1672410189195ddd0b1242a6cb3d5352 100644 --- a/cmd/distributed/components/index_service.go +++ b/cmd/distributed/components/index_service.go @@ -2,37 +2,17 @@ package components import ( "context" - "fmt" - "io" - "github.com/opentracing/opentracing-go" - "github.com/uber/jaeger-client-go/config" grpcindexserver "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice" ) type IndexService struct { svr *grpcindexserver.Server - - tracer opentracing.Tracer - closer io.Closer } func NewIndexService(ctx context.Context) (*IndexService, error) { var err error s := &IndexService{} - - cfg := &config.Configuration{ - ServiceName: "indexservice", - Sampler: &config.SamplerConfig{ - Type: "const", - Param: 1, - }, - } - 
s.tracer, s.closer, err = cfg.NewTracer() - if err != nil { - panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) - } - opentracing.SetGlobalTracer(s.tracer) svr, err := grpcindexserver.NewServer(ctx) if err != nil { @@ -48,7 +28,6 @@ func (s *IndexService) Run() error { return nil } func (s *IndexService) Stop() error { - s.closer.Close() if err := s.svr.Stop(); err != nil { return err } diff --git a/cmd/distributed/components/master_service.go b/cmd/distributed/components/master_service.go index 20ec85d1a76d86848e7af928b66ba3e8f4d6e36c..b0739f7da8467fcf17e7dfe3fcd938bef5ed6712 100644 --- a/cmd/distributed/components/master_service.go +++ b/cmd/distributed/components/master_service.go @@ -2,7 +2,9 @@ package components import ( "context" + "io" + "github.com/opentracing/opentracing-go" msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" "github.com/zilliztech/milvus-distributed/internal/msgstream" ) @@ -10,6 +12,9 @@ import ( type MasterService struct { ctx context.Context svr *msc.Server + + tracer opentracing.Tracer + closer io.Closer } func NewMasterService(ctx context.Context, factory msgstream.Factory) (*MasterService, error) { @@ -18,7 +23,6 @@ func NewMasterService(ctx context.Context, factory msgstream.Factory) (*MasterSe if err != nil { return nil, err } - return &MasterService{ ctx: ctx, svr: svr, diff --git a/cmd/distributed/components/proxy_node.go b/cmd/distributed/components/proxy_node.go index a3b47cedd3877e8870178a131c9b41e79ad2e63d..c35b4231f580abcd068b6f5d96c14b1418a06f27 100644 --- a/cmd/distributed/components/proxy_node.go +++ b/cmd/distributed/components/proxy_node.go @@ -3,9 +3,8 @@ package components import ( "context" - "github.com/zilliztech/milvus-distributed/internal/msgstream" - grpcproxynode "github.com/zilliztech/milvus-distributed/internal/distributed/proxynode" + "github.com/zilliztech/milvus-distributed/internal/msgstream" ) type ProxyNode struct { @@ -13,7 +12,9 @@ type ProxyNode struct { } 
func NewProxyNode(ctx context.Context, factory msgstream.Factory) (*ProxyNode, error) { + var err error n := &ProxyNode{} + svr, err := grpcproxynode.NewServer(ctx, factory) if err != nil { return nil, err @@ -21,12 +22,14 @@ func NewProxyNode(ctx context.Context, factory msgstream.Factory) (*ProxyNode, e n.svr = svr return n, nil } + func (n *ProxyNode) Run() error { if err := n.svr.Run(); err != nil { return err } return nil } + func (n *ProxyNode) Stop() error { if err := n.svr.Stop(); err != nil { return err diff --git a/cmd/distributed/components/proxy_service.go b/cmd/distributed/components/proxy_service.go index 4581e729d1c3f9618b5e058cdc0351502d57e4dc..a492647c4db6a263f5c2a28ec2963085ed8f0803 100644 --- a/cmd/distributed/components/proxy_service.go +++ b/cmd/distributed/components/proxy_service.go @@ -3,9 +3,8 @@ package components import ( "context" - "github.com/zilliztech/milvus-distributed/internal/msgstream" - grpcproxyservice "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice" + "github.com/zilliztech/milvus-distributed/internal/msgstream" ) type ProxyService struct { @@ -13,6 +12,7 @@ type ProxyService struct { } func NewProxyService(ctx context.Context, factory msgstream.Factory) (*ProxyService, error) { + var err error service := &ProxyService{} svr, err := grpcproxyservice.NewServer(ctx, factory) if err != nil { @@ -21,12 +21,14 @@ func NewProxyService(ctx context.Context, factory msgstream.Factory) (*ProxyServ service.svr = svr return service, nil } + func (s *ProxyService) Run() error { if err := s.svr.Run(); err != nil { return err } return nil } + func (s *ProxyService) Stop() error { if err := s.svr.Stop(); err != nil { return err diff --git a/docs/developer_guides/chap05_proxy.md b/docs/developer_guides/chap05_proxy.md index 3576e39997bd9a7a6185440a33a01741565553b4..ae8e3de1f0232149524af159875f21c9ef36692d 100644 --- a/docs/developer_guides/chap05_proxy.md +++ b/docs/developer_guides/chap05_proxy.md @@ -2,7 +2,7 @@ 
-<img src="./figs/proxy.jpeg" width=700> +<img src="./figs/proxy.png" width=700> diff --git a/docs/developer_guides/figs/figs.graffle b/docs/developer_guides/figs/figs.graffle index e0c0621e44fd798544d5fc1bd4ce58508898131c..b671e01e4ed0ef0cf9be75838a81a795e57ec824 100644 Binary files a/docs/developer_guides/figs/figs.graffle and b/docs/developer_guides/figs/figs.graffle differ diff --git a/docs/developer_guides/figs/proxy.jpeg b/docs/developer_guides/figs/proxy.jpeg deleted file mode 100644 index 724849228db00036c3ec6b9f3dbe04a6df511087..0000000000000000000000000000000000000000 Binary files a/docs/developer_guides/figs/proxy.jpeg and /dev/null differ diff --git a/docs/developer_guides/figs/proxy.png b/docs/developer_guides/figs/proxy.png new file mode 100644 index 0000000000000000000000000000000000000000..4dfc2bcdca6cf9cbabdf5e17299a9d24753d5569 Binary files /dev/null and b/docs/developer_guides/figs/proxy.png differ diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index ffa8d893e463126fd5c350a3e95e4f60f4015f94..7e1998820b67f178d0285dc4f28306ecef6483dd 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -100,12 +100,12 @@ func TestDataSyncService_Start(t *testing.T) { var ddMsgStream msgstream.MsgStream = ddStream ddMsgStream.Start() - err = insertMsgStream.Produce(&msgPack) + err = insertMsgStream.Produce(ctx, &msgPack) assert.NoError(t, err) - err = insertMsgStream.Broadcast(&timeTickMsgPack) + err = insertMsgStream.Broadcast(ctx, &timeTickMsgPack) assert.NoError(t, err) - err = ddMsgStream.Broadcast(&timeTickMsgPack) + err = ddMsgStream.Broadcast(ctx, &timeTickMsgPack) assert.NoError(t, err) // dataSync diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 6f6ea1a91933fb63bf9f01ff8d1cbe41f27bbeff..354eb45803a6bccadfd85368c6fd3b75c7470b73 100644 --- 
a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -550,7 +550,7 @@ func (ibNode *insertBufferNode) completeFlush(segID UniqueID) error { } msgPack.Msgs = append(msgPack.Msgs, msg) - return ibNode.completeFlushStream.Produce(&msgPack) + return ibNode.completeFlushStream.Produce(context.TODO(), &msgPack) } func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error { @@ -571,7 +571,7 @@ func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error { }, } msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg) - return ibNode.timeTickStream.Produce(&msgPack) + return ibNode.timeTickStream.Produce(context.TODO(), &msgPack) } func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID, currentPosition *internalpb2.MsgPosition) error { @@ -608,7 +608,7 @@ func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID, currentPo var msgPack = msgstream.MsgPack{ Msgs: []msgstream.TsMsg{msg}, } - return ibNode.segmentStatisticsStream.Produce(&msgPack) + return ibNode.segmentStatisticsStream.Produce(context.TODO(), &msgPack) } func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) (*schemapb.CollectionSchema, error) { diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index bc7b9e53c923bec2517dfe750f927251b9b47135..d358b1c707e2b7eeb616f80580b09ce463992cc1 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -619,7 +619,7 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha msgPack := &msgstream.MsgPack{ Msgs: []msgstream.TsMsg{infoMsg}, } - if err = s.segmentInfoStream.Produce(msgPack); err != nil { + if err = s.segmentInfoStream.Produce(s.ctx, msgPack); err != nil { return err } return nil diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 004109c126cb3071ef0001fd6dcfc33f42324e93..c177ae8b2bdcf7b529753cbb4e3db03dcc953857 
100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -2,6 +2,8 @@ package grpcdatanode import ( "context" + "fmt" + "io" "sync" "time" @@ -11,6 +13,8 @@ import ( "github.com/zilliztech/milvus-distributed/internal/util/funcutil" + "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go/config" dn "github.com/zilliztech/milvus-distributed/internal/datanode" dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice/client" msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client" @@ -35,6 +39,8 @@ type Server struct { masterService *msc.GrpcClient dataService *dsc.Client + + closer io.Closer } func New(ctx context.Context, factory msgstream.Factory) (*Server, error) { @@ -101,13 +107,15 @@ func (s *Server) Run() error { } func (s *Server) Stop() error { + if err := s.closer.Close(); err != nil { + return err + } s.cancel() - var err error if s.grpcServer != nil { s.grpcServer.GracefulStop() } - err = s.impl.Stop() + err := s.impl.Stop() if err != nil { return err } @@ -182,6 +190,21 @@ func (s *Server) init() error { s.impl.NodeID = dn.Params.NodeID s.impl.UpdateStateCode(internalpb2.StateCode_INITIALIZING) + // TODO + cfg := &config.Configuration{ + ServiceName: fmt.Sprintf("data_node_%d", s.impl.NodeID), + Sampler: &config.SamplerConfig{ + Type: "const", + Param: 1, + }, + } + tracer, closer, err := cfg.NewTracer() + if err != nil { + panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) + } + opentracing.SetGlobalTracer(tracer) + s.closer = closer + if err := s.impl.Init(); err != nil { log.Println("impl init error: ", err) return err diff --git a/internal/distributed/dataservice/service.go b/internal/distributed/dataservice/service.go index 6382f7f3da0001aaf829982499328729fa7e9df6..1534a14ec49201b08f42bc3e75968cca06c4276c 100644 --- a/internal/distributed/dataservice/service.go +++ b/internal/distributed/dataservice/service.go @@ -2,6 
+2,8 @@ package grpcdataserviceclient import ( "context" + "fmt" + "io" "log" "net" "strconv" @@ -12,6 +14,8 @@ import ( msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client" + "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go/config" "github.com/zilliztech/milvus-distributed/internal/dataservice" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -31,10 +35,12 @@ type Server struct { impl *dataservice.Server grpcServer *grpc.Server masterClient *msc.GrpcClient + + closer io.Closer } func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) { - + var err error ctx1, cancel := context.WithCancel(ctx) s := &Server{ @@ -43,7 +49,21 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) grpcErrChan: make(chan error), } - var err error + // TODO + cfg := &config.Configuration{ + ServiceName: "data_service", + Sampler: &config.SamplerConfig{ + Type: "const", + Param: 1, + }, + } + tracer, closer, err := cfg.NewTracer() + if err != nil { + panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) + } + opentracing.SetGlobalTracer(tracer) + s.closer = closer + s.impl, err = dataservice.CreateServer(s.ctx, factory) if err != nil { return nil, err @@ -120,9 +140,11 @@ func (s *Server) start() error { } func (s *Server) Stop() error { - - s.cancel() var err error + if err = s.closer.Close(); err != nil { + return err + } + s.cancel() if s.grpcServer != nil { s.grpcServer.GracefulStop() diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index 7f8bb1be176d9eeee5f2af107f9b7557ed49a5b0..0b6dd5a2d511d70fb3f4054d8670b37366acfb60 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -2,6 +2,8 @@ package grpcmasterservice import ( "context" + "fmt" + "io" "log" "strconv" "time" @@ 
-15,6 +17,8 @@ import ( qsc "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client" "github.com/zilliztech/milvus-distributed/internal/util/funcutil" + "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go/config" cms "github.com/zilliztech/milvus-distributed/internal/masterservice" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -44,6 +48,8 @@ type Server struct { connectDataService bool connectIndexService bool connectQueryService bool + + closer io.Closer } func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) { @@ -60,7 +66,21 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) connectQueryService: true, } - var err error + //TODO + cfg := &config.Configuration{ + ServiceName: "master_service", + Sampler: &config.SamplerConfig{ + Type: "const", + Param: 1, + }, + } + tracer, closer, err := cfg.NewTracer() + if err != nil { + panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) + } + opentracing.SetGlobalTracer(tracer) + s.closer = closer + s.core, err = cms.NewCore(s.ctx, factory) if err != nil { return nil, err @@ -81,6 +101,7 @@ func (s *Server) Run() error { func (s *Server) init() error { Params.Init() + log.Println("init params done") err := s.startGrpc() @@ -202,6 +223,9 @@ func (s *Server) start() error { } func (s *Server) Stop() error { + if err := s.closer.Close(); err != nil { + return err + } if s.proxyService != nil { _ = s.proxyService.Stop() } diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go index a9b7dc41d3c3d8dfa8d8c12ce2819c426d78acd4..fd97fe5d480b54a94c9dea37702005eb13b39b3c 100644 --- a/internal/distributed/proxynode/service.go +++ b/internal/distributed/proxynode/service.go @@ -2,7 +2,6 @@ package grpcproxynode import ( "context" - "fmt" "io" "log" "net" @@ -19,7 +18,6 @@ import ( 
grpcqueryserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client" "github.com/opentracing/opentracing-go" - "github.com/uber/jaeger-client-go/config" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -55,19 +53,6 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) grpcErrChan: make(chan error), } - cfg := &config.Configuration{ - ServiceName: "proxynode", - Sampler: &config.SamplerConfig{ - Type: "const", - Param: 1, - }, - } - server.tracer, server.closer, err = cfg.NewTracer() - if err != nil { - panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) - } - opentracing.SetGlobalTracer(server.tracer) - server.impl, err = proxynode.NewProxyNodeImpl(server.ctx, factory) if err != nil { return nil, err diff --git a/internal/distributed/proxyservice/service.go b/internal/distributed/proxyservice/service.go index 67d7925e84682a30706699faf5dffe2e7d90ea0e..1e03df78fa0d29ccc87032bdd9e1e6d6684dfa67 100644 --- a/internal/distributed/proxyservice/service.go +++ b/internal/distributed/proxyservice/service.go @@ -45,8 +45,9 @@ func NewServer(ctx1 context.Context, factory msgstream.Factory) (*Server, error) grpcErrChan: make(chan error), } + // TODO cfg := &config.Configuration{ - ServiceName: "proxyservice", + ServiceName: "proxy_service", Sampler: &config.SamplerConfig{ Type: "const", Param: 1, @@ -133,6 +134,8 @@ func (s *Server) start() error { } func (s *Server) Stop() error { + if err := s.closer.Close(); err != nil { + return err + } s.cancel() - s.closer.Close() err := s.impl.Stop() diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index f9387866d23049cb641fb4211b3eb4fbcefd17c3..1a7d804156fd48c853b4e9c3b68029d40425c913 100644 --- a/internal/masterservice/master_service.go +++ 
b/internal/masterservice/master_service.go @@ -487,10 +487,10 @@ func (c *Core) setMsgStreams() error { TimeTickMsg: timeTickResult, } msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) - if err := timeTickStream.Broadcast(&msgPack); err != nil { + if err := timeTickStream.Broadcast(c.ctx, &msgPack); err != nil { return err } - if err := ddStream.Broadcast(&msgPack); err != nil { + if err := ddStream.Broadcast(c.ctx, &msgPack); err != nil { return err } return nil @@ -508,7 +508,7 @@ func (c *Core) setMsgStreams() error { CreateCollectionRequest: *req, } msgPack.Msgs = append(msgPack.Msgs, collMsg) - if err := ddStream.Broadcast(&msgPack); err != nil { + if err := ddStream.Broadcast(c.ctx, &msgPack); err != nil { return err } return nil @@ -526,7 +526,7 @@ func (c *Core) setMsgStreams() error { DropCollectionRequest: *req, } msgPack.Msgs = append(msgPack.Msgs, collMsg) - if err := ddStream.Broadcast(&msgPack); err != nil { + if err := ddStream.Broadcast(c.ctx, &msgPack); err != nil { return err } return nil @@ -544,7 +544,7 @@ func (c *Core) setMsgStreams() error { CreatePartitionRequest: *req, } msgPack.Msgs = append(msgPack.Msgs, collMsg) - if err := ddStream.Broadcast(&msgPack); err != nil { + if err := ddStream.Broadcast(c.ctx, &msgPack); err != nil { return err } return nil @@ -562,7 +562,7 @@ func (c *Core) setMsgStreams() error { DropPartitionRequest: *req, } msgPack.Msgs = append(msgPack.Msgs, collMsg) - if err := ddStream.Broadcast(&msgPack); err != nil { + if err := ddStream.Broadcast(c.ctx, &msgPack); err != nil { return err } return nil diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index 75e0a44b578630e09ece432c7272358c9d46267a..9075793dbcc5eca396c0d5d81e535e423c4d0e84 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -253,7 +253,7 @@ func TestMasterService(t *testing.T) { TimeTickMsg: timeTickResult, } msgPack.Msgs = 
append(msgPack.Msgs, timeTickMsg) - err := proxyTimeTickStream.Broadcast(&msgPack) + err := proxyTimeTickStream.Broadcast(ctx, &msgPack) assert.Nil(t, err) ttmsg, ok := <-timeTickStream.Chan() @@ -561,7 +561,7 @@ func TestMasterService(t *testing.T) { }, } msgPack.Msgs = append(msgPack.Msgs, segMsg) - err = dataServiceSegmentStream.Broadcast(&msgPack) + err = dataServiceSegmentStream.Broadcast(ctx, &msgPack) assert.Nil(t, err) time.Sleep(time.Second) @@ -703,7 +703,7 @@ func TestMasterService(t *testing.T) { }, } msgPack.Msgs = append(msgPack.Msgs, segMsg) - err = dataServiceSegmentStream.Broadcast(&msgPack) + err = dataServiceSegmentStream.Broadcast(ctx, &msgPack) assert.Nil(t, err) time.Sleep(time.Second) @@ -724,7 +724,7 @@ func TestMasterService(t *testing.T) { }, } msgPack.Msgs = []ms.TsMsg{flushMsg} - err = dataServiceSegmentStream.Broadcast(&msgPack) + err = dataServiceSegmentStream.Broadcast(ctx, &msgPack) assert.Nil(t, err) time.Sleep(time.Second) diff --git a/internal/msgstream/msgstream.go b/internal/msgstream/msgstream.go index a56ce74633f98eb8c3c6eb109e9b15c748f96649..9826c581833b370c0119dde598eb40079459eacb 100644 --- a/internal/msgstream/msgstream.go +++ b/internal/msgstream/msgstream.go @@ -30,8 +30,8 @@ type MsgStream interface { AsConsumer(channels []string, subName string) SetRepackFunc(repackFunc RepackFunc) - Produce(*MsgPack) error - Broadcast(*MsgPack) error + Produce(context.Context, *MsgPack) error + Broadcast(context.Context, *MsgPack) error Consume() *MsgPack Seek(offset *MsgPosition) error } diff --git a/internal/msgstream/pulsarms/msg_test.go b/internal/msgstream/pulsarms/msg_test.go index abee2ee0b6a0163b8e3d2ab309ab424aa0074731..5d986fa138127d80d485e8cbe19feee15d6ec790 100644 --- a/internal/msgstream/pulsarms/msg_test.go +++ b/internal/msgstream/pulsarms/msg_test.go @@ -131,6 +131,7 @@ func getInsertTask(reqID msgstream.UniqueID, hashValue uint32) msgstream.TsMsg { } func TestStream_task_Insert(t *testing.T) { + ctx := 
context.Background() pulsarAddress, _ := Params.Load("_PulsarAddress") producerChannels := []string{"insert1", "insert2"} consumerChannels := []string{"insert1", "insert2"} @@ -153,7 +154,7 @@ func TestStream_task_Insert(t *testing.T) { outputStream.AsConsumer(consumerChannels, consumerSubName) outputStream.Start() - err := inputStream.Produce(&msgPack) + err := inputStream.Produce(ctx, &msgPack) if err != nil { log.Fatalf("produce error = %v", err) } diff --git a/internal/msgstream/pulsarms/pulsar_msgstream.go b/internal/msgstream/pulsarms/pulsar_msgstream.go index c74274d5b5f99aef63a9303e7fbe8c78ffc91c9e..34b7084aba75b10a8f1387f8e818d0adb7494eb5 100644 --- a/internal/msgstream/pulsarms/pulsar_msgstream.go +++ b/internal/msgstream/pulsarms/pulsar_msgstream.go @@ -19,6 +19,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/msgstream/util" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/util/trace" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) @@ -204,7 +205,7 @@ func (ppRW *propertiesReaderWriter) ForeachKey(handler func(key, val string) err return nil } -func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error { +func (ms *PulsarMsgStream) Produce(ctx context.Context, msgPack *MsgPack) error { tsMsgs := msgPack.Msgs if len(tsMsgs) <= 0 { log.Debug("Warning: Receive empty msgPack") @@ -263,20 +264,26 @@ func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error { return err } - msg := &pulsar.ProducerMessage{Payload: m} + msg := &pulsar.ProducerMessage{Payload: m, Properties: map[string]string{}} + + sp, spanCtx := trace.MsgSpanFromCtx(ctx, v.Msgs[i]) + trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties) if _, err := ms.producers[k].Send( - context.Background(), + spanCtx, msg, ); err != nil { + trace.LogError(sp, err) + sp.Finish() return err } + sp.Finish() } } return 
nil } -func (ms *PulsarMsgStream) Broadcast(msgPack *MsgPack) error { +func (ms *PulsarMsgStream) Broadcast(ctx context.Context, msgPack *MsgPack) error { producerLen := len(ms.producers) for _, v := range msgPack.Msgs { mb, err := v.Marshal(v) @@ -289,15 +296,22 @@ func (ms *PulsarMsgStream) Broadcast(msgPack *MsgPack) error { return err } - msg := &pulsar.ProducerMessage{Payload: m} + msg := &pulsar.ProducerMessage{Payload: m, Properties: map[string]string{}} + + sp, spanCtx := trace.MsgSpanFromCtx(ctx, v) + trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties) + for i := 0; i < producerLen; i++ { if _, err := ms.producers[i].Send( - context.Background(), + spanCtx, msg, ); err != nil { + trace.LogError(sp, err) + sp.Finish() return err } } + sp.Finish() } return nil } diff --git a/internal/msgstream/pulsarms/pulsar_msgstream_test.go b/internal/msgstream/pulsarms/pulsar_msgstream_test.go index 431415a41475a59a1ff9da8afd22a4a40f0ab531..d9aa513be2fa9e78e24f991b477440dc1d982ec1 100644 --- a/internal/msgstream/pulsarms/pulsar_msgstream_test.go +++ b/internal/msgstream/pulsarms/pulsar_msgstream_test.go @@ -238,6 +238,7 @@ func receiveMsg(outputStream msgstream.MsgStream, msgCount int) { } func TestStream_PulsarMsgStream_Insert(t *testing.T) { + ctx := context.Background() pulsarAddress, _ := Params.Load("_PulsarAddress") c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} @@ -249,7 +250,7 @@ func TestStream_PulsarMsgStream_Insert(t *testing.T) { msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 3, 3)) inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) - err := inputStream.Produce(&msgPack) + err := inputStream.Produce(ctx, &msgPack) if err != nil { log.Fatalf("produce error = %v", err) } @@ -261,6 +262,7 @@ func TestStream_PulsarMsgStream_Insert(t *testing.T) { } func TestStream_PulsarMsgStream_Delete(t *testing.T) { + ctx 
:= context.Background() pulsarAddress, _ := Params.Load("_PulsarAddress") c := funcutil.RandomString(8) producerChannels := []string{c} @@ -271,7 +273,7 @@ func TestStream_PulsarMsgStream_Delete(t *testing.T) { //msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kDelete, 3, 3)) inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) - err := inputStream.Produce(&msgPack) + err := inputStream.Produce(ctx, &msgPack) if err != nil { log.Fatalf("produce error = %v", err) } @@ -281,6 +283,7 @@ func TestStream_PulsarMsgStream_Delete(t *testing.T) { } func TestStream_PulsarMsgStream_Search(t *testing.T) { + ctx := context.Background() pulsarAddress, _ := Params.Load("_PulsarAddress") c := funcutil.RandomString(8) producerChannels := []string{c} @@ -292,7 +295,7 @@ func TestStream_PulsarMsgStream_Search(t *testing.T) { msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearch, 3, 3)) inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) - err := inputStream.Produce(&msgPack) + err := inputStream.Produce(ctx, &msgPack) if err != nil { log.Fatalf("produce error = %v", err) } @@ -302,6 +305,7 @@ func TestStream_PulsarMsgStream_Search(t *testing.T) { } func TestStream_PulsarMsgStream_SearchResult(t *testing.T) { + ctx := context.Background() pulsarAddress, _ := Params.Load("_PulsarAddress") c := funcutil.RandomString(8) producerChannels := []string{c} @@ -312,7 +316,7 @@ func TestStream_PulsarMsgStream_SearchResult(t *testing.T) { msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearchResult, 3, 3)) inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) - err := inputStream.Produce(&msgPack) + err := inputStream.Produce(ctx, &msgPack) if err != nil { log.Fatalf("produce error = %v", err) } @@ -322,6 +326,7 @@ func TestStream_PulsarMsgStream_SearchResult(t 
*testing.T) { } func TestStream_PulsarMsgStream_TimeTick(t *testing.T) { + ctx := context.Background() pulsarAddress, _ := Params.Load("_PulsarAddress") c := funcutil.RandomString(8) producerChannels := []string{c} @@ -332,7 +337,7 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) { msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 3, 3)) inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) - err := inputStream.Produce(&msgPack) + err := inputStream.Produce(ctx, &msgPack) if err != nil { log.Fatalf("produce error = %v", err) } @@ -342,6 +347,7 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) { } func TestStream_PulsarMsgStream_BroadCast(t *testing.T) { + ctx := context.Background() pulsarAddress, _ := Params.Load("_PulsarAddress") c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} @@ -353,7 +359,7 @@ func TestStream_PulsarMsgStream_BroadCast(t *testing.T) { msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 3, 3)) inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) - err := inputStream.Broadcast(&msgPack) + err := inputStream.Broadcast(ctx, &msgPack) if err != nil { log.Fatalf("produce error = %v", err) } @@ -363,6 +369,7 @@ func TestStream_PulsarMsgStream_BroadCast(t *testing.T) { } func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) { + ctx := context.Background() pulsarAddress, _ := Params.Load("_PulsarAddress") c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} @@ -374,7 +381,7 @@ func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) { msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 3, 3)) inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, repackFunc) - err := inputStream.Produce(&msgPack) + err 
:= inputStream.Produce(ctx, &msgPack) if err != nil { log.Fatalf("produce error = %v", err) } @@ -384,6 +391,7 @@ func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) { } func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) { + ctx := context.Background() pulsarAddress, _ := Params.Load("_PulsarAddress") c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} @@ -428,7 +436,7 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) { outputStream.Start() var output msgstream.MsgStream = outputStream - err := (*inputStream).Produce(&msgPack) + err := (*inputStream).Produce(ctx, &msgPack) if err != nil { log.Fatalf("produce error = %v", err) } @@ -438,6 +446,7 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) { } func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) { + ctx := context.Background() pulsarAddress, _ := Params.Load("_PulsarAddress") c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} @@ -480,7 +489,7 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) { outputStream.Start() var output msgstream.MsgStream = outputStream - err := (*inputStream).Produce(&msgPack) + err := (*inputStream).Produce(ctx, &msgPack) if err != nil { log.Fatalf("produce error = %v", err) } @@ -490,6 +499,7 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) { } func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) { + ctx := context.Background() pulsarAddress, _ := Params.Load("_PulsarAddress") c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} @@ -512,7 +522,7 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) { outputStream.Start() var output msgstream.MsgStream = outputStream - err := (*inputStream).Produce(&msgPack) + err := (*inputStream).Produce(ctx, &msgPack) if err != nil { log.Fatalf("produce error = %v", err) } @@ -522,6 +532,7 @@ func 
TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) { } func TestStream_PulsarTtMsgStream_Insert(t *testing.T) { + ctx := context.Background() pulsarAddress, _ := Params.Load("_PulsarAddress") c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} @@ -538,15 +549,15 @@ func TestStream_PulsarTtMsgStream_Insert(t *testing.T) { msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5)) inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) - err := inputStream.Broadcast(&msgPack0) + err := inputStream.Broadcast(ctx, &msgPack0) if err != nil { log.Fatalf("broadcast error = %v", err) } - err = inputStream.Produce(&msgPack1) + err = inputStream.Produce(ctx, &msgPack1) if err != nil { log.Fatalf("produce error = %v", err) } - err = inputStream.Broadcast(&msgPack2) + err = inputStream.Broadcast(ctx, &msgPack2) if err != nil { log.Fatalf("broadcast error = %v", err) } @@ -556,6 +567,7 @@ func TestStream_PulsarTtMsgStream_Insert(t *testing.T) { } func TestStream_PulsarTtMsgStream_Seek(t *testing.T) { + ctx := context.Background() pulsarAddress, _ := Params.Load("_PulsarAddress") c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} @@ -583,15 +595,15 @@ func TestStream_PulsarTtMsgStream_Seek(t *testing.T) { msgPack5.Msgs = append(msgPack5.Msgs, getTimeTickMsg(15, 15, 15)) inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) - err := inputStream.Broadcast(&msgPack0) + err := inputStream.Broadcast(ctx, &msgPack0) assert.Nil(t, err) - err = inputStream.Produce(&msgPack1) + err = inputStream.Produce(ctx, &msgPack1) assert.Nil(t, err) - err = inputStream.Broadcast(&msgPack2) + err = inputStream.Broadcast(ctx, &msgPack2) assert.Nil(t, err) - err = inputStream.Produce(&msgPack3) + err = inputStream.Produce(ctx, &msgPack3) assert.Nil(t, err) - err = 
inputStream.Broadcast(&msgPack4) + err = inputStream.Broadcast(ctx, &msgPack4) assert.Nil(t, err) outputStream.Consume() @@ -599,7 +611,7 @@ func TestStream_PulsarTtMsgStream_Seek(t *testing.T) { for _, position := range receivedMsg.StartPositions { outputStream.Seek(position) } - err = inputStream.Broadcast(&msgPack5) + err = inputStream.Broadcast(ctx, &msgPack5) assert.Nil(t, err) seekMsg := outputStream.Consume() for _, msg := range seekMsg.Msgs { @@ -610,6 +622,7 @@ func TestStream_PulsarTtMsgStream_Seek(t *testing.T) { } func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) { + ctx := context.Background() pulsarAddress, _ := Params.Load("_PulsarAddress") c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} @@ -627,15 +640,15 @@ func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) { msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5)) inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) - err := inputStream.Broadcast(&msgPack0) + err := inputStream.Broadcast(ctx, &msgPack0) if err != nil { log.Fatalf("broadcast error = %v", err) } - err = inputStream.Produce(&msgPack1) + err = inputStream.Produce(ctx, &msgPack1) if err != nil { log.Fatalf("produce error = %v", err) } - err = inputStream.Broadcast(&msgPack2) + err = inputStream.Broadcast(ctx, &msgPack2) if err != nil { log.Fatalf("broadcast error = %v", err) } diff --git a/internal/msgstream/rmqms/rmq_msgstream.go b/internal/msgstream/rmqms/rmq_msgstream.go index acfdd2baaed5c24201fa1eff9a7e195a520eb56b..e881a7b911db1b4ce1b4baefefd1686bd288dec2 100644 --- a/internal/msgstream/rmqms/rmq_msgstream.go +++ b/internal/msgstream/rmqms/rmq_msgstream.go @@ -125,7 +125,7 @@ func (ms *RmqMsgStream) AsConsumer(channels []string, groupName string) { } } -func (ms *RmqMsgStream) Produce(pack *msgstream.MsgPack) error { +func (ms *RmqMsgStream) Produce(ctx context.Context, pack 
*msgstream.MsgPack) error { tsMsgs := pack.Msgs if len(tsMsgs) <= 0 { log.Printf("Warning: Receive empty msgPack") @@ -185,6 +185,7 @@ func (ms *RmqMsgStream) Produce(pack *msgstream.MsgPack) error { } msg := make([]rocksmq.ProducerMessage, 0) msg = append(msg, *rocksmq.NewProducerMessage(m)) + if err := rocksmq.Rmq.Produce(ms.producers[k], msg); err != nil { return err } @@ -193,7 +194,7 @@ func (ms *RmqMsgStream) Produce(pack *msgstream.MsgPack) error { return nil } -func (ms *RmqMsgStream) Broadcast(msgPack *MsgPack) error { +func (ms *RmqMsgStream) Broadcast(ctx context.Context, msgPack *MsgPack) error { producerLen := len(ms.producers) for _, v := range msgPack.Msgs { mb, err := v.Marshal(v) diff --git a/internal/msgstream/rmqms/rmq_msgstream_test.go b/internal/msgstream/rmqms/rmq_msgstream_test.go index 6c97446610e5bd093577a22defdefaa655029664..fa47c695404af7eacf56a3d6a6018f1b2d06897f 100644 --- a/internal/msgstream/rmqms/rmq_msgstream_test.go +++ b/internal/msgstream/rmqms/rmq_msgstream_test.go @@ -252,6 +252,7 @@ func receiveMsg(outputStream msgstream.MsgStream, msgCount int) { } func TestStream_RmqMsgStream_Insert(t *testing.T) { + ctx := context.Background() producerChannels := []string{"insert1", "insert2"} consumerChannels := []string{"insert1", "insert2"} consumerGroupName := "InsertGroup" @@ -263,7 +264,7 @@ func TestStream_RmqMsgStream_Insert(t *testing.T) { rocksdbName := "/tmp/rocksmq_insert" etcdKV := initRmq(rocksdbName) inputStream, outputStream := initRmqStream(producerChannels, consumerChannels, consumerGroupName) - err := inputStream.Produce(&msgPack) + err := inputStream.Produce(ctx, &msgPack) if err != nil { log.Fatalf("produce error = %v", err) } @@ -273,6 +274,7 @@ func TestStream_RmqMsgStream_Insert(t *testing.T) { } func TestStream_RmqMsgStream_Delete(t *testing.T) { + ctx := context.Background() producerChannels := []string{"delete"} consumerChannels := []string{"delete"} consumerSubName := "subDelete" @@ -283,7 +285,7 @@ func 
TestStream_RmqMsgStream_Delete(t *testing.T) { rocksdbName := "/tmp/rocksmq_delete" etcdKV := initRmq(rocksdbName) inputStream, outputStream := initRmqStream(producerChannels, consumerChannels, consumerSubName) - err := inputStream.Produce(&msgPack) + err := inputStream.Produce(ctx, &msgPack) if err != nil { log.Fatalf("produce error = %v", err) } @@ -292,6 +294,7 @@ func TestStream_RmqMsgStream_Delete(t *testing.T) { } func TestStream_RmqMsgStream_Search(t *testing.T) { + ctx := context.Background() producerChannels := []string{"search"} consumerChannels := []string{"search"} consumerSubName := "subSearch" @@ -303,7 +306,7 @@ func TestStream_RmqMsgStream_Search(t *testing.T) { rocksdbName := "/tmp/rocksmq_search" etcdKV := initRmq(rocksdbName) inputStream, outputStream := initRmqStream(producerChannels, consumerChannels, consumerSubName) - err := inputStream.Produce(&msgPack) + err := inputStream.Produce(ctx, &msgPack) if err != nil { log.Fatalf("produce error = %v", err) } @@ -312,6 +315,8 @@ func TestStream_RmqMsgStream_Search(t *testing.T) { } func TestStream_RmqMsgStream_SearchResult(t *testing.T) { + ctx := context.Background() + producerChannels := []string{"searchResult"} consumerChannels := []string{"searchResult"} consumerSubName := "subSearchResult" @@ -323,7 +328,7 @@ func TestStream_RmqMsgStream_SearchResult(t *testing.T) { rocksdbName := "/tmp/rocksmq_searchresult" etcdKV := initRmq(rocksdbName) inputStream, outputStream := initRmqStream(producerChannels, consumerChannels, consumerSubName) - err := inputStream.Produce(&msgPack) + err := inputStream.Produce(ctx, &msgPack) if err != nil { log.Fatalf("produce error = %v", err) } @@ -332,6 +337,7 @@ func TestStream_RmqMsgStream_SearchResult(t *testing.T) { } func TestStream_RmqMsgStream_TimeTick(t *testing.T) { + ctx := context.Background() producerChannels := []string{"timeTick"} consumerChannels := []string{"timeTick"} consumerSubName := "subTimeTick" @@ -343,7 +349,7 @@ func 
TestStream_RmqMsgStream_TimeTick(t *testing.T) { rocksdbName := "/tmp/rocksmq_timetick" etcdKV := initRmq(rocksdbName) inputStream, outputStream := initRmqStream(producerChannels, consumerChannels, consumerSubName) - err := inputStream.Produce(&msgPack) + err := inputStream.Produce(ctx, &msgPack) if err != nil { log.Fatalf("produce error = %v", err) } @@ -352,6 +358,7 @@ func TestStream_RmqMsgStream_TimeTick(t *testing.T) { } func TestStream_RmqMsgStream_BroadCast(t *testing.T) { + ctx := context.Background() producerChannels := []string{"insert1", "insert2"} consumerChannels := []string{"insert1", "insert2"} consumerSubName := "subInsert" @@ -363,7 +370,7 @@ func TestStream_RmqMsgStream_BroadCast(t *testing.T) { rocksdbName := "/tmp/rocksmq_broadcast" etcdKV := initRmq(rocksdbName) inputStream, outputStream := initRmqStream(producerChannels, consumerChannels, consumerSubName) - err := inputStream.Broadcast(&msgPack) + err := inputStream.Broadcast(ctx, &msgPack) if err != nil { log.Fatalf("produce error = %v", err) } @@ -372,6 +379,8 @@ func TestStream_RmqMsgStream_BroadCast(t *testing.T) { } func TestStream_RmqMsgStream_RepackFunc(t *testing.T) { + ctx := context.Background() + producerChannels := []string{"insert1", "insert2"} consumerChannels := []string{"insert1", "insert2"} consumerSubName := "subInsert" @@ -383,7 +392,7 @@ func TestStream_RmqMsgStream_RepackFunc(t *testing.T) { rocksdbName := "/tmp/rocksmq_repackfunc" etcdKV := initRmq(rocksdbName) inputStream, outputStream := initRmqStream(producerChannels, consumerChannels, consumerSubName, repackFunc) - err := inputStream.Produce(&msgPack) + err := inputStream.Produce(ctx, &msgPack) if err != nil { log.Fatalf("produce error = %v", err) } @@ -392,6 +401,8 @@ func TestStream_RmqMsgStream_RepackFunc(t *testing.T) { } func TestStream_PulsarTtMsgStream_Insert(t *testing.T) { + ctx := context.Background() + producerChannels := []string{"insert1", "insert2"} consumerChannels := []string{"insert1", "insert2"} 
consumerSubName := "subInsert" @@ -410,15 +421,15 @@ func TestStream_PulsarTtMsgStream_Insert(t *testing.T) { etcdKV := initRmq(rocksdbName) inputStream, outputStream := initRmqTtStream(producerChannels, consumerChannels, consumerSubName) - err := inputStream.Broadcast(&msgPack0) + err := inputStream.Broadcast(ctx, &msgPack0) if err != nil { log.Fatalf("broadcast error = %v", err) } - err = inputStream.Produce(&msgPack1) + err = inputStream.Produce(ctx, &msgPack1) if err != nil { log.Fatalf("produce error = %v", err) } - err = inputStream.Broadcast(&msgPack2) + err = inputStream.Broadcast(ctx, &msgPack2) if err != nil { log.Fatalf("broadcast error = %v", err) } diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go index fbf25b80328330896d9c5dd60ab21cd5b47b8f30..85334d453ac16d86ba487b56e6b91d530606d7a8 100644 --- a/internal/proxynode/proxy_node.go +++ b/internal/proxynode/proxy_node.go @@ -12,6 +12,9 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/proxypb" "github.com/zilliztech/milvus-distributed/internal/util/retry" + "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go/config" + "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -130,6 +133,21 @@ func (node *NodeImpl) Init() error { return err } + // TODO + cfg := &config.Configuration{ + ServiceName: fmt.Sprintf("proxy_node_%d", Params.ProxyID), + Sampler: &config.SamplerConfig{ + Type: "const", + Param: 1, + }, + } + tracer, closer, err := cfg.NewTracer() + if err != nil { + panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) + } + opentracing.SetGlobalTracer(tracer) + node.closer = closer + // wait for dataservice state changed to Healthy if node.dataServiceClient != nil { err = node.waitForServiceReady(node.dataServiceClient, "DataService") @@ -270,6 +288,9 @@ func (node *NodeImpl) Start() error { } func (node *NodeImpl) Stop() error { + if err := 
node.closer.Close(); err != nil { + return err + } node.cancel() globalInsertChannelsMap.closeAllMsgStream() diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go index c9bb10e6559365ae2dcbdb8e3021db5c627e68b8..b24ac1e9b8bec7cef071ef5dcae6b64f2915e26a 100644 --- a/internal/proxynode/task.go +++ b/internal/proxynode/task.go @@ -224,7 +224,7 @@ func (it *InsertTask) Execute(ctx context.Context) error { return err } - err = stream.Produce(msgPack) + err = stream.Produce(ctx, msgPack) if err != nil { it.result.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR it.result.Status.Reason = err.Error() @@ -580,7 +580,7 @@ func (st *SearchTask) Execute(ctx context.Context) error { Msgs: make([]msgstream.TsMsg, 1), } msgPack.Msgs[0] = tsMsg - err := st.queryMsgStream.Produce(msgPack) + err := st.queryMsgStream.Produce(ctx, msgPack) log.Printf("[NodeImpl] length of searchMsg: %v", len(msgPack.Msgs)) if err != nil { log.Printf("[NodeImpl] send search request failed: %v", err) diff --git a/internal/proxynode/timetick.go b/internal/proxynode/timetick.go index 1e76fcf6ec17e0ff169d6c2eaa177b959e250a3a..08f8d5c64d3d3273da5a0dc512e794f102247ca2 100644 --- a/internal/proxynode/timetick.go +++ b/internal/proxynode/timetick.go @@ -85,7 +85,7 @@ func (tt *timeTick) tick() error { }, } msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) - err := tt.tickMsgStream.Produce(&msgPack) + err := tt.tickMsgStream.Produce(tt.ctx, &msgPack) if err != nil { log.Printf("proxynode send time tick error: %v", err) } else { diff --git a/internal/proxyservice/timetick.go b/internal/proxyservice/timetick.go index f93ce864bfbfd54f429c982728f3e1ffdcfd6159..da22a7181f02fba1511b67d0c5106f40864157d4 100644 --- a/internal/proxyservice/timetick.go +++ b/internal/proxyservice/timetick.go @@ -59,7 +59,7 @@ func (tt *TimeTickImpl) Start() error { } msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) for _, channel := range tt.channels { - err = channel.Broadcast(&msgPack) + err = 
channel.Broadcast(tt.ctx, &msgPack) if err != nil { log.Println("send time tick error: ", err) } diff --git a/internal/querynode/data_sync_service_test.go b/internal/querynode/data_sync_service_test.go index 7b3ef328897a1399447d177c70aeaaa32b0f8caa..67f35cb34472a3ab1d7934aefb47af6994c1d55f 100644 --- a/internal/querynode/data_sync_service_test.go +++ b/internal/querynode/data_sync_service_test.go @@ -1,6 +1,7 @@ package querynode import ( + "context" "encoding/binary" "math" "testing" @@ -15,6 +16,8 @@ import ( // NOTE: start pulsar before test func TestDataSyncService_Start(t *testing.T) { + ctx := context.Background() + node := newQueryNodeMock() initTestMeta(t, node, 0, 0) // test data generate @@ -129,12 +132,12 @@ func TestDataSyncService_Start(t *testing.T) { var ddMsgStream msgstream.MsgStream = ddStream ddMsgStream.Start() - err = insertMsgStream.Produce(&msgPack) + err = insertMsgStream.Produce(ctx, &msgPack) assert.NoError(t, err) - err = insertMsgStream.Broadcast(&timeTickMsgPack) + err = insertMsgStream.Broadcast(ctx, &timeTickMsgPack) assert.NoError(t, err) - err = ddMsgStream.Broadcast(&timeTickMsgPack) + err = ddMsgStream.Broadcast(ctx, &timeTickMsgPack) assert.NoError(t, err) // dataSync diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go index 85e9d8767c410621a76b1dc226cd8c04de2d917d..a164eb2bac0d1b93a25576c7a15020645e5922c5 100644 --- a/internal/querynode/flow_graph_service_time_node.go +++ b/internal/querynode/flow_graph_service_time_node.go @@ -66,7 +66,7 @@ func (stNode *serviceTimeNode) sendTimeTick(ts Timestamp) error { }, } msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg) - return stNode.timeTickMsgStream.Produce(&msgPack) + return stNode.timeTickMsgStream.Produce(context.TODO(), &msgPack) } func newServiceTimeNode(ctx context.Context, replica collectionReplica, factory msgstream.Factory) *serviceTimeNode { diff --git a/internal/querynode/load_service_test.go 
b/internal/querynode/load_service_test.go index 8ec4ee440ee841c21233407f8c42d9181fc095c8..5bfd3365ca0926650a782c92852caa4ecf371709 100644 --- a/internal/querynode/load_service_test.go +++ b/internal/querynode/load_service_test.go @@ -1038,16 +1038,16 @@ func doInsert(ctx context.Context, collectionID UniqueID, partitionID UniqueID, var ddMsgStream msgstream.MsgStream = ddStream ddMsgStream.Start() - err = insertMsgStream.Produce(&msgPack) + err = insertMsgStream.Produce(ctx, &msgPack) if err != nil { return err } - err = insertMsgStream.Broadcast(&timeTickMsgPack) + err = insertMsgStream.Broadcast(ctx, &timeTickMsgPack) if err != nil { return err } - err = ddMsgStream.Broadcast(&timeTickMsgPack) + err = ddMsgStream.Broadcast(ctx, &timeTickMsgPack) if err != nil { return err } @@ -1104,11 +1104,11 @@ func sentTimeTick(ctx context.Context) error { var ddMsgStream msgstream.MsgStream = ddStream ddMsgStream.Start() - err = insertMsgStream.Broadcast(&timeTickMsgPack) + err = insertMsgStream.Broadcast(ctx, &timeTickMsgPack) if err != nil { return err } - err = ddMsgStream.Broadcast(&timeTickMsgPack) + err = ddMsgStream.Broadcast(ctx, &timeTickMsgPack) if err != nil { return err } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 14f35bff27d5c0c40621a2588d7a7b63d0d9958e..ac8b1cb8dce44ecf0949250e82fc3f8dbe82793d 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -19,6 +19,8 @@ import ( "log" "sync/atomic" + "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go/config" "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" @@ -87,6 +89,20 @@ func NewQueryNode(ctx context.Context, queryNodeID UniqueID, factory msgstream.F msFactory: factory, } + cfg := &config.Configuration{ + ServiceName: fmt.Sprintf("query_node_%d", node.QueryNodeID), + 
Sampler: &config.SamplerConfig{ + Type: "const", + Param: 1, + }, + } + tracer, closer, err := cfg.NewTracer() + if err != nil { + panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) + } + opentracing.SetGlobalTracer(tracer) + node.closer = closer + node.replica = newCollectionReplicaImpl() node.UpdateStateCode(internalpb2.StateCode_ABNORMAL) return node @@ -194,6 +210,9 @@ func (node *QueryNode) Start() error { } func (node *QueryNode) Stop() error { + if err := node.closer.Close(); err != nil { + return err + } node.UpdateStateCode(internalpb2.StateCode_ABNORMAL) node.queryNodeLoopCancel() diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go index 193be887c19cb951253c906d9a627a0caa2ec407..14160200e96934f00a68dbcd988c615bff619ae6 100644 --- a/internal/querynode/search_service.go +++ b/internal/querynode/search_service.go @@ -398,7 +398,7 @@ func (ss *searchService) publishSearchResult(msg msgstream.TsMsg) error { // msg.SetMsgContext(ctx) msgPack := msgstream.MsgPack{} msgPack.Msgs = append(msgPack.Msgs, msg) - err := ss.searchResultMsgStream.Produce(&msgPack) + err := ss.searchResultMsgStream.Produce(context.TODO(), &msgPack) return err } @@ -430,7 +430,7 @@ func (ss *searchService) publishFailedSearchResult(msg msgstream.TsMsg, errMsg s } msgPack.Msgs = append(msgPack.Msgs, searchResultMsg) - err := ss.searchResultMsgStream.Produce(&msgPack) + err := ss.searchResultMsgStream.Produce(context.TODO(), &msgPack) if err != nil { return err } diff --git a/internal/querynode/search_service_test.go b/internal/querynode/search_service_test.go index 1765e346493a9375f00ac39c17e3a5856ff1b6c4..4fea8dedec51b366c7f87dfdd6524a10f1bd1622 100644 --- a/internal/querynode/search_service_test.go +++ b/internal/querynode/search_service_test.go @@ -19,6 +19,8 @@ import ( ) func TestSearch_Search(t *testing.T) { + ctx := context.Background() + node := newQueryNodeMock() initTestMeta(t, node, 0, 0) @@ -104,7 +106,7 @@ func TestSearch_Search(t 
*testing.T) { searchStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx) searchStream.AsProducer(searchProducerChannels) searchStream.Start() - err = searchStream.Produce(&msgPackSearch) + err = searchStream.Produce(ctx, &msgPackSearch) assert.NoError(t, err) node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, msFactory) @@ -198,12 +200,12 @@ func TestSearch_Search(t *testing.T) { var ddMsgStream msgstream.MsgStream = ddStream ddMsgStream.Start() - err = insertMsgStream.Produce(&msgPack) + err = insertMsgStream.Produce(ctx, &msgPack) assert.NoError(t, err) - err = insertMsgStream.Broadcast(&timeTickMsgPack) + err = insertMsgStream.Broadcast(ctx, &timeTickMsgPack) assert.NoError(t, err) - err = ddMsgStream.Broadcast(&timeTickMsgPack) + err = ddMsgStream.Broadcast(ctx, &timeTickMsgPack) assert.NoError(t, err) // dataSync @@ -216,6 +218,7 @@ func TestSearch_Search(t *testing.T) { } func TestSearch_SearchMultiSegments(t *testing.T) { + ctx := context.Background() pulsarURL := Params.PulsarAddress const receiveBufSize = 1024 @@ -301,7 +304,7 @@ func TestSearch_SearchMultiSegments(t *testing.T) { searchStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx) searchStream.AsProducer(searchProducerChannels) searchStream.Start() - err = searchStream.Produce(&msgPackSearch) + err = searchStream.Produce(ctx, &msgPackSearch) assert.NoError(t, err) node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, msFactory) @@ -399,12 +402,12 @@ func TestSearch_SearchMultiSegments(t *testing.T) { var ddMsgStream msgstream.MsgStream = ddStream ddMsgStream.Start() - err = insertMsgStream.Produce(&msgPack) + err = insertMsgStream.Produce(ctx, &msgPack) assert.NoError(t, err) - err = insertMsgStream.Broadcast(&timeTickMsgPack) + err = insertMsgStream.Broadcast(ctx, &timeTickMsgPack) assert.NoError(t, err) - err = ddMsgStream.Broadcast(&timeTickMsgPack) + err = ddMsgStream.Broadcast(ctx, &timeTickMsgPack) assert.NoError(t, err) // dataSync 
diff --git a/internal/querynode/stats_service.go b/internal/querynode/stats_service.go index cf6db549ff410ec11fccfeaae5d7b1c9a2aea08b..4f61262494c57f02a2f14b7f21eb9c8026a2ce1b 100644 --- a/internal/querynode/stats_service.go +++ b/internal/querynode/stats_service.go @@ -92,7 +92,7 @@ func (sService *statsService) publicStatistic(fieldStats []*internalpb2.FieldSta var msgPack = msgstream.MsgPack{ Msgs: []msgstream.TsMsg{msg}, } - err := sService.statsStream.Produce(&msgPack) + err := sService.statsStream.Produce(context.TODO(), &msgPack) if err != nil { log.Println(err) } diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 76dfa6ca7bbc60d9a7cd56f361f3c08731bd426c..f82a0d6f0a4fd40e6392c3ef9839846b5eed2f96 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -3,12 +3,15 @@ package queryservice import ( "context" "fmt" + "io" "sort" "strconv" "sync" "sync/atomic" "time" + "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go/config" nodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/querynode/client" "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -64,6 +67,8 @@ type QueryService struct { enableGrpc bool msFactory msgstream.Factory + + closer io.Closer } func (qs *QueryService) Init() error { @@ -76,6 +81,9 @@ func (qs *QueryService) Start() error { } func (qs *QueryService) Stop() error { + if err := qs.closer.Close(); err != nil { + return err + } qs.loopCancel() qs.UpdateStateCode(internalpb2.StateCode_ABNORMAL) return nil @@ -615,6 +623,21 @@ func NewQueryService(ctx context.Context, factory msgstream.Factory) (*QueryServ qcMutex: &sync.Mutex{}, msFactory: factory, } + + cfg := &config.Configuration{ + ServiceName: "query_service", + Sampler: &config.SamplerConfig{ + Type: "const", + Param: 1, + }, + } + tracer, closer, err := cfg.NewTracer() + if err != nil 
{ + panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) + } + opentracing.SetGlobalTracer(tracer) + service.closer = closer + service.UpdateStateCode(internalpb2.StateCode_ABNORMAL) return service, nil } diff --git a/internal/timesync/timetick_watcher.go b/internal/timesync/timetick_watcher.go index 9ba4c87d2443643d700d9df5e2a384494ff1e52e..b833cac31b4f10a2ae70bca3dd1e7bbb281fff7a 100644 --- a/internal/timesync/timetick_watcher.go +++ b/internal/timesync/timetick_watcher.go @@ -39,7 +39,7 @@ func (watcher *MsgTimeTickWatcher) StartBackgroundLoop(ctx context.Context) { msgPack := &ms.MsgPack{} msgPack.Msgs = append(msgPack.Msgs, msg) for _, stream := range watcher.streams { - if err := stream.Broadcast(msgPack); err != nil { + if err := stream.Broadcast(ctx, msgPack); err != nil { log.Printf("stream broadcast failed %s", err.Error()) } }