From e68d50f29e44db9713a24012b56a10e55b4d9547 Mon Sep 17 00:00:00 2001 From: neza2017 <yefu.chen@zilliz.com> Date: Sat, 23 Jan 2021 10:12:41 +0800 Subject: [PATCH] Invalid collection meta caceh && ingore time stamp if not dd request Signed-off-by: neza2017 <yefu.chen@zilliz.com> --- .../masterservice/masterservice_test.go | 159 ++++++++++++------ internal/masterservice/master_service.go | 20 ++- internal/masterservice/task.go | 61 +++++++ internal/msgstream/msg.go | 38 +++++ 4 files changed, 223 insertions(+), 55 deletions(-) diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index 9f4a0df46..3d3219a65 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -3,6 +3,7 @@ package masterservice import ( "fmt" "math/rand" + "regexp" "testing" "time" @@ -99,6 +100,12 @@ func TestGrpcService(t *testing.T) { return 2000, nil } + collectionMetaCache := make([]string, 0, 16) + core.InvalidateCollectionMetaCache = func(dbName string, collectionName string) error { + collectionMetaCache = append(collectionMetaCache, collectionName) + return nil + } + err = svr.Start() assert.Nil(t, err) @@ -150,15 +157,60 @@ func TestGrpcService(t *testing.T) { assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_SUCCESS) assert.Equal(t, createCollectionArray[0].Base.MsgType, commonpb.MsgType_kCreateCollection) assert.Equal(t, createCollectionArray[0].CollectionName, "testColl") + + req.Base.MsgID = 101 + req.Base.Timestamp = 101 + req.Base.SourceID = 101 + status, err = cli.CreateCollection(req) + assert.Nil(t, err) + assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_UNEXPECTED_ERROR) + + req.Base.MsgID = 102 + req.Base.Timestamp = 102 + req.Base.SourceID = 102 + req.CollectionName = "testColl-again" + status, err = cli.CreateCollection(req) + assert.Nil(t, err) + assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_UNEXPECTED_ERROR) + + schema.Name = req.CollectionName + sbf, err = proto.Marshal(&schema) + assert.Nil(t, err) + req.Schema = sbf + req.Base.MsgID = 103 + req.Base.Timestamp = 103 + req.Base.SourceID = 103 + status, err = cli.CreateCollection(req) + assert.Nil(t, err) + assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_SUCCESS) + assert.Equal(t, len(createCollectionArray), 2) + assert.Equal(t, createCollectionArray[1].Base.MsgType, commonpb.MsgType_kCreateCollection) + assert.Equal(t, createCollectionArray[1].CollectionName, "testColl-again") + + //time stamp go back + schema.Name = "testColl-goback" + sbf, err = proto.Marshal(&schema) + assert.Nil(t, err) + req.CollectionName = schema.Name + req.Schema = sbf + req.Base.MsgID = 103 + req.Base.Timestamp = 103 + req.Base.SourceID = 103 + status, err = cli.CreateCollection(req) + assert.Nil(t, err) + assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_UNEXPECTED_ERROR) + matched, err := regexp.MatchString("input timestamp = [0-9]+, last dd time stamp = [0-9]+", status.Reason) + assert.Nil(t, err) + assert.True(t, matched) }) t.Run("has collection", func(t *testing.T) { req := &milvuspb.HasCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kHasCollection, - MsgID: 101, - Timestamp: 101, - SourceID: 101, + MsgID: 110, + Timestamp: 110, + SourceID: 110, }, DbName: "testDb", CollectionName: "testColl", @@ -171,9 +223,9 @@ func TestGrpcService(t *testing.T) { req = &milvuspb.HasCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kHasCollection, - MsgID: 102, - Timestamp: 102, - SourceID: 102, + MsgID: 111, + Timestamp: 111, + SourceID: 111, }, DbName: "testDb", CollectionName: "testColl2", @@ -183,20 +235,21 @@ func TestGrpcService(t *testing.T) { assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS) assert.Equal(t, rsp.Value, false) + // test time stamp go back req = &milvuspb.HasCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kHasCollection, - MsgID: 102, - Timestamp: 102, - SourceID: 102, + MsgID: 111, + Timestamp: 111, + SourceID: 111, }, DbName: "testDb", CollectionName: "testColl2", } rsp, err = cli.HasCollection(req) assert.Nil(t, err) - assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_UNEXPECTED_ERROR) - + assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS) + assert.Equal(t, rsp.Value, false) }) t.Run("describe collection", func(t *testing.T) { @@ -205,9 +258,9 @@ func TestGrpcService(t *testing.T) { req := &milvuspb.DescribeCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kDescribeCollection, - MsgID: 103, - Timestamp: 103, - SourceID: 103, + MsgID: 120, + Timestamp: 120, + SourceID: 120, }, DbName: "testDb", CollectionName: "testColl", @@ -223,26 +276,26 @@ func TestGrpcService(t *testing.T) { req := &milvuspb.ShowCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kShowCollections, - MsgID: 106, - Timestamp: 106, - SourceID: 106, + MsgID: 130, + Timestamp: 130, + SourceID: 130, }, DbName: "testDb", } rsp, err := cli.ShowCollections(req) assert.Nil(t, err) assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS) - assert.Equal(t, rsp.CollectionNames[0], "testColl") - assert.Equal(t, len(rsp.CollectionNames), 1) + assert.ElementsMatch(t, rsp.CollectionNames, []string{"testColl", "testColl-again"}) + assert.Equal(t, len(rsp.CollectionNames), 2) }) t.Run("create partition", func(t *testing.T) { req := &milvuspb.CreatePartitionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kCreatePartition, - MsgID: 107, - Timestamp: 107, - SourceID: 107, + MsgID: 140, + Timestamp: 140, + SourceID: 140, }, DbName: "testDb", CollectionName: "testColl", @@ -264,9 +317,9 @@ func TestGrpcService(t *testing.T) { req := &milvuspb.HasPartitionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kHasPartition, - MsgID: 108, - Timestamp: 108, - SourceID: 108, + MsgID: 150, + Timestamp: 150, + SourceID: 150, }, DbName: "testDb", CollectionName: "testColl", @@ -284,9 +337,9 @@ func TestGrpcService(t *testing.T) { req := &milvuspb.ShowPartitionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kShowPartitions, - MsgID: 110, - Timestamp: 110, - SourceID: 110, + MsgID: 160, + Timestamp: 160, + SourceID: 160, }, DbName: "testDb", CollectionName: "testColl", @@ -320,9 +373,9 @@ func TestGrpcService(t *testing.T) { req := &milvuspb.ShowSegmentRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kShowSegment, - MsgID: 111, - Timestamp: 111, - SourceID: 111, + MsgID: 170, + Timestamp: 170, + SourceID: 170, }, CollectionID: coll.ID, PartitionID: partID, @@ -338,9 +391,9 @@ func TestGrpcService(t *testing.T) { req := &milvuspb.CreateIndexRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kCreateIndex, - MsgID: 112, - Timestamp: 112, - SourceID: 112, + MsgID: 180, + Timestamp: 180, + SourceID: 180, }, DbName: "", CollectionName: "testColl", @@ -366,9 +419,9 @@ func TestGrpcService(t *testing.T) { req := &milvuspb.DescribeSegmentRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kDescribeSegment, - MsgID: 113, - Timestamp: 113, - SourceID: 113, + MsgID: 190, + Timestamp: 190, + SourceID: 190, }, CollectionID: coll.ID, SegmentID: 1000, @@ -383,9 +436,9 @@ func TestGrpcService(t *testing.T) { req := &milvuspb.DescribeIndexRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kDescribeIndex, - MsgID: 114, - Timestamp: 114, - SourceID: 114, + MsgID: 200, + Timestamp: 200, + SourceID: 200, }, DbName: "", CollectionName: "testColl", @@ -422,9 +475,9 @@ func TestGrpcService(t *testing.T) { req := &milvuspb.DescribeIndexRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kDescribeIndex, - MsgID: 115, - Timestamp: 115, - SourceID: 115, + MsgID: 210, + Timestamp: 210, + SourceID: 210, }, DbName: "", CollectionName: "testColl", @@ -444,9 +497,9 @@ func TestGrpcService(t *testing.T) { req := &milvuspb.DropPartitionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kDropPartition, - MsgID: 199, - Timestamp: 199, - SourceID: 199, + MsgID: 220, + Timestamp: 220, + SourceID: 220, }, DbName: "testDb", CollectionName: "testColl", @@ -467,9 +520,9 @@ func TestGrpcService(t *testing.T) { req := &milvuspb.DropCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kDropCollection, - MsgID: 200, - Timestamp: 200, - SourceID: 200, + MsgID: 230, + Timestamp: 230, + SourceID: 230, }, DbName: "testDb", CollectionName: "testColl", @@ -481,13 +534,15 @@ func TestGrpcService(t *testing.T) { assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_SUCCESS) assert.Equal(t, dropCollectionArray[0].Base.MsgType, commonpb.MsgType_kDropCollection) assert.Equal(t, dropCollectionArray[0].CollectionName, "testColl") + assert.Equal(t, len(collectionMetaCache), 1) + assert.Equal(t, collectionMetaCache[0], "testColl") req = &milvuspb.DropCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kDropCollection, - MsgID: 200, - Timestamp: 200, - SourceID: 200, + MsgID: 231, + Timestamp: 231, + SourceID: 231, }, DbName: "testDb", CollectionName: "testColl", diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index e3be0abe1..708927801 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -148,6 +148,9 @@ type Core struct { //TODO, call index builder's client to build index, return build id BuildIndexReq func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair) (typeutil.UniqueID, error) + //TODO, proxy service interface, notify proxy service to drop collection + InvalidateCollectionMetaCache func(dbName string, collectionName string) error + // put create index task into this chan indexTaskQueue chan *CreateIndexTask @@ -228,6 +231,9 @@ func (c *Core) checkInit() error { if c.BuildIndexReq == nil { return errors.Errorf("BuildIndexReq is nil") } + if c.InvalidateCollectionMetaCache == nil { + return errors.Errorf("InvalidateCollectionMetaCache is nil") + } if c.indexTaskQueue == nil { return errors.Errorf("indexTaskQueue is nil") } @@ -254,13 +260,15 @@ func (c *Core) startDdScheduler() { task.Notify(err) break } - if ts <= c.lastDdTimeStamp { + if !task.IgnoreTimeStamp() && ts <= c.lastDdTimeStamp { task.Notify(errors.Errorf("input timestamp = %d, last dd time stamp = %d", ts, c.lastDdTimeStamp)) break } err = task.Execute() task.Notify(err) - c.lastDdTimeStamp = ts + if ts > c.lastDdTimeStamp { + c.lastDdTimeStamp = ts + } } } } @@ -523,8 +531,14 @@ func (c *Core) setMsgStreams() error { segInfoMsg, ok := segm.(*ms.SegmentInfoMsg) if ok { c.DataServiceSegmentChan <- segInfoMsg.Segment + } else { + flushMsg, ok := segm.(*ms.SegmentFlushCompletedMsg) + if ok { + c.DataNodeSegmentFlushCompletedChan <- flushMsg.SegmentFlushCompletedMsg.SegmentID + } else { + log.Printf("receiver unexpected msg from data service stream, value = %v", segm) + } } - //TODO, if data node flush } } } diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go index 185bd65d6..9481e6a05 100644 --- a/internal/masterservice/task.go +++ b/internal/masterservice/task.go @@ -14,6 +14,7 @@ import ( type reqTask interface { Type() commonpb.MsgType Ts() (typeutil.Timestamp, error) + IgnoreTimeStamp() bool Execute() error WaitToFinish() error Notify(err error) @@ -48,10 +49,15 @@ type CreateCollectionReqTask struct { func (t *CreateCollectionReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } + func (t *CreateCollectionReqTask) Ts() (typeutil.Timestamp, error) { return t.Req.Base.Timestamp, nil } +func (t *CreateCollectionReqTask) IgnoreTimeStamp() bool { + return false +} + func (t *CreateCollectionReqTask) Execute() error { var schema schemapb.CollectionSchema err := proto.Unmarshal(t.Req.Schema, &schema) @@ -59,6 +65,10 @@ func (t *CreateCollectionReqTask) Execute() error { return err } + if t.Req.CollectionName != schema.Name { + return errors.Errorf("collection name = %s, schema.Name=%s", t.Req.CollectionName, schema.Name) + } + for idx, field := range schema.Fields { field.FieldID = int64(idx + StartOfUserFieldID) } @@ -141,11 +151,18 @@ func (t *DropCollectionReqTask) Ts() (typeutil.Timestamp, error) { return t.Req.Base.Timestamp, nil } +func (t *DropCollectionReqTask) IgnoreTimeStamp() bool { + return false +} + func (t *DropCollectionReqTask) Execute() error { collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) if err != nil { return err } + if err = t.core.InvalidateCollectionMetaCache(t.Req.DbName, t.Req.CollectionName); err != nil { + return err + } err = t.core.MetaTable.DeleteCollection(collMeta.ID) if err != nil { return err @@ -182,6 +199,10 @@ func (t *HasCollectionReqTask) Ts() (typeutil.Timestamp, error) { return t.Req.Base.Timestamp, nil } +func (t *HasCollectionReqTask) IgnoreTimeStamp() bool { + return true +} + func (t *HasCollectionReqTask) Execute() error { _, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) if err == nil { @@ -206,6 +227,10 @@ func (t *DescribeCollectionReqTask) Ts() (typeutil.Timestamp, error) { return t.Req.Base.Timestamp, nil } +func (t *DescribeCollectionReqTask) IgnoreTimeStamp() bool { + return true +} + func (t *DescribeCollectionReqTask) Execute() error { coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) if err != nil { @@ -237,6 +262,10 @@ func (t *ShowCollectionReqTask) Ts() (typeutil.Timestamp, error) { return t.Req.Base.Timestamp, nil } +func (t *ShowCollectionReqTask) IgnoreTimeStamp() bool { + return true +} + func (t *ShowCollectionReqTask) Execute() error { coll, err := t.core.MetaTable.ListCollections() if err != nil { @@ -259,6 +288,10 @@ func (t *CreatePartitionReqTask) Ts() (typeutil.Timestamp, error) { return t.Req.Base.Timestamp, nil } +func (t *CreatePartitionReqTask) IgnoreTimeStamp() bool { + return false +} + func (t *CreatePartitionReqTask) Execute() error { collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) if err != nil { @@ -304,6 +337,10 @@ func (t *DropPartitionReqTask) Ts() (typeutil.Timestamp, error) { return t.Req.Base.Timestamp, nil } +func (t *DropPartitionReqTask) IgnoreTimeStamp() bool { + return false +} + func (t *DropPartitionReqTask) Execute() error { coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) if err != nil { @@ -345,6 +382,10 @@ func (t *HasPartitionReqTask) Ts() (typeutil.Timestamp, error) { return t.Req.Base.Timestamp, nil } +func (t *HasPartitionReqTask) IgnoreTimeStamp() bool { + return true +} + func (t *HasPartitionReqTask) Execute() error { coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) if err != nil { @@ -368,6 +409,10 @@ func (t *ShowPartitionReqTask) Ts() (typeutil.Timestamp, error) { return t.Req.Base.Timestamp, nil } +func (t *ShowPartitionReqTask) IgnoreTimeStamp() bool { + return true +} + func (t *ShowPartitionReqTask) Execute() error { coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID) if err != nil { @@ -401,6 +446,10 @@ func (t *DescribeSegmentReqTask) Ts() (typeutil.Timestamp, error) { return t.Req.Base.Timestamp, nil } +func (t *DescribeSegmentReqTask) IgnoreTimeStamp() bool { + return true +} + func (t *DescribeSegmentReqTask) Execute() error { coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID) if err != nil { @@ -448,6 +497,10 @@ func (t *ShowSegmentReqTask) Ts() (typeutil.Timestamp, error) { return t.Req.Base.Timestamp, nil } +func (t *ShowSegmentReqTask) IgnoreTimeStamp() bool { + return true +} + func (t *ShowSegmentReqTask) Execute() error { coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID) if err != nil { @@ -476,6 +529,10 @@ func (t *CreateIndexReqTask) Ts() (typeutil.Timestamp, error) { return t.Req.Base.Timestamp, nil } +func (t *CreateIndexReqTask) IgnoreTimeStamp() bool { + return false +} + func (t *CreateIndexReqTask) Execute() error { segIDs, field, err := t.core.MetaTable.GetNotIndexedSegments(t.Req.CollectionName, t.Req.FieldName, t.Req.ExtraParams) if err != nil { @@ -511,6 +568,10 @@ func (t *DescribeIndexReqTask) Ts() (typeutil.Timestamp, error) { return t.Req.Base.Timestamp, nil } +func (t *DescribeIndexReqTask) IgnoreTimeStamp() bool { + return true +} + func (t *DescribeIndexReqTask) Execute() error { idx, err := t.core.MetaTable.GetIndexByName(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName) if err != nil { diff --git a/internal/msgstream/msg.go b/internal/msgstream/msg.go index 6fa8a0c8d..e3084b123 100644 --- a/internal/msgstream/msg.go +++ b/internal/msgstream/msg.go @@ -690,3 +690,41 @@ func (sim *SegmentInfoMsg) Unmarshal(input []byte) (TsMsg, error) { SegmentMsg: segMsg, }, nil } + +/////////////////////////////////////////SegmentFlushCompletedMsg////////////////////////////////////////// +type SegmentFlushCompletedMsg struct { + BaseMsg + datapb.SegmentFlushCompletedMsg +} + +func (sfm *SegmentFlushCompletedMsg) Type() MsgType { + return sfm.Base.MsgType +} + +func (sfm *SegmentFlushCompletedMsg) GetMsgContext() context.Context { + return sfm.MsgCtx +} + +func (sfm *SegmentFlushCompletedMsg) SetMsgContext(ctx context.Context) { + sfm.MsgCtx = ctx +} + +func (sfm *SegmentFlushCompletedMsg) Marshal(input TsMsg) ([]byte, error) { + sfmsg := input.(*SegmentFlushCompletedMsg) + mb, err := proto.Marshal(&sfmsg.SegmentFlushCompletedMsg) + if err != nil { + return nil, err + } + return mb, nil +} + +func (sfm *SegmentFlushCompletedMsg) Unmarshal(input []byte) (TsMsg, error) { + sfmsg := datapb.SegmentFlushCompletedMsg{} + err := proto.Unmarshal(input, &sfmsg) + if err != nil { + return nil, err + } + return &SegmentFlushCompletedMsg{ + SegmentFlushCompletedMsg: sfmsg, + }, nil +} -- GitLab