From 73a8e7de847ca07d9680d4b44945dd678434880e Mon Sep 17 00:00:00 2001
From: XuanYang-cn <xuan.yang@zilliz.com>
Date: Wed, 3 Feb 2021 15:18:05 +0800
Subject: [PATCH] Fix datanode insertchannel refresh bug

Signed-off-by: XuanYang-cn <xuan.yang@zilliz.com>
---
 internal/datanode/client/client.go            | 137 -------
 internal/datanode/data_node.go                |  59 ++-
 internal/datanode/data_node_test.go           | 374 ++++++++++++++++++
 internal/datanode/data_sync_service.go        |  16 +-
 internal/datanode/data_sync_service_test.go   | 109 +----
 internal/datanode/factory.go                  | 231 -----------
 internal/datanode/flow_graph_dd_node.go       |   1 -
 .../flow_graph_insert_buffer_node_test.go     | 133 +------
 .../flow_graph_msg_stream_input_node.go       |  27 +-
 internal/masterservice/master_service_test.go |  14 +-
 10 files changed, 460 insertions(+), 641 deletions(-)
 delete mode 100644 internal/datanode/client/client.go
 delete mode 100644 internal/datanode/factory.go

diff --git a/internal/datanode/client/client.go b/internal/datanode/client/client.go
deleted file mode 100644
index f987f20d6..000000000
--- a/internal/datanode/client/client.go
+++ /dev/null
@@ -1,137 +0,0 @@
-package writerclient
-
-import (
-	"strconv"
-
-	"github.com/golang/protobuf/proto"
-	"go.etcd.io/etcd/clientv3"
-
-	"github.com/zilliztech/milvus-distributed/internal/errors"
-	"github.com/zilliztech/milvus-distributed/internal/kv"
-	etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
-	"github.com/zilliztech/milvus-distributed/internal/msgstream"
-	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
-	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
-	pb "github.com/zilliztech/milvus-distributed/internal/proto/writerpb"
-	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
-)
-
-type UniqueID = typeutil.UniqueID
-
-type Timestamp = typeutil.Timestamp
-
-type Client struct {
-	kvClient kv.TxnBase // client of a reliable kv service, i.e. etcd client
-	kvPrefix string
-
-	flushStream msgstream.MsgStream
-}
-
-func NewWriterClient(etcdAddress string, kvRootPath string, writeNodeSegKvSubPath string, flushStream msgstream.MsgStream) (*Client, error) {
-	// init kv client
-	etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
-	if err != nil {
-		return nil, err
-	}
-	kvClient := etcdkv.NewEtcdKV(etcdClient, kvRootPath)
-
-	return &Client{
-		kvClient:    kvClient,
-		kvPrefix:    writeNodeSegKvSubPath,
-		flushStream: flushStream,
-	}, nil
-}
-
-type SegmentDescription struct {
-	SegmentID UniqueID
-	IsClosed  bool
-	OpenTime  Timestamp
-	CloseTime Timestamp
-}
-
-func (c *Client) FlushSegment(segmentID UniqueID, collectionID UniqueID, partitionTag string, timestamp Timestamp) error {
-	baseMsg := msgstream.BaseMsg{
-		BeginTimestamp: 0,
-		EndTimestamp:   0,
-		HashValues:     []uint32{0},
-	}
-
-	flushMsg := internalpb2.FlushMsg{
-		Base: &commonpb.MsgBase{
-			MsgType:   commonpb.MsgType_kFlush,
-			Timestamp: timestamp,
-		},
-		SegmentID:    segmentID,
-		CollectionID: collectionID,
-		PartitionTag: partitionTag,
-	}
-
-	fMsg := &msgstream.FlushMsg{
-		BaseMsg:  baseMsg,
-		FlushMsg: flushMsg,
-	}
-	msgPack := msgstream.MsgPack{}
-	msgPack.Msgs = append(msgPack.Msgs, fMsg)
-
-	err := c.flushStream.Produce(&msgPack)
-	return err
-}
-
-func (c *Client) DescribeSegment(segmentID UniqueID) (*SegmentDescription, error) {
-	// query etcd
-	ret := &SegmentDescription{
-		SegmentID: segmentID,
-		IsClosed:  false,
-	}
-
-	key := c.kvPrefix + strconv.FormatInt(segmentID, 10)
-
-	etcdKV, ok := c.kvClient.(*etcdkv.EtcdKV)
-	if !ok {
-		return nil, errors.New("type assertion failed for etcd kv")
-	}
-	count, err := etcdKV.GetCount(key)
-	if err != nil {
-		return nil, err
-	}
-
-	if count <= 0 {
-		ret.IsClosed = false
-		return ret, nil
-	}
-
-	value, err := c.kvClient.Load(key)
-	if err != nil {
-		return ret, err
-	}
-
-	flushMeta := pb.SegmentFlushMeta{}
-	err = proto.UnmarshalText(value, &flushMeta)
-	if err != nil {
-		return ret, err
-	}
-	ret.IsClosed = flushMeta.IsClosed
-	ret.OpenTime = flushMeta.OpenTime
-	ret.CloseTime = flushMeta.CloseTime
-	return ret, nil
-}
-
-func (c *Client) GetInsertBinlogPaths(segmentID UniqueID) (map[int64][]string, error) {
-	key := c.kvPrefix + strconv.FormatInt(segmentID, 10)
-
-	value, err := c.kvClient.Load(key)
-	if err != nil {
-		return nil, err
-	}
-
-	flushMeta := pb.SegmentFlushMeta{}
-	err = proto.UnmarshalText(value, &flushMeta)
-	if err != nil {
-		return nil, err
-	}
-	ret := make(map[int64][]string)
-	for _, field := range flushMeta.Fields {
-		ret[field.FieldID] = field.BinlogPaths
-	}
-	return ret, nil
-}
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index ca55446b5..0ae0497e4 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -2,6 +2,7 @@ package datanode
 
 import (
 	"context"
+	"fmt"
 	"io"
 	"log"
 	"time"
@@ -82,9 +83,9 @@ func NewDataNode(ctx context.Context) *DataNode {
 	node := &DataNode{
 		ctx:             ctx2,
 		cancel:          cancel2,
-		NodeID:          Params.NodeID, // GOOSE TODO How to init
+		NodeID:          Params.NodeID, // GOOSE TODO: How to init
 		Role:            typeutil.DataNodeRole,
-		State:           internalpb2.StateCode_INITIALIZING,
+		State:           internalpb2.StateCode_INITIALIZING, // GOOSE TODO: atomic
 		dataSyncService: nil,
 		metaService:     nil,
 		masterService:   nil,
@@ -96,15 +97,26 @@ func NewDataNode(ctx context.Context) *DataNode {
 }
 
 func (node *DataNode) SetMasterServiceInterface(ms MasterServiceInterface) error {
-	node.masterService = ms
-	return nil
+	switch {
+	case ms == nil, node.masterService != nil:
+		return errors.New("Nil parameter or repeatly set")
+	default:
+		node.masterService = ms
+		return nil
+	}
 }
 
 func (node *DataNode) SetDataServiceInterface(ds DataServiceInterface) error {
-	node.dataService = ds
-	return nil
+	switch {
+	case ds == nil, node.dataService != nil:
+		return errors.New("Nil parameter or repeatly set")
+	default:
+		node.dataService = ds
+		return nil
+	}
 }
 
+// Suppose dataservice is in INITIALIZING
 func (node *DataNode) Init() error {
 
 	req := &datapb.RegisterNodeRequest{
@@ -145,11 +157,15 @@ func (node *DataNode) Init() error {
 	}
 
 	var alloc allocator = newAllocatorImpl(node.masterService)
+
 	chanSize := 100
 	node.flushChan = make(chan *flushMsg, chanSize)
+
 	node.dataSyncService = newDataSyncService(node.ctx, node.flushChan, replica, alloc)
 	node.metaService = newMetaService(node.ctx, replica, node.masterService)
+
 	node.replica = replica
+	node.dataSyncService.initNodes()
 
 	// --- Opentracing ---
 	cfg := &config.Configuration{
@@ -174,19 +190,38 @@ func (node *DataNode) Init() error {
 }
 
 func (node *DataNode) Start() error {
+	node.metaService.init()
+	return nil
+}
 
+// DataNode is HEALTHY until StartSync() is called
+func (node *DataNode) StartSync() {
+	node.dataSyncService.init()
 	go node.dataSyncService.start()
-	node.metaService.init()
 	node.State = internalpb2.StateCode_HEALTHY
-
-	return nil
 }
 
 func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) {
-	log.Println("Init insert channel names:", in.GetChannelNames())
-	Params.InsertChannelNames = append(Params.InsertChannelNames, in.GetChannelNames()...)
+	status := &commonpb.Status{
+		ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
+	}
 
-	return &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, nil
+	switch {
+
+	case node.State != internalpb2.StateCode_HEALTHY:
+		status.Reason = fmt.Sprintf("DataNode %d not healthy!", node.NodeID)
+		return status, errors.New(status.GetReason())
+
+	case len(Params.InsertChannelNames) != 0:
+		status.Reason = fmt.Sprintf("DataNode has %d already set insert channels!", node.NodeID)
+		return status, errors.New(status.GetReason())
+
+	default:
+		Params.InsertChannelNames = in.GetChannelNames()
+		status.ErrorCode = commonpb.ErrorCode_SUCCESS
+		node.StartSync()
+		return status, nil
+	}
 }
 
 func (node *DataNode) GetComponentStates() (*internalpb2.ComponentStates, error) {
diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go
index 7de7101c1..f0626a857 100644
--- a/internal/datanode/data_node_test.go
+++ b/internal/datanode/data_node_test.go
@@ -1,7 +1,10 @@
 package datanode
 
 import (
+	"bytes"
+	"encoding/binary"
 	"log"
+	"math"
 	"math/rand"
 	"os"
 	"strconv"
@@ -10,6 +13,14 @@ import (
 	"go.etcd.io/etcd/clientv3"
 
 	etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
+
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
+	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
+	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
 )
 
 func makeNewChannelNames(names []string, suffix string) []string {
@@ -77,3 +88,366 @@ func clearEtcd(rootPath string) error {
 	return nil
 
 }
+
+type (
+	Factory interface {
+	}
+
+	MetaFactory struct {
+	}
+
+	DataFactory struct {
+		rawData []byte
+	}
+
+	AllocatorFactory struct {
+		ID UniqueID
+	}
+
+	MasterServiceFactory struct {
+		ID             UniqueID
+		collectionName string
+		collectionID   UniqueID
+	}
+)
+
+func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta {
+	sch := schemapb.CollectionSchema{
+		Name:        collectionName,
+		Description: "test collection by meta factory",
+		AutoID:      true,
+		Fields: []*schemapb.FieldSchema{
+			{
+				FieldID:     0,
+				Name:        "RowID",
+				Description: "RowID field",
+				DataType:    schemapb.DataType_INT64,
+				TypeParams: []*commonpb.KeyValuePair{
+					{
+						Key:   "f0_tk1",
+						Value: "f0_tv1",
+					},
+				},
+			},
+			{
+				FieldID:     1,
+				Name:        "Timestamp",
+				Description: "Timestamp field",
+				DataType:    schemapb.DataType_INT64,
+				TypeParams: []*commonpb.KeyValuePair{
+					{
+						Key:   "f1_tk1",
+						Value: "f1_tv1",
+					},
+				},
+			},
+			{
+				FieldID:     100,
+				Name:        "float_vector_field",
+				Description: "field 100",
+				DataType:    schemapb.DataType_VECTOR_FLOAT,
+				TypeParams: []*commonpb.KeyValuePair{
+					{
+						Key:   "dim",
+						Value: "2",
+					},
+				},
+				IndexParams: []*commonpb.KeyValuePair{
+					{
+						Key:   "indexkey",
+						Value: "indexvalue",
+					},
+				},
+			},
+			{
+				FieldID:     101,
+				Name:        "binary_vector_field",
+				Description: "field 101",
+				DataType:    schemapb.DataType_VECTOR_BINARY,
+				TypeParams: []*commonpb.KeyValuePair{
+					{
+						Key:   "dim",
+						Value: "32",
+					},
+				},
+				IndexParams: []*commonpb.KeyValuePair{
+					{
+						Key:   "indexkey",
+						Value: "indexvalue",
+					},
+				},
+			},
+			{
+				FieldID:     102,
+				Name:        "bool_field",
+				Description: "field 102",
+				DataType:    schemapb.DataType_BOOL,
+				TypeParams:  []*commonpb.KeyValuePair{},
+				IndexParams: []*commonpb.KeyValuePair{},
+			},
+			{
+				FieldID:     103,
+				Name:        "int8_field",
+				Description: "field 103",
+				DataType:    schemapb.DataType_INT8,
+				TypeParams:  []*commonpb.KeyValuePair{},
+				IndexParams: []*commonpb.KeyValuePair{},
+			},
+			{
+				FieldID:     104,
+				Name:        "int16_field",
+				Description: "field 104",
+				DataType:    schemapb.DataType_INT16,
+				TypeParams:  []*commonpb.KeyValuePair{},
+				IndexParams: []*commonpb.KeyValuePair{},
+			},
+			{
+				FieldID:     105,
+				Name:        "int32_field",
+				Description: "field 105",
+				DataType:    schemapb.DataType_INT32,
+				TypeParams:  []*commonpb.KeyValuePair{},
+				IndexParams: []*commonpb.KeyValuePair{},
+			},
+			{
+				FieldID:     106,
+				Name:        "int64_field",
+				Description: "field 106",
+				DataType:    schemapb.DataType_INT64,
+				TypeParams:  []*commonpb.KeyValuePair{},
+				IndexParams: []*commonpb.KeyValuePair{},
+			},
+			{
+				FieldID:     107,
+				Name:        "float32_field",
+				Description: "field 107",
+				DataType:    schemapb.DataType_FLOAT,
+				TypeParams:  []*commonpb.KeyValuePair{},
+				IndexParams: []*commonpb.KeyValuePair{},
+			},
+			{
+				FieldID:     108,
+				Name:        "float64_field",
+				Description: "field 108",
+				DataType:    schemapb.DataType_DOUBLE,
+				TypeParams:  []*commonpb.KeyValuePair{},
+				IndexParams: []*commonpb.KeyValuePair{},
+			},
+		},
+	}
+
+	collection := etcdpb.CollectionMeta{
+		ID:            collectionID,
+		Schema:        &sch,
+		CreateTime:    Timestamp(1),
+		SegmentIDs:    make([]UniqueID, 0),
+		PartitionTags: make([]string, 0),
+	}
+	return &collection
+}
+
+func NewDataFactory() *DataFactory {
+	return &DataFactory{rawData: GenRowData()}
+}
+
+func GenRowData() (rawData []byte) {
+	const DIM = 2
+	const N = 1
+
+	// Float vector
+	var fvector = [DIM]float32{1, 2}
+	for _, ele := range fvector {
+		buf := make([]byte, 4)
+		binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
+		rawData = append(rawData, buf...)
+	}
+
+	// Binary vector
+	// Dimension of binary vector is 32
+	// size := 4,  = 32 / 8
+	var bvector = []byte{255, 255, 255, 0}
+	rawData = append(rawData, bvector...)
+
+	// Bool
+	var fieldBool = true
+	buf := new(bytes.Buffer)
+	if err := binary.Write(buf, binary.LittleEndian, fieldBool); err != nil {
+		panic(err)
+	}
+
+	rawData = append(rawData, buf.Bytes()...)
+
+	// int8
+	var dataInt8 int8 = 100
+	bint8 := new(bytes.Buffer)
+	if err := binary.Write(bint8, binary.LittleEndian, dataInt8); err != nil {
+		panic(err)
+	}
+	rawData = append(rawData, bint8.Bytes()...)
+
+	// int16
+	var dataInt16 int16 = 200
+	bint16 := new(bytes.Buffer)
+	if err := binary.Write(bint16, binary.LittleEndian, dataInt16); err != nil {
+		panic(err)
+	}
+	rawData = append(rawData, bint16.Bytes()...)
+
+	// int32
+	var dataInt32 int32 = 300
+	bint32 := new(bytes.Buffer)
+	if err := binary.Write(bint32, binary.LittleEndian, dataInt32); err != nil {
+		panic(err)
+	}
+	rawData = append(rawData, bint32.Bytes()...)
+
+	// int64
+	var dataInt64 int64 = 400
+	bint64 := new(bytes.Buffer)
+	if err := binary.Write(bint64, binary.LittleEndian, dataInt64); err != nil {
+		panic(err)
+	}
+	rawData = append(rawData, bint64.Bytes()...)
+
+	// float32
+	var datafloat float32 = 1.1
+	bfloat32 := new(bytes.Buffer)
+	if err := binary.Write(bfloat32, binary.LittleEndian, datafloat); err != nil {
+		panic(err)
+	}
+	rawData = append(rawData, bfloat32.Bytes()...)
+
+	// float64
+	var datafloat64 float64 = 2.2
+	bfloat64 := new(bytes.Buffer)
+	if err := binary.Write(bfloat64, binary.LittleEndian, datafloat64); err != nil {
+		panic(err)
+	}
+	rawData = append(rawData, bfloat64.Bytes()...)
+	log.Println("Rawdata length:", len(rawData))
+	return
+}
+
+// n: number of TsinsertMsgs to generate
+func (df *DataFactory) GetMsgStreamTsInsertMsgs(n int) (inMsgs []msgstream.TsMsg) {
+	for i := 0; i < n; i++ {
+		var msg msgstream.TsMsg = &msgstream.InsertMsg{
+			BaseMsg: msgstream.BaseMsg{
+				HashValues: []uint32{uint32(i)},
+			},
+			InsertRequest: internalpb2.InsertRequest{
+				Base: &commonpb.MsgBase{
+					MsgType:   commonpb.MsgType_kInsert,
+					MsgID:     0, // GOOSE TODO
+					Timestamp: Timestamp(i + 1000),
+					SourceID:  0,
+				},
+				CollectionName: "col1", // GOOSE TODO
+				PartitionName:  "default",
+				SegmentID:      1,   // GOOSE TODO
+				ChannelID:      "0", // GOOSE TODO
+				Timestamps:     []Timestamp{Timestamp(i + 1000)},
+				RowIDs:         []UniqueID{UniqueID(i)},
+				RowData:        []*commonpb.Blob{{Value: df.rawData}},
+			},
+		}
+		inMsgs = append(inMsgs, msg)
+	}
+	return
+}
+
+// n: number of insertMsgs to generate
+func (df *DataFactory) GetMsgStreamInsertMsgs(n int) (inMsgs []*msgstream.InsertMsg) {
+	for i := 0; i < n; i++ {
+		var msg = &msgstream.InsertMsg{
+			BaseMsg: msgstream.BaseMsg{
+				HashValues: []uint32{uint32(i)},
+			},
+			InsertRequest: internalpb2.InsertRequest{
+				Base: &commonpb.MsgBase{
+					MsgType:   commonpb.MsgType_kInsert,
+					MsgID:     0, // GOOSE TODO
+					Timestamp: Timestamp(i + 1000),
+					SourceID:  0,
+				},
+				CollectionName: "col1", // GOOSE TODO
+				PartitionName:  "default",
+				SegmentID:      1,   // GOOSE TODO
+				ChannelID:      "0", // GOOSE TODO
+				Timestamps:     []Timestamp{Timestamp(i + 1000)},
+				RowIDs:         []UniqueID{UniqueID(i)},
+				RowData:        []*commonpb.Blob{{Value: df.rawData}},
+			},
+		}
+		inMsgs = append(inMsgs, msg)
+	}
+	return
+}
+
+func NewAllocatorFactory(id ...UniqueID) *AllocatorFactory {
+	f := &AllocatorFactory{}
+	if len(id) == 1 {
+		f.ID = id[0]
+	}
+	return f
+}
+
+func (alloc AllocatorFactory) setID(id UniqueID) {
+	alloc.ID = id
+}
+
+func (alloc AllocatorFactory) allocID() (UniqueID, error) {
+	if alloc.ID == 0 {
+		return UniqueID(0), nil // GOOSE TODO: random ID generating
+	}
+	return alloc.ID, nil
+}
+
+func (m *MasterServiceFactory) setID(id UniqueID) {
+	m.ID = id // GOOSE TODO: random ID generator
+}
+
+func (m *MasterServiceFactory) setCollectionID(id UniqueID) {
+	m.collectionID = id
+}
+
+func (m *MasterServiceFactory) setCollectionName(name string) {
+	m.collectionName = name
+}
+
+func (m *MasterServiceFactory) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) {
+	resp := &masterpb.IDResponse{
+		Status: &commonpb.Status{},
+		ID:     m.ID,
+	}
+	return resp, nil
+}
+
+func (m *MasterServiceFactory) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
+	resp := &milvuspb.ShowCollectionResponse{
+		Status:          &commonpb.Status{},
+		CollectionNames: []string{m.collectionName},
+	}
+	return resp, nil
+
+}
+func (m *MasterServiceFactory) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
+	f := MetaFactory{}
+	meta := f.CollectionMetaFactory(m.collectionID, m.collectionName)
+	resp := &milvuspb.DescribeCollectionResponse{
+		Status:       &commonpb.Status{},
+		CollectionID: m.collectionID,
+		Schema:       meta.Schema,
+	}
+	return resp, nil
+}
+
+func (m *MasterServiceFactory) GetComponentStates() (*internalpb2.ComponentStates, error) {
+	return &internalpb2.ComponentStates{
+		State:              &internalpb2.ComponentInfo{},
+		SubcomponentStates: make([]*internalpb2.ComponentInfo, 0),
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_SUCCESS,
+		},
+	}, nil
+}
diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go
index fa8b09ec0..4a49b79af 100644
--- a/internal/datanode/data_sync_service.go
+++ b/internal/datanode/data_sync_service.go
@@ -19,18 +19,27 @@ type dataSyncService struct {
 
 func newDataSyncService(ctx context.Context, flushChan chan *flushMsg,
 	replica collectionReplica, alloc allocator) *dataSyncService {
-
-	return &dataSyncService{
+	service := &dataSyncService{
 		ctx:         ctx,
 		fg:          nil,
 		flushChan:   flushChan,
 		replica:     replica,
 		idAllocator: alloc,
 	}
+	return service
 }
 
-func (dsService *dataSyncService) start() {
+func (dsService *dataSyncService) init() {
+	if len(Params.InsertChannelNames) == 0 {
+		log.Println("InsertChannels not readly, init datasync service failed")
+		return
+	}
+
 	dsService.initNodes()
+}
+
+func (dsService *dataSyncService) start() {
+	log.Println("Data Sync Service Start Successfully")
 	dsService.fg.Start()
 }
 
@@ -60,7 +69,6 @@ func (dsService *dataSyncService) initNodes() {
 	var ddStreamNode Node = newDDInputNode(dsService.ctx)
 
 	var filterDmNode Node = newFilteredDmNode()
-
 	var ddNode Node = newDDNode(dsService.ctx, mt, dsService.flushChan, dsService.replica, dsService.idAllocator)
 	var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica, dsService.idAllocator)
 	var gcNode Node = newGCNode(dsService.replica)
diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go
index 3b40ea1a2..1a1079498 100644
--- a/internal/datanode/data_sync_service_test.go
+++ b/internal/datanode/data_sync_service_test.go
@@ -2,7 +2,6 @@ package datanode
 
 import (
 	"context"
-	"encoding/binary"
 	"math"
 	"testing"
 	"time"
@@ -42,116 +41,15 @@ func TestDataSyncService_Start(t *testing.T) {
 	allocFactory := AllocatorFactory{}
 	sync := newDataSyncService(ctx, flushChan, replica, allocFactory)
 	sync.replica.addCollection(collMeta.ID, collMeta.Schema)
+	sync.init()
 	go sync.start()
 
-	// test data generate
-	// GOOSE TODO orgnize
-	const DIM = 2
-	const N = 1
-	var rawData []byte
-
-	// Float vector
-	var fvector = [DIM]float32{1, 2}
-	for _, ele := range fvector {
-		buf := make([]byte, 4)
-		binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
-		rawData = append(rawData, buf...)
-	}
-
-	// Binary vector
-	// Dimension of binary vector is 32
-	var bvector = [4]byte{255, 255, 255, 0}
-	for _, ele := range bvector {
-		bs := make([]byte, 4)
-		binary.LittleEndian.PutUint32(bs, uint32(ele))
-		rawData = append(rawData, bs...)
-	}
-
-	// Bool
-	bb := make([]byte, 4)
-	var fieldBool = true
-	var fieldBoolInt uint32
-	if fieldBool {
-		fieldBoolInt = 1
-	} else {
-		fieldBoolInt = 0
-	}
-
-	binary.LittleEndian.PutUint32(bb, fieldBoolInt)
-	rawData = append(rawData, bb...)
-
-	// int8
-	var dataInt8 int8 = 100
-	bint8 := make([]byte, 4)
-	binary.LittleEndian.PutUint32(bint8, uint32(dataInt8))
-	rawData = append(rawData, bint8...)
-
-	// int16
-	var dataInt16 int16 = 200
-	bint16 := make([]byte, 4)
-	binary.LittleEndian.PutUint32(bint16, uint32(dataInt16))
-	rawData = append(rawData, bint16...)
-
-	// int32
-	var dataInt32 int32 = 300
-	bint32 := make([]byte, 4)
-	binary.LittleEndian.PutUint32(bint32, uint32(dataInt32))
-	rawData = append(rawData, bint32...)
-
-	// int64
-	var dataInt64 int64 = 300
-	bint64 := make([]byte, 4)
-	binary.LittleEndian.PutUint32(bint64, uint32(dataInt64))
-	rawData = append(rawData, bint64...)
-
-	// float32
-	var datafloat float32 = 1.1
-	bfloat32 := make([]byte, 4)
-	binary.LittleEndian.PutUint32(bfloat32, math.Float32bits(datafloat))
-	rawData = append(rawData, bfloat32...)
-
-	// float64
-	var datafloat64 float64 = 2.2
-	bfloat64 := make([]byte, 8)
-	binary.LittleEndian.PutUint64(bfloat64, math.Float64bits(datafloat64))
-	rawData = append(rawData, bfloat64...)
-
 	timeRange := TimeRange{
 		timestampMin: 0,
 		timestampMax: math.MaxUint64,
 	}
-
-	// messages generate
-	const MSGLENGTH = 1
-	insertMessages := make([]msgstream.TsMsg, 0)
-	for i := 0; i < MSGLENGTH; i++ {
-		var msg msgstream.TsMsg = &msgstream.InsertMsg{
-			BaseMsg: msgstream.BaseMsg{
-				HashValues: []uint32{
-					uint32(i),
-				},
-			},
-			InsertRequest: internalpb2.InsertRequest{
-				Base: &commonpb.MsgBase{
-					MsgType:   commonpb.MsgType_kInsert,
-					MsgID:     UniqueID(0),
-					Timestamp: Timestamp(i + 1000),
-					SourceID:  0,
-				},
-				CollectionName: "col1",
-				PartitionName:  "default",
-				SegmentID:      UniqueID(1),
-				ChannelID:      "0",
-				Timestamps:     []Timestamp{Timestamp(i + 1000)},
-				RowIDs:         []UniqueID{UniqueID(i)},
-
-				RowData: []*commonpb.Blob{
-					{Value: rawData},
-				},
-			},
-		}
-		insertMessages = append(insertMessages, msg)
-	}
+	dataFactory := NewDataFactory()
+	insertMessages := dataFactory.GetMsgStreamTsInsertMsgs(2)
 
 	msgPack := msgstream.MsgPack{
 		BeginTs: timeRange.timestampMin,
@@ -208,6 +106,7 @@ func TestDataSyncService_Start(t *testing.T) {
 
 	// dataSync
 	Params.FlushInsertBufferSize = 1
+	<-sync.ctx.Done()
 
 	sync.close()
 }
diff --git a/internal/datanode/factory.go b/internal/datanode/factory.go
deleted file mode 100644
index c0fbebb34..000000000
--- a/internal/datanode/factory.go
+++ /dev/null
@@ -1,231 +0,0 @@
-package datanode
-
-import (
-	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
-	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
-	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
-	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
-	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
-	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
-)
-
-type (
-	Factory interface {
-	}
-
-	MetaFactory struct {
-	}
-
-	AllocatorFactory struct {
-		ID UniqueID
-	}
-
-	MasterServiceFactory struct {
-		ID             UniqueID
-		collectionName string
-		collectionID   UniqueID
-	}
-)
-
-func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta {
-	sch := schemapb.CollectionSchema{
-		Name:        collectionName,
-		Description: "test collection by meta factory",
-		AutoID:      true,
-		Fields: []*schemapb.FieldSchema{
-			{
-				FieldID:     0,
-				Name:        "RowID",
-				Description: "RowID field",
-				DataType:    schemapb.DataType_INT64,
-				TypeParams: []*commonpb.KeyValuePair{
-					{
-						Key:   "f0_tk1",
-						Value: "f0_tv1",
-					},
-				},
-			},
-			{
-				FieldID:     1,
-				Name:        "Timestamp",
-				Description: "Timestamp field",
-				DataType:    schemapb.DataType_INT64,
-				TypeParams: []*commonpb.KeyValuePair{
-					{
-						Key:   "f1_tk1",
-						Value: "f1_tv1",
-					},
-				},
-			},
-			{
-				FieldID:     100,
-				Name:        "float_vector_field",
-				Description: "field 100",
-				DataType:    schemapb.DataType_VECTOR_FLOAT,
-				TypeParams: []*commonpb.KeyValuePair{
-					{
-						Key:   "dim",
-						Value: "2",
-					},
-				},
-				IndexParams: []*commonpb.KeyValuePair{
-					{
-						Key:   "indexkey",
-						Value: "indexvalue",
-					},
-				},
-			},
-			{
-				FieldID:     101,
-				Name:        "binary_vector_field",
-				Description: "field 101",
-				DataType:    schemapb.DataType_VECTOR_BINARY,
-				TypeParams: []*commonpb.KeyValuePair{
-					{
-						Key:   "dim",
-						Value: "32",
-					},
-				},
-				IndexParams: []*commonpb.KeyValuePair{
-					{
-						Key:   "indexkey",
-						Value: "indexvalue",
-					},
-				},
-			},
-			{
-				FieldID:     102,
-				Name:        "bool_field",
-				Description: "field 102",
-				DataType:    schemapb.DataType_BOOL,
-				TypeParams:  []*commonpb.KeyValuePair{},
-				IndexParams: []*commonpb.KeyValuePair{},
-			},
-			{
-				FieldID:     103,
-				Name:        "int8_field",
-				Description: "field 103",
-				DataType:    schemapb.DataType_INT8,
-				TypeParams:  []*commonpb.KeyValuePair{},
-				IndexParams: []*commonpb.KeyValuePair{},
-			},
-			{
-				FieldID:     104,
-				Name:        "int16_field",
-				Description: "field 104",
-				DataType:    schemapb.DataType_INT16,
-				TypeParams:  []*commonpb.KeyValuePair{},
-				IndexParams: []*commonpb.KeyValuePair{},
-			},
-			{
-				FieldID:     105,
-				Name:        "int32_field",
-				Description: "field 105",
-				DataType:    schemapb.DataType_INT32,
-				TypeParams:  []*commonpb.KeyValuePair{},
-				IndexParams: []*commonpb.KeyValuePair{},
-			},
-			{
-				FieldID:     106,
-				Name:        "int64_field",
-				Description: "field 106",
-				DataType:    schemapb.DataType_INT64,
-				TypeParams:  []*commonpb.KeyValuePair{},
-				IndexParams: []*commonpb.KeyValuePair{},
-			},
-			{
-				FieldID:     107,
-				Name:        "float32_field",
-				Description: "field 107",
-				DataType:    schemapb.DataType_FLOAT,
-				TypeParams:  []*commonpb.KeyValuePair{},
-				IndexParams: []*commonpb.KeyValuePair{},
-			},
-			{
-				FieldID:     108,
-				Name:        "float64_field",
-				Description: "field 108",
-				DataType:    schemapb.DataType_DOUBLE,
-				TypeParams:  []*commonpb.KeyValuePair{},
-				IndexParams: []*commonpb.KeyValuePair{},
-			},
-		},
-	}
-
-	collection := etcdpb.CollectionMeta{
-		ID:            collectionID,
-		Schema:        &sch,
-		CreateTime:    Timestamp(1),
-		SegmentIDs:    make([]UniqueID, 0),
-		PartitionTags: make([]string, 0),
-	}
-	return &collection
-}
-
-func NewAllocatorFactory(id ...UniqueID) *AllocatorFactory {
-	f := &AllocatorFactory{}
-	if len(id) == 1 {
-		f.ID = id[0]
-	}
-	return f
-}
-
-func (alloc AllocatorFactory) setID(id UniqueID) {
-	alloc.ID = id
-}
-
-func (alloc AllocatorFactory) allocID() (UniqueID, error) {
-	if alloc.ID == 0 {
-		return UniqueID(0), nil // GOOSE TODO: random ID generating
-	}
-	return alloc.ID, nil
-}
-
-func (m *MasterServiceFactory) setID(id UniqueID) {
-	m.ID = id // GOOSE TODO: random ID generator
-}
-
-func (m *MasterServiceFactory) setCollectionID(id UniqueID) {
-	m.collectionID = id
-}
-
-func (m *MasterServiceFactory) setCollectionName(name string) {
-	m.collectionName = name
-}
-
-func (m *MasterServiceFactory) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) {
-	resp := &masterpb.IDResponse{
-		Status: &commonpb.Status{},
-		ID:     m.ID,
-	}
-	return resp, nil
-}
-
-func (m *MasterServiceFactory) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
-	resp := &milvuspb.ShowCollectionResponse{
-		Status:          &commonpb.Status{},
-		CollectionNames: []string{m.collectionName},
-	}
-	return resp, nil
-
-}
-func (m *MasterServiceFactory) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
-	f := MetaFactory{}
-	meta := f.CollectionMetaFactory(m.collectionID, m.collectionName)
-	resp := &milvuspb.DescribeCollectionResponse{
-		Status:       &commonpb.Status{},
-		CollectionID: m.collectionID,
-		Schema:       meta.Schema,
-	}
-	return resp, nil
-}
-
-func (m *MasterServiceFactory) GetComponentStates() (*internalpb2.ComponentStates, error) {
-	return &internalpb2.ComponentStates{
-		State:              &internalpb2.ComponentInfo{},
-		SubcomponentStates: make([]*internalpb2.ComponentInfo, 0),
-		Status: &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_SUCCESS,
-		},
-	}, nil
-}
diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go
index 00e80f28b..30b97ab13 100644
--- a/internal/datanode/flow_graph_dd_node.go
+++ b/internal/datanode/flow_graph_dd_node.go
@@ -132,7 +132,6 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
 		}
 
 	default:
-		//log.Println(". default: do nothing ...")
 	}
 
 	// generate binlog
diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go
index 28295a2df..12c776b98 100644
--- a/internal/datanode/flow_graph_insert_buffer_node_test.go
+++ b/internal/datanode/flow_graph_insert_buffer_node_test.go
@@ -1,10 +1,7 @@
 package datanode
 
 import (
-	"bytes"
 	"context"
-	"encoding/binary"
-	"log"
 	"math"
 	"testing"
 	"time"
@@ -12,8 +9,6 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
-	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
-	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 	"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
 )
 
@@ -43,7 +38,6 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
 	err = replica.addCollection(collMeta.ID, collMeta.Schema)
 	require.NoError(t, err)
 
-	// Params.FlushInsertBufSize = 2
 	idFactory := AllocatorFactory{}
 	iBNode := newInsertBufferNode(ctx, newMetaTable(), replica, idFactory)
 	inMsg := genInsertMsg()
@@ -52,82 +46,6 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
 }
 
 func genInsertMsg() insertMsg {
-	// test data generate
-	const DIM = 2
-	const N = 1
-	var rawData []byte
-
-	// Float vector
-	var fvector = [DIM]float32{1, 2}
-	for _, ele := range fvector {
-		buf := make([]byte, 4)
-		binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
-		rawData = append(rawData, buf...)
-	}
-
-	// Binary vector
-	// Dimension of binary vector is 32
-	// size := 4,  = 32 / 8
-	var bvector = []byte{255, 255, 255, 0}
-	rawData = append(rawData, bvector...)
-
-	// Bool
-	var fieldBool = true
-	buf := new(bytes.Buffer)
-	if err := binary.Write(buf, binary.LittleEndian, fieldBool); err != nil {
-		panic(err)
-	}
-
-	rawData = append(rawData, buf.Bytes()...)
-
-	// int8
-	var dataInt8 int8 = 100
-	bint8 := new(bytes.Buffer)
-	if err := binary.Write(bint8, binary.LittleEndian, dataInt8); err != nil {
-		panic(err)
-	}
-	rawData = append(rawData, bint8.Bytes()...)
-
-	// int16
-	var dataInt16 int16 = 200
-	bint16 := new(bytes.Buffer)
-	if err := binary.Write(bint16, binary.LittleEndian, dataInt16); err != nil {
-		panic(err)
-	}
-	rawData = append(rawData, bint16.Bytes()...)
-
-	// int32
-	var dataInt32 int32 = 300
-	bint32 := new(bytes.Buffer)
-	if err := binary.Write(bint32, binary.LittleEndian, dataInt32); err != nil {
-		panic(err)
-	}
-	rawData = append(rawData, bint32.Bytes()...)
-
-	// int64
-	var dataInt64 int64 = 400
-	bint64 := new(bytes.Buffer)
-	if err := binary.Write(bint64, binary.LittleEndian, dataInt64); err != nil {
-		panic(err)
-	}
-	rawData = append(rawData, bint64.Bytes()...)
-
-	// float32
-	var datafloat float32 = 1.1
-	bfloat32 := new(bytes.Buffer)
-	if err := binary.Write(bfloat32, binary.LittleEndian, datafloat); err != nil {
-		panic(err)
-	}
-	rawData = append(rawData, bfloat32.Bytes()...)
-
-	// float64
-	var datafloat64 float64 = 2.2
-	bfloat64 := new(bytes.Buffer)
-	if err := binary.Write(bfloat64, binary.LittleEndian, datafloat64); err != nil {
-		panic(err)
-	}
-	rawData = append(rawData, bfloat64.Bytes()...)
-	log.Println("Test rawdata length:", len(rawData))
 
 	timeRange := TimeRange{
 		timestampMin: 0,
@@ -143,55 +61,8 @@ func genInsertMsg() insertMsg {
 		},
 	}
 
-	// messages generate
-	const MSGLENGTH = 1
-	// insertMessages := make([]msgstream.TsMsg, 0)
-	for i := 0; i < MSGLENGTH; i++ {
-		var msg = &msgstream.InsertMsg{
-			BaseMsg: msgstream.BaseMsg{
-				HashValues: []uint32{
-					uint32(i),
-				},
-			},
-			InsertRequest: internalpb2.InsertRequest{
-				Base: &commonpb.MsgBase{
-					MsgType:   commonpb.MsgType_kInsert,
-					MsgID:     0,
-					Timestamp: Timestamp(i + 1000),
-					SourceID:  0,
-				},
-				CollectionName: "col1",
-				PartitionName:  "default",
-				CollectionID:   0,
-				PartitionID:    1,
-				SegmentID:      UniqueID(1),
-				ChannelID:      "0",
-				Timestamps: []Timestamp{
-					Timestamp(i + 1000),
-					Timestamp(i + 1000),
-					Timestamp(i + 1000),
-					Timestamp(i + 1000),
-					Timestamp(i + 1000),
-				},
-				RowIDs: []UniqueID{
-					UniqueID(i),
-					UniqueID(i),
-					UniqueID(i),
-					UniqueID(i),
-					UniqueID(i),
-				},
-
-				RowData: []*commonpb.Blob{
-					{Value: rawData},
-					{Value: rawData},
-					{Value: rawData},
-					{Value: rawData},
-					{Value: rawData},
-				},
-			},
-		}
-		iMsg.insertMessages = append(iMsg.insertMessages, msg)
-	}
+	dataFactory := NewDataFactory()
+	iMsg.insertMessages = append(iMsg.insertMessages, dataFactory.GetMsgStreamInsertMsgs(2)...)
 
 	fmsg := &flushMsg{
 		msgID:        1,
diff --git a/internal/datanode/flow_graph_msg_stream_input_node.go b/internal/datanode/flow_graph_msg_stream_input_node.go
index 163ac7fc6..57a2ff164 100644
--- a/internal/datanode/flow_graph_msg_stream_input_node.go
+++ b/internal/datanode/flow_graph_msg_stream_input_node.go
@@ -10,42 +10,35 @@ import (
 )
 
 func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
-	msgStreamURL := Params.PulsarAddress
 
+	maxQueueLength := Params.FlowGraphMaxQueueLength
+	maxParallelism := Params.FlowGraphMaxParallelism
 	consumeChannels := Params.InsertChannelNames
 	consumeSubName := Params.MsgChannelSubName
+	unmarshalDispatcher := util.NewUnmarshalDispatcher()
 
 	insertStream := pulsarms.NewPulsarTtMsgStream(ctx, 1024)
-
-	insertStream.SetPulsarClient(msgStreamURL)
-	unmarshalDispatcher := util.NewUnmarshalDispatcher()
+	insertStream.SetPulsarClient(Params.PulsarAddress)
 
 	insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, 1024)
 
 	var stream msgstream.MsgStream = insertStream
-
-	maxQueueLength := Params.FlowGraphMaxQueueLength
-	maxParallelism := Params.FlowGraphMaxParallelism
-
 	node := flowgraph.NewInputNode(&stream, "dmInputNode", maxQueueLength, maxParallelism)
 	return node
 }
 
 func newDDInputNode(ctx context.Context) *flowgraph.InputNode {
 
-	consumeChannels := Params.DDChannelNames
+	maxQueueLength := Params.FlowGraphMaxQueueLength
+	maxParallelism := Params.FlowGraphMaxParallelism
 	consumeSubName := Params.MsgChannelSubName
-
-	ddStream := pulsarms.NewPulsarTtMsgStream(ctx, 1024)
-	ddStream.SetPulsarClient(Params.PulsarAddress)
 	unmarshalDispatcher := util.NewUnmarshalDispatcher()
-	ddStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, 1024)
 
-	var stream msgstream.MsgStream = ddStream
-
-	maxQueueLength := Params.FlowGraphMaxQueueLength
-	maxParallelism := Params.FlowGraphMaxParallelism
+	tmpStream := pulsarms.NewPulsarTtMsgStream(ctx, 1024)
+	tmpStream.SetPulsarClient(Params.PulsarAddress)
+	tmpStream.CreatePulsarConsumers(Params.DDChannelNames, consumeSubName, unmarshalDispatcher, 1024)
 
+	var stream msgstream.MsgStream = tmpStream
 	node := flowgraph.NewInputNode(&stream, "ddInputNode", maxQueueLength, maxParallelism)
 	return node
 }
diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go
index b133267f9..46b9a4721 100644
--- a/internal/masterservice/master_service_test.go
+++ b/internal/masterservice/master_service_test.go
@@ -657,10 +657,18 @@ func TestMasterService(t *testing.T) {
 		rsp, err := core.DescribeIndex(req)
 		assert.Nil(t, err)
 		assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS)
+
 		assert.Equal(t, len(rsp.IndexDescriptions), 3)
-		assert.Equal(t, rsp.IndexDescriptions[0].IndexName, Params.DefaultIndexName)
-		assert.Equal(t, rsp.IndexDescriptions[1].IndexName, "index_field_100_0")
-		assert.Equal(t, rsp.IndexDescriptions[2].IndexName, "index_field_100_1")
+		indexNames := make([]string, 0)
+		for _, d := range rsp.IndexDescriptions {
+			indexNames = append(indexNames, d.IndexName)
+		}
+
+		assert.ElementsMatch(t, indexNames, []string{
+			"index_field_100_0",
+			"index_field_100_1",
+			Params.DefaultIndexName,
+		})
 	})
 
 	t.Run("drop partition", func(t *testing.T) {
-- 
GitLab