diff --git a/internal/distributed/querynode/mock.go b/internal/distributed/querynode/mock.go index 28ebe80168bdd8ad3d15dbc9773942dedeb39faf..7922e1698f215ef8ec1e6d14a91147798eb332c8 100644 --- a/internal/distributed/querynode/mock.go +++ b/internal/distributed/querynode/mock.go @@ -124,6 +124,26 @@ func (data *DataServiceMock) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRe return rsp, nil } +func (data *DataServiceMock) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) { + segmentGrowingInfo := &datapb.SegmentStateInfo{ + State: commonpb.SegmentState_SegmentGrowing, + } + segmentFlushedInfo := &datapb.SegmentStateInfo{ + State: commonpb.SegmentState_SegmentFlushed, + } + + if data.Count < 10 { + data.Count++ + return &datapb.SegmentStatesResponse{ + States: []*datapb.SegmentStateInfo{segmentGrowingInfo}, + }, nil + } + + return &datapb.SegmentStatesResponse{ + States: []*datapb.SegmentStateInfo{segmentFlushedInfo}, + }, nil +} + type IndexServiceMock struct { Count int } diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index 932d529fff8e7efeac8fb7834a43717e6ca06d7e..c16adaf3de0840aa90d311dba9cbc1aefc8c2af5 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -373,6 +373,9 @@ func (colReplica *collectionReplicaImpl) addSegmentPrivate(segmentID UniqueID, p return err } + if colReplica.hasSegmentPrivate(segmentID) { + return nil + } partition.addSegmentID(segmentID) var newSegment = newSegment(collection, segmentID, partitionID, collectionID, segType) colReplica.segments[segmentID] = newSegment diff --git a/internal/querynode/load_service.go b/internal/querynode/load_service.go index 73043849ff7d0b1c4dc745cb6ab3ca9969c61da2..be17c2a78ffd7fa4eabd5a39dbdf2613dcc2591f 100644 --- a/internal/querynode/load_service.go +++ b/internal/querynode/load_service.go @@ -7,7 +7,9 @@ import ( "sync" "time" + "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" ) const loadingCheckInterval = 3 @@ -79,7 +81,12 @@ func (s *loadService) loadSegment(collectionID UniqueID, partitionID UniqueID, s } } for _, segmentID := range segmentIDs { - err := s.loadSegmentInternal(collectionID, partitionID, segmentID, fieldIDs) + err := s.segLoader.replica.addSegment(segmentID, partitionID, collectionID, segTypeGrowing) + if err != nil { + log.Println(err) + continue + } + err = s.loadSegmentInternal(collectionID, partitionID, segmentID, fieldIDs) if err != nil { log.Println(err) continue @@ -90,6 +97,14 @@ func (s *loadService) loadSegment(collectionID UniqueID, partitionID UniqueID, s func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, fieldIDs []int64) error { // create segment + statesResp, err := s.segLoader.GetSegmentStates(segmentID) + if err != nil { + return err + } + if statesResp.States[0].State != commonpb.SegmentState_SegmentFlushed { + return errors.New("segment not flush done") + } + collection, err := s.segLoader.replica.getCollectionByID(collectionID) if err != nil { return err diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index b2a18961c332d8709dccd25ba7e7943192047cce..545e5a8a2249a2ca8fff70e8c7f64a5d17920efb 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -65,6 +65,25 @@ func (loader *segmentLoader) getInsertBinlogPaths(segmentID UniqueID) ([]*intern return pathResponse.Paths, pathResponse.FieldIDs, nil } +func (loader *segmentLoader) GetSegmentStates(segmentID UniqueID) (*datapb.SegmentStatesResponse, error) { + if loader.dataClient == nil { + return nil, errors.New("null data service client") + } + + segmentStatesRequest := &datapb.SegmentStatesRequest{ + SegmentIDs: []int64{segmentID}, + } + statesResponse, err := loader.dataClient.GetSegmentStates(segmentStatesRequest) + if err != nil || statesResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + return nil, err + } + if len(statesResponse.States) != 1 { + return nil, errors.New("segment states' len should be 1") + } + + return statesResponse, nil +} + func (loader *segmentLoader) filterOutVectorFields(fieldIDs []int64, vectorFields []int64) []int64 { containsFunc := func(s []int64, e int64) bool { for _, a := range s { diff --git a/internal/querynode/type_def.go b/internal/querynode/type_def.go index 957ee0ca85b41067d3212d5990a7e5f3c9d2d2d9..f2a8880cc2cf921c8e319cbd0e74e417c499959b 100644 --- a/internal/querynode/type_def.go +++ b/internal/querynode/type_def.go @@ -31,6 +31,7 @@ type QueryServiceInterface interface { type DataServiceInterface interface { GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) + GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) } type IndexServiceInterface interface {