diff --git a/internal/querynode/collection.go b/internal/querynode/collection.go index c42c1a66a0770441cffd362d65a067dfc27e8e5d..bd53ffbd01631f0dd618bd26a2e76e1d682d9510 100644 --- a/internal/querynode/collection.go +++ b/internal/querynode/collection.go @@ -23,10 +23,6 @@ type Collection struct { partitions []*Partition } -//func (c *Collection) Name() string { -// return c.schema.Name -//} - func (c *Collection) ID() UniqueID { return c.id } diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index da369b813f2b56070ebf3d327f1a21b397180d99..dd1a80fd3461b15632edf1bb13cb9a60bcce51ad 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -43,17 +43,14 @@ type collectionReplica interface { getVecFieldsByCollectionID(collectionID UniqueID) ([]int64, error) // partition - // Partition tags in different collections are not unique, - // so partition api should specify the target collection. + // TODO: remove collection ID, add a `map[partitionID]partition` to replica implement getPartitionNum(collectionID UniqueID) (int, error) - addPartition2(collectionID UniqueID, partitionTag string) error addPartition(collectionID UniqueID, partitionID UniqueID) error - removePartition(collectionID UniqueID, partitionTag string) error - addPartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error - removePartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error - getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) + removePartition(collectionID UniqueID, partitionID UniqueID) error + addPartitionsByCollectionMeta(colMeta *etcdpb.CollectionInfo) error + removePartitionsByCollectionMeta(colMeta *etcdpb.CollectionInfo) error getPartitionByID(collectionID UniqueID, partitionID UniqueID) (*Partition, error) - hasPartition(collectionID UniqueID, partitionTag string) bool + hasPartition(collectionID UniqueID, partitionID UniqueID) bool enablePartitionDM(collectionID UniqueID, partitionID UniqueID) error disablePartitionDM(collectionID UniqueID, partitionID UniqueID) error getEnablePartitionDM(collectionID UniqueID, partitionID UniqueID) (bool, error) @@ -61,7 +58,6 @@ type collectionReplica interface { // segment getSegmentNum() int getSegmentStatistics() []*internalpb2.SegmentStats - addSegment2(segmentID UniqueID, partitionTag string, collectionID UniqueID, segType segmentType) error addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error removeSegment(segmentID UniqueID) error getSegmentByID(segmentID UniqueID) (*Segment, error) @@ -197,21 +193,6 @@ func (colReplica *collectionReplicaImpl) getPartitionNum(collectionID UniqueID) return len(collection.partitions), nil } -func (colReplica *collectionReplicaImpl) addPartition2(collectionID UniqueID, partitionTag string) error { - colReplica.mu.Lock() - defer colReplica.mu.Unlock() - - collection, err := colReplica.getCollectionByIDPrivate(collectionID) - if err != nil { - return err - } - - var newPartition = newPartition2(partitionTag) - - *collection.Partitions() = append(*collection.Partitions(), newPartition) - return nil -} - func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionID UniqueID) error { colReplica.mu.Lock() defer colReplica.mu.Unlock() @@ -227,14 +208,14 @@ func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, par return nil } -func (colReplica *collectionReplicaImpl) removePartition(collectionID UniqueID, partitionTag string) error { +func (colReplica *collectionReplicaImpl) removePartition(collectionID UniqueID, partitionID UniqueID) error { colReplica.mu.Lock() defer colReplica.mu.Unlock() - return colReplica.removePartitionPrivate(collectionID, partitionTag) + return colReplica.removePartitionPrivate(collectionID, partitionID) } -func (colReplica *collectionReplicaImpl) removePartitionPrivate(collectionID UniqueID, partitionTag string) error { +func (colReplica *collectionReplicaImpl) removePartitionPrivate(collectionID UniqueID, partitionID UniqueID) error { collection, err := colReplica.getCollectionByIDPrivate(collectionID) if err != nil { return err @@ -242,7 +223,7 @@ func (colReplica *collectionReplicaImpl) removePartitionPrivate(collectionID Uni var tmpPartitions = make([]*Partition, 0) for _, p := range *collection.Partitions() { - if p.Tag() == partitionTag { + if p.ID() == partitionID { for _, s := range *p.Segments() { deleteSegment(colReplica.segments[s.ID()]) delete(colReplica.segments, s.ID()) @@ -257,30 +238,30 @@ func (colReplica *collectionReplicaImpl) removePartitionPrivate(collectionID Uni } // deprecated -func (colReplica *collectionReplicaImpl) addPartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error { +func (colReplica *collectionReplicaImpl) addPartitionsByCollectionMeta(colMeta *etcdpb.CollectionInfo) error { if !colReplica.hasCollection(colMeta.ID) { err := errors.New("Cannot find collection, id = " + strconv.FormatInt(colMeta.ID, 10)) return err } - pToAdd := make([]string, 0) - for _, partitionTag := range colMeta.PartitionTags { - if !colReplica.hasPartition(colMeta.ID, partitionTag) { - pToAdd = append(pToAdd, partitionTag) + pToAdd := make([]UniqueID, 0) + for _, partitionID := range colMeta.PartitionIDs { + if !colReplica.hasPartition(colMeta.ID, partitionID) { + pToAdd = append(pToAdd, partitionID) } } - for _, tag := range pToAdd { - err := colReplica.addPartition2(colMeta.ID, tag) + for _, id := range pToAdd { + err := colReplica.addPartition(colMeta.ID, id) if err != nil { log.Println(err) } - fmt.Println("add partition: ", tag) + fmt.Println("add partition: ", id) } return nil } -func (colReplica *collectionReplicaImpl) removePartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error { +func (colReplica *collectionReplicaImpl) removePartitionsByCollectionMeta(colMeta *etcdpb.CollectionInfo) error { colReplica.mu.Lock() defer colReplica.mu.Unlock() @@ -289,37 +270,30 @@ func (colReplica *collectionReplicaImpl) removePartitionsByCollectionMeta(colMet return err } - pToDel := make([]string, 0) + pToDel := make([]UniqueID, 0) for _, partition := range col.partitions { hasPartition := false - for _, tag := range colMeta.PartitionTags { - if partition.partitionTag == tag { + for _, id := range colMeta.PartitionIDs { + if partition.ID() == id { hasPartition = true } } if !hasPartition { - pToDel = append(pToDel, partition.partitionTag) + pToDel = append(pToDel, partition.ID()) } } - for _, tag := range pToDel { - err := colReplica.removePartitionPrivate(col.ID(), tag) + for _, id := range pToDel { + err := colReplica.removePartitionPrivate(col.ID(), id) if err != nil { log.Println(err) } - fmt.Println("delete partition: ", tag) + fmt.Println("delete partition: ", id) } return nil } -func (colReplica *collectionReplicaImpl) getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() - - return colReplica.getPartitionByTagPrivate(collectionID, partitionTag) -} - func (colReplica *collectionReplicaImpl) getPartitionByID(collectionID UniqueID, partitionID UniqueID) (*Partition, error) { colReplica.mu.RLock() defer colReplica.mu.RUnlock() @@ -327,21 +301,6 @@ func (colReplica *collectionReplicaImpl) getPartitionByID(collectionID UniqueID, return colReplica.getPartitionByIDPrivate(collectionID, partitionID) } -func (colReplica *collectionReplicaImpl) getPartitionByTagPrivate(collectionID UniqueID, partitionTag string) (*Partition, error) { - collection, err := colReplica.getCollectionByIDPrivate(collectionID) - if err != nil { - return nil, err - } - - for _, p := range *collection.Partitions() { - if p.Tag() == partitionTag { - return p, nil - } - } - - return nil, errors.New("cannot find partition, tag = " + partitionTag) -} - func (colReplica *collectionReplicaImpl) getPartitionByIDPrivate(collectionID UniqueID, partitionID UniqueID) (*Partition, error) { collection, err := colReplica.getCollectionByIDPrivate(collectionID) if err != nil { @@ -357,7 +316,7 @@ func (colReplica *collectionReplicaImpl) getPartitionByIDPrivate(collectionID Un return nil, errors.New("cannot find partition, id = " + strconv.FormatInt(partitionID, 10)) } -func (colReplica *collectionReplicaImpl) hasPartition(collectionID UniqueID, partitionTag string) bool { +func (colReplica *collectionReplicaImpl) hasPartition(collectionID UniqueID, partitionID UniqueID) bool { colReplica.mu.RLock() defer colReplica.mu.RUnlock() @@ -368,7 +327,7 @@ func (colReplica *collectionReplicaImpl) hasPartition(collectionID UniqueID, par } for _, p := range *collection.Partitions() { - if p.Tag() == partitionTag { + if p.ID() == partitionID { return true } } @@ -446,28 +405,6 @@ func (colReplica *collectionReplicaImpl) getSegmentStatistics() []*internalpb2.S return statisticData } -func (colReplica *collectionReplicaImpl) addSegment2(segmentID UniqueID, partitionTag string, collectionID UniqueID, segType segmentType) error { - colReplica.mu.Lock() - defer colReplica.mu.Unlock() - - collection, err := colReplica.getCollectionByIDPrivate(collectionID) - if err != nil { - return err - } - - partition, err2 := colReplica.getPartitionByTagPrivate(collectionID, partitionTag) - if err2 != nil { - return err2 - } - - var newSegment = newSegment2(collection, segmentID, partitionTag, collectionID, segType) - - colReplica.segments[segmentID] = newSegment - *partition.Segments() = append(*partition.Segments(), newSegment) - - return nil -} - func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error { colReplica.mu.Lock() defer colReplica.mu.Unlock() diff --git a/internal/querynode/collection_replica_test.go b/internal/querynode/collection_replica_test.go index d0a81d27b59d20d13fc18c13c16b11e994c4ee17..521a1060d3c467a1f8c67f394ba55875d615438a 100644 --- a/internal/querynode/collection_replica_test.go +++ b/internal/querynode/collection_replica_test.go @@ -61,18 +61,18 @@ func TestCollectionReplica_getPartitionNum(t *testing.T) { collectionID := UniqueID(0) initTestMeta(t, node, collectionID, 0) - partitionTags := []string{"a", "b", "c"} - for _, tag := range partitionTags { - err := node.replica.addPartition2(collectionID, tag) + partitionIDs := []UniqueID{1, 2, 3} + for _, id := range partitionIDs { + err := node.replica.addPartition(collectionID, id) assert.NoError(t, err) - partition, err := node.replica.getPartitionByTag(collectionID, tag) + partition, err := node.replica.getPartitionByID(collectionID, id) assert.NoError(t, err) - assert.Equal(t, partition.partitionTag, tag) + assert.Equal(t, partition.ID(), id) } partitionNum, err := node.replica.getPartitionNum(collectionID) assert.NoError(t, err) - assert.Equal(t, partitionNum, len(partitionTags)+1) // _default + assert.Equal(t, partitionNum, len(partitionIDs)+1) // _default node.Stop() } @@ -81,13 +81,13 @@ func TestCollectionReplica_addPartition(t *testing.T) { collectionID := UniqueID(0) initTestMeta(t, node, collectionID, 0) - partitionTags := []string{"a", "b", "c"} - for _, tag := range partitionTags { - err := node.replica.addPartition2(collectionID, tag) + partitionIDs := []UniqueID{1, 2, 3} + for _, id := range partitionIDs { + err := node.replica.addPartition(collectionID, id) assert.NoError(t, err) - partition, err := node.replica.getPartitionByTag(collectionID, tag) + partition, err := node.replica.getPartitionByID(collectionID, id) assert.NoError(t, err) - assert.Equal(t, partition.partitionTag, tag) + assert.Equal(t, partition.ID(), id) } node.Stop() } @@ -97,15 +97,15 @@ func TestCollectionReplica_removePartition(t *testing.T) { collectionID := UniqueID(0) initTestMeta(t, node, collectionID, 0) - partitionTags := []string{"a", "b", "c"} + partitionIDs := []UniqueID{1, 2, 3} - for _, tag := range partitionTags { - err := node.replica.addPartition2(collectionID, tag) + for _, id := range partitionIDs { + err := node.replica.addPartition(collectionID, id) assert.NoError(t, err) - partition, err := node.replica.getPartitionByTag(collectionID, tag) + partition, err := node.replica.getPartitionByID(collectionID, id) assert.NoError(t, err) - assert.Equal(t, partition.partitionTag, tag) - err = node.replica.removePartition(collectionID, tag) + assert.Equal(t, partition.ID(), id) + err = node.replica.removePartition(collectionID, id) assert.NoError(t, err) } node.Stop() @@ -117,18 +117,18 @@ func TestCollectionReplica_addPartitionsByCollectionMeta(t *testing.T) { initTestMeta(t, node, collectionID, 0) collectionMeta := genTestCollectionMeta(collectionID, false) - collectionMeta.PartitionTags = []string{"p0", "p1", "p2"} + collectionMeta.PartitionIDs = []UniqueID{0, 1, 2} err := node.replica.addPartitionsByCollectionMeta(collectionMeta) assert.NoError(t, err) partitionNum, err := node.replica.getPartitionNum(UniqueID(0)) assert.NoError(t, err) - assert.Equal(t, partitionNum, len(collectionMeta.PartitionTags)+1) - hasPartition := node.replica.hasPartition(UniqueID(0), "p0") + assert.Equal(t, partitionNum, len(collectionMeta.PartitionIDs)+1) + hasPartition := node.replica.hasPartition(UniqueID(0), UniqueID(0)) assert.Equal(t, hasPartition, true) - hasPartition = node.replica.hasPartition(UniqueID(0), "p1") + hasPartition = node.replica.hasPartition(UniqueID(0), UniqueID(1)) assert.Equal(t, hasPartition, true) - hasPartition = node.replica.hasPartition(UniqueID(0), "p2") + hasPartition = node.replica.hasPartition(UniqueID(0), UniqueID(2)) assert.Equal(t, hasPartition, true) node.Stop() @@ -140,19 +140,19 @@ func TestCollectionReplica_removePartitionsByCollectionMeta(t *testing.T) { initTestMeta(t, node, collectionID, 0) collectionMeta := genTestCollectionMeta(collectionID, false) - collectionMeta.PartitionTags = []string{"p0"} + collectionMeta.PartitionIDs = []UniqueID{0} err := node.replica.addPartitionsByCollectionMeta(collectionMeta) assert.NoError(t, err) partitionNum, err := node.replica.getPartitionNum(UniqueID(0)) assert.NoError(t, err) - assert.Equal(t, partitionNum, len(collectionMeta.PartitionTags)+1) + assert.Equal(t, partitionNum, len(collectionMeta.PartitionIDs)+1) - hasPartition := node.replica.hasPartition(UniqueID(0), "p0") + hasPartition := node.replica.hasPartition(UniqueID(0), UniqueID(0)) assert.Equal(t, hasPartition, true) - hasPartition = node.replica.hasPartition(UniqueID(0), "p1") + hasPartition = node.replica.hasPartition(UniqueID(0), UniqueID(1)) assert.Equal(t, hasPartition, false) - hasPartition = node.replica.hasPartition(UniqueID(0), "p2") + hasPartition = node.replica.hasPartition(UniqueID(0), UniqueID(2)) assert.Equal(t, hasPartition, false) node.Stop() @@ -165,12 +165,12 @@ func TestCollectionReplica_getPartitionByTag(t *testing.T) { collectionMeta := genTestCollectionMeta(collectionID, false) - for _, tag := range collectionMeta.PartitionTags { - err := node.replica.addPartition2(collectionID, tag) + for _, id := range collectionMeta.PartitionIDs { + err := node.replica.addPartition(collectionID, id) assert.NoError(t, err) - partition, err := node.replica.getPartitionByTag(collectionID, tag) + partition, err := node.replica.getPartitionByID(collectionID, id) assert.NoError(t, err) - assert.Equal(t, partition.partitionTag, tag) + assert.Equal(t, partition.ID(), id) assert.NotNil(t, partition) } node.Stop() @@ -182,11 +182,11 @@ func TestCollectionReplica_hasPartition(t *testing.T) { initTestMeta(t, node, collectionID, 0) collectionMeta := genTestCollectionMeta(collectionID, false) - err := node.replica.addPartition2(collectionID, collectionMeta.PartitionTags[0]) + err := node.replica.addPartition(collectionID, collectionMeta.PartitionIDs[0]) assert.NoError(t, err) - hasPartition := node.replica.hasPartition(collectionID, "default") + hasPartition := node.replica.hasPartition(collectionID, defaultPartitionID) assert.Equal(t, hasPartition, true) - hasPartition = node.replica.hasPartition(collectionID, "default1") + hasPartition = node.replica.hasPartition(collectionID, defaultPartitionID+1) assert.Equal(t, hasPartition, false) node.Stop() } @@ -198,9 +198,8 @@ func TestCollectionReplica_addSegment(t *testing.T) { initTestMeta(t, node, collectionID, 0) const segmentNum = 3 - tag := "default" for i := 0; i < segmentNum; i++ { - err := node.replica.addSegment2(UniqueID(i), tag, collectionID, segTypeGrowing) + err := node.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segTypeGrowing) assert.NoError(t, err) targetSeg, err := node.replica.getSegmentByID(UniqueID(i)) assert.NoError(t, err) @@ -216,10 +215,9 @@ func TestCollectionReplica_removeSegment(t *testing.T) { initTestMeta(t, node, collectionID, 0) const segmentNum = 3 - tag := "default" for i := 0; i < segmentNum; i++ { - err := node.replica.addSegment2(UniqueID(i), tag, collectionID, segTypeGrowing) + err := node.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segTypeGrowing) assert.NoError(t, err) targetSeg, err := node.replica.getSegmentByID(UniqueID(i)) assert.NoError(t, err) @@ -237,10 +235,9 @@ func TestCollectionReplica_getSegmentByID(t *testing.T) { initTestMeta(t, node, collectionID, 0) const segmentNum = 3 - tag := "default" for i := 0; i < segmentNum; i++ { - err := node.replica.addSegment2(UniqueID(i), tag, collectionID, segTypeGrowing) + err := node.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segTypeGrowing) assert.NoError(t, err) targetSeg, err := node.replica.getSegmentByID(UniqueID(i)) assert.NoError(t, err) @@ -256,10 +253,9 @@ func TestCollectionReplica_hasSegment(t *testing.T) { initTestMeta(t, node, collectionID, 0) const segmentNum = 3 - tag := "default" for i := 0; i < segmentNum; i++ { - err := node.replica.addSegment2(UniqueID(i), tag, collectionID, segTypeGrowing) + err := node.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segTypeGrowing) assert.NoError(t, err) targetSeg, err := node.replica.getSegmentByID(UniqueID(i)) assert.NoError(t, err) diff --git a/internal/querynode/data_sync_service_test.go b/internal/querynode/data_sync_service_test.go index 8b48ce116bad660cf5a8eb0b7bec376ccd0f02d2..b8cd27b7aa727c5bfc867c0f530a08af5af8afaa 100644 --- a/internal/querynode/data_sync_service_test.go +++ b/internal/querynode/data_sync_service_test.go @@ -61,12 +61,12 @@ func TestDataSyncService_Start(t *testing.T) { Timestamp: uint64(i + 1000), SourceID: 0, }, - CollectionID: UniqueID(0), - PartitionName: "default", - SegmentID: int64(0), - ChannelID: "0", - Timestamps: []uint64{uint64(i + 1000), uint64(i + 1000)}, - RowIDs: []int64{int64(i), int64(i)}, + CollectionID: UniqueID(0), + PartitionID: defaultPartitionID, + SegmentID: int64(0), + ChannelID: "0", + Timestamps: []uint64{uint64(i + 1000), uint64(i + 1000)}, + RowIDs: []int64{int64(i), int64(i)}, RowData: []*commonpb.Blob{ {Value: rawData}, {Value: rawData}, diff --git a/internal/querynode/flow_graph_dd_node.go b/internal/querynode/flow_graph_dd_node.go index a61355a27483009affc6b34542e8899d5b046f92..3b92e206c1131b7b8a2861bd0303ef0d09846f1b 100644 --- a/internal/querynode/flow_graph_dd_node.go +++ b/internal/querynode/flow_graph_dd_node.go @@ -38,7 +38,7 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg { var ddMsg = ddMsg{ collectionRecords: make(map[UniqueID][]metaOperateRecord), - partitionRecords: make(map[string][]metaOperateRecord), + partitionRecords: make(map[UniqueID][]metaOperateRecord), timeRange: TimeRange{ timestampMin: msMsg.TimestampMin(), timestampMax: msMsg.TimestampMax(), @@ -102,7 +102,8 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) { } // add default partition - err = ddNode.replica.addPartition2(collectionID, Params.DefaultPartitionTag) + // TODO: allocate default partition id in master + err = ddNode.replica.addPartition(collectionID, UniqueID(2021)) if err != nil { log.Println(err) return @@ -118,12 +119,6 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) { func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) { collectionID := msg.CollectionID - //err := ddNode.replica.removeCollection(collectionID) - //if err != nil { - // log.Println(err) - // return - //} - ddNode.ddMsg.collectionRecords[collectionID] = append(ddNode.ddMsg.collectionRecords[collectionID], metaOperateRecord{ createOrDrop: false, @@ -135,17 +130,15 @@ func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) { func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) { collectionID := msg.CollectionID - partitionName := msg.PartitionName + partitionID := msg.PartitionID - err := ddNode.replica.addPartition2(collectionID, partitionName) - // TODO:: add partition by partitionID - //err := ddNode.replica.addPartition(collectionID, msg.PartitionID) + err := ddNode.replica.addPartition(collectionID, partitionID) if err != nil { log.Println(err) return } - ddNode.ddMsg.partitionRecords[partitionName] = append(ddNode.ddMsg.partitionRecords[partitionName], + ddNode.ddMsg.partitionRecords[partitionID] = append(ddNode.ddMsg.partitionRecords[partitionID], metaOperateRecord{ createOrDrop: true, timestamp: msg.Base.Timestamp, @@ -154,22 +147,16 @@ func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) { func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) { collectionID := msg.CollectionID - partitionName := msg.PartitionName - - //err := ddNode.replica.removePartition(collectionID, partitionTag) - //if err != nil { - // log.Println(err) - // return - //} + partitionID := msg.PartitionID - ddNode.ddMsg.partitionRecords[partitionName] = append(ddNode.ddMsg.partitionRecords[partitionName], + ddNode.ddMsg.partitionRecords[partitionID] = append(ddNode.ddMsg.partitionRecords[partitionID], metaOperateRecord{ createOrDrop: false, timestamp: msg.Base.Timestamp, }) ddNode.ddMsg.gcRecord.partitions = append(ddNode.ddMsg.gcRecord.partitions, partitionWithID{ - partitionTag: partitionName, + partitionID: partitionID, collectionID: collectionID, }) } diff --git a/internal/querynode/flow_graph_gc_node.go b/internal/querynode/flow_graph_gc_node.go index 076f44fcd93ef5f521e276db71caeac634077dcf..dc468ed079bcf63e1e4d2d6a841e9b5bc6f03236 100644 --- a/internal/querynode/flow_graph_gc_node.go +++ b/internal/querynode/flow_graph_gc_node.go @@ -21,27 +21,31 @@ func (gcNode *gcNode) Operate(in []*Msg) []*Msg { // TODO: add error handling } - gcMsg, ok := (*in[0]).(*gcMsg) + _, ok := (*in[0]).(*gcMsg) if !ok { log.Println("type assertion failed for gcMsg") // TODO: add error handling } - // drop collections - for _, collectionID := range gcMsg.gcRecord.collections { - err := gcNode.replica.removeCollection(collectionID) - if err != nil { - log.Println(err) - } - } + // Use `releasePartition` and `releaseCollection`, + // because if we drop collections or partitions here, query service doesn't know this behavior, + // which would lead the wrong result of `showCollections` or `showPartition` - // drop partitions - for _, partition := range gcMsg.gcRecord.partitions { - err := gcNode.replica.removePartition(partition.collectionID, partition.partitionTag) - if err != nil { - log.Println(err) - } - } + //// drop collections + //for _, collectionID := range gcMsg.gcRecord.collections { + // err := gcNode.replica.removeCollection(collectionID) + // if err != nil { + // log.Println(err) + // } + //} + // + //// drop partitions + //for _, partition := range gcMsg.gcRecord.partitions { + // err := gcNode.replica.removePartition(partition.collectionID, partition.partitionID) + // if err != nil { + // log.Println(err) + // } + //} return nil } diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index 10aa43d18cfc1a69659136ba022f89d948b9a6bb..dcb6216ae8d530d9dcded3963043e4d2904db533 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -81,7 +81,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg { // check if segment exists, if not, create this segment if !iNode.replica.hasSegment(task.SegmentID) { - err := iNode.replica.addSegment2(task.SegmentID, task.PartitionName, task.CollectionID, segTypeGrowing) + err := iNode.replica.addSegment(task.SegmentID, task.PartitionID, task.CollectionID, segTypeGrowing) if err != nil { log.Println(err) continue diff --git a/internal/querynode/flow_graph_message.go b/internal/querynode/flow_graph_message.go index 484614d10cf18d88d0d105fbc586c08e6e987ea5..c0f84777a2f7491f0792e88e60aab8247622909f 100644 --- a/internal/querynode/flow_graph_message.go +++ b/internal/querynode/flow_graph_message.go @@ -15,7 +15,7 @@ type key2SegMsg struct { type ddMsg struct { collectionRecords map[UniqueID][]metaOperateRecord - partitionRecords map[string][]metaOperateRecord + partitionRecords map[UniqueID][]metaOperateRecord gcRecord *gcRecord timeRange TimeRange } @@ -63,17 +63,16 @@ type DeletePreprocessData struct { count int32 } -// TODO: replace partitionWithID by partition id +// TODO: delete collection id type partitionWithID struct { - partitionTag string + partitionID UniqueID collectionID UniqueID } type gcRecord struct { // collections and partitions to be dropped collections []UniqueID - // TODO: use partition id - partitions []partitionWithID + partitions []partitionWithID } func (ksMsg *key2SegMsg) TimeTick() Timestamp { diff --git a/internal/querynode/load_service_test.go b/internal/querynode/load_service_test.go index f7e61d660674947fae316c8ce870ab7ac9a7850b..5e00e4f097d47b84a9af3feb3bf29792fbaaa567 100644 --- a/internal/querynode/load_service_test.go +++ b/internal/querynode/load_service_test.go @@ -19,6 +19,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" @@ -89,7 +90,7 @@ import ( // SourceID: 0, // }, // CollectionID: UniqueID(collectionID), -// PartitionName: "default", +// PartitionID: defaultPartitionID, // SegmentID: segmentID, // ChannelID: "0", // Timestamps: timestamps, @@ -173,8 +174,6 @@ import ( // log.Print("marshal placeholderGroup failed") // } // query := milvuspb.SearchRequest{ -// CollectionName: "collection0", -// PartitionNames: []string{"default"}, // Dsl: dslString, // PlaceholderGroup: placeGroupByte, // } @@ -425,7 +424,7 @@ import ( // SourceID: 0, // }, // CollectionID: UniqueID(collectionID), -// PartitionName: "default", +// PartitionID: defaultPartitionID, // SegmentID: segmentID, // ChannelID: "0", // Timestamps: timestamps, @@ -498,8 +497,6 @@ import ( // log.Print("marshal placeholderGroup failed") // } // query := milvuspb.SearchRequest{ -// CollectionName: "collection0", -// PartitionNames: []string{"default"}, // Dsl: dslString, // PlaceholderGroup: placeGroupByte, // } @@ -674,6 +671,72 @@ import ( //} /////////////////////////////////////////////////////////////////////////////////////////////////////////// +func genETCDCollectionMeta(collectionID UniqueID, isBinary bool) *etcdpb.CollectionMeta { + var fieldVec schemapb.FieldSchema + if isBinary { + fieldVec = schemapb.FieldSchema{ + FieldID: UniqueID(100), + Name: "vec", + IsPrimaryKey: false, + DataType: schemapb.DataType_VECTOR_BINARY, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "128", + }, + }, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: "metric_type", + Value: "JACCARD", + }, + }, + } + } else { + fieldVec = schemapb.FieldSchema{ + FieldID: UniqueID(100), + Name: "vec", + IsPrimaryKey: false, + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: "metric_type", + Value: "L2", + }, + }, + } + } + + fieldInt := schemapb.FieldSchema{ + FieldID: UniqueID(101), + Name: "age", + IsPrimaryKey: false, + DataType: schemapb.DataType_INT32, + } + + schema := schemapb.CollectionSchema{ + AutoID: true, + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + ID: collectionID, + Schema: &schema, + CreateTime: Timestamp(0), + PartitionIDs: []UniqueID{defaultPartitionID}, + } + + return &collectionMeta +} + func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, keyPrefix string) ([]*internalpb2.StringList, []int64, error) { const ( msgLength = 1000 @@ -725,7 +788,7 @@ func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID } // buffer data to binLogs - collMeta := genTestCollectionMeta(collectionID, false) + collMeta := genETCDCollectionMeta(collectionID, false) collMeta.Schema.Fields = append(collMeta.Schema.Fields, &schemapb.FieldSchema{ FieldID: 0, Name: "uid", @@ -870,7 +933,7 @@ func generateIndex(segmentID UniqueID) ([]string, error) { return indexPaths, nil } -func doInsert(ctx context.Context, collectionID UniqueID, partitionTag string, segmentID UniqueID) error { +func doInsert(ctx context.Context, collectionID UniqueID, partitionID UniqueID, segmentID UniqueID) error { const msgLength = 1000 const DIM = 16 @@ -906,12 +969,12 @@ func doInsert(ctx context.Context, collectionID UniqueID, partitionTag string, s Timestamp: uint64(i + 1000), SourceID: 0, }, - CollectionID: collectionID, - PartitionName: partitionTag, - SegmentID: segmentID, - ChannelID: "0", - Timestamps: []uint64{uint64(i + 1000)}, - RowIDs: []int64{int64(i)}, + CollectionID: collectionID, + PartitionID: partitionID, + SegmentID: segmentID, + ChannelID: "0", + Timestamps: []uint64{uint64(i + 1000)}, + RowIDs: []int64{int64(i)}, RowData: []*commonpb.Blob{ {Value: rawData}, }, diff --git a/internal/querynode/meta_service.go b/internal/querynode/meta_service.go index 6d12213c7129f17627bfb7b5c6428dde75a01044..d17194c41c2d5326e44e8e1a15034717b3052780 100644 --- a/internal/querynode/meta_service.go +++ b/internal/querynode/meta_service.go @@ -6,16 +6,14 @@ import ( "log" "path" "reflect" - "strconv" "strings" "time" "github.com/golang/protobuf/proto" - "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/mvcc/mvccpb" - etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" + "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "go.etcd.io/etcd/clientv3" ) const ( @@ -91,23 +89,7 @@ func isSegmentObj(key string) bool { return index == 0 } -func isSegmentChannelRangeInQueryNodeChannelRange(segment *etcdpb.SegmentMeta) bool { - if segment.ChannelStart > segment.ChannelEnd { - log.Printf("Illegal segment channel range") - return false - } - - var queryNodeChannelStart = Params.InsertChannelRange[0] - var queryNodeChannelEnd = Params.InsertChannelRange[1] - - if segment.ChannelStart >= int32(queryNodeChannelStart) && segment.ChannelEnd <= int32(queryNodeChannelEnd) { - return true - } - - return false -} - -func printCollectionStruct(obj *etcdpb.CollectionMeta) { +func printCollectionStruct(obj *etcdpb.CollectionInfo) { v := reflect.ValueOf(obj) v = reflect.Indirect(v) typeOfS := v.Type() @@ -120,7 +102,7 @@ func printCollectionStruct(obj *etcdpb.CollectionMeta) { } } -func printSegmentStruct(obj *etcdpb.SegmentMeta) { +func printSegmentStruct(obj *datapb.SegmentInfo) { v := reflect.ValueOf(obj) v = reflect.Indirect(v) typeOfS := v.Type() @@ -140,8 +122,8 @@ func (mService *metaService) processCollectionCreate(id string, value string) { if err != nil { log.Println(err) } - for _, partitionTag := range col.PartitionTags { - err = mService.replica.addPartition2(col.ID, partitionTag) + for _, partitionID := range col.PartitionIDs { + err = mService.replica.addPartition(col.ID, partitionID) if err != nil { log.Println(err) } @@ -153,14 +135,11 @@ func (mService *metaService) processSegmentCreate(id string, value string) { //println("Create Segment: ", id) seg := mService.segmentUnmarshal(value) - if !isSegmentChannelRangeInQueryNodeChannelRange(seg) { - log.Println("Illegal segment channel range") - return - } // TODO: what if seg == nil? We need to notify master and return rpc request failed if seg != nil { - err := mService.replica.addSegment2(seg.SegmentID, seg.PartitionTag, seg.CollectionID, segTypeGrowing) + // TODO: get partition id from segment meta + err := mService.replica.addSegment(seg.SegmentID, seg.PartitionID, seg.CollectionID, segTypeGrowing) if err != nil { log.Println(err) return @@ -181,122 +160,6 @@ func (mService *metaService) processCreate(key string, msg string) { } } -func (mService *metaService) processSegmentModify(id string, value string) { - seg := mService.segmentUnmarshal(value) - - if !isSegmentChannelRangeInQueryNodeChannelRange(seg) { - return - } - - if seg != nil { - targetSegment, err := mService.replica.getSegmentByID(seg.SegmentID) - if err != nil { - log.Println(err) - return - } - - // TODO: do modify - fmt.Println(targetSegment) - } -} - -func (mService *metaService) processCollectionModify(id string, value string) { - //println("Modify Collection: ", id) - - col := mService.collectionUnmarshal(value) - if col != nil { - err := mService.replica.addPartitionsByCollectionMeta(col) - if err != nil { - log.Println(err) - } - err = mService.replica.removePartitionsByCollectionMeta(col) - if err != nil { - log.Println(err) - } - } -} - -func (mService *metaService) processModify(key string, msg string) { - if isCollectionObj(key) { - objID := GetCollectionObjID(key) - mService.processCollectionModify(objID, msg) - } else if isSegmentObj(key) { - objID := GetSegmentObjID(key) - mService.processSegmentModify(objID, msg) - } else { - println("can not process modify msg:", key) - } -} - -func (mService *metaService) processSegmentDelete(id string) { - //println("Delete segment: ", id) - - var segmentID, err = strconv.ParseInt(id, 10, 64) - if err != nil { - log.Println("Cannot parse segment id:" + id) - } - - err = mService.replica.removeSegment(segmentID) - if err != nil { - log.Println(err) - return - } -} - -func (mService *metaService) processCollectionDelete(id string) { - //println("Delete collection: ", id) - - var collectionID, err = strconv.ParseInt(id, 10, 64) - if err != nil { - log.Println("Cannot parse collection id:" + id) - } - - err = mService.replica.removeCollection(collectionID) - if err != nil { - log.Println(err) - return - } -} - -func (mService *metaService) processDelete(key string) { - //println("process delete") - - if isCollectionObj(key) { - objID := GetCollectionObjID(key) - mService.processCollectionDelete(objID) - } else if isSegmentObj(key) { - objID := GetSegmentObjID(key) - mService.processSegmentDelete(objID) - } else { - println("can not process delete msg:", key) - } -} - -func (mService *metaService) processResp(resp clientv3.WatchResponse) error { - err := resp.Err() - if err != nil { - return err - } - - for _, ev := range resp.Events { - if ev.IsCreate() { - key := string(ev.Kv.Key) - msg := string(ev.Kv.Value) - mService.processCreate(key, msg) - } else if ev.IsModify() { - key := string(ev.Kv.Key) - msg := string(ev.Kv.Value) - mService.processModify(key, msg) - } else if ev.Type == mvccpb.DELETE { - key := string(ev.Kv.Key) - mService.processDelete(key) - } else { - println("Unrecognized etcd msg!") - } - } - return nil -} - func (mService *metaService) loadCollections() error { keys, values, err := mService.kvBase.LoadWithPrefix(CollectionPrefix) if err != nil { @@ -326,8 +189,8 @@ func (mService *metaService) loadSegments() error { } //----------------------------------------------------------------------- Unmarshal and Marshal -func (mService *metaService) collectionUnmarshal(value string) *etcdpb.CollectionMeta { - col := etcdpb.CollectionMeta{} +func (mService *metaService) collectionUnmarshal(value string) *etcdpb.CollectionInfo { + col := etcdpb.CollectionInfo{} err := proto.UnmarshalText(value, &col) if err != nil { log.Println(err) @@ -336,7 +199,7 @@ func (mService *metaService) collectionUnmarshal(value string) *etcdpb.Collectio return &col } -func (mService *metaService) collectionMarshal(col *etcdpb.CollectionMeta) string { +func (mService *metaService) collectionMarshal(col *etcdpb.CollectionInfo) string { value := proto.MarshalTextString(col) if value == "" { log.Println("marshal collection failed") @@ -345,8 +208,8 @@ func (mService *metaService) collectionMarshal(col *etcdpb.CollectionMeta) strin return value } -func (mService *metaService) segmentUnmarshal(value string) *etcdpb.SegmentMeta { - seg := etcdpb.SegmentMeta{} +func (mService *metaService) segmentUnmarshal(value string) *datapb.SegmentInfo { + seg := datapb.SegmentInfo{} err := proto.UnmarshalText(value, &seg) if err != nil { log.Println(err) diff --git a/internal/querynode/meta_service_test.go b/internal/querynode/meta_service_test.go index f47660a3de7774b696daee53379ed6df1383b895..e58ddfcf79476731e499ae151890aef73ca3e845 100644 --- a/internal/querynode/meta_service_test.go +++ b/internal/querynode/meta_service_test.go @@ -1,12 +1,11 @@ package querynode import ( - "math" "testing" "github.com/stretchr/testify/assert" - "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/proto/datapb" ) func TestMetaService_start(t *testing.T) { @@ -65,36 +64,6 @@ func TestMetaService_isSegmentObj(t *testing.T) { assert.Equal(t, b2, false) } -func TestMetaService_isSegmentChannelRangeInQueryNodeChannelRange(t *testing.T) { - var s = etcdpb.SegmentMeta{ - SegmentID: UniqueID(0), - CollectionID: UniqueID(0), - PartitionTag: "partition0", - ChannelStart: 0, - ChannelEnd: 1, - OpenTime: Timestamp(0), - CloseTime: Timestamp(math.MaxUint64), - NumRows: UniqueID(0), - } - - var b = isSegmentChannelRangeInQueryNodeChannelRange(&s) - assert.Equal(t, b, true) - - s = etcdpb.SegmentMeta{ - SegmentID: UniqueID(0), - CollectionID: UniqueID(0), - PartitionTag: "partition0", - ChannelStart: 128, - ChannelEnd: 256, - OpenTime: Timestamp(0), - CloseTime: Timestamp(math.MaxUint64), - NumRows: UniqueID(0), - } - - b = isSegmentChannelRangeInQueryNodeChannelRange(&s) - assert.Equal(t, b, false) -} - func TestMetaService_printCollectionStruct(t *testing.T) { collectionID := UniqueID(0) collectionMeta := genTestCollectionMeta(collectionID, false) @@ -102,14 +71,11 @@ func TestMetaService_printCollectionStruct(t *testing.T) { } func TestMetaService_printSegmentStruct(t *testing.T) { - var s = etcdpb.SegmentMeta{ + var s = datapb.SegmentInfo{ SegmentID: UniqueID(0), CollectionID: UniqueID(0), - PartitionTag: "partition0", - ChannelStart: 128, - ChannelEnd: 256, + PartitionID: defaultPartitionID, OpenTime: Timestamp(0), - CloseTime: Timestamp(math.MaxUint64), NumRows: UniqueID(0), } @@ -146,8 +112,7 @@ func TestMetaService_processCollectionCreate(t *testing.T) { > > > - segmentIDs: 0 - partition_tags: "default" + partitionIDs: 2021 ` node.metaService.processCollectionCreate(id, value) @@ -168,10 +133,7 @@ func TestMetaService_processSegmentCreate(t *testing.T) { node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) id := "0" - value := `partition_tag: "default" - channel_start: 0 - channel_end: 1 - close_time: 18446744073709551615 + value := `partitionID: 2021 ` (*node.metaService).processSegmentCreate(id, value) @@ -212,8 +174,7 @@ func TestMetaService_processCreate(t *testing.T) { > > > - segmentIDs: 0 - partition_tags: "default" + partitionIDs: 2021 ` (*node.metaService).processCreate(key1, msg1) @@ -225,10 +186,7 @@ func TestMetaService_processCreate(t *testing.T) { assert.Equal(t, collection.ID(), UniqueID(0)) key2 := Params.MetaRootPath + "/segment/0" - msg2 := `partition_tag: "default" - channel_start: 0 - channel_end: 1 - close_time: 18446744073709551615 + msg2 := `partitionID: 2021 ` (*node.metaService).processCreate(key2, msg2) @@ -238,430 +196,6 @@ func TestMetaService_processCreate(t *testing.T) { node.Stop() } -func TestMetaService_processSegmentModify(t *testing.T) { - node := newQueryNodeMock() - collectionID := UniqueID(0) - segmentID := UniqueID(0) - initTestMeta(t, node, collectionID, segmentID) - node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) - - id := "0" - value := `partition_tag: "default" - channel_start: 0 - channel_end: 1 - close_time: 18446744073709551615 - ` - - (*node.metaService).processSegmentCreate(id, value) - s, err := node.replica.getSegmentByID(segmentID) - assert.NoError(t, err) - assert.Equal(t, s.segmentID, segmentID) - - newValue := `partition_tag: "default" - channel_start: 0 - channel_end: 1 - close_time: 18446744073709551615 - ` - - // TODO: modify segment for testing processCollectionModify - (*node.metaService).processSegmentModify(id, newValue) - seg, err := node.replica.getSegmentByID(segmentID) - assert.NoError(t, err) - assert.Equal(t, seg.segmentID, segmentID) - node.Stop() -} - -func TestMetaService_processCollectionModify(t *testing.T) { - node := newQueryNodeMock() - node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) - - id := "0" - value := `schema: < - name: "test" - fields: < - fieldID:100 - name: "vec" - data_type: VECTOR_FLOAT - type_params: < - key: "dim" - value: "16" - > - index_params: < - key: "metric_type" - value: "L2" - > - > - fields: < - fieldID:101 - name: "age" - data_type: INT32 - type_params: < - key: "dim" - value: "1" - > - > - > - segmentIDs: 0 - partition_tags: "p0" - partition_tags: "p1" - partition_tags: "p2" - ` - - (*node.metaService).processCollectionCreate(id, value) - collectionNum := node.replica.getCollectionNum() - assert.Equal(t, collectionNum, 1) - - collection, err := node.replica.getCollectionByID(UniqueID(0)) - assert.NoError(t, err) - assert.Equal(t, collection.ID(), UniqueID(0)) - - partitionNum, err := node.replica.getPartitionNum(UniqueID(0)) - assert.NoError(t, err) - assert.Equal(t, partitionNum, 3) - - hasPartition := node.replica.hasPartition(UniqueID(0), "p0") - assert.Equal(t, hasPartition, true) - hasPartition = node.replica.hasPartition(UniqueID(0), "p1") - assert.Equal(t, hasPartition, true) - hasPartition = node.replica.hasPartition(UniqueID(0), "p2") - assert.Equal(t, hasPartition, true) - hasPartition = node.replica.hasPartition(UniqueID(0), "p3") - assert.Equal(t, hasPartition, false) - - newValue := `schema: < - name: "test" - fields: < - fieldID:100 - name: "vec" - data_type: VECTOR_FLOAT - type_params: < - key: "dim" - value: "16" - > - index_params: < - key: "metric_type" - value: "L2" - > - > - fields: < - fieldID:101 - name: "age" - data_type: INT32 - type_params: < - key: "dim" - value: "1" - > - > - > - segmentIDs: 0 - partition_tags: "p1" - partition_tags: "p2" - partition_tags: "p3" - ` - - (*node.metaService).processCollectionModify(id, newValue) - collection, err = node.replica.getCollectionByID(UniqueID(0)) - assert.NoError(t, err) - assert.Equal(t, collection.ID(), UniqueID(0)) - - partitionNum, err = node.replica.getPartitionNum(UniqueID(0)) - assert.NoError(t, err) - assert.Equal(t, partitionNum, 3) - - hasPartition = node.replica.hasPartition(UniqueID(0), "p0") - assert.Equal(t, hasPartition, false) - hasPartition = node.replica.hasPartition(UniqueID(0), "p1") - assert.Equal(t, hasPartition, true) - hasPartition = node.replica.hasPartition(UniqueID(0), "p2") - assert.Equal(t, hasPartition, true) - hasPartition = node.replica.hasPartition(UniqueID(0), "p3") - assert.Equal(t, hasPartition, true) - node.Stop() -} - -func TestMetaService_processModify(t *testing.T) { - node := newQueryNodeMock() - node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) - - key1 := Params.MetaRootPath + "/collection/0" - msg1 := `schema: < - name: "test" - fields: < - fieldID:100 - name: "vec" - data_type: VECTOR_FLOAT - type_params: < - key: "dim" - value: "16" - > - index_params: < - key: "metric_type" - value: "L2" - > - > - fields: < - fieldID:101 - name: "age" - data_type: INT32 - type_params: < - key: "dim" - value: "1" - > - > - > - segmentIDs: 0 - partition_tags: "p0" - partition_tags: "p1" - partition_tags: "p2" - ` - - (*node.metaService).processCreate(key1, msg1) - collectionNum := node.replica.getCollectionNum() - assert.Equal(t, collectionNum, 1) - - collection, err := node.replica.getCollectionByID(UniqueID(0)) - assert.NoError(t, err) - assert.Equal(t, collection.ID(), UniqueID(0)) - - partitionNum, err := node.replica.getPartitionNum(UniqueID(0)) - assert.NoError(t, err) - assert.Equal(t, partitionNum, 3) - - hasPartition := node.replica.hasPartition(UniqueID(0), "p0") - assert.Equal(t, hasPartition, true) - hasPartition = node.replica.hasPartition(UniqueID(0), "p1") - assert.Equal(t, hasPartition, true) - hasPartition = node.replica.hasPartition(UniqueID(0), "p2") - assert.Equal(t, hasPartition, true) - hasPartition = node.replica.hasPartition(UniqueID(0), "p3") - assert.Equal(t, hasPartition, false) - - key2 := Params.MetaRootPath + "/segment/0" - msg2 := `partition_tag: "p1" - channel_start: 0 - channel_end: 1 - close_time: 18446744073709551615 - ` - - (*node.metaService).processCreate(key2, msg2) - s, err := node.replica.getSegmentByID(UniqueID(0)) - assert.NoError(t, err) - assert.Equal(t, s.segmentID, UniqueID(0)) - - // modify - // TODO: use different index for testing processCollectionModify - msg3 := `schema: < - name: "test" - fields: < - fieldID:100 - name: "vec" - data_type: VECTOR_FLOAT - type_params: < - key: "dim" - value: "16" - > - index_params: < - key: "metric_type" - value: "L2" - > - > - fields: < - fieldID:101 - name: "age" - data_type: INT32 - type_params: < - key: "dim" - value: "1" - > - > - > - segmentIDs: 0 - partition_tags: "p1" - partition_tags: "p2" - partition_tags: "p3" - ` - - (*node.metaService).processModify(key1, msg3) - collection, err = node.replica.getCollectionByID(UniqueID(0)) - assert.NoError(t, err) - assert.Equal(t, collection.ID(), UniqueID(0)) - - partitionNum, err = node.replica.getPartitionNum(UniqueID(0)) - assert.NoError(t, err) - assert.Equal(t, partitionNum, 3) - - hasPartition = node.replica.hasPartition(UniqueID(0), "p0") - assert.Equal(t, hasPartition, false) - hasPartition = node.replica.hasPartition(UniqueID(0), "p1") - assert.Equal(t, hasPartition, true) - hasPartition = node.replica.hasPartition(UniqueID(0), "p2") - assert.Equal(t, hasPartition, true) - hasPartition = node.replica.hasPartition(UniqueID(0), "p3") - assert.Equal(t, hasPartition, true) - - msg4 := `partition_tag: "p1" - channel_start: 0 - channel_end: 1 - close_time: 18446744073709551615 - ` - - (*node.metaService).processModify(key2, msg4) - seg, err := node.replica.getSegmentByID(UniqueID(0)) - assert.NoError(t, err) - assert.Equal(t, seg.segmentID, UniqueID(0)) - node.Stop() -} - -func TestMetaService_processSegmentDelete(t *testing.T) { - node := newQueryNodeMock() - collectionID := UniqueID(0) - initTestMeta(t, node, collectionID, 0) - node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) - - id := "0" - value := `partition_tag: "default" - channel_start: 0 - channel_end: 1 - close_time: 18446744073709551615 - ` - - (*node.metaService).processSegmentCreate(id, value) - seg, err := node.replica.getSegmentByID(UniqueID(0)) - assert.NoError(t, err) - assert.Equal(t, seg.segmentID, UniqueID(0)) - - (*node.metaService).processSegmentDelete("0") - mapSize := node.replica.getSegmentNum() - assert.Equal(t, mapSize, 0) - node.Stop() -} - -func TestMetaService_processCollectionDelete(t *testing.T) { - node := newQueryNodeMock() - node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) - - id := "0" - value := `schema: < - name: "test" - fields: < - fieldID:100 - name: "vec" - data_type: VECTOR_FLOAT - type_params: < - key: "dim" - value: "16" - > - index_params: < - key: "metric_type" - value: "L2" - > - > - fields: < - fieldID:101 - name: "age" - data_type: INT32 - type_params: < - key: "dim" - value: "1" - > - > - > - segmentIDs: 0 - partition_tags: "default" - ` - - (*node.metaService).processCollectionCreate(id, value) - collectionNum := node.replica.getCollectionNum() - assert.Equal(t, collectionNum, 1) - - collection, err := node.replica.getCollectionByID(UniqueID(0)) - assert.NoError(t, err) - assert.Equal(t, collection.ID(), UniqueID(0)) - - (*node.metaService).processCollectionDelete(id) - collectionNum = node.replica.getCollectionNum() - assert.Equal(t, collectionNum, 0) - node.Stop() -} - -func TestMetaService_processDelete(t *testing.T) { - node := newQueryNodeMock() - node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) - - key1 := Params.MetaRootPath + "/collection/0" - msg1 := `schema: < - name: "test" - fields: < - fieldID:100 - name: "vec" - data_type: VECTOR_FLOAT - type_params: < - key: "dim" - value: "16" - > - index_params: < - key: "metric_type" - value: "L2" - > - > - fields: < - fieldID:101 - name: "age" - data_type: INT32 - type_params: < - key: "dim" - value: "1" - > - > - > - segmentIDs: 0 - partition_tags: "default" - ` - - (*node.metaService).processCreate(key1, msg1) - collectionNum := node.replica.getCollectionNum() - assert.Equal(t, collectionNum, 1) - - collection, err := node.replica.getCollectionByID(UniqueID(0)) - assert.NoError(t, err) - assert.Equal(t, collection.ID(), UniqueID(0)) - - key2 := Params.MetaRootPath + "/segment/0" - msg2 := `partition_tag: "default" - channel_start: 0 - channel_end: 1 - close_time: 18446744073709551615 - ` - - (*node.metaService).processCreate(key2, msg2) - seg, err := node.replica.getSegmentByID(UniqueID(0)) - assert.NoError(t, err) - assert.Equal(t, seg.segmentID, UniqueID(0)) - - (*node.metaService).processDelete(key1) - collectionsSize := node.replica.getCollectionNum() - assert.Equal(t, collectionsSize, 0) - - mapSize := node.replica.getSegmentNum() - assert.Equal(t, mapSize, 0) - node.Stop() -} - -func TestMetaService_processResp(t *testing.T) { - node := newQueryNodeMock() - node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) - - metaChan := (*node.metaService).kvBase.WatchWithPrefix("") - - select { - case <-node.queryNodeLoopCtx.Done(): - return - case resp := <-metaChan: - _ = (*node.metaService).processResp(resp) - } - node.Stop() -} - func TestMetaService_loadCollections(t *testing.T) { node := newQueryNodeMock() node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) diff --git a/internal/querynode/param_table.go b/internal/querynode/param_table.go index e8a9a0a2fe9b6b75c5aff1647772b0a64d1bc28f..a2254f1bdb710f62e83878171aca97c13860e050 100644 --- a/internal/querynode/param_table.go +++ b/internal/querynode/param_table.go @@ -58,10 +58,9 @@ type ParamTable struct { StatsChannelName string StatsReceiveBufSize int64 - GracefulTime int64 - MsgChannelSubName string - DefaultPartitionTag string - SliceIndex int + GracefulTime int64 + MsgChannelSubName string + SliceIndex int } var Params ParamTable @@ -133,7 +132,6 @@ func (p *ParamTable) Init() { p.initGracefulTime() p.initMsgChannelSubName() - p.initDefaultPartitionTag() p.initSliceIndex() p.initFlowGraphMaxQueueLength() @@ -458,15 +456,6 @@ func (p *ParamTable) initDDChannelNames() { p.DDChannelNames = ret } -func (p *ParamTable) initDefaultPartitionTag() { - defaultTag, err := p.Load("common.defaultPartitionTag") - if err != nil { - panic(err) - } - - p.DefaultPartitionTag = defaultTag -} - func (p *ParamTable) initSliceIndex() { queryNodeID := p.QueryNodeID queryNodeIDList := p.QueryNodeIDList() diff --git a/internal/querynode/param_table_test.go b/internal/querynode/param_table_test.go index 04bf7807f28c7bcc3df7de9449e31c56417cad51..cbd6b4524505db1cdcc52db56c48013ccc764281 100644 --- a/internal/querynode/param_table_test.go +++ b/internal/querynode/param_table_test.go @@ -165,8 +165,3 @@ func TestParamTable_ddChannelName(t *testing.T) { contains := strings.Contains(names[0], "data-definition-0") assert.Equal(t, contains, true) } - -func TestParamTable_defaultPartitionTag(t *testing.T) { - tag := Params.DefaultPartitionTag - assert.Equal(t, tag, "_default") -} diff --git a/internal/querynode/partition.go b/internal/querynode/partition.go index 5b835b86163e4e1c85f1edb267373b305bfa9271..199d6caa556cd76772de632ddfee19f770367a7a 100644 --- a/internal/querynode/partition.go +++ b/internal/querynode/partition.go @@ -13,33 +13,19 @@ package querynode import "C" type Partition struct { - partitionTag string - id UniqueID - segments []*Segment - enableDM bool + id UniqueID + segments []*Segment + enableDM bool } func (p *Partition) ID() UniqueID { return p.id } -func (p *Partition) Tag() string { - return (*p).partitionTag -} - func (p *Partition) Segments() *[]*Segment { return &(*p).segments } -func newPartition2(partitionTag string) *Partition { - var newPartition = &Partition{ - partitionTag: partitionTag, - enableDM: false, - } - - return newPartition -} - func newPartition(partitionID UniqueID) *Partition { var newPartition = &Partition{ id: partitionID, diff --git a/internal/querynode/partition_test.go b/internal/querynode/partition_test.go index 707cc5513a620822bed3084d02004bfc96320409..328141e90b988b93bc438c16fb3b297bc92e01c2 100644 --- a/internal/querynode/partition_test.go +++ b/internal/querynode/partition_test.go @@ -19,7 +19,7 @@ func TestPartition_Segments(t *testing.T) { const segmentNum = 3 for i := 0; i < segmentNum; i++ { - err := node.replica.addSegment2(UniqueID(i), targetPartition.partitionTag, collection.ID(), segTypeGrowing) + err := node.replica.addSegment(UniqueID(i), targetPartition.ID(), collection.ID(), segTypeGrowing) assert.NoError(t, err) } @@ -28,7 +28,7 @@ func TestPartition_Segments(t *testing.T) { } func TestPartition_newPartition(t *testing.T) { - partitionTag := "default" - partition := newPartition2(partitionTag) - assert.Equal(t, partition.partitionTag, partitionTag) + partitionID := defaultPartitionID + partition := newPartition(partitionID) + assert.Equal(t, partition.ID(), defaultPartitionID) } diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index e45af4e7f161944c697ecbd2bd347497f51a4fd0..5b2a8210363ab843126503982c3c1853b71999e6 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -20,6 +20,8 @@ import ( const ctxTimeInMillisecond = 5000 const closeWithDeadline = true +const defaultPartitionID = UniqueID(2021) + type queryServiceMock struct{} func setup() { @@ -27,7 +29,7 @@ func setup() { Params.MetaRootPath = "/etcd/test/root/querynode" } -func genTestCollectionMeta(collectionID UniqueID, isBinary bool) *etcdpb.CollectionMeta { +func genTestCollectionMeta(collectionID UniqueID, isBinary bool) *etcdpb.CollectionInfo { var fieldVec schemapb.FieldSchema if isBinary { fieldVec = schemapb.FieldSchema{ @@ -76,21 +78,18 @@ func genTestCollectionMeta(collectionID UniqueID, isBinary bool) *etcdpb.Collect DataType: schemapb.DataType_INT32, } - collectionName := rand.Int63n(1000000) schema := schemapb.CollectionSchema{ - Name: "collection-" + strconv.FormatInt(collectionName, 10), AutoID: true, Fields: []*schemapb.FieldSchema{ &fieldVec, &fieldInt, }, } - collectionMeta := etcdpb.CollectionMeta{ - ID: collectionID, - Schema: &schema, - CreateTime: Timestamp(0), - SegmentIDs: []UniqueID{0}, - PartitionTags: []string{"default"}, + collectionMeta := etcdpb.CollectionInfo{ + ID: collectionID, + Schema: &schema, + CreateTime: Timestamp(0), + PartitionIDs: []UniqueID{defaultPartitionID}, } return &collectionMeta @@ -111,10 +110,10 @@ func initTestMeta(t *testing.T, node *QueryNode, collectionID UniqueID, segmentI assert.Equal(t, collection.ID(), collectionID) assert.Equal(t, node.replica.getCollectionNum(), 1) - err = node.replica.addPartition2(collection.ID(), collectionMeta.PartitionTags[0]) + err = node.replica.addPartition(collection.ID(), collectionMeta.PartitionIDs[0]) assert.NoError(t, err) - err = node.replica.addSegment2(segmentID, collectionMeta.PartitionTags[0], collectionID, segTypeGrowing) + err = node.replica.addSegment(segmentID, collectionMeta.PartitionIDs[0], collectionID, segTypeGrowing) assert.NoError(t, err) } diff --git a/internal/querynode/reduce_test.go b/internal/querynode/reduce_test.go index 2964e959b58c4415c3406773ac441262a40288eb..b89af4a43209884a09ea3f1b7f15b9f2f29e5df8 100644 --- a/internal/querynode/reduce_test.go +++ b/internal/querynode/reduce_test.go @@ -18,7 +18,7 @@ func TestReduce_AllFunc(t *testing.T) { collectionMeta := genTestCollectionMeta(collectionID, false) collection := newCollection(collectionMeta.ID, collectionMeta.Schema) - segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing) + segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing) const DIM = 16 var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go index 11890968acbef027ea0ba5d6ecd4fc2efbfd9b3e..217a2c8814f937644e0237d23fcce69e267c0296 100644 --- a/internal/querynode/search_service.go +++ b/internal/querynode/search_service.go @@ -239,7 +239,6 @@ func (ss *searchService) search(msg msgstream.TsMsg) error { return errors.New("unmarshal query failed") } collectionID := searchMsg.CollectionID - partitionTagsInQuery := query.PartitionNames collection, err := ss.replica.getCollectionByID(collectionID) if err != nil { span.LogFields(oplog.Error(err)) @@ -263,29 +262,30 @@ func (ss *searchService) search(msg msgstream.TsMsg) error { searchResults := make([]*SearchResult, 0) matchedSegments := make([]*Segment, 0) - //fmt.Println("search msg's partitionTag = ", partitionTagsInQuery) + //fmt.Println("search msg's partitionID = ", partitionIDsInQuery) - var partitionTagsInCol []string + var partitionIDsInCol []UniqueID for _, partition := range collection.partitions { - partitionTag := partition.partitionTag - partitionTagsInCol = append(partitionTagsInCol, partitionTag) + partitionID := partition.ID() + partitionIDsInCol = append(partitionIDsInCol, partitionID) } - var searchPartitionTag []string - if len(partitionTagsInQuery) == 0 { - searchPartitionTag = partitionTagsInCol + var searchPartitionIDs []UniqueID + partitionIDsInQuery := searchMsg.PartitionIDs + if len(partitionIDsInQuery) == 0 { + searchPartitionIDs = partitionIDsInCol } else { - for _, tag := range partitionTagsInCol { - for _, toMatchTag := range partitionTagsInQuery { - re := regexp.MustCompile("^" + toMatchTag + "$") - if re.MatchString(tag) { - searchPartitionTag = append(searchPartitionTag, tag) + for _, id := range partitionIDsInCol { + for _, toMatchID := range partitionIDsInQuery { + re := regexp.MustCompile("^" + strconv.FormatInt(toMatchID, 10) + "$") + if re.MatchString(strconv.FormatInt(id, 10)) { + searchPartitionIDs = append(searchPartitionIDs, id) } } } } - for _, partitionTag := range searchPartitionTag { - partition, _ := ss.replica.getPartitionByTag(collectionID, partitionTag) + for _, partitionID := range searchPartitionIDs { + partition, _ := ss.replica.getPartitionByID(collectionID, partitionID) for _, segment := range partition.segments { //fmt.Println("dsl = ", dsl) diff --git a/internal/querynode/search_service_test.go b/internal/querynode/search_service_test.go index 558cfd9cf2c98f4c855bdb2c47141cac4ab33092..759d4e23bd11614dea9d9d22aaba0a12c1502982 100644 --- a/internal/querynode/search_service_test.go +++ b/internal/querynode/search_service_test.go @@ -61,8 +61,6 @@ func TestSearch_Search(t *testing.T) { } query := milvuspb.SearchRequest{ - CollectionName: "collection0", - PartitionNames: []string{"default"}, Dsl: dslString, PlaceholderGroup: placeGroupByte, } @@ -137,12 +135,12 @@ func TestSearch_Search(t *testing.T) { Timestamp: uint64(10 + 1000), SourceID: 0, }, - CollectionID: UniqueID(0), - PartitionName: "default", - SegmentID: int64(0), - ChannelID: "0", - Timestamps: []uint64{uint64(i + 1000)}, - RowIDs: []int64{int64(i)}, + CollectionID: UniqueID(0), + PartitionID: defaultPartitionID, + SegmentID: int64(0), + ChannelID: "0", + Timestamps: []uint64{uint64(i + 1000)}, + RowIDs: []int64{int64(i)}, RowData: []*commonpb.Blob{ {Value: rawData}, }, @@ -256,8 +254,6 @@ func TestSearch_SearchMultiSegments(t *testing.T) { } query := milvuspb.SearchRequest{ - CollectionName: "collection0", - PartitionNames: []string{"default"}, Dsl: dslString, PlaceholderGroup: placeGroupByte, } @@ -336,12 +332,12 @@ func TestSearch_SearchMultiSegments(t *testing.T) { Timestamp: uint64(i + 1000), SourceID: 0, }, - CollectionID: UniqueID(0), - PartitionName: "default", - SegmentID: int64(segmentID), - ChannelID: "0", - Timestamps: []uint64{uint64(i + 1000)}, - RowIDs: []int64{int64(i)}, + CollectionID: UniqueID(0), + PartitionID: defaultPartitionID, + SegmentID: int64(segmentID), + ChannelID: "0", + Timestamps: []uint64{uint64(i + 1000)}, + RowIDs: []int64{int64(i)}, RowData: []*commonpb.Blob{ {Value: rawData}, }, diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index 98e594cabbb71d4eab4497483dbedde491942065..924c66d3e1a65b2fd4556d0d3a129a65ef560214 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -36,7 +36,6 @@ type Segment struct { segmentPtr C.CSegmentInterface segmentID UniqueID - partitionTag string // TODO: use partitionID partitionID UniqueID collectionID UniqueID lastMemSize int64 @@ -81,25 +80,6 @@ func (s *Segment) getType() segmentType { return s.segmentType } -func newSegment2(collection *Collection, segmentID int64, partitionTag string, collectionID UniqueID, segType segmentType) *Segment { - /* - CSegmentInterface - NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type); - */ - initIndexParam := make(map[int64]indexParam) - segmentPtr := C.NewSegment(collection.collectionPtr, C.ulong(segmentID), segType) - var newSegment = &Segment{ - segmentPtr: segmentPtr, - segmentType: segType, - segmentID: segmentID, - partitionTag: partitionTag, - collectionID: collectionID, - indexParam: initIndexParam, - } - - return newSegment -} - func newSegment(collection *Collection, segmentID int64, partitionID UniqueID, collectionID UniqueID, segType segmentType) *Segment { /* CSegmentInterface diff --git a/internal/querynode/segment_test.go b/internal/querynode/segment_test.go index 8243144e386547222472af44c02e747fbcafa841..4b021656a4c216ebf4523103a5c274fdb6113db7 100644 --- a/internal/querynode/segment_test.go +++ b/internal/querynode/segment_test.go @@ -22,7 +22,7 @@ func TestSegment_newSegment(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing) + segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing) assert.Equal(t, segmentID, segment.segmentID) deleteSegment(segment) deleteCollection(collection) @@ -36,7 +36,7 @@ func TestSegment_deleteSegment(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing) + segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing) assert.Equal(t, segmentID, segment.segmentID) deleteSegment(segment) @@ -52,7 +52,7 @@ func TestSegment_getRowCount(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing) + segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing) assert.Equal(t, segmentID, segment.segmentID) ids := []int64{1, 2, 3} @@ -99,7 +99,7 @@ func TestSegment_getDeletedCount(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing) + segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing) assert.Equal(t, segmentID, segment.segmentID) ids := []int64{1, 2, 3} @@ -152,7 +152,7 @@ func TestSegment_getMemSize(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing) + segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing) assert.Equal(t, segmentID, segment.segmentID) ids := []int64{1, 2, 3} @@ -199,7 +199,7 @@ func TestSegment_segmentInsert(t *testing.T) { collection := newCollection(collectionMeta.ID, collectionMeta.Schema) assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing) + segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing) assert.Equal(t, segmentID, segment.segmentID) ids := []int64{1, 2, 3} @@ -242,7 +242,7 @@ func TestSegment_segmentDelete(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing) + segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing) assert.Equal(t, segmentID, segment.segmentID) ids := []int64{1, 2, 3} @@ -291,7 +291,7 @@ func TestSegment_segmentSearch(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing) + segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing) assert.Equal(t, segmentID, segment.segmentID) ids := []int64{1, 2, 3} @@ -372,7 +372,7 @@ func TestSegment_segmentPreInsert(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing) + segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing) assert.Equal(t, segmentID, segment.segmentID) const DIM = 16 @@ -410,7 +410,7 @@ func TestSegment_segmentPreDelete(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing) + segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing) assert.Equal(t, segmentID, segment.segmentID) ids := []int64{1, 2, 3}