diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index 85a1f782ebe750dc34ac6587556be79267f3674c..86fbcc3d2b95dc5cf8f3c1412e33cf453c84879a 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -40,7 +40,7 @@ type DataNode struct {
 	masterService types.MasterService
 	dataService   types.DataService
 
-	flushChan chan *flushMsg
+	flushChan chan<- *flushMsg
 	replica   Replica
 
 	closer io.Closer
@@ -135,9 +135,10 @@ func (node *DataNode) Init() error {
 	var alloc allocatorInterface = newAllocator(node.masterService)
 
 	chanSize := 100
-	node.flushChan = make(chan *flushMsg, chanSize)
+	flushChan := make(chan *flushMsg, chanSize)
+	node.flushChan = flushChan
 
-	node.dataSyncService = newDataSyncService(node.ctx, node.flushChan, replica, alloc, node.msFactory)
+	node.dataSyncService = newDataSyncService(node.ctx, flushChan, replica, alloc, node.msFactory)
 	node.dataSyncService.init()
 	node.metaService = newMetaService(node.ctx, replica, node.masterService)
 
diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go
index a12c5572ce2ea1da6fd791a28745738d6118b777..9394e4bb87722b88b0d3d5a93da80c2d14dccdfb 100644
--- a/internal/datanode/data_node_test.go
+++ b/internal/datanode/data_node_test.go
@@ -237,11 +237,11 @@ func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionNa
 	}
 
 	collection := etcdpb.CollectionMeta{
-		ID:            collectionID,
-		Schema:        &sch,
-		CreateTime:    Timestamp(1),
-		SegmentIDs:    make([]UniqueID, 0),
-		PartitionTags: make([]string, 0),
+		ID:           collectionID,
+		Schema:       &sch,
+		CreateTime:   Timestamp(1),
+		SegmentIDs:   make([]UniqueID, 0),
+		PartitionIDs: []UniqueID{0},
 	}
 	return &collection
 }
diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go
index 008edef8ea5ed8fadfe276230db138ad6c5f5c7c..2c1c7309ca0dcdb85a61575c1ef128e611ee41be 100644
--- a/internal/datanode/data_sync_service.go
+++ b/internal/datanode/data_sync_service.go
@@ -17,13 +17,13 @@ import (
 type dataSyncService struct {
 	ctx         context.Context
 	fg          *flowgraph.TimeTickedFlowGraph
-	flushChan   chan *flushMsg
+	flushChan   <-chan *flushMsg
 	replica     Replica
 	idAllocator allocatorInterface
 	msFactory   msgstream.Factory
 }
 
-func newDataSyncService(ctx context.Context, flushChan chan *flushMsg,
+func newDataSyncService(ctx context.Context, flushChan <-chan *flushMsg,
 	replica Replica, alloc allocatorInterface, factory msgstream.Factory) *dataSyncService {
 	service := &dataSyncService{
 		ctx:         ctx,
diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go
index fa6c3e1c6d0eb249840b617c1e3f904d4a60e738..7a7c397a9b7a2810a8f67d95953ec4a6fa66b539 100644
--- a/internal/datanode/flow_graph_dd_node.go
+++ b/internal/datanode/flow_graph_dd_node.go
@@ -29,7 +29,7 @@ type ddNode struct {
 	ddRecords *ddRecords
 	ddBuffer  *ddBuffer
 	flushMap  *sync.Map
-	inFlushCh chan *flushMsg
+	inFlushCh <-chan *flushMsg
 
 	kv         kv.Base
 	replica    Replica
@@ -429,7 +429,7 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
 }
 
 func newDDNode(ctx context.Context, binlogMeta *binlogMeta,
-	inFlushCh chan *flushMsg, replica Replica) *ddNode {
+	inFlushCh <-chan *flushMsg, replica Replica) *ddNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
 
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index a0703c10564036490860f42ad67c6a9731b02b65..76b339a56f87ff9c1bec0e8a0d811464374a6fe0 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -137,6 +137,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 				for _, pos := range iMsg.startPositions {
 					if pos.ChannelName == segment.channelName {
 						startPosition = pos
+						break
 					}
 				}
 				if startPosition == nil {
@@ -457,6 +458,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 		if ibNode.insertBuffer.full(currentSegID) {
 			log.Debug(". Insert Buffer full, auto flushing ",
 				zap.Int32("num of rows", ibNode.insertBuffer.size(currentSegID)))
+
 			collSch, err := ibNode.getCollectionSchemaByID(collection.GetID())
 			if err != nil {
 				log.Error("Auto flush failed .. cannot get collection schema ..", zap.Error(err))
@@ -516,16 +518,23 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 			log.Debug(".. Buffer not empty, flushing ..")
 			ibNode.flushMap.Store(currentSegID, ibNode.insertBuffer.insertData[currentSegID])
 			delete(ibNode.insertBuffer.insertData, currentSegID)
+			clearFn := func() {
+				finishCh <- false
+				log.Debug(".. Clearing flush Buffer ..")
+				ibNode.flushMap.Delete(currentSegID)
+			}
 
 			seg, err := ibNode.replica.getSegmentByID(currentSegID)
 			if err != nil {
 				log.Error("Flush failed .. cannot get segment ..", zap.Error(err))
+				clearFn()
 				continue
 			}
 
 			collSch, err := ibNode.getCollectionSchemaByID(seg.collectionID)
 			if err != nil {
 				log.Error("Flush failed .. cannot get collection schema ..", zap.Error(err))
+				clearFn()
 				continue
 			}
 
@@ -557,10 +566,11 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionID UniqueID, collID UniqueID,
 	insertData *sync.Map, meta *binlogMeta, kv kv.Base, finishCh chan<- bool) {
 
-	defer func() {
+	clearFn := func(isSuccess bool) {
+		finishCh <- isSuccess
 		log.Debug(".. Clearing flush Buffer ..")
 		insertData.Delete(segID)
-	}()
+	}
 
 	inCodec := storage.NewInsertCodec(collMeta)
 
@@ -568,14 +578,14 @@ func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionI
 	data, ok := insertData.Load(segID)
 	if !ok {
 		log.Error("Flush failed ... cannot load insertData ..")
-		finishCh <- false
+		clearFn(false)
 		return
 	}
 
 	binLogs, err := inCodec.Serialize(partitionID, segID, data.(*InsertData))
 	if err != nil {
 		log.Error("Flush failed ... cannot generate binlog ..", zap.Error(err))
-		finishCh <- false
+		clearFn(false)
 		return
 	}
 
@@ -587,14 +597,14 @@ func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionI
 		fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
 		if err != nil {
 			log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
-			finishCh <- false
+			clearFn(false)
 			return
 		}
 
 		k, err := meta.genKey(true, collID, partitionID, segID, fieldID)
 		if err != nil {
 			log.Error("Flush failed ... cannot alloc ID ..", zap.Error(err))
-			finishCh <- false
+			clearFn(false)
 			return
 		}
 
@@ -608,7 +618,7 @@ func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionI
 	if err != nil {
 		log.Error("Flush failed ... cannot save to MinIO ..", zap.Error(err))
 		_ = kv.MultiRemove(paths)
-		finishCh <- false
+		clearFn(false)
 		return
 	}
 
@@ -617,12 +627,11 @@ func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionI
 	if err != nil {
 		log.Error("Flush failed ... cannot save binlog paths ..", zap.Error(err))
 		_ = kv.MultiRemove(paths)
-		finishCh <- false
+		clearFn(false)
 		return
 	}
 
-	finishCh <- true
-
+	clearFn(true)
 }
 
 func (ibNode *insertBufferNode) completeFlush(segID UniqueID, finishCh <-chan bool) {
diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go
index 196d9b48a7f36b8bd3f7b7fc447c83222f5e7229..a5b40c79396e937e7c032f3818270054d6a16bbe 100644
--- a/internal/datanode/flow_graph_insert_buffer_node_test.go
+++ b/internal/datanode/flow_graph_insert_buffer_node_test.go
@@ -40,6 +40,8 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
 	replica := newReplica()
 	err = replica.addCollection(collMeta.ID, collMeta.Schema)
 	require.NoError(t, err)
+	err = replica.addSegment(1, collMeta.ID, 0, Params.InsertChannelNames[0])
+	require.NoError(t, err)
 
 	msFactory := pulsarms.NewFactory()
 	m := map[string]interface{}{
@@ -64,7 +66,7 @@ func genInsertMsg() insertMsg {
 
 	startPos := []*internalpb.MsgPosition{
 		{
-			ChannelName: "aaa",
+			ChannelName: Params.InsertChannelNames[0],
 			MsgID:       make([]byte, 0),
 			Timestamp:   0,
 		},