diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index ce64996ca2e940cd2fa97ef2c98bab0e2666bc97..d375ef4da71ba589de242373c1993ad133d50d40 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -210,6 +210,17 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 		collSchema := collection.schema
 		// 1.2 Get Fields
 		var pos int = 0 // Record position of blob
+		log.Debug("DataNode flow_graph_insert_buffer_node", zap.Any("Fields", collSchema.Fields))
+		var fieldIDs []int64
+		var fieldTypes []schemapb.DataType
+		for _, field := range collSchema.Fields {
+			fieldIDs = append(fieldIDs, field.FieldID)
+			fieldTypes = append(fieldTypes, field.DataType)
+		}
+
+		log.Debug("DataNode flow_graph_insert_buffer_node", zap.Any("FieldIDs", fieldIDs))
+		log.Debug("DataNode flow_graph_insert_buffer_node", zap.Any("fieldTypes", fieldTypes))
+
 		for _, field := range collSchema.Fields {
 			switch field.DataType {
 			case schemapb.DataType_FloatVector:
diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go
index 4839b00f39dabe3170eb8fdc0c3cbce56e8472dc..ad3caf9e8d6678668261014c64595df31a248062 100644
--- a/internal/proxynode/task.go
+++ b/internal/proxynode/task.go
@@ -301,6 +301,7 @@ func (it *InsertTask) transferColumnBasedRequestToRowBasedData() error {
 	l := len(dTypes)
 	// TODO(dragondriver): big endian or little endian?
 	endian := binary.LittleEndian
+	printed := false
 	for i := 0; i < rowNum; i++ {
 		blob := &commonpb.Blob{
 			Value: make([]byte, 0),
 		}
@@ -376,7 +377,10 @@ func (it *InsertTask) transferColumnBasedRequestToRowBasedData() error {
 				log.Warn("unsupported data type")
 			}
 		}
-
+		if !printed {
+			log.Debug("ProxyNode, transform", zap.Any("ID", it.ID()), zap.Any("BlobLen", len(blob.Value)), zap.Any("dTypes", dTypes))
+			printed = true
+		}
 		it.RowData = append(it.RowData, blob)
 	}
 
@@ -618,6 +622,7 @@ func (it *InsertTask) _assignSegmentID(stream msgstream.MsgStream, pack *msgstre
 func (it *InsertTask) Execute(ctx context.Context) error {
 	collectionName := it.BaseInsertTask.CollectionName
 	collSchema, err := globalMetaCache.GetCollectionSchema(ctx, collectionName)
+	log.Debug("ProxyNode Insert", zap.Any("collSchema", collSchema))
 	if err != nil {
 		return err
 	}
diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go
index c17a0aba1abb0f7f6d0412e2fe0c10743603b072..0ffdeccfabd17e94f86ae5a658c03a98b8aa06f3 100644
--- a/internal/querynode/flow_graph_insert_node.go
+++ b/internal/querynode/flow_graph_insert_node.go
@@ -136,7 +136,15 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 }
 
 func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) {
+	log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID))
 	var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
+	log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID),
+		zap.Any("targetSegment", targetSegment),
+		zap.Error(err),
+		zap.Any("SegmentType", targetSegment.segmentType),
+		zap.Any("enableLoadBinLog", targetSegment.enableLoadBinLog),
+	)
+
 	if targetSegment.segmentType != segmentTypeGrowing || targetSegment.enableLoadBinLog {
 		wg.Done()
 		return
@@ -160,7 +168,7 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn
 
 	err = targetSegment.segmentInsert(offsets, &ids, &timestamps, &records)
 	if err != nil {
-		log.Error(err.Error())
+		log.Debug("QueryNode: targetSegmentInsert failed", zap.Error(err))
 		// TODO: add error handling
 		wg.Done()
 		return
diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go
index 035c60f6b0a9ace205a34c45d65e3dcaf69f36f1..aa91c1b7149eda77d8682c8b8fbcb97ec4f37283 100644
--- a/internal/querynode/impl.go
+++ b/internal/querynode/impl.go
@@ -295,7 +295,7 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.Releas
 	return status, nil
 }
 
-// deprecated
+// ReleaseSegments deprecated
 func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
 	status := &commonpb.Status{
 		ErrorCode: commonpb.ErrorCode_Success,
@@ -337,20 +337,27 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmen
 	}
 	// get info from historical
 	for _, id := range in.SegmentIDs {
+		log.Debug("QueryNode::Impl::GetSegmentInfo for historical", zap.Any("SegmentID", id))
 		segment, err := node.historical.replica.getSegmentByID(id)
 		if err != nil {
+			log.Debug("QueryNode::Impl::GetSegmentInfo, for historical segmentID not exist", zap.Any("SegmentID", id))
 			continue
 		}
 		info := getSegmentInfo(segment)
+		log.Debug("QueryNode::Impl::GetSegmentInfo for historical", zap.Any("SegmentID", id), zap.Any("info", info))
+
 		infos = append(infos, info)
 	}
 	// get info from streaming
 	for _, id := range in.SegmentIDs {
+		log.Debug("QueryNode::Impl::GetSegmentInfo for streaming", zap.Any("SegmentID", id))
 		segment, err := node.streaming.replica.getSegmentByID(id)
 		if err != nil {
+			log.Debug("QueryNode::Impl::GetSegmentInfo, for streaming segmentID not exist", zap.Any("SegmentID", id))
 			continue
 		}
 		info := getSegmentInfo(segment)
+		log.Debug("QueryNode::Impl::GetSegmentInfo for streaming", zap.Any("SegmentID", id), zap.Any("info", info))
 		infos = append(infos, info)
 	}
 	return &queryPb.GetSegmentInfoResponse{
diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go
index 5f620db0109fa585cc005fc2a0570fd8f3790fe6..0c356e9c80de68f3259f327a4e0d90d7af7d412c 100644
--- a/internal/querynode/segment.go
+++ b/internal/querynode/segment.go
@@ -165,10 +165,13 @@ func (s *Segment) getRowCount() int64 {
 		long int
 		getRowCount(CSegmentInterface c_segment);
 	*/
+	segmentPtrIsNil := s.segmentPtr == nil
+	log.Debug("QueryNode::Segment::getRowCount", zap.Any("segmentPtrIsNil", segmentPtrIsNil))
 	if s.segmentPtr == nil {
 		return -1
 	}
 	var rowCount = C.GetRowCount(s.segmentPtr)
+	log.Debug("QueryNode::Segment::getRowCount", zap.Any("rowCount", rowCount))
 	return int64(rowCount)
 }
 
@@ -452,9 +455,12 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
 		           int sizeof_per_row,
 		           signed long int count);
 	*/
+	log.Debug("QueryNode::Segment::segmentInsert:", zap.Any("segmentType", s.segmentType))
+	log.Debug("QueryNode::Segment::segmentInsert:", zap.Any("enableLoadBinLog", s.enableLoadBinLog))
 	if s.segmentType != segmentTypeGrowing || s.enableLoadBinLog {
 		return nil
 	}
+	log.Debug("QueryNode::Segment::segmentInsert:", zap.Any("s.segmentPtr", s.segmentPtr))
 
 	if s.segmentPtr == nil {
 		return errors.New("null seg core pointer")
@@ -478,7 +484,7 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
 	var cTimestampsPtr = (*C.ulong)(&(*timestamps)[0])
 	var cSizeofPerRow = C.int(sizeofPerRow)
 	var cRawDataVoidPtr = unsafe.Pointer(&rawData[0])
-
+	log.Debug("QueryNode::Segment::InsertBegin", zap.Any("cNumOfRows", cNumOfRows))
 	var status = C.Insert(s.segmentPtr,
 		cOffset,
 		cNumOfRows,
@@ -489,11 +495,14 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
 		cNumOfRows)
 
 	errorCode := status.error_code
+	log.Debug("QueryNode::Segment::InsertEnd", zap.Any("errorCode", errorCode))
 	if errorCode != 0 {
 		errorMsg := C.GoString(status.error_msg)
 		defer C.free(unsafe.Pointer(status.error_msg))
-		return errors.New("Insert failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
+		err := errors.New("Insert failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
+		log.Debug("QueryNode::Segment::InsertEnd failed", zap.Error(err))
+		return err
 	}
 
 	s.setRecentlyModified(true)