From e9ee9a273e4429e025b65c8bcfdeb1c50f81c6b8 Mon Sep 17 00:00:00 2001
From: bigsheeper <yihao.dai@zilliz.com>
Date: Sat, 6 Feb 2021 11:35:35 +0800
Subject: [PATCH] Refactor load service and check insert binlogs periodically

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
---
 internal/querynode/collection_replica.go | 114 +++--
 internal/querynode/index_loader.go       | 409 +++++++++++++++
 internal/querynode/load_service.go       | 617 ++---------------------
 internal/querynode/load_service_test.go  |   6 +-
 internal/querynode/partition.go          |   4 +-
 internal/querynode/query_node.go         |  10 +-
 internal/querynode/segment_loader.go     | 211 ++++++++
 7 files changed, 769 insertions(+), 602 deletions(-)
 create mode 100644 internal/querynode/index_loader.go
 create mode 100644 internal/querynode/segment_loader.go

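Overview: this patch splits the old monolithic loadService into three cooperating pieces. loadService keeps only the periodic check loop; the new segmentLoader pulls field data out of insert binlogs; the new indexLoader fetches built indexes and attaches them to sealed segments. The sketch below models only the loop structure, with simplified hypothetical stand-in types (the real code additionally wires in collectionReplica, a MinIO-backed kv, and the master/data/index service clients):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Stand-in for the real indexLoader: checks sealed segments for
// freshly built indexes.
type indexLoader struct{}

func (l *indexLoader) doLoadIndex(wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Println("load missing indexes for enabled sealed segments")
}

// Stand-in for the real segmentLoader, which owns the indexLoader.
type segmentLoader struct {
	indexLoader *indexLoader
}

type loadService struct {
	ctx       context.Context
	cancel    context.CancelFunc
	segLoader *segmentLoader
}

func (s *loadService) loadSegmentActively(wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Println("load insert binlogs for enabled growing segments")
}

// start mirrors the loop in load_service.go: on every tick, run the
// index check and the active segment load in parallel, then wait.
func (s *loadService) start() {
	wg := &sync.WaitGroup{}
	for {
		select {
		case <-s.ctx.Done():
			return
		case <-time.After(time.Second): // indexCheckInterval
			wg.Add(2)
			go s.segLoader.indexLoader.doLoadIndex(wg)
			go s.loadSegmentActively(wg)
			wg.Wait()
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := &loadService{ctx: ctx, cancel: cancel,
		segLoader: &segmentLoader{indexLoader: &indexLoader{}}}
	go s.start()
	time.Sleep(2500 * time.Millisecond) // let the loop tick twice
	cancel()
}
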
diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go
index 1cd007902..bf9c3e02a 100644
--- a/internal/querynode/collection_replica.go
+++ b/internal/querynode/collection_replica.go
@@ -30,6 +30,7 @@ import (
  */
 type collectionReplica interface {
 	// collection
+	getCollectionIDs() []UniqueID
 	addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error
 	removeCollection(collectionID UniqueID) error
 	getCollectionByID(collectionID UniqueID) (*Collection, error)
@@ -37,7 +38,8 @@ type collectionReplica interface {
 	getCollectionNum() int
 	getPartitionIDs(collectionID UniqueID) ([]UniqueID, error)
 
-	getVecFieldsByCollectionID(collectionID UniqueID) ([]int64, error)
+	getVecFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error)
+	getFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error)
 
 	// partition
 	addPartition(collectionID UniqueID, partitionID UniqueID) error
@@ -47,9 +49,8 @@ type collectionReplica interface {
 	getPartitionNum() int
 	getSegmentIDs(partitionID UniqueID) ([]UniqueID, error)
 
-	enablePartitionDM(partitionID UniqueID) error
-	disablePartitionDM(partitionID UniqueID) error
-	getEnablePartitionDM(partitionID UniqueID) (bool, error)
+	enablePartition(partitionID UniqueID) error
+	disablePartition(partitionID UniqueID) error
 
 	// segment
 	addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error
@@ -59,7 +60,7 @@ type collectionReplica interface {
 	getSegmentNum() int
 
 	getSegmentStatistics() []*internalpb2.SegmentStats
-	getSealedSegments() ([]UniqueID, []UniqueID)
+	getEnabledSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID)
 	replaceGrowingSegmentBySealedSegment(segment *Segment) error
 
 	getTSafe() tSafe
@@ -76,6 +77,16 @@ type collectionReplicaImpl struct {
 }
 
 //----------------------------------------------------------------------------------------------------- collection
+func (colReplica *collectionReplicaImpl) getCollectionIDs() []UniqueID {
+	colReplica.mu.RLock()
+	defer colReplica.mu.RUnlock()
+	collectionIDs := make([]UniqueID, 0)
+	for id := range colReplica.collections {
+		collectionIDs = append(collectionIDs, id)
+	}
+	return collectionIDs
+}
+
 func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
@@ -158,29 +169,59 @@ func (colReplica *collectionReplicaImpl) getPartitionIDs(collectionID UniqueID)
 	return collection.partitionIDs, nil
 }
 
-func (colReplica *collectionReplicaImpl) getVecFieldsByCollectionID(collectionID UniqueID) ([]int64, error) {
+func (colReplica *collectionReplicaImpl) getVecFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 
-	collection, err := colReplica.getCollectionByIDPrivate(collectionID)
+	fields, err := colReplica.getFieldsByCollectionIDPrivate(collectionID)
 	if err != nil {
 		return nil, err
 	}
 
 	vecFields := make([]int64, 0)
-	for _, field := range collection.Schema().Fields {
+	for _, field := range fields {
 		if field.DataType == schemapb.DataType_VECTOR_BINARY || field.DataType == schemapb.DataType_VECTOR_FLOAT {
 			vecFields = append(vecFields, field.FieldID)
 		}
 	}
 
 	if len(vecFields) <= 0 {
-		return nil, errors.New("no vector field in segment " + strconv.FormatInt(collectionID, 10))
+		return nil, errors.New("no vector field in collection " + strconv.FormatInt(collectionID, 10))
 	}
 
 	return vecFields, nil
 }
 
+func (colReplica *collectionReplicaImpl) getFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
+	colReplica.mu.RLock()
+	defer colReplica.mu.RUnlock()
+
+	fields, err := colReplica.getFieldsByCollectionIDPrivate(collectionID)
+	if err != nil {
+		return nil, err
+	}
+
+	targetFields := make([]int64, 0)
+	for _, field := range fields {
+		targetFields = append(targetFields, field.FieldID)
+	}
+
+	return targetFields, nil
+}
+
+func (colReplica *collectionReplicaImpl) getFieldsByCollectionIDPrivate(collectionID UniqueID) ([]*schemapb.FieldSchema, error) {
+	collection, err := colReplica.getCollectionByIDPrivate(collectionID)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(collection.Schema().Fields) <= 0 {
+		return nil, errors.New("no field in collection " + strconv.FormatInt(collectionID, 10))
+	}
+
+	return collection.Schema().Fields, nil
+}
+
 //----------------------------------------------------------------------------------------------------- partition
 func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionID UniqueID) error {
 	colReplica.mu.Lock()
@@ -263,7 +304,10 @@ func (colReplica *collectionReplicaImpl) getPartitionNum() int {
 func (colReplica *collectionReplicaImpl) getSegmentIDs(partitionID UniqueID) ([]UniqueID, error) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
+	return colReplica.getSegmentIDsPrivate(partitionID)
+}
 
+func (colReplica *collectionReplicaImpl) getSegmentIDsPrivate(partitionID UniqueID) ([]UniqueID, error) {
 	partition, err2 := colReplica.getPartitionByIDPrivate(partitionID)
 	if err2 != nil {
 		return nil, err2
@@ -271,7 +315,7 @@ func (colReplica *collectionReplicaImpl) getSegmentIDs(partitionID UniqueID) ([]
 	return partition.segmentIDs, nil
 }
 
-func (colReplica *collectionReplicaImpl) enablePartitionDM(partitionID UniqueID) error {
+func (colReplica *collectionReplicaImpl) enablePartition(partitionID UniqueID) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 
@@ -280,11 +324,11 @@ func (colReplica *collectionReplicaImpl) enablePartitionDM(partitionID UniqueID)
 		return err
 	}
 
-	partition.enableDM = true
+	partition.enable = true
 	return nil
 }
 
-func (colReplica *collectionReplicaImpl) disablePartitionDM(partitionID UniqueID) error {
+func (colReplica *collectionReplicaImpl) disablePartition(partitionID UniqueID) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 
@@ -293,19 +337,18 @@ func (colReplica *collectionReplicaImpl) disablePartitionDM(partitionID UniqueID
 		return err
 	}
 
-	partition.enableDM = false
+	partition.enable = false
 	return nil
 }
 
-func (colReplica *collectionReplicaImpl) getEnablePartitionDM(partitionID UniqueID) (bool, error) {
-	colReplica.mu.Lock()
-	defer colReplica.mu.Unlock()
-
-	partition, err := colReplica.getPartitionByIDPrivate(partitionID)
-	if err != nil {
-		return false, err
+func (colReplica *collectionReplicaImpl) getEnabledPartitionIDsPrivate() []UniqueID {
+	partitionIDs := make([]UniqueID, 0)
+	for _, partition := range colReplica.partitions {
+		if partition.enable {
+			partitionIDs = append(partitionIDs, partition.partitionID)
+		}
 	}
-	return partition.enableDM, nil
+	return partitionIDs
 }
 
 //----------------------------------------------------------------------------------------------------- segment
@@ -414,20 +457,33 @@ func (colReplica *collectionReplicaImpl) getSegmentStatistics() []*internalpb2.S
 	return statisticData
 }
 
-func (colReplica *collectionReplicaImpl) getSealedSegments() ([]UniqueID, []UniqueID) {
+func (colReplica *collectionReplicaImpl) getEnabledSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 
-	collectionIDs := make([]UniqueID, 0)
-	segmentIDs := make([]UniqueID, 0)
-	for k, v := range colReplica.segments {
-		if v.getType() == segTypeSealed {
-			collectionIDs = append(collectionIDs, v.collectionID)
-			segmentIDs = append(segmentIDs, k)
+	targetCollectionIDs := make([]UniqueID, 0)
+	targetPartitionIDs := make([]UniqueID, 0)
+	targetSegmentIDs := make([]UniqueID, 0)
+
+	for _, partitionID := range colReplica.getEnabledPartitionIDsPrivate() {
+		segmentIDs, err := colReplica.getSegmentIDsPrivate(partitionID)
+		if err != nil {
+			continue
+		}
+		for _, segmentID := range segmentIDs {
+			segment, err := colReplica.getSegmentByIDPrivate(segmentID)
+			if err != nil {
+				continue
+			}
+			if segment.getType() == segType {
+				targetCollectionIDs = append(targetCollectionIDs, segment.collectionID)
+				targetPartitionIDs = append(targetPartitionIDs, segment.partitionID)
+				targetSegmentIDs = append(targetSegmentIDs, segment.segmentID)
+			}
 		}
 	}
 
-	return collectionIDs, segmentIDs
+	return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs
 }
 
 func (colReplica *collectionReplicaImpl) replaceGrowingSegmentBySealedSegment(segment *Segment) error {
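
getEnabledSegmentsBySegmentType walks only the enabled partitions and returns three parallel slices (collection, partition and segment IDs) whose indices line up, so callers can iterate them with a single index. A self-contained sketch of the same accumulation pattern, using hypothetical flat structs in place of the replica's maps and locks:

package main

import "fmt"

type segment struct {
	segmentID, partitionID, collectionID int64
	sealed                               bool
}

type partition struct {
	partitionID int64
	enable      bool
	segments    []segment
}

// Collect the IDs of sealed segments in enabled partitions as three
// parallel slices; index i refers to the same segment in all three.
func enabledSealedSegments(partitions []partition) (colIDs, parIDs, segIDs []int64) {
	for _, p := range partitions {
		if !p.enable {
			continue
		}
		for _, s := range p.segments {
			if s.sealed {
				colIDs = append(colIDs, s.collectionID)
				parIDs = append(parIDs, s.partitionID)
				segIDs = append(segIDs, s.segmentID)
			}
		}
	}
	return
}

func main() {
	parts := []partition{
		{partitionID: 10, enable: true, segments: []segment{{1, 10, 100, true}, {2, 10, 100, false}}},
		{partitionID: 11, enable: false, segments: []segment{{3, 11, 100, true}}},
	}
	c, p, s := enabledSealedSegments(parts)
	for i := range s {
		fmt.Println("collection", c[i], "partition", p[i], "segment", s[i])
	}
}
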
diff --git a/internal/querynode/index_loader.go b/internal/querynode/index_loader.go
new file mode 100644
index 000000000..207386ad5
--- /dev/null
+++ b/internal/querynode/index_loader.go
@@ -0,0 +1,409 @@
+package querynode
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"log"
+	"path"
+	"sort"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/zilliztech/milvus-distributed/internal/kv"
+	minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
+	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
+	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
+	"github.com/zilliztech/milvus-distributed/internal/storage"
+)
+
+type indexLoader struct {
+	replica collectionReplica
+
+	fieldIndexes   map[string][]*internalpb2.IndexStats
+	fieldStatsChan chan []*internalpb2.FieldStats
+
+	masterClient MasterServiceInterface
+	indexClient  IndexServiceInterface
+
+	kv kv.Base // minio kv
+}
+
+type loadIndex struct {
+	segmentID  UniqueID
+	fieldID    int64
+	indexPaths []string
+}
+
+func (loader *indexLoader) doLoadIndex(wg *sync.WaitGroup) {
+	collectionIDs, _, segmentIDs := loader.replica.getEnabledSegmentsBySegmentType(segTypeSealed)
+	if len(collectionIDs) <= 0 {
+		wg.Done()
+		return
+	}
+	fmt.Println("do load index for sealed segments:", segmentIDs)
+	for i := range collectionIDs {
+		// we don't need index id yet
+		_, buildID, err := loader.getIndexInfo(collectionIDs[i], segmentIDs[i])
+		if err == nil {
+			indexPaths, err := loader.getIndexPaths(buildID)
+			if err != nil {
+				log.Println(err)
+				continue
+			}
+			err = loader.loadIndexDelayed(collectionIDs[i], segmentIDs[i], indexPaths)
+			if err != nil {
+				log.Println(err)
+				continue
+			}
+		}
+	}
+	// sendQueryNodeStats
+	err := loader.sendQueryNodeStats()
+	if err != nil {
+		log.Println(err)
+		wg.Done()
+		return
+	}
+
+	wg.Done()
+}
+
+func (loader *indexLoader) execute(l *loadIndex) error {
+	// 1. use msg's index paths to get index bytes
+	var err error
+	var indexBuffer [][]byte
+	var indexParams indexParam
+	var indexName string
+	var indexID UniqueID
+	fn := func() error {
+		indexBuffer, indexParams, indexName, indexID, err = loader.loadIndex(l.indexPaths)
+		if err != nil {
+			return err
+		}
+		return nil
+	}
+	err = util.Retry(5, time.Millisecond*200, fn)
+	if err != nil {
+		return err
+	}
+	ok, err := loader.checkIndexReady(indexParams, l)
+	if err != nil {
+		return err
+	}
+	if ok {
+		// the index is already loaded; return a non-nil sentinel error
+		// so that callers skip the remaining load steps
+		return errors.New("")
+	}
+	// 2. use index bytes and index path to update segment
+	err = loader.updateSegmentIndex(indexParams, indexBuffer, l)
+	if err != nil {
+		return err
+	}
+	// 3. update segment index stats
+	err = loader.updateSegmentIndexStats(indexParams, indexName, indexID, l)
+	if err != nil {
+		return err
+	}
+	fmt.Println("load index done")
+	return nil
+}
+
+func (loader *indexLoader) printIndexParams(index []*commonpb.KeyValuePair) {
+	fmt.Println("=================================================")
+	for i := 0; i < len(index); i++ {
+		fmt.Println(index[i])
+	}
+}
+
+func (loader *indexLoader) indexParamsEqual(index1 []*commonpb.KeyValuePair, index2 []*commonpb.KeyValuePair) bool {
+	if len(index1) != len(index2) {
+		return false
+	}
+
+	for i := 0; i < len(index1); i++ {
+		kv1 := *index1[i]
+		kv2 := *index2[i]
+		if kv1.Key != kv2.Key || kv1.Value != kv2.Value {
+			return false
+		}
+	}
+
+	return true
+}
+
+func (loader *indexLoader) fieldsStatsIDs2Key(collectionID UniqueID, fieldID UniqueID) string {
+	return strconv.FormatInt(collectionID, 10) + "/" + strconv.FormatInt(fieldID, 10)
+}
+
+func (loader *indexLoader) fieldsStatsKey2IDs(key string) (UniqueID, UniqueID, error) {
+	ids := strings.Split(key, "/")
+	if len(ids) != 2 {
+		return 0, 0, errors.New("illegal fieldsStatsKey")
+	}
+	collectionID, err := strconv.ParseInt(ids[0], 10, 64)
+	if err != nil {
+		return 0, 0, err
+	}
+	fieldID, err := strconv.ParseInt(ids[1], 10, 64)
+	if err != nil {
+		return 0, 0, err
+	}
+	return collectionID, fieldID, nil
+}
+
+func (loader *indexLoader) updateSegmentIndexStats(indexParams indexParam, indexName string, indexID UniqueID, l *loadIndex) error {
+	targetSegment, err := loader.replica.getSegmentByID(l.segmentID)
+	if err != nil {
+		return err
+	}
+
+	fieldStatsKey := loader.fieldsStatsIDs2Key(targetSegment.collectionID, l.fieldID)
+	_, ok := loader.fieldIndexes[fieldStatsKey]
+	newIndexParams := make([]*commonpb.KeyValuePair, 0)
+	for k, v := range indexParams {
+		newIndexParams = append(newIndexParams, &commonpb.KeyValuePair{
+			Key:   k,
+			Value: v,
+		})
+	}
+
+	// sort index params by key
+	sort.Slice(newIndexParams, func(i, j int) bool { return newIndexParams[i].Key < newIndexParams[j].Key })
+	if !ok {
+		loader.fieldIndexes[fieldStatsKey] = make([]*internalpb2.IndexStats, 0)
+		loader.fieldIndexes[fieldStatsKey] = append(loader.fieldIndexes[fieldStatsKey],
+			&internalpb2.IndexStats{
+				IndexParams:        newIndexParams,
+				NumRelatedSegments: 1,
+			})
+	} else {
+		isNewIndex := true
+		for _, index := range loader.fieldIndexes[fieldStatsKey] {
+			if loader.indexParamsEqual(newIndexParams, index.IndexParams) {
+				index.NumRelatedSegments++
+				isNewIndex = false
+			}
+		}
+		if isNewIndex {
+			loader.fieldIndexes[fieldStatsKey] = append(loader.fieldIndexes[fieldStatsKey],
+				&internalpb2.IndexStats{
+					IndexParams:        newIndexParams,
+					NumRelatedSegments: 1,
+				})
+		}
+	}
+	err = targetSegment.setIndexParam(l.fieldID, newIndexParams)
+	if err != nil {
+		return err
+	}
+	targetSegment.setIndexName(indexName)
+	targetSegment.setIndexID(indexID)
+
+	return nil
+}
+
+func (loader *indexLoader) loadIndex(indexPath []string) ([][]byte, indexParam, string, UniqueID, error) {
+	index := make([][]byte, 0)
+
+	var indexParams indexParam
+	var indexName string
+	var indexID UniqueID
+	for _, p := range indexPath {
+		fmt.Println("load path = ", indexPath)
+		indexPiece, err := loader.kv.Load(p)
+		if err != nil {
+			return nil, nil, "", -1, err
+		}
+		// get index params when detecting indexParamPrefix
+		if path.Base(p) == storage.IndexParamsFile {
+			indexCodec := storage.NewIndexCodec()
+			_, indexParams, indexName, indexID, err = indexCodec.Deserialize([]*storage.Blob{
+				{
+					Key:   storage.IndexParamsFile,
+					Value: []byte(indexPiece),
+				},
+			})
+			if err != nil {
+				return nil, nil, "", -1, err
+			}
+		} else {
+			index = append(index, []byte(indexPiece))
+		}
+	}
+
+	if len(indexParams) <= 0 {
+		return nil, nil, "", -1, errors.New("cannot find index param")
+	}
+	return index, indexParams, indexName, indexID, nil
+}
+
+func (loader *indexLoader) updateSegmentIndex(indexParams indexParam, bytesIndex [][]byte, l *loadIndex) error {
+	segment, err := loader.replica.getSegmentByID(l.segmentID)
+	if err != nil {
+		return err
+	}
+
+	loadIndexInfo, err := newLoadIndexInfo()
+	if err != nil {
+		return err
+	}
+	defer deleteLoadIndexInfo(loadIndexInfo)
+	err = loadIndexInfo.appendFieldInfo(l.fieldID)
+	if err != nil {
+		return err
+	}
+	for k, v := range indexParams {
+		err = loadIndexInfo.appendIndexParam(k, v)
+		if err != nil {
+			return err
+		}
+	}
+	err = loadIndexInfo.appendIndex(bytesIndex, l.indexPaths)
+	if err != nil {
+		return err
+	}
+	return segment.updateSegmentIndex(loadIndexInfo)
+}
+
+func (loader *indexLoader) sendQueryNodeStats() error {
+	resultFieldsStats := make([]*internalpb2.FieldStats, 0)
+	for fieldStatsKey, indexStats := range loader.fieldIndexes {
+		colID, fieldID, err := loader.fieldsStatsKey2IDs(fieldStatsKey)
+		if err != nil {
+			return err
+		}
+		fieldStats := internalpb2.FieldStats{
+			CollectionID: colID,
+			FieldID:      fieldID,
+			IndexStats:   indexStats,
+		}
+		resultFieldsStats = append(resultFieldsStats, &fieldStats)
+	}
+
+	loader.fieldStatsChan <- resultFieldsStats
+	fmt.Println("sent field stats")
+	return nil
+}
+
+func (loader *indexLoader) checkIndexReady(indexParams indexParam, l *loadIndex) (bool, error) {
+	segment, err := loader.replica.getSegmentByID(l.segmentID)
+	if err != nil {
+		return false, err
+	}
+	if !segment.matchIndexParam(l.fieldID, indexParams) {
+		return false, nil
+	}
+	return true, nil
+}
+
+func (loader *indexLoader) getIndexInfo(collectionID UniqueID, segmentID UniqueID) (UniqueID, UniqueID, error) {
+	req := &milvuspb.DescribeSegmentRequest{
+		Base: &commonpb.MsgBase{
+			MsgType: commonpb.MsgType_kDescribeSegment,
+		},
+		CollectionID: collectionID,
+		SegmentID:    segmentID,
+	}
+	response, err := loader.masterClient.DescribeSegment(req)
+	if err != nil {
+		return 0, 0, err
+	}
+	return response.IndexID, response.BuildID, nil
+}
+
+func (loader *indexLoader) getIndexPaths(indexBuildID UniqueID) ([]string, error) {
+	if loader.indexClient == nil {
+		return nil, errors.New("null index service client")
+	}
+
+	indexFilePathRequest := &indexpb.IndexFilePathsRequest{
+		// TODO: rename indexIDs to buildIDs
+		IndexBuildIDs: []UniqueID{indexBuildID},
+	}
+	pathResponse, err := loader.indexClient.GetIndexFilePaths(indexFilePathRequest)
+	if err != nil {
+		return nil, err
+	}
+	if pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+		return nil, errors.New("GetIndexFilePaths failed: " + pathResponse.Status.Reason)
+	}
+
+	if len(pathResponse.FilePaths) <= 0 {
+		return nil, errors.New("illegal index file paths")
+	}
+
+	return pathResponse.FilePaths[0].IndexFilePaths, nil
+}
+
+func (loader *indexLoader) loadIndexImmediate(segment *Segment, indexPaths []string) error {
+	// get vector field ids from schema to load index
+	vecFieldIDs, err := loader.replica.getVecFieldIDsByCollectionID(segment.collectionID)
+	if err != nil {
+		return err
+	}
+	for _, id := range vecFieldIDs {
+		l := &loadIndex{
+			segmentID:  segment.ID(),
+			fieldID:    id,
+			indexPaths: indexPaths,
+		}
+
+		err = loader.execute(l)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (loader *indexLoader) loadIndexDelayed(collectionID, segmentID UniqueID, indexPaths []string) error {
+	// get vector field ids from schema to load index
+	vecFieldIDs, err := loader.replica.getVecFieldIDsByCollectionID(collectionID)
+	if err != nil {
+		return err
+	}
+	for _, id := range vecFieldIDs {
+		l := &loadIndex{
+			segmentID:  segmentID,
+			fieldID:    id,
+			indexPaths: indexPaths,
+		}
+
+		err = loader.execute(l)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func newIndexLoader(ctx context.Context, masterClient MasterServiceInterface, indexClient IndexServiceInterface, replica collectionReplica) *indexLoader {
+	option := &minioKV.Option{
+		Address:           Params.MinioEndPoint,
+		AccessKeyID:       Params.MinioAccessKeyID,
+		SecretAccessKeyID: Params.MinioSecretAccessKey,
+		UseSSL:            Params.MinioUseSSLStr,
+		CreateBucket:      true,
+		BucketName:        Params.MinioBucketName,
+	}
+
+	client, err := minioKV.NewMinIOKV(ctx, option)
+	if err != nil {
+		panic(err)
+	}
+
+	return &indexLoader{
+		replica: replica,
+
+		fieldIndexes:   make(map[string][]*internalpb2.IndexStats),
+		fieldStatsChan: make(chan []*internalpb2.FieldStats, 1),
+
+		masterClient: masterClient,
+		indexClient:  indexClient,
+
+		kv: client,
+	}
+}
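
indexLoader keys its per-field index stats by a "collectionID/fieldID" string and decodes the key again in fieldsStatsKey2IDs before publishing on fieldStatsChan. A minimal round-trip of that encoding (same format, UniqueID simplified to int64):

package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// Encode a (collectionID, fieldID) pair as "collectionID/fieldID".
func fieldsStatsIDs2Key(collectionID, fieldID int64) string {
	return strconv.FormatInt(collectionID, 10) + "/" + strconv.FormatInt(fieldID, 10)
}

// Decode the key back into its two IDs.
func fieldsStatsKey2IDs(key string) (int64, int64, error) {
	ids := strings.Split(key, "/")
	if len(ids) != 2 {
		return 0, 0, errors.New("illegal fieldsStatsKey")
	}
	collectionID, err := strconv.ParseInt(ids[0], 10, 64)
	if err != nil {
		return 0, 0, err
	}
	fieldID, err := strconv.ParseInt(ids[1], 10, 64)
	if err != nil {
		return 0, 0, err
	}
	return collectionID, fieldID, nil
}

func main() {
	key := fieldsStatsIDs2Key(100, 7)
	col, field, err := fieldsStatsKey2IDs(key)
	fmt.Println(key, col, field, err) // 100/7 100 7 <nil>
}
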
diff --git a/internal/querynode/load_service.go b/internal/querynode/load_service.go
index 076bd57cb..1f3268399 100644
--- a/internal/querynode/load_service.go
+++ b/internal/querynode/load_service.go
@@ -2,25 +2,12 @@ package querynode
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"log"
-	"path"
-	"sort"
-	"strconv"
-	"strings"
+	"sync"
 	"time"
 
-	"github.com/zilliztech/milvus-distributed/internal/kv"
-	minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
-	"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
-	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
-	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
-	"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
-	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
-	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
-	"github.com/zilliztech/milvus-distributed/internal/storage"
 )
 
 const indexCheckInterval = 1
@@ -29,622 +16,126 @@ type loadService struct {
 	ctx    context.Context
 	cancel context.CancelFunc
 
-	replica collectionReplica
-
-	fieldIndexes   map[string][]*internalpb2.IndexStats
-	fieldStatsChan chan []*internalpb2.FieldStats
-
-	dmStream msgstream.MsgStream
-
-	masterClient MasterServiceInterface
-	dataClient   DataServiceInterface
-	indexClient  IndexServiceInterface
-
-	kv     kv.Base // minio kv
-	iCodec *storage.InsertCodec
-}
-
-type loadIndex struct {
-	segmentID  UniqueID
-	fieldID    int64
-	indexPaths []string
+	segLoader *segmentLoader
 }
 
 // -------------------------------------------- load index -------------------------------------------- //
 func (s *loadService) start() {
+	wg := &sync.WaitGroup{}
 	for {
 		select {
 		case <-s.ctx.Done():
 			return
 		case <-time.After(indexCheckInterval * time.Second):
-			collectionIDs, segmentIDs := s.replica.getSealedSegments()
-			if len(collectionIDs) <= 0 {
-				continue
-			}
-			fmt.Println("do load index for segments:", segmentIDs)
-			for i := range collectionIDs {
-				// we don't need index id yet
-				_, buildID, err := s.getIndexInfo(collectionIDs[i], segmentIDs[i])
-				if err != nil {
-					indexPaths, err := s.getIndexPaths(buildID)
-					if err != nil {
-						log.Println(err)
-						continue
-					}
-					err = s.loadIndexDelayed(collectionIDs[i], segmentIDs[i], indexPaths)
-					if err != nil {
-						log.Println(err)
-						continue
-					}
-				}
-			}
-			// sendQueryNodeStats
-			err := s.sendQueryNodeStats()
-			if err != nil {
-				log.Println(err)
-				continue
-			}
-		}
-	}
-}
-
-func (s *loadService) execute(l *loadIndex) error {
-	// 1. use msg's index paths to get index bytes
-	var err error
-	var indexBuffer [][]byte
-	var indexParams indexParam
-	var indexName string
-	var indexID UniqueID
-	fn := func() error {
-		indexBuffer, indexParams, indexName, indexID, err = s.loadIndex(l.indexPaths)
-		if err != nil {
-			return err
+			wg.Add(2)
+			go s.segLoader.indexLoader.doLoadIndex(wg)
+			go s.loadSegmentActively(wg)
+			wg.Wait()
 		}
-		return nil
-	}
-	err = util.Retry(5, time.Millisecond*200, fn)
-	if err != nil {
-		return err
-	}
-	ok, err := s.checkIndexReady(indexParams, l)
-	if err != nil {
-		return err
-	}
-	if ok {
-		// no error
-		return errors.New("")
-	}
-	// 2. use index bytes and index path to update segment
-	err = s.updateSegmentIndex(indexParams, indexBuffer, l)
-	if err != nil {
-		return err
-	}
-	// 3. update segment index stats
-	err = s.updateSegmentIndexStats(indexParams, indexName, indexID, l)
-	if err != nil {
-		return err
 	}
-	fmt.Println("load index done")
-	return nil
 }
 
 func (s *loadService) close() {
 	s.cancel()
 }
 
-func (s *loadService) printIndexParams(index []*commonpb.KeyValuePair) {
-	fmt.Println("=================================================")
-	for i := 0; i < len(index); i++ {
-		fmt.Println(index[i])
-	}
-}
-
-func (s *loadService) indexParamsEqual(index1 []*commonpb.KeyValuePair, index2 []*commonpb.KeyValuePair) bool {
-	if len(index1) != len(index2) {
-		return false
-	}
-
-	for i := 0; i < len(index1); i++ {
-		kv1 := *index1[i]
-		kv2 := *index2[i]
-		if kv1.Key != kv2.Key || kv1.Value != kv2.Value {
-			return false
-		}
-	}
-
-	return true
-}
-
-func (s *loadService) fieldsStatsIDs2Key(collectionID UniqueID, fieldID UniqueID) string {
-	return strconv.FormatInt(collectionID, 10) + "/" + strconv.FormatInt(fieldID, 10)
-}
-
-func (s *loadService) fieldsStatsKey2IDs(key string) (UniqueID, UniqueID, error) {
-	ids := strings.Split(key, "/")
-	if len(ids) != 2 {
-		return 0, 0, errors.New("illegal fieldsStatsKey")
-	}
-	collectionID, err := strconv.ParseInt(ids[0], 10, 64)
-	if err != nil {
-		return 0, 0, err
-	}
-	fieldID, err := strconv.ParseInt(ids[1], 10, 64)
-	if err != nil {
-		return 0, 0, err
+func (s *loadService) loadSegmentActively(wg *sync.WaitGroup) {
+	collectionIDs, partitionIDs, segmentIDs := s.segLoader.replica.getEnabledSegmentsBySegmentType(segTypeGrowing)
+	if len(collectionIDs) <= 0 {
+		wg.Done()
+		return
 	}
-	return collectionID, fieldID, nil
-}
-
-func (s *loadService) updateSegmentIndexStats(indexParams indexParam, indexName string, indexID UniqueID, l *loadIndex) error {
-	targetSegment, err := s.replica.getSegmentByID(l.segmentID)
-	if err != nil {
-		return err
-	}
-
-	fieldStatsKey := s.fieldsStatsIDs2Key(targetSegment.collectionID, l.fieldID)
-	_, ok := s.fieldIndexes[fieldStatsKey]
-	newIndexParams := make([]*commonpb.KeyValuePair, 0)
-	for k, v := range indexParams {
-		newIndexParams = append(newIndexParams, &commonpb.KeyValuePair{
-			Key:   k,
-			Value: v,
-		})
-	}
-
-	// sort index params by key
-	sort.Slice(newIndexParams, func(i, j int) bool { return newIndexParams[i].Key < newIndexParams[j].Key })
-	if !ok {
-		s.fieldIndexes[fieldStatsKey] = make([]*internalpb2.IndexStats, 0)
-		s.fieldIndexes[fieldStatsKey] = append(s.fieldIndexes[fieldStatsKey],
-			&internalpb2.IndexStats{
-				IndexParams:        newIndexParams,
-				NumRelatedSegments: 1,
-			})
-	} else {
-		isNewIndex := true
-		for _, index := range s.fieldIndexes[fieldStatsKey] {
-			if s.indexParamsEqual(newIndexParams, index.IndexParams) {
-				index.NumRelatedSegments++
-				isNewIndex = false
-			}
-		}
-		if isNewIndex {
-			s.fieldIndexes[fieldStatsKey] = append(s.fieldIndexes[fieldStatsKey],
-				&internalpb2.IndexStats{
-					IndexParams:        newIndexParams,
-					NumRelatedSegments: 1,
-				})
-		}
-	}
-	err = targetSegment.setIndexParam(l.fieldID, newIndexParams)
-	if err != nil {
-		return err
-	}
-	targetSegment.setIndexName(indexName)
-	targetSegment.setIndexID(indexID)
-
-	return nil
-}
-
-func (s *loadService) loadIndex(indexPath []string) ([][]byte, indexParam, string, UniqueID, error) {
-	index := make([][]byte, 0)
-
-	var indexParams indexParam
-	var indexName string
-	var indexID UniqueID
-	for _, p := range indexPath {
-		fmt.Println("load path = ", indexPath)
-		indexPiece, err := s.kv.Load(p)
+	fmt.Println("do load segment for growing segments:", segmentIDs)
+	for i := range collectionIDs {
+		fieldIDs, err := s.segLoader.replica.getFieldIDsByCollectionID(collectionIDs[i])
 		if err != nil {
-			return nil, nil, "", -1, err
-		}
-		// get index params when detecting indexParamPrefix
-		if path.Base(p) == storage.IndexParamsFile {
-			indexCodec := storage.NewIndexCodec()
-			_, indexParams, indexName, indexID, err = indexCodec.Deserialize([]*storage.Blob{
-				{
-					Key:   storage.IndexParamsFile,
-					Value: []byte(indexPiece),
-				},
-			})
-			if err != nil {
-				return nil, nil, "", -1, err
-			}
-		} else {
-			index = append(index, []byte(indexPiece))
+			log.Println(err)
+			continue
 		}
-	}
-
-	if len(indexParams) <= 0 {
-		return nil, nil, "", -1, errors.New("cannot find index param")
-	}
-	return index, indexParams, indexName, indexID, nil
-}
-
-func (s *loadService) updateSegmentIndex(indexParams indexParam, bytesIndex [][]byte, l *loadIndex) error {
-	segment, err := s.replica.getSegmentByID(l.segmentID)
-	if err != nil {
-		return err
-	}
-
-	loadIndexInfo, err := newLoadIndexInfo()
-	defer deleteLoadIndexInfo(loadIndexInfo)
-	if err != nil {
-		return err
-	}
-	err = loadIndexInfo.appendFieldInfo(l.fieldID)
-	if err != nil {
-		return err
-	}
-	for k, v := range indexParams {
-		err = loadIndexInfo.appendIndexParam(k, v)
+		err = s.loadSegmentInternal(collectionIDs[i], partitionIDs[i], segmentIDs[i], fieldIDs)
 		if err != nil {
-			return err
+			log.Println(err)
 		}
 	}
-	err = loadIndexInfo.appendIndex(bytesIndex, l.indexPaths)
+	// sendQueryNodeStats
+	err := s.segLoader.indexLoader.sendQueryNodeStats()
 	if err != nil {
-		return err
+		log.Println(err)
+		wg.Done()
+		return
 	}
-	return segment.updateSegmentIndex(loadIndexInfo)
-}
 
-func (s *loadService) sendQueryNodeStats() error {
-	resultFieldsStats := make([]*internalpb2.FieldStats, 0)
-	for fieldStatsKey, indexStats := range s.fieldIndexes {
-		colID, fieldID, err := s.fieldsStatsKey2IDs(fieldStatsKey)
-		if err != nil {
-			return err
-		}
-		fieldStats := internalpb2.FieldStats{
-			CollectionID: colID,
-			FieldID:      fieldID,
-			IndexStats:   indexStats,
-		}
-		resultFieldsStats = append(resultFieldsStats, &fieldStats)
-	}
-
-	s.fieldStatsChan <- resultFieldsStats
-	fmt.Println("sent field stats")
-	return nil
+	wg.Done()
 }
 
-func (s *loadService) checkIndexReady(indexParams indexParam, l *loadIndex) (bool, error) {
-	segment, err := s.replica.getSegmentByID(l.segmentID)
-	if err != nil {
-		return false, err
-	}
-	if !segment.matchIndexParam(l.fieldID, indexParams) {
-		return false, nil
-	}
-	return true, nil
-}
-
-func (s *loadService) getIndexInfo(collectionID UniqueID, segmentID UniqueID) (UniqueID, UniqueID, error) {
-	req := &milvuspb.DescribeSegmentRequest{
-		Base: &commonpb.MsgBase{
-			MsgType: commonpb.MsgType_kDescribeSegment,
-		},
-		CollectionID: collectionID,
-		SegmentID:    segmentID,
-	}
-	response, err := s.masterClient.DescribeSegment(req)
-	if err != nil {
-		return 0, 0, err
-	}
-	return response.IndexID, response.BuildID, nil
-}
-
-// -------------------------------------------- load segment -------------------------------------------- //
+// loadSegment loads segments passively, driven by LoadSegments RPCs rather than by the periodic check loop
 func (s *loadService) loadSegment(collectionID UniqueID, partitionID UniqueID, segmentIDs []UniqueID, fieldIDs []int64) error {
 	// TODO: interim solution
 	if len(fieldIDs) == 0 {
-		collection, err := s.replica.getCollectionByID(collectionID)
+		var err error
+		fieldIDs, err = s.segLoader.replica.getFieldIDsByCollectionID(collectionID)
 		if err != nil {
 			return err
 		}
-		fieldIDs = make([]int64, 0)
-		for _, field := range collection.Schema().Fields {
-			fieldIDs = append(fieldIDs, field.FieldID)
-		}
 	}
 	for _, segmentID := range segmentIDs {
-		// we don't need index id yet
-		_, buildID, errIndex := s.getIndexInfo(collectionID, segmentID)
-		if errIndex == nil {
-			// we don't need load to vector fields
-			vectorFields, err := s.replica.getVecFieldsByCollectionID(segmentID)
-			if err != nil {
-				return err
-			}
-			fieldIDs = s.filterOutVectorFields(fieldIDs, vectorFields)
-		}
-		paths, srcFieldIDs, err := s.getInsertBinlogPaths(segmentID)
-		if err != nil {
-			return err
-		}
-
-		targetFields := s.getTargetFields(paths, srcFieldIDs, fieldIDs)
-		collection, err := s.replica.getCollectionByID(collectionID)
-		if err != nil {
-			return err
-		}
-		segment := newSegment(collection, segmentID, partitionID, collectionID, segTypeSealed)
-		err = s.loadSegmentFieldsData(segment, targetFields)
+		err := s.loadSegmentInternal(collectionID, partitionID, segmentID, fieldIDs)
 		if err != nil {
-			return err
-		}
-		if errIndex == nil {
-			indexPaths, err := s.getIndexPaths(buildID)
-			if err != nil {
-				return err
-			}
-			err = s.loadIndexImmediate(segment, indexPaths)
-			if err != nil {
-				// TODO: return or continue?
-				return err
-			}
+			log.Println(err)
+			continue
 		}
 	}
 	return nil
 }
 
-func (s *loadService) releaseSegment(segmentID UniqueID) error {
-	err := s.replica.removeSegment(segmentID)
-	return err
-}
-
-func (s *loadService) seekSegment(position *internalpb2.MsgPosition) error {
-	// TODO: open seek
-	//for _, position := range positions {
-	//	err := s.dmStream.Seek(position)
-	//	if err != nil {
-	//		return err
-	//	}
-	//}
-	return nil
-}
-
-func (s *loadService) getIndexPaths(indexBuildID UniqueID) ([]string, error) {
-	if s.indexClient == nil {
-		return nil, errors.New("null index service client")
-	}
-
-	indexFilePathRequest := &indexpb.IndexFilePathsRequest{
-		// TODO: rename indexIDs to buildIDs
-		IndexBuildIDs: []UniqueID{indexBuildID},
-	}
-	pathResponse, err := s.indexClient.GetIndexFilePaths(indexFilePathRequest)
-	if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
-		return nil, err
-	}
-
-	if len(pathResponse.FilePaths) <= 0 {
-		return nil, errors.New("illegal index file paths")
-	}
-
-	return pathResponse.FilePaths[0].IndexFilePaths, nil
-}
-
-func (s *loadService) loadIndexImmediate(segment *Segment, indexPaths []string) error {
-	// get vector field ids from schema to load index
-	vecFieldIDs, err := s.replica.getVecFieldsByCollectionID(segment.collectionID)
-	if err != nil {
-		return err
-	}
-	for _, id := range vecFieldIDs {
-		l := &loadIndex{
-			segmentID:  segment.ID(),
-			fieldID:    id,
-			indexPaths: indexPaths,
-		}
-
-		err = s.execute(l)
-		if err != nil {
-			return err
-		}
-		// replace segment
-		err = s.replica.replaceGrowingSegmentBySealedSegment(segment)
+func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, fieldIDs []int64) error {
+	// we don't need index id yet
+	_, buildID, errIndex := s.segLoader.indexLoader.getIndexInfo(collectionID, segmentID)
+	if errIndex == nil {
+		// we don't need load to vector fields
+		vectorFields, err := s.segLoader.replica.getVecFieldIDsByCollectionID(segmentID)
 		if err != nil {
 			return err
 		}
+		fieldIDs = s.segLoader.filterOutVectorFields(fieldIDs, vectorFields)
 	}
-	return nil
-}
-
-func (s *loadService) loadIndexDelayed(collectionID, segmentID UniqueID, indexPaths []string) error {
-	// get vector field ids from schema to load index
-	vecFieldIDs, err := s.replica.getVecFieldsByCollectionID(collectionID)
+	paths, srcFieldIDs, err := s.segLoader.getInsertBinlogPaths(segmentID)
 	if err != nil {
 		return err
 	}
-	for _, id := range vecFieldIDs {
-		l := &loadIndex{
-			segmentID:  segmentID,
-			fieldID:    id,
-			indexPaths: indexPaths,
-		}
-
-		err = s.execute(l)
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
-func (s *loadService) getInsertBinlogPaths(segmentID UniqueID) ([]*internalpb2.StringList, []int64, error) {
-	if s.dataClient == nil {
-		return nil, nil, errors.New("null data service client")
-	}
-
-	insertBinlogPathRequest := &datapb.InsertBinlogPathRequest{
-		SegmentID: segmentID,
-	}
 
-	pathResponse, err := s.dataClient.GetInsertBinlogPaths(insertBinlogPathRequest)
+	targetFields := s.segLoader.getTargetFields(paths, srcFieldIDs, fieldIDs)
+	collection, err := s.segLoader.replica.getCollectionByID(collectionID)
 	if err != nil {
-		return nil, nil, err
-	}
-
-	if len(pathResponse.FieldIDs) != len(pathResponse.Paths) {
-		return nil, nil, errors.New("illegal InsertBinlogPathsResponse")
-	}
-
-	return pathResponse.Paths, pathResponse.FieldIDs, nil
-}
-
-func (s *loadService) filterOutVectorFields(fieldIDs []int64, vectorFields []int64) []int64 {
-	containsFunc := func(s []int64, e int64) bool {
-		for _, a := range s {
-			if a == e {
-				return true
-			}
-		}
-		return false
-	}
-	targetFields := make([]int64, 0)
-	for _, id := range fieldIDs {
-		if !containsFunc(vectorFields, id) {
-			targetFields = append(targetFields, id)
-		}
-	}
-	return targetFields
-}
-
-func (s *loadService) getTargetFields(paths []*internalpb2.StringList, srcFieldIDS []int64, dstFields []int64) map[int64]*internalpb2.StringList {
-	targetFields := make(map[int64]*internalpb2.StringList)
-
-	containsFunc := func(s []int64, e int64) bool {
-		for _, a := range s {
-			if a == e {
-				return true
-			}
-		}
-		return false
+		return err
 	}
-
-	for i, fieldID := range srcFieldIDS {
-		if containsFunc(dstFields, fieldID) {
-			targetFields[fieldID] = paths[i]
-		}
+	segment := newSegment(collection, segmentID, partitionID, collectionID, segTypeSealed)
+	err = s.segLoader.loadSegmentFieldsData(segment, targetFields)
+	if err != nil {
+		return err
 	}
-
-	return targetFields
-}
-
-func (s *loadService) loadSegmentFieldsData(segment *Segment, targetFields map[int64]*internalpb2.StringList) error {
-	for id, p := range targetFields {
-		if id == timestampFieldID {
-			// seg core doesn't need timestamp field
-			continue
-		}
-
-		paths := p.Values
-		blobs := make([]*storage.Blob, 0)
-		for _, path := range paths {
-			binLog, err := s.kv.Load(path)
-			if err != nil {
-				// TODO: return or continue?
-				return err
-			}
-			blobs = append(blobs, &storage.Blob{
-				Key:   strconv.FormatInt(id, 10), // TODO: key???
-				Value: []byte(binLog),
-			})
-		}
-		_, _, insertData, err := s.iCodec.Deserialize(blobs)
+	if errIndex == nil {
+		indexPaths, err := s.segLoader.indexLoader.getIndexPaths(buildID)
 		if err != nil {
-			// TODO: return or continue
 			return err
 		}
-		if len(insertData.Data) != 1 {
-			return errors.New("we expect only one field in deserialized insert data")
-		}
-
-		for _, value := range insertData.Data {
-			var numRows int
-			var data interface{}
-
-			switch fieldData := value.(type) {
-			case *storage.BoolFieldData:
-				numRows = fieldData.NumRows
-				data = fieldData.Data
-			case *storage.Int8FieldData:
-				numRows = fieldData.NumRows
-				data = fieldData.Data
-			case *storage.Int16FieldData:
-				numRows = fieldData.NumRows
-				data = fieldData.Data
-			case *storage.Int32FieldData:
-				numRows = fieldData.NumRows
-				data = fieldData.Data
-			case *storage.Int64FieldData:
-				numRows = fieldData.NumRows
-				data = fieldData.Data
-			case *storage.FloatFieldData:
-				numRows = fieldData.NumRows
-				data = fieldData.Data
-			case *storage.DoubleFieldData:
-				numRows = fieldData.NumRows
-				data = fieldData.Data
-			case storage.StringFieldData:
-				numRows = fieldData.NumRows
-				data = fieldData.Data
-			case *storage.FloatVectorFieldData:
-				numRows = fieldData.NumRows
-				data = fieldData.Data
-			case *storage.BinaryVectorFieldData:
-				numRows = fieldData.NumRows
-				data = fieldData.Data
-			default:
-				return errors.New("unexpected field data type")
-			}
-			err = segment.segmentLoadFieldData(id, numRows, data)
-			if err != nil {
-				// TODO: return or continue?
-				return err
-			}
+		err = s.segLoader.indexLoader.loadIndexImmediate(segment, indexPaths)
+		if err != nil {
+			return err
 		}
 	}
-
-	return nil
+	// replace segment
+	return s.segLoader.replica.replaceGrowingSegmentBySealedSegment(segment)
 }
 
 func newLoadService(ctx context.Context, masterClient MasterServiceInterface, dataClient DataServiceInterface, indexClient IndexServiceInterface, replica collectionReplica, dmStream msgstream.MsgStream) *loadService {
 	ctx1, cancel := context.WithCancel(ctx)
 
-	option := &minioKV.Option{
-		Address:           Params.MinioEndPoint,
-		AccessKeyID:       Params.MinioAccessKeyID,
-		SecretAccessKeyID: Params.MinioSecretAccessKey,
-		UseSSL:            Params.MinioUseSSLStr,
-		CreateBucket:      true,
-		BucketName:        Params.MinioBucketName,
-	}
-
-	client, err := minioKV.NewMinIOKV(ctx1, option)
-	if err != nil {
-		panic(err)
-	}
+	segLoader := newSegmentLoader(ctx1, masterClient, indexClient, dataClient, replica, dmStream)
 
 	return &loadService{
 		ctx:    ctx1,
 		cancel: cancel,
 
-		replica: replica,
-
-		fieldIndexes:   make(map[string][]*internalpb2.IndexStats),
-		fieldStatsChan: make(chan []*internalpb2.FieldStats, 1),
-
-		dmStream: dmStream,
-
-		masterClient: masterClient,
-		dataClient:   dataClient,
-		indexClient:  indexClient,
-
-		kv:     client,
-		iCodec: &storage.InsertCodec{},
+		segLoader: segLoader,
 	}
 }
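
execute wraps the MinIO index fetch in util.Retry(5, time.Millisecond*200, fn) so a transient storage error does not fail the whole load. A minimal sketch of a retry helper with that shape (attempt count, fixed sleep between attempts, closure returning error); the real util.Retry may differ in details such as backoff:

package main

import (
	"errors"
	"fmt"
	"time"
)

// retry calls fn up to attempts times, sleeping interval between
// failures, and returns the last error if every attempt fails.
func retry(attempts int, interval time.Duration, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		time.Sleep(interval)
	}
	return err
}

func main() {
	calls := 0
	err := retry(5, 200*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("transient storage error")
		}
		return nil
	})
	fmt.Println("calls:", calls, "err:", err) // calls: 3 err: <nil>
}
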
diff --git a/internal/querynode/load_service_test.go b/internal/querynode/load_service_test.go
index 4bcd10052..247c76b3c 100644
--- a/internal/querynode/load_service_test.go
+++ b/internal/querynode/load_service_test.go
@@ -1126,19 +1126,19 @@ func TestSegmentLoad_Search_Vector(t *testing.T) {
 	paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID, keyPrefix)
 	assert.NoError(t, err)
 
-	fieldsMap := node.loadService.getTargetFields(paths, srcFieldIDs, fieldIDs)
+	fieldsMap := node.loadService.segLoader.getTargetFields(paths, srcFieldIDs, fieldIDs)
 	assert.Equal(t, len(fieldsMap), 2)
 
 	segment, err := node.replica.getSegmentByID(segmentID)
 	assert.NoError(t, err)
 
-	err = node.loadService.loadSegmentFieldsData(segment, fieldsMap)
+	err = node.loadService.segLoader.loadSegmentFieldsData(segment, fieldsMap)
 	assert.NoError(t, err)
 
 	indexPaths, err := generateIndex(segmentID)
 	assert.NoError(t, err)
 
-	err = node.loadService.loadIndexImmediate(segment, indexPaths)
+	err = node.loadService.segLoader.indexLoader.loadIndexImmediate(segment, indexPaths)
 	assert.NoError(t, err)
 
 	// do search
diff --git a/internal/querynode/partition.go b/internal/querynode/partition.go
index ee6abe3ba..787d6fab1 100644
--- a/internal/querynode/partition.go
+++ b/internal/querynode/partition.go
@@ -16,7 +16,7 @@ type Partition struct {
 	collectionID UniqueID
 	partitionID  UniqueID
 	segmentIDs   []UniqueID
-	enableDM     bool
+	enable       bool
 }
 
 func (p *Partition) ID() UniqueID {
@@ -41,7 +41,7 @@ func newPartition(collectionID UniqueID, partitionID UniqueID) *Partition {
 	var newPartition = &Partition{
 		collectionID: collectionID,
 		partitionID:  partitionID,
-		enableDM:     false,
+		enable:       false,
 	}
 
 	return newPartition
diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go
index 3bf6c5abc..dedf615a7 100644
--- a/internal/querynode/query_node.go
+++ b/internal/querynode/query_node.go
@@ -148,7 +148,7 @@ func (node *QueryNode) Start() error {
 	node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica)
 	node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
 	node.loadService = newLoadService(node.queryNodeLoopCtx, node.masterClient, node.dataClient, node.indexClient, node.replica, node.dataSyncService.dmStream)
-	node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.fieldStatsChan)
+	node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.segLoader.indexLoader.fieldStatsChan)
 
 	// start services
 	go node.dataSyncService.start()
@@ -382,7 +382,7 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S
 	segmentIDs := in.SegmentIDs
 	fieldIDs := in.FieldIDs
 
-	err := node.replica.enablePartitionDM(partitionID)
+	err := node.replica.enablePartition(partitionID)
 	if err != nil {
 		status := &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
@@ -395,7 +395,7 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S
 	for i, state := range in.SegmentStates {
 		if state.State == commonpb.SegmentState_SegmentGrowing {
 			position := state.StartPosition
-			err = node.loadService.seekSegment(position)
+			err = node.loadService.segLoader.seekSegment(position)
 			if err != nil {
 				status := &commonpb.Status{
 					ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
@@ -423,7 +423,7 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S
 
 func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) {
 	for _, id := range in.PartitionIDs {
-		err := node.replica.enablePartitionDM(id)
+		err := node.replica.enablePartition(id)
 		if err != nil {
 			status := &commonpb.Status{
 				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
@@ -435,7 +435,7 @@ func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*comm
 
 	// release all fields in the segments
 	for _, id := range in.SegmentIDs {
-		err := node.loadService.releaseSegment(id)
+		err := node.loadService.segLoader.releaseSegment(id)
 		if err != nil {
 			status := &commonpb.Status{
 				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
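
On the RPC side, LoadSegments first enables the target partition in the replica (which also makes the periodic loop start picking its segments up) and only then loads the requested segments. A reduced sketch of that ordering, with hypothetical stub types standing in for the queryPb request and commonpb status:

package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-in for the replica's partition bookkeeping.
type replica struct {
	enabled map[int64]bool
}

func (r *replica) enablePartition(id int64) error {
	if r.enabled == nil {
		return errors.New("replica not initialized")
	}
	r.enabled[id] = true
	return nil
}

// loadSegments mirrors the ordering in QueryNode.LoadSegments:
// enable the partition first, then load the requested segments.
func loadSegments(r *replica, partitionID int64, segmentIDs []int64) error {
	if err := r.enablePartition(partitionID); err != nil {
		return err
	}
	for _, segID := range segmentIDs {
		fmt.Println("loading segment", segID, "of partition", partitionID)
	}
	return nil
}

func main() {
	r := &replica{enabled: map[int64]bool{}}
	if err := loadSegments(r, 10, []int64{1, 2}); err != nil {
		fmt.Println(err)
	}
}
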
diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go
new file mode 100644
index 000000000..883c96e17
--- /dev/null
+++ b/internal/querynode/segment_loader.go
@@ -0,0 +1,211 @@
+package querynode
+
+import (
+	"context"
+	"errors"
+	"strconv"
+
+	"github.com/zilliztech/milvus-distributed/internal/kv"
+	minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
+	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
+	"github.com/zilliztech/milvus-distributed/internal/storage"
+)
+
+// segmentLoader is only responsible for loading segment field data from insert binlogs
+type segmentLoader struct {
+	replica collectionReplica
+
+	dmStream msgstream.MsgStream
+
+	dataClient DataServiceInterface
+
+	kv     kv.Base // minio kv
+	iCodec *storage.InsertCodec
+
+	indexLoader *indexLoader
+}
+
+func (loader *segmentLoader) releaseSegment(segmentID UniqueID) error {
+	err := loader.replica.removeSegment(segmentID)
+	return err
+}
+
+func (loader *segmentLoader) seekSegment(position *internalpb2.MsgPosition) error {
+	// TODO: open seek
+	//for _, position := range positions {
+	//	err := s.dmStream.Seek(position)
+	//	if err != nil {
+	//		return err
+	//	}
+	//}
+	return nil
+}
+
+func (loader *segmentLoader) getInsertBinlogPaths(segmentID UniqueID) ([]*internalpb2.StringList, []int64, error) {
+	if loader.dataClient == nil {
+		return nil, nil, errors.New("null data service client")
+	}
+
+	insertBinlogPathRequest := &datapb.InsertBinlogPathRequest{
+		SegmentID: segmentID,
+	}
+
+	pathResponse, err := loader.dataClient.GetInsertBinlogPaths(insertBinlogPathRequest)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	if len(pathResponse.FieldIDs) != len(pathResponse.Paths) {
+		return nil, nil, errors.New("illegal InsertBinlogPathsResponse")
+	}
+
+	return pathResponse.Paths, pathResponse.FieldIDs, nil
+}
+
+func (loader *segmentLoader) filterOutVectorFields(fieldIDs []int64, vectorFields []int64) []int64 {
+	containsFunc := func(s []int64, e int64) bool {
+		for _, a := range s {
+			if a == e {
+				return true
+			}
+		}
+		return false
+	}
+	targetFields := make([]int64, 0)
+	for _, id := range fieldIDs {
+		if !containsFunc(vectorFields, id) {
+			targetFields = append(targetFields, id)
+		}
+	}
+	return targetFields
+}
+
+func (loader *segmentLoader) getTargetFields(paths []*internalpb2.StringList, srcFieldIDS []int64, dstFields []int64) map[int64]*internalpb2.StringList {
+	targetFields := make(map[int64]*internalpb2.StringList)
+
+	containsFunc := func(s []int64, e int64) bool {
+		for _, a := range s {
+			if a == e {
+				return true
+			}
+		}
+		return false
+	}
+
+	for i, fieldID := range srcFieldIDS {
+		if containsFunc(dstFields, fieldID) {
+			targetFields[fieldID] = paths[i]
+		}
+	}
+
+	return targetFields
+}
+
+func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, targetFields map[int64]*internalpb2.StringList) error {
+	for id, p := range targetFields {
+		if id == timestampFieldID {
+			// seg core doesn't need timestamp field
+			continue
+		}
+
+		paths := p.Values
+		blobs := make([]*storage.Blob, 0)
+		for _, path := range paths {
+			binLog, err := loader.kv.Load(path)
+			if err != nil {
+				// TODO: return or continue?
+				return err
+			}
+			blobs = append(blobs, &storage.Blob{
+				Key:   strconv.FormatInt(id, 10), // TODO: key???
+				Value: []byte(binLog),
+			})
+		}
+		_, _, insertData, err := loader.iCodec.Deserialize(blobs)
+		if err != nil {
+			// TODO: return or continue
+			return err
+		}
+		if len(insertData.Data) != 1 {
+			return errors.New("we expect only one field in deserialized insert data")
+		}
+
+		for _, value := range insertData.Data {
+			var numRows int
+			var data interface{}
+
+			switch fieldData := value.(type) {
+			case *storage.BoolFieldData:
+				numRows = fieldData.NumRows
+				data = fieldData.Data
+			case *storage.Int8FieldData:
+				numRows = fieldData.NumRows
+				data = fieldData.Data
+			case *storage.Int16FieldData:
+				numRows = fieldData.NumRows
+				data = fieldData.Data
+			case *storage.Int32FieldData:
+				numRows = fieldData.NumRows
+				data = fieldData.Data
+			case *storage.Int64FieldData:
+				numRows = fieldData.NumRows
+				data = fieldData.Data
+			case *storage.FloatFieldData:
+				numRows = fieldData.NumRows
+				data = fieldData.Data
+			case *storage.DoubleFieldData:
+				numRows = fieldData.NumRows
+				data = fieldData.Data
+			case storage.StringFieldData:
+				numRows = fieldData.NumRows
+				data = fieldData.Data
+			case *storage.FloatVectorFieldData:
+				numRows = fieldData.NumRows
+				data = fieldData.Data
+			case *storage.BinaryVectorFieldData:
+				numRows = fieldData.NumRows
+				data = fieldData.Data
+			default:
+				return errors.New("unexpected field data type")
+			}
+			err = segment.segmentLoadFieldData(id, numRows, data)
+			if err != nil {
+				// TODO: return or continue?
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func newSegmentLoader(ctx context.Context, masterClient MasterServiceInterface, indexClient IndexServiceInterface, dataClient DataServiceInterface, replica collectionReplica, dmStream msgstream.MsgStream) *segmentLoader {
+	option := &minioKV.Option{
+		Address:           Params.MinioEndPoint,
+		AccessKeyID:       Params.MinioAccessKeyID,
+		SecretAccessKeyID: Params.MinioSecretAccessKey,
+		UseSSL:            Params.MinioUseSSLStr,
+		CreateBucket:      true,
+		BucketName:        Params.MinioBucketName,
+	}
+
+	client, err := minioKV.NewMinIOKV(ctx, option)
+	if err != nil {
+		panic(err)
+	}
+
+	iLoader := newIndexLoader(ctx, masterClient, indexClient, replica)
+	return &segmentLoader{
+		replica: replica,
+
+		dmStream: dmStream,
+
+		dataClient: dataClient,
+
+		kv:     client,
+		iCodec: &storage.InsertCodec{},
+
+		indexLoader: iLoader,
+	}
+}
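
loadSegmentFieldsData deserializes each field's binlog blobs with storage.InsertCodec and then type-switches over the concrete storage.*FieldData variants to extract NumRows and the raw payload before handing both to segcore. A reduced sketch of that dispatch pattern, with hypothetical field-data types standing in for the storage package's:

package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for storage.Int64FieldData etc.
type int64FieldData struct {
	NumRows int
	Data    []int64
}

type floatVectorFieldData struct {
	NumRows int
	Data    []float32
}

// extract mirrors the type switch in loadSegmentFieldsData: every
// variant exposes NumRows plus an opaque data payload.
func extract(value interface{}) (int, interface{}, error) {
	switch fieldData := value.(type) {
	case *int64FieldData:
		return fieldData.NumRows, fieldData.Data, nil
	case *floatVectorFieldData:
		return fieldData.NumRows, fieldData.Data, nil
	default:
		return 0, nil, errors.New("unexpected field data type")
	}
}

func main() {
	numRows, data, err := extract(&int64FieldData{NumRows: 2, Data: []int64{7, 8}})
	fmt.Println(numRows, data, err) // 2 [7 8] <nil>
}
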
-- 
GitLab