diff --git a/internal/dataservice/watcher.go b/internal/dataservice/watcher.go index ac3a1ff1f80cb3fae6e9cd40b46b0c7d1d2e56ff..7fef27d8419af9bf17914d2f8b60bf6e918a1923 100644 --- a/internal/dataservice/watcher.go +++ b/internal/dataservice/watcher.go @@ -85,6 +85,7 @@ func (watcher *dataNodeTimeTickWatcher) handleTimeTickMsg(msg *msgstream.TimeTic if err != nil { return err } + coll2Segs := make(map[UniqueID][]UniqueID) for _, id := range segments { expired, err := watcher.allocator.IsAllocationsExpired(ctx, id, msg.Base.Timestamp) if err != nil { @@ -92,7 +93,7 @@ func (watcher *dataNodeTimeTickWatcher) handleTimeTickMsg(msg *msgstream.TimeTic continue } if expired { - segmentInfo, err := watcher.meta.GetSegment(id) + sInfo, err := watcher.meta.GetSegment(id) if err != nil { log.Error("get segment from meta error", zap.Int64("segmentID", id), zap.Error(err)) continue @@ -101,18 +102,22 @@ func (watcher *dataNodeTimeTickWatcher) handleTimeTickMsg(msg *msgstream.TimeTic log.Error("set segment state error", zap.Int64("segmentID", id), zap.Error(err)) continue } - watcher.cluster.FlushSegment(&datapb.FlushSegmentsRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Flush, - MsgID: -1, // todo add msg id - Timestamp: 0, // todo - SourceID: Params.NodeID, - }, - CollectionID: segmentInfo.CollectionID, - SegmentIDs: []int64{segmentInfo.SegmentID}, - }) + collID, segID := sInfo.CollectionID, sInfo.SegmentID + coll2Segs[collID] = append(coll2Segs[collID], segID) watcher.allocator.DropSegment(ctx, id) } } + for collID, segIDs := range coll2Segs { + watcher.cluster.FlushSegment(&datapb.FlushSegmentsRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Flush, + MsgID: -1, // todo add msg id + Timestamp: 0, // todo + SourceID: Params.NodeID, + }, + CollectionID: collID, + SegmentIDs: segIDs, + }) + } return nil }