Commit e96d39bf authored by XuanYang-cn, committed by zhenshan.cao

refactor datanode flowgraph (#5538)


Signed-off-by: yangxuan <xuan.yang@zilliz.com>
parent f1ccbb8f
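Taken together, the changes below remove the separate DDL input stream along with the filterDmNode and gcNode stages, leaving a single dmStreamNode to ddNode to insertBufferNode chain, and they deliver flush requests to insertBufferNode over dsService.flushChan instead of routing them through ddNode. A minimal sketch of the resulting wiring, using only the constructors and SetEdges calls that appear in the initNodes diff below (dsService and vchanPair are assumed to be in scope as in the diff; error handling elided):

	// Sketch of the post-refactor flowgraph wiring, pieced together from initNodes.
	var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.msFactory, vchanPair.GetDmlVchannelName(), vchanPair.GetDmlPosition())
	var ddNode Node = newDDNode()
	var insertBufferNode Node = newInsertBufferNode(dsService.ctx, dsService.replica, dsService.msFactory, dsService.idAllocator, dsService.flushChan)

	dsService.fg.AddNode(dmStreamNode)
	dsService.fg.AddNode(ddNode)
	dsService.fg.AddNode(insertBufferNode)

	// dmStreamNode -> ddNode -> insertBufferNode
	_ = dsService.fg.SetEdges(dmStreamNode.Name(), []string{}, []string{ddNode.Name()})
	_ = dsService.fg.SetEdges(ddNode.Name(), []string{dmStreamNode.Name()}, []string{insertBufferNode.Name()})
	_ = dsService.fg.SetEdges(insertBufferNode.Name(), []string{ddNode.Name()}, []string{})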
@@ -341,7 +341,6 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
return status, nil
}
ddlFlushedCh := make(chan []*datapb.DDLBinlogMeta)
dmlFlushedCh := make(chan []*datapb.ID2PathList)
flushmsg := &flushMsg{
@@ -349,7 +348,6 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
timestamp: req.Base.Timestamp,
segmentID: id,
collectionID: req.CollectionID,
ddlFlushedCh: ddlFlushedCh,
dmlFlushedCh: dmlFlushedCh,
}
@@ -372,28 +370,6 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
req.Field2BinlogPaths = meta
log.Info("Insert messeges flush done!", zap.Any("Binlog paths", meta))
}
case chan []*datapb.DDLBinlogMeta:
select {
case <-time.After(300 * time.Second):
return
case meta := <-Ch:
if meta == nil {
log.Info("Ddl messages flush failed!")
// Modify req to confirm failure
return
}
if len(meta) == 0 {
log.Info("Ddl messages flush Done")
// Modify req with empty ddl binlog paths and position
return
}
// Modify req with valid ddl binlog paths
req.DdlBinlogPaths = meta
log.Info("Ddl messages flush done!", zap.Any("Binlog paths", meta))
}
default:
log.Error("Not supported type")
}
@@ -414,8 +390,6 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
var wg sync.WaitGroup
wg.Add(1)
go waitReceive(&wg, ddlFlushedCh, req)
wg.Add(1)
go waitReceive(&wg, dmlFlushedCh, req)
wg.Wait()
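With the DDL flush path removed, FlushSegments now only has to wait for the DML flush result. Pieced together from the fragments above, the surviving pattern looks roughly like the sketch below; waitReceive, the flushMsg fields, and the 300-second timeout are taken from the diff, while the hand-off to the flowgraph's flush channel is assumed:

	// Hedged sketch of the per-segment flush hand-off after this commit.
	dmlFlushedCh := make(chan []*datapb.ID2PathList)
	flushmsg := &flushMsg{
		timestamp:    req.Base.Timestamp,
		segmentID:    id,
		collectionID: req.CollectionID,
		dmlFlushedCh: dmlFlushedCh,
	}
	node.sendFlushMsg(flushmsg) // hypothetical helper: delivers the message to insertBufferNode's flushChan

	var wg sync.WaitGroup
	wg.Add(1)
	// waitReceive blocks until binlog paths arrive on dmlFlushedCh (filling
	// req.Field2BinlogPaths) or the 300-second timeout shown above fires.
	go waitReceive(&wg, dmlFlushedCh, req)
	wg.Wait()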
@@ -84,56 +84,27 @@ func (dsService *dataSyncService) initNodes(vchanPair *datapb.VchannelPair) {
}
var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.msFactory, vchanPair.GetDmlVchannelName(), vchanPair.GetDmlPosition())
var ddStreamNode Node = newDDInputNode(dsService.ctx, dsService.msFactory, vchanPair.GetDdlVchannelName(), vchanPair.GetDdlPosition())
var filterDmNode Node = newFilteredDmNode()
var ddNode Node = newDDNode(dsService.ctx, dsService.flushChan, dsService.replica, dsService.idAllocator, vchanPair.CollectionID)
var insertBufferNode Node = newInsertBufferNode(dsService.ctx, dsService.replica, dsService.msFactory, dsService.idAllocator)
var gcNode Node = newGCNode(dsService.replica)
var ddNode Node = newDDNode()
var insertBufferNode Node = newInsertBufferNode(dsService.ctx, dsService.replica, dsService.msFactory, dsService.idAllocator, dsService.flushChan)
dsService.fg.AddNode(dmStreamNode)
dsService.fg.AddNode(ddStreamNode)
dsService.fg.AddNode(filterDmNode)
dsService.fg.AddNode(ddNode)
dsService.fg.AddNode(insertBufferNode)
dsService.fg.AddNode(gcNode)
// dmStreamNode
err = dsService.fg.SetEdges(dmStreamNode.Name(),
[]string{},
[]string{filterDmNode.Name()},
)
if err != nil {
log.Error("set edges failed in node", zap.String("name", dmStreamNode.Name()), zap.Error(err))
panic("set edges faild in the node")
}
// ddStreamNode
err = dsService.fg.SetEdges(ddStreamNode.Name(),
err = dsService.fg.SetEdges(dmStreamNode.Name(),
[]string{},
[]string{ddNode.Name()},
)
if err != nil {
log.Error("set edges failed in node", zap.String("name", ddStreamNode.Name()), zap.Error(err))
panic("set edges faild in the node")
}
// filterDmNode
err = dsService.fg.SetEdges(filterDmNode.Name(),
[]string{dmStreamNode.Name(), ddNode.Name()},
[]string{insertBufferNode.Name()},
)
if err != nil {
log.Error("set edges failed in node", zap.String("name", filterDmNode.Name()), zap.Error(err))
log.Error("set edges failed in node", zap.String("name", dmStreamNode.Name()), zap.Error(err))
panic("set edges faild in the node")
}
// ddNode
err = dsService.fg.SetEdges(ddNode.Name(),
[]string{ddStreamNode.Name()},
[]string{filterDmNode.Name()},
[]string{dmStreamNode.Name()},
[]string{insertBufferNode.Name()},
)
if err != nil {
log.Error("set edges failed in node", zap.String("name", ddNode.Name()), zap.Error(err))
@@ -142,20 +113,11 @@ func (dsService *dataSyncService) initNodes(vchanPair *datapb.VchannelPair) {
// insertBufferNode
err = dsService.fg.SetEdges(insertBufferNode.Name(),
[]string{filterDmNode.Name()},
[]string{gcNode.Name()},
[]string{ddNode.Name()},
[]string{},
)
if err != nil {
log.Error("set edges failed in node", zap.String("name", insertBufferNode.Name()), zap.Error(err))
panic("set edges faild in the node")
}
// gcNode
err = dsService.fg.SetEdges(gcNode.Name(),
[]string{insertBufferNode.Name()},
[]string{})
if err != nil {
log.Error("set edges failed in node", zap.String("name", gcNode.Name()), zap.Error(err))
panic("set edges faild in the node")
}
}
@@ -12,88 +12,24 @@
package datanode
import (
"context"
"fmt"
"path"
"sort"
"strconv"
"sync"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/kv"
miniokv "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/opentracing/opentracing-go"
)
type ddNode struct {
BaseNode
ddMsg *ddMsg
ddRecords *ddRecords
ddBuffer *ddBuffer
flushMap *sync.Map
inFlushCh <-chan *flushMsg
idAllocator allocatorInterface
kv kv.BaseKV
replica Replica
collectionID UniqueID
}
type ddData struct {
ddRequestString []string
timestamps []Timestamp
eventTypes []storage.EventTypeCode
}
type ddBuffer struct {
// ddData map[UniqueID]*ddData // collection ID
ddData ddData
sync.Mutex
}
type ddRecords struct {
collectionRecords map[UniqueID]interface{}
partitionRecords map[UniqueID]interface{}
}
func (d *ddBuffer) getData() ddData {
d.Lock()
defer d.Unlock()
r := ddData{
ddRequestString: d.ddData.ddRequestString,
timestamps: d.ddData.timestamps,
eventTypes: d.ddData.eventTypes,
}
d.ddData.ddRequestString = make([]string, 0, 10)
d.ddData.timestamps = make([]Timestamp, 0, 10)
d.ddData.eventTypes = make([]storage.EventTypeCode, 0, 10)
return r
}
func (d *ddBuffer) append(request string, timestamp Timestamp, eventType storage.EventTypeCode) {
d.Lock()
defer d.Unlock()
d.ddData.ddRequestString = append(d.ddData.ddRequestString, request)
d.ddData.timestamps = append(d.ddData.timestamps, timestamp)
d.ddData.eventTypes = append(d.ddData.eventTypes, eventType)
}
func (ddNode *ddNode) Name() string {
func (ddn *ddNode) Name() string {
return "ddNode"
}
func (ddNode *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if len(in) != 1 {
log.Error("Invalid operate message input in ddNode", zap.Int("input length", len(in)))
// TODO: add error handling
@@ -112,352 +48,48 @@ func (ddNode *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if msMsg == nil {
return []Msg{}
}
var spans []opentracing.Span
for _, msg := range msMsg.TsMessages() {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
spans = append(spans, sp)
msg.SetTraceCtx(ctx)
}
ddNode.ddMsg = &ddMsg{
collectionRecords: make(map[UniqueID][]*metaOperateRecord),
partitionRecords: make(map[UniqueID][]*metaOperateRecord),
var iMsg = insertMsg{
insertMessages: make([]*msgstream.InsertMsg, 0),
timeRange: TimeRange{
timestampMin: msMsg.TimestampMin(),
timestampMax: msMsg.TimestampMax(),
},
gcRecord: &gcRecord{
collections: make([]UniqueID, 0),
},
startPositions: make([]*internalpb.MsgPosition, 0),
endPositions: make([]*internalpb.MsgPosition, 0),
}
// sort tsMessages
tsMessages := msMsg.TsMessages()
sort.Slice(tsMessages,
func(i, j int) bool {
return tsMessages[i].BeginTs() < tsMessages[j].BeginTs()
})
// do dd tasks
for _, msg := range tsMessages {
switch msg := msg.(type) {
case *msgstream.CreateCollectionMsg:
if msg.CollectionID != ddNode.collectionID {
continue
}
ddNode.createCollection(msg)
case *msgstream.DropCollectionMsg:
if msg.CollectionID != ddNode.collectionID {
continue
}
ddNode.dropCollection(msg)
case *msgstream.CreatePartitionMsg:
if msg.CollectionID != ddNode.collectionID {
continue
}
ddNode.createPartition(msg)
case *msgstream.DropPartitionMsg:
if msg.CollectionID != ddNode.collectionID {
continue
for _, msg := range msMsg.TsMessages() {
switch msg.Type() {
case commonpb.MsgType_DropCollection:
// TODO destroy dataSyncService and notify datanode
log.Error("Destroying current flowgraph")
case commonpb.MsgType_Insert:
resMsg := ddn.filterFlushedSegmentInsertMessages(msg.(*msgstream.InsertMsg))
if resMsg != nil {
iMsg.insertMessages = append(iMsg.insertMessages, resMsg)
}
ddNode.dropPartition(msg)
default:
log.Error("Not supporting message type", zap.Any("Type", msg.Type()))
}
}
select {
case fmsg := <-ddNode.inFlushCh:
log.Debug(". receive flush message ...")
segID := fmsg.segmentID
if !ddNode.replica.hasSegment(segID) {
log.Debug(".. Segment not exist in this datanode, skip flushing ...")
break
}
iMsg.startPositions = append(iMsg.startPositions, msMsg.StartPositions()...)
iMsg.endPositions = append(iMsg.endPositions, msMsg.EndPositions()...)
seg, _ := ddNode.replica.getSegmentByID(segID)
collID := seg.collectionID
var res Msg = &iMsg
buf := ddNode.ddBuffer.getData()
if len(buf.ddRequestString) == 0 {
fmsg.ddlFlushedCh <- []*datapb.DDLBinlogMeta{}
} else {
ddNode.flushMap.Store(collID, &buf)
log.Debug(".. ddl buffer not empty, flushing ...")
binlogMetaCh := make(chan *datapb.DDLBinlogMeta)
go flush(collID, ddNode.flushMap, ddNode.kv, ddNode.idAllocator, binlogMetaCh)
go ddNode.flushComplete(binlogMetaCh, collID, fmsg.ddlFlushedCh)
}
log.Debug(".. notifying insertbuffer ...")
ddNode.ddMsg.flushMessage = fmsg
default:
}
for _, span := range spans {
span.Finish()
}
var res Msg = ddNode.ddMsg
return []Msg{res}
}
func (ddNode *ddNode) flushComplete(binlogMetaCh <-chan *datapb.DDLBinlogMeta, collID UniqueID, ddlFlushedCh chan<- []*datapb.DDLBinlogMeta) {
binlogMeta := <-binlogMetaCh
if binlogMeta == nil {
ddlFlushedCh <- nil
return
}
ddlFlushedCh <- []*datapb.DDLBinlogMeta{binlogMeta}
func (ddn *ddNode) filterFlushedSegmentInsertMessages(msg *msgstream.InsertMsg) *msgstream.InsertMsg {
// TODO filter insert messages of flushed segments
return msg
}
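filterFlushedSegmentInsertMessages is still a pass-through stub (see the TODO above). Purely as an illustration of where this is headed, and not part of this commit, the filter might eventually drop messages whose target segment has already been flushed, assuming the insert message carries its segment ID and ddNode tracks a flushed-segment set:

	// Hypothetical future version; ddn.flushedSegments (a map[UniqueID]struct{})
	// does not exist in this commit and is assumed here for illustration only.
	func (ddn *ddNode) filterFlushedSegmentInsertMessages(msg *msgstream.InsertMsg) *msgstream.InsertMsg {
		if _, flushed := ddn.flushedSegments[msg.SegmentID]; flushed {
			return nil // segment already persisted, skip re-buffering
		}
		return msg
	}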
/*
flush will
generate binlogs for all buffer data in ddNode,
store the generated binlogs to minIO/S3,
The keys of the binlogs are generated as below:
${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
${tenant}/data_definition_log/${collection_id}/ddl/${log_idx}
*/
func flush(collID UniqueID, ddlData *sync.Map, kv kv.BaseKV, idAllocator allocatorInterface,
binlogMetaCh chan<- *datapb.DDLBinlogMeta) {
clearFn := func(isSuccess bool) {
if !isSuccess {
binlogMetaCh <- nil
}
}
ddCodec := &storage.DataDefinitionCodec{}
d, ok := ddlData.LoadAndDelete(collID)
if !ok {
log.Error("Flush failed ... cannot load ddlData ..")
clearFn(false)
return
}
data := d.(*ddData)
log.Debug(".. ddl flushing ...",
zap.Int64("collectionID", collID),
zap.Int("length", len(data.ddRequestString)))
binLogs, err := ddCodec.Serialize(data.timestamps, data.ddRequestString, data.eventTypes)
if err != nil || len(binLogs) != 2 {
log.Error("Codec Serialize wrong", zap.Error(err))
clearFn(false)
return
}
if len(data.ddRequestString) != len(data.timestamps) ||
len(data.timestamps) != len(data.eventTypes) {
log.Error("illegal ddBuffer, failed to save binlog")
clearFn(false)
return
}
kvs := make(map[string]string, 2)
tsIdx, err := idAllocator.genKey(true)
if err != nil {
log.Error("Id allocate wrong", zap.Error(err))
clearFn(false)
return
}
tsKey := path.Join(Params.DdlBinlogRootPath, strconv.FormatInt(collID, 10), binLogs[0].GetKey(), tsIdx)
kvs[tsKey] = string(binLogs[0].GetValue())
ddlIdx, err := idAllocator.genKey(true)
if err != nil {
log.Error("Id allocate wrong", zap.Error(err))
clearFn(false)
return
}
ddlKey := path.Join(Params.DdlBinlogRootPath, strconv.FormatInt(collID, 10), binLogs[1].GetKey(), ddlIdx)
kvs[ddlKey] = string(binLogs[1].GetValue())
// save ddl/ts binlog to minIO/s3
log.Debug(".. Saving ddl binlog to minIO/s3 ...")
err = kv.MultiSave(kvs)
if err != nil {
log.Error("Save to minIO/S3 Wrong", zap.Error(err))
_ = kv.MultiRemove([]string{tsKey, ddlKey})
clearFn(false)
return
}
log.Debug(".. Clearing ddl flush buffer ...")
clearFn(true)
binlogMetaCh <- &datapb.DDLBinlogMeta{
DdlBinlogPath: ddlKey,
TsBinlogPath: tsKey,
}
log.Debug(".. DDL flushing completed ...")
}
func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
msg.SetTraceCtx(ctx)
defer sp.Finish()
collectionID := msg.CollectionID
// add collection
if _, ok := ddNode.ddRecords.collectionRecords[collectionID]; ok {
err := fmt.Errorf("collection %d is already exists", collectionID)
log.Error("String conversion wrong", zap.Error(err))
return
}
ddNode.ddRecords.collectionRecords[collectionID] = nil
// TODO: add default partition?
var schema schemapb.CollectionSchema
err := proto.Unmarshal(msg.Schema, &schema)
if err != nil {
log.Error("proto unmarshal wrong", zap.Error(err))
return
}
// add collection
err = ddNode.replica.addCollection(collectionID, &schema)
if err != nil {
log.Error("replica add collection wrong", zap.Error(err))
return
}
ddNode.ddMsg.collectionRecords[collectionID] = append(ddNode.ddMsg.collectionRecords[collectionID],
&metaOperateRecord{
createOrDrop: true,
timestamp: msg.Base.Timestamp,
})
ddNode.ddBuffer.append(msg.CreateCollectionRequest.String(), msg.Base.Timestamp, storage.CreateCollectionEventType)
}
/*
dropCollection will drop collection in ddRecords but won't drop collection in replica
*/
func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
msg.SetTraceCtx(ctx)
defer sp.Finish()
collectionID := msg.CollectionID
// remove collection
if _, ok := ddNode.ddRecords.collectionRecords[collectionID]; !ok {
log.Error("Cannot find collection", zap.Int64("collection ID", collectionID))
return
}
delete(ddNode.ddRecords.collectionRecords, collectionID)
ddNode.ddMsg.collectionRecords[collectionID] = append(ddNode.ddMsg.collectionRecords[collectionID],
&metaOperateRecord{
createOrDrop: false,
timestamp: msg.Base.Timestamp,
})
ddNode.ddBuffer.append(msg.DropCollectionRequest.String(), msg.Base.Timestamp, storage.DropCollectionEventType)
ddNode.ddMsg.gcRecord.collections = append(ddNode.ddMsg.gcRecord.collections, collectionID)
}
func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
msg.SetTraceCtx(ctx)
defer sp.Finish()
partitionID := msg.PartitionID
// collectionID := msg.CollectionID
// add partition
if _, ok := ddNode.ddRecords.partitionRecords[partitionID]; ok {
log.Error("partition is already exists", zap.Int64("partition ID", partitionID))
return
}
ddNode.ddRecords.partitionRecords[partitionID] = nil
ddNode.ddMsg.partitionRecords[partitionID] = append(ddNode.ddMsg.partitionRecords[partitionID],
&metaOperateRecord{
createOrDrop: true,
timestamp: msg.Base.Timestamp,
})
ddNode.ddBuffer.append(msg.CreatePartitionRequest.String(), msg.Base.Timestamp, storage.CreatePartitionEventType)
}
func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
msg.SetTraceCtx(ctx)
defer sp.Finish()
partitionID := msg.PartitionID
// collectionID := msg.CollectionID
// remove partition
if _, ok := ddNode.ddRecords.partitionRecords[partitionID]; !ok {
log.Error("cannot found partition", zap.Int64("partition ID", partitionID))
return
}
delete(ddNode.ddRecords.partitionRecords, partitionID)
// partitionName := msg.PartitionName
// ddNode.ddMsg.partitionRecords[partitionName] = append(ddNode.ddMsg.partitionRecords[partitionName],
ddNode.ddMsg.partitionRecords[partitionID] = append(ddNode.ddMsg.partitionRecords[partitionID],
&metaOperateRecord{
createOrDrop: false,
timestamp: msg.Base.Timestamp,
})
ddNode.ddBuffer.append(msg.DropPartitionRequest.String(), msg.Base.Timestamp, storage.DropPartitionEventType)
}
func newDDNode(ctx context.Context, inFlushCh <-chan *flushMsg,
replica Replica, idAllocator allocatorInterface, collectionID UniqueID) *ddNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
func newDDNode() *ddNode {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
ddRecords := &ddRecords{
collectionRecords: make(map[UniqueID]interface{}),
partitionRecords: make(map[UniqueID]interface{}),
}
bucketName := Params.MinioBucketName
option := &miniokv.Option{
Address: Params.MinioAddress,
AccessKeyID: Params.MinioAccessKeyID,
SecretAccessKeyID: Params.MinioSecretAccessKey,
UseSSL: Params.MinioUseSSL,
BucketName: bucketName,
CreateBucket: true,
}
minioKV, err := miniokv.NewMinIOKV(ctx, option)
if err != nil {
panic(err)
}
baseNode.SetMaxParallelism(Params.FlowGraphMaxQueueLength)
return &ddNode{
BaseNode: baseNode,
ddRecords: ddRecords,
ddBuffer: &ddBuffer{
ddData: ddData{
ddRequestString: make([]string, 0, 10),
timestamps: make([]Timestamp, 0, 10),
eventTypes: make([]storage.EventTypeCode, 0, 10),
},
},
inFlushCh: inFlushCh,
idAllocator: idAllocator,
kv: minioKV,
replica: replica,
flushMap: &sync.Map{},
collectionID: collectionID,
BaseNode: baseNode,
}
}
@@ -12,167 +12,13 @@
package datanode
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
)
func TestFlowGraphDDNode_Operate(t *testing.T) {
const ctxTimeInMillisecond = 2000
const closeWithDeadline = true
var ctx context.Context
if closeWithDeadline {
var cancel context.CancelFunc
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel = context.WithDeadline(context.Background(), d)
defer cancel()
} else {
ctx = context.Background()
}
inFlushCh := make(chan *flushMsg, 10)
defer close(inFlushCh)
replica := newReplica()
collID := UniqueID(0)
ddNode := newDDNode(ctx, inFlushCh, replica, NewAllocatorFactory(), collID)
collName := "col-test-0"
// create collection
createCollReq := internalpb.CreateCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateCollection,
MsgID: 1,
Timestamp: 1,
SourceID: 1,
},
CollectionID: collID,
Schema: make([]byte, 0),
CollectionName: collName,
DbName: "DbName",
DbID: UniqueID(0),
}
createCollMsg := msgstream.CreateCollectionMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: Timestamp(1),
EndTimestamp: Timestamp(1),
HashValues: []uint32{uint32(0)},
},
CreateCollectionRequest: createCollReq,
}
// drop collection
dropCollReq := internalpb.DropCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropCollection,
MsgID: 2,
Timestamp: 2,
SourceID: 2,
},
CollectionID: collID,
CollectionName: collName,
DbName: "DbName",
DbID: UniqueID(0),
}
dropCollMsg := msgstream.DropCollectionMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: Timestamp(2),
EndTimestamp: Timestamp(2),
HashValues: []uint32{uint32(0)},
},
DropCollectionRequest: dropCollReq,
}
partitionID := UniqueID(100)
partitionName := "partition-test-0"
// create partition
createPartitionReq := internalpb.CreatePartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreatePartition,
MsgID: 3,
Timestamp: 3,
SourceID: 3,
},
CollectionID: collID,
PartitionID: partitionID,
CollectionName: collName,
PartitionName: partitionName,
DbName: "DbName",
DbID: UniqueID(0),
}
createPartitionMsg := msgstream.CreatePartitionMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: Timestamp(3),
EndTimestamp: Timestamp(3),
HashValues: []uint32{uint32(0)},
},
CreatePartitionRequest: createPartitionReq,
}
// drop partition
dropPartitionReq := internalpb.DropPartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropPartition,
MsgID: 4,
Timestamp: 4,
SourceID: 4,
},
CollectionID: collID,
PartitionID: partitionID,
CollectionName: collName,
PartitionName: partitionName,
DbName: "DbName",
DbID: UniqueID(0),
}
dropPartitionMsg := msgstream.DropPartitionMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: Timestamp(4),
EndTimestamp: Timestamp(4),
HashValues: []uint32{uint32(0)},
},
DropPartitionRequest: dropPartitionReq,
}
replica.addSegment(1, collID, partitionID, "insert-01")
flushCh := make(chan []*datapb.DDLBinlogMeta)
inFlushCh <- &flushMsg{
msgID: 5,
timestamp: 5,
segmentID: UniqueID(1),
collectionID: collID,
ddlFlushedCh: flushCh,
}
startPos := []*internalpb.MsgPosition{
{
ChannelName: "aaa",
MsgID: make([]byte, 0),
Timestamp: 0,
},
}
// ddNode := newDDNode()
tsMessages := make([]msgstream.TsMsg, 0)
tsMessages = append(tsMessages, msgstream.TsMsg(&createCollMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&dropCollMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&createPartitionMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&dropPartitionMsg))
msgStream := flowgraph.GenerateMsgStreamMsg(tsMessages, Timestamp(0), Timestamp(3),
startPos, startPos)
var inMsg Msg = msgStream
ddNode.Operate([]Msg{inMsg})
// var inMsg Msg = msgStream
// ddNode.Operate([]Msg{inMsg})
paths := <-flushCh
log.Debug("Flushed DDL binlog paths", zap.Any("paths", paths))
assert.Equal(t, 1, len(paths))
}
@@ -21,8 +21,7 @@ import (
)
func newDmInputNode(ctx context.Context, factory msgstream.Factory, vchannelName string, vchannelPos *datapb.PositionPair) *flowgraph.InputNode {
// TODO use position pair in Seek
// TODO seek
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
consumeSubName := Params.MsgChannelSubName
@@ -35,19 +34,3 @@ func newDmInputNode(ctx context.Context, factory msgstream.Factory, vchannelName
node := flowgraph.NewInputNode(&stream, "dmInputNode", maxQueueLength, maxParallelism)
return node
}
func newDDInputNode(ctx context.Context, factory msgstream.Factory, vchannelName string, vchannelPos *datapb.PositionPair) *flowgraph.InputNode {
// TODO use position pair in Seek
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
consumeSubName := Params.MsgChannelSubName
tmpStream, _ := factory.NewTtMsgStream(ctx)
tmpStream.AsConsumer([]string{vchannelName}, consumeSubName)
log.Debug("datanode AsConsumer: " + vchannelName + " : " + consumeSubName)
var stream msgstream.MsgStream = tmpStream
node := flowgraph.NewInputNode(&stream, "ddInputNode", maxQueueLength, maxParallelism)
return node
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datanode
import (
"math"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
)
type filterDmNode struct {
BaseNode
ddMsg *ddMsg
}
func (fdmNode *filterDmNode) Name() string {
return "fdmNode"
}
func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if len(in) != 2 {
log.Error("Invalid operate message input in filterDmNode", zap.Int("input length", len(in)))
// TODO: add error handling
}
msgStreamMsg, ok := in[0].(*MsgStreamMsg)
if !ok {
log.Error("type assertion failed for MsgStreamMsg")
// TODO: add error handling
}
ddMsg, ok := in[1].(*ddMsg)
if !ok {
log.Error("type assertion failed for ddMsg")
// TODO: add error handling
}
if msgStreamMsg == nil || ddMsg == nil {
return []Msg{}
}
var spans []opentracing.Span
for _, msg := range msgStreamMsg.TsMessages() {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
spans = append(spans, sp)
msg.SetTraceCtx(ctx)
}
fdmNode.ddMsg = ddMsg
var iMsg = insertMsg{
insertMessages: make([]*msgstream.InsertMsg, 0),
// flushMessages: make([]*flushMsg, 0),
timeRange: TimeRange{
timestampMin: msgStreamMsg.TimestampMin(),
timestampMax: msgStreamMsg.TimestampMax(),
},
startPositions: make([]*internalpb.MsgPosition, 0),
endPositions: make([]*internalpb.MsgPosition, 0),
}
// iMsg.flushMessages = append(iMsg.flushMessages, ddMsg.flushMessages...)
iMsg.flushMessage = ddMsg.flushMessage
for _, msg := range msgStreamMsg.TsMessages() {
switch msg.Type() {
case commonpb.MsgType_Insert:
resMsg := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg))
if resMsg != nil {
iMsg.insertMessages = append(iMsg.insertMessages, resMsg)
}
// case commonpb.MsgType_kDelete:
// dmMsg.deleteMessages = append(dmMsg.deleteMessages, (*msg).(*msgstream.DeleteTask))
default:
log.Error("Not supporting message type", zap.Any("Type", msg.Type()))
}
}
iMsg.startPositions = append(iMsg.startPositions, msgStreamMsg.StartPositions()...)
iMsg.endPositions = append(iMsg.endPositions, msgStreamMsg.EndPositions()...)
iMsg.gcRecord = ddMsg.gcRecord
var res Msg = &iMsg
for _, sp := range spans {
sp.Finish()
}
return []Msg{res}
}
func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg) *msgstream.InsertMsg {
// No dd record, do all insert requests.
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
msg.SetTraceCtx(ctx)
defer sp.Finish()
records, ok := fdmNode.ddMsg.collectionRecords[msg.CollectionID]
if !ok {
return msg
}
// TODO: If the last record is drop type, all insert requests are invalid.
//if !records[len(records)-1].createOrDrop {
// return nil
//}
// Filter insert requests before last record.
if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
// TODO: what if the messages are misaligned? Here, we ignore those messages and print error
log.Error("misaligned messages detected")
return nil
}
tmpTimestamps := make([]Timestamp, 0)
tmpRowIDs := make([]int64, 0)
tmpRowData := make([]*commonpb.Blob, 0)
// calculate valid time range
timeBegin := Timestamp(0)
timeEnd := Timestamp(math.MaxUint64)
for _, record := range records {
if record.createOrDrop && timeBegin < record.timestamp {
timeBegin = record.timestamp
}
if !record.createOrDrop && timeEnd > record.timestamp {
timeEnd = record.timestamp
}
}
for i, t := range msg.Timestamps {
if t >= timeBegin && t <= timeEnd {
tmpTimestamps = append(tmpTimestamps, t)
tmpRowIDs = append(tmpRowIDs, msg.RowIDs[i])
tmpRowData = append(tmpRowData, msg.RowData[i])
}
}
if len(tmpRowIDs) <= 0 {
return nil
}
msg.Timestamps = tmpTimestamps
msg.RowIDs = tmpRowIDs
msg.RowData = tmpRowData
return msg
}
func newFilteredDmNode() *filterDmNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &filterDmNode{
BaseNode: baseNode,
}
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datanode
import (
"github.com/milvus-io/milvus/internal/util/flowgraph"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
)
type gcNode struct {
BaseNode
replica Replica
}
func (gcNode *gcNode) Name() string {
return "gcNode"
}
func (gcNode *gcNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if len(in) != 1 {
log.Error("Invalid operate message input in gcNode", zap.Int("input length", len(in)))
// TODO: add error handling
}
gcMsg, ok := in[0].(*gcMsg)
if !ok {
log.Error("type assertion failed for gcMsg")
// TODO: add error handling
}
if gcMsg == nil {
return []Msg{}
}
// drop collections
for _, collectionID := range gcMsg.gcRecord.collections {
err := gcNode.replica.removeCollection(collectionID)
if err != nil {
log.Error("replica remove collection wrong", zap.Error(err))
}
}
return nil
}
func newGCNode(replica Replica) *gcNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &gcNode{
BaseNode: baseNode,
replica: replica,
}
}
@@ -54,12 +54,12 @@ type insertBufferNode struct {
replica Replica
idAllocator allocatorInterface
flushMap sync.Map
flushChan <-chan *flushMsg
minIOKV kv.BaseKV
timeTickStream msgstream.MsgStream
segmentStatisticsStream msgstream.MsgStream
completeFlushStream msgstream.MsgStream
}
type insertBuffer struct {
@@ -487,12 +487,13 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// iMsg is Flush() msg from dataservice
// 1. insertBuffer(not empty) -> binLogs -> minIO/S3
if iMsg.flushMessage != nil && ibNode.replica.hasSegment(iMsg.flushMessage.segmentID) {
currentSegID := iMsg.flushMessage.segmentID
select {
case fmsg := <-ibNode.flushChan:
currentSegID := fmsg.segmentID
log.Debug(". Receiving flush message", zap.Int64("segmentID", currentSegID))
finishCh := make(chan map[UniqueID]string)
go ibNode.completeFlush(currentSegID, finishCh, iMsg.flushMessage.dmlFlushedCh)
go ibNode.completeFlush(currentSegID, finishCh, fmsg.dmlFlushedCh)
if ibNode.insertBuffer.size(currentSegID) <= 0 {
log.Debug(".. Buffer empty ...")
@@ -532,21 +533,19 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
&ibNode.flushMap, ibNode.minIOKV, finishCh, ibNode.idAllocator)
}
default:
}
if err := ibNode.writeHardTimeTick(iMsg.timeRange.timestampMax); err != nil {
log.Error("send hard time tick into pulsar channel failed", zap.Error(err))
}
// TODO write timetick
// if err := ibNode.writeHardTimeTick(iMsg.timeRange.timestampMax); err != nil {
// log.Error("send hard time tick into pulsar channel failed", zap.Error(err))
// }
var res Msg = &gcMsg{
gcRecord: iMsg.gcRecord,
timeRange: iMsg.timeRange,
}
for _, sp := range spans {
sp.Finish()
}
return []Msg{res}
return nil
}
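Instead of receiving flush requests through the ddMsg, insertBufferNode now polls its own flushChan with a non-blocking select on every Operate call: a pending flush is handled on the next tick, and the default branch keeps normal insert processing from stalling. Stripped of the buffering details visible above, the pattern is roughly this sketch:

	// Non-blocking check for a pending flush request (sketch; buffering and
	// binlog generation elided).
	select {
	case fmsg := <-ibNode.flushChan:
		currentSegID := fmsg.segmentID
		finishCh := make(chan map[UniqueID]string)
		go ibNode.completeFlush(currentSegID, finishCh, fmsg.dmlFlushedCh)
		// serialize and upload any buffered rows for currentSegID here
	default:
		// nothing to flush this tick
	}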
func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID UniqueID,
@@ -770,7 +769,7 @@ func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID Uni
return
}
func newInsertBufferNode(ctx context.Context, replica Replica, factory msgstream.Factory, idAllocator allocatorInterface) *insertBufferNode {
func newInsertBufferNode(ctx context.Context, replica Replica, factory msgstream.Factory, idAllocator allocatorInterface, flushCh <-chan *flushMsg) *insertBufferNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
@@ -813,22 +812,17 @@ func newInsertBufferNode(ctx context.Context, replica Replica, factory msgstream
var segStatisticsMsgStream msgstream.MsgStream = segS
segStatisticsMsgStream.Start()
// segment flush completed channel
cf, _ := factory.NewMsgStream(ctx)
cf.AsProducer([]string{Params.CompleteFlushChannelName})
log.Debug("datanode AsProducer: " + Params.CompleteFlushChannelName)
var completeFlushStream msgstream.MsgStream = cf
completeFlushStream.Start()
return &insertBufferNode{
BaseNode: baseNode,
insertBuffer: iBuffer,
minIOKV: minIOKV,
BaseNode: baseNode,
insertBuffer: iBuffer,
minIOKV: minIOKV,
timeTickStream: wTtMsgStream,
segmentStatisticsStream: segStatisticsMsgStream,
completeFlushStream: completeFlushStream,
replica: replica,
flushMap: sync.Map{},
idAllocator: idAllocator,
replica: replica,
flushMap: sync.Map{},
flushChan: flushCh,
idAllocator: idAllocator,
}
}
@@ -72,21 +72,28 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
err = msFactory.SetParams(m)
assert.Nil(t, err)
iBNode := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory())
flushChan := make(chan *flushMsg, 100)
iBNode := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan)
ddlFlushedCh := make(chan []*datapb.DDLBinlogMeta)
dmlFlushedCh := make(chan []*datapb.ID2PathList)
inMsg := genInsertMsg(ddlFlushedCh, dmlFlushedCh)
flushChan <- &flushMsg{
msgID: 1,
timestamp: 2000,
segmentID: UniqueID(1),
collectionID: UniqueID(1),
dmlFlushedCh: dmlFlushedCh,
}
inMsg := genInsertMsg()
var iMsg flowgraph.Msg = &inMsg
iBNode.Operate([]flowgraph.Msg{iMsg})
isflushed := <-dmlFlushedCh
assert.NotNil(t, isflushed)
log.Debug("DML binlog paths", zap.Any("paths", isflushed))
}
func genInsertMsg(ddlFlushedCh chan<- []*datapb.DDLBinlogMeta, dmlFlushedCh chan<- []*datapb.ID2PathList) insertMsg {
func genInsertMsg() insertMsg {
timeRange := TimeRange{
timestampMin: 0,
@@ -114,15 +121,6 @@ func genInsertMsg(ddlFlushedCh chan<- []*datapb.DDLBinlogMeta, dmlFlushedCh chan
dataFactory := NewDataFactory()
iMsg.insertMessages = append(iMsg.insertMessages, dataFactory.GetMsgStreamInsertMsgs(2)...)
iMsg.flushMessage = &flushMsg{
msgID: 1,
timestamp: 2000,
segmentID: UniqueID(1),
collectionID: UniqueID(1),
ddlFlushedCh: ddlFlushedCh,
dmlFlushedCh: dmlFlushedCh,
}
return *iMsg
}
@@ -23,72 +23,21 @@ type (
MsgStreamMsg = flowgraph.MsgStreamMsg
)
type key2SegMsg struct {
tsMessages []msgstream.TsMsg
timeRange TimeRange
}
type ddMsg struct {
collectionRecords map[UniqueID][]*metaOperateRecord
partitionRecords map[UniqueID][]*metaOperateRecord
flushMessage *flushMsg
gcRecord *gcRecord
timeRange TimeRange
}
type metaOperateRecord struct {
createOrDrop bool // create: true, drop: false
timestamp Timestamp
}
type insertMsg struct {
insertMessages []*msgstream.InsertMsg
flushMessage *flushMsg
gcRecord *gcRecord
timeRange TimeRange
startPositions []*internalpb.MsgPosition
endPositions []*internalpb.MsgPosition
}
type deleteMsg struct {
deleteMessages []*msgstream.DeleteMsg
timeRange TimeRange
}
type gcMsg struct {
gcRecord *gcRecord
timeRange TimeRange
}
type gcRecord struct {
collections []UniqueID
}
type flushMsg struct {
msgID UniqueID
timestamp Timestamp
segmentID UniqueID
collectionID UniqueID
ddlFlushedCh chan<- []*datapb.DDLBinlogMeta
dmlFlushedCh chan<- []*datapb.ID2PathList
}
func (ksMsg *key2SegMsg) TimeTick() Timestamp {
return ksMsg.timeRange.timestampMax
}
func (suMsg *ddMsg) TimeTick() Timestamp {
return suMsg.timeRange.timestampMax
}
func (iMsg *insertMsg) TimeTick() Timestamp {
return iMsg.timeRange.timestampMax
}
func (dMsg *deleteMsg) TimeTick() Timestamp {
return dMsg.timeRange.timestampMax
}
func (gcMsg *gcMsg) TimeTick() Timestamp {
return gcMsg.timeRange.timestampMax
}