diff --git a/cmd/querynode/query_node.go b/cmd/querynode/query_node.go index 4c7a529277b328dc66501137b0cb798b6e6e1ebf..12a6639a58fb42c8d38de846f78139e03f6de208 100644 --- a/cmd/querynode/query_node.go +++ b/cmd/querynode/query_node.go @@ -34,14 +34,12 @@ func main() { cancel() }() - if err := svr.Start(); err != nil { - log.Fatal("run server failed", zap.Error(err)) - } + svr.Start() <-ctx.Done() log.Print("Got signal to exit", zap.String("signal", sig.String())) - svr.Close() + svr.Stop() switch sig { case syscall.SIGTERM: exit(0) diff --git a/cmd/singlenode/main.go b/cmd/singlenode/main.go index c6504686ddea7f1b450a83080ad6fc99ac3f6b0f..383fc4f40f0fa7eb89fbb8b55721e7d7ca3d287d 100644 --- a/cmd/singlenode/main.go +++ b/cmd/singlenode/main.go @@ -120,14 +120,12 @@ func InitQueryNode(wg *sync.WaitGroup) { cancel() }() - if err := svr.Start(); err != nil { - log.Fatal("run server failed", zap.Error(err)) - } + svr.Start() <-ctx.Done() log.Print("Got signal to exit", zap.String("signal", sig.String())) - svr.Close() + svr.Stop() switch sig { case syscall.SIGTERM: exit(0) diff --git a/configs/advanced/channel.yaml b/configs/advanced/channel.yaml index 43aaf7060da4c1098c20bfa15e58b1c5f5f62095..fb7c835ae54a093ace4f4d7cb2c27b71d43683dc 100644 --- a/configs/advanced/channel.yaml +++ b/configs/advanced/channel.yaml @@ -21,6 +21,7 @@ msgChannel: proxyTimeTick: "proxyTimeTick" writeNodeTimeTick: "writeNodeTimeTick" # GOOSE TODO: remove this dataNodeTimeTick: "dataNodeTimeTick" + queryNodeTimeTick: "queryNodeTimeTick" dataNodeSegStatistics: "dataNodeSegStatistics" # old name: statsChannels: "statistic" queryNodeStats: "query-node-stats" diff --git a/configs/advanced/query_node.yaml b/configs/advanced/query_node.yaml index ec5e6603b79af3a9a3266cb816b6ec4414190489..2250fcacdaae4a8bc0b1715712e2ebfc44a7109f 100644 --- a/configs/advanced/query_node.yaml +++ b/configs/advanced/query_node.yaml @@ -46,3 +46,6 @@ queryNode: loadIndex: recvBufSize: 512 pulsarBufSize: 512 + + timeTick: + recvBufSize: 64 diff --git a/configs/milvus.yaml b/configs/milvus.yaml index cdba95b8451654baf661695ba233870a22186c53..ad495e53a1d9c96d8c6fb760216077ff76d71280 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -52,6 +52,8 @@ proxyNode: port: 19530 queryNode: + ip: localhost + port: 20010 gracefulTime: 5000 #ms indexBuilder: diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index 02db6516b12c64be3983a7897b4e1041c1880234..0f00d324f1299533754a99db69e24b925fe5a9cc 100644 --- a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -46,9 +46,9 @@ NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type) { } void -DeleteSegment(CSegmentInterface segment) { +DeleteSegment(CSegmentInterface c_segment) { // TODO: use dynamic cast, and return c status - auto s = (milvus::segcore::SegmentGrowing*)segment; + auto s = (milvus::segcore::SegmentInterface*)c_segment; std::cout << "delete segment " << std::endl; delete s; @@ -67,7 +67,7 @@ Search(CSegmentInterface c_segment, uint64_t* timestamps, int num_groups, CQueryResult* result) { - auto segment = (milvus::segcore::SegmentGrowing*)c_segment; + auto segment = (milvus::segcore::SegmentInterface*)c_segment; auto plan = (milvus::query::Plan*)c_plan; std::vector<const milvus::query::PlaceholderGroup*> placeholder_groups; for (int i = 0; i < num_groups; ++i) { @@ -102,7 +102,7 @@ Search(CSegmentInterface c_segment, CStatus FillTargetEntry(CSegmentInterface c_segment, CPlan c_plan, CQueryResult c_result) { - auto segment = (milvus::segcore::SegmentGrowing*)c_segment; + auto segment = (milvus::segcore::SegmentInterface*)c_segment; auto plan = (milvus::query::Plan*)c_plan; auto result = (milvus::QueryResult*)c_result; @@ -120,18 +120,19 @@ FillTargetEntry(CSegmentInterface c_segment, CPlan c_plan, CQueryResult c_result int64_t GetMemoryUsageInBytes(CSegmentInterface c_segment) { - auto segment = (milvus::segcore::SegmentGrowing*)c_segment; + auto segment = (milvus::segcore::SegmentInterface*)c_segment; auto mem_size = segment->GetMemoryUsageInBytes(); return mem_size; } int64_t GetRowCount(CSegmentInterface c_segment) { - auto segment = (milvus::segcore::SegmentGrowing*)c_segment; + auto segment = (milvus::segcore::SegmentInterface*)c_segment; auto row_count = segment->get_row_count(); return row_count; } +// TODO: segmentInterface implement get_deleted_count() int64_t GetDeletedCount(CSegmentInterface c_segment) { auto segment = (milvus::segcore::SegmentGrowing*)c_segment; @@ -211,12 +212,15 @@ PreDelete(CSegmentInterface c_segment, int64_t size) { ////////////////////////////// interfaces for sealed segment ////////////////////////////// CStatus LoadFieldData(CSegmentInterface c_segment, CLoadFieldDataInfo load_field_data_info) { - auto segment = (milvus::segcore::SegmentSealed*)c_segment; - try { + auto segment_interface = reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment); + auto segment = dynamic_cast<milvus::segcore::SegmentSealed*>(segment_interface); + AssertInfo(segment != nullptr, "segment conversion failed"); auto load_info = LoadFieldDataInfo{load_field_data_info.field_id, load_field_data_info.blob, load_field_data_info.row_count}; segment->LoadFieldData(load_info); + std::cout << "load field done, field_id = " << load_info.field_id << ", row count = " << load_info.row_count + << std::endl; auto status = CStatus(); status.error_code = Success; status.error_msg = ""; @@ -229,15 +233,36 @@ LoadFieldData(CSegmentInterface c_segment, CLoadFieldDataInfo load_field_data_in } } +CStatus +UpdateSealedSegmentIndex(CSegmentInterface c_segment, CLoadIndexInfo c_load_index_info) { + auto status = CStatus(); + try { + auto segment_interface = reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment); + auto segment = dynamic_cast<milvus::segcore::SegmentSealed*>(segment_interface); + AssertInfo(segment != nullptr, "segment conversion failed"); + auto load_index_info = (LoadIndexInfo*)c_load_index_info; + segment->LoadIndex(*load_index_info); + status.error_code = Success; + status.error_msg = ""; + return status; + } catch (std::exception& e) { + status.error_code = UnexpectedException; + status.error_msg = strdup(e.what()); + return status; + } +} + ////////////////////////////// deprecated interfaces ////////////////////////////// CStatus UpdateSegmentIndex(CSegmentInterface c_segment, CLoadIndexInfo c_load_index_info) { auto status = CStatus(); try { - auto segment = (milvus::segcore::SegmentGrowing*)c_segment; + auto segment_interface = reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment); + auto segment = dynamic_cast<milvus::segcore::SegmentGrowing*>(segment_interface); + AssertInfo(segment != nullptr, "segment conversion failed"); auto load_index_info = (LoadIndexInfo*)c_load_index_info; auto res = segment->LoadIndexing(*load_index_info); - status.error_code = Success; + status.error_code = res.code(); status.error_msg = ""; return status; } catch (std::exception& e) { diff --git a/internal/core/src/segcore/segment_c.h b/internal/core/src/segcore/segment_c.h index 2330eac08a282e2a4cea24787201aabe6796870b..3347af2acdd8a1d1a8c819df7339e65e99c27761 100644 --- a/internal/core/src/segcore/segment_c.h +++ b/internal/core/src/segcore/segment_c.h @@ -31,7 +31,7 @@ CSegmentInterface NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type); void -DeleteSegment(CSegmentInterface segment); +DeleteSegment(CSegmentInterface c_segment); void DeleteQueryResult(CQueryResult query_result); @@ -67,11 +67,9 @@ Insert(CSegmentInterface c_segment, int sizeof_per_row, int64_t count); -// interface for growing segment int64_t PreInsert(CSegmentInterface c_segment, int64_t size); -// interface for growing segment CStatus Delete(CSegmentInterface c_segment, int64_t reserved_offset, @@ -79,7 +77,6 @@ Delete(CSegmentInterface c_segment, const int64_t* row_ids, const uint64_t* timestamps); -// interface for growing segment int64_t PreDelete(CSegmentInterface c_segment, int64_t size); @@ -87,19 +84,19 @@ PreDelete(CSegmentInterface c_segment, int64_t size); CStatus LoadFieldData(CSegmentInterface c_segment, CLoadFieldDataInfo load_field_data_info); +CStatus +UpdateSealedSegmentIndex(CSegmentInterface c_segment, CLoadIndexInfo c_load_index_info); + ////////////////////////////// deprecated interfaces ////////////////////////////// CStatus UpdateSegmentIndex(CSegmentInterface c_segment, CLoadIndexInfo c_load_index_info); -// deprecated int Close(CSegmentInterface c_segment); -// deprecated int BuildIndex(CCollection c_collection, CSegmentInterface c_segment); -// deprecated bool IsOpened(CSegmentInterface c_segment); diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index 16150d91819cd390af6c0d34d4d850b5f4353349..ba153c6539ad87890bbf38e29f2038f06d72d6ee 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -1519,4 +1519,51 @@ TEST(CApiTest, UpdateSegmentIndex_With_binary_Predicate_Term) { DeleteQueryResult(c_search_result_on_bigIndex); DeleteCollection(collection); DeleteSegment(segment); -} \ No newline at end of file +} + +TEST(CApiTest, SealedSegmentTest) { + auto schema_tmp_conf = R"(name: "test" + fields: < + fieldID: 100 + name: "vec" + data_type: VECTOR_FLOAT + type_params: < + key: "dim" + value: "16" + > + index_params: < + key: "metric_type" + value: "L2" + > + > + fields: < + fieldID: 101 + name: "age" + data_type: INT32 + type_params: < + key: "dim" + value: "1" + > + >)"; + auto collection = NewCollection(schema_tmp_conf); + auto segment = NewSegment(collection, 0, Sealed); + + int N = 10000; + std::default_random_engine e(67); + auto ages = std::vector<int32_t>(N); + for (auto& age : ages) { + age = e() % 2000; + } + auto blob = (void*)(&ages[0]); + + auto load_info = CLoadFieldDataInfo{101, blob, N}; + + // TODO: open load test + // auto res = LoadFieldData(segment, load_info); + // assert(res.error_code == Success); + // auto count = GetRowCount(segment); + // assert(count == N); + + DeleteCollection(collection); + DeleteSegment(segment); +} diff --git a/internal/distributed/indexservice/client/client.go b/internal/distributed/indexservice/client/client.go index e245ecd330a1e07efa6e412790f709c456502803..816887c0a2497125bd7b50173ed9f67f4f305352 100644 --- a/internal/distributed/indexservice/client/client.go +++ b/internal/distributed/indexservice/client/client.go @@ -34,7 +34,7 @@ func (g Client) GetIndexStates(req *indexpb.IndexStatesRequest) (*indexpb.IndexS return g.grpcClient.GetIndexStates(ctx, req) } -func (g Client) GetIndexFilePaths(req *indexpb.IndexFilePathRequest) (*indexpb.IndexFilePathsResponse, error) { +func (g Client) GetIndexFilePaths(req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) { ctx := context.TODO() diff --git a/internal/distributed/indexservice/service.go b/internal/distributed/indexservice/service.go index 7bf384d867ad27dbbf8da20a32e51b6d18220d03..37f9f2646cdce9ae362eb605625dbbec9192d77d 100644 --- a/internal/distributed/indexservice/service.go +++ b/internal/distributed/indexservice/service.go @@ -69,7 +69,7 @@ func (s *Server) GetIndexStates(ctx context.Context, req *indexpb.IndexStatesReq return s.server.GetIndexStates(req) } -func (s *Server) GetIndexFilePaths(ctx context.Context, req *indexpb.IndexFilePathRequest) (*indexpb.IndexFilePathsResponse, error) { +func (s *Server) GetIndexFilePaths(ctx context.Context, req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) { return s.server.GetIndexFilePaths(req) } diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index 68a7b817a61665ea409b212bc9211c8a937d3b6c..c59d8a154d66ffa731325f59559915eed08bbf93 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -38,9 +38,7 @@ func (s *Server) StartGrpcServer() { func (s *Server) Start() { go s.StartGrpcServer() - if err := s.node.Start(); err != nil { - panic(err) - } + s.node.Start() } func (s *Server) AddQueryChannel(ctx context.Context, in *querypb.AddQueryChannelsRequest) (*commonpb.Status, error) { diff --git a/internal/indexnode/client/client.go b/internal/indexnode/client/client.go index 09c35ac0c2e0015c26bf00aa362861b259350a89..6cd98e9abc219a9509ba3566977b363191258d90 100644 --- a/internal/indexnode/client/client.go +++ b/internal/indexnode/client/client.go @@ -133,13 +133,13 @@ func (c *Client) GetIndexStates(indexIDs []UniqueID) (*indexpb.IndexStatesRespon return response, err } -func (c *Client) GetIndexFilePaths(indexID UniqueID) ([]string, error) { +func (c *Client) GetIndexFilePaths(indexIDs []UniqueID) ([][]string, error) { if c.tryConnect() != nil { panic("GetIndexFilePaths: failed to connect index builder") } ctx := context.TODO() - request := &indexpb.IndexFilePathRequest{ - IndexID: indexID, + request := &indexpb.IndexFilePathsRequest{ + IndexIDs: indexIDs, } response, err := c.client.GetIndexFilePaths(ctx, request) @@ -147,5 +147,15 @@ func (c *Client) GetIndexFilePaths(indexID UniqueID) ([]string, error) { return nil, err } - return response.IndexFilePaths, nil + var filePaths [][]string + for _, indexID := range indexIDs { + for _, filePathInfo := range response.FilePaths { + if indexID == filePathInfo.IndexID { + filePaths = append(filePaths, filePathInfo.IndexFilePaths) + break + } + } + } + + return filePaths, nil } diff --git a/internal/indexnode/grpc_service.go b/internal/indexnode/grpc_service.go index e5753be9f67811ae8cca6a07bfb00c56a1b05feb..abd54d4eb07c3302a0a88ce948b532f8e9a022ab 100644 --- a/internal/indexnode/grpc_service.go +++ b/internal/indexnode/grpc_service.go @@ -74,16 +74,25 @@ func (b *Builder) GetIndexStates(ctx context.Context, request *indexpb.IndexStat return ret, nil } -func (b *Builder) GetIndexFilePaths(ctx context.Context, request *indexpb.IndexFilePathRequest) (*indexpb.IndexFilePathsResponse, error) { +func (b *Builder) GetIndexFilePaths(ctx context.Context, request *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) { ret := &indexpb.IndexFilePathsResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, - IndexID: request.IndexID, + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, } - filePaths, err := b.metaTable.GetIndexFilePaths(request.IndexID) - if err != nil { - ret.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR - ret.Status.Reason = err.Error() + var filePathInfos []*indexpb.IndexFilePathInfo + for _, indexID := range request.IndexIDs { + filePathInfo := &indexpb.IndexFilePathInfo{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, + IndexID: indexID, + } + + filePaths, err := b.metaTable.GetIndexFilePaths(indexID) + if err != nil { + filePathInfo.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR + filePathInfo.Status.Reason = err.Error() + } + filePathInfo.IndexFilePaths = filePaths + filePathInfos = append(filePathInfos, filePathInfo) } - ret.IndexFilePaths = filePaths + ret.FilePaths = filePathInfos return ret, nil } diff --git a/internal/indexnode/indexnode_test.go b/internal/indexnode/indexnode_test.go index 5a8d5b8bd8e6633434680ca5c983ca1f3f10802a..5ffc5a533c52b8fcad0052de3108e32fdda6767b 100644 --- a/internal/indexnode/indexnode_test.go +++ b/internal/indexnode/indexnode_test.go @@ -124,8 +124,7 @@ func TestBuilder_GRPC(t *testing.T) { assert.Equal(t, commonpb.IndexState_INPROGRESS, description.States[0].State) assert.Equal(t, indexID, description.States[0].IndexID) - indexDataPaths, err := buildClient.GetIndexFilePaths(indexID) + indexDataPaths, err := buildClient.GetIndexFilePaths([]UniqueID{indexID}) assert.Nil(t, err) - assert.Nil(t, indexDataPaths) - + assert.Nil(t, indexDataPaths[0]) } diff --git a/internal/indexservice/indexservice.go b/internal/indexservice/indexservice.go index 862de4f678e03c0478ae318807176910a69a3be8..2a15ffe02294a6259a9bc8da4d88373b13387d67 100644 --- a/internal/indexservice/indexservice.go +++ b/internal/indexservice/indexservice.go @@ -149,7 +149,7 @@ func (i *IndexService) GetIndexStates(req *indexpb.IndexStatesRequest) (*indexpb return ret, nil } -func (i *IndexService) GetIndexFilePaths(req *indexpb.IndexFilePathRequest) (*indexpb.IndexFilePathsResponse, error) { +func (i *IndexService) GetIndexFilePaths(req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) { panic("implement me") } diff --git a/internal/indexservice/interface.go b/internal/indexservice/interface.go index ba62d22e35924c44f8af5d001cab3477c5c6505f..9f6ec2cbccf75619c4599dfbf898dcb3797a5b29 100644 --- a/internal/indexservice/interface.go +++ b/internal/indexservice/interface.go @@ -13,6 +13,6 @@ type Interface interface { RegisterNode(req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) BuildIndex(req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) GetIndexStates(req *indexpb.IndexStatesRequest) (*indexpb.IndexStatesResponse, error) - GetIndexFilePaths(req *indexpb.IndexFilePathRequest) (*indexpb.IndexFilePathsResponse, error) + GetIndexFilePaths(req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) NotifyBuildIndex(nty *indexpb.BuildIndexNotification) (*commonpb.Status, error) } diff --git a/internal/master/client.go b/internal/master/client.go index 9851210405c02d86ca867fe132d5b5fe98aa683f..169145302ce97636d4043e513a2c2b3bc586dcdc 100644 --- a/internal/master/client.go +++ b/internal/master/client.go @@ -65,7 +65,7 @@ func (m *MockWriteNodeClient) GetInsertBinlogPaths(segmentID UniqueID) (map[Uniq type BuildIndexClient interface { BuildIndex(columnDataPaths []string, typeParams map[string]string, indexParams map[string]string) (UniqueID, error) GetIndexStates(indexIDs []UniqueID) (*indexpb.IndexStatesResponse, error) - GetIndexFilePaths(indexID UniqueID) ([]string, error) + GetIndexFilePaths(indexID []UniqueID) ([][]string, error) } type MockBuildIndexClient struct { @@ -107,8 +107,8 @@ func (m *MockBuildIndexClient) GetIndexStates(indexIDs []UniqueID) (*indexpb.Ind return ret, nil } -func (m *MockBuildIndexClient) GetIndexFilePaths(indexID UniqueID) ([]string, error) { - return []string{"/binlog/index/file_1", "/binlog/index/file_2", "/binlog/index/file_3"}, nil +func (m *MockBuildIndexClient) GetIndexFilePaths(indexIDs []UniqueID) ([][]string, error) { + return [][]string{{"/binlog/index/file_1", "/binlog/index/file_2", "/binlog/index/file_3"}}, nil } type LoadIndexClient interface { diff --git a/internal/master/index_builder_scheduler.go b/internal/master/index_builder_scheduler.go index 9a067c070f0d90dffd2898915dd0708955566746..450821d46adc1249909c3e763d9b60c7384973e4 100644 --- a/internal/master/index_builder_scheduler.go +++ b/internal/master/index_builder_scheduler.go @@ -116,10 +116,11 @@ func (scheduler *IndexBuildScheduler) describe() error { } if description.States[0].State == commonpb.IndexState_FINISHED { log.Printf("build index for segment %d field %d is finished", indexBuildInfo.segmentID, indexBuildInfo.fieldID) - filePaths, err := scheduler.client.GetIndexFilePaths(indexID) + filesPaths, err := scheduler.client.GetIndexFilePaths([]UniqueID{indexID}) if err != nil { return err } + filePaths := filesPaths[0] //TODO: remove fileName var fieldName string diff --git a/internal/proto/index_service.proto b/internal/proto/index_service.proto index b95d738b1d258542a292030eef50415f5f70c3e9..7710f018eb8e1a9677cc13501635a4bba60780fd 100644 --- a/internal/proto/index_service.proto +++ b/internal/proto/index_service.proto @@ -34,9 +34,9 @@ message IndexStatesResponse { } message BuildIndexRequest { - repeated string data_paths = 2; - repeated common.KeyValuePair type_params = 3; - repeated common.KeyValuePair index_params = 4; + repeated string data_paths = 1; + repeated common.KeyValuePair type_params = 2; + repeated common.KeyValuePair index_params = 3; } message BuildIndexResponse { @@ -56,16 +56,21 @@ message BuildIndexNotification { repeated string index_file_paths = 3; } -message IndexFilePathRequest { - int64 indexID = 1; +message IndexFilePathsRequest { + repeated int64 indexIDs = 1; } -message IndexFilePathsResponse { +message IndexFilePathInfo { common.Status status = 1; int64 indexID = 2; repeated string index_file_paths = 3; } +message IndexFilePathsResponse { + common.Status status = 1; + repeated IndexFilePathInfo file_paths = 2; +} + message IndexMeta { common.IndexState state = 1; int64 indexID = 2; @@ -88,7 +93,7 @@ service IndexService { rpc RegisterNode(RegisterNodeRequest) returns (RegisterNodeResponse) {} rpc BuildIndex(BuildIndexRequest) returns (BuildIndexResponse){} rpc GetIndexStates(IndexStatesRequest) returns (IndexStatesResponse) {} - rpc GetIndexFilePaths(IndexFilePathRequest) returns (IndexFilePathsResponse){} + rpc GetIndexFilePaths(IndexFilePathsRequest) returns (IndexFilePathsResponse){} rpc NotifyBuildIndex(BuildIndexNotification) returns (common.Status) {} } diff --git a/internal/proto/indexpb/index_service.pb.go b/internal/proto/indexpb/index_service.pb.go index af51c4608fe78f42a450d351d626f69976583b44..aeb4c820b196ef1f50a67f11a5e518c80a3dbe0e 100644 --- a/internal/proto/indexpb/index_service.pb.go +++ b/internal/proto/indexpb/index_service.pb.go @@ -262,9 +262,9 @@ func (m *IndexStatesResponse) GetStates() []*IndexInfo { } type BuildIndexRequest struct { - DataPaths []string `protobuf:"bytes,2,rep,name=data_paths,json=dataPaths,proto3" json:"data_paths,omitempty"` - TypeParams []*commonpb.KeyValuePair `protobuf:"bytes,3,rep,name=type_params,json=typeParams,proto3" json:"type_params,omitempty"` - IndexParams []*commonpb.KeyValuePair `protobuf:"bytes,4,rep,name=index_params,json=indexParams,proto3" json:"index_params,omitempty"` + DataPaths []string `protobuf:"bytes,1,rep,name=data_paths,json=dataPaths,proto3" json:"data_paths,omitempty"` + TypeParams []*commonpb.KeyValuePair `protobuf:"bytes,2,rep,name=type_params,json=typeParams,proto3" json:"type_params,omitempty"` + IndexParams []*commonpb.KeyValuePair `protobuf:"bytes,3,rep,name=index_params,json=indexParams,proto3" json:"index_params,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -465,46 +465,46 @@ func (m *BuildIndexNotification) GetIndexFilePaths() []string { return nil } -type IndexFilePathRequest struct { - IndexID int64 `protobuf:"varint,1,opt,name=indexID,proto3" json:"indexID,omitempty"` +type IndexFilePathsRequest struct { + IndexIDs []int64 `protobuf:"varint,1,rep,packed,name=indexIDs,proto3" json:"indexIDs,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } -func (m *IndexFilePathRequest) Reset() { *m = IndexFilePathRequest{} } -func (m *IndexFilePathRequest) String() string { return proto.CompactTextString(m) } -func (*IndexFilePathRequest) ProtoMessage() {} -func (*IndexFilePathRequest) Descriptor() ([]byte, []int) { +func (m *IndexFilePathsRequest) Reset() { *m = IndexFilePathsRequest{} } +func (m *IndexFilePathsRequest) String() string { return proto.CompactTextString(m) } +func (*IndexFilePathsRequest) ProtoMessage() {} +func (*IndexFilePathsRequest) Descriptor() ([]byte, []int) { return fileDescriptor_a5d2036b4df73e0a, []int{9} } -func (m *IndexFilePathRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_IndexFilePathRequest.Unmarshal(m, b) +func (m *IndexFilePathsRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_IndexFilePathsRequest.Unmarshal(m, b) } -func (m *IndexFilePathRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_IndexFilePathRequest.Marshal(b, m, deterministic) +func (m *IndexFilePathsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_IndexFilePathsRequest.Marshal(b, m, deterministic) } -func (m *IndexFilePathRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_IndexFilePathRequest.Merge(m, src) +func (m *IndexFilePathsRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_IndexFilePathsRequest.Merge(m, src) } -func (m *IndexFilePathRequest) XXX_Size() int { - return xxx_messageInfo_IndexFilePathRequest.Size(m) +func (m *IndexFilePathsRequest) XXX_Size() int { + return xxx_messageInfo_IndexFilePathsRequest.Size(m) } -func (m *IndexFilePathRequest) XXX_DiscardUnknown() { - xxx_messageInfo_IndexFilePathRequest.DiscardUnknown(m) +func (m *IndexFilePathsRequest) XXX_DiscardUnknown() { + xxx_messageInfo_IndexFilePathsRequest.DiscardUnknown(m) } -var xxx_messageInfo_IndexFilePathRequest proto.InternalMessageInfo +var xxx_messageInfo_IndexFilePathsRequest proto.InternalMessageInfo -func (m *IndexFilePathRequest) GetIndexID() int64 { +func (m *IndexFilePathsRequest) GetIndexIDs() []int64 { if m != nil { - return m.IndexID + return m.IndexIDs } - return 0 + return nil } -type IndexFilePathsResponse struct { +type IndexFilePathInfo struct { Status *commonpb.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` IndexID int64 `protobuf:"varint,2,opt,name=indexID,proto3" json:"indexID,omitempty"` IndexFilePaths []string `protobuf:"bytes,3,rep,name=index_file_paths,json=indexFilePaths,proto3" json:"index_file_paths,omitempty"` @@ -513,11 +513,65 @@ type IndexFilePathsResponse struct { XXX_sizecache int32 `json:"-"` } +func (m *IndexFilePathInfo) Reset() { *m = IndexFilePathInfo{} } +func (m *IndexFilePathInfo) String() string { return proto.CompactTextString(m) } +func (*IndexFilePathInfo) ProtoMessage() {} +func (*IndexFilePathInfo) Descriptor() ([]byte, []int) { + return fileDescriptor_a5d2036b4df73e0a, []int{10} +} + +func (m *IndexFilePathInfo) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_IndexFilePathInfo.Unmarshal(m, b) +} +func (m *IndexFilePathInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_IndexFilePathInfo.Marshal(b, m, deterministic) +} +func (m *IndexFilePathInfo) XXX_Merge(src proto.Message) { + xxx_messageInfo_IndexFilePathInfo.Merge(m, src) +} +func (m *IndexFilePathInfo) XXX_Size() int { + return xxx_messageInfo_IndexFilePathInfo.Size(m) +} +func (m *IndexFilePathInfo) XXX_DiscardUnknown() { + xxx_messageInfo_IndexFilePathInfo.DiscardUnknown(m) +} + +var xxx_messageInfo_IndexFilePathInfo proto.InternalMessageInfo + +func (m *IndexFilePathInfo) GetStatus() *commonpb.Status { + if m != nil { + return m.Status + } + return nil +} + +func (m *IndexFilePathInfo) GetIndexID() int64 { + if m != nil { + return m.IndexID + } + return 0 +} + +func (m *IndexFilePathInfo) GetIndexFilePaths() []string { + if m != nil { + return m.IndexFilePaths + } + return nil +} + +type IndexFilePathsResponse struct { + Status *commonpb.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` + FilePaths []*IndexFilePathInfo `protobuf:"bytes,2,rep,name=file_paths,json=filePaths,proto3" json:"file_paths,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + func (m *IndexFilePathsResponse) Reset() { *m = IndexFilePathsResponse{} } func (m *IndexFilePathsResponse) String() string { return proto.CompactTextString(m) } func (*IndexFilePathsResponse) ProtoMessage() {} func (*IndexFilePathsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_a5d2036b4df73e0a, []int{10} + return fileDescriptor_a5d2036b4df73e0a, []int{11} } func (m *IndexFilePathsResponse) XXX_Unmarshal(b []byte) error { @@ -545,16 +599,9 @@ func (m *IndexFilePathsResponse) GetStatus() *commonpb.Status { return nil } -func (m *IndexFilePathsResponse) GetIndexID() int64 { +func (m *IndexFilePathsResponse) GetFilePaths() []*IndexFilePathInfo { if m != nil { - return m.IndexID - } - return 0 -} - -func (m *IndexFilePathsResponse) GetIndexFilePaths() []string { - if m != nil { - return m.IndexFilePaths + return m.FilePaths } return nil } @@ -576,7 +623,7 @@ func (m *IndexMeta) Reset() { *m = IndexMeta{} } func (m *IndexMeta) String() string { return proto.CompactTextString(m) } func (*IndexMeta) ProtoMessage() {} func (*IndexMeta) Descriptor() ([]byte, []int) { - return fileDescriptor_a5d2036b4df73e0a, []int{11} + return fileDescriptor_a5d2036b4df73e0a, []int{12} } func (m *IndexMeta) XXX_Unmarshal(b []byte) error { @@ -656,7 +703,8 @@ func init() { proto.RegisterType((*BuildIndexResponse)(nil), "milvus.proto.index.BuildIndexResponse") proto.RegisterType((*BuildIndexCmd)(nil), "milvus.proto.index.BuildIndexCmd") proto.RegisterType((*BuildIndexNotification)(nil), "milvus.proto.index.BuildIndexNotification") - proto.RegisterType((*IndexFilePathRequest)(nil), "milvus.proto.index.IndexFilePathRequest") + proto.RegisterType((*IndexFilePathsRequest)(nil), "milvus.proto.index.IndexFilePathsRequest") + proto.RegisterType((*IndexFilePathInfo)(nil), "milvus.proto.index.IndexFilePathInfo") proto.RegisterType((*IndexFilePathsResponse)(nil), "milvus.proto.index.IndexFilePathsResponse") proto.RegisterType((*IndexMeta)(nil), "milvus.proto.index.IndexMeta") } @@ -664,55 +712,57 @@ func init() { func init() { proto.RegisterFile("index_service.proto", fileDescriptor_a5d2036b4df73e0a) } var fileDescriptor_a5d2036b4df73e0a = []byte{ - // 757 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x55, 0x5d, 0x4f, 0xdb, 0x4a, - 0x10, 0xc5, 0x38, 0x04, 0x65, 0x12, 0x22, 0xd8, 0x20, 0x14, 0xe5, 0x5e, 0x74, 0xc1, 0x57, 0x17, - 0x22, 0xa4, 0xeb, 0xa0, 0x20, 0xda, 0xc7, 0x8a, 0x80, 0x5a, 0x45, 0x15, 0x08, 0xb9, 0x55, 0x1f, - 0x5a, 0x55, 0x91, 0x63, 0x0f, 0x64, 0x55, 0x7f, 0x04, 0xef, 0x1a, 0x15, 0x5e, 0xaa, 0xaa, 0x3f, - 0xa0, 0xea, 0x6f, 0xe9, 0x6b, 0x7f, 0x5c, 0xe5, 0xdd, 0x75, 0x12, 0x83, 0x49, 0x40, 0xd0, 0x37, - 0xef, 0xee, 0x99, 0x33, 0xb3, 0xe7, 0xcc, 0xac, 0xa1, 0x46, 0x03, 0x17, 0x3f, 0xf7, 0x18, 0x46, - 0x97, 0xd4, 0x41, 0x73, 0x18, 0x85, 0x3c, 0x24, 0xc4, 0xa7, 0xde, 0x65, 0xcc, 0xe4, 0xca, 0x14, - 0x88, 0x46, 0xc5, 0x09, 0x7d, 0x3f, 0x0c, 0xe4, 0x5e, 0xa3, 0x4a, 0x03, 0x8e, 0x51, 0x60, 0x7b, - 0x72, 0x6d, 0x7c, 0x81, 0x9a, 0x85, 0xe7, 0x94, 0x71, 0x8c, 0x4e, 0x42, 0x17, 0x2d, 0xbc, 0x88, - 0x91, 0x71, 0xb2, 0x0b, 0x85, 0xbe, 0xcd, 0xb0, 0xae, 0x6d, 0x68, 0xcd, 0x72, 0xfb, 0x6f, 0x33, - 0xc3, 0xab, 0x08, 0x8f, 0xd9, 0x79, 0xc7, 0x66, 0x68, 0x09, 0x24, 0x79, 0x06, 0x8b, 0xb6, 0xeb, - 0x46, 0xc8, 0x58, 0x7d, 0x7e, 0x4a, 0xd0, 0x81, 0xc4, 0x58, 0x29, 0xd8, 0xf8, 0xae, 0xc1, 0x6a, - 0xb6, 0x02, 0x36, 0x0c, 0x03, 0x86, 0x64, 0x0f, 0x8a, 0x8c, 0xdb, 0x3c, 0x66, 0xaa, 0x88, 0xbf, - 0x72, 0xf9, 0xde, 0x08, 0x88, 0xa5, 0xa0, 0xa4, 0x03, 0x65, 0x1a, 0x50, 0xde, 0x1b, 0xda, 0x91, - 0xed, 0xa7, 0x95, 0x6c, 0x9a, 0x37, 0x64, 0x51, 0x0a, 0x74, 0x03, 0xca, 0x4f, 0x05, 0xd0, 0x02, - 0x3a, 0xfa, 0x36, 0x4c, 0x20, 0xdd, 0x44, 0xb9, 0x84, 0x1a, 0x59, 0xaa, 0x48, 0x1d, 0x16, 0x85, - 0x9e, 0xdd, 0xa3, 0xba, 0xb6, 0xa1, 0x37, 0x75, 0x2b, 0x5d, 0x1a, 0x1c, 0x4a, 0x02, 0xdf, 0x0d, - 0xce, 0x42, 0xb2, 0x0f, 0x0b, 0x49, 0x29, 0x52, 0xb9, 0x6a, 0xfb, 0x9f, 0xdc, 0xa2, 0xc7, 0xf4, - 0x96, 0x44, 0x4f, 0xb2, 0x27, 0x35, 0x8f, 0xd9, 0xc9, 0x1a, 0x14, 0x2d, 0xb4, 0x59, 0x18, 0xd4, - 0xf5, 0x0d, 0xad, 0x59, 0xb2, 0xd4, 0xca, 0xf8, 0xaa, 0x41, 0x2d, 0x53, 0xe6, 0x63, 0x64, 0xdb, - 0x97, 0x41, 0x98, 0x28, 0xa6, 0x37, 0xcb, 0xed, 0x75, 0xf3, 0x76, 0x23, 0x99, 0xa3, 0x4b, 0x5a, - 0x0a, 0x6c, 0xfc, 0xd2, 0x60, 0xa5, 0x13, 0x53, 0xcf, 0x15, 0x47, 0xa9, 0x52, 0xeb, 0x00, 0xae, - 0xcd, 0xed, 0xde, 0xd0, 0xe6, 0x03, 0x49, 0x58, 0xb2, 0x4a, 0xc9, 0xce, 0x69, 0xb2, 0x91, 0x58, - 0xc4, 0xaf, 0x86, 0x98, 0x5a, 0xa4, 0x8b, 0x84, 0x9b, 0xb9, 0x55, 0xbe, 0xc6, 0xab, 0x77, 0xb6, - 0x17, 0xe3, 0xa9, 0x4d, 0x23, 0x0b, 0x92, 0x28, 0x69, 0x11, 0x39, 0x82, 0x8a, 0x6c, 0x7f, 0x45, - 0x52, 0xb8, 0x2f, 0x49, 0x59, 0x84, 0x29, 0xa3, 0x1d, 0x20, 0x93, 0xd5, 0x3f, 0x46, 0xc0, 0x3b, - 0xfd, 0x33, 0xfa, 0xb0, 0x34, 0x4e, 0x72, 0xe8, 0xbb, 0xd9, 0x46, 0xca, 0x58, 0xfd, 0x1c, 0xf4, - 0x08, 0x2f, 0x54, 0xd3, 0xfe, 0x97, 0x67, 0xc1, 0x2d, 0xb1, 0xad, 0x24, 0xc2, 0xf8, 0xa1, 0xc1, - 0xda, 0xf8, 0xe8, 0x24, 0xe4, 0xf4, 0x8c, 0x3a, 0x36, 0xa7, 0x61, 0xf0, 0xc4, 0xb7, 0x21, 0x4d, - 0x58, 0x96, 0xc2, 0x9f, 0x51, 0x0f, 0x95, 0xc3, 0xba, 0x70, 0xb8, 0x2a, 0xf6, 0x5f, 0x52, 0x0f, - 0x85, 0xcd, 0xc6, 0x2e, 0xac, 0x76, 0x27, 0x77, 0x72, 0xe7, 0x28, 0xa3, 0x54, 0x72, 0x8b, 0x4c, - 0x08, 0xfb, 0x43, 0x9e, 0x3c, 0xe0, 0x16, 0x3f, 0xe7, 0xd5, 0x70, 0x1f, 0x23, 0xb7, 0x9f, 0x7e, - 0xb8, 0xd7, 0x01, 0x30, 0xb8, 0x88, 0xb1, 0xc7, 0xa9, 0x8f, 0x62, 0xc0, 0x75, 0xab, 0x24, 0x76, - 0xde, 0x52, 0x1f, 0xc9, 0xbf, 0xb0, 0xc4, 0x9c, 0x01, 0xba, 0xb1, 0xa7, 0x10, 0x05, 0x81, 0xa8, - 0xa4, 0x9b, 0x02, 0x64, 0x42, 0xad, 0x9f, 0x78, 0xdf, 0x73, 0x42, 0x7f, 0xe8, 0x21, 0x57, 0xd0, - 0x05, 0x01, 0x5d, 0x11, 0x47, 0x87, 0xea, 0x44, 0xe0, 0x55, 0x97, 0x15, 0x1f, 0xda, 0x65, 0xb9, - 0xaa, 0x2d, 0xe6, 0xa9, 0xd6, 0xfe, 0x56, 0x80, 0x8a, 0x94, 0x41, 0xfe, 0x9d, 0x88, 0x03, 0x95, - 0xc9, 0x37, 0x9e, 0x6c, 0xe7, 0xa5, 0xcd, 0xf9, 0x0f, 0x35, 0x9a, 0xb3, 0x81, 0xb2, 0x45, 0x8c, - 0x39, 0xf2, 0x11, 0x60, 0x5c, 0x39, 0xb9, 0xdf, 0xcd, 0x1a, 0x5b, 0xb3, 0x60, 0x23, 0x7a, 0x07, - 0xaa, 0xaf, 0x90, 0x4f, 0x3c, 0xb9, 0x64, 0xeb, 0xce, 0x57, 0x32, 0xf3, 0xeb, 0x68, 0x6c, 0xcf, - 0xc4, 0x8d, 0x92, 0x7c, 0x82, 0x95, 0x34, 0xc9, 0x48, 0x4e, 0xd2, 0xbc, 0x33, 0xfe, 0xc6, 0x70, - 0x35, 0x76, 0x66, 0x22, 0x59, 0x46, 0xb0, 0x65, 0xf1, 0x56, 0x5c, 0x4d, 0xc8, 0xb6, 0x33, 0x5d, - 0x8f, 0xc9, 0xb7, 0xa5, 0x31, 0x6d, 0x0a, 0x8d, 0xb9, 0xf6, 0x07, 0x35, 0x3a, 0xc2, 0xf1, 0x93, - 0x8c, 0x39, 0x9b, 0xd3, 0xb3, 0x1c, 0xfa, 0xee, 0x0c, 0xf2, 0xce, 0xc1, 0xfb, 0x17, 0xe7, 0x94, - 0x0f, 0xe2, 0x7e, 0x72, 0xd2, 0xba, 0xa6, 0x9e, 0x47, 0xaf, 0x39, 0x3a, 0x83, 0x96, 0x8c, 0xfa, - 0xdf, 0xa5, 0x8c, 0x47, 0xb4, 0x1f, 0x73, 0x74, 0x5b, 0xe9, 0x0f, 0xbf, 0x25, 0xa8, 0x5a, 0x22, - 0xdb, 0xb0, 0xdf, 0x2f, 0x8a, 0xe5, 0xde, 0xef, 0x00, 0x00, 0x00, 0xff, 0xff, 0x4d, 0x32, 0xc8, - 0x07, 0x4a, 0x09, 0x00, 0x00, + // 793 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x56, 0x5d, 0x4f, 0xe3, 0x46, + 0x14, 0xc5, 0x18, 0x42, 0x73, 0x13, 0x22, 0x32, 0x69, 0x51, 0x94, 0x16, 0x15, 0x5c, 0x01, 0x29, + 0x52, 0x9d, 0x2a, 0x88, 0xf6, 0xb1, 0x22, 0xa0, 0x56, 0x51, 0x05, 0x42, 0xd3, 0xaa, 0x0f, 0xad, + 0xaa, 0xc8, 0xb1, 0x6f, 0xc8, 0x48, 0xfe, 0x08, 0x9e, 0x31, 0x5a, 0x78, 0x59, 0xad, 0xb4, 0x8f, + 0x2b, 0xad, 0x56, 0xfb, 0x53, 0xf6, 0x75, 0x7f, 0xdc, 0xca, 0xe3, 0xb1, 0x63, 0x43, 0x48, 0x58, + 0x81, 0xb4, 0x6f, 0x99, 0xeb, 0x73, 0x3f, 0xe6, 0x9c, 0x33, 0x33, 0x81, 0x06, 0xf3, 0x1d, 0x7c, + 0x31, 0xe0, 0x18, 0x5e, 0x33, 0x1b, 0xcd, 0x49, 0x18, 0x88, 0x80, 0x10, 0x8f, 0xb9, 0xd7, 0x11, + 0x4f, 0x56, 0xa6, 0x44, 0xb4, 0xaa, 0x76, 0xe0, 0x79, 0x81, 0x9f, 0xc4, 0x5a, 0x35, 0xe6, 0x0b, + 0x0c, 0x7d, 0xcb, 0x4d, 0xd6, 0xc6, 0x4b, 0x68, 0x50, 0xbc, 0x64, 0x5c, 0x60, 0x78, 0x1e, 0x38, + 0x48, 0xf1, 0x2a, 0x42, 0x2e, 0xc8, 0xcf, 0xb0, 0x32, 0xb4, 0x38, 0x36, 0xb5, 0x6d, 0xad, 0x5d, + 0xe9, 0x7e, 0x67, 0x16, 0xea, 0xaa, 0x82, 0x67, 0xfc, 0xb2, 0x67, 0x71, 0xa4, 0x12, 0x49, 0x7e, + 0x81, 0x35, 0xcb, 0x71, 0x42, 0xe4, 0xbc, 0xb9, 0x3c, 0x27, 0xe9, 0x38, 0xc1, 0xd0, 0x14, 0x6c, + 0xbc, 0xd5, 0xe0, 0xeb, 0xe2, 0x04, 0x7c, 0x12, 0xf8, 0x1c, 0xc9, 0x21, 0x94, 0xb8, 0xb0, 0x44, + 0xc4, 0xd5, 0x10, 0xdf, 0xce, 0xac, 0xf7, 0x97, 0x84, 0x50, 0x05, 0x25, 0x3d, 0xa8, 0x30, 0x9f, + 0x89, 0xc1, 0xc4, 0x0a, 0x2d, 0x2f, 0x9d, 0x64, 0xc7, 0xbc, 0x43, 0x8b, 0x62, 0xa0, 0xef, 0x33, + 0x71, 0x21, 0x81, 0x14, 0x58, 0xf6, 0xdb, 0x30, 0x81, 0xf4, 0x63, 0xe6, 0xe2, 0xd2, 0xc8, 0x53, + 0x46, 0x9a, 0xb0, 0x26, 0xf9, 0xec, 0x9f, 0x36, 0xb5, 0x6d, 0xbd, 0xad, 0xd3, 0x74, 0x69, 0x08, + 0x28, 0x4b, 0x7c, 0xdf, 0x1f, 0x05, 0xe4, 0x08, 0x56, 0xe3, 0x51, 0x12, 0xe6, 0x6a, 0xdd, 0xef, + 0x67, 0x0e, 0x3d, 0x2d, 0x4f, 0x13, 0x74, 0xbe, 0x7a, 0x3c, 0xf3, 0xb4, 0x3a, 0xd9, 0x84, 0x12, + 0x45, 0x8b, 0x07, 0x7e, 0x53, 0xdf, 0xd6, 0xda, 0x65, 0xaa, 0x56, 0xc6, 0x2b, 0x0d, 0x1a, 0x85, + 0x31, 0x9f, 0x42, 0xdb, 0x51, 0x92, 0x84, 0x31, 0x63, 0x7a, 0xbb, 0xd2, 0xdd, 0x32, 0xef, 0x1b, + 0xc9, 0xcc, 0x36, 0x49, 0x15, 0xd8, 0xf8, 0xa8, 0x41, 0xbd, 0x17, 0x31, 0xd7, 0x91, 0x9f, 0x52, + 0xa6, 0xb6, 0x00, 0x1c, 0x4b, 0x58, 0x83, 0x89, 0x25, 0xc6, 0x5c, 0x92, 0x55, 0xa6, 0xe5, 0x38, + 0x72, 0x11, 0x07, 0x62, 0x89, 0xc4, 0xcd, 0x04, 0xa7, 0x12, 0xe9, 0xf7, 0x25, 0x52, 0x53, 0xfe, + 0x89, 0x37, 0xff, 0x58, 0x6e, 0x84, 0x17, 0x16, 0x0b, 0x29, 0xc4, 0x59, 0x89, 0x44, 0xe4, 0x14, + 0xaa, 0x89, 0xfd, 0x55, 0x11, 0xfd, 0xb1, 0x45, 0x2a, 0x32, 0x4d, 0x09, 0x6d, 0x03, 0xc9, 0x4f, + 0xff, 0x14, 0x02, 0x1f, 0xd4, 0xcf, 0x18, 0xc2, 0xfa, 0xb4, 0xc9, 0x89, 0xe7, 0x14, 0x8d, 0x54, + 0x90, 0xfa, 0x57, 0xd0, 0x43, 0xbc, 0x52, 0xa6, 0xdd, 0x9d, 0x25, 0xc1, 0x3d, 0xb2, 0x69, 0x9c, + 0x61, 0xbc, 0xd3, 0x60, 0x73, 0xfa, 0xe9, 0x3c, 0x10, 0x6c, 0xc4, 0x6c, 0x4b, 0xb0, 0xc0, 0x7f, + 0xe6, 0xdd, 0x90, 0x36, 0x6c, 0x24, 0xc4, 0x8f, 0x98, 0x8b, 0x4a, 0x61, 0x5d, 0x2a, 0x5c, 0x93, + 0xf1, 0xdf, 0x99, 0x8b, 0x52, 0x66, 0xe3, 0x10, 0xbe, 0xe9, 0x17, 0x22, 0xa9, 0x3d, 0x5a, 0xf0, + 0x95, 0xaa, 0xc6, 0xd5, 0x49, 0xca, 0xd6, 0xc6, 0x1b, 0x0d, 0xea, 0x85, 0x2c, 0x79, 0xa6, 0xbe, + 0xd8, 0x1e, 0xde, 0x6b, 0xb0, 0x79, 0x77, 0x13, 0x4f, 0x71, 0xc9, 0x29, 0x40, 0xae, 0x67, 0xe2, + 0xfc, 0xdd, 0x07, 0x8f, 0x5a, 0x9e, 0x03, 0x5a, 0x1e, 0x65, 0x53, 0x7d, 0x58, 0x56, 0x17, 0xce, + 0x19, 0x0a, 0xeb, 0xf9, 0x2f, 0x9c, 0x2d, 0x00, 0xf4, 0xaf, 0x22, 0x1c, 0x08, 0xe6, 0xa1, 0xbc, + 0x74, 0x74, 0x5a, 0x96, 0x91, 0xbf, 0x99, 0x87, 0xe4, 0x07, 0x58, 0xe7, 0xf6, 0x18, 0x9d, 0xc8, + 0x55, 0x88, 0x15, 0x89, 0xa8, 0xa6, 0x41, 0x09, 0x32, 0xa1, 0x31, 0x8c, 0xfd, 0x38, 0xb0, 0x03, + 0x6f, 0xe2, 0xa2, 0x50, 0xd0, 0x55, 0x09, 0xad, 0xcb, 0x4f, 0x27, 0xea, 0x8b, 0xc4, 0x2b, 0xe7, + 0x97, 0x3e, 0xd7, 0xf9, 0x33, 0xb5, 0x5c, 0x9b, 0xa5, 0x65, 0xf7, 0xf5, 0x0a, 0x54, 0x13, 0x1a, + 0x92, 0x17, 0x93, 0xd8, 0x50, 0xcd, 0xbf, 0x3b, 0x64, 0x7f, 0x56, 0xdb, 0x19, 0x6f, 0x63, 0xab, + 0xbd, 0x18, 0x98, 0x98, 0xc4, 0x58, 0x22, 0xff, 0x03, 0x4c, 0x27, 0x27, 0x8f, 0xdb, 0x59, 0x6b, + 0x6f, 0x11, 0x2c, 0x2b, 0x6f, 0x43, 0xed, 0x0f, 0x14, 0xb9, 0x67, 0x80, 0xec, 0x3d, 0x68, 0xa7, + 0xc2, 0x73, 0xd6, 0xda, 0x5f, 0x88, 0xcb, 0x9a, 0xb8, 0x50, 0x4f, 0x9b, 0x64, 0x74, 0x92, 0x1f, + 0x17, 0xda, 0x36, 0x6b, 0x75, 0xf0, 0x18, 0x68, 0x8e, 0xb1, 0x0d, 0x79, 0x81, 0xdd, 0xe4, 0x78, + 0x3b, 0x98, 0x4f, 0x48, 0xfe, 0xc2, 0x6b, 0xcd, 0x3b, 0x88, 0xc6, 0x52, 0xf7, 0x3f, 0x75, 0x76, + 0xa4, 0xe4, 0xe7, 0x05, 0x75, 0x76, 0xe6, 0x77, 0x39, 0xf1, 0x9c, 0x05, 0xc5, 0x7b, 0xc7, 0xff, + 0xfe, 0x76, 0xc9, 0xc4, 0x38, 0x1a, 0xc6, 0x5f, 0x3a, 0xb7, 0xcc, 0x75, 0xd9, 0xad, 0x40, 0x7b, + 0xdc, 0x49, 0xb2, 0x7e, 0x72, 0x18, 0x17, 0x21, 0x1b, 0x46, 0x02, 0x9d, 0x4e, 0xfa, 0x2f, 0xa4, + 0x23, 0x4b, 0x75, 0x64, 0xb7, 0xc9, 0x70, 0x58, 0x92, 0xcb, 0xc3, 0x4f, 0x01, 0x00, 0x00, 0xff, + 0xff, 0x10, 0x1a, 0x0a, 0xe9, 0xdf, 0x09, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -736,7 +786,7 @@ type IndexServiceClient interface { RegisterNode(ctx context.Context, in *RegisterNodeRequest, opts ...grpc.CallOption) (*RegisterNodeResponse, error) BuildIndex(ctx context.Context, in *BuildIndexRequest, opts ...grpc.CallOption) (*BuildIndexResponse, error) GetIndexStates(ctx context.Context, in *IndexStatesRequest, opts ...grpc.CallOption) (*IndexStatesResponse, error) - GetIndexFilePaths(ctx context.Context, in *IndexFilePathRequest, opts ...grpc.CallOption) (*IndexFilePathsResponse, error) + GetIndexFilePaths(ctx context.Context, in *IndexFilePathsRequest, opts ...grpc.CallOption) (*IndexFilePathsResponse, error) NotifyBuildIndex(ctx context.Context, in *BuildIndexNotification, opts ...grpc.CallOption) (*commonpb.Status, error) } @@ -775,7 +825,7 @@ func (c *indexServiceClient) GetIndexStates(ctx context.Context, in *IndexStates return out, nil } -func (c *indexServiceClient) GetIndexFilePaths(ctx context.Context, in *IndexFilePathRequest, opts ...grpc.CallOption) (*IndexFilePathsResponse, error) { +func (c *indexServiceClient) GetIndexFilePaths(ctx context.Context, in *IndexFilePathsRequest, opts ...grpc.CallOption) (*IndexFilePathsResponse, error) { out := new(IndexFilePathsResponse) err := c.cc.Invoke(ctx, "/milvus.proto.index.IndexService/GetIndexFilePaths", in, out, opts...) if err != nil { @@ -804,7 +854,7 @@ type IndexServiceServer interface { RegisterNode(context.Context, *RegisterNodeRequest) (*RegisterNodeResponse, error) BuildIndex(context.Context, *BuildIndexRequest) (*BuildIndexResponse, error) GetIndexStates(context.Context, *IndexStatesRequest) (*IndexStatesResponse, error) - GetIndexFilePaths(context.Context, *IndexFilePathRequest) (*IndexFilePathsResponse, error) + GetIndexFilePaths(context.Context, *IndexFilePathsRequest) (*IndexFilePathsResponse, error) NotifyBuildIndex(context.Context, *BuildIndexNotification) (*commonpb.Status, error) } @@ -821,7 +871,7 @@ func (*UnimplementedIndexServiceServer) BuildIndex(ctx context.Context, req *Bui func (*UnimplementedIndexServiceServer) GetIndexStates(ctx context.Context, req *IndexStatesRequest) (*IndexStatesResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetIndexStates not implemented") } -func (*UnimplementedIndexServiceServer) GetIndexFilePaths(ctx context.Context, req *IndexFilePathRequest) (*IndexFilePathsResponse, error) { +func (*UnimplementedIndexServiceServer) GetIndexFilePaths(ctx context.Context, req *IndexFilePathsRequest) (*IndexFilePathsResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetIndexFilePaths not implemented") } func (*UnimplementedIndexServiceServer) NotifyBuildIndex(ctx context.Context, req *BuildIndexNotification) (*commonpb.Status, error) { @@ -887,7 +937,7 @@ func _IndexService_GetIndexStates_Handler(srv interface{}, ctx context.Context, } func _IndexService_GetIndexFilePaths_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(IndexFilePathRequest) + in := new(IndexFilePathsRequest) if err := dec(in); err != nil { return nil, err } @@ -899,7 +949,7 @@ func _IndexService_GetIndexFilePaths_Handler(srv interface{}, ctx context.Contex FullMethod: "/milvus.proto.index.IndexService/GetIndexFilePaths", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(IndexServiceServer).GetIndexFilePaths(ctx, req.(*IndexFilePathRequest)) + return srv.(IndexServiceServer).GetIndexFilePaths(ctx, req.(*IndexFilePathsRequest)) } return interceptor(ctx, in, info, handler) } diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index 11d991a23926ca21a9b51b43190e74f5b014b802..1e6ab95506fbbfe1181bc964f3612f153a2f1942 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -46,7 +46,8 @@ type collectionReplica interface { // Partition tags in different collections are not unique, // so partition api should specify the target collection. getPartitionNum(collectionID UniqueID) (int, error) - addPartition(collectionID UniqueID, partitionTag string) error + addPartition2(collectionID UniqueID, partitionTag string) error + addPartition(collectionID UniqueID, partitionID UniqueID) error removePartition(collectionID UniqueID, partitionTag string) error addPartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error removePartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error @@ -182,7 +183,7 @@ func (colReplica *collectionReplicaImpl) getPartitionNum(collectionID UniqueID) return len(collection.partitions), nil } -func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionTag string) error { +func (colReplica *collectionReplicaImpl) addPartition2(collectionID UniqueID, partitionTag string) error { colReplica.mu.Lock() defer colReplica.mu.Unlock() @@ -191,7 +192,22 @@ func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, par return err } - var newPartition = newPartition(partitionTag) + var newPartition = newPartition2(partitionTag) + + *collection.Partitions() = append(*collection.Partitions(), newPartition) + return nil +} + +func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionID UniqueID) error { + colReplica.mu.Lock() + defer colReplica.mu.Unlock() + + collection, err := colReplica.getCollectionByIDPrivate(collectionID) + if err != nil { + return err + } + + var newPartition = newPartition(partitionID) *collection.Partitions() = append(*collection.Partitions(), newPartition) return nil @@ -240,7 +256,7 @@ func (colReplica *collectionReplicaImpl) addPartitionsByCollectionMeta(colMeta * } for _, tag := range pToAdd { - err := colReplica.addPartition(colMeta.ID, tag) + err := colReplica.addPartition2(colMeta.ID, tag) if err != nil { log.Println(err) } diff --git a/internal/querynode/collection_replica_test.go b/internal/querynode/collection_replica_test.go index b02582bf9e6a72ecf4df0ab17d5ae541aa6ef09f..d95cae394670a518f254d7875d6e524318fc899f 100644 --- a/internal/querynode/collection_replica_test.go +++ b/internal/querynode/collection_replica_test.go @@ -11,13 +11,13 @@ func TestCollectionReplica_getCollectionNum(t *testing.T) { node := newQueryNodeMock() initTestMeta(t, node, "collection0", 0, 0) assert.Equal(t, node.replica.getCollectionNum(), 1) - node.Close() + node.Stop() } func TestCollectionReplica_addCollection(t *testing.T) { node := newQueryNodeMock() initTestMeta(t, node, "collection0", 0, 0) - node.Close() + node.Stop() } func TestCollectionReplica_removeCollection(t *testing.T) { @@ -28,7 +28,7 @@ func TestCollectionReplica_removeCollection(t *testing.T) { err := node.replica.removeCollection(0) assert.NoError(t, err) assert.Equal(t, node.replica.getCollectionNum(), 0) - node.Close() + node.Stop() } func TestCollectionReplica_getCollectionByID(t *testing.T) { @@ -41,7 +41,7 @@ func TestCollectionReplica_getCollectionByID(t *testing.T) { assert.NotNil(t, targetCollection) assert.Equal(t, targetCollection.Name(), collectionName) assert.Equal(t, targetCollection.ID(), collectionID) - node.Close() + node.Stop() } func TestCollectionReplica_getCollectionByName(t *testing.T) { @@ -56,7 +56,7 @@ func TestCollectionReplica_getCollectionByName(t *testing.T) { assert.Equal(t, targetCollection.Name(), collectionName) assert.Equal(t, targetCollection.ID(), collectionID) - node.Close() + node.Stop() } func TestCollectionReplica_hasCollection(t *testing.T) { @@ -70,7 +70,7 @@ func TestCollectionReplica_hasCollection(t *testing.T) { hasCollection = node.replica.hasCollection(UniqueID(1)) assert.Equal(t, hasCollection, false) - node.Close() + node.Stop() } //----------------------------------------------------------------------------------------------------- partition @@ -82,7 +82,7 @@ func TestCollectionReplica_getPartitionNum(t *testing.T) { partitionTags := []string{"a", "b", "c"} for _, tag := range partitionTags { - err := node.replica.addPartition(collectionID, tag) + err := node.replica.addPartition2(collectionID, tag) assert.NoError(t, err) partition, err := node.replica.getPartitionByTag(collectionID, tag) assert.NoError(t, err) @@ -92,7 +92,7 @@ func TestCollectionReplica_getPartitionNum(t *testing.T) { partitionNum, err := node.replica.getPartitionNum(collectionID) assert.NoError(t, err) assert.Equal(t, partitionNum, len(partitionTags)+1) // _default - node.Close() + node.Stop() } func TestCollectionReplica_addPartition(t *testing.T) { @@ -103,13 +103,13 @@ func TestCollectionReplica_addPartition(t *testing.T) { partitionTags := []string{"a", "b", "c"} for _, tag := range partitionTags { - err := node.replica.addPartition(collectionID, tag) + err := node.replica.addPartition2(collectionID, tag) assert.NoError(t, err) partition, err := node.replica.getPartitionByTag(collectionID, tag) assert.NoError(t, err) assert.Equal(t, partition.partitionTag, tag) } - node.Close() + node.Stop() } func TestCollectionReplica_removePartition(t *testing.T) { @@ -121,7 +121,7 @@ func TestCollectionReplica_removePartition(t *testing.T) { partitionTags := []string{"a", "b", "c"} for _, tag := range partitionTags { - err := node.replica.addPartition(collectionID, tag) + err := node.replica.addPartition2(collectionID, tag) assert.NoError(t, err) partition, err := node.replica.getPartitionByTag(collectionID, tag) assert.NoError(t, err) @@ -129,7 +129,7 @@ func TestCollectionReplica_removePartition(t *testing.T) { err = node.replica.removePartition(collectionID, tag) assert.NoError(t, err) } - node.Close() + node.Stop() } func TestCollectionReplica_addPartitionsByCollectionMeta(t *testing.T) { @@ -153,7 +153,7 @@ func TestCollectionReplica_addPartitionsByCollectionMeta(t *testing.T) { hasPartition = node.replica.hasPartition(UniqueID(0), "p2") assert.Equal(t, hasPartition, true) - node.Close() + node.Stop() } func TestCollectionReplica_removePartitionsByCollectionMeta(t *testing.T) { @@ -178,7 +178,7 @@ func TestCollectionReplica_removePartitionsByCollectionMeta(t *testing.T) { hasPartition = node.replica.hasPartition(UniqueID(0), "p2") assert.Equal(t, hasPartition, false) - node.Close() + node.Stop() } func TestCollectionReplica_getPartitionByTag(t *testing.T) { @@ -190,14 +190,14 @@ func TestCollectionReplica_getPartitionByTag(t *testing.T) { collectionMeta := genTestCollectionMeta(collectionName, collectionID, false) for _, tag := range collectionMeta.PartitionTags { - err := node.replica.addPartition(collectionID, tag) + err := node.replica.addPartition2(collectionID, tag) assert.NoError(t, err) partition, err := node.replica.getPartitionByTag(collectionID, tag) assert.NoError(t, err) assert.Equal(t, partition.partitionTag, tag) assert.NotNil(t, partition) } - node.Close() + node.Stop() } func TestCollectionReplica_hasPartition(t *testing.T) { @@ -207,13 +207,13 @@ func TestCollectionReplica_hasPartition(t *testing.T) { initTestMeta(t, node, collectionName, collectionID, 0) collectionMeta := genTestCollectionMeta(collectionName, collectionID, false) - err := node.replica.addPartition(collectionID, collectionMeta.PartitionTags[0]) + err := node.replica.addPartition2(collectionID, collectionMeta.PartitionTags[0]) assert.NoError(t, err) hasPartition := node.replica.hasPartition(collectionID, "default") assert.Equal(t, hasPartition, true) hasPartition = node.replica.hasPartition(collectionID, "default1") assert.Equal(t, hasPartition, false) - node.Close() + node.Stop() } //----------------------------------------------------------------------------------------------------- segment @@ -233,7 +233,7 @@ func TestCollectionReplica_addSegment(t *testing.T) { assert.Equal(t, targetSeg.segmentID, UniqueID(i)) } - node.Close() + node.Stop() } func TestCollectionReplica_removeSegment(t *testing.T) { @@ -255,7 +255,7 @@ func TestCollectionReplica_removeSegment(t *testing.T) { assert.NoError(t, err) } - node.Close() + node.Stop() } func TestCollectionReplica_getSegmentByID(t *testing.T) { @@ -275,7 +275,7 @@ func TestCollectionReplica_getSegmentByID(t *testing.T) { assert.Equal(t, targetSeg.segmentID, UniqueID(i)) } - node.Close() + node.Stop() } func TestCollectionReplica_hasSegment(t *testing.T) { @@ -299,7 +299,7 @@ func TestCollectionReplica_hasSegment(t *testing.T) { assert.Equal(t, hasSeg, false) } - node.Close() + node.Stop() } func TestCollectionReplica_freeAll(t *testing.T) { @@ -308,6 +308,6 @@ func TestCollectionReplica_freeAll(t *testing.T) { collectionID := UniqueID(0) initTestMeta(t, node, collectionName, collectionID, 0) - node.Close() + node.Stop() } diff --git a/internal/querynode/data_sync_service.go b/internal/querynode/data_sync_service.go index ebc4cb154199144362d01855f9be2d170d317e38..608006ec61f1f7454d707dc5ba668560f79fd46c 100644 --- a/internal/querynode/data_sync_service.go +++ b/internal/querynode/data_sync_service.go @@ -51,7 +51,7 @@ func (dsService *dataSyncService) initNodes() { var ddNode node = newDDNode(dsService.replica) var insertNode node = newInsertNode(dsService.replica) - var serviceTimeNode node = newServiceTimeNode(dsService.replica) + var serviceTimeNode node = newServiceTimeNode(dsService.ctx, dsService.replica) var gcNode node = newGCNode(dsService.replica) dsService.fg.AddNode(&dmStreamNode) diff --git a/internal/querynode/data_sync_service_test.go b/internal/querynode/data_sync_service_test.go index 31331bc7c877986a7b12d18a69308b284610702f..fe6dacf2dcf043009dfefc4c985dbd7fa0f3a207 100644 --- a/internal/querynode/data_sync_service_test.go +++ b/internal/querynode/data_sync_service_test.go @@ -136,5 +136,5 @@ func TestDataSyncService_Start(t *testing.T) { go node.dataSyncService.start() <-node.queryNodeLoopCtx.Done() - node.Close() + node.Stop() } diff --git a/internal/querynode/flow_graph_dd_node.go b/internal/querynode/flow_graph_dd_node.go index b99b22dfcac4511129f4224f5bfd361b4b2835e6..a2a8d967caf35d43d872f5e9152469363a6f6a6c 100644 --- a/internal/querynode/flow_graph_dd_node.go +++ b/internal/querynode/flow_graph_dd_node.go @@ -102,7 +102,7 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) { } // add default partition - err = ddNode.replica.addPartition(collectionID, Params.DefaultPartitionTag) + err = ddNode.replica.addPartition2(collectionID, Params.DefaultPartitionTag) if err != nil { log.Println(err) return @@ -139,7 +139,7 @@ func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) { collectionID := msg.CollectionID partitionName := msg.PartitionName - err := ddNode.replica.addPartition(collectionID, partitionName) + err := ddNode.replica.addPartition2(collectionID, partitionName) if err != nil { log.Println(err) return diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go index a7830c4680e22c4810b7b92bc4943fd1be345941..9c33526c85b50f8cf102762c906d9059acd043a9 100644 --- a/internal/querynode/flow_graph_service_time_node.go +++ b/internal/querynode/flow_graph_service_time_node.go @@ -1,12 +1,19 @@ package querynode import ( + "context" "log" + + "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" ) type serviceTimeNode struct { baseNode - replica collectionReplica + replica collectionReplica + timeTickMsgStream *pulsarms.PulsarMsgStream } func (stNode *serviceTimeNode) Name() string { @@ -31,6 +38,10 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg { stNode.replica.getTSafe().set(serviceTimeMsg.timeRange.timestampMax) //fmt.Println("update tSafe to:", getPhysicalTime(serviceTimeMsg.timeRange.timestampMax)) + if err := stNode.sendTimeTick(serviceTimeMsg.timeRange.timestampMax); err != nil { + log.Printf("Error: send time tick into pulsar channel failed, %s\n", err.Error()) + } + var res Msg = &gcMsg{ gcRecord: serviceTimeMsg.gcRecord, timeRange: serviceTimeMsg.timeRange, @@ -38,7 +49,28 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg { return []*Msg{&res} } -func newServiceTimeNode(replica collectionReplica) *serviceTimeNode { +func (stNode *serviceTimeNode) sendTimeTick(ts Timestamp) error { + msgPack := msgstream.MsgPack{} + timeTickMsg := msgstream.TimeTickMsg{ + BaseMsg: msgstream.BaseMsg{ + BeginTimestamp: ts, + EndTimestamp: ts, + HashValues: []uint32{0}, + }, + TimeTickMsg: internalpb2.TimeTickMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kTimeTick, + MsgID: 0, + Timestamp: ts, + SourceID: Params.QueryNodeID, + }, + }, + } + msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg) + return stNode.timeTickMsgStream.Produce(&msgPack) +} + +func newServiceTimeNode(ctx context.Context, replica collectionReplica) *serviceTimeNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism @@ -46,8 +78,13 @@ func newServiceTimeNode(replica collectionReplica) *serviceTimeNode { baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) + timeTimeMsgStream := pulsarms.NewPulsarMsgStream(ctx, Params.SearchReceiveBufSize) + timeTimeMsgStream.SetPulsarClient(Params.PulsarAddress) + timeTimeMsgStream.CreatePulsarProducers([]string{Params.QueryNodeTimeTickChannelName}) + return &serviceTimeNode{ - baseNode: baseNode, - replica: replica, + baseNode: baseNode, + replica: replica, + timeTickMsgStream: timeTimeMsgStream, } } diff --git a/internal/querynode/load_index_service.go b/internal/querynode/load_index_service.go index 5071933334418ce3595bb648aca315e137672ead..caf044ea02f643bd6a5b81ece3de108e85ddf18c 100644 --- a/internal/querynode/load_index_service.go +++ b/internal/querynode/load_index_service.go @@ -136,7 +136,6 @@ func (lis *loadIndexService) execute(msg msgstream.TsMsg) error { return errors.New("type assertion failed for LoadIndexMsg") } // 1. use msg's index paths to get index bytes - fmt.Println("start load index") var err error ok, err = lis.checkIndexReady(indexMsg) if err != nil { @@ -169,6 +168,7 @@ func (lis *loadIndexService) execute(msg msgstream.TsMsg) error { if err != nil { return err } + fmt.Println("load index done") return nil } diff --git a/internal/querynode/load_index_service_test.go b/internal/querynode/load_index_service_test.go index f9d292a8ece46764b689e06a602ed4ec9570a706..8b931d5fb159ef76ec5b8198b2af67a55c4b7f9f 100644 --- a/internal/querynode/load_index_service_test.go +++ b/internal/querynode/load_index_service_test.go @@ -347,7 +347,7 @@ func TestLoadIndexService_FloatVector(t *testing.T) { defer assert.Equal(t, findFiledStats, true) <-node.queryNodeLoopCtx.Done() - node.Close() + node.Stop() } func TestLoadIndexService_BinaryVector(t *testing.T) { @@ -663,5 +663,5 @@ func TestLoadIndexService_BinaryVector(t *testing.T) { defer assert.Equal(t, findFiledStats, true) <-node.queryNodeLoopCtx.Done() - node.Close() + node.Stop() } diff --git a/internal/querynode/meta_service.go b/internal/querynode/meta_service.go index c5a039e9f4f9c4e2cb4c2cb8e86be25ae0197d38..6d12213c7129f17627bfb7b5c6428dde75a01044 100644 --- a/internal/querynode/meta_service.go +++ b/internal/querynode/meta_service.go @@ -141,7 +141,7 @@ func (mService *metaService) processCollectionCreate(id string, value string) { log.Println(err) } for _, partitionTag := range col.PartitionTags { - err = mService.replica.addPartition(col.ID, partitionTag) + err = mService.replica.addPartition2(col.ID, partitionTag) if err != nil { log.Println(err) } diff --git a/internal/querynode/meta_service_test.go b/internal/querynode/meta_service_test.go index eebaf58636a0c4b5488bbdd8d6562032ba1e3c00..2e816b03ef0e96105b8fa3879512c69a2bc34617 100644 --- a/internal/querynode/meta_service_test.go +++ b/internal/querynode/meta_service_test.go @@ -14,7 +14,7 @@ func TestMetaService_start(t *testing.T) { node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) node.metaService.start() - node.Close() + node.Stop() } func TestMetaService_getCollectionObjId(t *testing.T) { @@ -159,7 +159,7 @@ func TestMetaService_processCollectionCreate(t *testing.T) { collection, err := node.replica.getCollectionByName("test") assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) - node.Close() + node.Stop() } func TestMetaService_processSegmentCreate(t *testing.T) { @@ -181,7 +181,7 @@ func TestMetaService_processSegmentCreate(t *testing.T) { s, err := node.replica.getSegmentByID(UniqueID(0)) assert.NoError(t, err) assert.Equal(t, s.segmentID, UniqueID(0)) - node.Close() + node.Stop() } func TestMetaService_processCreate(t *testing.T) { @@ -237,7 +237,7 @@ func TestMetaService_processCreate(t *testing.T) { s, err := node.replica.getSegmentByID(UniqueID(0)) assert.NoError(t, err) assert.Equal(t, s.segmentID, UniqueID(0)) - node.Close() + node.Stop() } func TestMetaService_processSegmentModify(t *testing.T) { @@ -271,7 +271,7 @@ func TestMetaService_processSegmentModify(t *testing.T) { seg, err := node.replica.getSegmentByID(segmentID) assert.NoError(t, err) assert.Equal(t, seg.segmentID, segmentID) - node.Close() + node.Stop() } func TestMetaService_processCollectionModify(t *testing.T) { @@ -379,7 +379,7 @@ func TestMetaService_processCollectionModify(t *testing.T) { assert.Equal(t, hasPartition, true) hasPartition = node.replica.hasPartition(UniqueID(0), "p3") assert.Equal(t, hasPartition, true) - node.Close() + node.Stop() } func TestMetaService_processModify(t *testing.T) { @@ -512,7 +512,7 @@ func TestMetaService_processModify(t *testing.T) { seg, err := node.replica.getSegmentByID(UniqueID(0)) assert.NoError(t, err) assert.Equal(t, seg.segmentID, UniqueID(0)) - node.Close() + node.Stop() } func TestMetaService_processSegmentDelete(t *testing.T) { @@ -537,7 +537,7 @@ func TestMetaService_processSegmentDelete(t *testing.T) { (*node.metaService).processSegmentDelete("0") mapSize := node.replica.getSegmentNum() assert.Equal(t, mapSize, 0) - node.Close() + node.Stop() } func TestMetaService_processCollectionDelete(t *testing.T) { @@ -585,7 +585,7 @@ func TestMetaService_processCollectionDelete(t *testing.T) { (*node.metaService).processCollectionDelete(id) collectionNum = node.replica.getCollectionNum() assert.Equal(t, collectionNum, 0) - node.Close() + node.Stop() } func TestMetaService_processDelete(t *testing.T) { @@ -648,7 +648,7 @@ func TestMetaService_processDelete(t *testing.T) { mapSize := node.replica.getSegmentNum() assert.Equal(t, mapSize, 0) - node.Close() + node.Stop() } func TestMetaService_processResp(t *testing.T) { @@ -663,7 +663,7 @@ func TestMetaService_processResp(t *testing.T) { case resp := <-metaChan: _ = (*node.metaService).processResp(resp) } - node.Close() + node.Stop() } func TestMetaService_loadCollections(t *testing.T) { @@ -672,7 +672,7 @@ func TestMetaService_loadCollections(t *testing.T) { err2 := (*node.metaService).loadCollections() assert.Nil(t, err2) - node.Close() + node.Stop() } func TestMetaService_loadSegments(t *testing.T) { @@ -681,5 +681,5 @@ func TestMetaService_loadSegments(t *testing.T) { err2 := (*node.metaService).loadSegments() assert.Nil(t, err2) - node.Close() + node.Stop() } diff --git a/internal/querynode/param_table.go b/internal/querynode/param_table.go index 663134dc153eb1ec688e771cc1afdfac8dd38548..cae8ed5b34c9c0279c1d5c64d387dd38d404b89e 100644 --- a/internal/querynode/param_table.go +++ b/internal/querynode/param_table.go @@ -15,8 +15,12 @@ type ParamTable struct { ETCDAddress string MetaRootPath string - QueryNodeID UniqueID - QueryNodeNum int + QueryNodeIP string + QueryNodePort int64 + QueryNodeID UniqueID + QueryNodeNum int + QueryNodeTimeTickChannelName string + QueryNodeTimeTickReceiveBufSize int64 FlowGraphMaxQueueLength int32 FlowGraphMaxParallelism int32 @@ -95,6 +99,13 @@ func (p *ParamTable) Init() { panic(err) } + p.initQueryNodeIP() + p.initQueryNodePort() + p.initQueryNodeID() + p.initQueryNodeNum() + p.initQueryNodeTimeTickChannelName() + p.initQueryNodeTimeTickReceiveBufSize() + p.initMinioEndPoint() p.initMinioAccessKeyID() p.initMinioSecretAccessKey() @@ -105,9 +116,6 @@ func (p *ParamTable) Init() { p.initETCDAddress() p.initMetaRootPath() - p.initQueryNodeID() - p.initQueryNodeNum() - p.initGracefulTime() p.initMsgChannelSubName() p.initDefaultPartitionTag() @@ -140,6 +148,55 @@ func (p *ParamTable) Init() { p.initLoadIndexPulsarBufSize() } +// ---------------------------------------------------------- query node +func (p *ParamTable) initQueryNodeIP() { + ip, err := p.Load("queryNode.ip") + if err != nil { + panic(err) + } + p.QueryNodeIP = ip +} + +func (p *ParamTable) initQueryNodePort() { + port, err := p.Load("queryNode.port") + if err != nil { + panic(err) + } + p.QueryNodePort, err = strconv.ParseInt(port, 10, 64) + if err != nil { + panic(err) + } +} + +func (p *ParamTable) initQueryNodeID() { + queryNodeID, err := p.Load("_queryNodeID") + if err != nil { + panic(err) + } + id, err := strconv.Atoi(queryNodeID) + if err != nil { + panic(err) + } + p.QueryNodeID = UniqueID(id) +} + +func (p *ParamTable) initQueryNodeNum() { + p.QueryNodeNum = len(p.QueryNodeIDList()) +} + +func (p *ParamTable) initQueryNodeTimeTickChannelName() { + ch, err := p.Load("msgChannel.chanNamePrefix.queryNodeTimeTick") + if err != nil { + log.Fatal(err) + } + p.QueryNodeTimeTickChannelName = ch +} + +func (p *ParamTable) initQueryNodeTimeTickReceiveBufSize() { + p.QueryNodeTimeTickReceiveBufSize = p.ParseInt64("queryNode.msgStream.timeTick.recvBufSize") +} + +// ---------------------------------------------------------- minio func (p *ParamTable) initMinioEndPoint() { url, err := p.Load("_MinioAddress") if err != nil { @@ -192,18 +249,6 @@ func (p *ParamTable) initPulsarAddress() { p.PulsarAddress = url } -func (p *ParamTable) initQueryNodeID() { - queryNodeID, err := p.Load("_queryNodeID") - if err != nil { - panic(err) - } - id, err := strconv.Atoi(queryNodeID) - if err != nil { - panic(err) - } - p.QueryNodeID = UniqueID(id) -} - func (p *ParamTable) initInsertChannelRange() { insertChannelRange, err := p.Load("msgChannel.channelRange.insert") if err != nil { @@ -426,10 +471,6 @@ func (p *ParamTable) initSliceIndex() { p.SliceIndex = -1 } -func (p *ParamTable) initQueryNodeNum() { - p.QueryNodeNum = len(p.QueryNodeIDList()) -} - func (p *ParamTable) initLoadIndexChannelNames() { loadIndexChannelName, err := p.Load("msgChannel.chanNamePrefix.cmd") if err != nil { diff --git a/internal/querynode/param_table_test.go b/internal/querynode/param_table_test.go index 461073146f8597d88c29076365c7bbb8e45dd0a8..b0d78c80a78d95eeb297828e0b1dc5ec274057a0 100644 --- a/internal/querynode/param_table_test.go +++ b/internal/querynode/param_table_test.go @@ -15,6 +15,38 @@ func TestParamTable_PulsarAddress(t *testing.T) { assert.Equal(t, "6650", split[len(split)-1]) } +func TestParamTable_QueryNode(t *testing.T) { + t.Run("Test ip", func(t *testing.T) { + ip := Params.QueryNodeIP + assert.Equal(t, ip, "localhost") + }) + + t.Run("Test port", func(t *testing.T) { + port := Params.QueryNodePort + assert.Equal(t, port, int64(20010)) + }) + + t.Run("Test id", func(t *testing.T) { + id := Params.QueryNodeID + assert.Contains(t, Params.QueryNodeIDList(), id) + }) + + t.Run("Test num", func(t *testing.T) { + num := Params.QueryNodeNum + assert.Equal(t, num, 2) + }) + + t.Run("Test time tick channel", func(t *testing.T) { + ch := Params.QueryNodeTimeTickChannelName + assert.Equal(t, ch, "queryNodeTimeTick") + }) + + t.Run("Test time tick ReceiveBufSize", func(t *testing.T) { + size := Params.QueryNodeTimeTickReceiveBufSize + assert.Equal(t, size, int64(64)) + }) +} + func TestParamTable_minio(t *testing.T) { t.Run("Test endPoint", func(t *testing.T) { endPoint := Params.MinioEndPoint @@ -56,11 +88,6 @@ func TestParamTable_LoadIndex(t *testing.T) { }) } -func TestParamTable_QueryNodeID(t *testing.T) { - id := Params.QueryNodeID - assert.Contains(t, Params.QueryNodeIDList(), id) -} - func TestParamTable_insertChannelRange(t *testing.T) { channelRange := Params.InsertChannelRange assert.Equal(t, 2, len(channelRange)) diff --git a/internal/querynode/partition.go b/internal/querynode/partition.go index cbf957ed52a9529e0b7f45568d85de13057332a9..1b5279f113f6a45833cc5a67d761a0176c618b87 100644 --- a/internal/querynode/partition.go +++ b/internal/querynode/partition.go @@ -30,10 +30,18 @@ func (p *Partition) Segments() *[]*Segment { return &(*p).segments } -func newPartition(partitionTag string) *Partition { +func newPartition2(partitionTag string) *Partition { var newPartition = &Partition{ partitionTag: partitionTag, } return newPartition } + +func newPartition(partitionID UniqueID) *Partition { + var newPartition = &Partition{ + id: partitionID, + } + + return newPartition +} diff --git a/internal/querynode/partition_test.go b/internal/querynode/partition_test.go index d004925617e3ebde82633d03cd750a2e12f120ac..0d5716f7c875fb69dc372eb97ed7daedaa51fa74 100644 --- a/internal/querynode/partition_test.go +++ b/internal/querynode/partition_test.go @@ -30,6 +30,6 @@ func TestPartition_Segments(t *testing.T) { func TestPartition_newPartition(t *testing.T) { partitionTag := "default" - partition := newPartition(partitionTag) + partition := newPartition2(partitionTag) assert.Equal(t, partition.partitionTag, partitionTag) } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 368cc457d27db4a65670f51b1c1392375756330c..6b0fbfb9de46d6676fe48c4011bb1014a8e599c9 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -16,19 +16,28 @@ import ( "context" "errors" "fmt" + queryserviceimpl "github.com/zilliztech/milvus-distributed/internal/queryservice" "io" + "sync/atomic" "github.com/opentracing/opentracing-go" "github.com/uber/jaeger-client-go/config" + "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "github.com/zilliztech/milvus-distributed/internal/msgstream/util" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb" ) type Node interface { - Start() error - Close() + Init() + Start() + Stop() + + GetComponentStates() (*internalpb2.ComponentStates, error) + GetTimeTickChannel() (string, error) + GetStatisticsChannel() (string, error) AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error) RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error) @@ -43,6 +52,7 @@ type QueryNode struct { queryNodeLoopCancel context.CancelFunc QueryNodeID uint64 + stateCode atomic.Value replica collectionReplica @@ -60,10 +70,6 @@ type QueryNode struct { closer io.Closer } -func Init() { - Params.Init() -} - func NewQueryNode(ctx context.Context, queryNodeID uint64) Node { var node Node = newQueryNode(ctx, queryNodeID) return node @@ -71,7 +77,7 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64) Node { func newQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode { ctx1, cancel := context.WithCancel(ctx) - q := &QueryNode{ + node := &QueryNode{ queryNodeLoopCtx: ctx1, queryNodeLoopCancel: cancel, QueryNodeID: queryNodeID, @@ -91,28 +97,53 @@ func newQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode { Param: 1, }, } - q.tracer, q.closer, err = cfg.NewTracer() + node.tracer, node.closer, err = cfg.NewTracer() if err != nil { panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) } - opentracing.SetGlobalTracer(q.tracer) + opentracing.SetGlobalTracer(node.tracer) segmentsMap := make(map[int64]*Segment) collections := make([]*Collection, 0) tSafe := newTSafe() - q.replica = &collectionReplicaImpl{ + node.replica = &collectionReplicaImpl{ collections: collections, segments: segmentsMap, tSafe: tSafe, } + node.stateCode.Store(internalpb2.StateCode_INITIALIZING) + return node +} - return q +// TODO: delete this and call node.Init() +func Init() { + Params.Init() } -func (node *QueryNode) Start() error { +func (node *QueryNode) Init() { + registerReq := queryPb.RegisterNodeRequest{ + Address: &commonpb.Address{ + Ip: Params.QueryNodeIP, + Port: Params.QueryNodePort, + }, + } + var client queryserviceimpl.Interface // TODO: init interface + response, err := client.RegisterNode(registerReq) + if err != nil { + panic(err) + } + if response.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + panic(response.Status.Reason) + } + // TODO: use response.initParams + + Params.Init() +} + +func (node *QueryNode) Start() { // todo add connectMaster logic // init services and manager node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica) @@ -129,11 +160,12 @@ func (node *QueryNode) Start() error { go node.loadIndexService.start() go node.statsService.start() + node.stateCode.Store(internalpb2.StateCode_HEALTHY) <-node.queryNodeLoopCtx.Done() - return nil } -func (node *QueryNode) Close() { +func (node *QueryNode) Stop() { + node.stateCode.Store(internalpb2.StateCode_ABNORMAL) node.queryNodeLoopCancel() // free collectionReplica @@ -157,6 +189,30 @@ func (node *QueryNode) Close() { } } +func (node *QueryNode) GetComponentStates() (*internalpb2.ComponentStates, error) { + code, ok := node.stateCode.Load().(internalpb2.StateCode) + if !ok { + return nil, errors.New("unexpected error in type assertion") + } + info := &internalpb2.ComponentInfo{ + NodeID: Params.QueryNodeID, + Role: "query-node", + StateCode: code, + } + stats := &internalpb2.ComponentStates{ + State: info, + } + return stats, nil +} + +func (node *QueryNode) GetTimeTickChannel() (string, error) { + return Params.QueryNodeTimeTickChannelName, nil +} + +func (node *QueryNode) GetStatisticsChannel() (string, error) { + return Params.StatsChannelName, nil +} + func (node *QueryNode) AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error) { if node.searchService == nil || node.searchService.searchMsgStream == nil { errMsg := "null search service or null search message stream" @@ -296,38 +352,26 @@ func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*com func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.Status, error) { // TODO: support db - partitionID := in.PartitionID collectionID := in.CollectionID + partitionID := in.PartitionID + segmentIDs := in.SegmentIDs fieldIDs := in.FieldIDs - // TODO: interim solution - if len(fieldIDs) == 0 { - collection, err := node.replica.getCollectionByID(collectionID) - if err != nil { - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: err.Error(), - } - return status, err - } - fieldIDs = make([]int64, 0) - for _, field := range collection.Schema().Fields { - fieldIDs = append(fieldIDs, field.FieldID) + err := node.segManager.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs) + if err != nil { + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), } + return status, err } - for _, segmentID := range in.SegmentIDs { - indexID := UniqueID(0) // TODO: get index id from master - err := node.segManager.loadSegment(segmentID, partitionID, collectionID, &fieldIDs) - if err != nil { - // TODO: return or continue? - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: err.Error(), - } - return status, err - } - err = node.segManager.loadIndex(segmentID, indexID) + return nil, nil +} + +func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) { + // release all fields in the segments + for _, id := range in.SegmentIDs { + err := node.segManager.releaseSegment(id) if err != nil { - // TODO: return or continue? status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: err.Error(), @@ -335,12 +379,6 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S return status, err } } - - return nil, nil -} - -func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) { - // TODO: implement return nil, nil } diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index feb9aeabc4e8412e53b37310dc45a7ef89d019d3..30bf6a224cfc5ff901e178012cf29dce38b1cf60 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -107,7 +107,7 @@ func initTestMeta(t *testing.T, node *QueryNode, collectionName string, collecti assert.Equal(t, collection.ID(), collectionID) assert.Equal(t, node.replica.getCollectionNum(), 1) - err = node.replica.addPartition(collection.ID(), collectionMeta.PartitionTags[0]) + err = node.replica.addPartition2(collection.ID(), collectionMeta.PartitionTags[0]) assert.NoError(t, err) err = node.replica.addSegment2(segmentID, collectionMeta.PartitionTags[0], collectionID, segTypeGrowing) @@ -163,7 +163,6 @@ func TestMain(m *testing.M) { // NOTE: start pulsar and etcd before test func TestQueryNode_Start(t *testing.T) { localNode := newQueryNodeMock() - err := localNode.Start() - assert.Nil(t, err) - localNode.Close() + localNode.Start() + localNode.Stop() } diff --git a/internal/querynode/search_service_test.go b/internal/querynode/search_service_test.go index 1853d6673079b33edd91544f8198c1a62a2f89e0..938e01c5b23b6ed835e74ab06a14de41d3466a69 100644 --- a/internal/querynode/search_service_test.go +++ b/internal/querynode/search_service_test.go @@ -209,7 +209,7 @@ func TestSearch_Search(t *testing.T) { time.Sleep(1 * time.Second) - node.Close() + node.Stop() } func TestSearch_SearchMultiSegments(t *testing.T) { @@ -407,5 +407,5 @@ func TestSearch_SearchMultiSegments(t *testing.T) { time.Sleep(1 * time.Second) - node.Close() + node.Stop() } diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index fb347154892ed1320c2c84dc6baa39ea1ae1a8c2..46aff8bb7a83654e58444e578876ad808a9fdb86 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -78,6 +78,7 @@ func newSegment2(collection *Collection, segmentID int64, partitionTag string, c segmentPtr := C.NewSegment(collection.collectionPtr, C.ulong(segmentID), segType) var newSegment = &Segment{ segmentPtr: segmentPtr, + segmentType: segType, segmentID: segmentID, partitionTag: partitionTag, collectionID: collectionID, @@ -96,6 +97,7 @@ func newSegment(collection *Collection, segmentID int64, partitionID UniqueID, c segmentPtr := C.NewSegment(collection.collectionPtr, C.ulong(segmentID), segType) var newSegment = &Segment{ segmentPtr: segmentPtr, + segmentType: segType, segmentID: segmentID, partitionID: partitionID, collectionID: collectionID, @@ -195,7 +197,16 @@ func (s *Segment) fillTargetEntry(plan *Plan, // segment, err := loadIndexService.replica.getSegmentByID(segmentID) func (s *Segment) updateSegmentIndex(loadIndexInfo *LoadIndexInfo) error { - status := C.UpdateSegmentIndex(s.segmentPtr, loadIndexInfo.cLoadIndexInfo) + var status C.CStatus + + if s.segmentType == segTypeGrowing { + status = C.UpdateSegmentIndex(s.segmentPtr, loadIndexInfo.cLoadIndexInfo) + } else if s.segmentType == segTypeSealed { + status = C.UpdateSealedSegmentIndex(s.segmentPtr, loadIndexInfo.cLoadIndexInfo) + } else { + return errors.New("illegal segment type") + } + errorCode := status.error_code if errorCode != 0 { @@ -345,7 +356,7 @@ func (s *Segment) segmentDelete(offset int64, entityIDs *[]UniqueID, timestamps } //-------------------------------------------------------------------------------------- interfaces for sealed segment -func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int, data unsafe.Pointer) error { +func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int, data interface{}) error { /* CStatus LoadFieldData(CSegmentInterface c_segment, CLoadFieldDataInfo load_field_data_info); @@ -354,16 +365,62 @@ func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int, data unsafe. return errors.New("illegal segment type when loading field data") } + // data interface check + var dataPointer unsafe.Pointer + emptyErr := errors.New("null field data to be loaded") + switch d := data.(type) { + case []bool: + if len(d) <= 0 { + return emptyErr + } + dataPointer = unsafe.Pointer(&d[0]) + case []int8: + if len(d) <= 0 { + return emptyErr + } + dataPointer = unsafe.Pointer(&d[0]) + case []int16: + if len(d) <= 0 { + return emptyErr + } + dataPointer = unsafe.Pointer(&d[0]) + case []int32: + if len(d) <= 0 { + return emptyErr + } + dataPointer = unsafe.Pointer(&d[0]) + case []int64: + if len(d) <= 0 { + return emptyErr + } + dataPointer = unsafe.Pointer(&d[0]) + case []float32: + if len(d) <= 0 { + return emptyErr + } + dataPointer = unsafe.Pointer(&d[0]) + case []float64: + if len(d) <= 0 { + return emptyErr + } + dataPointer = unsafe.Pointer(&d[0]) + case []string: + // TODO: support string type + return errors.New("we cannot support string type now") + default: + return errors.New("illegal field data type") + } + /* - struct CLoadFieldDataInfo { + typedef struct CLoadFieldDataInfo { int64_t field_id; void* blob; int64_t row_count; - }; + } CLoadFieldDataInfo; */ loadInfo := C.CLoadFieldDataInfo{ field_id: C.int64_t(fieldID), - blob: data, + blob: dataPointer, row_count: C.int64_t(rowCount), } diff --git a/internal/querynode/segment_manager.go b/internal/querynode/segment_manager.go index c08f6d2678f228cb50578776b0894a756be9c897..019a84ad38a4526182500f45e2f7707d85cff96d 100644 --- a/internal/querynode/segment_manager.go +++ b/internal/querynode/segment_manager.go @@ -3,7 +3,7 @@ package querynode import ( "context" "errors" - "unsafe" + "strconv" "github.com/zilliztech/milvus-distributed/internal/kv" miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio" @@ -12,7 +12,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" - queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb" "github.com/zilliztech/milvus-distributed/internal/storage" ) @@ -58,25 +57,77 @@ func newSegmentManager(ctx context.Context, replica collectionReplica, loadIndex } } -func (s *segmentManager) loadSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, fieldIDs *[]int64) error { +func (s *segmentManager) loadSegment(collectionID UniqueID, partitionID UniqueID, segmentIDs []UniqueID, fieldIDs []int64) error { + // TODO: interim solution + if len(fieldIDs) == 0 { + collection, err := s.replica.getCollectionByID(collectionID) + if err != nil { + return err + } + fieldIDs = make([]int64, 0) + for _, field := range collection.Schema().Fields { + fieldIDs = append(fieldIDs, field.FieldID) + } + } + for _, segmentID := range segmentIDs { + indexID := UniqueID(0) // TODO: get index id from master + paths, srcFieldIDs, err := s.getInsertBinlogPaths(segmentID) + if err != nil { + return err + } + + targetFields := s.filterOutNeedlessFields(paths, srcFieldIDs, fieldIDs) + // create segment + err = s.replica.addSegment(segmentID, partitionID, collectionID, segTypeSealed) + if err != nil { + return err + } + err = s.loadSegmentFieldsData(segmentID, targetFields) + if err != nil { + return err + } + indexPaths, err := s.getIndexPaths(indexID) + if err != nil { + return err + } + iParam, err := s.getIndexParam() + if err != nil { + return err + } + err = s.loadIndex(segmentID, indexPaths, iParam) + if err != nil { + // TODO: return or continue? + return err + } + } + return nil +} + +func (s *segmentManager) releaseSegment(segmentID UniqueID) error { + err := s.replica.removeSegment(segmentID) + return err +} + +//------------------------------------------------------------------------------------------------- internal functions +func (s *segmentManager) getInsertBinlogPaths(segmentID UniqueID) ([]*internalPb.StringList, []int64, error) { insertBinlogPathRequest := &datapb.InsertBinlogPathRequest{ SegmentID: segmentID, } pathResponse, err := s.dataClient.GetInsertBinlogPaths(context.TODO(), insertBinlogPathRequest) if err != nil { - return err + return nil, nil, err } if len(pathResponse.FieldIDs) != len(pathResponse.Paths) { - return errors.New("illegal InsertBinlogPathsResponse") + return nil, nil, errors.New("illegal InsertBinlogPathsResponse") } - // create segment - err = s.replica.addSegment(segmentID, partitionID, collectionID, segTypeSealed) - if err != nil { - return err - } + return pathResponse.Paths, pathResponse.FieldIDs, nil +} + +func (s *segmentManager) filterOutNeedlessFields(paths []*internalPb.StringList, srcFieldIDS []int64, dstFields []int64) map[int64]*internalPb.StringList { + targetFields := make(map[int64]*internalPb.StringList) containsFunc := func(s []int64, e int64) bool { for _, a := range s { @@ -87,13 +138,18 @@ func (s *segmentManager) loadSegment(segmentID UniqueID, partitionID UniqueID, c return false } - for i, fieldID := range pathResponse.FieldIDs { - // filter out the needless fields - if !containsFunc(*fieldIDs, fieldID) { - continue + for i, fieldID := range srcFieldIDS { + if containsFunc(dstFields, fieldID) { + targetFields[fieldID] = paths[i] } + } + + return targetFields +} - paths := pathResponse.Paths[i].Values +func (s *segmentManager) loadSegmentFieldsData(segmentID UniqueID, targetFields map[int64]*internalPb.StringList) error { + for id, p := range targetFields { + paths := p.Values blobs := make([]*storage.Blob, 0) for _, path := range paths { binLog, err := s.kv.Load(path) @@ -102,7 +158,7 @@ func (s *segmentManager) loadSegment(segmentID UniqueID, partitionID UniqueID, c return err } blobs = append(blobs, &storage.Blob{ - Key: "", // TODO: key??? + Key: strconv.FormatInt(id, 10), // TODO: key??? Value: []byte(binLog), }) } @@ -120,35 +176,35 @@ func (s *segmentManager) loadSegment(segmentID UniqueID, partitionID UniqueID, c var data interface{} switch fieldData := value.(type) { - case storage.BoolFieldData: + case *storage.BoolFieldData: numRows = fieldData.NumRows data = fieldData.Data - case storage.Int8FieldData: + case *storage.Int8FieldData: numRows = fieldData.NumRows data = fieldData.Data - case storage.Int16FieldData: + case *storage.Int16FieldData: numRows = fieldData.NumRows data = fieldData.Data - case storage.Int32FieldData: + case *storage.Int32FieldData: numRows = fieldData.NumRows data = fieldData.Data - case storage.Int64FieldData: + case *storage.Int64FieldData: numRows = fieldData.NumRows data = fieldData.Data - case storage.FloatFieldData: + case *storage.FloatFieldData: numRows = fieldData.NumRows data = fieldData.Data - case storage.DoubleFieldData: + case *storage.DoubleFieldData: numRows = fieldData.NumRows data = fieldData.Data case storage.StringFieldData: numRows = fieldData.NumRows data = fieldData.Data - case storage.FloatVectorFieldData: + case *storage.FloatVectorFieldData: // segment to be loaded doesn't need vector field, // so we ignore the type of vector field data continue - case storage.BinaryVectorFieldData: + case *storage.BinaryVectorFieldData: continue default: return errors.New("unexpected field data type") @@ -159,7 +215,7 @@ func (s *segmentManager) loadSegment(segmentID UniqueID, partitionID UniqueID, c // TODO: return or continue? return err } - err = segment.segmentLoadFieldData(fieldID, numRows, unsafe.Pointer(&data)) + err = segment.segmentLoadFieldData(id, numRows, data) if err != nil { // TODO: return or continue? return err @@ -170,25 +226,33 @@ func (s *segmentManager) loadSegment(segmentID UniqueID, partitionID UniqueID, c return nil } -func (s *segmentManager) loadIndex(segmentID UniqueID, indexID UniqueID) error { - indexFilePathRequest := &indexpb.IndexFilePathRequest{ - IndexID: indexID, +func (s *segmentManager) getIndexPaths(indexID UniqueID) ([]string, error) { + indexFilePathRequest := &indexpb.IndexFilePathsRequest{ + IndexIDs: []UniqueID{indexID}, } pathResponse, err := s.indexBuilderClient.GetIndexFilePaths(context.TODO(), indexFilePathRequest) if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - return err + return nil, err } + return pathResponse.FilePaths[0].IndexFilePaths, nil +} + +func (s *segmentManager) getIndexParam() (indexParam, error) { + var targetIndexParam indexParam + // TODO: get index param from master + return targetIndexParam, nil +} + +func (s *segmentManager) loadIndex(segmentID UniqueID, indexPaths []string, indexParam indexParam) error { // get vector field ids from schema to load index vecFieldIDs, err := s.replica.getVecFieldsBySegmentID(segmentID) if err != nil { return err } for id, name := range vecFieldIDs { - var targetIndexParam indexParam - // TODO: get index param from master // non-blocking send - go s.sendLoadIndex(pathResponse.IndexFilePaths, segmentID, id, name, targetIndexParam) + go s.sendLoadIndex(indexPaths, segmentID, id, name, indexParam) } return nil @@ -225,9 +289,3 @@ func (s *segmentManager) sendLoadIndex(indexPaths []string, messages := []msgstream.TsMsg{loadIndexMsg} s.loadIndexReqChan <- messages } - -func (s *segmentManager) releaseSegment(in *queryPb.ReleaseSegmentRequest) error { - // TODO: implement - // TODO: release specific field, we need segCore supply relevant interface - return nil -} diff --git a/internal/querynode/segment_manager_test.go b/internal/querynode/segment_manager_test.go new file mode 100644 index 0000000000000000000000000000000000000000..563173ec114630bc46289ea2222bc4951571474d --- /dev/null +++ b/internal/querynode/segment_manager_test.go @@ -0,0 +1,251 @@ +package querynode + +import ( + "context" + "fmt" + "math/rand" + "path" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/zilliztech/milvus-distributed/internal/indexnode" + minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" + "github.com/zilliztech/milvus-distributed/internal/storage" +) + +func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID) ([]*internalPb.StringList, []int64, error) { + const ( + msgLength = 1000 + DIM = 16 + ) + + idData := make([]int64, 0) + for n := 0; n < msgLength; n++ { + idData = append(idData, int64(n)) + } + + var timestamps []int64 + for n := 0; n < msgLength; n++ { + timestamps = append(timestamps, int64(n+1)) + } + + var fieldAgeData []int32 + for n := 0; n < msgLength; n++ { + fieldAgeData = append(fieldAgeData, int32(n)) + } + + fieldVecData := make([]float32, 0) + for n := 0; n < msgLength; n++ { + for i := 0; i < DIM; i++ { + fieldVecData = append(fieldVecData, float32(n*i)*0.1) + } + } + + insertData := &storage.InsertData{ + Data: map[int64]storage.FieldData{ + 0: &storage.Int64FieldData{ + NumRows: msgLength, + Data: idData, + }, + 1: &storage.Int64FieldData{ + NumRows: msgLength, + Data: timestamps, + }, + 100: &storage.FloatVectorFieldData{ + NumRows: msgLength, + Data: fieldVecData, + Dim: DIM, + }, + 101: &storage.Int32FieldData{ + NumRows: msgLength, + Data: fieldAgeData, + }, + }, + } + + // buffer data to binLogs + collMeta := genTestCollectionMeta("collection0", collectionID, false) + collMeta.Schema.Fields = append(collMeta.Schema.Fields, &schemapb.FieldSchema{ + FieldID: 0, + Name: "uid", + DataType: schemapb.DataType_INT64, + }) + collMeta.Schema.Fields = append(collMeta.Schema.Fields, &schemapb.FieldSchema{ + FieldID: 1, + Name: "timestamp", + DataType: schemapb.DataType_INT64, + }) + inCodec := storage.NewInsertCodec(collMeta) + binLogs, err := inCodec.Serialize(partitionID, segmentID, insertData) + + if err != nil { + return nil, nil, err + } + + // create minio client + bucketName := Params.MinioBucketName + option := &minioKV.Option{ + Address: Params.MinioEndPoint, + AccessKeyID: Params.MinioAccessKeyID, + SecretAccessKeyID: Params.MinioSecretAccessKey, + UseSSL: Params.MinioUseSSLStr, + BucketName: bucketName, + CreateBucket: true, + } + kv, err := minioKV.NewMinIOKV(context.Background(), option) + if err != nil { + return nil, nil, err + } + + // binLogs -> minIO/S3 + collIDStr := strconv.FormatInt(collectionID, 10) + partitionIDStr := strconv.FormatInt(partitionID, 10) + segIDStr := strconv.FormatInt(segmentID, 10) + keyPrefix := path.Join("query-node-seg-manager-test-minio-prefix", collIDStr, partitionIDStr, segIDStr) + + paths := make([]*internalPb.StringList, 0) + fieldIDs := make([]int64, 0) + fmt.Println(".. saving binlog to MinIO ...", len(binLogs)) + for _, blob := range binLogs { + uid := rand.Int63n(100000000) + key := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10)) + err = kv.Save(key, string(blob.Value[:])) + if err != nil { + return nil, nil, err + } + paths = append(paths, &internalPb.StringList{ + Values: []string{key}, + }) + fieldID, err := strconv.Atoi(blob.Key) + if err != nil { + return nil, nil, err + } + fieldIDs = append(fieldIDs, int64(fieldID)) + } + + return paths, fieldIDs, nil +} + +func generateIndex(segmentID UniqueID) ([]string, indexParam, error) { + const ( + msgLength = 1000 + DIM = 16 + ) + + indexParams := make(map[string]string) + indexParams["index_type"] = "IVF_PQ" + indexParams["index_mode"] = "cpu" + indexParams["dim"] = "16" + indexParams["k"] = "10" + indexParams["nlist"] = "100" + indexParams["nprobe"] = "10" + indexParams["m"] = "4" + indexParams["nbits"] = "8" + indexParams["metric_type"] = "L2" + indexParams["SLICE_SIZE"] = "4" + + var indexParamsKV []*commonpb.KeyValuePair + for key, value := range indexParams { + indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{ + Key: key, + Value: value, + }) + } + + typeParams := make(map[string]string) + typeParams["dim"] = strconv.Itoa(DIM) + var indexRowData []float32 + for n := 0; n < msgLength; n++ { + for i := 0; i < DIM; i++ { + indexRowData = append(indexRowData, float32(n*i)) + } + } + + index, err := indexnode.NewCIndex(typeParams, indexParams) + if err != nil { + return nil, nil, err + } + + err = index.BuildFloatVecIndexWithoutIds(indexRowData) + if err != nil { + return nil, nil, err + } + + option := &minioKV.Option{ + Address: Params.MinioEndPoint, + AccessKeyID: Params.MinioAccessKeyID, + SecretAccessKeyID: Params.MinioSecretAccessKey, + UseSSL: Params.MinioUseSSLStr, + BucketName: Params.MinioBucketName, + CreateBucket: true, + } + + kv, err := minioKV.NewMinIOKV(context.Background(), option) + if err != nil { + return nil, nil, err + } + + //save index to minio + binarySet, err := index.Serialize() + if err != nil { + return nil, nil, err + } + + indexPaths := make([]string, 0) + for _, index := range binarySet { + path := strconv.Itoa(int(segmentID)) + "/" + index.Key + indexPaths = append(indexPaths, path) + err := kv.Save(path, string(index.Value)) + if err != nil { + return nil, nil, err + } + } + + return indexPaths, indexParams, nil +} + +func TestSegmentManager_load_and_release(t *testing.T) { + collectionID := UniqueID(0) + partitionID := UniqueID(1) + segmentID := UniqueID(2) + fieldIDs := []int64{101} + + node := newQueryNodeMock() + defer node.Stop() + + ctx := node.queryNodeLoopCtx + node.loadIndexService = newLoadIndexService(ctx, node.replica) + node.segManager = newSegmentManager(ctx, node.replica, node.loadIndexService.loadIndexReqChan) + go node.loadIndexService.start() + + collectionName := "collection0" + initTestMeta(t, node, collectionName, collectionID, 0) + + err := node.replica.addPartition(collectionID, partitionID) + assert.NoError(t, err) + + err = node.replica.addSegment(segmentID, partitionID, collectionID, segTypeSealed) + assert.NoError(t, err) + + paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID) + assert.NoError(t, err) + + fieldsMap := node.segManager.filterOutNeedlessFields(paths, srcFieldIDs, fieldIDs) + assert.Equal(t, len(fieldsMap), 1) + + err = node.segManager.loadSegmentFieldsData(segmentID, fieldsMap) + assert.NoError(t, err) + + indexPaths, indexParams, err := generateIndex(segmentID) + assert.NoError(t, err) + + err = node.segManager.loadIndex(segmentID, indexPaths, indexParams) + assert.NoError(t, err) + + <-ctx.Done() +} diff --git a/internal/querynode/segment_test.go b/internal/querynode/segment_test.go index 35b82446bfec941b7a546e296285ccd17b6b7ad3..af7053577a3b8312a22ecbd962e0566fe231ae67 100644 --- a/internal/querynode/segment_test.go +++ b/internal/querynode/segment_test.go @@ -468,3 +468,27 @@ func TestSegment_segmentPreDelete(t *testing.T) { deleteSegment(segment) deleteCollection(collection) } + +func TestSegment_segmentLoadFieldData(t *testing.T) { + collectionName := "collection0" + collectionID := UniqueID(0) + collectionMeta := genTestCollectionMeta(collectionName, collectionID, false) + + collection := newCollection(collectionMeta.ID, collectionMeta.Schema) + assert.Equal(t, collection.Name(), collectionName) + assert.Equal(t, collection.ID(), collectionID) + + segmentID := UniqueID(0) + partitionID := UniqueID(0) + segment := newSegment(collection, segmentID, partitionID, collectionID, segTypeSealed) + assert.Equal(t, segmentID, segment.segmentID) + assert.Equal(t, partitionID, segment.partitionID) + + const N = 16 + var ages = []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} + err := segment.segmentLoadFieldData(101, N, ages) + assert.NoError(t, err) + + deleteSegment(segment) + deleteCollection(collection) +} diff --git a/internal/querynode/stats_service_test.go b/internal/querynode/stats_service_test.go index 80c1cb4d415cb90e7f857ada5e166c69b4b7b33c..dc132dfb81313f8b48157ab3d4d0eed084ce796d 100644 --- a/internal/querynode/stats_service_test.go +++ b/internal/querynode/stats_service_test.go @@ -13,7 +13,7 @@ func TestStatsService_start(t *testing.T) { initTestMeta(t, node, "collection0", 0, 0) node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, nil) node.statsService.start() - node.Close() + node.Stop() } //NOTE: start pulsar before test @@ -39,5 +39,5 @@ func TestSegmentManagement_sendSegmentStatistic(t *testing.T) { // send stats node.statsService.publicStatistic(nil) - node.Close() + node.Stop() }