From e96d39bf6a101f33e4b440f3d56e1bf02028123b Mon Sep 17 00:00:00 2001
From: XuanYang-cn <xuan.yang@zilliz.com>
Date: Wed, 2 Jun 2021 15:58:33 +0800
Subject: [PATCH] refactor datanode flowgraph (#5538)

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
---
 go.mod                                        |   2 +-
 internal/datanode/data_node.go                |  26 --
 internal/datanode/data_sync_service.go        |  54 +--
 internal/datanode/flow_graph_dd_node.go       | 420 ++----------------
 internal/datanode/flow_graph_dd_node_test.go  | 160 +------
 ...e.go => flow_graph_dmstream_input_node.go} |  19 +-
 .../datanode/flow_graph_filter_dm_node.go     | 172 -------
 internal/datanode/flow_graph_gc_node.go       |  71 ---
 .../datanode/flow_graph_insert_buffer_node.go |  48 +-
 .../flow_graph_insert_buffer_node_test.go     |  26 +-
 internal/datanode/flow_graph_message.go       |  51 ---
 11 files changed, 72 insertions(+), 977 deletions(-)
 rename internal/datanode/{flow_graph_msg_stream_input_node.go => flow_graph_dmstream_input_node.go} (68%)
 delete mode 100644 internal/datanode/flow_graph_filter_dm_node.go
 delete mode 100644 internal/datanode/flow_graph_gc_node.go

diff --git a/go.mod b/go.mod
index ed9203187..90b762013 100644
--- a/go.mod
+++ b/go.mod
@@ -18,7 +18,7 @@ require (
 	github.com/golang/mock v1.3.1
 	github.com/golang/protobuf v1.4.3
 	github.com/google/btree v1.0.0
-	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
+	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
 	github.com/jarcoal/httpmock v1.0.8
 	github.com/klauspost/compress v1.10.11 // indirect
 	github.com/minio/minio-go/v7 v7.0.10
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index 0aae869d9..2954d428d 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -341,7 +341,6 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
 			return status, nil
 		}
 
-		ddlFlushedCh := make(chan []*datapb.DDLBinlogMeta)
 		dmlFlushedCh := make(chan []*datapb.ID2PathList)
 
 		flushmsg := &flushMsg{
@@ -349,7 +348,6 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
 			timestamp:    req.Base.Timestamp,
 			segmentID:    id,
 			collectionID: req.CollectionID,
-			ddlFlushedCh: ddlFlushedCh,
 			dmlFlushedCh: dmlFlushedCh,
 		}
 
@@ -372,28 +370,6 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
 					req.Field2BinlogPaths = meta
 					log.Info("Insert messeges flush done!", zap.Any("Binlog paths", meta))
 				}
-
-			case chan []*datapb.DDLBinlogMeta:
-				select {
-				case <-time.After(300 * time.Second):
-					return
-				case meta := <-Ch:
-					if meta == nil {
-						log.Info("Ddl messages flush failed!")
-						// Modify req to confirm failure
-						return
-					}
-
-					if len(meta) == 0 {
-						log.Info("Ddl messages flush Done")
-						// Modify req with empty ddl binlog paths and position
-						return
-					}
-
-					// Modify req with valid ddl binlog paths
-					req.DdlBinlogPaths = meta
-					log.Info("Ddl messages flush done!", zap.Any("Binlog paths", meta))
-				}
 			default:
 				log.Error("Not supported type")
 			}
@@ -414,8 +390,6 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
 
 			var wg sync.WaitGroup
 			wg.Add(1)
-			go waitReceive(&wg, ddlFlushedCh, req)
-			wg.Add(1)
 			go waitReceive(&wg, dmlFlushedCh, req)
 			wg.Wait()
 
diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go
index 5feea2843..7d9d900ef 100644
--- a/internal/datanode/data_sync_service.go
+++ b/internal/datanode/data_sync_service.go
@@ -84,56 +84,27 @@ func (dsService *dataSyncService) initNodes(vchanPair *datapb.VchannelPair) {
 	}
 
 	var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.msFactory, vchanPair.GetDmlVchannelName(), vchanPair.GetDmlPosition())
-	var ddStreamNode Node = newDDInputNode(dsService.ctx, dsService.msFactory, vchanPair.GetDdlVchannelName(), vchanPair.GetDdlPosition())
-
-	var filterDmNode Node = newFilteredDmNode()
-	var ddNode Node = newDDNode(dsService.ctx, dsService.flushChan, dsService.replica, dsService.idAllocator, vchanPair.CollectionID)
-	var insertBufferNode Node = newInsertBufferNode(dsService.ctx, dsService.replica, dsService.msFactory, dsService.idAllocator)
-	var gcNode Node = newGCNode(dsService.replica)
+	var ddNode Node = newDDNode()
+	var insertBufferNode Node = newInsertBufferNode(dsService.ctx, dsService.replica, dsService.msFactory, dsService.idAllocator, dsService.flushChan)
 
 	dsService.fg.AddNode(dmStreamNode)
-	dsService.fg.AddNode(ddStreamNode)
-
-	dsService.fg.AddNode(filterDmNode)
 	dsService.fg.AddNode(ddNode)
-
 	dsService.fg.AddNode(insertBufferNode)
-	dsService.fg.AddNode(gcNode)
-
-	// dmStreamNode
-	err = dsService.fg.SetEdges(dmStreamNode.Name(),
-		[]string{},
-		[]string{filterDmNode.Name()},
-	)
-	if err != nil {
-		log.Error("set edges failed in node", zap.String("name", dmStreamNode.Name()), zap.Error(err))
-		panic("set edges faild in the node")
-	}
 
 	// ddStreamNode
-	err = dsService.fg.SetEdges(ddStreamNode.Name(),
+	err = dsService.fg.SetEdges(dmStreamNode.Name(),
 		[]string{},
 		[]string{ddNode.Name()},
 	)
 	if err != nil {
-		log.Error("set edges failed in node", zap.String("name", ddStreamNode.Name()), zap.Error(err))
-		panic("set edges faild in the node")
-	}
-
-	// filterDmNode
-	err = dsService.fg.SetEdges(filterDmNode.Name(),
-		[]string{dmStreamNode.Name(), ddNode.Name()},
-		[]string{insertBufferNode.Name()},
-	)
-	if err != nil {
-		log.Error("set edges failed in node", zap.String("name", filterDmNode.Name()), zap.Error(err))
+		log.Error("set edges failed in node", zap.String("name", dmStreamNode.Name()), zap.Error(err))
 		panic("set edges faild in the node")
 	}
 
 	// ddNode
 	err = dsService.fg.SetEdges(ddNode.Name(),
-		[]string{ddStreamNode.Name()},
-		[]string{filterDmNode.Name()},
+		[]string{dmStreamNode.Name()},
+		[]string{insertBufferNode.Name()},
 	)
 	if err != nil {
 		log.Error("set edges failed in node", zap.String("name", ddNode.Name()), zap.Error(err))
@@ -142,20 +113,11 @@ func (dsService *dataSyncService) initNodes(vchanPair *datapb.VchannelPair) {
 
 	// insertBufferNode
 	err = dsService.fg.SetEdges(insertBufferNode.Name(),
-		[]string{filterDmNode.Name()},
-		[]string{gcNode.Name()},
+		[]string{ddNode.Name()},
+		[]string{},
 	)
 	if err != nil {
 		log.Error("set edges failed in node", zap.String("name", insertBufferNode.Name()), zap.Error(err))
 		panic("set edges faild in the node")
 	}
-
-	// gcNode
-	err = dsService.fg.SetEdges(gcNode.Name(),
-		[]string{insertBufferNode.Name()},
-		[]string{})
-	if err != nil {
-		log.Error("set edges failed in node", zap.String("name", gcNode.Name()), zap.Error(err))
-		panic("set edges faild in the node")
-	}
 }
diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go
index 2f3ae26d3..b0030280d 100644
--- a/internal/datanode/flow_graph_dd_node.go
+++ b/internal/datanode/flow_graph_dd_node.go
@@ -12,88 +12,24 @@
 package datanode
 
 import (
-	"context"
-	"fmt"
-	"path"
-	"sort"
-	"strconv"
-	"sync"
-
-	"github.com/golang/protobuf/proto"
 	"go.uber.org/zap"
 
-	"github.com/milvus-io/milvus/internal/kv"
-	miniokv "github.com/milvus-io/milvus/internal/kv/minio"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/msgstream"
-	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"github.com/milvus-io/milvus/internal/proto/schemapb"
-	"github.com/milvus-io/milvus/internal/storage"
+	"github.com/milvus-io/milvus/internal/proto/commonpb"
+	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/util/flowgraph"
-	"github.com/milvus-io/milvus/internal/util/trace"
-	"github.com/opentracing/opentracing-go"
 )
 
 type ddNode struct {
 	BaseNode
-	ddMsg       *ddMsg
-	ddRecords   *ddRecords
-	ddBuffer    *ddBuffer
-	flushMap    *sync.Map
-	inFlushCh   <-chan *flushMsg
-	idAllocator allocatorInterface
-
-	kv      kv.BaseKV
-	replica Replica
-
-	collectionID UniqueID
-}
-
-type ddData struct {
-	ddRequestString []string
-	timestamps      []Timestamp
-	eventTypes      []storage.EventTypeCode
-}
-
-type ddBuffer struct {
-	// ddData map[UniqueID]*ddData // collection ID
-	ddData ddData
-	sync.Mutex
 }
 
-type ddRecords struct {
-	collectionRecords map[UniqueID]interface{}
-	partitionRecords  map[UniqueID]interface{}
-}
-
-func (d *ddBuffer) getData() ddData {
-	d.Lock()
-	defer d.Unlock()
-	r := ddData{
-		ddRequestString: d.ddData.ddRequestString,
-		timestamps:      d.ddData.timestamps,
-		eventTypes:      d.ddData.eventTypes,
-	}
-	d.ddData.ddRequestString = make([]string, 0, 10)
-	d.ddData.timestamps = make([]Timestamp, 0, 10)
-	d.ddData.eventTypes = make([]storage.EventTypeCode, 0, 10)
-	return r
-}
-
-func (d *ddBuffer) append(request string, timestamp Timestamp, eventType storage.EventTypeCode) {
-	d.Lock()
-	defer d.Unlock()
-	d.ddData.ddRequestString = append(d.ddData.ddRequestString, request)
-	d.ddData.timestamps = append(d.ddData.timestamps, timestamp)
-	d.ddData.eventTypes = append(d.ddData.eventTypes, eventType)
-}
-
-func (ddNode *ddNode) Name() string {
+func (ddn *ddNode) Name() string {
 	return "ddNode"
 }
 
-func (ddNode *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
-
+func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 	if len(in) != 1 {
 		log.Error("Invalid operate message input in ddNode", zap.Int("input length", len(in)))
 		// TODO: add error handling
@@ -112,352 +48,48 @@ func (ddNode *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 	if msMsg == nil {
 		return []Msg{}
 	}
-	var spans []opentracing.Span
-	for _, msg := range msMsg.TsMessages() {
-		sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
-		spans = append(spans, sp)
-		msg.SetTraceCtx(ctx)
-	}
 
-	ddNode.ddMsg = &ddMsg{
-		collectionRecords: make(map[UniqueID][]*metaOperateRecord),
-		partitionRecords:  make(map[UniqueID][]*metaOperateRecord),
+	var iMsg = insertMsg{
+		insertMessages: make([]*msgstream.InsertMsg, 0),
 		timeRange: TimeRange{
 			timestampMin: msMsg.TimestampMin(),
 			timestampMax: msMsg.TimestampMax(),
 		},
-		gcRecord: &gcRecord{
-			collections: make([]UniqueID, 0),
-		},
+		startPositions: make([]*internalpb.MsgPosition, 0),
+		endPositions:   make([]*internalpb.MsgPosition, 0),
 	}
 
-	// sort tsMessages
-	tsMessages := msMsg.TsMessages()
-	sort.Slice(tsMessages,
-		func(i, j int) bool {
-			return tsMessages[i].BeginTs() < tsMessages[j].BeginTs()
-		})
-
-	// do dd tasks
-	for _, msg := range tsMessages {
-		switch msg := msg.(type) {
-		case *msgstream.CreateCollectionMsg:
-			if msg.CollectionID != ddNode.collectionID {
-				continue
-			}
-			ddNode.createCollection(msg)
-		case *msgstream.DropCollectionMsg:
-			if msg.CollectionID != ddNode.collectionID {
-				continue
-			}
-			ddNode.dropCollection(msg)
-		case *msgstream.CreatePartitionMsg:
-			if msg.CollectionID != ddNode.collectionID {
-				continue
-			}
-			ddNode.createPartition(msg)
-		case *msgstream.DropPartitionMsg:
-			if msg.CollectionID != ddNode.collectionID {
-				continue
+	for _, msg := range msMsg.TsMessages() {
+		switch msg.Type() {
+		case commonpb.MsgType_DropCollection:
+			// TODO distroy dataSyncService and nodify datanode
+			log.Error("Distorying current flowgraph")
+		case commonpb.MsgType_Insert:
+			resMsg := ddn.filterFlushedSegmentInsertMessages(msg.(*msgstream.InsertMsg))
+			if resMsg != nil {
+				iMsg.insertMessages = append(iMsg.insertMessages, resMsg)
 			}
-			ddNode.dropPartition(msg)
-		default:
-			log.Error("Not supporting message type", zap.Any("Type", msg.Type()))
 		}
 	}
 
-	select {
-	case fmsg := <-ddNode.inFlushCh:
-		log.Debug(". receive flush message ...")
-		segID := fmsg.segmentID
-		if !ddNode.replica.hasSegment(segID) {
-			log.Debug(".. Segment not exist in this datanode, skip flushing ...")
-			break
-		}
+	iMsg.startPositions = append(iMsg.startPositions, msMsg.StartPositions()...)
+	iMsg.endPositions = append(iMsg.endPositions, msMsg.EndPositions()...)
 
-		seg, _ := ddNode.replica.getSegmentByID(segID)
-		collID := seg.collectionID
+	var res Msg = &iMsg
 
-		buf := ddNode.ddBuffer.getData()
-		if len(buf.ddRequestString) == 0 {
-			fmsg.ddlFlushedCh <- []*datapb.DDLBinlogMeta{}
-		} else {
-			ddNode.flushMap.Store(collID, &buf)
-			log.Debug(".. ddl buffer not empty, flushing ...")
-			binlogMetaCh := make(chan *datapb.DDLBinlogMeta)
-			go flush(collID, ddNode.flushMap, ddNode.kv, ddNode.idAllocator, binlogMetaCh)
-			go ddNode.flushComplete(binlogMetaCh, collID, fmsg.ddlFlushedCh)
-		}
-
-		log.Debug(".. notifying insertbuffer ...")
-		ddNode.ddMsg.flushMessage = fmsg
-
-	default:
-	}
-
-	for _, span := range spans {
-		span.Finish()
-	}
-
-	var res Msg = ddNode.ddMsg
 	return []Msg{res}
 }
 
-func (ddNode *ddNode) flushComplete(binlogMetaCh <-chan *datapb.DDLBinlogMeta, collID UniqueID, ddlFlushedCh chan<- []*datapb.DDLBinlogMeta) {
-	binlogMeta := <-binlogMetaCh
-	if binlogMeta == nil {
-		ddlFlushedCh <- nil
-		return
-	}
-
-	ddlFlushedCh <- []*datapb.DDLBinlogMeta{binlogMeta}
+func (ddn *ddNode) filterFlushedSegmentInsertMessages(msg *msgstream.InsertMsg) *msgstream.InsertMsg {
+	// TODO fileter insert messages of flushed segments
+	return msg
 }
 
-/*
-flush will
-    generate binlogs for all buffer data in ddNode,
-    store the generated binlogs to minIO/S3,
-
-The keys of the binlogs are generated as below:
-    ${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
-    ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx}
-*/
-func flush(collID UniqueID, ddlData *sync.Map, kv kv.BaseKV, idAllocator allocatorInterface,
-	binlogMetaCh chan<- *datapb.DDLBinlogMeta) {
-	clearFn := func(isSuccess bool) {
-		if !isSuccess {
-			binlogMetaCh <- nil
-		}
-	}
-
-	ddCodec := &storage.DataDefinitionCodec{}
-	d, ok := ddlData.LoadAndDelete(collID)
-	if !ok {
-		log.Error("Flush failed ... cannot load ddlData ..")
-		clearFn(false)
-		return
-	}
-
-	data := d.(*ddData)
-
-	log.Debug(".. ddl flushing ...",
-		zap.Int64("collectionID", collID),
-		zap.Int("length", len(data.ddRequestString)))
-
-	binLogs, err := ddCodec.Serialize(data.timestamps, data.ddRequestString, data.eventTypes)
-	if err != nil || len(binLogs) != 2 {
-		log.Error("Codec Serialize wrong", zap.Error(err))
-		clearFn(false)
-		return
-	}
-
-	if len(data.ddRequestString) != len(data.timestamps) ||
-		len(data.timestamps) != len(data.eventTypes) {
-		log.Error("illegal ddBuffer, failed to save binlog")
-		clearFn(false)
-		return
-	}
-
-	kvs := make(map[string]string, 2)
-	tsIdx, err := idAllocator.genKey(true)
-	if err != nil {
-		log.Error("Id allocate wrong", zap.Error(err))
-		clearFn(false)
-		return
-	}
-	tsKey := path.Join(Params.DdlBinlogRootPath, strconv.FormatInt(collID, 10), binLogs[0].GetKey(), tsIdx)
-	kvs[tsKey] = string(binLogs[0].GetValue())
-
-	ddlIdx, err := idAllocator.genKey(true)
-	if err != nil {
-		log.Error("Id allocate wrong", zap.Error(err))
-		clearFn(false)
-		return
-	}
-	ddlKey := path.Join(Params.DdlBinlogRootPath, strconv.FormatInt(collID, 10), binLogs[1].GetKey(), ddlIdx)
-	kvs[ddlKey] = string(binLogs[1].GetValue())
-
-	// save ddl/ts binlog to minIO/s3
-	log.Debug(".. Saving ddl binlog to minIO/s3 ...")
-	err = kv.MultiSave(kvs)
-	if err != nil {
-		log.Error("Save to minIO/S3 Wrong", zap.Error(err))
-		_ = kv.MultiRemove([]string{tsKey, ddlKey})
-		clearFn(false)
-		return
-	}
-
-	log.Debug(".. Clearing ddl flush buffer ...")
-	clearFn(true)
-	binlogMetaCh <- &datapb.DDLBinlogMeta{
-		DdlBinlogPath: ddlKey,
-		TsBinlogPath:  tsKey,
-	}
-
-	log.Debug(".. DDL flushing completed ...")
-}
-
-func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) {
-	sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
-	msg.SetTraceCtx(ctx)
-	defer sp.Finish()
-
-	collectionID := msg.CollectionID
-
-	// add collection
-	if _, ok := ddNode.ddRecords.collectionRecords[collectionID]; ok {
-		err := fmt.Errorf("collection %d is already exists", collectionID)
-		log.Error("String conversion wrong", zap.Error(err))
-		return
-	}
-	ddNode.ddRecords.collectionRecords[collectionID] = nil
-
-	// TODO: add default partition?
-
-	var schema schemapb.CollectionSchema
-	err := proto.Unmarshal(msg.Schema, &schema)
-	if err != nil {
-		log.Error("proto unmarshal wrong", zap.Error(err))
-		return
-	}
-
-	// add collection
-	err = ddNode.replica.addCollection(collectionID, &schema)
-	if err != nil {
-		log.Error("replica add collection wrong", zap.Error(err))
-		return
-	}
-
-	ddNode.ddMsg.collectionRecords[collectionID] = append(ddNode.ddMsg.collectionRecords[collectionID],
-		&metaOperateRecord{
-			createOrDrop: true,
-			timestamp:    msg.Base.Timestamp,
-		})
-
-	ddNode.ddBuffer.append(msg.CreateCollectionRequest.String(), msg.Base.Timestamp, storage.CreateCollectionEventType)
-}
-
-/*
-dropCollection will drop collection in ddRecords but won't drop collection in replica
-*/
-func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) {
-	sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
-	msg.SetTraceCtx(ctx)
-	defer sp.Finish()
-
-	collectionID := msg.CollectionID
-
-	// remove collection
-	if _, ok := ddNode.ddRecords.collectionRecords[collectionID]; !ok {
-		log.Error("Cannot find collection", zap.Int64("collection ID", collectionID))
-		return
-	}
-	delete(ddNode.ddRecords.collectionRecords, collectionID)
-
-	ddNode.ddMsg.collectionRecords[collectionID] = append(ddNode.ddMsg.collectionRecords[collectionID],
-		&metaOperateRecord{
-			createOrDrop: false,
-			timestamp:    msg.Base.Timestamp,
-		})
-
-	ddNode.ddBuffer.append(msg.DropCollectionRequest.String(), msg.Base.Timestamp, storage.DropCollectionEventType)
-	ddNode.ddMsg.gcRecord.collections = append(ddNode.ddMsg.gcRecord.collections, collectionID)
-}
-
-func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) {
-	sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
-	msg.SetTraceCtx(ctx)
-	defer sp.Finish()
-
-	partitionID := msg.PartitionID
-	// collectionID := msg.CollectionID
-
-	// add partition
-	if _, ok := ddNode.ddRecords.partitionRecords[partitionID]; ok {
-		log.Error("partition is already exists", zap.Int64("partition ID", partitionID))
-		return
-	}
-	ddNode.ddRecords.partitionRecords[partitionID] = nil
-
-	ddNode.ddMsg.partitionRecords[partitionID] = append(ddNode.ddMsg.partitionRecords[partitionID],
-		&metaOperateRecord{
-			createOrDrop: true,
-			timestamp:    msg.Base.Timestamp,
-		})
-
-	ddNode.ddBuffer.append(msg.CreatePartitionRequest.String(), msg.Base.Timestamp, storage.CreatePartitionEventType)
-}
-
-func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
-	sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
-	msg.SetTraceCtx(ctx)
-	defer sp.Finish()
-	partitionID := msg.PartitionID
-	// collectionID := msg.CollectionID
-
-	// remove partition
-	if _, ok := ddNode.ddRecords.partitionRecords[partitionID]; !ok {
-		log.Error("cannot found partition", zap.Int64("partition ID", partitionID))
-		return
-	}
-	delete(ddNode.ddRecords.partitionRecords, partitionID)
-
-	// partitionName := msg.PartitionName
-	// ddNode.ddMsg.partitionRecords[partitionName] = append(ddNode.ddMsg.partitionRecords[partitionName],
-	ddNode.ddMsg.partitionRecords[partitionID] = append(ddNode.ddMsg.partitionRecords[partitionID],
-		&metaOperateRecord{
-			createOrDrop: false,
-			timestamp:    msg.Base.Timestamp,
-		})
-
-	ddNode.ddBuffer.append(msg.DropPartitionRequest.String(), msg.Base.Timestamp, storage.DropPartitionEventType)
-}
-
-func newDDNode(ctx context.Context, inFlushCh <-chan *flushMsg,
-	replica Replica, idAllocator allocatorInterface, collectionID UniqueID) *ddNode {
-	maxQueueLength := Params.FlowGraphMaxQueueLength
-	maxParallelism := Params.FlowGraphMaxParallelism
-
+func newDDNode() *ddNode {
 	baseNode := BaseNode{}
-	baseNode.SetMaxQueueLength(maxQueueLength)
-	baseNode.SetMaxParallelism(maxParallelism)
-
-	ddRecords := &ddRecords{
-		collectionRecords: make(map[UniqueID]interface{}),
-		partitionRecords:  make(map[UniqueID]interface{}),
-	}
-
-	bucketName := Params.MinioBucketName
-	option := &miniokv.Option{
-		Address:           Params.MinioAddress,
-		AccessKeyID:       Params.MinioAccessKeyID,
-		SecretAccessKeyID: Params.MinioSecretAccessKey,
-		UseSSL:            Params.MinioUseSSL,
-		BucketName:        bucketName,
-		CreateBucket:      true,
-	}
-	minioKV, err := miniokv.NewMinIOKV(ctx, option)
-	if err != nil {
-		panic(err)
-	}
+	baseNode.SetMaxParallelism(Params.FlowGraphMaxQueueLength)
 
 	return &ddNode{
-		BaseNode:  baseNode,
-		ddRecords: ddRecords,
-		ddBuffer: &ddBuffer{
-			ddData: ddData{
-				ddRequestString: make([]string, 0, 10),
-				timestamps:      make([]Timestamp, 0, 10),
-				eventTypes:      make([]storage.EventTypeCode, 0, 10),
-			},
-		},
-		inFlushCh: inFlushCh,
-
-		idAllocator: idAllocator,
-		kv:          minioKV,
-		replica:     replica,
-		flushMap:    &sync.Map{},
-
-		collectionID: collectionID,
+		BaseNode: baseNode,
 	}
 }
diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go
index 4d0784dae..babaf3d39 100644
--- a/internal/datanode/flow_graph_dd_node_test.go
+++ b/internal/datanode/flow_graph_dd_node_test.go
@@ -12,167 +12,13 @@
 package datanode
 
 import (
-	"context"
 	"testing"
-	"time"
-
-	"github.com/stretchr/testify/assert"
-	"go.uber.org/zap"
-
-	"github.com/milvus-io/milvus/internal/log"
-	"github.com/milvus-io/milvus/internal/msgstream"
-	"github.com/milvus-io/milvus/internal/proto/commonpb"
-	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"github.com/milvus-io/milvus/internal/proto/internalpb"
-	"github.com/milvus-io/milvus/internal/util/flowgraph"
 )
 
 func TestFlowGraphDDNode_Operate(t *testing.T) {
-	const ctxTimeInMillisecond = 2000
-	const closeWithDeadline = true
-	var ctx context.Context
-
-	if closeWithDeadline {
-		var cancel context.CancelFunc
-		d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
-		ctx, cancel = context.WithDeadline(context.Background(), d)
-		defer cancel()
-	} else {
-		ctx = context.Background()
-	}
-
-	inFlushCh := make(chan *flushMsg, 10)
-	defer close(inFlushCh)
-
-	replica := newReplica()
-	collID := UniqueID(0)
-	ddNode := newDDNode(ctx, inFlushCh, replica, NewAllocatorFactory(), collID)
-
-	collName := "col-test-0"
-	// create collection
-	createCollReq := internalpb.CreateCollectionRequest{
-		Base: &commonpb.MsgBase{
-			MsgType:   commonpb.MsgType_CreateCollection,
-			MsgID:     1,
-			Timestamp: 1,
-			SourceID:  1,
-		},
-		CollectionID:   collID,
-		Schema:         make([]byte, 0),
-		CollectionName: collName,
-		DbName:         "DbName",
-		DbID:           UniqueID(0),
-	}
-	createCollMsg := msgstream.CreateCollectionMsg{
-		BaseMsg: msgstream.BaseMsg{
-			BeginTimestamp: Timestamp(1),
-			EndTimestamp:   Timestamp(1),
-			HashValues:     []uint32{uint32(0)},
-		},
-		CreateCollectionRequest: createCollReq,
-	}
-
-	// drop collection
-	dropCollReq := internalpb.DropCollectionRequest{
-		Base: &commonpb.MsgBase{
-			MsgType:   commonpb.MsgType_DropCollection,
-			MsgID:     2,
-			Timestamp: 2,
-			SourceID:  2,
-		},
-		CollectionID:   collID,
-		CollectionName: collName,
-		DbName:         "DbName",
-		DbID:           UniqueID(0),
-	}
-	dropCollMsg := msgstream.DropCollectionMsg{
-		BaseMsg: msgstream.BaseMsg{
-			BeginTimestamp: Timestamp(2),
-			EndTimestamp:   Timestamp(2),
-			HashValues:     []uint32{uint32(0)},
-		},
-		DropCollectionRequest: dropCollReq,
-	}
-
-	partitionID := UniqueID(100)
-	partitionName := "partition-test-0"
-	// create partition
-	createPartitionReq := internalpb.CreatePartitionRequest{
-		Base: &commonpb.MsgBase{
-			MsgType:   commonpb.MsgType_CreatePartition,
-			MsgID:     3,
-			Timestamp: 3,
-			SourceID:  3,
-		},
-		CollectionID:   collID,
-		PartitionID:    partitionID,
-		CollectionName: collName,
-		PartitionName:  partitionName,
-		DbName:         "DbName",
-		DbID:           UniqueID(0),
-	}
-	createPartitionMsg := msgstream.CreatePartitionMsg{
-		BaseMsg: msgstream.BaseMsg{
-			BeginTimestamp: Timestamp(3),
-			EndTimestamp:   Timestamp(3),
-			HashValues:     []uint32{uint32(0)},
-		},
-		CreatePartitionRequest: createPartitionReq,
-	}
-
-	// drop partition
-	dropPartitionReq := internalpb.DropPartitionRequest{
-		Base: &commonpb.MsgBase{
-			MsgType:   commonpb.MsgType_DropPartition,
-			MsgID:     4,
-			Timestamp: 4,
-			SourceID:  4,
-		},
-		CollectionID:   collID,
-		PartitionID:    partitionID,
-		CollectionName: collName,
-		PartitionName:  partitionName,
-		DbName:         "DbName",
-		DbID:           UniqueID(0),
-	}
-	dropPartitionMsg := msgstream.DropPartitionMsg{
-		BaseMsg: msgstream.BaseMsg{
-			BeginTimestamp: Timestamp(4),
-			EndTimestamp:   Timestamp(4),
-			HashValues:     []uint32{uint32(0)},
-		},
-		DropPartitionRequest: dropPartitionReq,
-	}
-
-	replica.addSegment(1, collID, partitionID, "insert-01")
-	flushCh := make(chan []*datapb.DDLBinlogMeta)
-	inFlushCh <- &flushMsg{
-		msgID:        5,
-		timestamp:    5,
-		segmentID:    UniqueID(1),
-		collectionID: collID,
-		ddlFlushedCh: flushCh,
-	}
-
-	startPos := []*internalpb.MsgPosition{
-		{
-			ChannelName: "aaa",
-			MsgID:       make([]byte, 0),
-			Timestamp:   0,
-		},
-	}
+	// ddNode := newDDNode()
 
-	tsMessages := make([]msgstream.TsMsg, 0)
-	tsMessages = append(tsMessages, msgstream.TsMsg(&createCollMsg))
-	tsMessages = append(tsMessages, msgstream.TsMsg(&dropCollMsg))
-	tsMessages = append(tsMessages, msgstream.TsMsg(&createPartitionMsg))
-	tsMessages = append(tsMessages, msgstream.TsMsg(&dropPartitionMsg))
-	msgStream := flowgraph.GenerateMsgStreamMsg(tsMessages, Timestamp(0), Timestamp(3),
-		startPos, startPos)
-	var inMsg Msg = msgStream
-	ddNode.Operate([]Msg{inMsg})
+	// var inMsg Msg = msgStream
+	// ddNode.Operate([]Msg{inMsg})
 
-	paths := <-flushCh
-	log.Debug("Flushed DDL binlog paths", zap.Any("paths", paths))
-	assert.Equal(t, 1, len(paths))
 }
diff --git a/internal/datanode/flow_graph_msg_stream_input_node.go b/internal/datanode/flow_graph_dmstream_input_node.go
similarity index 68%
rename from internal/datanode/flow_graph_msg_stream_input_node.go
rename to internal/datanode/flow_graph_dmstream_input_node.go
index c7eb70b53..9c50f745b 100644
--- a/internal/datanode/flow_graph_msg_stream_input_node.go
+++ b/internal/datanode/flow_graph_dmstream_input_node.go
@@ -21,8 +21,7 @@ import (
 )
 
 func newDmInputNode(ctx context.Context, factory msgstream.Factory, vchannelName string, vchannelPos *datapb.PositionPair) *flowgraph.InputNode {
-	// TODO use position pair in Seek
-
+	// TODO seek
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
 	consumeSubName := Params.MsgChannelSubName
@@ -35,19 +34,3 @@ func newDmInputNode(ctx context.Context, factory msgstream.Factory, vchannelName
 	node := flowgraph.NewInputNode(&stream, "dmInputNode", maxQueueLength, maxParallelism)
 	return node
 }
-
-func newDDInputNode(ctx context.Context, factory msgstream.Factory, vchannelName string, vchannelPos *datapb.PositionPair) *flowgraph.InputNode {
-
-	// TODO use position pair in Seek
-	maxQueueLength := Params.FlowGraphMaxQueueLength
-	maxParallelism := Params.FlowGraphMaxParallelism
-	consumeSubName := Params.MsgChannelSubName
-
-	tmpStream, _ := factory.NewTtMsgStream(ctx)
-	tmpStream.AsConsumer([]string{vchannelName}, consumeSubName)
-	log.Debug("datanode AsConsumer: " + vchannelName + " : " + consumeSubName)
-
-	var stream msgstream.MsgStream = tmpStream
-	node := flowgraph.NewInputNode(&stream, "ddInputNode", maxQueueLength, maxParallelism)
-	return node
-}
diff --git a/internal/datanode/flow_graph_filter_dm_node.go b/internal/datanode/flow_graph_filter_dm_node.go
deleted file mode 100644
index 2207e9e63..000000000
--- a/internal/datanode/flow_graph_filter_dm_node.go
+++ /dev/null
@@ -1,172 +0,0 @@
-// Copyright (C) 2019-2020 Zilliz. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software distributed under the License
-// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
-// or implied. See the License for the specific language governing permissions and limitations under the License.
-
-package datanode
-
-import (
-	"math"
-
-	"github.com/milvus-io/milvus/internal/log"
-	"github.com/milvus-io/milvus/internal/msgstream"
-	"github.com/milvus-io/milvus/internal/proto/commonpb"
-	"github.com/milvus-io/milvus/internal/proto/internalpb"
-	"github.com/milvus-io/milvus/internal/util/flowgraph"
-	"github.com/milvus-io/milvus/internal/util/trace"
-	"github.com/opentracing/opentracing-go"
-	"go.uber.org/zap"
-)
-
-type filterDmNode struct {
-	BaseNode
-	ddMsg *ddMsg
-}
-
-func (fdmNode *filterDmNode) Name() string {
-	return "fdmNode"
-}
-
-func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
-
-	if len(in) != 2 {
-		log.Error("Invalid operate message input in filterDmNode", zap.Int("input length", len(in)))
-		// TODO: add error handling
-	}
-
-	msgStreamMsg, ok := in[0].(*MsgStreamMsg)
-	if !ok {
-		log.Error("type assertion failed for MsgStreamMsg")
-		// TODO: add error handling
-	}
-
-	ddMsg, ok := in[1].(*ddMsg)
-	if !ok {
-		log.Error("type assertion failed for ddMsg")
-		// TODO: add error handling
-	}
-
-	if msgStreamMsg == nil || ddMsg == nil {
-		return []Msg{}
-	}
-	var spans []opentracing.Span
-	for _, msg := range msgStreamMsg.TsMessages() {
-		sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
-		spans = append(spans, sp)
-		msg.SetTraceCtx(ctx)
-	}
-
-	fdmNode.ddMsg = ddMsg
-
-	var iMsg = insertMsg{
-		insertMessages: make([]*msgstream.InsertMsg, 0),
-		// flushMessages:  make([]*flushMsg, 0),
-		timeRange: TimeRange{
-			timestampMin: msgStreamMsg.TimestampMin(),
-			timestampMax: msgStreamMsg.TimestampMax(),
-		},
-		startPositions: make([]*internalpb.MsgPosition, 0),
-		endPositions:   make([]*internalpb.MsgPosition, 0),
-	}
-
-	// iMsg.flushMessages = append(iMsg.flushMessages, ddMsg.flushMessages...)
-	iMsg.flushMessage = ddMsg.flushMessage
-
-	for _, msg := range msgStreamMsg.TsMessages() {
-		switch msg.Type() {
-		case commonpb.MsgType_Insert:
-			resMsg := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg))
-			if resMsg != nil {
-				iMsg.insertMessages = append(iMsg.insertMessages, resMsg)
-			}
-		// case commonpb.MsgType_kDelete:
-		// dmMsg.deleteMessages = append(dmMsg.deleteMessages, (*msg).(*msgstream.DeleteTask))
-		default:
-			log.Error("Not supporting message type", zap.Any("Type", msg.Type()))
-		}
-	}
-
-	iMsg.startPositions = append(iMsg.startPositions, msgStreamMsg.StartPositions()...)
-	iMsg.endPositions = append(iMsg.endPositions, msgStreamMsg.EndPositions()...)
-	iMsg.gcRecord = ddMsg.gcRecord
-	var res Msg = &iMsg
-	for _, sp := range spans {
-		sp.Finish()
-	}
-	return []Msg{res}
-}
-
-func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg) *msgstream.InsertMsg {
-	// No dd record, do all insert requests.
-	sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
-	msg.SetTraceCtx(ctx)
-	defer sp.Finish()
-
-	records, ok := fdmNode.ddMsg.collectionRecords[msg.CollectionID]
-	if !ok {
-		return msg
-	}
-
-	// TODO: If the last record is drop type, all insert requests are invalid.
-	//if !records[len(records)-1].createOrDrop {
-	//	return nil
-	//}
-
-	// Filter insert requests before last record.
-	if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
-		// TODO: what if the messages are misaligned? Here, we ignore those messages and print error
-		log.Error("misaligned messages detected")
-		return nil
-	}
-	tmpTimestamps := make([]Timestamp, 0)
-	tmpRowIDs := make([]int64, 0)
-	tmpRowData := make([]*commonpb.Blob, 0)
-
-	// calculate valid time range
-	timeBegin := Timestamp(0)
-	timeEnd := Timestamp(math.MaxUint64)
-	for _, record := range records {
-		if record.createOrDrop && timeBegin < record.timestamp {
-			timeBegin = record.timestamp
-		}
-		if !record.createOrDrop && timeEnd > record.timestamp {
-			timeEnd = record.timestamp
-		}
-	}
-
-	for i, t := range msg.Timestamps {
-		if t >= timeBegin && t <= timeEnd {
-			tmpTimestamps = append(tmpTimestamps, t)
-			tmpRowIDs = append(tmpRowIDs, msg.RowIDs[i])
-			tmpRowData = append(tmpRowData, msg.RowData[i])
-		}
-	}
-
-	if len(tmpRowIDs) <= 0 {
-		return nil
-	}
-
-	msg.Timestamps = tmpTimestamps
-	msg.RowIDs = tmpRowIDs
-	msg.RowData = tmpRowData
-	return msg
-}
-
-func newFilteredDmNode() *filterDmNode {
-	maxQueueLength := Params.FlowGraphMaxQueueLength
-	maxParallelism := Params.FlowGraphMaxParallelism
-
-	baseNode := BaseNode{}
-	baseNode.SetMaxQueueLength(maxQueueLength)
-	baseNode.SetMaxParallelism(maxParallelism)
-
-	return &filterDmNode{
-		BaseNode: baseNode,
-	}
-}
diff --git a/internal/datanode/flow_graph_gc_node.go b/internal/datanode/flow_graph_gc_node.go
deleted file mode 100644
index bea864854..000000000
--- a/internal/datanode/flow_graph_gc_node.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Copyright (C) 2019-2020 Zilliz. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software distributed under the License
-// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
-// or implied. See the License for the specific language governing permissions and limitations under the License.
-
-package datanode
-
-import (
-	"github.com/milvus-io/milvus/internal/util/flowgraph"
-
-	"go.uber.org/zap"
-
-	"github.com/milvus-io/milvus/internal/log"
-)
-
-type gcNode struct {
-	BaseNode
-	replica Replica
-}
-
-func (gcNode *gcNode) Name() string {
-	return "gcNode"
-}
-
-func (gcNode *gcNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
-
-	if len(in) != 1 {
-		log.Error("Invalid operate message input in gcNode", zap.Int("input length", len(in)))
-		// TODO: add error handling
-	}
-
-	gcMsg, ok := in[0].(*gcMsg)
-	if !ok {
-		log.Error("type assertion failed for gcMsg")
-		// TODO: add error handling
-	}
-
-	if gcMsg == nil {
-		return []Msg{}
-	}
-
-	// drop collections
-	for _, collectionID := range gcMsg.gcRecord.collections {
-		err := gcNode.replica.removeCollection(collectionID)
-		if err != nil {
-			log.Error("replica remove collection wrong", zap.Error(err))
-		}
-	}
-
-	return nil
-}
-
-func newGCNode(replica Replica) *gcNode {
-	maxQueueLength := Params.FlowGraphMaxQueueLength
-	maxParallelism := Params.FlowGraphMaxParallelism
-
-	baseNode := BaseNode{}
-	baseNode.SetMaxQueueLength(maxQueueLength)
-	baseNode.SetMaxParallelism(maxParallelism)
-
-	return &gcNode{
-		BaseNode: baseNode,
-		replica:  replica,
-	}
-}
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index d8514fb27..3e7b05b51 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -54,12 +54,12 @@ type insertBufferNode struct {
 	replica      Replica
 	idAllocator  allocatorInterface
 	flushMap     sync.Map
+	flushChan    <-chan *flushMsg
 
 	minIOKV kv.BaseKV
 
 	timeTickStream          msgstream.MsgStream
 	segmentStatisticsStream msgstream.MsgStream
-	completeFlushStream     msgstream.MsgStream
 }
 
 type insertBuffer struct {
@@ -487,12 +487,13 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 
 	// iMsg is Flush() msg from dataservice
 	//   1. insertBuffer(not empty) -> binLogs -> minIO/S3
-	if iMsg.flushMessage != nil && ibNode.replica.hasSegment(iMsg.flushMessage.segmentID) {
-		currentSegID := iMsg.flushMessage.segmentID
+	select {
+	case fmsg := <-ibNode.flushChan:
+		currentSegID := fmsg.segmentID
 		log.Debug(". Receiving flush message", zap.Int64("segmentID", currentSegID))
 
 		finishCh := make(chan map[UniqueID]string)
-		go ibNode.completeFlush(currentSegID, finishCh, iMsg.flushMessage.dmlFlushedCh)
+		go ibNode.completeFlush(currentSegID, finishCh, fmsg.dmlFlushedCh)
 
 		if ibNode.insertBuffer.size(currentSegID) <= 0 {
 			log.Debug(".. Buffer empty ...")
@@ -532,21 +533,19 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 				&ibNode.flushMap, ibNode.minIOKV, finishCh, ibNode.idAllocator)
 		}
 
+	default:
 	}
 
-	if err := ibNode.writeHardTimeTick(iMsg.timeRange.timestampMax); err != nil {
-		log.Error("send hard time tick into pulsar channel failed", zap.Error(err))
-	}
+	// TODO write timetick
+	// if err := ibNode.writeHardTimeTick(iMsg.timeRange.timestampMax); err != nil {
+	//     log.Error("send hard time tick into pulsar channel failed", zap.Error(err))
+	// }
 
-	var res Msg = &gcMsg{
-		gcRecord:  iMsg.gcRecord,
-		timeRange: iMsg.timeRange,
-	}
 	for _, sp := range spans {
 		sp.Finish()
 	}
 
-	return []Msg{res}
+	return nil
 }
 
 func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID UniqueID,
@@ -770,7 +769,7 @@ func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID Uni
 	return
 }
 
-func newInsertBufferNode(ctx context.Context, replica Replica, factory msgstream.Factory, idAllocator allocatorInterface) *insertBufferNode {
+func newInsertBufferNode(ctx context.Context, replica Replica, factory msgstream.Factory, idAllocator allocatorInterface, flushCh <-chan *flushMsg) *insertBufferNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
 
@@ -813,22 +812,17 @@ func newInsertBufferNode(ctx context.Context, replica Replica, factory msgstream
 	var segStatisticsMsgStream msgstream.MsgStream = segS
 	segStatisticsMsgStream.Start()
 
-	// segment flush completed channel
-	cf, _ := factory.NewMsgStream(ctx)
-	cf.AsProducer([]string{Params.CompleteFlushChannelName})
-	log.Debug("datanode AsProducer: " + Params.CompleteFlushChannelName)
-	var completeFlushStream msgstream.MsgStream = cf
-	completeFlushStream.Start()
-
 	return &insertBufferNode{
-		BaseNode:                baseNode,
-		insertBuffer:            iBuffer,
-		minIOKV:                 minIOKV,
+		BaseNode:     baseNode,
+		insertBuffer: iBuffer,
+		minIOKV:      minIOKV,
+
 		timeTickStream:          wTtMsgStream,
 		segmentStatisticsStream: segStatisticsMsgStream,
-		completeFlushStream:     completeFlushStream,
-		replica:                 replica,
-		flushMap:                sync.Map{},
-		idAllocator:             idAllocator,
+
+		replica:     replica,
+		flushMap:    sync.Map{},
+		flushChan:   flushCh,
+		idAllocator: idAllocator,
 	}
 }
diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go
index 56c1f74a1..320ad334c 100644
--- a/internal/datanode/flow_graph_insert_buffer_node_test.go
+++ b/internal/datanode/flow_graph_insert_buffer_node_test.go
@@ -72,21 +72,28 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
 	err = msFactory.SetParams(m)
 	assert.Nil(t, err)
 
-	iBNode := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory())
+	flushChan := make(chan *flushMsg, 100)
+	iBNode := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan)
 
-	ddlFlushedCh := make(chan []*datapb.DDLBinlogMeta)
 	dmlFlushedCh := make(chan []*datapb.ID2PathList)
 
-	inMsg := genInsertMsg(ddlFlushedCh, dmlFlushedCh)
+	flushChan <- &flushMsg{
+		msgID:        1,
+		timestamp:    2000,
+		segmentID:    UniqueID(1),
+		collectionID: UniqueID(1),
+		dmlFlushedCh: dmlFlushedCh,
+	}
+
+	inMsg := genInsertMsg()
 	var iMsg flowgraph.Msg = &inMsg
 	iBNode.Operate([]flowgraph.Msg{iMsg})
-
 	isflushed := <-dmlFlushedCh
 	assert.NotNil(t, isflushed)
 	log.Debug("DML binlog paths", zap.Any("paths", isflushed))
 }
 
-func genInsertMsg(ddlFlushedCh chan<- []*datapb.DDLBinlogMeta, dmlFlushedCh chan<- []*datapb.ID2PathList) insertMsg {
+func genInsertMsg() insertMsg {
 
 	timeRange := TimeRange{
 		timestampMin: 0,
@@ -114,15 +121,6 @@ func genInsertMsg(ddlFlushedCh chan<- []*datapb.DDLBinlogMeta, dmlFlushedCh chan
 	dataFactory := NewDataFactory()
 	iMsg.insertMessages = append(iMsg.insertMessages, dataFactory.GetMsgStreamInsertMsgs(2)...)
 
-	iMsg.flushMessage = &flushMsg{
-		msgID:        1,
-		timestamp:    2000,
-		segmentID:    UniqueID(1),
-		collectionID: UniqueID(1),
-		ddlFlushedCh: ddlFlushedCh,
-		dmlFlushedCh: dmlFlushedCh,
-	}
-
 	return *iMsg
 
 }
diff --git a/internal/datanode/flow_graph_message.go b/internal/datanode/flow_graph_message.go
index 5aca7247c..e3ea55f79 100644
--- a/internal/datanode/flow_graph_message.go
+++ b/internal/datanode/flow_graph_message.go
@@ -23,72 +23,21 @@ type (
 	MsgStreamMsg = flowgraph.MsgStreamMsg
 )
 
-type key2SegMsg struct {
-	tsMessages []msgstream.TsMsg
-	timeRange  TimeRange
-}
-
-type ddMsg struct {
-	collectionRecords map[UniqueID][]*metaOperateRecord
-	partitionRecords  map[UniqueID][]*metaOperateRecord
-	flushMessage      *flushMsg
-	gcRecord          *gcRecord
-	timeRange         TimeRange
-}
-
-type metaOperateRecord struct {
-	createOrDrop bool // create: true, drop: false
-	timestamp    Timestamp
-}
-
 type insertMsg struct {
 	insertMessages []*msgstream.InsertMsg
-	flushMessage   *flushMsg
-	gcRecord       *gcRecord
 	timeRange      TimeRange
 	startPositions []*internalpb.MsgPosition
 	endPositions   []*internalpb.MsgPosition
 }
 
-type deleteMsg struct {
-	deleteMessages []*msgstream.DeleteMsg
-	timeRange      TimeRange
-}
-
-type gcMsg struct {
-	gcRecord  *gcRecord
-	timeRange TimeRange
-}
-
-type gcRecord struct {
-	collections []UniqueID
-}
-
 type flushMsg struct {
 	msgID        UniqueID
 	timestamp    Timestamp
 	segmentID    UniqueID
 	collectionID UniqueID
-	ddlFlushedCh chan<- []*datapb.DDLBinlogMeta
 	dmlFlushedCh chan<- []*datapb.ID2PathList
 }
 
-func (ksMsg *key2SegMsg) TimeTick() Timestamp {
-	return ksMsg.timeRange.timestampMax
-}
-
-func (suMsg *ddMsg) TimeTick() Timestamp {
-	return suMsg.timeRange.timestampMax
-}
-
 func (iMsg *insertMsg) TimeTick() Timestamp {
 	return iMsg.timeRange.timestampMax
 }
-
-func (dMsg *deleteMsg) TimeTick() Timestamp {
-	return dMsg.timeRange.timestampMax
-}
-
-func (gcMsg *gcMsg) TimeTick() Timestamp {
-	return gcMsg.timeRange.timestampMax
-}
-- 
GitLab