From 8657251f41d26eeda70e4cf4fc55d6c53e4681da Mon Sep 17 00:00:00 2001
From: congqixia <congqi.xia@zilliz.com>
Date: Fri, 28 May 2021 16:47:29 +0800
Subject: [PATCH] DataNode stores segment msgpack positions (#5472)

* DataNode stores segment msgpack positions

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

* Fix typo caused deadlock

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

* Use MsgPack EndPositions for flush pos

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
---
 internal/datanode/collection_replica.go       | 41 ++++++++++++++++++-
 internal/datanode/data_node.go                |  9 ++++
 .../datanode/flow_graph_insert_buffer_node.go |  4 ++
 3 files changed, 52 insertions(+), 2 deletions(-)

diff --git a/internal/datanode/collection_replica.go b/internal/datanode/collection_replica.go
index a3ac67524..84638bfc5 100644
--- a/internal/datanode/collection_replica.go
+++ b/internal/datanode/collection_replica.go
@@ -45,6 +45,10 @@ type Replica interface {
 	bufferAutoFlushBinlogPaths(segmentID UniqueID, field2Path map[UniqueID]string) error
 	getBufferPaths(segID UniqueID) (map[UniqueID][]string, error)
 	getChannelName(segID UniqueID) (string, error)
+	//new msg postions
+	setStartPositions(segmentID UniqueID, startPos []*internalpb.MsgPosition) error
+	setEndPositions(segmentID UniqueID, endPos []*internalpb.MsgPosition) error
+	getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition)
 }
 
 // Segment is the data structure of segments in data node replica.
@@ -71,6 +75,10 @@ type CollectionSegmentReplica struct {
 	mu          sync.RWMutex
 	segments    map[UniqueID]*Segment
 	collections map[UniqueID]*Collection
+
+	posMu          sync.Mutex
+	startPositions map[UniqueID][]*internalpb.MsgPosition
+	endPositions   map[UniqueID][]*internalpb.MsgPosition
 }
 
 var _ Replica = &CollectionSegmentReplica{}
@@ -80,8 +88,10 @@ func newReplica() Replica {
 	collections := make(map[UniqueID]*Collection)
 
 	var replica Replica = &CollectionSegmentReplica{
-		segments:    segments,
-		collections: collections,
+		segments:       segments,
+		collections:    collections,
+		startPositions: make(map[UniqueID][]*internalpb.MsgPosition),
+		endPositions:   make(map[UniqueID][]*internalpb.MsgPosition),
 	}
 	return replica
 }
@@ -338,3 +348,30 @@ func (replica *CollectionSegmentReplica) hasCollection(collectionID UniqueID) bo
 	_, ok := replica.collections[collectionID]
 	return ok
 }
+
+// setStartPositions set segment `Start Position` - means the `startPositions` from the MsgPack when segment is first found
+func (replica *CollectionSegmentReplica) setStartPositions(segID UniqueID, startPositions []*internalpb.MsgPosition) error {
+	replica.posMu.Lock()
+	defer replica.posMu.Unlock()
+	replica.startPositions[segID] = startPositions
+	return nil
+}
+
+// setEndPositions set segment `End Position` - means the `endPositions` from the MsgPack when segment need to be flushed
+func (replica *CollectionSegmentReplica) setEndPositions(segID UniqueID, endPositions []*internalpb.MsgPosition) error {
+	replica.posMu.Lock()
+	defer replica.posMu.Unlock()
+	replica.endPositions[segID] = endPositions
+	return nil
+}
+
+// getSegmentPositions returns stored segment start-end Positions
+// To te Noted: start/end positions are NOT start&end position from one single MsgPack, they are from different MsgPack!
+// see setStartPositions, setEndPositions comment
+func (replica *CollectionSegmentReplica) getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition) {
+	replica.posMu.Lock()
+	defer replica.posMu.Unlock()
+	startPos := replica.startPositions[segID]
+	endPos := replica.endPositions[segID]
+	return startPos, endPos
+}
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index ca9c23126..539e0b2f9 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -63,6 +63,7 @@ type DataNode struct {
 	State   atomic.Value // internalpb.StateCode_Initializing
 	watchDm chan struct{}
 
+	chanMut           sync.RWMutex
 	vchan2SyncService map[string]*dataSyncService
 	vchan2FlushCh     map[string]chan<- *flushMsg
 
@@ -187,6 +188,8 @@ func (node *DataNode) Init() error {
 
 // NewDataSyncService adds a new dataSyncService for new dmlVchannel and starts dataSyncService.
 func (node *DataNode) NewDataSyncService(vchanPair *datapb.VchannelPair) error {
+	node.chanMut.Lock()
+	defer node.chanMut.Unlock()
 	if _, ok := node.vchan2SyncService[vchanPair.GetDmlVchannelName()]; ok {
 		return nil
 	}
@@ -258,6 +261,8 @@ func (node *DataNode) GetComponentStates(ctx context.Context) (*internalpb.Compo
 }
 
 func (node *DataNode) getChannelName(segID UniqueID) string {
+	node.chanMut.RLock()
+	defer node.chanMut.RUnlock()
 	for name, dataSync := range node.vchan2SyncService {
 		if dataSync.replica.hasSegment(segID) {
 			return name
@@ -282,7 +287,9 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
 			status.Reason = fmt.Sprintf("DataNode not find segment %d!", id)
 			return status, errors.New(status.GetReason())
 		}
+		node.chanMut.RLock()
 		flushCh, ok := node.vchan2FlushCh[chanName]
+		node.chanMut.RUnlock()
 		if !ok {
 			// TODO restart DataNode or reshape vchan2FlushCh and vchan2SyncService
 			status.Reason = "DataNode abnormal!"
@@ -390,6 +397,8 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
 func (node *DataNode) Stop() error {
 	node.cancel()
 
+	node.chanMut.RLock()
+	defer node.chanMut.RUnlock()
 	// close services
 	for _, syncService := range node.vchan2SyncService {
 		if syncService != nil {
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index e8e32b783..402e250c6 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -160,6 +160,8 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 					ibNode.replica.setStartPosition(currentSegID, startPosition)
 				}
 			}
+			// set msg pack start positions, new design
+			ibNode.replica.setStartPositions(currentSegID, iMsg.startPositions)
 		}
 
 		segNum := uniqueSeg[currentSegID]
@@ -479,6 +481,8 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 			ibNode.replica.setEndPosition(currentSegID, endPosition)
 		}
 
+		// store current startPositions as Segment->EndPostion
+		ibNode.replica.setEndPositions(currentSegID, iMsg.endPositions)
 	}
 
 	if len(iMsg.insertMessages) > 0 {
-- 
GitLab