diff --git a/internal/datanode/allocator.go b/internal/datanode/allocator.go
index 9335d148aafdc041799788a0a7245d0894ff1d1d..3d6ccb0010d5ec6c55671627a45b36857155b489 100644
--- a/internal/datanode/allocator.go
+++ b/internal/datanode/allocator.go
@@ -13,6 +13,8 @@ package datanode

 import (
     "context"
+    "path"
+    "strconv"

     "github.com/milvus-io/milvus/internal/types"

@@ -22,11 +24,15 @@ import (

 type allocatorInterface interface {
     allocID() (UniqueID, error)
+    genKey(alloc bool, ids ...UniqueID) (key string, err error)
 }
+
 type allocator struct {
     masterService types.MasterService
 }

+var _ allocatorInterface = &allocator{}
+
 func newAllocator(s types.MasterService) *allocator {
     return &allocator{
         masterService: s,
@@ -49,3 +55,24 @@ func (alloc *allocator) allocID() (UniqueID, error) {
     }
     return resp.ID, nil
 }
+
+// genKey builds a "/"-joined key from a list of UniqueIDs:
+//  if isalloc is true, a newly allocated unique ID is appended to the end of the key.
+//  if isalloc is false, the key consists only of the provided ids.
+func (alloc *allocator) genKey(isalloc bool, ids ...UniqueID) (key string, err error) {
+    if isalloc {
+        idx, err := alloc.allocID()
+        if err != nil {
+            return "", err
+        }
+        ids = append(ids, idx)
+    }
+
+    idStr := make([]string, 0, len(ids))
+    for _, id := range ids {
+        idStr = append(idStr, strconv.FormatInt(id, 10))
+    }
+
+    key = path.Join(idStr...)
+    return
+}
diff --git a/internal/datanode/binlog_meta.go b/internal/datanode/binlog_meta.go
index 838c0f5fb1c27929bfc0905a978530efd6537589..ed72e1db6a8f2e92c1ff1b11b9a7513189f62778 100644
--- a/internal/datanode/binlog_meta.go
+++ b/internal/datanode/binlog_meta.go
@@ -63,20 +63,22 @@ func (bm *binlogMeta) genKey(alloc bool, ids ...UniqueID) (key string, err error
 // SaveSegmentBinlogMetaTxn stores all fields' binlog paths of a segment in a transaction.
 // segment binlog etcd meta key:
 //   ${prefix}/${segmentID}/${fieldID}/${idx}
-func (bm *binlogMeta) SaveSegmentBinlogMetaTxn(segmentID UniqueID, field2Path map[UniqueID]string) error {
-
-    etcdKey2binlogPath := make(map[string]string, len(field2Path))
-    for fieldID, p := range field2Path {
-        key, err := bm.genKey(true, segmentID, fieldID)
-        if err != nil {
-            return err
+func (bm *binlogMeta) SaveSegmentBinlogMetaTxn(segmentID UniqueID, field2Path map[UniqueID][]string) error {
+
+    etcdKey2binlogPath := make(map[string]string)
+    for fieldID, paths := range field2Path {
+        for _, p := range paths {
+            key, err := bm.genKey(true, segmentID, fieldID)
+            if err != nil {
+                return err
+            }
+
+            binlogPath := proto.MarshalTextString(&datapb.SegmentFieldBinlogMeta{
+                FieldID:    fieldID,
+                BinlogPath: p,
+            })
+            etcdKey2binlogPath[path.Join(Params.SegFlushMetaSubPath, key)] = binlogPath
         }
-
-        binlogPath := proto.MarshalTextString(&datapb.SegmentFieldBinlogMeta{
-            FieldID:    fieldID,
-            BinlogPath: p,
-        })
-        etcdKey2binlogPath[path.Join(Params.SegFlushMetaSubPath, key)] = binlogPath
     }
     return bm.client.MultiSave(etcdKey2binlogPath)
 }
diff --git a/internal/datanode/binlog_meta_test.go b/internal/datanode/binlog_meta_test.go
index e5023db9a5839a41ec3a498d9451a9e8c0aa2cff..b7c14067f26a1fbbb4a1b42c90c4f91bfa86a726 100644
--- a/internal/datanode/binlog_meta_test.go
+++ b/internal/datanode/binlog_meta_test.go
@@ -63,10 +63,10 @@ func TestMetaTable_Basic(t *testing.T) {

     t.Run("TestBasic_SaveSegmentBinlogMetaTxn", func(t *testing.T) {
         segID := UniqueID(999999)
-        fieldID2Path := map[UniqueID]string{
-            100: "a",
-            200: "b",
-            300: "c",
+        fieldID2Path := map[UniqueID][]string{
+            100: {"a"},
+            200: {"b"},
+            300: {"c"},
         }

         err := meta.SaveSegmentBinlogMetaTxn(segID, fieldID2Path)
@@ -87,10 +87,10 @@ func TestMetaTable_Basic(t *testing.T) {
         assert.Equal(t, 1, len(metas))
         assert.Equal(t, "c", metas[0].GetBinlogPath())

-        fieldID2Path2 := map[UniqueID]string{
-            100: "aa",
-            200: "bb",
-            300: "cc",
+        fieldID2Path2 := map[UniqueID][]string{
+            100: {"aa"},
+            200: {"bb"},
+            300: {"cc"},
         }

         err = meta.SaveSegmentBinlogMetaTxn(segID, fieldID2Path2)
diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go
index fb51d02f2898e5d3ff548b8e3a4b0c324e016f91..ce3a79fa54ece3d0581cb71f789cabf42a3df969 100644
--- a/internal/datanode/data_node_test.go
+++ b/internal/datanode/data_node_test.go
@@ -21,10 +21,13 @@ import (
     "github.com/milvus-io/milvus/internal/proto/internalpb"
 )

-func TestDataNode(t *testing.T) {
+func TestMain(t *testing.M) {
     Params.Init()
     refreshChannelNames()
+    t.Run()
+}

+func TestDataNode(t *testing.T) {
     node := newDataNodeMock()
     node.Start()

diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go
index 57c71acdf46e061fc9539e325182e6a9d162bd08..f7b7de8f3bf9345b1f10ee0b5c519093ed7b8abd 100644
--- a/internal/datanode/data_sync_service.go
+++ b/internal/datanode/data_sync_service.go
@@ -107,7 +107,7 @@ func (dsService *dataSyncService) initNodes() {

     var filterDmNode Node = newFilteredDmNode()
     var ddNode Node = newDDNode(dsService.ctx, mt, dsService.flushChan, dsService.replica)
-    var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica, dsService.msFactory)
+    var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica, dsService.msFactory, dsService.idAllocator)
     var gcNode Node = newGCNode(dsService.replica)

     dsService.fg.AddNode(dmStreamNode)
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index be0dd2f56b3dd29e82eb5506ace033ef1d8e27d9..b25c45577af6388e59c4b35724beb2b8ce0430db 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -15,6 +15,7 @@ import (
     "bytes"
     "context"
     "encoding/binary"
+    "errors"
     "path"
     "strconv"
     "sync"
@@ -50,7 +51,8 @@ type insertBufferNode struct {
     BaseNode
     insertBuffer *insertBuffer
     replica      Replica
-    flushMeta    *binlogMeta
+    flushMeta    *binlogMeta // GOOSE TODO remove
+    idAllocator  allocatorInterface
     flushMap     sync.Map

     minIOKV kv.BaseKV
@@ -163,9 +165,9 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
             uniqueSeg[currentSegID] = segNum + int64(len(msg.RowIDs))
         }

-        segIDs := make([]UniqueID, 0, len(uniqueSeg))
+        segToUpdate := make([]UniqueID, 0, len(uniqueSeg))
         for id, num := range uniqueSeg {
-            segIDs = append(segIDs, id)
+            segToUpdate = append(segToUpdate, id)

             err := ibNode.replica.updateStatistics(id, num)
             if err != nil {
@@ -173,8 +175,8 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
             }
         }

-        if len(segIDs) > 0 {
-            err := ibNode.updateSegStatistics(segIDs)
+        if len(segToUpdate) > 0 {
+            err := ibNode.updateSegStatistics(segToUpdate)
             if err != nil {
                 log.Error("update segment statistics error", zap.Error(err))
             }
@@ -465,37 +467,6 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
             ibNode.replica.setEndPosition(currentSegID, endPosition)
         }

-        // 1.4 if full, auto flush
-        if ibNode.insertBuffer.full(currentSegID) {
-            log.Debug(". Insert Buffer full, auto flushing ",
-                zap.Int32("num of rows", ibNode.insertBuffer.size(currentSegID)))
-
-            collSch, err := ibNode.getCollectionSchemaByID(collection.GetID())
-            if err != nil {
-                log.Error("Auto flush failed .. cannot get collection schema ..", zap.Error(err))
-                continue
-            }
-            collMeta := &etcdpb.CollectionMeta{
-                Schema: collSch,
-                ID:     collection.GetID(),
-            }
-
-            ibNode.flushMap.Store(currentSegID, ibNode.insertBuffer.insertData[currentSegID])
-            delete(ibNode.insertBuffer.insertData, currentSegID)
-
-            finishCh := make(chan bool)
-            go flushSegmentTxn(collMeta, currentSegID, msg.GetPartitionID(), collection.GetID(),
-                &ibNode.flushMap, ibNode.flushMeta, ibNode.minIOKV,
-                finishCh)
-
-            go func(finishCh <-chan bool) {
-                if finished := <-finishCh; !finished {
-                    log.Debug(".. Auto Flush failed ..")
-                    return
-                }
-                log.Debug(".. Auto Flush completed ..")
-            }(finishCh)
-        }
     }

     if len(iMsg.insertMessages) > 0 {
@@ -517,12 +488,13 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
         for _, currentSegID := range msg.segmentIDs {
             log.Debug(". Receiving flush message", zap.Int64("segmentID", currentSegID))

-            finishCh := make(chan bool)
+            // finishCh := make(chan bool)
+            finishCh := make(chan map[UniqueID]string)
             go ibNode.completeFlush(currentSegID, finishCh)

             if ibNode.insertBuffer.size(currentSegID) <= 0 {
                 log.Debug(".. Buffer empty ...")
-                finishCh <- true
+                finishCh <- make(map[UniqueID]string)
                 continue
             }

@@ -530,7 +502,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
             ibNode.flushMap.Store(currentSegID, ibNode.insertBuffer.insertData[currentSegID])
             delete(ibNode.insertBuffer.insertData, currentSegID)
             clearFn := func() {
-                finishCh <- false
+                finishCh <- nil
                 log.Debug(".. Clearing flush Buffer ..")
                ibNode.flushMap.Delete(currentSegID)
            }
@@ -554,8 +526,36 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
                ID:     seg.collectionID,
            }

-            go flushSegmentTxn(collMeta, currentSegID, seg.partitionID, seg.collectionID,
-                &ibNode.flushMap, ibNode.flushMeta, ibNode.minIOKV, finishCh)
+            go flushSegment(collMeta, currentSegID, seg.partitionID, seg.collectionID,
+                &ibNode.flushMap, ibNode.minIOKV, finishCh, ibNode.idAllocator)
+        }
+    }
+
+    for _, segToFlush := range segToUpdate {
+        // If full, auto flush
+        if ibNode.insertBuffer.full(segToFlush) {
+            log.Debug(". Insert Buffer full, auto flushing ",
+                zap.Int32("num of rows", ibNode.insertBuffer.size(segToFlush)))
+
+            collMeta, err := ibNode.getCollMetabySegID(segToFlush)
+            if err != nil {
+                log.Error("Auto flush failed .. cannot get collection meta ..", zap.Error(err))
+                continue
+            }
+
+            ibNode.flushMap.Store(segToFlush, ibNode.insertBuffer.insertData[segToFlush])
+            delete(ibNode.insertBuffer.insertData, segToFlush)
+
+            collID, partitionID, err := ibNode.getCollectionandPartitionIDbySegID(segToFlush)
+            if err != nil {
+                log.Error("Auto flush failed .. cannot get collection ID or partition ID..", zap.Error(err))
+                continue
+            }
+
+            finishCh := make(chan map[UniqueID]string)
+            go flushSegment(collMeta, segToFlush, partitionID, collID,
+                &ibNode.flushMap, ibNode.minIOKV, finishCh, ibNode.idAllocator)
+            go ibNode.bufferAutoFlushPaths(finishCh, segToFlush)
         }
     }

@@ -574,11 +574,14 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
     return []Msg{res}
 }

-func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionID UniqueID, collID UniqueID,
-    insertData *sync.Map, meta *binlogMeta, kv kv.BaseKV, finishCh chan<- bool) {
+func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID UniqueID,
+    insertData *sync.Map, kv kv.BaseKV, field2PathCh chan<- map[UniqueID]string, idAllocator allocatorInterface) {

     clearFn := func(isSuccess bool) {
-        finishCh <- isSuccess
+        if !isSuccess {
+            field2PathCh <- nil
+        }
+
         log.Debug(".. Clearing flush Buffer ..")
         insertData.Delete(segID)
     }
@@ -612,7 +615,7 @@ func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionI
             return
         }

-        k, err := meta.genKey(true, collID, partitionID, segID, fieldID)
+        k, err := idAllocator.genKey(true, collID, partitionID, segID, fieldID)
         if err != nil {
             log.Error("Flush failed ... cannot alloc ID ..", zap.Error(err))
             clearFn(false)
@@ -633,20 +636,38 @@ func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionI
         return
     }

-    log.Debug(".. Saving binlog paths to etcd ..", zap.Int("number", len(binLogs)))
-    err = meta.SaveSegmentBinlogMetaTxn(segID, field2Path)
-    if err != nil {
-        log.Error("Flush failed ... cannot save binlog paths ..", zap.Error(err))
-        _ = kv.MultiRemove(paths)
-        clearFn(false)
-        return
+    field2PathCh <- field2Path
+    clearFn(true)
+}
+
+func (ibNode *insertBufferNode) bufferAutoFlushPaths(wait <-chan map[UniqueID]string, segID UniqueID) error {
+    field2Path := <-wait
+    if field2Path == nil {
+        return errors.New("Nil field2Path")
     }

-    clearFn(true)
+    return ibNode.replica.bufferAutoFlushBinlogPaths(segID, field2Path)
 }

-func (ibNode *insertBufferNode) completeFlush(segID UniqueID, finishCh <-chan bool) {
-    if finished := <-finishCh; !finished {
+func (ibNode *insertBufferNode) completeFlush(segID UniqueID, wait <-chan map[UniqueID]string) {
+    field2Path := <-wait
+
+    if field2Path == nil {
+        return
+    }
+
+    // TODO Call DataService RPC SaveBinlogPaths
+    ibNode.replica.bufferAutoFlushBinlogPaths(segID, field2Path)
+    bufferField2Paths, err := ibNode.replica.getBufferPaths(segID)
+    if err != nil {
+        log.Error("Flush failed ... cannot get buffered paths", zap.Error(err))
+    }
+
+    // GOOSE TODO remove the below
+    log.Debug(".. Saving binlog paths to etcd ..", zap.Int("number of fields", len(field2Path)))
+    err = ibNode.flushMeta.SaveSegmentBinlogMetaTxn(segID, bufferField2Paths)
+    if err != nil {
+        log.Error("Flush failed ... cannot save binlog paths ..", zap.Error(err))
         return
     }

@@ -672,7 +693,7 @@ func (ibNode *insertBufferNode) completeFlush(segID UniqueID, finishCh <-chan bo
     }
     msgPack.Msgs = append(msgPack.Msgs, msg)

-    err := ibNode.completeFlushStream.Produce(&msgPack)
+    err = ibNode.completeFlushStream.Produce(&msgPack)
     if err != nil {
         log.Error(".. Produce complete flush msg failed ..", zap.Error(err))
     }
@@ -742,8 +763,33 @@ func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) (
     return ret.schema, nil
 }

+func (ibNode *insertBufferNode) getCollMetabySegID(segmentID UniqueID) (meta *etcdpb.CollectionMeta, err error) {
+    ret, err := ibNode.replica.getSegmentByID(segmentID)
+    if err != nil {
+        return
+    }
+    meta = &etcdpb.CollectionMeta{ID: ret.collectionID}
+
+    coll, err := ibNode.replica.getCollectionByID(ret.collectionID)
+    if err != nil {
+        return
+    }
+    meta.Schema = coll.GetSchema()
+    return
+}
+
+func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID UniqueID) (collID, partitionID UniqueID, err error) {
+    seg, err := ibNode.replica.getSegmentByID(segmentID)
+    if err != nil {
+        return
+    }
+    collID = seg.collectionID
+    partitionID = seg.partitionID
+    return
+}
+
 func newInsertBufferNode(ctx context.Context, flushMeta *binlogMeta,
-    replica Replica, factory msgstream.Factory) *insertBufferNode {
+    replica Replica, factory msgstream.Factory, idAllocator allocatorInterface) *insertBufferNode {

     maxQueueLength := Params.FlowGraphMaxQueueLength
     maxParallelism := Params.FlowGraphMaxParallelism
@@ -803,5 +849,6 @@ func newInsertBufferNode(ctx context.Context, flushMeta *binlogMeta,
         replica:     replica,
         flushMeta:   flushMeta,
         flushMap:    sync.Map{},
+        idAllocator: idAllocator,
     }
 }
diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go
index ad624e3c6dbc868a316319d47dc99305b677126d..55561501c3d1e53f7312b2445341e8e562e5411f 100644
--- a/internal/datanode/flow_graph_insert_buffer_node_test.go
+++ b/internal/datanode/flow_graph_insert_buffer_node_test.go
@@ -61,7 +61,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
     err = msFactory.SetParams(m)
     assert.Nil(t, err)

-    iBNode := newInsertBufferNode(ctx, newBinlogMeta(), replica, msFactory)
+    iBNode := newInsertBufferNode(ctx, newBinlogMeta(), replica, msFactory, NewAllocatorFactory())
     inMsg := genInsertMsg()
     var iMsg flowgraph.Msg = &inMsg
     iBNode.Operate([]flowgraph.Msg{iMsg})
diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go
index 7ef8520d0823ff1b418877c7ab46a0a785d30413..215913828e9a0db9659223f635971a7092b12860 100644
--- a/internal/datanode/mock_test.go
+++ b/internal/datanode/mock_test.go
@@ -17,6 +17,7 @@ import (
     "encoding/binary"
     "math"
     "math/rand"
+    "path"
     "strconv"
     "sync"
     "time"
@@ -111,6 +112,7 @@ func newBinlogMeta() *binlogMeta {

 func clearEtcd(rootPath string) error {
     etcdAddr := Params.EtcdAddress
+    log.Info("etcd tests address", zap.String("address", etcdAddr))
     etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
     if err != nil {
         return err
@@ -421,6 +423,8 @@ type AllocatorFactory struct {
     r *rand.Rand
 }

+var _ allocatorInterface = &AllocatorFactory{}
+
 func NewAllocatorFactory(id ...UniqueID) *AllocatorFactory {
     f := &AllocatorFactory{
         r: rand.New(rand.NewSource(time.Now().UnixNano())),
@@ -435,6 +439,24 @@ func (alloc *AllocatorFactory) allocID() (UniqueID, error) {
     return alloc.r.Int63n(1000000), nil
 }

+func (alloc *AllocatorFactory) genKey(isalloc bool, ids ...UniqueID) (key string, err error) {
+    if isalloc {
+        idx, err := alloc.allocID()
+        if err != nil {
+            return "", err
+        }
+        ids = append(ids, idx)
+    }
+
+    idStr := make([]string, 0, len(ids))
+    for _, id := range ids {
+        idStr = append(idStr, strconv.FormatInt(id, 10))
+    }
+
+    key = path.Join(idStr...)
+    return
+}
+
 func (m *MasterServiceFactory) setID(id UniqueID) {
     m.ID = id // GOOSE TODO: random ID generator
 }
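
Editor's note (illustration, not part of the patch): both genKey implementations above build key suffixes by formatting each UniqueID in decimal and joining the pieces with path.Join, appending a freshly allocated ID when the boolean flag is set. The standalone sketch below shows only that key construction; the buildKey helper and the concrete ID values are hypothetical and exist purely for this example.

package main

import (
    "fmt"
    "path"
    "strconv"
)

// buildKey mirrors the genKey logic from the patch: format every ID in base 10
// and join the parts with "/". In the patch the trailing ID would normally be
// obtained from allocID(); here it is simply passed in.
func buildKey(ids ...int64) string {
    parts := make([]string, 0, len(ids)) // start empty so only real elements are joined
    for _, id := range ids {
        parts = append(parts, strconv.FormatInt(id, 10))
    }
    return path.Join(parts...)
}

func main() {
    // Hypothetical IDs: collection=1, partition=2, segment=999999, field=100, allocated idx=42.
    fmt.Println(buildKey(1, 2, 999999, 100, 42)) // prints "1/2/999999/100/42"
}

In the patched SaveSegmentBinlogMetaTxn, such a key is then prefixed with Params.SegFlushMetaSubPath via path.Join before being written with MultiSave.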