diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go
index 9f4a0df466fb9dfafdb5badc30e82bbc365163d4..3d3219a65eac4df3dd2f37f99ecaa2b8f0965ad9 100644
--- a/internal/distributed/masterservice/masterservice_test.go
+++ b/internal/distributed/masterservice/masterservice_test.go
@@ -3,6 +3,7 @@ package masterservice
 import (
 	"fmt"
 	"math/rand"
+	"regexp"
 	"testing"
 	"time"
 
@@ -99,6 +100,12 @@ func TestGrpcService(t *testing.T) {
 		return 2000, nil
 	}
 
+	collectionMetaCache := make([]string, 0, 16)
+	core.InvalidateCollectionMetaCache = func(dbName string, collectionName string) error {
+		collectionMetaCache = append(collectionMetaCache, collectionName)
+		return nil
+	}
+
 	err = svr.Start()
 	assert.Nil(t, err)
 
@@ -150,15 +157,60 @@ func TestGrpcService(t *testing.T) {
 		assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_SUCCESS)
 		assert.Equal(t, createCollectionArray[0].Base.MsgType, commonpb.MsgType_kCreateCollection)
 		assert.Equal(t, createCollectionArray[0].CollectionName, "testColl")
+
+		req.Base.MsgID = 101
+		req.Base.Timestamp = 101
+		req.Base.SourceID = 101
+		status, err = cli.CreateCollection(req)
+		assert.Nil(t, err)
+		assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_UNEXPECTED_ERROR)
+
+		req.Base.MsgID = 102
+		req.Base.Timestamp = 102
+		req.Base.SourceID = 102
+		req.CollectionName = "testColl-again"
+		status, err = cli.CreateCollection(req)
+		assert.Nil(t, err)
+		assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_UNEXPECTED_ERROR)
+
+		schema.Name = req.CollectionName
+		sbf, err = proto.Marshal(&schema)
+		assert.Nil(t, err)
+		req.Schema = sbf
+		req.Base.MsgID = 103
+		req.Base.Timestamp = 103
+		req.Base.SourceID = 103
+		status, err = cli.CreateCollection(req)
+		assert.Nil(t, err)
+		assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_SUCCESS)
+		assert.Equal(t, len(createCollectionArray), 2)
+		assert.Equal(t, createCollectionArray[1].Base.MsgType, commonpb.MsgType_kCreateCollection)
+		assert.Equal(t, createCollectionArray[1].CollectionName, "testColl-again")
+
+		//time stamp go back
+		schema.Name = "testColl-goback"
+		sbf, err = proto.Marshal(&schema)
+		assert.Nil(t, err)
+		req.CollectionName = schema.Name
+		req.Schema = sbf
+		req.Base.MsgID = 103
+		req.Base.Timestamp = 103
+		req.Base.SourceID = 103
+		status, err = cli.CreateCollection(req)
+		assert.Nil(t, err)
+		assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_UNEXPECTED_ERROR)
+		matched, err := regexp.MatchString("input timestamp = [0-9]+, last dd time stamp = [0-9]+", status.Reason)
+		assert.Nil(t, err)
+		assert.True(t, matched)
 	})
 
 	t.Run("has collection", func(t *testing.T) {
 		req := &milvuspb.HasCollectionRequest{
 			Base: &commonpb.MsgBase{
 				MsgType:   commonpb.MsgType_kHasCollection,
-				MsgID:     101,
-				Timestamp: 101,
-				SourceID:  101,
+				MsgID:     110,
+				Timestamp: 110,
+				SourceID:  110,
 			},
 			DbName:         "testDb",
 			CollectionName: "testColl",
@@ -171,9 +223,9 @@ func TestGrpcService(t *testing.T) {
 		req = &milvuspb.HasCollectionRequest{
 			Base: &commonpb.MsgBase{
 				MsgType:   commonpb.MsgType_kHasCollection,
-				MsgID:     102,
-				Timestamp: 102,
-				SourceID:  102,
+				MsgID:     111,
+				Timestamp: 111,
+				SourceID:  111,
 			},
 			DbName:         "testDb",
 			CollectionName: "testColl2",
@@ -183,20 +235,21 @@ func TestGrpcService(t *testing.T) {
 		assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS)
 		assert.Equal(t, rsp.Value, false)
 
+		// test time stamp go back
 		req = &milvuspb.HasCollectionRequest{
 			Base: &commonpb.MsgBase{
 				MsgType:   commonpb.MsgType_kHasCollection,
-				MsgID:     102,
-				Timestamp: 102,
-				SourceID:  102,
+				MsgID:     111,
+				Timestamp: 111,
+				SourceID:  111,
 			},
 			DbName:         "testDb",
 			CollectionName: "testColl2",
 		}
 		rsp, err = cli.HasCollection(req)
 		assert.Nil(t, err)
-		assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_UNEXPECTED_ERROR)
-
+		assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS)
+		assert.Equal(t, rsp.Value, false)
 	})
 
 	t.Run("describe collection", func(t *testing.T) {
@@ -205,9 +258,9 @@ func TestGrpcService(t *testing.T) {
 		req := &milvuspb.DescribeCollectionRequest{
 			Base: &commonpb.MsgBase{
 				MsgType:   commonpb.MsgType_kDescribeCollection,
-				MsgID:     103,
-				Timestamp: 103,
-				SourceID:  103,
+				MsgID:     120,
+				Timestamp: 120,
+				SourceID:  120,
 			},
 			DbName:         "testDb",
 			CollectionName: "testColl",
@@ -223,26 +276,26 @@ func TestGrpcService(t *testing.T) {
 		req := &milvuspb.ShowCollectionRequest{
 			Base: &commonpb.MsgBase{
 				MsgType:   commonpb.MsgType_kShowCollections,
-				MsgID:     106,
-				Timestamp: 106,
-				SourceID:  106,
+				MsgID:     130,
+				Timestamp: 130,
+				SourceID:  130,
 			},
 			DbName: "testDb",
 		}
 		rsp, err := cli.ShowCollections(req)
 		assert.Nil(t, err)
 		assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS)
-		assert.Equal(t, rsp.CollectionNames[0], "testColl")
-		assert.Equal(t, len(rsp.CollectionNames), 1)
+		assert.ElementsMatch(t, rsp.CollectionNames, []string{"testColl", "testColl-again"})
+		assert.Equal(t, len(rsp.CollectionNames), 2)
 	})
 
 	t.Run("create partition", func(t *testing.T) {
 		req := &milvuspb.CreatePartitionRequest{
 			Base: &commonpb.MsgBase{
 				MsgType:   commonpb.MsgType_kCreatePartition,
-				MsgID:     107,
-				Timestamp: 107,
-				SourceID:  107,
+				MsgID:     140,
+				Timestamp: 140,
+				SourceID:  140,
 			},
 			DbName:         "testDb",
 			CollectionName: "testColl",
@@ -264,9 +317,9 @@ func TestGrpcService(t *testing.T) {
 		req := &milvuspb.HasPartitionRequest{
 			Base: &commonpb.MsgBase{
 				MsgType:   commonpb.MsgType_kHasPartition,
-				MsgID:     108,
-				Timestamp: 108,
-				SourceID:  108,
+				MsgID:     150,
+				Timestamp: 150,
+				SourceID:  150,
 			},
 			DbName:         "testDb",
 			CollectionName: "testColl",
@@ -284,9 +337,9 @@ func TestGrpcService(t *testing.T) {
 		req := &milvuspb.ShowPartitionRequest{
 			Base: &commonpb.MsgBase{
 				MsgType:   commonpb.MsgType_kShowPartitions,
-				MsgID:     110,
-				Timestamp: 110,
-				SourceID:  110,
+				MsgID:     160,
+				Timestamp: 160,
+				SourceID:  160,
 			},
 			DbName:         "testDb",
 			CollectionName: "testColl",
@@ -320,9 +373,9 @@ func TestGrpcService(t *testing.T) {
 		req := &milvuspb.ShowSegmentRequest{
 			Base: &commonpb.MsgBase{
 				MsgType:   commonpb.MsgType_kShowSegment,
-				MsgID:     111,
-				Timestamp: 111,
-				SourceID:  111,
+				MsgID:     170,
+				Timestamp: 170,
+				SourceID:  170,
 			},
 			CollectionID: coll.ID,
 			PartitionID:  partID,
@@ -338,9 +391,9 @@ func TestGrpcService(t *testing.T) {
 		req := &milvuspb.CreateIndexRequest{
 			Base: &commonpb.MsgBase{
 				MsgType:   commonpb.MsgType_kCreateIndex,
-				MsgID:     112,
-				Timestamp: 112,
-				SourceID:  112,
+				MsgID:     180,
+				Timestamp: 180,
+				SourceID:  180,
 			},
 			DbName:         "",
 			CollectionName: "testColl",
@@ -366,9 +419,9 @@ func TestGrpcService(t *testing.T) {
 		req := &milvuspb.DescribeSegmentRequest{
 			Base: &commonpb.MsgBase{
 				MsgType:   commonpb.MsgType_kDescribeSegment,
-				MsgID:     113,
-				Timestamp: 113,
-				SourceID:  113,
+				MsgID:     190,
+				Timestamp: 190,
+				SourceID:  190,
 			},
 			CollectionID: coll.ID,
 			SegmentID:    1000,
@@ -383,9 +436,9 @@ func TestGrpcService(t *testing.T) {
 		req := &milvuspb.DescribeIndexRequest{
 			Base: &commonpb.MsgBase{
 				MsgType:   commonpb.MsgType_kDescribeIndex,
-				MsgID:     114,
-				Timestamp: 114,
-				SourceID:  114,
+				MsgID:     200,
+				Timestamp: 200,
+				SourceID:  200,
 			},
 			DbName:         "",
 			CollectionName: "testColl",
@@ -422,9 +475,9 @@ func TestGrpcService(t *testing.T) {
 		req := &milvuspb.DescribeIndexRequest{
 			Base: &commonpb.MsgBase{
 				MsgType:   commonpb.MsgType_kDescribeIndex,
-				MsgID:     115,
-				Timestamp: 115,
-				SourceID:  115,
+				MsgID:     210,
+				Timestamp: 210,
+				SourceID:  210,
 			},
 			DbName:         "",
 			CollectionName: "testColl",
@@ -444,9 +497,9 @@ func TestGrpcService(t *testing.T) {
 		req := &milvuspb.DropPartitionRequest{
 			Base: &commonpb.MsgBase{
 				MsgType:   commonpb.MsgType_kDropPartition,
-				MsgID:     199,
-				Timestamp: 199,
-				SourceID:  199,
+				MsgID:     220,
+				Timestamp: 220,
+				SourceID:  220,
 			},
 			DbName:         "testDb",
 			CollectionName: "testColl",
@@ -467,9 +520,9 @@ func TestGrpcService(t *testing.T) {
 		req := &milvuspb.DropCollectionRequest{
 			Base: &commonpb.MsgBase{
 				MsgType:   commonpb.MsgType_kDropCollection,
-				MsgID:     200,
-				Timestamp: 200,
-				SourceID:  200,
+				MsgID:     230,
+				Timestamp: 230,
+				SourceID:  230,
 			},
 			DbName:         "testDb",
 			CollectionName: "testColl",
@@ -481,13 +534,15 @@ func TestGrpcService(t *testing.T) {
 		assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_SUCCESS)
 		assert.Equal(t, dropCollectionArray[0].Base.MsgType, commonpb.MsgType_kDropCollection)
 		assert.Equal(t, dropCollectionArray[0].CollectionName, "testColl")
+		assert.Equal(t, len(collectionMetaCache), 1)
+		assert.Equal(t, collectionMetaCache[0], "testColl")
 
 		req = &milvuspb.DropCollectionRequest{
 			Base: &commonpb.MsgBase{
 				MsgType:   commonpb.MsgType_kDropCollection,
-				MsgID:     200,
-				Timestamp: 200,
-				SourceID:  200,
+				MsgID:     231,
+				Timestamp: 231,
+				SourceID:  231,
 			},
 			DbName:         "testDb",
 			CollectionName: "testColl",
diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go
index e3be0abe125cd0232f7d002cc85835e9421c733f..708927801eda30a4dc7c5d95197b3f687b1590d2 100644
--- a/internal/masterservice/master_service.go
+++ b/internal/masterservice/master_service.go
@@ -148,6 +148,9 @@ type Core struct {
 	//TODO, call index builder's client to build index, return build id
 	BuildIndexReq func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair) (typeutil.UniqueID, error)
 
+	//TODO, proxy service interface, notify proxy service to drop collection
+	InvalidateCollectionMetaCache func(dbName string, collectionName string) error
+
 	// put create index task into this chan
 	indexTaskQueue chan *CreateIndexTask
 
@@ -228,6 +231,9 @@ func (c *Core) checkInit() error {
 	if c.BuildIndexReq == nil {
 		return errors.Errorf("BuildIndexReq is nil")
 	}
+	if c.InvalidateCollectionMetaCache == nil {
+		return errors.Errorf("InvalidateCollectionMetaCache is nil")
+	}
 	if c.indexTaskQueue == nil {
 		return errors.Errorf("indexTaskQueue is nil")
 	}
@@ -254,13 +260,15 @@ func (c *Core) startDdScheduler() {
 				task.Notify(err)
 				break
 			}
-			if ts <= c.lastDdTimeStamp {
+			if !task.IgnoreTimeStamp() && ts <= c.lastDdTimeStamp {
 				task.Notify(errors.Errorf("input timestamp = %d, last dd time stamp = %d", ts, c.lastDdTimeStamp))
 				break
 			}
 			err = task.Execute()
 			task.Notify(err)
-			c.lastDdTimeStamp = ts
+			if ts > c.lastDdTimeStamp {
+				c.lastDdTimeStamp = ts
+			}
 		}
 	}
 }
@@ -523,8 +531,14 @@ func (c *Core) setMsgStreams() error {
 						segInfoMsg, ok := segm.(*ms.SegmentInfoMsg)
 						if ok {
 							c.DataServiceSegmentChan <- segInfoMsg.Segment
+						} else {
+							flushMsg, ok := segm.(*ms.SegmentFlushCompletedMsg)
+							if ok {
+								c.DataNodeSegmentFlushCompletedChan <- flushMsg.SegmentFlushCompletedMsg.SegmentID
+							} else {
+								log.Printf("receiver unexpected msg from data service stream, value = %v", segm)
+							}
 						}
-						//TODO, if data node flush
 					}
 				}
 			}
diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go
index 185bd65d612c90b0acf4d07d05ed2e473934c554..9481e6a05c5c8aa8c424b7450065a60d69141dcc 100644
--- a/internal/masterservice/task.go
+++ b/internal/masterservice/task.go
@@ -14,6 +14,7 @@ import (
 type reqTask interface {
 	Type() commonpb.MsgType
 	Ts() (typeutil.Timestamp, error)
+	IgnoreTimeStamp() bool
 	Execute() error
 	WaitToFinish() error
 	Notify(err error)
@@ -48,10 +49,15 @@ type CreateCollectionReqTask struct {
 func (t *CreateCollectionReqTask) Type() commonpb.MsgType {
 	return t.Req.Base.MsgType
 }
+
 func (t *CreateCollectionReqTask) Ts() (typeutil.Timestamp, error) {
 	return t.Req.Base.Timestamp, nil
 }
 
+func (t *CreateCollectionReqTask) IgnoreTimeStamp() bool {
+	return false
+}
+
 func (t *CreateCollectionReqTask) Execute() error {
 	var schema schemapb.CollectionSchema
 	err := proto.Unmarshal(t.Req.Schema, &schema)
@@ -59,6 +65,10 @@ func (t *CreateCollectionReqTask) Execute() error {
 		return err
 	}
 
+	if t.Req.CollectionName != schema.Name {
+		return errors.Errorf("collection name = %s, schema.Name=%s", t.Req.CollectionName, schema.Name)
+	}
+
 	for idx, field := range schema.Fields {
 		field.FieldID = int64(idx + StartOfUserFieldID)
 	}
@@ -141,11 +151,18 @@ func (t *DropCollectionReqTask) Ts() (typeutil.Timestamp, error) {
 	return t.Req.Base.Timestamp, nil
 }
 
+func (t *DropCollectionReqTask) IgnoreTimeStamp() bool {
+	return false
+}
+
 func (t *DropCollectionReqTask) Execute() error {
 	collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
 	if err != nil {
 		return err
 	}
+	if err = t.core.InvalidateCollectionMetaCache(t.Req.DbName, t.Req.CollectionName); err != nil {
+		return err
+	}
 	err = t.core.MetaTable.DeleteCollection(collMeta.ID)
 	if err != nil {
 		return err
@@ -182,6 +199,10 @@ func (t *HasCollectionReqTask) Ts() (typeutil.Timestamp, error) {
 	return t.Req.Base.Timestamp, nil
 }
 
+func (t *HasCollectionReqTask) IgnoreTimeStamp() bool {
+	return true
+}
+
 func (t *HasCollectionReqTask) Execute() error {
 	_, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
 	if err == nil {
@@ -206,6 +227,10 @@ func (t *DescribeCollectionReqTask) Ts() (typeutil.Timestamp, error) {
 	return t.Req.Base.Timestamp, nil
 }
 
+func (t *DescribeCollectionReqTask) IgnoreTimeStamp() bool {
+	return true
+}
+
 func (t *DescribeCollectionReqTask) Execute() error {
 	coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
 	if err != nil {
@@ -237,6 +262,10 @@ func (t *ShowCollectionReqTask) Ts() (typeutil.Timestamp, error) {
 	return t.Req.Base.Timestamp, nil
 }
 
+func (t *ShowCollectionReqTask) IgnoreTimeStamp() bool {
+	return true
+}
+
 func (t *ShowCollectionReqTask) Execute() error {
 	coll, err := t.core.MetaTable.ListCollections()
 	if err != nil {
@@ -259,6 +288,10 @@ func (t *CreatePartitionReqTask) Ts() (typeutil.Timestamp, error) {
 	return t.Req.Base.Timestamp, nil
 }
 
+func (t *CreatePartitionReqTask) IgnoreTimeStamp() bool {
+	return false
+}
+
 func (t *CreatePartitionReqTask) Execute() error {
 	collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
 	if err != nil {
@@ -304,6 +337,10 @@ func (t *DropPartitionReqTask) Ts() (typeutil.Timestamp, error) {
 	return t.Req.Base.Timestamp, nil
 }
 
+func (t *DropPartitionReqTask) IgnoreTimeStamp() bool {
+	return false
+}
+
 func (t *DropPartitionReqTask) Execute() error {
 	coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
 	if err != nil {
@@ -345,6 +382,10 @@ func (t *HasPartitionReqTask) Ts() (typeutil.Timestamp, error) {
 	return t.Req.Base.Timestamp, nil
 }
 
+func (t *HasPartitionReqTask) IgnoreTimeStamp() bool {
+	return true
+}
+
 func (t *HasPartitionReqTask) Execute() error {
 	coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
 	if err != nil {
@@ -368,6 +409,10 @@ func (t *ShowPartitionReqTask) Ts() (typeutil.Timestamp, error) {
 	return t.Req.Base.Timestamp, nil
 }
 
+func (t *ShowPartitionReqTask) IgnoreTimeStamp() bool {
+	return true
+}
+
 func (t *ShowPartitionReqTask) Execute() error {
 	coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID)
 	if err != nil {
@@ -401,6 +446,10 @@ func (t *DescribeSegmentReqTask) Ts() (typeutil.Timestamp, error) {
 	return t.Req.Base.Timestamp, nil
 }
 
+func (t *DescribeSegmentReqTask) IgnoreTimeStamp() bool {
+	return true
+}
+
 func (t *DescribeSegmentReqTask) Execute() error {
 	coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID)
 	if err != nil {
@@ -448,6 +497,10 @@ func (t *ShowSegmentReqTask) Ts() (typeutil.Timestamp, error) {
 	return t.Req.Base.Timestamp, nil
 }
 
+func (t *ShowSegmentReqTask) IgnoreTimeStamp() bool {
+	return true
+}
+
 func (t *ShowSegmentReqTask) Execute() error {
 	coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID)
 	if err != nil {
@@ -476,6 +529,10 @@ func (t *CreateIndexReqTask) Ts() (typeutil.Timestamp, error) {
 	return t.Req.Base.Timestamp, nil
 }
 
+func (t *CreateIndexReqTask) IgnoreTimeStamp() bool {
+	return false
+}
+
 func (t *CreateIndexReqTask) Execute() error {
 	segIDs, field, err := t.core.MetaTable.GetNotIndexedSegments(t.Req.CollectionName, t.Req.FieldName, t.Req.ExtraParams)
 	if err != nil {
@@ -511,6 +568,10 @@ func (t *DescribeIndexReqTask) Ts() (typeutil.Timestamp, error) {
 	return t.Req.Base.Timestamp, nil
 }
 
+func (t *DescribeIndexReqTask) IgnoreTimeStamp() bool {
+	return true
+}
+
 func (t *DescribeIndexReqTask) Execute() error {
 	idx, err := t.core.MetaTable.GetIndexByName(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName)
 	if err != nil {
diff --git a/internal/msgstream/msg.go b/internal/msgstream/msg.go
index 6fa8a0c8d5ea3372150b70f8148373360f96cb43..e3084b123ef81331317663e9e2eda9dc87c22c1a 100644
--- a/internal/msgstream/msg.go
+++ b/internal/msgstream/msg.go
@@ -690,3 +690,41 @@ func (sim *SegmentInfoMsg) Unmarshal(input []byte) (TsMsg, error) {
 		SegmentMsg: segMsg,
 	}, nil
 }
+
+/////////////////////////////////////////SegmentFlushCompletedMsg//////////////////////////////////////////
+type SegmentFlushCompletedMsg struct {
+	BaseMsg
+	datapb.SegmentFlushCompletedMsg
+}
+
+func (sfm *SegmentFlushCompletedMsg) Type() MsgType {
+	return sfm.Base.MsgType
+}
+
+func (sfm *SegmentFlushCompletedMsg) GetMsgContext() context.Context {
+	return sfm.MsgCtx
+}
+
+func (sfm *SegmentFlushCompletedMsg) SetMsgContext(ctx context.Context) {
+	sfm.MsgCtx = ctx
+}
+
+func (sfm *SegmentFlushCompletedMsg) Marshal(input TsMsg) ([]byte, error) {
+	sfmsg := input.(*SegmentFlushCompletedMsg)
+	mb, err := proto.Marshal(&sfmsg.SegmentFlushCompletedMsg)
+	if err != nil {
+		return nil, err
+	}
+	return mb, nil
+}
+
+func (sfm *SegmentFlushCompletedMsg) Unmarshal(input []byte) (TsMsg, error) {
+	sfmsg := datapb.SegmentFlushCompletedMsg{}
+	err := proto.Unmarshal(input, &sfmsg)
+	if err != nil {
+		return nil, err
+	}
+	return &SegmentFlushCompletedMsg{
+		SegmentFlushCompletedMsg: sfmsg,
+	}, nil
+}