From dd6c4bfb75e901a0862d03f077ccd29b19bf770e Mon Sep 17 00:00:00 2001
From: GuoRentong <rentong.guo@zilliz.com>
Date: Mon, 18 Jan 2021 10:09:17 +0800
Subject: [PATCH] Update doc:interface definitions related to LoadCollection

Signed-off-by: GuoRentong <rentong.guo@zilliz.com>
---
 .../developer_guides/chap04_message_stream.md |  11 +-
 docs/developer_guides/chap06_master.md        |  37 +++++
 docs/developer_guides/chap07_query_service.md |   6 +-
 docs/developer_guides/chap09_data_service.md  |   4 +-
 internal/querynode/api.go                     |  16 ++-
 internal/querynode/query_node.go              |   2 +
 internal/querynode/segment_manager.go         | 126 ++++++++++++++++++
 internal/writenode/data_sync_service_test.go  |   6 +-
 .../flow_graph_insert_buffer_node.go          |  72 ++++------
 .../flow_graph_insert_buffer_node_test.go     |  54 +++++---
 10 files changed, 264 insertions(+), 70 deletions(-)
 create mode 100644 internal/querynode/segment_manager.go

diff --git a/docs/developer_guides/chap04_message_stream.md b/docs/developer_guides/chap04_message_stream.md
index cd0a227b1..3aa66a0c4 100644
--- a/docs/developer_guides/chap04_message_stream.md
+++ b/docs/developer_guides/chap04_message_stream.md
@@ -96,17 +96,26 @@ type TsMsg interface {
   Unmarshal([]byte) *TsMsg
 }
 
+type MsgPosition {
+  ChannelName string
+  MsgID string
+  TimestampFilter Timestamp
+}
+
 type MsgPack struct {
   BeginTs Timestamp
   EndTs Timestamp
   Msgs []TsMsg
+  StartPositions []MsgPosition
+  EndPositions []MsgPosition
 }
 
-
 type MsgStream interface {
   Produce(*MsgPack) error
   Broadcast(*MsgPack) error
   Consume() *MsgPack // message can be consumed exactly once
+  ShowChannelNames() []string
+  Seek(offset MsgPosition) error
 }
 
 type RepackFunc(msgs []* TsMsg, hashKeys [][]int32) map[int32] *MsgPack
diff --git a/docs/developer_guides/chap06_master.md b/docs/developer_guides/chap06_master.md
index d700b51a2..26152d8bf 100644
--- a/docs/developer_guides/chap06_master.md
+++ b/docs/developer_guides/chap06_master.md
@@ -23,6 +23,9 @@ type Master interface {
   GetPartitionStatistics(req PartitionStatsRequest) (PartitionStatsResponse, error)
   ShowPartitions(req ShowPartitionRequest) (ShowPartitionResponse, error)
   
+  DescribeSegment(req DescribeSegmentRequest) (DescribeSegmentResponse, error)
+  ShowSegments(req ShowSegmentRequest) (ShowSegmentResponse, error)
+  
   CreateIndex(req CreateIndexRequest) error
   DescribeIndex(DescribeIndexRequest) (DescribeIndexResponse, error)
   
@@ -87,6 +90,10 @@ type DescribeCollectionRequest struct {
 }
 
 type DescribeCollectionResponse struct {
+  DbID UniqueID
+  CollectionID UniqueID
+  DefaultPartitionName string
+  DefaultPartitionID UniqueID
   Schema []bytes
 }
 ```
@@ -115,6 +122,7 @@ type ShowCollectionRequest struct {
 
 type ShowCollectionResponse struct {
   CollectionNames []string
+  CollectionIDs []UniqueID
 }
 ```
 
@@ -176,10 +184,39 @@ type ShowPartitionRequest struct {
 }
 
 type ShowPartitionResponse struct {
+  PartitionIDs []UniqueID
   PartitionNames []string
 }
 ```
 
+* DescribeSegment
+
+```go
+type DescribeSegmentRequest struct {
+  MsgBase
+  CollectionID UniqueID
+  SegmentID UniqueID
+}
+
+type DescribeSegmentResponse struct {
+  IndexID UniqueID
+}
+```
+
+* ShowSegments
+
+```go
+type ShowSegmentRequest struct {
+  MsgBase
+  CollectionID UniqueID
+  PartitionID UniqueID
+}
+
+type ShowSegmentResponse struct {
+  SegmentIDs []UniqueID
+}
+```
+
 * *CreateIndex*
 
 ```go
diff --git a/docs/developer_guides/chap07_query_service.md b/docs/developer_guides/chap07_query_service.md
index 5d7a25df8..9b0fb086d 100644
--- a/docs/developer_guides/chap07_query_service.md
+++ b/docs/developer_guides/chap07_query_service.md
@@ -4,7 +4,7 @@
 
 #### 8.1 Overview
 
-<img src="./figs/query_service.png" width=700>
+<img src="./figs/query_service.png" width=500>
 
 
 
@@ -230,6 +230,8 @@ type RemoveQueryChannelRequest struct {
 ```go
 type WatchDmChannelRequest struct {
   InsertChannelNames []string
+  StartSegment UniqueID
+  //FieldIDs []int64
 }
 ```
 
@@ -242,7 +244,7 @@ type LoadSegmentRequest struct {
   CollectionID UniqueID
   PartitionID UniqueID
   SegmentIDs []UniqueID
-  FieldIDs []int64
+  //FieldIDs []int64
 }
 ```
 
diff --git a/docs/developer_guides/chap09_data_service.md b/docs/developer_guides/chap09_data_service.md
index 6406dd911..3748ff19d 100644
--- a/docs/developer_guides/chap09_data_service.md
+++ b/docs/developer_guides/chap09_data_service.md
@@ -129,8 +129,10 @@ type SegmentStatesRequest struct {
 
 type SegmentStatesResponse struct {
   State SegmentState
-  CreateTime Timestamp
+  OpenTime Timestamp
   SealedTime Timestamp
+  MsgStartPositions []msgstream.MsgPosition
+  MsgEndPositions []msgstream.MsgPosition
 }
 ```
 
diff --git a/internal/querynode/api.go b/internal/querynode/api.go
index 0ecd451d1..92f5f520f 100644
--- a/internal/querynode/api.go
+++ b/internal/querynode/api.go
@@ -146,7 +146,21 @@ func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*com
 }
 
 func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.Status, error) {
-	// TODO: implement
+	// TODO: support db
+	for _, segmentID := range in.SegmentIDs {
+		hasBeenBuiltIndex := segmentID > 0 // TODO: ???
+		indexID := UniqueID(0)             // TODO: ???
+		err := node.segManager.loadSegment(segmentID, hasBeenBuiltIndex, indexID, in.FieldIDs)
+		if err != nil {
+			// TODO: return or continue
+			status := &commonpb.Status{
+				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
+				Reason:    err.Error(),
+			}
+			return status, err
+		}
+	}
+
 	return nil, nil
 }
 
diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go
index 9fc8111fb..d3426d88b 100644
--- a/internal/querynode/query_node.go
+++ b/internal/querynode/query_node.go
@@ -53,6 +53,8 @@ type QueryNode struct {
 	loadIndexService *loadIndexService
 	statsService     *statsService
 
+	segManager *segmentManager
+
 	//opentracing
 	tracer opentracing.Tracer
 	closer io.Closer
diff --git a/internal/querynode/segment_manager.go b/internal/querynode/segment_manager.go
new file mode 100644
index 000000000..d59b537c4
--- /dev/null
+++ b/internal/querynode/segment_manager.go
@@ -0,0 +1,126 @@
+package querynode
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"github.com/zilliztech/milvus-distributed/internal/kv"
+	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
+	queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
+	"github.com/zilliztech/milvus-distributed/internal/querynode/client"
+	"github.com/zilliztech/milvus-distributed/internal/storage"
+)
+
+type segmentManager struct {
+	replica collectionReplica
+
+	// TODO: replace by client instead of grpc client
+	dataClient         datapb.DataServiceClient
+	indexBuilderClient indexpb.IndexServiceClient
+
+	queryNodeClient *client.Client
+	kv              kv.Base // minio kv
+	iCodec          storage.InsertCodec
+}
+
+func (s *segmentManager) loadSegment(segmentID UniqueID, hasBeenBuiltIndex bool, indexID UniqueID, vecFieldIDs []int64) error {
+	// 1. load segment
+	req := &datapb.InsertBinlogPathRequest{
+		SegmentID: segmentID,
+	}
+
+	pathResponse, err := s.dataClient.GetInsertBinlogPaths(context.TODO(), req)
+	if err != nil {
+		return err
+	}
+
+	if len(pathResponse.FieldIDs) != len(pathResponse.Paths) {
+		return errors.New("illegal InsertBinlogPathsResponse")
+	}
+
+	for fieldID, i := range pathResponse.FieldIDs {
+		paths := pathResponse.Paths[i].Values
+		blobs := make([]*storage.Blob, 0)
+		for _, path := range paths {
+			binLog, err := s.kv.Load(path)
+			if err != nil {
+				// TODO: return or continue?
+				return err
+			}
+			blobs = append(blobs, &storage.Blob{
+				Key:   "", // TODO: key???
+				Value: []byte(binLog),
+			})
+		}
+		_, _, insertData, err := s.iCodec.Deserialize(blobs)
+		if err != nil {
+			// TODO: return or continue
+			return err
+		}
+		if len(insertData.Data) != 1 {
+			return errors.New("we expect only one field in deserialized insert data")
+		}
+
+		for _, value := range insertData.Data {
+			switch fieldData := value.(type) {
+			case storage.BoolFieldData:
+				numRows := fieldData.NumRows
+				data := fieldData.Data
+				fmt.Println(numRows, data, fieldID)
+				// TODO: s.replica.addSegment()
+			case storage.Int8FieldData:
+				// TODO: s.replica.addSegment()
+			case storage.Int16FieldData:
+				// TODO: s.replica.addSegment()
+			case storage.Int32FieldData:
+				// TODO: s.replica.addSegment()
+			case storage.Int64FieldData:
+				// TODO: s.replica.addSegment()
+			case storage.FloatFieldData:
+				// TODO: s.replica.addSegment()
+			case storage.DoubleFieldData:
+				// TODO: s.replica.addSegment()
+			default:
+				// TODO: what if the index has not been built ?
+				// does the info from hasBeenBuiltIndex is synced with the dataService?
+				return errors.New("unsupported field data type")
+			}
+		}
+	}
+
+	// 2. load index
+	// does the info from hasBeenBuiltIndex is synced with the dataService?
+	if !hasBeenBuiltIndex {
+		req := &indexpb.IndexFilePathRequest{
+			IndexID: indexID,
+		}
+		pathResponse, err := s.indexBuilderClient.GetIndexFilePaths(context.TODO(), req)
+		if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+			return err
+		}
+		targetSegment, err := s.replica.getSegmentByID(segmentID)
+		if err != nil {
+			return err
+		}
+		for _, vecFieldID := range vecFieldIDs {
+			targetIndexParam, ok := targetSegment.indexParam[vecFieldID]
+			if !ok {
+				return errors.New(fmt.Sprint("cannot found index params in segment ", segmentID, " with field = ", vecFieldID))
+			}
+			err := s.queryNodeClient.LoadIndex(pathResponse.IndexFilePaths, segmentID, vecFieldID, "", targetIndexParam)
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func (s *segmentManager) releaseSegment(in *queryPb.ReleaseSegmentRequest) error {
+	// TODO: implement
+	// TODO: release specific field, we need segCore supply relevant interface
+	return nil
+}
diff --git a/internal/writenode/data_sync_service_test.go b/internal/writenode/data_sync_service_test.go
index 1ca9149d8..f1af78575 100644
--- a/internal/writenode/data_sync_service_test.go
+++ b/internal/writenode/data_sync_service_test.go
@@ -130,7 +130,7 @@ func TestDataSyncService_Start(t *testing.T) {
 			InsertRequest: internalPb.InsertRequest{
 				MsgType:        commonpb.MsgType_kInsert,
 				ReqID:          UniqueID(0),
-				CollectionName: "coll1",
+				CollectionName: "col1",
 				PartitionTag:   "default",
 				SegmentID:      UniqueID(1),
 				ChannelID:      UniqueID(0),
@@ -206,7 +206,7 @@ func TestDataSyncService_Start(t *testing.T) {
 	<-ctx.Done()
 }
 
-func newMeta() {
+func newMeta() *etcdpb.CollectionMeta {
 	ETCDAddr := Params.EtcdAddress
 	MetaRootPath := Params.MetaRootPath
 
@@ -375,4 +375,6 @@ func newMeta() {
 	segBytes := proto.MarshalTextString(&segSch)
 	kvClient.Save("/segment/"+strconv.FormatInt(segSch.SegmentID, 10), segBytes)
 
+	return &collection
+
 }
diff --git a/internal/writenode/flow_graph_insert_buffer_node.go b/internal/writenode/flow_graph_insert_buffer_node.go
index c95c0d016..3568822c5 100644
--- a/internal/writenode/flow_graph_insert_buffer_node.go
+++ b/internal/writenode/flow_graph_insert_buffer_node.go
@@ -127,24 +127,8 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 			}
 		}
 
-		// Timestamps
-		_, ok = idata.Data[1].(*storage.Int64FieldData)
-		if !ok {
-			idata.Data[1] = &storage.Int64FieldData{
-				Data:    []int64{},
-				NumRows: 0,
-			}
-		}
-		tsData := idata.Data[1].(*storage.Int64FieldData)
-		for _, ts := range msg.Timestamps {
-			tsData.Data = append(tsData.Data, int64(ts))
-		}
-		tsData.NumRows += len(msg.Timestamps)
-		span.LogFields(oplog.Int("tsData numRows", tsData.NumRows))
-
 		// 1.1 Get CollectionMeta from etcd
 		collection, err := ibNode.replica.getCollectionByName(collectionName)
-		//collSchema, err := ibNode.getCollectionSchemaByName(collectionName)
 		if err != nil {
 			// GOOSE TODO add error handler
 			log.Println("bbb, Get meta wrong:", err)
@@ -183,18 +167,20 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 
 				fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData)
 
+				var offset int
 				for _, blob := range msg.RowData {
+					offset = 0
 					for j := 0; j < dim; j++ {
 						var v float32
-						buf := bytes.NewBuffer(blob.GetValue()[pos:])
+						buf := bytes.NewBuffer(blob.GetValue()[pos+offset:])
 						if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
 							log.Println("binary.read float32 err:", err)
 						}
 						fieldData.Data = append(fieldData.Data, v)
-						pos += int(unsafe.Sizeof(*(&v)))
+						offset += int(unsafe.Sizeof(*(&v)))
 					}
 				}
-
+				pos += offset
 				fieldData.NumRows += len(msg.RowIDs)
 
 			case schemapb.DataType_VECTOR_BINARY:
@@ -222,13 +208,15 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 				}
 				fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData)
 
+				var offset int
 				for _, blob := range msg.RowData {
-					bv := blob.GetValue()[pos : pos+(dim/8)]
+					bv := blob.GetValue()[pos+offset : pos+(dim/8)]
 					fieldData.Data = append(fieldData.Data, bv...)
-					pos += len(bv)
+					offset = len(bv)
 				}
-
+				pos += offset
 				fieldData.NumRows += len(msg.RowData)
+
 			case schemapb.DataType_BOOL:
 				if _, ok := idata.Data[field.FieldID]; !ok {
 					idata.Data[field.FieldID] = &storage.BoolFieldData{
@@ -238,17 +226,18 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 				}
 
 				fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData)
+				var v bool
 				for _, blob := range msg.RowData {
-					var v bool
 					buf := bytes.NewReader(blob.GetValue()[pos:])
 					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
 						log.Println("binary.Read bool failed:", err)
 					}
 					fieldData.Data = append(fieldData.Data, v)
-					pos += int(unsafe.Sizeof(*(&v)))
-				}
 
+				}
+				pos += int(unsafe.Sizeof(*(&v)))
 				fieldData.NumRows += len(msg.RowIDs)
+
 			case schemapb.DataType_INT8:
 				if _, ok := idata.Data[field.FieldID]; !ok {
 					idata.Data[field.FieldID] = &storage.Int8FieldData{
@@ -258,15 +247,15 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 				}
 
 				fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData)
+				var v int8
 				for _, blob := range msg.RowData {
-					var v int8
 					buf := bytes.NewReader(blob.GetValue()[pos:])
 					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
 						log.Println("binary.Read int8 failed:", err)
 					}
 					fieldData.Data = append(fieldData.Data, v)
-					pos += int(unsafe.Sizeof(*(&v)))
 				}
+				pos += int(unsafe.Sizeof(*(&v)))
 				fieldData.NumRows += len(msg.RowIDs)
 
 			case schemapb.DataType_INT16:
@@ -278,16 +267,15 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 				}
 
 				fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData)
+				var v int16
 				for _, blob := range msg.RowData {
-					var v int16
 					buf := bytes.NewReader(blob.GetValue()[pos:])
 					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
 						log.Println("binary.Read int16 failed:", err)
 					}
 					fieldData.Data = append(fieldData.Data, v)
-					pos += int(unsafe.Sizeof(*(&v)))
 				}
-
+				pos += int(unsafe.Sizeof(*(&v)))
 				fieldData.NumRows += len(msg.RowIDs)
 
 			case schemapb.DataType_INT32:
@@ -299,15 +287,15 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 				}
 
 				fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData)
+				var v int32
 				for _, blob := range msg.RowData {
-					var v int32
 					buf := bytes.NewReader(blob.GetValue()[pos:])
 					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
 						log.Println("binary.Read int32 failed:", err)
 					}
 					fieldData.Data = append(fieldData.Data, v)
-					pos += int(unsafe.Sizeof(*(&v)))
 				}
+				pos += int(unsafe.Sizeof(*(&v)))
 				fieldData.NumRows += len(msg.RowIDs)
 
 			case schemapb.DataType_INT64:
@@ -320,27 +308,24 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 
 				fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData)
 				switch field.FieldID {
-				case 0:
+				case 0: // rowIDs
 					fieldData.Data = append(fieldData.Data, msg.RowIDs...)
 					fieldData.NumRows += len(msg.RowIDs)
-				case 1:
-					// Timestamps
+				case 1: // Timestamps
 					for _, ts := range msg.Timestamps {
 						fieldData.Data = append(fieldData.Data, int64(ts))
 					}
 					fieldData.NumRows += len(msg.Timestamps)
 				default:
-
+					var v int64
 					for _, blob := range msg.RowData {
-						var v int64
 						buf := bytes.NewBuffer(blob.GetValue()[pos:])
 						if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
 							log.Println("binary.Read int64 failed:", err)
 						}
 						fieldData.Data = append(fieldData.Data, v)
-						pos += int(unsafe.Sizeof(*(&v)))
 					}
-
+					pos += int(unsafe.Sizeof(*(&v)))
 					fieldData.NumRows += len(msg.RowIDs)
 				}
 
@@ -353,16 +338,15 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 				}
 
 				fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData)
+				var v float32
 				for _, blob := range msg.RowData {
-					var v float32
 					buf := bytes.NewBuffer(blob.GetValue()[pos:])
 					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
 						log.Println("binary.Read float32 failed:", err)
 					}
 					fieldData.Data = append(fieldData.Data, v)
-					pos += int(unsafe.Sizeof(*(&v)))
 				}
-
+				pos += int(unsafe.Sizeof(*(&v)))
 				fieldData.NumRows += len(msg.RowIDs)
 
 			case schemapb.DataType_DOUBLE:
@@ -374,16 +358,16 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 				}
 
 				fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData)
+				var v float64
 				for _, blob := range msg.RowData {
-					var v float64
 					buf := bytes.NewBuffer(blob.GetValue()[pos:])
 					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
 						log.Println("binary.Read float64 failed:", err)
 					}
 					fieldData.Data = append(fieldData.Data, v)
-					pos += int(unsafe.Sizeof(*(&v)))
 				}
 
+				pos += int(unsafe.Sizeof(*(&v)))
 				fieldData.NumRows += len(msg.RowIDs)
 			}
 		}
diff --git a/internal/writenode/flow_graph_insert_buffer_node_test.go b/internal/writenode/flow_graph_insert_buffer_node_test.go
index 24c0b5391..428be2aaf 100644
--- a/internal/writenode/flow_graph_insert_buffer_node_test.go
+++ b/internal/writenode/flow_graph_insert_buffer_node_test.go
@@ -9,6 +9,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/golang/protobuf/proto"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
@@ -46,11 +47,16 @@ func TestFlowGraphInputBufferNode_Operate(t *testing.T) {
 	assert.Equal(t, testPath, fService.metaTable.client.(*etcdkv.EtcdKV).GetPath("."))
 	go fService.start()
 
-	// Params.FlushInsertBufSize = 2
+	collMeta := newMeta()
+	schemaBlob := proto.MarshalTextString(collMeta.Schema)
+	require.NotEqual(t, "", schemaBlob)
+
 	replica := newReplica()
-	iBNode := newInsertBufferNode(ctx, insertChan, replica)
+	err = replica.addCollection(collMeta.ID, schemaBlob)
+	require.NoError(t, err)
 
-	newMeta()
+	// Params.FlushInsertBufSize = 2
+	iBNode := newInsertBufferNode(ctx, insertChan, replica)
 	inMsg := genInsertMsg()
 	var iMsg flowgraph.Msg = &inMsg
 	iBNode.Operate([]*flowgraph.Msg{&iMsg})
@@ -70,14 +76,12 @@ func genInsertMsg() insertMsg {
 		binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
 		rawData = append(rawData, buf...)
 	}
-	log.Println(len(rawData))
 
 	// Binary vector
 	// Dimension of binary vector is 32
 	// size := 4,  = 32 / 8
 	var bvector = []byte{255, 255, 255, 0}
 	rawData = append(rawData, bvector...)
-	log.Println(len(rawData))
 
 	// Bool
 	var fieldBool = true
@@ -87,7 +91,6 @@ func genInsertMsg() insertMsg {
 	}
 
 	rawData = append(rawData, buf.Bytes()...)
-	log.Println(len(rawData))
 
 	// int8
 	var dataInt8 int8 = 100
@@ -96,7 +99,6 @@ func genInsertMsg() insertMsg {
 		panic(err)
 	}
 	rawData = append(rawData, bint8.Bytes()...)
-	log.Println(len(rawData))
 
 	// int16
 	var dataInt16 int16 = 200
@@ -105,7 +107,6 @@ func genInsertMsg() insertMsg {
 		panic(err)
 	}
 	rawData = append(rawData, bint16.Bytes()...)
-	log.Println(len(rawData))
 
 	// int32
 	var dataInt32 int32 = 300
@@ -114,16 +115,14 @@ func genInsertMsg() insertMsg {
 		panic(err)
 	}
 	rawData = append(rawData, bint32.Bytes()...)
-	log.Println(len(rawData))
 
 	// int64
-	var dataInt64 int64 = 300
+	var dataInt64 int64 = 400
 	bint64 := new(bytes.Buffer)
 	if err := binary.Write(bint64, binary.LittleEndian, dataInt64); err != nil {
 		panic(err)
 	}
 	rawData = append(rawData, bint64.Bytes()...)
-	log.Println(len(rawData))
 
 	// float32
 	var datafloat float32 = 1.1
@@ -132,7 +131,6 @@ func genInsertMsg() insertMsg {
 		panic(err)
 	}
 	rawData = append(rawData, bfloat32.Bytes()...)
-	log.Println(len(rawData))
 
 	// float64
 	var datafloat64 float64 = 2.2
@@ -141,7 +139,7 @@ func genInsertMsg() insertMsg {
 		panic(err)
 	}
 	rawData = append(rawData, bfloat64.Bytes()...)
-	log.Println(len(rawData))
+	log.Println("Test rawdata length:", len(rawData))
 
 	timeRange := TimeRange{
 		timestampMin: 0,
@@ -170,16 +168,32 @@ func genInsertMsg() insertMsg {
 			InsertRequest: internalpb.InsertRequest{
 				MsgType:        commonpb.MsgType_kInsert,
 				ReqID:          UniqueID(0),
-				CollectionName: "coll1",
+				CollectionName: "col1",
 				PartitionTag:   "default",
 				SegmentID:      UniqueID(1),
 				ChannelID:      UniqueID(0),
 				ProxyID:        UniqueID(0),
-				Timestamps:     []Timestamp{Timestamp(i + 1000)},
-				RowIDs:         []UniqueID{UniqueID(i)},
+				Timestamps: []Timestamp{
+					Timestamp(i + 1000),
+					Timestamp(i + 1000),
+					Timestamp(i + 1000),
+					Timestamp(i + 1000),
+					Timestamp(i + 1000),
+				},
+				RowIDs: []UniqueID{
+					UniqueID(i),
+					UniqueID(i),
+					UniqueID(i),
+					UniqueID(i),
+					UniqueID(i),
+				},
 
 				RowData: []*commonpb.Blob{
 					{Value: rawData},
+					{Value: rawData},
+					{Value: rawData},
+					{Value: rawData},
+					{Value: rawData},
 				},
 			},
 		}
@@ -193,9 +207,11 @@ func genInsertMsg() insertMsg {
 			},
 		},
 		FlushMsg: internalpb.FlushMsg{
-			MsgType:   commonpb.MsgType_kFlush,
-			SegmentID: UniqueID(1),
-			Timestamp: Timestamp(2000),
+			MsgType:      commonpb.MsgType_kFlush,
+			SegmentID:    UniqueID(1),
+			Timestamp:    Timestamp(2000),
+			CollectionID: UniqueID(1),
+			PartitionTag: "default",
 		},
 	}
 	iMsg.flushMessages = append(iMsg.flushMessages, &fmsg)
-- 
GitLab