diff --git a/internal/datanode/collection.go b/internal/datanode/collection.go index 24ebb593b4fdc06f5e8a2feaf82b4809048fe5ea..3610dc0b81840aa0bf3a0c04e59baae4fcfdca1e 100644 --- a/internal/datanode/collection.go +++ b/internal/datanode/collection.go @@ -20,6 +20,10 @@ func (c *Collection) ID() UniqueID { return c.id } +func (c *Collection) Schema() *schemapb.CollectionSchema { + return c.schema +} + func newCollection(collectionID UniqueID, schemaStr string) *Collection { var schema schemapb.CollectionSchema diff --git a/internal/datanode/collection_replica.go b/internal/datanode/collection_replica.go index 119fb86ba520857f32f92a1ab97cef34c6fb02bf..f73ca597ff5fe419ecdec130a2b85a7995a4c57b 100644 --- a/internal/datanode/collection_replica.go +++ b/internal/datanode/collection_replica.go @@ -16,21 +16,25 @@ type collectionReplica interface { removeCollection(collectionID UniqueID) error getCollectionByID(collectionID UniqueID) (*Collection, error) getCollectionByName(collectionName string) (*Collection, error) + getCollectionIDByName(collectionName string) (UniqueID, error) hasCollection(collectionID UniqueID) bool // segment - addSegment(segmentID UniqueID) error + addSegment(segmentID UniqueID, collName string, partitionName string) error removeSegment(segmentID UniqueID) error hasSegment(segmentID UniqueID) bool updateSegmentRowNums(segmentID UniqueID, numRows int64) error getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb2.SegmentStatisticsUpdates, error) + getSegmentByID(segmentID UniqueID) (*Segment, error) } type ( Segment struct { - segmentID UniqueID - numRows int64 - memorySize int64 + segmentID UniqueID + collectionID UniqueID + partitionName string + numRows int64 + memorySize int64 } collectionReplicaImpl struct { @@ -42,20 +46,40 @@ type ( //----------------------------------------------------------------------------------------------------- collection -func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID) error { +func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) { + // GOOSE TODO: read write lock colReplica.mu.RLock() defer colReplica.mu.RUnlock() + + for _, segment := range colReplica.segments { + if segment.segmentID == segmentID { + return segment, nil + } + } + return nil, errors.Errorf("cannot find segment, id = %v", segmentID) +} + +func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, collName string, partitionName string) error { + colReplica.mu.Lock() + defer colReplica.mu.Unlock() log.Println("Add Segment", segmentID) + collID, err := colReplica.getCollectionIDByName(collName) + if err != nil { + return err + } + seg := &Segment{ - segmentID: segmentID, + segmentID: segmentID, + collectionID: collID, + partitionName: partitionName, } colReplica.segments = append(colReplica.segments, seg) return nil } func (colReplica *collectionReplicaImpl) removeSegment(segmentID UniqueID) error { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() + colReplica.mu.Lock() + defer colReplica.mu.Unlock() for index, ele := range colReplica.segments { if ele.segmentID == segmentID { @@ -82,8 +106,8 @@ func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool { } func (colReplica *collectionReplicaImpl) updateSegmentRowNums(segmentID UniqueID, numRows int64) error { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() + colReplica.mu.Lock() + defer colReplica.mu.Unlock() for _, ele := range colReplica.segments { if ele.segmentID == segmentID { @@ -131,6 +155,19 @@ func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, sc return nil } +func (colReplica *collectionReplicaImpl) getCollectionIDByName(collName string) (UniqueID, error) { + colReplica.mu.RLock() + defer colReplica.mu.RUnlock() + + for _, collection := range colReplica.collections { + if collection.Name() == collName { + return collection.ID(), nil + } + } + return 0, errors.Errorf("There is no collection name=%v", collName) + +} + func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error { // GOOSE TODO: optimize colReplica.mu.Lock() diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index f54d76d2a9cfefbc8704d7dab4de29da75cad4da..edded79077522563c48c19962624d72baa440a51 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -8,15 +8,17 @@ import ( "github.com/opentracing/opentracing-go" "github.com/uber/jaeger-client-go" "github.com/uber/jaeger-client-go/config" + "github.com/zilliztech/milvus-distributed/internal/proto/datapb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" ) type DataNode struct { - ctx context.Context - DataNodeID uint64 - dataSyncService *dataSyncService - flushSyncService *flushSyncService - metaService *metaService - replica collectionReplica + ctx context.Context + DataNodeID uint64 + dataSyncService *dataSyncService + metaService *metaService + + replica collectionReplica tracer opentracing.Tracer closer io.Closer @@ -31,19 +33,19 @@ func NewDataNode(ctx context.Context, dataNodeID uint64) *DataNode { } node := &DataNode{ - ctx: ctx, - DataNodeID: dataNodeID, - dataSyncService: nil, - flushSyncService: nil, - metaService: nil, - replica: replica, + ctx: ctx, + DataNodeID: dataNodeID, + dataSyncService: nil, + // metaService: nil, + replica: replica, } return node } -func Init() { +func (node *DataNode) Init() error { Params.Init() + return nil } func (node *DataNode) Start() error { @@ -68,21 +70,34 @@ func (node *DataNode) Start() error { // TODO GOOSE Init Size?? chanSize := 100 - ddChan := make(chan *ddlFlushSyncMsg, chanSize) - insertChan := make(chan *insertFlushSyncMsg, chanSize) + flushChan := make(chan *flushMsg, chanSize) - node.flushSyncService = newFlushSyncService(node.ctx, ddChan, insertChan) - node.dataSyncService = newDataSyncService(node.ctx, ddChan, insertChan, node.replica) + node.dataSyncService = newDataSyncService(node.ctx, flushChan, node.replica) node.metaService = newMetaService(node.ctx, node.replica) go node.dataSyncService.start() - go node.flushSyncService.start() + // go node.flushSyncService.start() node.metaService.start() return nil } -func (node *DataNode) Close() { +func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) error { + // GOOSE TODO: Implement me + return nil +} + +func (node *DataNode) GetComponentStates() (*internalpb2.ComponentStates, error) { + // GOOSE TODO: Implement me + return nil, nil +} + +func (node *DataNode) FlushSegments(in *datapb.FlushSegRequest) error { + // GOOSE TODO: Implement me + return nil +} + +func (node *DataNode) Stop() error { <-node.ctx.Done() // close services @@ -93,5 +108,6 @@ func (node *DataNode) Close() { if node.closer != nil { node.closer.Close() } + return nil } diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 8ecd8ca97f92cb5a7e152cfa181f5031db6fb683..40cf55371fb902d045887e8ce1b23449d774553c 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -4,26 +4,26 @@ import ( "context" "log" + etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" + "go.etcd.io/etcd/clientv3" ) type dataSyncService struct { - ctx context.Context - fg *flowgraph.TimeTickedFlowGraph - ddChan chan *ddlFlushSyncMsg - insertChan chan *insertFlushSyncMsg - replica collectionReplica + ctx context.Context + fg *flowgraph.TimeTickedFlowGraph + flushChan chan *flushMsg + replica collectionReplica } -func newDataSyncService(ctx context.Context, - ddChan chan *ddlFlushSyncMsg, insertChan chan *insertFlushSyncMsg, replica collectionReplica) *dataSyncService { +func newDataSyncService(ctx context.Context, flushChan chan *flushMsg, + replica collectionReplica) *dataSyncService { return &dataSyncService{ - ctx: ctx, - fg: nil, - ddChan: ddChan, - insertChan: insertChan, - replica: replica, + ctx: ctx, + fg: nil, + flushChan: flushChan, + replica: replica, } } @@ -40,6 +40,17 @@ func (dsService *dataSyncService) close() { func (dsService *dataSyncService) initNodes() { // TODO: add delete pipeline support + // New metaTable + etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}}) + if err != nil { + panic(err) + } + + etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath) + mt, err := NewMetaTable(etcdKV) + if err != nil { + panic(err) + } dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx) @@ -48,8 +59,8 @@ func (dsService *dataSyncService) initNodes() { var filterDmNode Node = newFilteredDmNode() - var ddNode Node = newDDNode(dsService.ctx, dsService.ddChan, dsService.replica) - var insertBufferNode Node = newInsertBufferNode(dsService.ctx, dsService.insertChan, dsService.replica) + var ddNode Node = newDDNode(dsService.ctx, mt, dsService.flushChan, dsService.replica) + var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica) var gcNode Node = newGCNode(dsService.replica) dsService.fg.AddNode(&dmStreamNode) @@ -62,7 +73,7 @@ func (dsService *dataSyncService) initNodes() { dsService.fg.AddNode(&gcNode) // dmStreamNode - var err = dsService.fg.SetEdges(dmStreamNode.Name(), + err = dsService.fg.SetEdges(dmStreamNode.Name(), []string{}, []string{filterDmNode.Name()}, ) diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 7c1fbf27ac25ea8c7895b268ef7add7ec9049fa0..eeb002898880921c4cb299462e7f8da86655b91f 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -205,11 +205,11 @@ func TestDataSyncService_Start(t *testing.T) { assert.NoError(t, err) // dataSync - // replica := newReplica() - node.dataSyncService = newDataSyncService(node.ctx, nil, nil, node.replica) + Params.FlushInsertBufferSize = 1 + node.dataSyncService = newDataSyncService(node.ctx, nil, node.replica) go node.dataSyncService.start() - node.Close() + node.Stop() <-ctx.Done() } diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 4cffa6bf87e2e863c40704fea2d0ef2ea1b740f9..b6a1eaa324bde2fe16fee18b3abaec49f9b44252 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -23,11 +23,12 @@ type ddNode struct { ddMsg *ddMsg ddRecords *ddRecords ddBuffer *ddBuffer - outCh chan *ddlFlushSyncMsg // for flush sync + inFlushCh chan *flushMsg idAllocator *allocator.IDAllocator kv kv.Base replica collectionReplica + flushMeta *metaTable } type ddData struct { @@ -80,21 +81,18 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg { // TODO: add error handling } - var ddMsg = ddMsg{ + ddNode.ddMsg = &ddMsg{ collectionRecords: make(map[string][]metaOperateRecord), partitionRecords: make(map[string][]metaOperateRecord), timeRange: TimeRange{ timestampMin: msMsg.TimestampMin(), timestampMax: msMsg.TimestampMax(), }, - flushMessages: make([]*msgstream.FlushMsg, 0), - } - ddNode.ddMsg = &ddMsg - - gcRecord := gcRecord{ - collections: make([]UniqueID, 0), + flushMessages: make([]*flushMsg, 0), + gcRecord: &gcRecord{ + collections: make([]UniqueID, 0), + }, } - ddNode.ddMsg.gcRecord = &gcRecord // sort tsMessages tsMessages := msMsg.TsMessages() @@ -114,29 +112,23 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg { ddNode.createPartition(msg.(*msgstream.CreatePartitionMsg)) case commonpb.MsgType_kDropPartition: ddNode.dropPartition(msg.(*msgstream.DropPartitionMsg)) - case commonpb.MsgType_kFlush: - fMsg := msg.(*msgstream.FlushMsg) - flushSegID := fMsg.SegmentID - ddMsg.flushMessages = append(ddMsg.flushMessages, fMsg) - ddNode.flush() - - log.Println(".. manual flush completed ...") - ddlFlushMsg := &ddlFlushSyncMsg{ - flushCompleted: true, - ddlBinlogPathMsg: ddlBinlogPathMsg{ - segID: flushSegID, - }, - } - - ddNode.outCh <- ddlFlushMsg - default: log.Println("Non supporting message type:", msg.Type()) } } + select { + case fmsg := <-ddNode.inFlushCh: + log.Println(". receive flush message, flushing ...") + ddNode.flush() + ddNode.ddMsg.flushMessages = append(ddNode.ddMsg.flushMessages, fmsg) + default: + log.Println("..........default do nothing") + } + // generate binlog if ddNode.ddBuffer.full() { + log.Println(". dd buffer full, auto flushing ...") ddNode.flush() } @@ -146,7 +138,6 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg { func (ddNode *ddNode) flush() { // generate binlog - log.Println(". dd buffer full or receive Flush msg ...") ddCodec := &storage.DataDefinitionCodec{} for collectionID, data := range ddNode.ddBuffer.ddData { // buffer data to binlog @@ -196,15 +187,7 @@ func (ddNode *ddNode) flush() { } log.Println("save dd binlog, key = ", ddKey) - ddlFlushMsg := &ddlFlushSyncMsg{ - flushCompleted: false, - ddlBinlogPathMsg: ddlBinlogPathMsg{ - collID: collectionID, - paths: []string{timestampKey, ddKey}, - }, - } - - ddNode.outCh <- ddlFlushMsg + ddNode.flushMeta.AppendDDLBinlogPaths(collectionID, []string{timestampKey, ddKey}) } } @@ -328,9 +311,14 @@ func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) { } } - ddNode.ddBuffer.ddData[collectionID].ddRequestString = append(ddNode.ddBuffer.ddData[collectionID].ddRequestString, msg.CreatePartitionRequest.String()) - ddNode.ddBuffer.ddData[collectionID].timestamps = append(ddNode.ddBuffer.ddData[collectionID].timestamps, msg.Base.Timestamp) - ddNode.ddBuffer.ddData[collectionID].eventTypes = append(ddNode.ddBuffer.ddData[collectionID].eventTypes, storage.CreatePartitionEventType) + ddNode.ddBuffer.ddData[collectionID].ddRequestString = + append(ddNode.ddBuffer.ddData[collectionID].ddRequestString, msg.CreatePartitionRequest.String()) + + ddNode.ddBuffer.ddData[collectionID].timestamps = + append(ddNode.ddBuffer.ddData[collectionID].timestamps, msg.Base.Timestamp) + + ddNode.ddBuffer.ddData[collectionID].eventTypes = + append(ddNode.ddBuffer.ddData[collectionID].eventTypes, storage.CreatePartitionEventType) } func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) { @@ -361,12 +349,18 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) { } } - ddNode.ddBuffer.ddData[collectionID].ddRequestString = append(ddNode.ddBuffer.ddData[collectionID].ddRequestString, msg.DropPartitionRequest.String()) - ddNode.ddBuffer.ddData[collectionID].timestamps = append(ddNode.ddBuffer.ddData[collectionID].timestamps, msg.Base.Timestamp) - ddNode.ddBuffer.ddData[collectionID].eventTypes = append(ddNode.ddBuffer.ddData[collectionID].eventTypes, storage.DropPartitionEventType) + ddNode.ddBuffer.ddData[collectionID].ddRequestString = + append(ddNode.ddBuffer.ddData[collectionID].ddRequestString, msg.DropPartitionRequest.String()) + + ddNode.ddBuffer.ddData[collectionID].timestamps = + append(ddNode.ddBuffer.ddData[collectionID].timestamps, msg.Base.Timestamp) + + ddNode.ddBuffer.ddData[collectionID].eventTypes = + append(ddNode.ddBuffer.ddData[collectionID].eventTypes, storage.DropPartitionEventType) } -func newDDNode(ctx context.Context, outCh chan *ddlFlushSyncMsg, replica collectionReplica) *ddNode { +func newDDNode(ctx context.Context, flushMeta *metaTable, + inFlushCh chan *flushMsg, replica collectionReplica) *ddNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism @@ -409,10 +403,12 @@ func newDDNode(ctx context.Context, outCh chan *ddlFlushSyncMsg, replica collect ddData: make(map[UniqueID]*ddData), maxSize: Params.FlushDdBufferSize, }, - outCh: outCh, + // outCh: outCh, + inFlushCh: inFlushCh, idAllocator: idAllocator, kv: minioKV, replica: replica, + flushMeta: flushMeta, } } diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go index 6df71a507614de9a720f3176eee72bacf439a14d..b1d1dc24a83f1064a521a9d426c3d2c5f408fbb7 100644 --- a/internal/datanode/flow_graph_dd_node_test.go +++ b/internal/datanode/flow_graph_dd_node_test.go @@ -2,11 +2,12 @@ package datanode import ( "context" + "log" "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.etcd.io/etcd/clientv3" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -15,6 +16,14 @@ import ( "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) +func newMetaTable() *metaTable { + etcdClient, _ := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}}) + + etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath) + mt, _ := NewMetaTable(etcdKV) + return mt +} + func TestFlowGraphDDNode_Operate(t *testing.T) { newMeta() const ctxTimeInMillisecond = 2000 @@ -30,22 +39,17 @@ func TestFlowGraphDDNode_Operate(t *testing.T) { ctx = context.Background() } - ddChan := make(chan *ddlFlushSyncMsg, 10) - defer close(ddChan) - insertChan := make(chan *insertFlushSyncMsg, 10) - defer close(insertChan) + inFlushCh := make(chan *flushMsg, 10) + defer close(inFlushCh) testPath := "/test/datanode/root/meta" err := clearEtcd(testPath) require.NoError(t, err) Params.MetaRootPath = testPath - fService := newFlushSyncService(ctx, ddChan, insertChan) - assert.Equal(t, testPath, fService.metaTable.client.(*etcdkv.EtcdKV).GetPath(".")) - go fService.start() Params.FlushDdBufferSize = 4 replica := newReplica() - ddNode := newDDNode(ctx, ddChan, replica) + ddNode := newDDNode(ctx, newMetaTable(), inFlushCh, replica) colID := UniqueID(0) colName := "col-test-0" @@ -135,21 +139,11 @@ func TestFlowGraphDDNode_Operate(t *testing.T) { DropPartitionRequest: dropPartitionReq, } - flushMsg := msgstream.FlushMsg{ - BaseMsg: msgstream.BaseMsg{ - BeginTimestamp: Timestamp(5), - EndTimestamp: Timestamp(5), - HashValues: []uint32{uint32(0)}, - }, - FlushMsg: internalpb2.FlushMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kFlush, - MsgID: 1, - Timestamp: 6, - SourceID: 1, - }, - SegmentID: 1, - }, + inFlushCh <- &flushMsg{ + msgID: 1, + Timestamp: 6, + segmentIDs: []UniqueID{1}, + collectionID: UniqueID(1), } tsMessages := make([]msgstream.TsMsg, 0) @@ -157,8 +151,38 @@ func TestFlowGraphDDNode_Operate(t *testing.T) { tsMessages = append(tsMessages, msgstream.TsMsg(&dropColMsg)) tsMessages = append(tsMessages, msgstream.TsMsg(&createPartitionMsg)) tsMessages = append(tsMessages, msgstream.TsMsg(&dropPartitionMsg)) - tsMessages = append(tsMessages, msgstream.TsMsg(&flushMsg)) msgStream := flowgraph.GenerateMsgStreamMsg(tsMessages, Timestamp(0), Timestamp(3)) var inMsg Msg = msgStream ddNode.Operate([]*Msg{&inMsg}) } + +func clearEtcd(rootPath string) error { + etcdAddr := Params.EtcdAddress + etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + if err != nil { + return err + } + etcdKV := etcdkv.NewEtcdKV(etcdClient, rootPath) + + err = etcdKV.RemoveWithPrefix("writer/segment") + if err != nil { + return err + } + _, _, err = etcdKV.LoadWithPrefix("writer/segment") + if err != nil { + return err + } + log.Println("Clear ETCD with prefix writer/segment ") + + err = etcdKV.RemoveWithPrefix("writer/ddl") + if err != nil { + return err + } + _, _, err = etcdKV.LoadWithPrefix("writer/ddl") + if err != nil { + return err + } + log.Println("Clear ETCD with prefix writer/ddl") + return nil + +} diff --git a/internal/datanode/flow_graph_filter_dm_node.go b/internal/datanode/flow_graph_filter_dm_node.go index ad61a29772310f50b3ea93e7357e8fdab9579073..4706275b6a8eadecd1c4378397d4775cb1d56b2e 100644 --- a/internal/datanode/flow_graph_filter_dm_node.go +++ b/internal/datanode/flow_graph_filter_dm_node.go @@ -66,21 +66,14 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg { var iMsg = insertMsg{ insertMessages: make([]*msgstream.InsertMsg, 0), - flushMessages: make([]*msgstream.FlushMsg, 0), + flushMessages: make([]*flushMsg, 0), timeRange: TimeRange{ timestampMin: msgStreamMsg.TimestampMin(), timestampMax: msgStreamMsg.TimestampMax(), }, } - for _, fmsg := range ddMsg.flushMessages { - switch fmsg.Type() { - case commonpb.MsgType_kFlush: - iMsg.flushMessages = append(iMsg.flushMessages, fmsg) - default: - log.Println("Non supporting message type:", fmsg.Type()) - } - } + iMsg.flushMessages = append(iMsg.flushMessages, ddMsg.flushMessages...) for key, msg := range msgStreamMsg.TsMessages() { switch msg.Type() { diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 20ffa426d3c1b6bcf4165dc2073b0b7fd20368df..e5064628eeef901c3e21cf468d5d8f6e8ef102c0 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -16,6 +16,7 @@ import ( oplog "github.com/opentracing/opentracing-go/log" "github.com/zilliztech/milvus-distributed/internal/allocator" + "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/kv" miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio" "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -38,14 +39,17 @@ type ( insertBufferNode struct { BaseNode - insertBuffer *insertBuffer - minIOKV kv.Base - minioPrifex string - idAllocator *allocator.IDAllocator - outCh chan *insertFlushSyncMsg - pulsarDataNodeTimeTickStream msgstream.MsgStream - segmentStatisticsStream msgstream.MsgStream - replica collectionReplica + insertBuffer *insertBuffer + replica collectionReplica + flushMeta *metaTable + + minIOKV kv.Base + minioPrefix string + + idAllocator *allocator.IDAllocator + + timeTickStream msgstream.MsgStream + segmentStatisticsStream msgstream.MsgStream } insertBuffer struct { @@ -101,11 +105,14 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { // TODO: add error handling } + // Updating segment statistics uniqueSeg := make(map[UniqueID]bool) for _, msg := range iMsg.insertMessages { currentSegID := msg.GetSegmentID() + collName := msg.GetCollectionName() + partitionName := msg.GetPartitionName() if !ibNode.replica.hasSegment(currentSegID) { - err := ibNode.replica.addSegment(currentSegID) + err := ibNode.replica.addSegment(currentSegID, collName, partitionName) if err != nil { log.Println("Error: add segment error") } @@ -164,7 +171,7 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { continue } - collectionID := collection.ID() + // collectionID := collection.ID() GOOSE TODO remove collSchema := collection.schema // 1.2 Get Fields var pos int = 0 // Record position of blob @@ -410,73 +417,12 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { span.LogFields(oplog.String("generate binlogs", "generate binlogs")) if ibNode.insertBuffer.full(currentSegID) { log.Printf(". Insert Buffer full, auto flushing (%v) rows of data...", ibNode.insertBuffer.size(currentSegID)) - // partitionTag -> partitionID - partitionTag := msg.GetPartitionName() - partitionID, err := typeutil.Hash32String(partitionTag) - if err != nil { - log.Println("partitionTag to partitionID wrong") - // TODO GOOSE add error handler - } - collMeta := &etcdpb.CollectionMeta{ - Schema: collSchema, - ID: collectionID, - } - inCodec := storage.NewInsertCodec(collMeta) - - // buffer data to binlogs - binLogs, err := inCodec.Serialize(partitionID, - currentSegID, ibNode.insertBuffer.insertData[currentSegID]) + err = ibNode.flushSegment(currentSegID, msg.GetPartitionName(), collection.ID()) if err != nil { - log.Println("generate binlog wrong: ", err) - } - - // clear buffer - delete(ibNode.insertBuffer.insertData, currentSegID) - log.Println(".. Clearing buffer") - - // 1.5.2 binLogs -> minIO/S3 - collIDStr := strconv.FormatInt(collectionID, 10) - partitionIDStr := strconv.FormatInt(partitionID, 10) - segIDStr := strconv.FormatInt(currentSegID, 10) - keyPrefix := path.Join(ibNode.minioPrifex, collIDStr, partitionIDStr, segIDStr) - - log.Printf(".. Saving (%v) binlogs to MinIO ...", len(binLogs)) - for index, blob := range binLogs { - uid, err := ibNode.idAllocator.AllocOne() - if err != nil { - log.Println("Allocate Id failed") - // GOOSE TODO error handler - } - - key := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10)) - err = ibNode.minIOKV.Save(key, string(blob.Value[:])) - if err != nil { - log.Println("Save to MinIO failed") - // GOOSE TODO error handler - } - - fieldID, err := strconv.ParseInt(blob.Key, 10, 32) - if err != nil { - log.Println("string to fieldID wrong") - // GOOSE TODO error handler - } - - inBinlogMsg := &insertFlushSyncMsg{ - flushCompleted: false, - insertBinlogPathMsg: insertBinlogPathMsg{ - ts: iMsg.timeRange.timestampMax, - segID: currentSegID, - fieldID: fieldID, - paths: []string{key}, - }, - } - - log.Println("... Appending binlog paths ...", index) - ibNode.outCh <- inBinlogMsg + log.Printf("flush segment (%v) fail: %v", currentSegID, err) } } - span.Finish() } if len(iMsg.insertMessages) > 0 { @@ -491,96 +437,28 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { } } - // iMsg is Flush() msg from master + // iMsg is Flush() msg from dataservice // 1. insertBuffer(not empty) -> binLogs -> minIO/S3 for _, msg := range iMsg.flushMessages { - currentSegID := msg.GetSegmentID() - flushTs := msg.Base.GetTimestamp() - partitionTag := msg.GetPartitionTag() - collectionID := msg.GetCollectionID() - log.Printf(". Receiving flush message segID(%v)...", currentSegID) - - if ibNode.insertBuffer.size(currentSegID) > 0 { - log.Println(".. Buffer not empty, flushing ...") - collSchema, err := ibNode.getCollectionSchemaByID(collectionID) - if err != nil { - // GOOSE TODO add error handler - log.Println("aaa, Get meta wrong: ", err) - } - collMeta := &etcdpb.CollectionMeta{ - Schema: collSchema, - ID: collectionID, - } - inCodec := storage.NewInsertCodec(collMeta) - - // partitionTag -> partitionID - partitionID, err := typeutil.Hash32String(partitionTag) - if err != nil { - // GOOSE TODO add error handler - log.Println("partitionTag to partitionID Wrong: ", err) - } - - // buffer data to binlogs - binLogs, err := inCodec.Serialize(partitionID, - currentSegID, ibNode.insertBuffer.insertData[currentSegID]) - if err != nil { - log.Println("generate binlog wrong: ", err) - } - - // clear buffer - delete(ibNode.insertBuffer.insertData, currentSegID) - - // binLogs -> minIO/S3 - collIDStr := strconv.FormatInt(collectionID, 10) - partitionIDStr := strconv.FormatInt(partitionID, 10) - segIDStr := strconv.FormatInt(currentSegID, 10) - keyPrefix := path.Join(ibNode.minioPrifex, collIDStr, partitionIDStr, segIDStr) - - for _, blob := range binLogs { - uid, err := ibNode.idAllocator.AllocOne() + // flushTs := msg.Timestamp + for _, currentSegID := range msg.segmentIDs { + log.Printf(". Receiving flush message segID(%v)...", currentSegID) + if ibNode.insertBuffer.size(currentSegID) > 0 { + log.Println(".. Buffer not empty, flushing ...") + seg, err := ibNode.replica.getSegmentByID(currentSegID) if err != nil { - log.Println("Allocate Id failed") - // GOOSE TODO error handler + log.Printf("flush segment fail: %v", err) + continue } - key := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10)) - err = ibNode.minIOKV.Save(key, string(blob.Value[:])) + err = ibNode.flushSegment(currentSegID, seg.partitionName, seg.collectionID) if err != nil { - log.Println("Save to MinIO failed") - // GOOSE TODO error handler + log.Printf("flush segment (%v) fail: %v", currentSegID, err) + continue } - fieldID, err := strconv.ParseInt(blob.Key, 10, 32) - if err != nil { - log.Println("string to fieldID wrong") - // GOOSE TODO error handler - } - - // Append binlogs - inBinlogMsg := &insertFlushSyncMsg{ - flushCompleted: false, - insertBinlogPathMsg: insertBinlogPathMsg{ - ts: flushTs, - segID: currentSegID, - fieldID: fieldID, - paths: []string{key}, - }, - } - ibNode.outCh <- inBinlogMsg } } - - // Flushed - log.Println(".. Flush finished ...") - inBinlogMsg := &insertFlushSyncMsg{ - flushCompleted: true, - insertBinlogPathMsg: insertBinlogPathMsg{ - ts: flushTs, - segID: currentSegID, - }, - } - - ibNode.outCh <- inBinlogMsg } if err := ibNode.writeHardTimeTick(iMsg.timeRange.timestampMax); err != nil { @@ -595,6 +473,67 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { return []*Msg{&res} } +func (ibNode *insertBufferNode) flushSegment(segID UniqueID, partitionName string, collID UniqueID) error { + // partitionName -> partitionID GOOSE TODO: remove this + partitionID, err := typeutil.Hash32String(partitionName) + if err != nil { + return errors.Errorf("partitionName to partitionID wrong, %v", err) + } + + coll, err := ibNode.replica.getCollectionByID(collID) + if err != nil { + return errors.Errorf("Get collection by ID wrong, %v", err) + } + + collMeta := &etcdpb.CollectionMeta{ + Schema: coll.schema, + ID: collID, + } + + inCodec := storage.NewInsertCodec(collMeta) + + // buffer data to binlogs + binLogs, err := inCodec.Serialize(partitionID, + segID, ibNode.insertBuffer.insertData[segID]) + + if err != nil { + return errors.Errorf("generate binlog wrong: %v", err) + } + + // clear buffer + delete(ibNode.insertBuffer.insertData, segID) + log.Println(".. Clearing buffer") + + // 1.5.2 binLogs -> minIO/S3 + collIDStr := strconv.FormatInt(coll.ID(), 10) + partitionIDStr := strconv.FormatInt(partitionID, 10) + segIDStr := strconv.FormatInt(segID, 10) + keyPrefix := path.Join(ibNode.minioPrefix, collIDStr, partitionIDStr, segIDStr) + + log.Printf(".. Saving (%v) binlogs to MinIO ...", len(binLogs)) + for index, blob := range binLogs { + uid, err := ibNode.idAllocator.AllocOne() + if err != nil { + return errors.Errorf("Allocate Id failed, %v", err) + } + + key := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10)) + err = ibNode.minIOKV.Save(key, string(blob.Value[:])) + if err != nil { + return errors.Errorf("Save to MinIO failed, %v", err) + } + + fieldID, err := strconv.ParseInt(blob.Key, 10, 32) + if err != nil { + return errors.Errorf("string to fieldID wrong, %v", err) + } + + log.Println("... Appending binlog paths ...", index) + ibNode.flushMeta.AppendSegBinlogPaths(segID, fieldID, []string{key}) + } + return nil +} + func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error { msgPack := msgstream.MsgPack{} timeTickMsg := msgstream.TimeTickMsg{ @@ -613,7 +552,7 @@ func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error { }, } msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg) - return ibNode.pulsarDataNodeTimeTickStream.Produce(&msgPack) + return ibNode.timeTickStream.Produce(&msgPack) } func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID) error { @@ -671,7 +610,8 @@ func (ibNode *insertBufferNode) getCollectionSchemaByName(collectionName string) return ret.schema, nil } -func newInsertBufferNode(ctx context.Context, outCh chan *insertFlushSyncMsg, replica collectionReplica) *insertBufferNode { +func newInsertBufferNode(ctx context.Context, + flushMeta *metaTable, replica collectionReplica) *insertBufferNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism @@ -710,36 +650,29 @@ func newInsertBufferNode(ctx context.Context, outCh chan *insertFlushSyncMsg, re panic(err) } - // GOOSE TODO: Pulsar stream Start() ??? //input stream, data node time tick wTt := pulsarms.NewPulsarMsgStream(ctx, 1024) wTt.SetPulsarClient(Params.PulsarAddress) wTt.CreatePulsarProducers([]string{Params.TimeTickChannelName}) var wTtMsgStream msgstream.MsgStream = wTt - // var wTtMsgStream pulsarms.PulsarMsgStream = *wTt wTtMsgStream.Start() - // wTt.Start() // update statistics channel segS := pulsarms.NewPulsarMsgStream(ctx, Params.SegmentStatisticsBufSize) segS.SetPulsarClient(Params.PulsarAddress) segS.CreatePulsarProducers([]string{Params.SegmentStatisticsChannelName}) var segStatisticsMsgStream msgstream.MsgStream = segS - // var segStatisticsMsgStream pulsarms.PulsarMsgStream = segS segStatisticsMsgStream.Start() - // segS.Start() return &insertBufferNode{ - BaseNode: baseNode, - insertBuffer: iBuffer, - minIOKV: minIOKV, - minioPrifex: minioPrefix, - idAllocator: idAllocator, - outCh: outCh, - pulsarDataNodeTimeTickStream: wTtMsgStream, - segmentStatisticsStream: segStatisticsMsgStream, - // pulsarDataNodeTimeTickStream: wTt, - // segmentStatisticsStream: segS, - replica: replica, + BaseNode: baseNode, + insertBuffer: iBuffer, + minIOKV: minIOKV, + minioPrefix: minioPrefix, + idAllocator: idAllocator, + timeTickStream: wTtMsgStream, + segmentStatisticsStream: segStatisticsMsgStream, + replica: replica, + flushMeta: flushMeta, } } diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index fa1e1aa73f0aa80c0cd1235b93546a55b475a59a..236c1bb6113f5aa8efb1ed671dca676fde414739 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -10,9 +10,7 @@ import ( "time" "github.com/golang/protobuf/proto" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -34,18 +32,10 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { ctx = context.Background() } - ddChan := make(chan *ddlFlushSyncMsg, 10) - defer close(ddChan) - insertChan := make(chan *insertFlushSyncMsg, 10) - defer close(insertChan) - testPath := "/test/datanode/root/meta" err := clearEtcd(testPath) require.NoError(t, err) Params.MetaRootPath = testPath - fService := newFlushSyncService(ctx, ddChan, insertChan) - assert.Equal(t, testPath, fService.metaTable.client.(*etcdkv.EtcdKV).GetPath(".")) - go fService.start() collMeta := newMeta() schemaBlob := proto.MarshalTextString(collMeta.Schema) @@ -56,7 +46,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { require.NoError(t, err) // Params.FlushInsertBufSize = 2 - iBNode := newInsertBufferNode(ctx, insertChan, replica) + iBNode := newInsertBufferNode(ctx, newMetaTable(), replica) inMsg := genInsertMsg() var iMsg flowgraph.Msg = &inMsg iBNode.Operate([]*flowgraph.Msg{&iMsg}) @@ -148,7 +138,7 @@ func genInsertMsg() insertMsg { var iMsg = &insertMsg{ insertMessages: make([]*msgstream.InsertMsg, 0), - flushMessages: make([]*msgstream.FlushMsg, 0), + flushMessages: make([]*flushMsg, 0), timeRange: TimeRange{ timestampMin: timeRange.timestampMin, timestampMax: timeRange.timestampMax, @@ -203,25 +193,14 @@ func genInsertMsg() insertMsg { iMsg.insertMessages = append(iMsg.insertMessages, msg) } - var fmsg msgstream.FlushMsg = msgstream.FlushMsg{ - BaseMsg: msgstream.BaseMsg{ - HashValues: []uint32{ - uint32(10), - }, - }, - FlushMsg: internalpb2.FlushMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kFlush, - MsgID: 1, - Timestamp: 2000, - SourceID: 1, - }, - SegmentID: UniqueID(1), - CollectionID: UniqueID(1), - PartitionTag: "default", - }, + fmsg := &flushMsg{ + msgID: 1, + Timestamp: 2000, + segmentIDs: []UniqueID{1}, + collectionID: UniqueID(1), } - iMsg.flushMessages = append(iMsg.flushMessages, &fmsg) + + iMsg.flushMessages = append(iMsg.flushMessages, fmsg) return *iMsg } diff --git a/internal/datanode/flow_graph_message.go b/internal/datanode/flow_graph_message.go index a547dc9c305ed0d9075ed8f68a0f2f619a06f3fc..a4b762a7dfccb3d935db0c2ac63fa57694a9f6be 100644 --- a/internal/datanode/flow_graph_message.go +++ b/internal/datanode/flow_graph_message.go @@ -21,7 +21,7 @@ type ( collectionRecords map[string][]metaOperateRecord // TODO: use partition id partitionRecords map[string][]metaOperateRecord - flushMessages []*msgstream.FlushMsg + flushMessages []*flushMsg gcRecord *gcRecord timeRange TimeRange } @@ -33,7 +33,7 @@ type ( insertMsg struct { insertMessages []*msgstream.InsertMsg - flushMessages []*msgstream.FlushMsg + flushMessages []*flushMsg gcRecord *gcRecord timeRange TimeRange } @@ -51,6 +51,13 @@ type ( gcRecord struct { collections []UniqueID } + + flushMsg struct { + msgID UniqueID + Timestamp Timestamp + segmentIDs []UniqueID + collectionID UniqueID + } ) func (ksMsg *key2SegMsg) TimeTick() Timestamp { diff --git a/internal/datanode/flow_graph_msg_stream_input_node.go b/internal/datanode/flow_graph_msg_stream_input_node.go index 4e752025e5f54f43fa449a8d8ca5befb42f5605b..413d50796129b20663e77ba96132a97966a7fec6 100644 --- a/internal/datanode/flow_graph_msg_stream_input_node.go +++ b/internal/datanode/flow_graph_msg_stream_input_node.go @@ -20,11 +20,9 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode { insertStream := pulsarms.NewPulsarTtMsgStream(ctx, receiveBufSize) - // TODO could panic of nil pointer insertStream.SetPulsarClient(msgStreamURL) unmarshalDispatcher := util.NewUnmarshalDispatcher() - // TODO could panic of nil pointer insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) var stream msgstream.MsgStream = insertStream diff --git a/internal/datanode/flush_msg.go b/internal/datanode/flush_msg.go deleted file mode 100644 index cbcfd8688130d304de161f1c987670cb8eab51ad..0000000000000000000000000000000000000000 --- a/internal/datanode/flush_msg.go +++ /dev/null @@ -1,39 +0,0 @@ -package datanode - -type ( - // segID: set when flushComplete == true, to tell - // the flush_sync_service which segFlush msg does this - // DDL flush for, so that ddl flush and insert flush - // will sync. - ddlBinlogPathMsg struct { - collID UniqueID - segID UniqueID - paths []string - } - - ddlFlushSyncMsg struct { - ddlBinlogPathMsg - flushCompleted bool - } - - insertBinlogPathMsg struct { - ts Timestamp - segID UniqueID - fieldID int64 // TODO GOOSE may need to change - paths []string - } - - // This Msg can notify flushSyncService - // 1.To append binary logs - // 2.To set flush-completed status - // - // When `flushComplete == false` - // `ts` means OpenTime of a segFlushMeta - // When `flushComplete == true` - // `ts` means CloseTime of a segFlushMeta, - // `fieldID` and `paths` need to be empty - insertFlushSyncMsg struct { - insertBinlogPathMsg - flushCompleted bool - } -) diff --git a/internal/datanode/flush_sync_service.go b/internal/datanode/flush_sync_service.go deleted file mode 100644 index dcee71a17aadc427f8ec811d2aab8b1c17a97605..0000000000000000000000000000000000000000 --- a/internal/datanode/flush_sync_service.go +++ /dev/null @@ -1,120 +0,0 @@ -package datanode - -import ( - "context" - "log" - - etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" - "go.etcd.io/etcd/clientv3" -) - -type ( - flushSyncService struct { - ctx context.Context - metaTable *metaTable - ddChan chan *ddlFlushSyncMsg - insertChan chan *insertFlushSyncMsg - ddFlushed map[UniqueID]bool // Segment ID - insertFlushed map[UniqueID]bool // Segment ID - } -) - -func newFlushSyncService(ctx context.Context, - ddChan chan *ddlFlushSyncMsg, insertChan chan *insertFlushSyncMsg) *flushSyncService { - - service := &flushSyncService{ - ctx: ctx, - ddChan: ddChan, - insertChan: insertChan, - ddFlushed: make(map[UniqueID]bool), - insertFlushed: make(map[UniqueID]bool), - } - - // New metaTable - etcdAddr := Params.EtcdAddress - etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) - if err != nil { - panic(err) - } - etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath) - metaKV, err2 := NewMetaTable(etcdKV) - if err2 != nil { - panic(err2) - } - - service.metaTable = metaKV - return service -} - -func (fService *flushSyncService) completeDDFlush(segID UniqueID) { - if _, ok := fService.ddFlushed[segID]; !ok { - fService.ddFlushed[segID] = true - return - } - - fService.ddFlushed[segID] = true -} - -func (fService *flushSyncService) completeInsertFlush(segID UniqueID) { - if _, ok := fService.insertFlushed[segID]; !ok { - fService.insertFlushed[segID] = true - return - } - fService.insertFlushed[segID] = true -} - -func (fService *flushSyncService) FlushCompleted(segID UniqueID) bool { - isddFlushed, ok := fService.ddFlushed[segID] - if !ok { - return false - } - - isinsertFlushed, ok := fService.insertFlushed[segID] - if !ok { - return false - } - return isddFlushed && isinsertFlushed -} - -func (fService *flushSyncService) start() { - for { - select { - case <-fService.ctx.Done(): - return - - case ddFlushMsg := <-fService.ddChan: - if ddFlushMsg == nil { - continue - } - if !ddFlushMsg.flushCompleted { - err := fService.metaTable.AppendDDLBinlogPaths(ddFlushMsg.collID, ddFlushMsg.paths) - if err != nil { - log.Println("Append segBinlog Error") - // GOOSE TODO error handling - } - continue - } - fService.completeDDFlush(ddFlushMsg.segID) - - case insertFlushMsg := <-fService.insertChan: - if insertFlushMsg == nil { - continue - } - if !insertFlushMsg.flushCompleted { - err := fService.metaTable.AppendSegBinlogPaths(insertFlushMsg.ts, insertFlushMsg.segID, insertFlushMsg.fieldID, - insertFlushMsg.paths) - if err != nil { - log.Println("Append segBinlog Error") - // GOOSE TODO error handling - } - continue - } - fService.completeInsertFlush(insertFlushMsg.segID) - - if fService.FlushCompleted(insertFlushMsg.segID) { - log.Printf("Seg(%d) flush completed.", insertFlushMsg.segID) - fService.metaTable.CompleteFlush(insertFlushMsg.ts, insertFlushMsg.segID) - } - } - } -} diff --git a/internal/datanode/flush_sync_service_test.go b/internal/datanode/flush_sync_service_test.go deleted file mode 100644 index 6e9a63f37254ee0adeaab036583ebe71556f8f7a..0000000000000000000000000000000000000000 --- a/internal/datanode/flush_sync_service_test.go +++ /dev/null @@ -1,166 +0,0 @@ -package datanode - -import ( - "context" - "log" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.etcd.io/etcd/clientv3" - - etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" -) - -func clearEtcd(rootPath string) error { - etcdAddr := Params.EtcdAddress - etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) - if err != nil { - return err - } - etcdKV := etcdkv.NewEtcdKV(etcdClient, rootPath) - - err = etcdKV.RemoveWithPrefix("writer/segment") - if err != nil { - return err - } - _, _, err = etcdKV.LoadWithPrefix("writer/segment") - if err != nil { - return err - } - log.Println("Clear ETCD with prefix writer/segment ") - - err = etcdKV.RemoveWithPrefix("writer/ddl") - if err != nil { - return err - } - _, _, err = etcdKV.LoadWithPrefix("writer/ddl") - if err != nil { - return err - } - log.Println("Clear ETCD with prefix writer/ddl") - return nil - -} - -func TestFlushSyncService_Start(t *testing.T) { - const ctxTimeInMillisecond = 3000 - const closeWithDeadline = false - var ctx context.Context - var cancel context.CancelFunc - - if closeWithDeadline { - d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, cancel = context.WithDeadline(context.Background(), d) - defer cancel() - } else { - // ctx = context.Background() - ctx, cancel = context.WithCancel(context.Background()) - defer cancel() - } - - ddChan := make(chan *ddlFlushSyncMsg, 10) - defer close(ddChan) - insertChan := make(chan *insertFlushSyncMsg, 10) - defer close(insertChan) - - testPath := "/test/datanode/root/meta" - err := clearEtcd(testPath) - require.NoError(t, err) - Params.MetaRootPath = testPath - fService := newFlushSyncService(ctx, ddChan, insertChan) - assert.Equal(t, testPath, fService.metaTable.client.(*etcdkv.EtcdKV).GetPath(".")) - - t.Run("FlushSyncService", func(t *testing.T) { - go fService.start() - - SegID := UniqueID(100) - ddMsgs := genDdlFlushSyncMsgs(SegID) - insertMsgs := geninsertFlushSyncMsgs(SegID) - - for _, msg := range ddMsgs { - ddChan <- msg - time.Sleep(time.Millisecond * 50) - } - - for _, msg := range insertMsgs { - insertChan <- msg - time.Sleep(time.Millisecond * 50) - } - - for { - if len(ddChan) == 0 && len(insertChan) == 0 && fService.FlushCompleted(SegID) { - break - } - } - - ret, err := fService.metaTable.getSegBinlogPaths(SegID) - assert.NoError(t, err) - assert.Equal(t, map[int64][]string{ - 0: {"x", "y", "z"}, - 1: {"x", "y", "z"}, - 2: {"x", "y", "z"}, - 3: {"x", "y", "z"}, - 4: {"x", "y", "z"}, - }, ret) - - ts, err := fService.metaTable.getFlushOpenTime(SegID) - assert.NoError(t, err) - assert.Equal(t, Timestamp(1000), ts) - - ts, err = fService.metaTable.getFlushCloseTime(SegID) - assert.NoError(t, err) - assert.Equal(t, Timestamp(2010), ts) - - cp, err := fService.metaTable.checkFlushComplete(SegID) - assert.NoError(t, err) - assert.Equal(t, true, cp) - - }) -} - -func genDdlFlushSyncMsgs(segID UniqueID) []*ddlFlushSyncMsg { - ret := make([]*ddlFlushSyncMsg, 0) - for i := 0; i < 5; i++ { - ret = append(ret, &ddlFlushSyncMsg{ - flushCompleted: false, - ddlBinlogPathMsg: ddlBinlogPathMsg{ - collID: UniqueID(100), - paths: []string{"a", "b", "c"}, - }, - }) - } - - ret = append(ret, &ddlFlushSyncMsg{ - flushCompleted: true, - ddlBinlogPathMsg: ddlBinlogPathMsg{ - segID: segID, - }, - }) - return ret -} - -func geninsertFlushSyncMsgs(segID UniqueID) []*insertFlushSyncMsg { - ret := make([]*insertFlushSyncMsg, 0) - for i := 0; i < 5; i++ { - ret = append(ret, &insertFlushSyncMsg{ - flushCompleted: false, - insertBinlogPathMsg: insertBinlogPathMsg{ - ts: Timestamp(1000 + i), - segID: segID, - fieldID: int64(i), - paths: []string{"x", "y", "z"}, - }, - }) - } - - ret = append(ret, &insertFlushSyncMsg{ - flushCompleted: true, - insertBinlogPathMsg: insertBinlogPathMsg{ - ts: Timestamp(2010), - segID: segID, - }, - }) - return ret -} diff --git a/internal/datanode/interface.go b/internal/datanode/interface.go new file mode 100644 index 0000000000000000000000000000000000000000..47046f4b6c96d782f0877f4a4c6e319ea2449862 --- /dev/null +++ b/internal/datanode/interface.go @@ -0,0 +1,14 @@ +package datanode + +import ( + "github.com/zilliztech/milvus-distributed/internal/proto/datapb" +) + +type Interface interface { + Init() error + Start() error + Stop() error + + WatchDmChannels(in *datapb.WatchDmChannelRequest) error + FlushSegments(req *datapb.FlushSegRequest) error +} diff --git a/internal/datanode/meta_table.go b/internal/datanode/meta_table.go index 84f0bf23548c199ef887cc44c4fe9efe6ce0690c..f44096d6a617c23e1b250050736b8ee866d3bbf4 100644 --- a/internal/datanode/meta_table.go +++ b/internal/datanode/meta_table.go @@ -5,18 +5,16 @@ import ( "strconv" "sync" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - "github.com/golang/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/kv" - pb "github.com/zilliztech/milvus-distributed/internal/proto/writerpb" + "github.com/zilliztech/milvus-distributed/internal/proto/datapb" ) type metaTable struct { - client kv.TxnBase // client of a reliable kv service, i.e. etcd client - segID2FlushMeta map[UniqueID]pb.SegmentFlushMeta // segment id to flush meta - collID2DdlMeta map[UniqueID]*pb.DDLFlushMeta + client kv.TxnBase // + segID2FlushMeta map[UniqueID]*datapb.SegmentFlushMeta + collID2DdlMeta map[UniqueID]*datapb.DDLFlushMeta lock sync.RWMutex } @@ -44,7 +42,7 @@ func (mt *metaTable) AppendDDLBinlogPaths(collID UniqueID, paths []string) error _, ok := mt.collID2DdlMeta[collID] if !ok { - mt.collID2DdlMeta[collID] = &pb.DDLFlushMeta{ + mt.collID2DdlMeta[collID] = &datapb.DDLFlushMeta{ CollectionID: collID, BinlogPaths: make([]string, 0), } @@ -56,10 +54,10 @@ func (mt *metaTable) AppendDDLBinlogPaths(collID UniqueID, paths []string) error return mt.saveDDLFlushMeta(meta) } -func (mt *metaTable) AppendSegBinlogPaths(tsOpen Timestamp, segmentID UniqueID, fieldID int64, dataPaths []string) error { +func (mt *metaTable) AppendSegBinlogPaths(segmentID UniqueID, fieldID int64, dataPaths []string) error { _, ok := mt.segID2FlushMeta[segmentID] if !ok { - err := mt.addSegmentFlush(segmentID, tsOpen) + err := mt.addSegmentFlush(segmentID) if err != nil { return err } @@ -77,31 +75,30 @@ func (mt *metaTable) AppendSegBinlogPaths(tsOpen Timestamp, segmentID UniqueID, } if !found { - newField := &pb.FieldFlushMeta{ + newField := &datapb.FieldFlushMeta{ FieldID: fieldID, BinlogPaths: dataPaths, } meta.Fields = append(meta.Fields, newField) } - return mt.saveSegFlushMeta(&meta) + return mt.saveSegFlushMeta(meta) } -func (mt *metaTable) CompleteFlush(tsClose Timestamp, segmentID UniqueID) error { +func (mt *metaTable) CompleteFlush(segmentID UniqueID) error { mt.lock.Lock() defer mt.lock.Unlock() meta, ok := mt.segID2FlushMeta[segmentID] if !ok { - return errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10)) + return errors.Errorf("segment not exists with ID = %v", segmentID) } - meta.IsClosed = true - meta.CloseTime = tsClose + meta.IsFlushed = true - return mt.saveSegFlushMeta(&meta) + return mt.saveSegFlushMeta(meta) } // metaTable.lock.Lock() before call this function -func (mt *metaTable) saveDDLFlushMeta(meta *pb.DDLFlushMeta) error { +func (mt *metaTable) saveDDLFlushMeta(meta *datapb.DDLFlushMeta) error { value := proto.MarshalTextString(meta) mt.collID2DdlMeta[meta.CollectionID] = meta @@ -111,14 +108,14 @@ func (mt *metaTable) saveDDLFlushMeta(meta *pb.DDLFlushMeta) error { } func (mt *metaTable) reloadDdlMetaFromKV() error { - mt.collID2DdlMeta = make(map[UniqueID]*pb.DDLFlushMeta) + mt.collID2DdlMeta = make(map[UniqueID]*datapb.DDLFlushMeta) _, values, err := mt.client.LoadWithPrefix(Params.DDLFlushMetaSubPath) if err != nil { return err } for _, value := range values { - ddlMeta := &pb.DDLFlushMeta{} + ddlMeta := &datapb.DDLFlushMeta{} err = proto.UnmarshalText(value, ddlMeta) if err != nil { return err @@ -129,17 +126,17 @@ func (mt *metaTable) reloadDdlMetaFromKV() error { } // metaTable.lock.Lock() before call this function -func (mt *metaTable) saveSegFlushMeta(meta *pb.SegmentFlushMeta) error { +func (mt *metaTable) saveSegFlushMeta(meta *datapb.SegmentFlushMeta) error { value := proto.MarshalTextString(meta) - mt.segID2FlushMeta[meta.SegmentID] = *meta + mt.segID2FlushMeta[meta.SegmentID] = meta prefix := path.Join(Params.SegFlushMetaSubPath, strconv.FormatInt(meta.SegmentID, 10)) return mt.client.Save(prefix, value) } func (mt *metaTable) reloadSegMetaFromKV() error { - mt.segID2FlushMeta = make(map[UniqueID]pb.SegmentFlushMeta) + mt.segID2FlushMeta = make(map[UniqueID]*datapb.SegmentFlushMeta) _, values, err := mt.client.LoadWithPrefix(Params.SegFlushMetaSubPath) if err != nil { @@ -147,8 +144,8 @@ func (mt *metaTable) reloadSegMetaFromKV() error { } for _, value := range values { - flushMeta := pb.SegmentFlushMeta{} - err = proto.UnmarshalText(value, &flushMeta) + flushMeta := &datapb.SegmentFlushMeta{} + err = proto.UnmarshalText(value, flushMeta) if err != nil { return err } @@ -158,49 +155,48 @@ func (mt *metaTable) reloadSegMetaFromKV() error { return nil } -func (mt *metaTable) addSegmentFlush(segmentID UniqueID, timestamp Timestamp) error { +func (mt *metaTable) addSegmentFlush(segmentID UniqueID) error { mt.lock.Lock() defer mt.lock.Unlock() _, ok := mt.segID2FlushMeta[segmentID] if ok { return errors.Errorf("segment already exists with ID = " + strconv.FormatInt(segmentID, 10)) } - meta := pb.SegmentFlushMeta{ - IsClosed: false, + meta := &datapb.SegmentFlushMeta{ + IsFlushed: false, SegmentID: segmentID, - OpenTime: timestamp, - } - return mt.saveSegFlushMeta(&meta) -} - -func (mt *metaTable) getFlushCloseTime(segmentID UniqueID) (Timestamp, error) { - mt.lock.RLock() - defer mt.lock.RUnlock() - meta, ok := mt.segID2FlushMeta[segmentID] - if !ok { - return typeutil.ZeroTimestamp, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10)) } - return meta.CloseTime, nil + return mt.saveSegFlushMeta(meta) } -func (mt *metaTable) getFlushOpenTime(segmentID UniqueID) (Timestamp, error) { - mt.lock.RLock() - defer mt.lock.RUnlock() - meta, ok := mt.segID2FlushMeta[segmentID] - if !ok { - return typeutil.ZeroTimestamp, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10)) - } - return meta.OpenTime, nil -} +// func (mt *metaTable) getFlushCloseTime(segmentID UniqueID) (Timestamp, error) { +// mt.lock.RLock() +// defer mt.lock.RUnlock() +// meta, ok := mt.segID2FlushMeta[segmentID] +// if !ok { +// return typeutil.ZeroTimestamp, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10)) +// } +// return meta.CloseTime, nil +// } + +// func (mt *metaTable) getFlushOpenTime(segmentID UniqueID) (Timestamp, error) { +// mt.lock.RLock() +// defer mt.lock.RUnlock() +// meta, ok := mt.segID2FlushMeta[segmentID] +// if !ok { +// return typeutil.ZeroTimestamp, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10)) +// } +// return meta.OpenTime, nil +// } func (mt *metaTable) checkFlushComplete(segmentID UniqueID) (bool, error) { mt.lock.RLock() defer mt.lock.RUnlock() meta, ok := mt.segID2FlushMeta[segmentID] if !ok { - return false, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10)) + return false, errors.Errorf("segment not exists with ID = %v", segmentID) } - return meta.IsClosed, nil + return meta.IsFlushed, nil } func (mt *metaTable) getSegBinlogPaths(segmentID UniqueID) (map[int64][]string, error) { @@ -208,7 +204,7 @@ func (mt *metaTable) getSegBinlogPaths(segmentID UniqueID) (map[int64][]string, defer mt.lock.RUnlock() meta, ok := mt.segID2FlushMeta[segmentID] if !ok { - return nil, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10)) + return nil, errors.Errorf("segment not exists with ID = %v", segmentID) } ret := make(map[int64][]string) for _, field := range meta.Fields { @@ -222,7 +218,7 @@ func (mt *metaTable) getDDLBinlogPaths(collID UniqueID) (map[UniqueID][]string, defer mt.lock.RUnlock() meta, ok := mt.collID2DdlMeta[collID] if !ok { - return nil, errors.Errorf("collection not exists with ID = " + strconv.FormatInt(collID, 10)) + return nil, errors.Errorf("collection not exists with ID = %v", collID) } ret := make(map[UniqueID][]string) ret[meta.CollectionID] = meta.BinlogPaths diff --git a/internal/datanode/meta_table_test.go b/internal/datanode/meta_table_test.go index 4ac885c0978916de2a52b38aed45dda5ed1a1e12..1844edfe2673bb868f50ada68b09489d62af413b 100644 --- a/internal/datanode/meta_table_test.go +++ b/internal/datanode/meta_table_test.go @@ -24,27 +24,15 @@ func TestMetaTable_all(t *testing.T) { assert.NoError(t, err) defer meta.client.Close() - t.Run("TestMetaTable_addSegmentFlush_and_OpenTime", func(t *testing.T) { - tsOpen := Timestamp(100) - err := meta.addSegmentFlush(101, tsOpen) + t.Run("TestMetaTable_addSegmentFlush", func(t *testing.T) { + err := meta.addSegmentFlush(101) assert.NoError(t, err) - exp, err := meta.getFlushOpenTime(101) - assert.NoError(t, err) - assert.Equal(t, tsOpen, exp) - tsOpen = Timestamp(200) - err = meta.addSegmentFlush(102, tsOpen) - assert.NoError(t, err) - exp, err = meta.getFlushOpenTime(102) + err = meta.addSegmentFlush(102) assert.NoError(t, err) - assert.Equal(t, tsOpen, exp) - tsOpen = Timestamp(200) - err = meta.addSegmentFlush(103, tsOpen) + err = meta.addSegmentFlush(103) assert.NoError(t, err) - exp, err = meta.getFlushOpenTime(103) - assert.NoError(t, err) - assert.Equal(t, tsOpen, exp) err = meta.reloadSegMetaFromKV() assert.NoError(t, err) @@ -52,8 +40,7 @@ func TestMetaTable_all(t *testing.T) { t.Run("TestMetaTable_AppendSegBinlogPaths", func(t *testing.T) { segmentID := UniqueID(201) - tsOpen := Timestamp(1000) - err := meta.addSegmentFlush(segmentID, tsOpen) + err := meta.addSegmentFlush(segmentID) assert.Nil(t, err) exp := map[int64][]string{ @@ -62,9 +49,9 @@ func TestMetaTable_all(t *testing.T) { } for fieldID, dataPaths := range exp { for _, dp := range dataPaths { - err = meta.AppendSegBinlogPaths(tsOpen, segmentID, fieldID, []string{dp}) + err = meta.AppendSegBinlogPaths(segmentID, fieldID, []string{dp}) assert.Nil(t, err) - err = meta.AppendSegBinlogPaths(tsOpen, segmentID, fieldID, []string{dp}) + err = meta.AppendSegBinlogPaths(segmentID, fieldID, []string{dp}) assert.Nil(t, err) } } @@ -99,27 +86,22 @@ func TestMetaTable_all(t *testing.T) { } }) - t.Run("TestMetaTable_CompleteFlush_and_CloseTime", func(t *testing.T) { + t.Run("TestMetaTable_CompleteFlush", func(t *testing.T) { var segmentID UniqueID = 401 - openTime := Timestamp(1000) - closeTime := Timestamp(10000) - err := meta.addSegmentFlush(segmentID, openTime) + err := meta.addSegmentFlush(segmentID) assert.NoError(t, err) ret, err := meta.checkFlushComplete(segmentID) assert.NoError(t, err) assert.Equal(t, false, ret) - meta.CompleteFlush(closeTime, segmentID) + meta.CompleteFlush(segmentID) ret, err = meta.checkFlushComplete(segmentID) assert.NoError(t, err) assert.Equal(t, true, ret) - ts, err := meta.getFlushCloseTime(segmentID) - assert.NoError(t, err) - assert.Equal(t, closeTime, ts) }) } diff --git a/internal/distributed/datanode/client.go b/internal/distributed/datanode/client.go index 1bdfcc492c5699e4e674f0f1f58fcd95ad4f0a8e..2444a932a0fe3b6a980cda66da58e1f7c77b1830 100644 --- a/internal/distributed/datanode/client.go +++ b/internal/distributed/datanode/client.go @@ -3,11 +3,14 @@ package datanode import ( "context" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" ) type Client struct { ctx context.Context + // GOOSE TODO: add DataNodeClient } @@ -23,10 +26,14 @@ func (c *Client) Stop() error { panic("implement me") } -func (c *Client) WatchDmChannels(datapb.WatchDmChannelRequest, error) { +func (c *Client) GetComponentStates(empty *commonpb.Empty) (*internalpb2.ComponentStates, error) { + panic("implement me") +} + +func (c *Client) WatchDmChannels(in *datapb.WatchDmChannelRequest) error { panic("implement me") } -func (c *Client) FlushSegment() (datapb.FlushSegRequest, error) { +func (c *Client) FlushSegments(in *datapb.FlushSegRequest) error { panic("implement me") } diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index d3aab5f6bf460f42bca531fba48743e6bf74a90b..9a192a60fac5f2cc0d70652fce1ae36b2cdde758 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -1,11 +1,53 @@ package datanode import ( + "context" + "sync" + "github.com/zilliztech/milvus-distributed/internal/datanode" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/datapb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "google.golang.org/grpc" ) type Server struct { - node datanode.Node - brpcServer *grpc.Server + node datanode.Interface + core datanode.DataNode + + grpcServer *grpc.Server + grpcError error + grpcErrMux sync.Mutex + + ctx context.Context + cancel context.CancelFunc +} + +func NewGrpcServer() (*Server, error) { + panic("implement me") +} + +func (s *Server) Init() error { + return s.core.Init() +} + +func (s *Server) Start() error { + return s.core.Start() +} + +func (s *Server) Stop() error { + return s.core.Stop() +} + +func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) { + return nil, nil +} + +func (s *Server) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelRequest) error { + return s.core.WatchDmChannels(in) +} + +func (s *Server) FlushSegments(ctx context.Context, in *datapb.FlushSegRequest) error { + return s.core.FlushSegments(in) } diff --git a/internal/msgstream/msg.go b/internal/msgstream/msg.go index b305ca7127f07af0059219420e74700d585ebc89..253c74d3f3fccd0538d39f9f2944ac168bdf731c 100644 --- a/internal/msgstream/msg.go +++ b/internal/msgstream/msg.go @@ -105,6 +105,7 @@ func (it *InsertMsg) Unmarshal(input []byte) (TsMsg, error) { } /////////////////////////////////////////Flush////////////////////////////////////////// +// GOOSE TODO remove this type FlushMsg struct { BaseMsg internalpb2.FlushMsg diff --git a/internal/proto/data_service.proto b/internal/proto/data_service.proto index 20415c193afb427ae552d2af172f8a5f8f52e9c5..a619af7c51115fbdbd14111c726ddfdef8026609 100644 --- a/internal/proto/data_service.proto +++ b/internal/proto/data_service.proto @@ -136,6 +136,22 @@ message SegmentMsg{ SegmentInfo segment = 2; } +message FieldFlushMeta { + int64 fieldID = 1; + repeated string binlog_paths = 2; +} + +message SegmentFlushMeta{ + int64 segmentID = 1; + bool is_flushed = 2; + repeated FieldFlushMeta fields = 5; +} + +message DDLFlushMeta { + int64 collectionID = 1; + repeated string binlog_paths = 2; +} + service DataService { rpc RegisterNode(RegisterNodeRequest) returns (RegisterNodeResponse) {} @@ -149,4 +165,4 @@ service DataService { rpc GetInsertChannels(InsertChannelRequest) returns (internal.StringList) {} -} \ No newline at end of file +} diff --git a/internal/proto/datapb/data_service.pb.go b/internal/proto/datapb/data_service.pb.go index 54fe12cbebed891bb46b4b99cf6aa853fe32e2ad..619e61eda3c73189a6a305826fd404faa4791d0a 100644 --- a/internal/proto/datapb/data_service.pb.go +++ b/internal/proto/datapb/data_service.pb.go @@ -1103,6 +1103,155 @@ func (m *SegmentMsg) GetSegment() *SegmentInfo { return nil } +type FieldFlushMeta struct { + FieldID int64 `protobuf:"varint,1,opt,name=fieldID,proto3" json:"fieldID,omitempty"` + BinlogPaths []string `protobuf:"bytes,2,rep,name=binlog_paths,json=binlogPaths,proto3" json:"binlog_paths,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *FieldFlushMeta) Reset() { *m = FieldFlushMeta{} } +func (m *FieldFlushMeta) String() string { return proto.CompactTextString(m) } +func (*FieldFlushMeta) ProtoMessage() {} +func (*FieldFlushMeta) Descriptor() ([]byte, []int) { + return fileDescriptor_3385cd32ad6cfe64, []int{18} +} + +func (m *FieldFlushMeta) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_FieldFlushMeta.Unmarshal(m, b) +} +func (m *FieldFlushMeta) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_FieldFlushMeta.Marshal(b, m, deterministic) +} +func (m *FieldFlushMeta) XXX_Merge(src proto.Message) { + xxx_messageInfo_FieldFlushMeta.Merge(m, src) +} +func (m *FieldFlushMeta) XXX_Size() int { + return xxx_messageInfo_FieldFlushMeta.Size(m) +} +func (m *FieldFlushMeta) XXX_DiscardUnknown() { + xxx_messageInfo_FieldFlushMeta.DiscardUnknown(m) +} + +var xxx_messageInfo_FieldFlushMeta proto.InternalMessageInfo + +func (m *FieldFlushMeta) GetFieldID() int64 { + if m != nil { + return m.FieldID + } + return 0 +} + +func (m *FieldFlushMeta) GetBinlogPaths() []string { + if m != nil { + return m.BinlogPaths + } + return nil +} + +type SegmentFlushMeta struct { + SegmentID int64 `protobuf:"varint,1,opt,name=segmentID,proto3" json:"segmentID,omitempty"` + IsFlushed bool `protobuf:"varint,2,opt,name=is_flushed,json=isFlushed,proto3" json:"is_flushed,omitempty"` + Fields []*FieldFlushMeta `protobuf:"bytes,5,rep,name=fields,proto3" json:"fields,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SegmentFlushMeta) Reset() { *m = SegmentFlushMeta{} } +func (m *SegmentFlushMeta) String() string { return proto.CompactTextString(m) } +func (*SegmentFlushMeta) ProtoMessage() {} +func (*SegmentFlushMeta) Descriptor() ([]byte, []int) { + return fileDescriptor_3385cd32ad6cfe64, []int{19} +} + +func (m *SegmentFlushMeta) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SegmentFlushMeta.Unmarshal(m, b) +} +func (m *SegmentFlushMeta) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SegmentFlushMeta.Marshal(b, m, deterministic) +} +func (m *SegmentFlushMeta) XXX_Merge(src proto.Message) { + xxx_messageInfo_SegmentFlushMeta.Merge(m, src) +} +func (m *SegmentFlushMeta) XXX_Size() int { + return xxx_messageInfo_SegmentFlushMeta.Size(m) +} +func (m *SegmentFlushMeta) XXX_DiscardUnknown() { + xxx_messageInfo_SegmentFlushMeta.DiscardUnknown(m) +} + +var xxx_messageInfo_SegmentFlushMeta proto.InternalMessageInfo + +func (m *SegmentFlushMeta) GetSegmentID() int64 { + if m != nil { + return m.SegmentID + } + return 0 +} + +func (m *SegmentFlushMeta) GetIsFlushed() bool { + if m != nil { + return m.IsFlushed + } + return false +} + +func (m *SegmentFlushMeta) GetFields() []*FieldFlushMeta { + if m != nil { + return m.Fields + } + return nil +} + +type DDLFlushMeta struct { + CollectionID int64 `protobuf:"varint,1,opt,name=collectionID,proto3" json:"collectionID,omitempty"` + BinlogPaths []string `protobuf:"bytes,2,rep,name=binlog_paths,json=binlogPaths,proto3" json:"binlog_paths,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DDLFlushMeta) Reset() { *m = DDLFlushMeta{} } +func (m *DDLFlushMeta) String() string { return proto.CompactTextString(m) } +func (*DDLFlushMeta) ProtoMessage() {} +func (*DDLFlushMeta) Descriptor() ([]byte, []int) { + return fileDescriptor_3385cd32ad6cfe64, []int{20} +} + +func (m *DDLFlushMeta) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_DDLFlushMeta.Unmarshal(m, b) +} +func (m *DDLFlushMeta) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_DDLFlushMeta.Marshal(b, m, deterministic) +} +func (m *DDLFlushMeta) XXX_Merge(src proto.Message) { + xxx_messageInfo_DDLFlushMeta.Merge(m, src) +} +func (m *DDLFlushMeta) XXX_Size() int { + return xxx_messageInfo_DDLFlushMeta.Size(m) +} +func (m *DDLFlushMeta) XXX_DiscardUnknown() { + xxx_messageInfo_DDLFlushMeta.DiscardUnknown(m) +} + +var xxx_messageInfo_DDLFlushMeta proto.InternalMessageInfo + +func (m *DDLFlushMeta) GetCollectionID() int64 { + if m != nil { + return m.CollectionID + } + return 0 +} + +func (m *DDLFlushMeta) GetBinlogPaths() []string { + if m != nil { + return m.BinlogPaths + } + return nil +} + func init() { proto.RegisterEnum("milvus.proto.data.SegmentState", SegmentState_name, SegmentState_value) proto.RegisterType((*RegisterNodeRequest)(nil), "milvus.proto.data.RegisterNodeRequest") @@ -1123,86 +1272,95 @@ func init() { proto.RegisterType((*FlushSegRequest)(nil), "milvus.proto.data.FlushSegRequest") proto.RegisterType((*SegmentInfo)(nil), "milvus.proto.data.SegmentInfo") proto.RegisterType((*SegmentMsg)(nil), "milvus.proto.data.SegmentMsg") + proto.RegisterType((*FieldFlushMeta)(nil), "milvus.proto.data.FieldFlushMeta") + proto.RegisterType((*SegmentFlushMeta)(nil), "milvus.proto.data.SegmentFlushMeta") + proto.RegisterType((*DDLFlushMeta)(nil), "milvus.proto.data.DDLFlushMeta") } func init() { proto.RegisterFile("data_service.proto", fileDescriptor_3385cd32ad6cfe64) } var fileDescriptor_3385cd32ad6cfe64 = []byte{ - // 1169 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x56, 0xdd, 0x6e, 0x1b, 0xc5, - 0x17, 0x8f, 0xbf, 0x12, 0xfb, 0xd8, 0xb1, 0x9d, 0x49, 0xfa, 0xff, 0xbb, 0x6e, 0x68, 0xc2, 0x4a, - 0x6d, 0xa2, 0x0a, 0x12, 0x94, 0xaa, 0xc0, 0x15, 0xa2, 0xc1, 0x25, 0xb2, 0x20, 0x51, 0x34, 0x46, - 0x42, 0x2a, 0x17, 0xd6, 0xda, 0x7b, 0xb2, 0x1e, 0x69, 0x77, 0x66, 0xd9, 0x19, 0x37, 0x51, 0x6e, - 0xe0, 0x01, 0xfa, 0x04, 0x70, 0xcf, 0x25, 0x8f, 0x00, 0xaf, 0xc0, 0x23, 0xa1, 0x9d, 0x59, 0xaf, - 0x77, 0xfd, 0x51, 0x47, 0x81, 0x72, 0xb7, 0x73, 0xf6, 0x37, 0xe7, 0xeb, 0x77, 0x3e, 0x06, 0x88, - 0x63, 0x2b, 0xbb, 0x2f, 0x31, 0x7c, 0xc3, 0x86, 0x78, 0x14, 0x84, 0x42, 0x09, 0xb2, 0xe5, 0x33, - 0xef, 0xcd, 0x58, 0x9a, 0xd3, 0x51, 0x04, 0x68, 0xd7, 0x86, 0xc2, 0xf7, 0x05, 0x37, 0xa2, 0x76, - 0x9d, 0x71, 0x85, 0x21, 0xb7, 0x3d, 0x73, 0xb6, 0x7e, 0x82, 0x6d, 0x8a, 0x2e, 0x93, 0x0a, 0xc3, - 0x0b, 0xe1, 0x20, 0xc5, 0x1f, 0xc7, 0x28, 0x15, 0xf9, 0x04, 0x8a, 0x03, 0x5b, 0x62, 0x2b, 0xb7, - 0x9f, 0x3b, 0xac, 0x9e, 0xec, 0x1e, 0x65, 0xd4, 0xc6, 0x0a, 0xcf, 0xa5, 0x7b, 0x6a, 0x4b, 0xa4, - 0x1a, 0x49, 0x3e, 0x85, 0x0d, 0xdb, 0x71, 0x42, 0x94, 0xb2, 0x95, 0x7f, 0xc7, 0xa5, 0x97, 0x06, - 0x43, 0x27, 0x60, 0xeb, 0x35, 0xec, 0x64, 0x1d, 0x90, 0x81, 0xe0, 0x12, 0xc9, 0x29, 0x54, 0x19, - 0x67, 0xaa, 0x1f, 0xd8, 0xa1, 0xed, 0xcb, 0xd8, 0x91, 0x0f, 0xb3, 0x3a, 0x93, 0x58, 0xba, 0x9c, - 0xa9, 0x4b, 0x0d, 0xa4, 0xc0, 0x92, 0x6f, 0xeb, 0xaf, 0x1c, 0xd4, 0x7a, 0xe8, 0x76, 0x3b, 0x93, - 0xb0, 0x76, 0xa0, 0x34, 0x14, 0x63, 0xae, 0xb4, 0xba, 0x4d, 0x6a, 0x0e, 0x64, 0x1f, 0xaa, 0xc3, - 0x91, 0xcd, 0x39, 0x7a, 0x17, 0xb6, 0x8f, 0xda, 0xfd, 0x0a, 0x4d, 0x8b, 0x88, 0x05, 0xb5, 0xa1, - 0xf0, 0x3c, 0x1c, 0x2a, 0x26, 0x78, 0xb7, 0xd3, 0x2a, 0xec, 0xe7, 0x0e, 0x0b, 0x34, 0x23, 0x8b, - 0xb4, 0x04, 0x76, 0xa8, 0x58, 0x0c, 0x29, 0x6a, 0x48, 0x5a, 0x44, 0x1e, 0x41, 0x25, 0xba, 0xd1, - 0xe7, 0x91, 0x95, 0x92, 0xb6, 0x52, 0x8e, 0x04, 0xda, 0xc4, 0x13, 0xa8, 0x27, 0x58, 0x83, 0x58, - 0xd7, 0x88, 0xcd, 0x44, 0x1a, 0xc1, 0xac, 0xdf, 0x73, 0x40, 0x5e, 0x4a, 0xc9, 0x5c, 0x9e, 0x09, - 0xec, 0x7f, 0xb0, 0xce, 0x85, 0x83, 0xdd, 0x8e, 0x8e, 0xac, 0x40, 0xe3, 0x53, 0x64, 0x32, 0x40, - 0x0c, 0xfb, 0xa1, 0xf0, 0x26, 0x81, 0x95, 0x23, 0x01, 0x15, 0x1e, 0x92, 0x5d, 0xa8, 0x28, 0xe6, - 0xa3, 0x54, 0xb6, 0x1f, 0xe8, 0x90, 0x8a, 0x74, 0x2a, 0x20, 0xaf, 0x60, 0x53, 0xa6, 0x4c, 0xc8, - 0x56, 0x71, 0xbf, 0x70, 0x58, 0x3d, 0xd9, 0x3b, 0x9a, 0x2b, 0xb1, 0xa3, 0xb4, 0x2b, 0x34, 0x7b, - 0xcb, 0xfa, 0x33, 0x0f, 0x0d, 0xfd, 0xdf, 0x78, 0xed, 0x23, 0xd7, 0x34, 0x68, 0x50, 0xec, 0xac, - 0x39, 0xdc, 0x81, 0x86, 0x84, 0xbe, 0x42, 0x9a, 0xbe, 0x59, 0x72, 0x8a, 0xab, 0xc9, 0x29, 0xcd, - 0x93, 0xb3, 0x07, 0x55, 0xbc, 0x09, 0x58, 0x88, 0xfd, 0x28, 0x05, 0x3a, 0xf9, 0x45, 0x0a, 0x46, - 0xf4, 0x1d, 0xf3, 0x91, 0x3c, 0x87, 0x75, 0xa9, 0x6c, 0x35, 0x96, 0xad, 0x0d, 0x5d, 0x8b, 0x8f, - 0x16, 0xd6, 0x77, 0x4f, 0x43, 0x68, 0x0c, 0xcd, 0x52, 0x5e, 0x5e, 0x49, 0x79, 0x65, 0x11, 0xe5, - 0xbf, 0xe4, 0x60, 0x3b, 0x43, 0x79, 0xdc, 0x21, 0x17, 0xd0, 0x94, 0xd9, 0xc4, 0x46, 0x6d, 0x12, - 0x71, 0x64, 0x2d, 0xe3, 0x68, 0x0a, 0xa5, 0x73, 0x77, 0x53, 0x01, 0xe6, 0xef, 0x1c, 0xa0, 0x75, - 0x03, 0xb5, 0xaf, 0xbd, 0xb1, 0x1c, 0xdd, 0x7f, 0x70, 0x10, 0x28, 0x3a, 0x83, 0x6e, 0x47, 0x1b, - 0x2d, 0x50, 0xfd, 0x7d, 0x17, 0x4a, 0xad, 0xb7, 0x39, 0x20, 0xbd, 0x91, 0xb8, 0xee, 0xa1, 0xab, - 0x03, 0xba, 0xb7, 0x03, 0xb3, 0xc6, 0xf2, 0xab, 0xeb, 0xa7, 0x30, 0x57, 0x3f, 0xd6, 0x0b, 0xd8, - 0xce, 0x78, 0x13, 0x93, 0xf4, 0x18, 0x40, 0x1a, 0x51, 0xb7, 0x63, 0xe8, 0x29, 0xd0, 0x94, 0xc4, - 0xba, 0x82, 0x9d, 0xf8, 0x4a, 0x94, 0x58, 0x94, 0xf7, 0x0f, 0x63, 0x17, 0x2a, 0x89, 0xde, 0x38, - 0x86, 0xa9, 0xc0, 0xfa, 0x2d, 0x0f, 0x0f, 0x66, 0x0c, 0xc5, 0x1e, 0xbe, 0x80, 0x52, 0xc4, 0xa5, - 0x31, 0x55, 0x5f, 0xd6, 0xdf, 0xc9, 0x45, 0x6a, 0xd0, 0x51, 0xbf, 0x0c, 0x43, 0xb4, 0x55, 0xdc, - 0x2f, 0x79, 0xd3, 0x2f, 0x46, 0xa4, 0xfb, 0x65, 0x0f, 0xaa, 0x12, 0x6d, 0x0f, 0x1d, 0x03, 0x30, - 0xf3, 0x05, 0x8c, 0x48, 0x03, 0xbe, 0x81, 0x86, 0x54, 0x76, 0xa8, 0xfa, 0x81, 0x90, 0x3a, 0x8b, - 0x93, 0x11, 0x63, 0x2d, 0x99, 0xf2, 0xe7, 0xd2, 0xbd, 0x8c, 0xa1, 0xb4, 0xae, 0xaf, 0x4e, 0x8e, - 0x92, 0x9c, 0xc1, 0x26, 0x72, 0x27, 0xa5, 0xaa, 0x74, 0x67, 0x55, 0x35, 0xe4, 0x4e, 0xa2, 0xc8, - 0x62, 0xf0, 0xff, 0x2e, 0x97, 0x18, 0xaa, 0x53, 0xc6, 0x3d, 0xe1, 0x5e, 0xda, 0x6a, 0xf4, 0xbe, - 0x38, 0x09, 0xe0, 0xe1, 0xac, 0xa9, 0x29, 0x2d, 0x6d, 0x28, 0x5f, 0x31, 0xf4, 0x9c, 0x69, 0xd9, - 0x24, 0x67, 0xf2, 0x19, 0x94, 0x82, 0x08, 0xdc, 0xca, 0xeb, 0x20, 0x97, 0x6d, 0xc5, 0x9e, 0x0a, - 0x19, 0x77, 0xbf, 0x65, 0x52, 0x51, 0x83, 0xb7, 0x7e, 0xce, 0xc1, 0x8e, 0x31, 0xf9, 0x95, 0x19, - 0xab, 0xef, 0xb7, 0x6d, 0x17, 0xac, 0x49, 0xcb, 0x87, 0x07, 0xdf, 0xdb, 0x6a, 0x38, 0xea, 0xf8, - 0xff, 0xd8, 0x85, 0xc8, 0xdc, 0x74, 0x3b, 0x98, 0x6c, 0x54, 0x68, 0x46, 0x66, 0xfd, 0x9a, 0x83, - 0x86, 0x1e, 0x50, 0x3d, 0x74, 0xff, 0xf3, 0x60, 0x67, 0xba, 0xbf, 0x38, 0xd7, 0xfd, 0x6f, 0xf3, - 0x50, 0x8d, 0x9b, 0xab, 0xcb, 0xaf, 0x44, 0xb6, 0x5e, 0x72, 0x33, 0xf5, 0xf2, 0xef, 0x0c, 0x2a, - 0x72, 0x00, 0x0d, 0xa6, 0x4b, 0xa0, 0x1f, 0x27, 0xca, 0x38, 0x56, 0xa1, 0x75, 0x96, 0xae, 0x0c, - 0xbd, 0xbb, 0x44, 0x80, 0xdc, 0xb4, 0x6f, 0x49, 0xb7, 0x6f, 0x39, 0x12, 0xe8, 0xe6, 0xfd, 0x00, - 0x60, 0xe8, 0x09, 0x99, 0xd9, 0x96, 0x15, 0x2d, 0xd1, 0xbf, 0x1f, 0x42, 0x99, 0x8f, 0xfd, 0x7e, - 0x28, 0xae, 0xcd, 0xba, 0x2c, 0xd0, 0x0d, 0x3e, 0xf6, 0xa9, 0xb8, 0x96, 0xd1, 0x2f, 0x1f, 0xfd, - 0xbe, 0x64, 0xb7, 0x66, 0x23, 0x16, 0xe8, 0x86, 0x8f, 0x7e, 0x8f, 0xdd, 0xa2, 0x75, 0x03, 0x10, - 0x67, 0xe3, 0x5c, 0xba, 0xf7, 0xa0, 0xe9, 0x73, 0xd8, 0x88, 0xb3, 0x15, 0xaf, 0xb0, 0xc7, 0xcb, - 0x87, 0x59, 0x94, 0x6f, 0x3a, 0x81, 0x3f, 0xfb, 0x41, 0x3f, 0x14, 0x93, 0x21, 0x47, 0x1a, 0x09, - 0x2f, 0x17, 0x82, 0x63, 0x73, 0x8d, 0x6c, 0xeb, 0x57, 0x8c, 0x11, 0xa8, 0x57, 0x37, 0x4c, 0xaa, - 0x66, 0x8e, 0x10, 0xa8, 0xc7, 0xc2, 0xb3, 0x50, 0x5c, 0x33, 0xee, 0x36, 0xf3, 0x64, 0x0b, 0x36, - 0x27, 0x9a, 0xf4, 0xa8, 0x6b, 0x16, 0x4e, 0xfe, 0x28, 0x41, 0xb5, 0x63, 0x2b, 0xbb, 0x67, 0x9e, - 0xea, 0xc4, 0x86, 0x5a, 0xfa, 0xc9, 0x4b, 0x9e, 0x2e, 0xf0, 0x72, 0xc1, 0xa3, 0xbc, 0x7d, 0xb0, - 0x12, 0x67, 0x66, 0x87, 0xb5, 0x46, 0xce, 0xa0, 0xa4, 0xab, 0x9e, 0x2c, 0x1a, 0xe7, 0xe9, 0x85, - 0xdd, 0x7e, 0xd7, 0x96, 0xb7, 0xd6, 0xc8, 0x00, 0x1a, 0xc9, 0xdb, 0x23, 0x2e, 0xc3, 0x27, 0x0b, - 0x54, 0xce, 0x3f, 0x49, 0xdb, 0x4f, 0x57, 0xc1, 0x12, 0x67, 0xfb, 0x50, 0x4b, 0xad, 0x4e, 0xb9, - 0xd0, 0xc0, 0xfc, 0xa6, 0x5f, 0x68, 0x60, 0xc1, 0x0a, 0xb6, 0xd6, 0x88, 0x0b, 0xcd, 0x33, 0x54, - 0x99, 0xf5, 0x47, 0x0e, 0x56, 0xec, 0xb9, 0xc9, 0x26, 0x6e, 0x1f, 0xae, 0x06, 0x26, 0x86, 0x42, - 0xd8, 0x39, 0x43, 0x35, 0x37, 0xd4, 0xc9, 0xb3, 0x05, 0x3a, 0x96, 0x6c, 0x99, 0xf6, 0x47, 0x77, - 0xc0, 0xa6, 0x6d, 0xda, 0xb0, 0x95, 0xd8, 0x4c, 0x7a, 0xf7, 0x60, 0xa9, 0x92, 0xec, 0xd4, 0x6d, - 0xaf, 0xde, 0x1d, 0xd6, 0xda, 0xe9, 0x97, 0xaf, 0xbf, 0x70, 0x99, 0x1a, 0x8d, 0x07, 0x51, 0x79, - 0x1c, 0xdf, 0x32, 0xcf, 0x63, 0xb7, 0x0a, 0x87, 0xa3, 0x63, 0x73, 0xf7, 0x63, 0x87, 0x49, 0x15, - 0xb2, 0xc1, 0x58, 0xa1, 0x73, 0x3c, 0xd1, 0x70, 0xac, 0x15, 0x1e, 0x47, 0x96, 0x83, 0xc1, 0x60, - 0x5d, 0x9f, 0x9e, 0xff, 0x1d, 0x00, 0x00, 0xff, 0xff, 0x45, 0xdf, 0xe6, 0xdf, 0xb4, 0x0e, 0x00, - 0x00, + // 1267 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x57, 0xdb, 0x6e, 0x1b, 0x45, + 0x18, 0xce, 0xfa, 0x90, 0xd8, 0xbf, 0x1d, 0xdb, 0x9d, 0xa4, 0xe0, 0xba, 0xa7, 0x74, 0xa4, 0x36, + 0x51, 0x05, 0x09, 0x6a, 0x55, 0x0e, 0x37, 0x88, 0x06, 0xb7, 0x91, 0x45, 0x13, 0x55, 0x63, 0x10, + 0x52, 0xb9, 0xb0, 0xd6, 0xde, 0xc9, 0x7a, 0xa4, 0xdd, 0xd9, 0x65, 0x67, 0xdc, 0x44, 0xb9, 0x81, + 0x6b, 0xd4, 0x27, 0x80, 0x7b, 0x2e, 0x79, 0x04, 0x78, 0x05, 0x1e, 0x09, 0xed, 0xcc, 0x78, 0xbd, + 0x6b, 0xaf, 0xe3, 0x28, 0x50, 0xee, 0x3c, 0xbf, 0xbf, 0xf9, 0x0f, 0xf3, 0xfd, 0xa7, 0x05, 0xe4, + 0xd8, 0xd2, 0x1e, 0x08, 0x1a, 0xbd, 0x65, 0x23, 0xba, 0x1f, 0x46, 0x81, 0x0c, 0xd0, 0x0d, 0x9f, + 0x79, 0x6f, 0x27, 0x42, 0x9f, 0xf6, 0x63, 0x40, 0xa7, 0x3e, 0x0a, 0x7c, 0x3f, 0xe0, 0x5a, 0xd4, + 0x69, 0x30, 0x2e, 0x69, 0xc4, 0x6d, 0x4f, 0x9f, 0xf1, 0x4f, 0xb0, 0x45, 0xa8, 0xcb, 0x84, 0xa4, + 0xd1, 0x49, 0xe0, 0x50, 0x42, 0x7f, 0x9c, 0x50, 0x21, 0xd1, 0x27, 0x50, 0x1a, 0xda, 0x82, 0xb6, + 0xad, 0x1d, 0x6b, 0xaf, 0xf6, 0xe4, 0xce, 0x7e, 0x46, 0xad, 0x51, 0x78, 0x2c, 0xdc, 0x43, 0x5b, + 0x50, 0xa2, 0x90, 0xe8, 0x53, 0xd8, 0xb0, 0x1d, 0x27, 0xa2, 0x42, 0xb4, 0x0b, 0x97, 0x5c, 0x7a, + 0xae, 0x31, 0x64, 0x0a, 0xc6, 0x6f, 0x60, 0x3b, 0xeb, 0x80, 0x08, 0x03, 0x2e, 0x28, 0x3a, 0x84, + 0x1a, 0xe3, 0x4c, 0x0e, 0x42, 0x3b, 0xb2, 0x7d, 0x61, 0x1c, 0x79, 0x90, 0xd5, 0x99, 0xc4, 0xd2, + 0xe3, 0x4c, 0xbe, 0x56, 0x40, 0x02, 0x2c, 0xf9, 0x8d, 0xff, 0xb6, 0xa0, 0xde, 0xa7, 0x6e, 0xaf, + 0x3b, 0x0d, 0x6b, 0x1b, 0xca, 0xa3, 0x60, 0xc2, 0xa5, 0x52, 0xb7, 0x49, 0xf4, 0x01, 0xed, 0x40, + 0x6d, 0x34, 0xb6, 0x39, 0xa7, 0xde, 0x89, 0xed, 0x53, 0xe5, 0x7e, 0x95, 0xa4, 0x45, 0x08, 0x43, + 0x7d, 0x14, 0x78, 0x1e, 0x1d, 0x49, 0x16, 0xf0, 0x5e, 0xb7, 0x5d, 0xdc, 0xb1, 0xf6, 0x8a, 0x24, + 0x23, 0x8b, 0xb5, 0x84, 0x76, 0x24, 0x99, 0x81, 0x94, 0x14, 0x24, 0x2d, 0x42, 0xb7, 0xa1, 0x1a, + 0xdf, 0x18, 0xf0, 0xd8, 0x4a, 0x59, 0x59, 0xa9, 0xc4, 0x02, 0x65, 0xe2, 0x21, 0x34, 0x12, 0xac, + 0x46, 0xac, 0x2b, 0xc4, 0x66, 0x22, 0x8d, 0x61, 0xf8, 0x0f, 0x0b, 0xd0, 0x73, 0x21, 0x98, 0xcb, + 0x33, 0x81, 0x7d, 0x00, 0xeb, 0x3c, 0x70, 0x68, 0xaf, 0xab, 0x22, 0x2b, 0x12, 0x73, 0x8a, 0x4d, + 0x86, 0x94, 0x46, 0x83, 0x28, 0xf0, 0xa6, 0x81, 0x55, 0x62, 0x01, 0x09, 0x3c, 0x8a, 0xee, 0x40, + 0x55, 0x32, 0x9f, 0x0a, 0x69, 0xfb, 0xa1, 0x0a, 0xa9, 0x44, 0x66, 0x02, 0xf4, 0x02, 0x36, 0x45, + 0xca, 0x84, 0x68, 0x97, 0x76, 0x8a, 0x7b, 0xb5, 0x27, 0xf7, 0xf7, 0x17, 0x52, 0x6c, 0x3f, 0xed, + 0x0a, 0xc9, 0xde, 0xc2, 0x7f, 0x15, 0xa0, 0xa9, 0xfe, 0xd7, 0x5e, 0xfb, 0x94, 0x2b, 0x1a, 0x14, + 0xc8, 0x38, 0xab, 0x0f, 0x57, 0xa0, 0x21, 0xa1, 0xaf, 0x98, 0xa6, 0x6f, 0x9e, 0x9c, 0xd2, 0x6a, + 0x72, 0xca, 0x8b, 0xe4, 0xdc, 0x87, 0x1a, 0x3d, 0x0f, 0x59, 0x44, 0x07, 0xf1, 0x13, 0xa8, 0xc7, + 0x2f, 0x11, 0xd0, 0xa2, 0x6f, 0x99, 0x4f, 0xd1, 0x53, 0x58, 0x17, 0xd2, 0x96, 0x13, 0xd1, 0xde, + 0x50, 0xb9, 0x78, 0x3b, 0x37, 0xbf, 0xfb, 0x0a, 0x42, 0x0c, 0x34, 0x4b, 0x79, 0x65, 0x25, 0xe5, + 0xd5, 0x3c, 0xca, 0x7f, 0xb5, 0x60, 0x2b, 0x43, 0xb9, 0xa9, 0x90, 0x13, 0x68, 0x89, 0xec, 0xc3, + 0xc6, 0x65, 0x12, 0x73, 0x84, 0x97, 0x71, 0x34, 0x83, 0x92, 0x85, 0xbb, 0xa9, 0x00, 0x0b, 0x57, + 0x0e, 0x10, 0x9f, 0x43, 0xfd, 0xa5, 0x37, 0x11, 0xe3, 0xeb, 0x37, 0x0e, 0x04, 0x25, 0x67, 0xd8, + 0xeb, 0x2a, 0xa3, 0x45, 0xa2, 0x7e, 0x5f, 0x85, 0x52, 0xfc, 0xce, 0x02, 0xd4, 0x1f, 0x07, 0x67, + 0x7d, 0xea, 0xaa, 0x80, 0xae, 0xed, 0xc0, 0xbc, 0xb1, 0xc2, 0xea, 0xfc, 0x29, 0x2e, 0xe4, 0x0f, + 0x7e, 0x06, 0x5b, 0x19, 0x6f, 0x0c, 0x49, 0xf7, 0x00, 0x84, 0x16, 0xf5, 0xba, 0x9a, 0x9e, 0x22, + 0x49, 0x49, 0xf0, 0x29, 0x6c, 0x9b, 0x2b, 0xf1, 0xc3, 0x52, 0x71, 0xfd, 0x30, 0xee, 0x40, 0x35, + 0xd1, 0x6b, 0x62, 0x98, 0x09, 0xf0, 0xef, 0x05, 0xb8, 0x39, 0x67, 0xc8, 0x78, 0xf8, 0x0c, 0xca, + 0x31, 0x97, 0xda, 0x54, 0x63, 0x59, 0x7d, 0x27, 0x17, 0x89, 0x46, 0xc7, 0xf5, 0x32, 0x8a, 0xa8, + 0x2d, 0x4d, 0xbd, 0x14, 0x74, 0xbd, 0x68, 0x91, 0xaa, 0x97, 0xfb, 0x50, 0x13, 0xd4, 0xf6, 0xa8, + 0xa3, 0x01, 0xba, 0xbf, 0x80, 0x16, 0x29, 0xc0, 0x37, 0xd0, 0x14, 0xd2, 0x8e, 0xe4, 0x20, 0x0c, + 0x84, 0x7a, 0xc5, 0x69, 0x8b, 0xc1, 0x4b, 0xba, 0xfc, 0xb1, 0x70, 0x5f, 0x1b, 0x28, 0x69, 0xa8, + 0xab, 0xd3, 0xa3, 0x40, 0x47, 0xb0, 0x49, 0xb9, 0x93, 0x52, 0x55, 0xbe, 0xb2, 0xaa, 0x3a, 0xe5, + 0x4e, 0xa2, 0x08, 0x33, 0xf8, 0xb0, 0xc7, 0x05, 0x8d, 0xe4, 0x21, 0xe3, 0x5e, 0xe0, 0xbe, 0xb6, + 0xe5, 0xf8, 0x7d, 0x71, 0x12, 0xc2, 0xad, 0x79, 0x53, 0x33, 0x5a, 0x3a, 0x50, 0x39, 0x65, 0xd4, + 0x73, 0x66, 0x69, 0x93, 0x9c, 0xd1, 0x67, 0x50, 0x0e, 0x63, 0x70, 0xbb, 0xa0, 0x82, 0x5c, 0x36, + 0x15, 0xfb, 0x32, 0x62, 0xdc, 0x7d, 0xc5, 0x84, 0x24, 0x1a, 0x8f, 0x7f, 0xb6, 0x60, 0x5b, 0x9b, + 0xfc, 0x5a, 0xb7, 0xd5, 0xf7, 0x5b, 0xb6, 0x39, 0x63, 0x12, 0xfb, 0x70, 0xf3, 0x7b, 0x5b, 0x8e, + 0xc6, 0x5d, 0xff, 0x5f, 0xbb, 0x10, 0x9b, 0x9b, 0x4d, 0x07, 0xfd, 0x1a, 0x55, 0x92, 0x91, 0xe1, + 0xdf, 0x2c, 0x68, 0xaa, 0x06, 0xd5, 0xa7, 0xee, 0xff, 0x1e, 0xec, 0x5c, 0xf5, 0x97, 0x16, 0xaa, + 0xff, 0x5d, 0x01, 0x6a, 0xa6, 0xb8, 0x7a, 0xfc, 0x34, 0xc8, 0xe6, 0x8b, 0x35, 0x97, 0x2f, 0xff, + 0x4d, 0xa3, 0x42, 0xbb, 0xd0, 0x64, 0x2a, 0x05, 0x06, 0xe6, 0xa1, 0xb4, 0x63, 0x55, 0xd2, 0x60, + 0xe9, 0xcc, 0x50, 0xb3, 0x2b, 0x08, 0x29, 0xd7, 0xe5, 0x5b, 0x56, 0xe5, 0x5b, 0x89, 0x05, 0xaa, + 0x78, 0xef, 0x02, 0x8c, 0xbc, 0x40, 0x64, 0xa6, 0x65, 0x55, 0x49, 0xd4, 0xdf, 0xb7, 0xa0, 0xc2, + 0x27, 0xfe, 0x20, 0x0a, 0xce, 0xf4, 0xb8, 0x2c, 0x92, 0x0d, 0x3e, 0xf1, 0x49, 0x70, 0x26, 0xe2, + 0xbf, 0x7c, 0xea, 0x0f, 0x04, 0xbb, 0xd0, 0x13, 0xb1, 0x48, 0x36, 0x7c, 0xea, 0xf7, 0xd9, 0x05, + 0xc5, 0xe7, 0x00, 0xe6, 0x35, 0x8e, 0x85, 0x7b, 0x0d, 0x9a, 0x3e, 0x87, 0x0d, 0xf3, 0x5a, 0x66, + 0x84, 0xdd, 0x5b, 0xde, 0xcc, 0xe2, 0xf7, 0x26, 0x53, 0x38, 0x3e, 0x86, 0xc6, 0xcb, 0xb8, 0xba, + 0x54, 0xaa, 0x1c, 0x53, 0x69, 0xa3, 0x36, 0x6c, 0x98, 0x7a, 0x33, 0x44, 0x4c, 0x8f, 0xe8, 0x01, + 0xd4, 0x87, 0xaa, 0x60, 0x07, 0xb3, 0x22, 0xac, 0x92, 0xda, 0x70, 0x56, 0xc4, 0xf8, 0x17, 0x0b, + 0x5a, 0xc6, 0xce, 0x4c, 0xe3, 0xe5, 0xe4, 0xde, 0x05, 0x60, 0x62, 0x70, 0x1a, 0xa3, 0xa9, 0xa3, + 0xdc, 0xaf, 0x90, 0x2a, 0x13, 0x2f, 0xb5, 0x00, 0x7d, 0x01, 0xeb, 0xca, 0xfe, 0xb4, 0xb1, 0x3d, + 0xc8, 0x89, 0x2c, 0x1b, 0x01, 0x31, 0x17, 0xf0, 0x77, 0x50, 0xef, 0x76, 0x5f, 0xcd, 0xfc, 0x98, + 0x4f, 0x23, 0x2b, 0x27, 0x8d, 0x56, 0xc7, 0xf8, 0xf8, 0x07, 0xb5, 0x5b, 0x27, 0x73, 0x01, 0x35, + 0x93, 0x54, 0x3e, 0x09, 0x38, 0x6d, 0xad, 0xa1, 0x2d, 0xb5, 0xf8, 0x69, 0x81, 0x7c, 0x71, 0xce, + 0x84, 0x6c, 0x59, 0x08, 0x41, 0xc3, 0x08, 0x8f, 0xa2, 0xe0, 0x8c, 0x71, 0xb7, 0x55, 0x40, 0x37, + 0x60, 0x73, 0xaa, 0x49, 0x4d, 0x87, 0x56, 0xf1, 0xc9, 0x9f, 0x65, 0xa8, 0x75, 0x6d, 0x69, 0xf7, + 0xf5, 0xd7, 0x0d, 0xb2, 0xa1, 0x9e, 0xfe, 0x4a, 0x40, 0x8f, 0x72, 0xc2, 0xcf, 0xf9, 0x8e, 0xe9, + 0xec, 0xae, 0xc4, 0xe9, 0x76, 0x8b, 0xd7, 0xd0, 0x11, 0x94, 0xd5, 0x1b, 0xa1, 0xbc, 0x09, 0x98, + 0xde, 0x71, 0x3a, 0x97, 0x2d, 0x46, 0x78, 0x0d, 0x0d, 0xa1, 0x99, 0xac, 0x6b, 0x86, 0xdc, 0x87, + 0x39, 0x2a, 0x17, 0xb7, 0xf8, 0xce, 0xa3, 0x55, 0xb0, 0xc4, 0xd9, 0x01, 0xd4, 0x53, 0xdb, 0x86, + 0xc8, 0x35, 0xb0, 0xb8, 0x1c, 0xe5, 0x1a, 0xc8, 0xd9, 0x5a, 0xf0, 0x1a, 0x72, 0xa1, 0x75, 0x44, + 0x65, 0x66, 0x63, 0x40, 0xbb, 0x2b, 0x56, 0x83, 0xe9, 0xf2, 0xd2, 0xd9, 0x5b, 0x0d, 0x4c, 0x0c, + 0x45, 0xb0, 0x7d, 0x44, 0xe5, 0xc2, 0x1c, 0x44, 0x8f, 0x73, 0x74, 0x2c, 0x19, 0xcc, 0x9d, 0x8f, + 0xae, 0x80, 0x4d, 0xdb, 0xb4, 0xe1, 0x46, 0x62, 0x33, 0x69, 0x77, 0xbb, 0x4b, 0x95, 0x64, 0x07, + 0x55, 0x67, 0xf5, 0xb8, 0xc5, 0x6b, 0x87, 0x5f, 0xbd, 0xf9, 0xd2, 0x65, 0x72, 0x3c, 0x19, 0xc6, + 0xe9, 0x71, 0x70, 0xc1, 0x3c, 0x8f, 0x5d, 0x48, 0x3a, 0x1a, 0x1f, 0xe8, 0xbb, 0x1f, 0x3b, 0x4c, + 0xc8, 0x88, 0x0d, 0x27, 0x92, 0x3a, 0x07, 0x53, 0x0d, 0x07, 0x4a, 0xe1, 0x41, 0x6c, 0x39, 0x1c, + 0x0e, 0xd7, 0xd5, 0xe9, 0xe9, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xd9, 0xa6, 0xbb, 0x19, 0xe7, + 0x0f, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/internal/proto/write_node.proto b/internal/proto/write_node.proto index 1a173928f269112b3e0bc8507e0f5effdc5a6e8b..56853c480fca09c76af1b39ea57333e79bcd6cb0 100644 --- a/internal/proto/write_node.proto +++ b/internal/proto/write_node.proto @@ -1,4 +1,4 @@ - +// GOOSE TODO remove this proto file syntax = "proto3"; package milvus.proto.service;