Skip to content
Snippets Groups Projects
flow_graph_insert_buffer_node.go 25.4 KiB
Newer Older
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

XuanYang-cn's avatar
XuanYang-cn committed
package datanode

import (
	"bytes"
	"context"
	"encoding/binary"
	"path"
	"strconv"
	"sync"
	"unsafe"

	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/kv"
	miniokv "github.com/milvus-io/milvus/internal/kv/minio"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/internal/util/flowgraph"
	"github.com/milvus-io/milvus/internal/util/trace"
	"github.com/opentracing/opentracing-go"

	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/etcdpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/schemapb"
)

// CollectionPrefix is the "/collection/" path prefix (presumably used for
// collection-level keys; its users are outside this section — confirm).
const CollectionPrefix = "/collection/"

// SegmentPrefix is the "/segment/" path prefix (presumably used for
// segment-level keys; its users are outside this section — confirm).
const SegmentPrefix = "/segment/"

// InsertData is an alias of storage.InsertData so this package can use it
// without qualification.
type InsertData = storage.InsertData

// Blob is an alias of storage.Blob.
type Blob = storage.Blob
type insertBufferNode struct {
	BaseNode
	insertBuffer *insertBuffer
	replica      Replica
	idAllocator  allocatorInterface
	flushChan    <-chan *flushMsg
陈庆祥's avatar
陈庆祥 committed
	minIOKV kv.BaseKV

	timeTickStream          msgstream.MsgStream
	segmentStatisticsStream msgstream.MsgStream

	dsSaveBinlog func(fu *autoFlushUnit) error
neza2017's avatar
neza2017 committed
	openSegList  map[UniqueID]bool
	openSegLock  sync.Mutex
}

// autoFlushUnit describes the outcome of flushing one segment and is what
// gets passed to insertBufferNode.dsSaveBinlog.
type autoFlushUnit struct {
	// collID is the collection owning segID.
	collID             UniqueID
	segID              UniqueID
	// numRows holds row counts for the currently open segments, as returned
	// by listOpenSegmentCheckPointAndNumRows.
	numRows            map[UniqueID]int64
	// field2Path maps field ID to its insert binlog key; nil marks a flush
	// that failed or produced nothing.
	field2Path         map[UniqueID]string
	openSegCheckpoints map[UniqueID]internalpb.MsgPosition
	// flushed is true for explicit flush requests and false for auto flushes.
	flushed            bool
}

// insertBuffer accumulates decoded insert rows per segment until they are
// flushed. maxSize is the row-count threshold that full() compares the
// result of size() against.
type insertBuffer struct {
	insertData map[UniqueID]*InsertData // SegmentID to InsertData
	maxSize    int32
}
XuanYang-cn's avatar
XuanYang-cn committed

// size reports the number of rows buffered for segmentID. This is the
// largest NumRows among the segment's float-vector and binary-vector fields.
// Other field kinds are ignored. A segment with no buffered entry, or with
// no vector fields, reports 0.
func (ib *insertBuffer) size(segmentID UniqueID) int32 {
	segData, found := ib.insertData[segmentID]
	if !found {
		return 0
	}

	var rows int32
	for _, field := range segData.Data {
		var n int32
		switch fd := field.(type) {
		case *storage.FloatVectorFieldData:
			n = int32(fd.NumRows)
		case *storage.BinaryVectorFieldData:
			n = int32(fd.NumRows)
		default:
			continue
		}
		if n > rows {
			rows = n
		}
	}
	return rows
}

func (ib *insertBuffer) full(segmentID UniqueID) bool {
	log.Debug("Segment size", zap.Any("segment", segmentID), zap.Int32("size", ib.size(segmentID)), zap.Int32("maxsize", ib.maxSize))
XuanYang-cn's avatar
XuanYang-cn committed
	return ib.size(segmentID) >= ib.maxSize
}

// Name returns the identifier of this flow-graph node.
func (ibNode *insertBufferNode) Name() string {
	const nodeName = "ibNode"
	return nodeName
}

// Operate is the flow-graph entry point of insertBufferNode. For the single
// incoming insertMsg it:
//  1. registers segments the replica has not seen yet (addSegment +
//     setStartPositions), accumulates per-segment row counts into the
//     replica (updateStatistics) and publishes them via updateSegStatistics;
//  2. decodes each message's row blobs field by field, in collection-schema
//     order, into insertBuffer and records the message pack's end positions;
//  3. for every segment touched by this batch whose buffer is full, moves its
//     data to flushMap and serializes it in a flushSegment goroutine; each
//     successful result is reported through dsSaveBinlog with flushed=false;
//  4. if a flush request is received from flushChan, flushes that segment
//     synchronously (or, when its buffer is empty, reports it directly) with
//     flushed=true and answers on fmsg.dmlFlushedCh;
//  5. writes a hard time tick for iMsg.timeRange.timestampMax and finishes the
//     per-message tracing spans.
// It returns an empty slice when the input is not a non-nil *insertMsg and
// nil otherwise.
func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
XuanYang-cn's avatar
XuanYang-cn committed

	if len(in) != 1 {
XuanYang-cn's avatar
XuanYang-cn committed
		log.Error("Invalid operate message input in insertBufferNode", zap.Int("input length", len(in)))
XuanYang-cn's avatar
XuanYang-cn committed
		// TODO: add error handling
		// NOTE(review): execution continues after this log, so in[0] below
		// panics when len(in) == 0.
	}

	iMsg, ok := in[0].(*insertMsg)
XuanYang-cn's avatar
XuanYang-cn committed
	if !ok {
XuanYang-cn's avatar
XuanYang-cn committed
		log.Error("type assertion failed for insertMsg")
XuanYang-cn's avatar
XuanYang-cn committed
		// TODO: add error handling
	}

Yihao Dai's avatar
Yihao Dai committed
	// A failed type assertion leaves iMsg nil, so it is handled here too.
	if iMsg == nil {
		return []Msg{}
	}

	// One tracing span per insert message; all are finished just before
	// returning.
	var spans []opentracing.Span
	for _, msg := range iMsg.insertMessages {
		sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
		spans = append(spans, sp)
		msg.SetTraceCtx(ctx)
XuanYang-cn's avatar
XuanYang-cn committed
	// Updating segment statistics
	// uniqueSeg maps segment ID -> number of rows received for it in this batch.
	uniqueSeg := make(map[UniqueID]int64)
	for _, msg := range iMsg.insertMessages {
		currentSegID := msg.GetSegmentID()
XuanYang-cn's avatar
XuanYang-cn committed
		collID := msg.GetCollectionID()
		partitionID := msg.GetPartitionID()

		if !ibNode.replica.hasSegment(currentSegID) {
XuanYang-cn's avatar
XuanYang-cn committed
			err := ibNode.replica.addSegment(currentSegID, collID, partitionID, msg.GetChannelID())
XuanYang-cn's avatar
XuanYang-cn committed
			if err != nil {
XuanYang-cn's avatar
XuanYang-cn committed
				log.Error("add segment wrong", zap.Error(err))
			// set msg pack start positions
			// this position is the start position of current segment, not start position of current MsgPack
			// so setStartPositions will only call once when meet new segment
			ibNode.replica.setStartPositions(currentSegID, iMsg.startPositions)
		segNum := uniqueSeg[currentSegID]
		uniqueSeg[currentSegID] = segNum + int64(len(msg.RowIDs))
	// segToUpdate lists every segment touched by this batch; it also drives
	// the auto-flush check further below.
	segToUpdate := make([]UniqueID, 0, len(uniqueSeg))
	for id, num := range uniqueSeg {
		segToUpdate = append(segToUpdate, id)

		err := ibNode.replica.updateStatistics(id, num)
		if err != nil {
			log.Error("update Segment Row number wrong", zap.Error(err))
		}
	if len(segToUpdate) > 0 {
		err := ibNode.updateSegStatistics(segToUpdate)
		if err != nil {
			log.Error("update segment statistics error", zap.Error(err))
XuanYang-cn's avatar
XuanYang-cn committed
	// iMsg is insertMsg
	// 1. iMsg -> buffer
	for _, msg := range iMsg.insertMessages {
		if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
XuanYang-cn's avatar
XuanYang-cn committed
			log.Error("misaligned messages detected")
XuanYang-cn's avatar
XuanYang-cn committed
			continue
		}
		currentSegID := msg.GetSegmentID()
XuanYang-cn's avatar
XuanYang-cn committed
		collectionID := msg.GetCollectionID()
XuanYang-cn's avatar
XuanYang-cn committed

		idata, ok := ibNode.insertBuffer.insertData[currentSegID]
		if !ok {
			idata = &InsertData{
				Data: make(map[UniqueID]storage.FieldData),
			}
		}

XuanYang-cn's avatar
XuanYang-cn committed
		// 1.1 Get CollectionMeta
		collection, err := ibNode.replica.getCollectionByID(collectionID)
XuanYang-cn's avatar
XuanYang-cn committed
		if err != nil {
			// GOOSE TODO add error handler
XuanYang-cn's avatar
XuanYang-cn committed
			log.Error("Get meta wrong:", zap.Error(err))
XuanYang-cn's avatar
XuanYang-cn committed
			continue
		}

		collSchema := collection.schema
		// 1.2 Get Fields
		// pos is the byte offset of the current field inside every row blob.
		// It is shared by all rows, so every row is assumed to have the same
		// layout (fields packed in schema order).
		var pos int = 0 // Record position of blob
		log.Debug("DataNode flow_graph_insert_buffer_node", zap.Any("Fields", collSchema.Fields))
		var fieldIDs []int64
		var fieldTypes []schemapb.DataType
		for _, field := range collSchema.Fields {
			fieldIDs = append(fieldIDs, field.FieldID)
			fieldTypes = append(fieldTypes, field.DataType)
		}

		log.Debug("DataNode flow_graph_insert_buffer_node", zap.Any("FieldIDs", fieldIDs))
		log.Debug("DataNode flow_graph_insert_buffer_node", zap.Any("fieldTypes", fieldTypes))

XuanYang-cn's avatar
XuanYang-cn committed
		for _, field := range collSchema.Fields {
			switch field.DataType {
陈庆祥's avatar
陈庆祥 committed
			case schemapb.DataType_FloatVector:
XuanYang-cn's avatar
XuanYang-cn committed
				var dim int
				for _, t := range field.TypeParams {
					if t.Key == "dim" {
						dim, err = strconv.Atoi(t.Value)
						if err != nil {
							log.Error("strconv wrong on get dim", zap.Error(err))
XuanYang-cn's avatar
XuanYang-cn committed
						}
						break
					}
				}
				if dim <= 0 {
XuanYang-cn's avatar
XuanYang-cn committed
					log.Error("invalid dim")
					// NOTE(review): this skips the field without advancing pos,
					// so the fields after it are decoded at the wrong offset.
					continue
XuanYang-cn's avatar
XuanYang-cn committed
					// TODO: add error handling
				}

				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.FloatVectorFieldData{
						NumRows: 0,
						Data:    make([]float32, 0),
						Dim:     dim,
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData)

				// Each row contributes dim little-endian float32 values starting
				// at pos; offset ends as the byte width of this field.
				var offset int
				for _, blob := range msg.RowData {
					offset = 0
					for j := 0; j < dim; j++ {
						var v float32
						buf := bytes.NewBuffer(blob.GetValue()[pos+offset:])
						if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
XuanYang-cn's avatar
XuanYang-cn committed
							log.Error("binary.read float32 wrong", zap.Error(err))
XuanYang-cn's avatar
XuanYang-cn committed
						}
						fieldData.Data = append(fieldData.Data, v)
						offset += int(unsafe.Sizeof(*(&v)))
					}
				}
				pos += offset
				fieldData.NumRows += len(msg.RowIDs)

陈庆祥's avatar
陈庆祥 committed
			case schemapb.DataType_BinaryVector:
XuanYang-cn's avatar
XuanYang-cn committed
				var dim int
				for _, t := range field.TypeParams {
					if t.Key == "dim" {
						dim, err = strconv.Atoi(t.Value)
						if err != nil {
XuanYang-cn's avatar
XuanYang-cn committed
							log.Error("strconv wrong")
XuanYang-cn's avatar
XuanYang-cn committed
						}
						break
					}
				}
				if dim <= 0 {
XuanYang-cn's avatar
XuanYang-cn committed
					log.Error("invalid dim")
XuanYang-cn's avatar
XuanYang-cn committed
					// TODO: add error handling
					// NOTE(review): unlike the FloatVector case, decoding still
					// proceeds here with the invalid dim.
				}

				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.BinaryVectorFieldData{
						NumRows: 0,
						Data:    make([]byte, 0),
						Dim:     dim,
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData)

				// Each row contributes dim/8 raw bytes starting at pos.
				var offset int
				for _, blob := range msg.RowData {
xige-16's avatar
xige-16 committed
					bv := blob.GetValue()[pos : pos+(dim/8)]
XuanYang-cn's avatar
XuanYang-cn committed
					fieldData.Data = append(fieldData.Data, bv...)
					offset = len(bv)
				}
				pos += offset
				fieldData.NumRows += len(msg.RowData)

陈庆祥's avatar
陈庆祥 committed
			case schemapb.DataType_Bool:
XuanYang-cn's avatar
XuanYang-cn committed
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.BoolFieldData{
						NumRows: 0,
						Data:    make([]bool, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData)
				var v bool
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
XuanYang-cn's avatar
XuanYang-cn committed
						log.Error("binary.Read bool wrong", zap.Error(err))
XuanYang-cn's avatar
XuanYang-cn committed
					}
					fieldData.Data = append(fieldData.Data, v)

				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

陈庆祥's avatar
陈庆祥 committed
			case schemapb.DataType_Int8:
XuanYang-cn's avatar
XuanYang-cn committed
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int8FieldData{
						NumRows: 0,
						Data:    make([]int8, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData)
				var v int8
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
XuanYang-cn's avatar
XuanYang-cn committed
						log.Error("binary.Read int8 wrong", zap.Error(err))
XuanYang-cn's avatar
XuanYang-cn committed
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

陈庆祥's avatar
陈庆祥 committed
			case schemapb.DataType_Int16:
XuanYang-cn's avatar
XuanYang-cn committed
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int16FieldData{
						NumRows: 0,
						Data:    make([]int16, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData)
				var v int16
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
XuanYang-cn's avatar
XuanYang-cn committed
						log.Error("binary.Read int16 wrong", zap.Error(err))
XuanYang-cn's avatar
XuanYang-cn committed
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

陈庆祥's avatar
陈庆祥 committed
			case schemapb.DataType_Int32:
XuanYang-cn's avatar
XuanYang-cn committed
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int32FieldData{
						NumRows: 0,
						Data:    make([]int32, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData)
				var v int32
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
XuanYang-cn's avatar
XuanYang-cn committed
						log.Error("binary.Read int32 wrong", zap.Error(err))
XuanYang-cn's avatar
XuanYang-cn committed
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

陈庆祥's avatar
陈庆祥 committed
			case schemapb.DataType_Int64:
XuanYang-cn's avatar
XuanYang-cn committed
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int64FieldData{
						NumRows: 0,
						Data:    make([]int64, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData)
				// Field IDs 0 and 1 are filled from msg.RowIDs / msg.Timestamps
				// rather than from the row blobs, and do not advance pos.
				switch field.FieldID {
				case 0: // rowIDs
					fieldData.Data = append(fieldData.Data, msg.RowIDs...)
					fieldData.NumRows += len(msg.RowIDs)
				case 1: // Timestamps
					for _, ts := range msg.Timestamps {
						fieldData.Data = append(fieldData.Data, int64(ts))
					}
					fieldData.NumRows += len(msg.Timestamps)
				default:
					var v int64
					for _, blob := range msg.RowData {
						buf := bytes.NewBuffer(blob.GetValue()[pos:])
						if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
XuanYang-cn's avatar
XuanYang-cn committed
							log.Error("binary.Read int64 wrong", zap.Error(err))
XuanYang-cn's avatar
XuanYang-cn committed
						}
						fieldData.Data = append(fieldData.Data, v)
					}
					pos += int(unsafe.Sizeof(*(&v)))
					fieldData.NumRows += len(msg.RowIDs)
				}

陈庆祥's avatar
陈庆祥 committed
			case schemapb.DataType_Float:
XuanYang-cn's avatar
XuanYang-cn committed
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.FloatFieldData{
						NumRows: 0,
						Data:    make([]float32, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData)
				var v float32
				for _, blob := range msg.RowData {
					buf := bytes.NewBuffer(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
XuanYang-cn's avatar
XuanYang-cn committed
						log.Error("binary.Read float32 wrong", zap.Error(err))
XuanYang-cn's avatar
XuanYang-cn committed
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

陈庆祥's avatar
陈庆祥 committed
			case schemapb.DataType_Double:
XuanYang-cn's avatar
XuanYang-cn committed
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.DoubleFieldData{
						NumRows: 0,
						Data:    make([]float64, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData)
				var v float64
				for _, blob := range msg.RowData {
					buf := bytes.NewBuffer(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
XuanYang-cn's avatar
XuanYang-cn committed
						log.Error("binary.Read float64 wrong", zap.Error(err))
XuanYang-cn's avatar
XuanYang-cn committed
					}
					fieldData.Data = append(fieldData.Data, v)
				}

				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)
			}
		}

		// 1.3 store in buffer
		ibNode.insertBuffer.insertData[currentSegID] = idata

		// store current endPositions as Segment->EndPostion
		ibNode.replica.setEndPositions(currentSegID, iMsg.endPositions)
XuanYang-cn's avatar
XuanYang-cn committed
	}

	// Debug dump of at most 10 buffered segments and their sizes.
	if len(iMsg.insertMessages) > 0 {
XuanYang-cn's avatar
XuanYang-cn committed
		log.Debug("---insert buffer status---")
XuanYang-cn's avatar
XuanYang-cn committed
		var stopSign int = 0
		for k := range ibNode.insertBuffer.insertData {
			if stopSign >= 10 {
XuanYang-cn's avatar
XuanYang-cn committed
				log.Debug("......")
XuanYang-cn's avatar
XuanYang-cn committed
				break
			}
XuanYang-cn's avatar
XuanYang-cn committed
			log.Debug("seg buffer status", zap.Int64("segmentID", k), zap.Int32("buffer size", ibNode.insertBuffer.size(k)))
XuanYang-cn's avatar
XuanYang-cn committed
			stopSign++
		}
	}

	// Auto flush: each full segment touched by this batch is moved from
	// insertBuffer into flushMap and serialized by its own flushSegment
	// goroutine. finishCh is buffered for every candidate segment so those
	// goroutines never block on send before Wait returns.
	finishCh := make(chan autoFlushUnit, len(segToUpdate))
	finishCnt := sync.WaitGroup{}
	for _, segToFlush := range segToUpdate {
		// If full, auto flush
		if ibNode.insertBuffer.full(segToFlush) {
			log.Debug(". Insert Buffer full, auto flushing ",
				zap.Int32("num of rows", ibNode.insertBuffer.size(segToFlush)))
XuanYang-cn's avatar
XuanYang-cn committed

			collMeta, err := ibNode.getCollMetabySegID(segToFlush)
			if err != nil {
				log.Error("Auto flush failed .. cannot get collection meta ..", zap.Error(err))
				continue
			}
			ibNode.flushMap.Store(segToFlush, ibNode.insertBuffer.insertData[segToFlush])
			delete(ibNode.insertBuffer.insertData, segToFlush)

			collID, partitionID, err := ibNode.getCollectionandPartitionIDbySegID(segToFlush)
			if err != nil {
				log.Error("Auto flush failed .. cannot get collection ID or partition ID..", zap.Error(err))
XuanYang-cn's avatar
XuanYang-cn committed
				// NOTE(review): as shown, the flush still proceeds after this
				// error, with zero collection/partition IDs.
			}
			finishCnt.Add(1)
			go flushSegment(collMeta, segToFlush, partitionID, collID,
neza2017's avatar
neza2017 committed
				&ibNode.flushMap, ibNode.minIOKV, finishCh, &finishCnt, ibNode, ibNode.idAllocator)
		}
	}
	finishCnt.Wait()
	close(finishCh)
	for fu := range finishCh {
		// A nil field2Path marks a flush that failed (flushSegment's clearFn).
		if fu.field2Path == nil {
			log.Debug("segment is empty")
			continue
		}
neza2017's avatar
neza2017 committed
		fu.openSegCheckpoints, fu.numRows = ibNode.listOpenSegmentCheckPointAndNumRows()
		fu.flushed = false
		if err := ibNode.dsSaveBinlog(&fu); err != nil {
			log.Debug("data service save bin log path failed", zap.Error(err))
		}
	}

	// iMsg is Flush() msg from dataservice
	//   1. insertBuffer(not empty) -> binLogs -> minIO/S3
	select {
	case fmsg := <-ibNode.flushChan:
		currentSegID := fmsg.segmentID
		log.Debug(". Receiving flush message", zap.Int64("segmentID", currentSegID))

		if ibNode.insertBuffer.size(currentSegID) <= 0 {
			log.Debug(".. Buffer empty ...")
neza2017's avatar
neza2017 committed
			// NOTE(review): as captured here, n is not placed into the unit and
			// dsSaveBinlog's error is discarded — confirm against the full source.
			c, n := ibNode.listOpenSegmentCheckPointAndNumRows()
			ibNode.dsSaveBinlog(&autoFlushUnit{
				segID:              currentSegID,
				openSegCheckpoints: c,
				flushed:            true,
			})
neza2017's avatar
neza2017 committed
			ibNode.removeSegmentCheckPoint(fmsg.segmentID)
			fmsg.dmlFlushedCh <- []*datapb.ID2PathList{{ID: currentSegID, Paths: []string{}}}
			log.Debug(".. Buffer not empty, flushing ..")
			finishCh := make(chan autoFlushUnit, 1)

			ibNode.flushMap.Store(currentSegID, ibNode.insertBuffer.insertData[currentSegID])
			delete(ibNode.insertBuffer.insertData, currentSegID)
			clearFn := func() {
				finishCh <- autoFlushUnit{field2Path: nil}
				log.Debug(".. Clearing flush Buffer ..")
				ibNode.flushMap.Delete(currentSegID)
				close(finishCh)
				fmsg.dmlFlushedCh <- []*datapb.ID2PathList{{ID: currentSegID, Paths: nil}}
			var collMeta *etcdpb.CollectionMeta
			var collSch *schemapb.CollectionSchema
			seg, err := ibNode.replica.getSegmentByID(currentSegID)
			if err != nil {
				log.Error("Flush failed .. cannot get segment ..", zap.Error(err))
				clearFn()
				// TODO add error handling
			collSch, err = ibNode.getCollectionSchemaByID(seg.collectionID)
				log.Error("Flush failed .. cannot get collection schema ..", zap.Error(err))
				clearFn()
				// TODO add error handling
			collMeta = &etcdpb.CollectionMeta{
				Schema: collSch,
				ID:     seg.collectionID,
			}

			// Synchronous flush (wgFinish == nil); the single result is read
			// from finishCh right after.
			flushSegment(collMeta, currentSegID, seg.partitionID, seg.collectionID,
neza2017's avatar
neza2017 committed
				&ibNode.flushMap, ibNode.minIOKV, finishCh, nil, ibNode, ibNode.idAllocator)
			fu := <-finishCh
			close(finishCh)
			if fu.field2Path != nil {
neza2017's avatar
neza2017 committed
				fu.openSegCheckpoints, fu.numRows = ibNode.listOpenSegmentCheckPointAndNumRows()
				fu.flushed = true
				// NOTE(review): the err logged below is the earlier lookup error,
				// not the error returned by dsSaveBinlog (only compared to nil).
				if ibNode.dsSaveBinlog(&fu) != nil {
					log.Debug("data service save bin log path failed", zap.Error(err))
				} else {
					// this segment has flushed, so it's not `open segment`, so remove from the check point
neza2017's avatar
neza2017 committed
					ibNode.removeSegmentCheckPoint(fu.segID)
				}
			}
			fmsg.dmlFlushedCh <- []*datapb.ID2PathList{{ID: currentSegID, Paths: []string{}}}
	// TODO write timetick
	if err := ibNode.writeHardTimeTick(iMsg.timeRange.timestampMax); err != nil {
		log.Error("send hard time tick into pulsar channel failed", zap.Error(err))
	}
XuanYang-cn's avatar
XuanYang-cn committed

	for _, sp := range spans {
		sp.Finish()
	}
XuanYang-cn's avatar
XuanYang-cn committed

	return nil
// flushSegment serializes the *InsertData stored in insertData under segID.
// It builds an InsertCodec from collMeta and produces insert and stats
// binlogs. Each insert binlog gets a fresh log index from idAllocator; each
// stats binlog reuses the log index of the same field's insert binlog. All
// blobs are written to kv in one MultiSave. On success segID is marked via
// ibNode.setSegmentCheckPoint, and an autoFlushUnit with the
// fieldID -> insert-binlog-key map is sent on flushUnit. On failure clearFn(false)
// sends an autoFlushUnit whose field2Path is nil. clearFn also removes segID
// from insertData. When wgFinish is non-nil, wgFinish.Done is deferred.
// Note: the kv parameter shadows the kv package inside this function.
func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID UniqueID,
	insertData *sync.Map, kv kv.BaseKV, flushUnit chan<- autoFlushUnit, wgFinish *sync.WaitGroup,
neza2017's avatar
neza2017 committed
	ibNode *insertBufferNode, idAllocator allocatorInterface) {
	if wgFinish != nil {
		defer wgFinish.Done()
	}
XuanYang-cn's avatar
XuanYang-cn committed

	clearFn := func(isSuccess bool) {
		if !isSuccess {
			flushUnit <- autoFlushUnit{field2Path: nil}
		log.Debug(".. Clearing flush Buffer ..")
		insertData.Delete(segID)
XuanYang-cn's avatar
XuanYang-cn committed

	inCodec := storage.NewInsertCodec(collMeta)

	// buffer data to binlogs
	data, ok := insertData.Load(segID)
	if !ok {
		log.Error("Flush failed ... cannot load insertData ..")
		clearFn(false)
XuanYang-cn's avatar
XuanYang-cn committed

	binLogs, statsBinlogs, err := inCodec.Serialize(partitionID, segID, data.(*InsertData))
XuanYang-cn's avatar
XuanYang-cn committed
	if err != nil {
		log.Error("Flush failed ... cannot generate binlog ..", zap.Error(err))
		clearFn(false)
	log.Debug(".. Saving binlogs to MinIO ..", zap.Int("number", len(binLogs)))
	field2Path := make(map[UniqueID]string, len(binLogs))
	kvs := make(map[string]string, len(binLogs))
	// paths collects only insert binlog keys; it is used for best-effort
	// cleanup if MultiSave fails.
	paths := make([]string, 0, len(binLogs))
	field2Logidx := make(map[UniqueID]UniqueID, len(binLogs))

	// write insert binlog
	for _, blob := range binLogs {
		fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
		log.Debug("save binlog", zap.Int64("fieldID", fieldID))
XuanYang-cn's avatar
XuanYang-cn committed
		if err != nil {
			log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
			clearFn(false)
		logidx, err := idAllocator.allocID()
XuanYang-cn's avatar
XuanYang-cn committed
		if err != nil {
			log.Error("Flush failed ... cannot alloc ID ..", zap.Error(err))
			clearFn(false)
		// no error raise if alloc=false
		k, _ := idAllocator.genKey(false, collID, partitionID, segID, fieldID, logidx)

		key := path.Join(Params.InsertBinlogRootPath, k)
		paths = append(paths, key)
		kvs[key] = string(blob.Value[:])
		field2Path[fieldID] = key
		field2Logidx[fieldID] = logidx
	}

	// write stats binlog
	for _, blob := range statsBinlogs {
		fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
		if err != nil {
			log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
			clearFn(false)
			return
		}

		// Reuse the log index allocated for this field's insert binlog.
		logidx := field2Logidx[fieldID]

		// no error raise if alloc=false
		k, _ := idAllocator.genKey(false, collID, partitionID, segID, fieldID, logidx)

		key := path.Join(Params.StatsBinlogRootPath, k)
		kvs[key] = string(blob.Value[:])
	log.Debug("save binlog file to MinIO/S3")

	err = kv.MultiSave(kvs)
	if err != nil {
		log.Error("Flush failed ... cannot save to MinIO ..", zap.Error(err))
		// NOTE(review): stats binlog keys are not in paths, so they are not
		// removed by this rollback.
		_ = kv.MultiRemove(paths)
		clearFn(false)
XuanYang-cn's avatar
XuanYang-cn committed

neza2017's avatar
neza2017 committed
	ibNode.setSegmentCheckPoint(segID)
	flushUnit <- autoFlushUnit{collID: collID, segID: segID, field2Path: field2Path}
neza2017's avatar
neza2017 committed
// setSegmentCheckPoint adds segID to openSegList (under openSegLock), so that
// listOpenSegmentCheckPointAndNumRows includes it.
func (ibNode *insertBufferNode) setSegmentCheckPoint(segID UniqueID) {
	mu := &ibNode.openSegLock
	mu.Lock()
	defer mu.Unlock()

	ibNode.openSegList[segID] = true
}
// removeSegmentCheckPoint drops segID from openSegList (under openSegLock).
// It is a no-op when segID is not present.
func (ibNode *insertBufferNode) removeSegmentCheckPoint(segID UniqueID) {
	mu := &ibNode.openSegLock
	mu.Lock()
	defer mu.Unlock()

	if _, present := ibNode.openSegList[segID]; present {
		delete(ibNode.openSegList, segID)
	}
}
// listOpenSegmentCheckPointAndNumRows snapshots the segment IDs in
// openSegList and returns the replica's checkpoints and row counts for them.
// openSegLock is held for the whole call, including the replica lookup.
func (ibNode *insertBufferNode) listOpenSegmentCheckPointAndNumRows() (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64) {
	mu := &ibNode.openSegLock
	mu.Lock()
	defer mu.Unlock()

	openIDs := make([]UniqueID, len(ibNode.openSegList))
	i := 0
	for segID := range ibNode.openSegList {
		openIDs[i] = segID
		i++
	}
	return ibNode.replica.listOpenSegmentCheckPointAndNumRows(openIDs)
}

XuanYang-cn's avatar
XuanYang-cn committed
func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
	msgPack := msgstream.MsgPack{}
	timeTickMsg := msgstream.TimeTickMsg{
		BaseMsg: msgstream.BaseMsg{
			BeginTimestamp: ts,
			EndTimestamp:   ts,
			HashValues:     []uint32{0},
		},
陈庆祥's avatar
陈庆祥 committed
		TimeTickMsg: internalpb.TimeTickMsg{
XuanYang-cn's avatar
XuanYang-cn committed
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_TimeTick,
XuanYang-cn's avatar
XuanYang-cn committed
				MsgID:     0,  // GOOSE TODO
				Timestamp: ts, // GOOSE TODO
				SourceID:  Params.NodeID,
XuanYang-cn's avatar
XuanYang-cn committed
			},
		},
	}
	msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
	return ibNode.timeTickStream.Produce(&msgPack)
func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID) error {
XuanYang-cn's avatar
XuanYang-cn committed
	log.Debug("Updating segments statistics...")
陈庆祥's avatar
陈庆祥 committed
	statsUpdates := make([]*internalpb.SegmentStatisticsUpdates, 0, len(segIDs))
	for _, segID := range segIDs {
		updates, err := ibNode.replica.getSegmentStatisticsUpdates(segID)
		if err != nil {
XuanYang-cn's avatar
XuanYang-cn committed
			log.Error("get segment statistics updates wrong", zap.Int64("segmentID", segID), zap.Error(err))
			continue
		}
		statsUpdates = append(statsUpdates, updates)
	}

陈庆祥's avatar
陈庆祥 committed
	segStats := internalpb.SegmentStatistics{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_SegmentStatistics,
			MsgID:     UniqueID(0),  // GOOSE TODO
			Timestamp: Timestamp(0), // GOOSE TODO
			SourceID:  Params.NodeID,
		},
		SegStats: statsUpdates,
	}

	var msg msgstream.TsMsg = &msgstream.SegmentStatisticsMsg{
		BaseMsg: msgstream.BaseMsg{
XuanYang-cn's avatar
XuanYang-cn committed
			HashValues: []uint32{0}, // GOOSE TODO
		},
		SegmentStatistics: segStats,
	}

	var msgPack = msgstream.MsgPack{
		Msgs: []msgstream.TsMsg{msg},
	}
	return ibNode.segmentStatisticsStream.Produce(&msgPack)
}

// getCollectionSchemaByID returns the schema of collection collectionID as
// held by the replica. If the lookup fails it returns a nil schema and the
// replica's error.
func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) (*schemapb.CollectionSchema, error) {
	var schema *schemapb.CollectionSchema
	coll, err := ibNode.replica.getCollectionByID(collectionID)
	if err == nil {
		schema = coll.schema
	}
	return schema, err
}

// getCollMetabySegID builds a CollectionMeta (ID and schema) for the
// collection that owns segmentID, using the replica for both lookups.
// On any error the returned meta is nil.
func (ibNode *insertBufferNode) getCollMetabySegID(segmentID UniqueID) (*etcdpb.CollectionMeta, error) {
	seg, err := ibNode.replica.getSegmentByID(segmentID)
	if err != nil {
		return nil, err
	}

	coll, err := ibNode.replica.getCollectionByID(seg.collectionID)
	if err != nil {
		// Do not hand back a half-filled meta (ID set, Schema nil) with an error.
		return nil, err
	}

	return &etcdpb.CollectionMeta{
		ID:     seg.collectionID,
		Schema: coll.GetSchema(),
	}, nil
}

// getCollectionandPartitionIDbySegID returns the collection and partition IDs
// the replica records for segmentID. If the segment lookup fails both IDs
// are zero and the replica's error is returned.
func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID UniqueID) (collID, partitionID UniqueID, err error) {
	seg, lookupErr := ibNode.replica.getSegmentByID(segmentID)
	if lookupErr != nil {
		return 0, 0, lookupErr
	}
	return seg.collectionID, seg.partitionID, nil
}

func newInsertBufferNode(
	ctx context.Context,
	replica Replica,
	factory msgstream.Factory,
	idAllocator allocatorInterface,
	flushCh <-chan *flushMsg,
	saveBinlog func(*autoFlushUnit) error,
) *insertBufferNode {

XuanYang-cn's avatar
XuanYang-cn committed
	maxQueueLength := Params.FlowGraphMaxQueueLength
	maxParallelism := Params.FlowGraphMaxParallelism

	baseNode := BaseNode{}
	baseNode.SetMaxQueueLength(maxQueueLength)
	baseNode.SetMaxParallelism(maxParallelism)

	maxSize := Params.FlushInsertBufferSize
	iBuffer := &insertBuffer{
		insertData: make(map[UniqueID]*InsertData),
		maxSize:    maxSize,
	}

	// MinIO
	option := &miniokv.Option{
		Address:           Params.MinioAddress,
		AccessKeyID:       Params.MinioAccessKeyID,
		SecretAccessKeyID: Params.MinioSecretAccessKey,
		UseSSL:            Params.MinioUseSSL,
		CreateBucket:      true,
		BucketName:        Params.MinioBucketName,
	}

	minIOKV, err := miniokv.NewMinIOKV(ctx, option)
	if err != nil {
		panic(err)
	}

	//input stream, data node time tick
zhenshan.cao's avatar
zhenshan.cao committed
	wTt, _ := factory.NewMsgStream(ctx)
	wTt.AsProducer([]string{Params.TimeTickChannelName})
Xiangyu Wang's avatar
Xiangyu Wang committed
	log.Debug("datanode AsProducer: " + Params.TimeTickChannelName)
	var wTtMsgStream msgstream.MsgStream = wTt
	wTtMsgStream.Start()

	// update statistics channel
zhenshan.cao's avatar
zhenshan.cao committed
	segS, _ := factory.NewMsgStream(ctx)
	segS.AsProducer([]string{Params.SegmentStatisticsChannelName})
Xiangyu Wang's avatar
Xiangyu Wang committed
	log.Debug("datanode AsProducer: " + Params.SegmentStatisticsChannelName)
	var segStatisticsMsgStream msgstream.MsgStream = segS
	segStatisticsMsgStream.Start()
XuanYang-cn's avatar
XuanYang-cn committed
	return &insertBufferNode{
		BaseNode:     baseNode,
		insertBuffer: iBuffer,
		minIOKV:      minIOKV,

XuanYang-cn's avatar
XuanYang-cn committed
		timeTickStream:          wTtMsgStream,
		segmentStatisticsStream: segStatisticsMsgStream,
		replica:      replica,
		flushMap:     sync.Map{},
		flushChan:    flushCh,
		idAllocator:  idAllocator,
		dsSaveBinlog: saveBinlog,
neza2017's avatar
neza2017 committed
		openSegList:  make(map[UniqueID]bool),