From 91f5c48037ba04e65f786da87a38deb023d84806 Mon Sep 17 00:00:00 2001
From: XuanYang-cn <xuan.yang@zilliz.com>
Date: Tue, 30 Mar 2021 09:47:27 +0800
Subject: [PATCH] Fix datanode goroutine leak

Signed-off-by: XuanYang-cn <xuan.yang@zilliz.com>
---
 internal/datanode/data_node.go                |  7 +++--
 internal/datanode/data_node_test.go           | 10 +++----
 internal/datanode/data_sync_service.go        |  4 +--
 internal/datanode/flow_graph_dd_node.go       |  4 +--
 .../datanode/flow_graph_insert_buffer_node.go | 29 ++++++++++++-------
 .../flow_graph_insert_buffer_node_test.go     |  4 ++-
 6 files changed, 35 insertions(+), 23 deletions(-)

diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index 85a1f782e..86fbcc3d2 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -40,7 +40,7 @@ type DataNode struct {
 	masterService types.MasterService
 	dataService   types.DataService
 
-	flushChan chan *flushMsg
+	flushChan chan<- *flushMsg
 	replica   Replica
 
 	closer io.Closer
@@ -135,9 +135,10 @@ func (node *DataNode) Init() error {
 	var alloc allocatorInterface = newAllocator(node.masterService)
 
 	chanSize := 100
-	node.flushChan = make(chan *flushMsg, chanSize)
+	flushChan := make(chan *flushMsg, chanSize)
+	node.flushChan = flushChan
 
-	node.dataSyncService = newDataSyncService(node.ctx, node.flushChan, replica, alloc, node.msFactory)
+	node.dataSyncService = newDataSyncService(node.ctx, flushChan, replica, alloc, node.msFactory)
 	node.dataSyncService.init()
 	node.metaService = newMetaService(node.ctx, replica, node.masterService)
 
diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go
index a12c5572c..9394e4bb8 100644
--- a/internal/datanode/data_node_test.go
+++ b/internal/datanode/data_node_test.go
@@ -237,11 +237,11 @@ func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionNa
 	}
 
 	collection := etcdpb.CollectionMeta{
-		ID:            collectionID,
-		Schema:        &sch,
-		CreateTime:    Timestamp(1),
-		SegmentIDs:    make([]UniqueID, 0),
-		PartitionTags: make([]string, 0),
+		ID:           collectionID,
+		Schema:       &sch,
+		CreateTime:   Timestamp(1),
+		SegmentIDs:   make([]UniqueID, 0),
+		PartitionIDs: []UniqueID{0},
 	}
 	return &collection
 }
diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go
index 008edef8e..2c1c7309c 100644
--- a/internal/datanode/data_sync_service.go
+++ b/internal/datanode/data_sync_service.go
@@ -17,13 +17,13 @@ import (
 type dataSyncService struct {
 	ctx         context.Context
 	fg          *flowgraph.TimeTickedFlowGraph
-	flushChan   chan *flushMsg
+	flushChan   <-chan *flushMsg
 	replica     Replica
 	idAllocator allocatorInterface
 	msFactory   msgstream.Factory
 }
 
-func newDataSyncService(ctx context.Context, flushChan chan *flushMsg,
+func newDataSyncService(ctx context.Context, flushChan <-chan *flushMsg,
 	replica Replica, alloc allocatorInterface, factory msgstream.Factory) *dataSyncService {
 	service := &dataSyncService{
 		ctx:         ctx,
diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go
index fa6c3e1c6..7a7c397a9 100644
--- a/internal/datanode/flow_graph_dd_node.go
+++ b/internal/datanode/flow_graph_dd_node.go
@@ -29,7 +29,7 @@ type ddNode struct {
 	ddRecords *ddRecords
 	ddBuffer  *ddBuffer
 	flushMap  *sync.Map
-	inFlushCh chan *flushMsg
+	inFlushCh <-chan *flushMsg
 
 	kv         kv.Base
 	replica    Replica
@@ -429,7 +429,7 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
 }
 
 func newDDNode(ctx context.Context, binlogMeta *binlogMeta,
-	inFlushCh chan *flushMsg, replica Replica) *ddNode {
+	inFlushCh <-chan *flushMsg, replica Replica) *ddNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
 
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index a0703c105..76b339a56 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -137,6 +137,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 				for _, pos := range iMsg.startPositions {
 					if pos.ChannelName == segment.channelName {
 						startPosition = pos
+						break
 					}
 				}
 				if startPosition == nil {
@@ -457,6 +458,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 		if ibNode.insertBuffer.full(currentSegID) {
 			log.Debug(". Insert Buffer full, auto flushing ",
 				zap.Int32("num of rows", ibNode.insertBuffer.size(currentSegID)))
+
 			collSch, err := ibNode.getCollectionSchemaByID(collection.GetID())
 			if err != nil {
 				log.Error("Auto flush failed .. cannot get collection schema ..", zap.Error(err))
@@ -516,16 +518,23 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 			log.Debug(".. Buffer not empty, flushing ..")
 			ibNode.flushMap.Store(currentSegID, ibNode.insertBuffer.insertData[currentSegID])
 			delete(ibNode.insertBuffer.insertData, currentSegID)
+			clearFn := func() {
+				finishCh <- false
+				log.Debug(".. Clearing flush Buffer ..")
+				ibNode.flushMap.Delete(currentSegID)
+			}
 
 			seg, err := ibNode.replica.getSegmentByID(currentSegID)
 			if err != nil {
 				log.Error("Flush failed .. cannot get segment ..", zap.Error(err))
+				clearFn()
 				continue
 			}
 
 			collSch, err := ibNode.getCollectionSchemaByID(seg.collectionID)
 			if err != nil {
 				log.Error("Flush failed .. cannot get collection schema ..", zap.Error(err))
+				clearFn()
 				continue
 			}
 
@@ -557,10 +566,11 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionID UniqueID, collID UniqueID,
 	insertData *sync.Map, meta *binlogMeta, kv kv.Base, finishCh chan<- bool) {
 
-	defer func() {
+	clearFn := func(isSuccess bool) {
+		finishCh <- isSuccess
 		log.Debug(".. Clearing flush Buffer ..")
 		insertData.Delete(segID)
-	}()
+	}
 
 	inCodec := storage.NewInsertCodec(collMeta)
 
@@ -568,14 +578,14 @@ func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionI
 	data, ok := insertData.Load(segID)
 	if !ok {
 		log.Error("Flush failed ... cannot load insertData ..")
-		finishCh <- false
+		clearFn(false)
 		return
 	}
 
 	binLogs, err := inCodec.Serialize(partitionID, segID, data.(*InsertData))
 	if err != nil {
 		log.Error("Flush failed ... cannot generate binlog ..", zap.Error(err))
-		finishCh <- false
+		clearFn(false)
 		return
 	}
 
@@ -587,14 +597,14 @@ func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionI
 		fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
 		if err != nil {
 			log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
-			finishCh <- false
+			clearFn(false)
 			return
 		}
 
 		k, err := meta.genKey(true, collID, partitionID, segID, fieldID)
 		if err != nil {
 			log.Error("Flush failed ... cannot alloc ID ..", zap.Error(err))
-			finishCh <- false
+			clearFn(false)
 			return
 		}
 
@@ -608,7 +618,7 @@ func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionI
 	if err != nil {
 		log.Error("Flush failed ... cannot save to MinIO ..", zap.Error(err))
 		_ = kv.MultiRemove(paths)
-		finishCh <- false
+		clearFn(false)
 		return
 	}
 
@@ -617,12 +627,11 @@ func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionI
 	if err != nil {
 		log.Error("Flush failed ... cannot save binlog paths ..", zap.Error(err))
 		_ = kv.MultiRemove(paths)
-		finishCh <- false
+		clearFn(false)
 		return
 	}
 
-	finishCh <- true
-
+	clearFn(true)
 }
 
 func (ibNode *insertBufferNode) completeFlush(segID UniqueID, finishCh <-chan bool) {
diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go
index 196d9b48a..a5b40c793 100644
--- a/internal/datanode/flow_graph_insert_buffer_node_test.go
+++ b/internal/datanode/flow_graph_insert_buffer_node_test.go
@@ -40,6 +40,8 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
 	replica := newReplica()
 	err = replica.addCollection(collMeta.ID, collMeta.Schema)
 	require.NoError(t, err)
+	err = replica.addSegment(1, collMeta.ID, 0, Params.InsertChannelNames[0])
+	require.NoError(t, err)
 
 	msFactory := pulsarms.NewFactory()
 	m := map[string]interface{}{
@@ -64,7 +66,7 @@ func genInsertMsg() insertMsg {
 
 	startPos := []*internalpb.MsgPosition{
 		{
-			ChannelName: "aaa",
+			ChannelName: Params.InsertChannelNames[0],
 			MsgID:       make([]byte, 0),
 			Timestamp:   0,
 		},
-- 
GitLab