From 7dac20c35c5b09ab5f8e2c739fc08bb866b532b4 Mon Sep 17 00:00:00 2001
From: "zhenshan.cao" <zhenshan.cao@zilliz.com>
Date: Sat, 5 Jun 2021 16:21:36 +0800
Subject: [PATCH] Add log for poxynode transform data and querynode segment
 insert (#5627)

* Add log for poxynode transform data and querynode segment insert

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>

* Decrease print times

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
---
 internal/datanode/flow_graph_insert_buffer_node.go | 11 +++++++++++
 internal/proxynode/task.go                         |  7 ++++++-
 internal/querynode/flow_graph_insert_node.go       | 10 +++++++++-
 internal/querynode/impl.go                         |  9 ++++++++-
 internal/querynode/segment.go                      | 13 +++++++++++--
 5 files changed, 45 insertions(+), 5 deletions(-)

diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index ce64996ca..d375ef4da 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -210,6 +210,17 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 		collSchema := collection.schema
 		// 1.2 Get Fields
 		var pos int = 0 // Record position of blob
+		log.Debug("DataNode flow_graph_insert_buffer_node", zap.Any("Fields", collSchema.Fields))
+		var fieldIDs []int64
+		var fieldTypes []schemapb.DataType
+		for _, field := range collSchema.Fields {
+			fieldIDs = append(fieldIDs, field.FieldID)
+			fieldTypes = append(fieldTypes, field.DataType)
+		}
+
+		log.Debug("DataNode flow_graph_insert_buffer_node", zap.Any("FieldIDs", fieldIDs))
+		log.Debug("DataNode flow_graph_insert_buffer_node", zap.Any("fieldTypes", fieldTypes))
+
 		for _, field := range collSchema.Fields {
 			switch field.DataType {
 			case schemapb.DataType_FloatVector:
diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go
index 4839b00f3..ad3caf9e8 100644
--- a/internal/proxynode/task.go
+++ b/internal/proxynode/task.go
@@ -301,6 +301,7 @@ func (it *InsertTask) transferColumnBasedRequestToRowBasedData() error {
 	l := len(dTypes)
 	// TODO(dragondriver): big endian or little endian?
 	endian := binary.LittleEndian
+	printed := false
 	for i := 0; i < rowNum; i++ {
 		blob := &commonpb.Blob{
 			Value: make([]byte, 0),
@@ -376,7 +377,10 @@ func (it *InsertTask) transferColumnBasedRequestToRowBasedData() error {
 				log.Warn("unsupported data type")
 			}
 		}
-
+		if !printed {
+			log.Debug("ProxyNode, transform", zap.Any("ID", it.ID()), zap.Any("BlobLen", len(blob.Value)), zap.Any("dTypes", dTypes))
+			printed = true
+		}
 		it.RowData = append(it.RowData, blob)
 	}
 
@@ -618,6 +622,7 @@ func (it *InsertTask) _assignSegmentID(stream msgstream.MsgStream, pack *msgstre
 func (it *InsertTask) Execute(ctx context.Context) error {
 	collectionName := it.BaseInsertTask.CollectionName
 	collSchema, err := globalMetaCache.GetCollectionSchema(ctx, collectionName)
+	log.Debug("ProxyNode Insert", zap.Any("collSchema", collSchema))
 	if err != nil {
 		return err
 	}
diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go
index c17a0aba1..0ffdeccfa 100644
--- a/internal/querynode/flow_graph_insert_node.go
+++ b/internal/querynode/flow_graph_insert_node.go
@@ -136,7 +136,15 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 }
 
 func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) {
+	log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID))
 	var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
+	log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID),
+		zap.Any("targetSegment", targetSegment),
+		zap.Error(err),
+		zap.Any("SegmentType", targetSegment.segmentType),
+		zap.Any("enableLoadBinLog", targetSegment.enableLoadBinLog),
+	)
+
 	if targetSegment.segmentType != segmentTypeGrowing || targetSegment.enableLoadBinLog {
 		wg.Done()
 		return
@@ -160,7 +168,7 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn
 
 	err = targetSegment.segmentInsert(offsets, &ids, &timestamps, &records)
 	if err != nil {
-		log.Error(err.Error())
+		log.Debug("QueryNode: targetSegmentInsert failed", zap.Error(err))
 		// TODO: add error handling
 		wg.Done()
 		return
diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go
index 035c60f6b..aa91c1b71 100644
--- a/internal/querynode/impl.go
+++ b/internal/querynode/impl.go
@@ -295,7 +295,7 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.Releas
 	return status, nil
 }
 
-// deprecated
+// ReleaseSegments deprecated
 func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
 	status := &commonpb.Status{
 		ErrorCode: commonpb.ErrorCode_Success,
@@ -337,20 +337,27 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmen
 	}
 	// get info from historical
 	for _, id := range in.SegmentIDs {
+		log.Debug("QueryNode::Impl::GetSegmentInfo for historical", zap.Any("SegmentID", id))
 		segment, err := node.historical.replica.getSegmentByID(id)
 		if err != nil {
+			log.Debug("QueryNode::Impl::GetSegmentInfo, for historical segmentID not exist", zap.Any("SegmentID", id))
 			continue
 		}
 		info := getSegmentInfo(segment)
+		log.Debug("QueryNode::Impl::GetSegmentInfo for historical", zap.Any("SegmentID", id), zap.Any("info", info))
+
 		infos = append(infos, info)
 	}
 	// get info from streaming
 	for _, id := range in.SegmentIDs {
+		log.Debug("QueryNode::Impl::GetSegmentInfo for streaming", zap.Any("SegmentID", id))
 		segment, err := node.streaming.replica.getSegmentByID(id)
 		if err != nil {
+			log.Debug("QueryNode::Impl::GetSegmentInfo, for streaming segmentID not exist", zap.Any("SegmentID", id))
 			continue
 		}
 		info := getSegmentInfo(segment)
+		log.Debug("QueryNode::Impl::GetSegmentInfo for streaming", zap.Any("SegmentID", id), zap.Any("info", info))
 		infos = append(infos, info)
 	}
 	return &queryPb.GetSegmentInfoResponse{
diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go
index 5f620db01..0c356e9c8 100644
--- a/internal/querynode/segment.go
+++ b/internal/querynode/segment.go
@@ -165,10 +165,13 @@ func (s *Segment) getRowCount() int64 {
 		long int
 		getRowCount(CSegmentInterface c_segment);
 	*/
+	segmentPtrIsNil := s.segmentPtr == nil
+	log.Debug("QueryNode::Segment::getRowCount", zap.Any("segmentPtrIsNil", segmentPtrIsNil))
 	if s.segmentPtr == nil {
 		return -1
 	}
 	var rowCount = C.GetRowCount(s.segmentPtr)
+	log.Debug("QueryNode::Segment::getRowCount", zap.Any("rowCount", rowCount))
 	return int64(rowCount)
 }
 
@@ -452,9 +455,12 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
 		           int sizeof_per_row,
 		           signed long int count);
 	*/
+	log.Debug("QueryNode::Segment::segmentInsert:", zap.Any("segmentType", s.segmentType))
+	log.Debug("QueryNode::Segment::segmentInsert:", zap.Any("enableLoadBinLog", s.enableLoadBinLog))
 	if s.segmentType != segmentTypeGrowing || s.enableLoadBinLog {
 		return nil
 	}
+	log.Debug("QueryNode::Segment::segmentInsert:", zap.Any("s.sgmentPtr", s.segmentPtr))
 
 	if s.segmentPtr == nil {
 		return errors.New("null seg core pointer")
@@ -478,7 +484,7 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
 	var cTimestampsPtr = (*C.ulong)(&(*timestamps)[0])
 	var cSizeofPerRow = C.int(sizeofPerRow)
 	var cRawDataVoidPtr = unsafe.Pointer(&rawData[0])
-
+	log.Debug("QueryNode::Segment::InsertBegin", zap.Any("cNumOfRows", cNumOfRows))
 	var status = C.Insert(s.segmentPtr,
 		cOffset,
 		cNumOfRows,
@@ -489,11 +495,14 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
 		cNumOfRows)
 
 	errorCode := status.error_code
+	log.Debug("QueryNode::Segment::InsertEnd", zap.Any("errorCode", errorCode))
 
 	if errorCode != 0 {
 		errorMsg := C.GoString(status.error_msg)
 		defer C.free(unsafe.Pointer(status.error_msg))
-		return errors.New("Insert failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
+		err := errors.New("Insert failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
+		log.Debug("QueryNode::Segment::InsertEnd failed", zap.Error(err))
+		return err
 	}
 
 	s.setRecentlyModified(true)
-- 
GitLab