diff --git a/cmd/db-server/main.go b/cmd/db-server/main.go
index 465dfff29296516d40f9bf19d22e780155450bce..84e28bbfdb67d7847367304f3eda11a476e190a3 100644
--- a/cmd/db-server/main.go
+++ b/cmd/db-server/main.go
@@ -137,6 +137,13 @@ func removeEpoch(epoch uint64) {
 	if err != nil {
 		fmt.Printf("catalog remove ddl failed. error :%v \n", err)
 	}
+	if tpe,ok := config.StorageEngine.(*tpeEngine.TpeEngine) ; ok {
+		err = tpe.RemoveDeletedTable(epoch)
+		if err != nil {
+			fmt.Printf("tpeEngine remove ddl failed. error :%v \n", err)
+		}
+	}
+
 }
 
 func main() {
diff --git a/pkg/vm/engine/tpe/computation/computationHandler.go b/pkg/vm/engine/tpe/computation/computationHandler.go
index 12f88a6d4c03ed5590a2cb36321c118532d68b54..f0c49fdc41114d215271c94793cb6db8422b36cc 100644
--- a/pkg/vm/engine/tpe/computation/computationHandler.go
+++ b/pkg/vm/engine/tpe/computation/computationHandler.go
@@ -41,4 +41,6 @@ type ComputationHandler interface {
 	Read(readCtx interface{}) (*batch.Batch, error)
 
 	Write(writeCtx interface{}, bat *batch.Batch) error
+
+	RemoveDeletedTable(epoch uint64) (int, error)
 }
\ No newline at end of file
diff --git a/pkg/vm/engine/tpe/descriptor/types.go b/pkg/vm/engine/tpe/descriptor/types.go
index f9d07a3f9cfb2d25b82d40a23b589f08c7ad5c2c..604aff5b4b233c08e20c1ffeb1d0978663b31944 100644
--- a/pkg/vm/engine/tpe/descriptor/types.go
+++ b/pkg/vm/engine/tpe/descriptor/types.go
@@ -231,6 +231,12 @@ func ExtractIndexAttributeDescIDs(attrs []*AttributeDesc) []int {
 	return ids
 }
 
+type EpochGCItem struct {
+	Epoch uint64
+	DbID uint64
+	TableID uint64
+}
+
 // DescriptorHandler loads and updates the descriptors
 type DescriptorHandler interface {
 
@@ -267,6 +273,6 @@ type DescriptorHandler interface {
 	//StoreRelationDescIntoAsyncGC stores the table into the asyncgc table
 	StoreRelationDescIntoAsyncGC(epoch uint64, dbID uint64, desc *RelationDesc) error
 
-	//ListRelationDescFromAsyncGC gets all the tables from the asyncgc table
-	ListRelationDescFromAsyncGC(epoch uint64) ([]*RelationDesc, error)
+	//ListRelationDescFromAsyncGC gets all the tables need to deleted from the asyncgc table
+	ListRelationDescFromAsyncGC(epoch uint64) ([]EpochGCItem, error)
 }
\ No newline at end of file
diff --git a/pkg/vm/engine/tpe/engine/engine.go b/pkg/vm/engine/tpe/engine/engine.go
index 2d0749118a898e77c3184882e1a3aa6465369396..9262aba36e9fa0a6fe13ead3f534b668b6a15ea0 100644
--- a/pkg/vm/engine/tpe/engine/engine.go
+++ b/pkg/vm/engine/tpe/engine/engine.go
@@ -91,4 +91,12 @@ func (te * TpeEngine) Database(name string) (engine.Database, error) {
 
 func (te * TpeEngine) Node(s string) *engine.NodeInfo {
 	return &engine.NodeInfo{Mcpu: 1}
+}
+
+func (te * TpeEngine) RemoveDeletedTable(epoch uint64) error {
+	_, err := te.computeHandler.RemoveDeletedTable(epoch)
+	if err != nil {
+		return err
+	}
+	return nil
 }
\ No newline at end of file
diff --git a/pkg/vm/engine/tpe/epoch/epoch.go b/pkg/vm/engine/tpe/epoch/epoch.go
deleted file mode 100644
index 3e1eede9ee9c2ba1506c93e7d2d59070c60394f6..0000000000000000000000000000000000000000
--- a/pkg/vm/engine/tpe/epoch/epoch.go
+++ /dev/null
@@ -1,45 +0,0 @@
-// Copyright 2021 Matrix Origin
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//      http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package epoch
-
-import "sync"
-
-type EpochHandler struct {
-	epoch uint64
-	rwlock sync.RWMutex
-}
-
-func NewEpochHandler() *EpochHandler {
-	return &EpochHandler{}
-}
-
-func (eh *EpochHandler) SetEpoch(e uint64) {
-	eh.rwlock.Lock()
-	defer eh.rwlock.Unlock()
-	eh.epoch = e
-}
-
-func (eh *EpochHandler) GetEpoch() uint64{
-	eh.rwlock.RLock()
-	defer eh.rwlock.RUnlock()
-	return eh.epoch
-}
-
-func (eh *EpochHandler) RemoveDeletedTable(epoch uint64) (int, error) {
-	eh.rwlock.Lock()
-	defer eh.rwlock.Unlock()
-	//TODO:implement
-	return 0, nil
-}
\ No newline at end of file
diff --git a/pkg/vm/engine/tpe/epoch/epoch_test.go b/pkg/vm/engine/tpe/epoch/epoch_test.go
deleted file mode 100644
index da0ffb4903a6b74a9d290d05eab4d1364041a9e3..0000000000000000000000000000000000000000
--- a/pkg/vm/engine/tpe/epoch/epoch_test.go
+++ /dev/null
@@ -1,31 +0,0 @@
-// Copyright 2021 Matrix Origin
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//      http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package epoch
-
-import (
-	"github.com/smartystreets/goconvey/convey"
-	"testing"
-)
-
-func TestEpochHandler_SetEpoch(t *testing.T) {
-	convey.Convey("set/get",t, func() {
-		eh := NewEpochHandler()
-
-		for i := 0; i < 100; i++ {
-			eh.SetEpoch(uint64(i))
-			convey.So(uint64(i),convey.ShouldEqual, eh.GetEpoch())
-		}
-	})
-}
\ No newline at end of file
diff --git a/pkg/vm/engine/tpe/tuplecodec/computationhandler.go b/pkg/vm/engine/tpe/tuplecodec/computationhandler.go
index 03eba9125417987386f5a6c21fc9cc9f29f2494d..461ea3aafca6425ba221702e8b7c49becdb272b1 100644
--- a/pkg/vm/engine/tpe/tuplecodec/computationhandler.go
+++ b/pkg/vm/engine/tpe/tuplecodec/computationhandler.go
@@ -46,6 +46,7 @@ type ComputationHandlerImpl struct {
 	tch *TupleCodecHandler
 	serializer ValueSerializer
 	indexHandler index.IndexHandler
+	epochHandler * EpochHandler
 }
 
 func (chi *ComputationHandlerImpl) Read(readCtx interface{}) (*batch.Batch, error) {
@@ -168,8 +169,8 @@ func (chi *ComputationHandlerImpl) GetDatabase(dbName string) (*descriptor.Datab
 //callbackForGetDatabaseDesc extracts the databaseDesc
 func (chi *ComputationHandlerImpl) callbackForGetDatabaseDesc (callbackCtx interface{},dis []*orderedcodec.DecodedItem)([]byte,error) {
 	//get the name and the desc
-	descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTableID_desc_ID]
-	descDI := dis[InternalDescriptorTableID_desc_ID]
+	descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTable_desc_ID]
+	descDI := dis[InternalDescriptorTable_desc_ID]
 	if !(descDI.IsValueType(descAttr.Ttype)) {
 		return nil,errorTypeInValueNotEqualToTypeInAttribute
 	}
@@ -303,8 +304,8 @@ func (chi *ComputationHandlerImpl) DropTableByDesc(epoch, dbId uint64, tableDesc
 //callbackForGetTableDesc extracts the tableDesc
 func (chi *ComputationHandlerImpl) callbackForGetTableDesc (callbackCtx interface{},dis []*orderedcodec.DecodedItem)([]byte,error) {
 	//get the name and the desc
-	descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTableID_desc_ID]
-	descDI := dis[InternalDescriptorTableID_desc_ID]
+	descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTable_desc_ID]
+	descDI := dis[InternalDescriptorTable_desc_ID]
 	if !(descDI.IsValueType(descAttr.Ttype)) {
 		return nil,errorTypeInValueNotEqualToTypeInAttribute
 	}
@@ -374,6 +375,10 @@ func (chi *ComputationHandlerImpl) GetTable(dbId uint64, name string) (*descript
 	return tableDesc,nil
 }
 
+func (chi *ComputationHandlerImpl) RemoveDeletedTable(epoch uint64) (int, error) {
+	return chi.epochHandler.RemoveDeletedTable(epoch)
+}
+
 type AttributeStateForWrite struct {
 	PositionInBatch int
 
diff --git a/pkg/vm/engine/tpe/tuplecodec/constants.go b/pkg/vm/engine/tpe/tuplecodec/constants.go
index 1f7889252f578510000e9ae742e094f27a1d4759..ed19908f9d4b42c9a2101ff943939255a83ad59a 100644
--- a/pkg/vm/engine/tpe/tuplecodec/constants.go
+++ b/pkg/vm/engine/tpe/tuplecodec/constants.go
@@ -29,14 +29,18 @@ const (
 	//holding the schema of the table
 	InternalDescriptorTableID uint64 = 0
 
-	InternalDescriptorTableID_parentID_ID = 0
-	InternalDescriptorTableID_id_ID = 1
-	InternalDescriptorTableID_name_ID = 2
-	InternalDescriptorTableID_desc_ID = 3
-	PrimaryIndexID uint32 = 1
+	InternalDescriptorTable_parentID_ID        = 0
+	InternalDescriptorTable_id_ID              = 1
+	InternalDescriptorTable_name_ID            = 2
+	InternalDescriptorTable_desc_ID            = 3
+	PrimaryIndexID                      uint32 = 1
 
 	//holding the epochgced table
 	InternalAsyncGCTableID uint64 = 1
+	InternalAsyncGCTable_epoch_ID = 0
+	InternalAsyncGCTable_dbID_ID = 1
+	InternalAsyncGCTable_tableID_ID = 2
+	InternalAsyncGCTable_desc_ID = 3
 
 	//user table id offset
 	UserTableIDOffset uint64 = 3
@@ -214,6 +218,16 @@ var (
 					ID:        0,
 					Type:      orderedcodec.VALUE_TYPE_UINT64,
 				},
+				{
+					Name:      "dbID",
+					ID:        1,
+					Type:      orderedcodec.VALUE_TYPE_UINT64,
+				},
+				{
+					Name:      "tableID",
+					ID:        2,
+					Type:      orderedcodec.VALUE_TYPE_UINT64,
+				},
 			},
 			Impilict_attributes:  nil,
 			Composite_attributes: nil,
diff --git a/pkg/vm/engine/tpe/tuplecodec/cubekv.go b/pkg/vm/engine/tpe/tuplecodec/cubekv.go
index e0939487469bc39ddbb2b6fab37297273979a3b7..2c2b89ebacbe77659e896b5aaabe188f4d5a1653 100644
--- a/pkg/vm/engine/tpe/tuplecodec/cubekv.go
+++ b/pkg/vm/engine/tpe/tuplecodec/cubekv.go
@@ -40,6 +40,7 @@ var (
 	errorCubeDriverIsNull = errors.New("cube driver is nil")
 	errorInvalidIDPool = errors.New("invalid idpool")
 	errorInvalidKeyValueCount = errors.New("key count != value count")
+	errorUnsupportedInCubeKV = errors.New("unsupported in cubekv")
 )
 var _ KVHandler = &CubeKV{}
 
@@ -215,6 +216,10 @@ func (ck * CubeKV) Delete(key TupleKey) error {
 	return ck.Cube.Delete(key)
 }
 
+func (ck *CubeKV) DeleteWithPrefix(prefix TupleKey) error {
+	return errorUnsupportedInCubeKV
+}
+
 // Get gets the value of the key.
 // If the key does not exist, it returns the null
 func (ck * CubeKV) Get(key TupleKey) (TupleValue, error) {
diff --git a/pkg/vm/engine/tpe/tuplecodec/descriptorhandler.go b/pkg/vm/engine/tpe/tuplecodec/descriptorhandler.go
index 02c4c96c399bf57d8e061568b771e3875e59e93d..ba9a66aec7ab668def2a64c8c368c00dd76bd119 100644
--- a/pkg/vm/engine/tpe/tuplecodec/descriptorhandler.go
+++ b/pkg/vm/engine/tpe/tuplecodec/descriptorhandler.go
@@ -28,6 +28,8 @@ var (
 	errorDoNotFindTheValue = errors.New("do not find the value")
 	errorDecodeDescriptorFailed = errors.New("decode the descriptor failed")
 	errorDescriptorSavedIsNotTheWanted = errors.New("the descriptor saved is not the wanted one")
+	errorDescInAsyncGCIsNotBytes = errors.New("desc in asyncGC is not bytes")
+	errorIDInAsyncGCIsNotUint64 = errors.New("id in asyncGC is not uint64")
 )
 /*
 Internal descriptor table for schema management.
@@ -105,7 +107,7 @@ func (dhi *DescriptorHandlerImpl) encodeDatabaseDescIntoValue(parentID uint64,
 	return dhi.encodeFieldsIntoValue(parentID, uint64(desc.ID),desc.Name,descBytes)
 }
 
-// decodeValue decodes the data from (parentID,ID,Name,Bytes)
+// decodeValue decodes the data into (parentID,ID,Name,Bytes)
 func (dhi *DescriptorHandlerImpl) decodeValue(data []byte) ([]*orderedcodec.DecodedItem,error) {
 	attrCnt := len(internalDescriptorTableDesc.Attributes)
 	dis := make([]*orderedcodec.DecodedItem,0,attrCnt)
@@ -207,10 +209,10 @@ func (dhi *DescriptorHandlerImpl) GetValuesWithPrefix(parentID uint64, callbackC
 //callbackForGetTableDescByName extracts the tabledesc by name
 func (dhi *DescriptorHandlerImpl) callbackForGetTableDescByName(callbackCtx interface{},dis []*orderedcodec.DecodedItem)([]byte,error) {
 	//get the name and the desc
-	nameAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTableID_name_ID]
-	descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTableID_desc_ID]
-	nameDI := dis[InternalDescriptorTableID_name_ID]
-	descDI := dis[InternalDescriptorTableID_desc_ID]
+	nameAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTable_name_ID]
+	descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTable_desc_ID]
+	nameDI := dis[InternalDescriptorTable_name_ID]
+	descDI := dis[InternalDescriptorTable_desc_ID]
 	if !(nameDI.IsValueType(nameAttr.Ttype) ||
 		descDI.IsValueType(descAttr.Ttype)) {
 		return nil,errorTypeInValueNotEqualToTypeInAttribute
@@ -273,8 +275,8 @@ func (dhi *DescriptorHandlerImpl) LoadRelationDescByID(parentID uint64, tableID
 	if err != nil {
 		return nil, err
 	}
-	descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTableID_desc_ID]
-	descDI := dis[InternalDescriptorTableID_desc_ID]
+	descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTable_desc_ID]
+	descDI := dis[InternalDescriptorTable_desc_ID]
 	if !descDI.IsValueType(descAttr.Ttype) {
 		return nil,errorTypeInValueNotEqualToTypeInAttribute
 	}
@@ -403,8 +405,8 @@ func (dhi *DescriptorHandlerImpl) LoadDatabaseDescByID(dbID uint64) (*descriptor
 	if err != nil {
 		return nil, err
 	}
-	descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTableID_desc_ID]
-	descDI := dis[InternalDescriptorTableID_desc_ID]
+	descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTable_desc_ID]
+	descDI := dis[InternalDescriptorTable_desc_ID]
 	if !descDI.IsValueType(descAttr.Ttype) {
 		return nil,errorTypeInValueNotEqualToTypeInAttribute
 	}
@@ -519,14 +521,37 @@ func (dhi *DescriptorHandlerImpl) encodeAsyncgcValue(epoch uint64, dbID uint64,
 	return out,nil
 }
 
+// MakePrefixWithEpochAndDBIDAndTableID makes the prefix(tenantID,dbID,tableID,indexID,delEpoch,delDbID,delTableID)
+func (dhi *DescriptorHandlerImpl) MakePrefixWithEpochAndDBIDAndTableID(
+		dbID uint64, tableID uint64, indexID uint64,
+		gcEpoch uint64, gcDbID uint64,gcTableID uint64)(TupleKey, *orderedcodec.EncodedItem){
+	tke := dhi.codecHandler.GetEncoder()
+
+	//make prefix
+	var prefix TupleKey
+	// tenantID,dbID,tableID,indexID
+	prefix,_ = tke.EncodeIndexPrefix(prefix,
+		dbID,
+		tableID,
+		indexID)
+
+	// append gcEpoch
+	prefix,_ = tke.oe.EncodeUint64(prefix,gcEpoch)
+	// append gcDbId
+	prefix,_ = tke.oe.EncodeUint64(prefix,gcDbID)
+	// append gcTableId
+	prefix,_ = tke.oe.EncodeUint64(prefix,gcTableID)
+	return prefix,nil
+}
+
 func (dhi *DescriptorHandlerImpl) StoreRelationDescIntoAsyncGC(epoch uint64, dbID uint64, desc *descriptor.RelationDesc) error {
-	//save thing into the internal async gc (epoch(pk),dbid,tableid,desc)
+	//save thing into the internal async gc (epoch,dbid,tableid,desc), pk(epoch,dbid,tableid)
 	//prefix(tenantID,dbID,tableID,indexID,epoch)
 	var key TupleKey
-	key,_ = dhi.MakePrefixWithOneExtraID(InternalDatabaseID,
+	key,_ = dhi.MakePrefixWithEpochAndDBIDAndTableID(InternalDatabaseID,
 		InternalAsyncGCTableID,
 		uint64(PrimaryIndexID),
-		epoch)
+		epoch,dbID,uint64(desc.ID))
 
 	value, err := dhi.encodeAsyncgcValue(epoch,dbID, uint64(desc.ID),desc)
 	if err != nil {
@@ -541,9 +566,88 @@ func (dhi *DescriptorHandlerImpl) StoreRelationDescIntoAsyncGC(epoch uint64, dbI
 	return nil
 }
 
-func (dhi *DescriptorHandlerImpl) ListRelationDescFromAsyncGC(epoch uint64) ([]*descriptor.RelationDesc, error) {
-	//TODO:
-	return nil, nil
+// decodeValueIntoEpochGCItem decodes bytes into the (epoch,dbID,tableID,desc)
+func (dhi *DescriptorHandlerImpl) decodeValueIntoEpochGCItem(data []byte) ([]*orderedcodec.DecodedItem,error) {
+	attrCnt := len(internalAsyncGCTableDesc.Attributes)
+	dis := make([]*orderedcodec.DecodedItem,0,attrCnt)
+	for j := 0; j < attrCnt; j++ {
+		rest, di, err := dhi.serializer.DeserializeValue(data)
+		if err != nil {
+			return nil, err
+		}
+		dis = append(dis,di)
+		data = rest
+	}
+	return dis,nil
+}
+
+func (dhi *DescriptorHandlerImpl) ListRelationDescFromAsyncGC(epoch uint64) ([]descriptor.EpochGCItem, error) {
+	var startPrefix TupleKey
+	tke := dhi.codecHandler.GetEncoder()
+	// tenantID,dbID,tableID,indexID
+	startPrefix,_ = tke.EncodeIndexPrefix(nil,
+		InternalDatabaseID,
+		InternalAsyncGCTableID,
+		uint64(PrimaryIndexID))
+
+	var endPrefix TupleKey
+	var nextEpoch uint64 = epoch
+	if epoch < math.MaxUint64 {
+		nextEpoch++
+	}
+	// tenantID,dbID,tableID,indexID,epoch
+	endPrefix,_ = dhi.MakePrefixWithOneExtraID(InternalDatabaseID,
+		InternalAsyncGCTableID,
+		uint64(PrimaryIndexID),
+		nextEpoch)
+
+	//get all epochgc (epoch,dbID,tableID,desc)
+	gcItems, err := dhi.kvHandler.GetRange(startPrefix,endPrefix)
+	if err != nil {
+		return nil, err
+	}
+
+	var retItems []descriptor.EpochGCItem
+
+	for _, item := range gcItems {
+		dis, err := dhi.decodeValueIntoEpochGCItem(item)
+		if err != nil {
+			return nil, err
+		}
+
+		if descBytes,ok := dis[InternalAsyncGCTable_desc_ID].Value.([]byte); ok {
+			desc, err := UnmarshalRelationDesc(descBytes)
+			if err != nil {
+				return nil, err
+			}
+			//check maximal epoch
+			if epoch >= desc.Max_access_epoch {
+				var delDbID, delTableID,delEpoch uint64
+				var ok2,ok3,ok4 bool
+				if delDbID,ok2 = dis[InternalAsyncGCTable_dbID_ID].Value.(uint64) ; !ok2 {
+					return nil, errorIDInAsyncGCIsNotUint64
+				}
+
+				if delTableID,ok3 = dis[InternalAsyncGCTable_tableID_ID].Value.(uint64) ; !ok3 {
+					return nil, errorIDInAsyncGCIsNotUint64
+				}
+
+				if delEpoch,ok4 = dis[InternalAsyncGCTable_epoch_ID].Value.(uint64); !ok4{
+					return nil, errorIDInAsyncGCIsNotUint64
+				}
+
+				retItems = append(retItems,descriptor.EpochGCItem{
+					Epoch:   delEpoch,
+					DbID:    delDbID,
+					TableID: delTableID,
+				})
+
+			}
+		}else{
+			return nil, errorDescInAsyncGCIsNotBytes
+		}
+	}
+	return retItems, nil
 }
 
 
diff --git a/pkg/vm/engine/tpe/tuplecodec/descriptorhandler_test.go b/pkg/vm/engine/tpe/tuplecodec/descriptorhandler_test.go
index 4edf250fe26c091c095c3b29c18d5bc9a0fb6d8c..30d6607d8068907f689a82931dd9881e02befc4d 100644
--- a/pkg/vm/engine/tpe/tuplecodec/descriptorhandler_test.go
+++ b/pkg/vm/engine/tpe/tuplecodec/descriptorhandler_test.go
@@ -881,4 +881,52 @@ func TestDescriptorHandlerImpl_StoreDatabaseDescByID(t *testing.T) {
 			convey.So(err,convey.ShouldBeError)
 		}
 	})
+}
+
+func TestDescriptorHandlerImpl_StoreRelationDescIntoAsyncGC(t *testing.T) {
+	convey.Convey("store relation desc into asyncgc",t, func() {
+		tch := NewTupleCodecHandler(SystemTenantID)
+		kv := NewMemoryKV()
+		serial := &DefaultValueSerializer{}
+		kvLimit := uint64(2)
+
+		dhi := NewDescriptorHandlerImpl(tch,kv,serial,kvLimit)
+
+		make_relation_desc := func(from *descriptor.RelationDesc,name string,id uint32) *descriptor.RelationDesc {
+			desc := new(descriptor.RelationDesc)
+			*desc = *from
+			desc.ID = id
+			desc.Name = name
+			return desc
+		}
+
+		var wantGCItems []descriptor.EpochGCItem
+
+		cnt := uint64(2)
+
+		for epoch := uint64(0); epoch < cnt; epoch++ {
+			offset := 100 * epoch
+			for dbId := offset + uint64(0); dbId < offset + cnt; dbId++ {
+				for tableId := uint32(0); uint64(tableId) < cnt; tableId++ {
+					wantGCItems = append(wantGCItems,descriptor.EpochGCItem{
+						Epoch:   epoch,
+						DbID:    dbId,
+						TableID: uint64(tableId),
+					})
+
+					tableName := fmt.Sprintf("table%d",tableId)
+					desc := make_relation_desc(internalDescriptorTableDesc,tableName,tableId)
+					err := dhi.StoreRelationDescIntoAsyncGC(epoch, dbId, desc)
+					convey.So(err,convey.ShouldBeNil)
+				}
+			}
+		}
+
+		gcItems, err := dhi.ListRelationDescFromAsyncGC(cnt)
+		convey.So(err,convey.ShouldBeNil)
+		convey.So(len(gcItems),convey.ShouldEqual,len(wantGCItems))
+		for i, item := range gcItems {
+			convey.So(item,convey.ShouldResemble,wantGCItems[i])
+		}
+	})
 }
\ No newline at end of file
diff --git a/pkg/vm/engine/tpe/tuplecodec/epoch.go b/pkg/vm/engine/tpe/tuplecodec/epoch.go
new file mode 100644
index 0000000000000000000000000000000000000000..71ab314df42b539a1e954e6e2b33f0984027b703
--- /dev/null
+++ b/pkg/vm/engine/tpe/tuplecodec/epoch.go
@@ -0,0 +1,95 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tuplecodec
+
+import (
+	"errors"
+	"github.com/matrixorigin/matrixone/pkg/vm/engine/tpe/descriptor"
+	"sync"
+)
+
+var (
+	errorItIsNotDescriptorHandlerImpl = errors.New("it is not descriptor handler impl")
+)
+
+type EpochHandler struct {
+	epoch  uint64
+	tch    *TupleCodecHandler
+	dh     descriptor.DescriptorHandler
+	kv     KVHandler
+	rwlock sync.RWMutex
+}
+
+func NewEpochHandler(tch *TupleCodecHandler,
+		dh descriptor.DescriptorHandler,
+		kv KVHandler) *EpochHandler {
+	return &EpochHandler{
+		tch: tch,
+		dh: dh,
+		kv: kv,
+	}
+}
+
+func (eh *EpochHandler) SetEpoch(e uint64) {
+	eh.rwlock.Lock()
+	defer eh.rwlock.Unlock()
+	eh.epoch = e
+}
+
+func (eh *EpochHandler) GetEpoch() uint64{
+	eh.rwlock.RLock()
+	defer eh.rwlock.RUnlock()
+	return eh.epoch
+}
+
+func (eh *EpochHandler) RemoveDeletedTable(epoch uint64) (int, error) {
+	eh.rwlock.Lock()
+	defer eh.rwlock.Unlock()
+
+	//1.load epoch items from asyncGC
+	gcItems, err := eh.dh.ListRelationDescFromAsyncGC(epoch)
+	if err != nil {
+		return 0, err
+	}
+
+	tke := eh.tch.GetEncoder()
+
+	dhi,ok := eh.dh.(*DescriptorHandlerImpl)
+	if !ok {
+		return 0, errorItIsNotDescriptorHandlerImpl
+	}
+
+	//2. drop table data on gcItems
+	for _, item := range gcItems {
+		//delete the data in the table
+		prefixDeleted, _ := tke.EncodeIndexPrefix(nil, item.DbID, item.TableID,uint64(PrimaryIndexID))
+		err = eh.kv.DeleteWithPrefix(prefixDeleted)
+		if err != nil {
+			return 0, err
+		}
+
+		//3.delete gc item in asyncGC table
+		epochItemKeyDeleted,_ := dhi.MakePrefixWithEpochAndDBIDAndTableID(
+			InternalDatabaseID,InternalAsyncGCTableID,uint64(PrimaryIndexID),
+			item.Epoch,item.DbID,item.TableID)
+
+		err = dhi.kvHandler.Delete(epochItemKeyDeleted)
+		if err != nil {
+			return 0, err
+		}
+	}
+
+	return len(gcItems), nil
+}
\ No newline at end of file
diff --git a/pkg/vm/engine/tpe/tuplecodec/epoch_test.go b/pkg/vm/engine/tpe/tuplecodec/epoch_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..6a552d64a792c9c3e47096adba08a6d9262b10eb
--- /dev/null
+++ b/pkg/vm/engine/tpe/tuplecodec/epoch_test.go
@@ -0,0 +1,91 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tuplecodec
+
+import (
+	"fmt"
+	"github.com/matrixorigin/matrixone/pkg/vm/engine/tpe/descriptor"
+	"github.com/smartystreets/goconvey/convey"
+	"testing"
+)
+
+func TestEpochHandler_SetEpoch(t *testing.T) {
+	convey.Convey("set/get",t, func() {
+		eh := NewEpochHandler(nil,nil,nil)
+
+		for i := 0; i < 100; i++ {
+			eh.SetEpoch(uint64(i))
+			convey.So(uint64(i),convey.ShouldEqual, eh.GetEpoch())
+		}
+	})
+}
+
+func TestEpochHandler_RemoveDeletedTable(t *testing.T) {
+	convey.Convey("remove deleted table",t, func() {
+		tch := NewTupleCodecHandler(SystemTenantID)
+		kv := NewMemoryKV()
+		serial := &DefaultValueSerializer{}
+		kvLimit := uint64(2)
+
+		dhi := NewDescriptorHandlerImpl(tch,kv,serial,kvLimit)
+
+		eh := NewEpochHandler(tch,dhi,kv)
+
+		make_relation_desc := func(from *descriptor.RelationDesc,name string,id uint32) *descriptor.RelationDesc {
+			desc := new(descriptor.RelationDesc)
+			*desc = *from
+			desc.ID = id
+			desc.Name = name
+			return desc
+		}
+
+		var wantGCItems []descriptor.EpochGCItem
+
+		cnt := uint64(2)
+
+		for epoch := uint64(0); epoch < cnt; epoch++ {
+			offset := 100 * epoch
+			for dbId := UserTableIDOffset + offset + uint64(0); dbId < UserTableIDOffset + offset + cnt; dbId++ {
+				for tableId := uint32(0); uint64(tableId) < cnt; tableId++ {
+					wantGCItems = append(wantGCItems,descriptor.EpochGCItem{
+						Epoch:   epoch,
+						DbID:    dbId,
+						TableID: uint64(tableId),
+					})
+
+					tableName := fmt.Sprintf("table%d",tableId)
+					desc := make_relation_desc(internalDescriptorTableDesc,tableName,tableId)
+					err := dhi.StoreRelationDescIntoAsyncGC(epoch, dbId, desc)
+					convey.So(err,convey.ShouldBeNil)
+				}
+			}
+		}
+
+		gcItems, err := dhi.ListRelationDescFromAsyncGC(cnt)
+		convey.So(err,convey.ShouldBeNil)
+		convey.So(len(gcItems),convey.ShouldEqual,len(wantGCItems))
+		for i, item := range gcItems {
+			convey.So(item,convey.ShouldResemble,wantGCItems[i])
+		}
+
+		gcCnt, err := eh.RemoveDeletedTable(cnt / 2 - 1)
+		convey.So(err,convey.ShouldBeNil)
+		convey.So(uint64(gcCnt),convey.ShouldEqual,cnt*cnt*cnt / 2)
+
+		gcItems, err = dhi.ListRelationDescFromAsyncGC(cnt)
+		convey.So(err,convey.ShouldBeNil)
+		convey.So(len(gcItems),convey.ShouldEqual,cnt*cnt*cnt / 2)
+	})
+}
\ No newline at end of file
diff --git a/pkg/vm/engine/tpe/tuplecodec/indexhandler_test.go b/pkg/vm/engine/tpe/tuplecodec/indexhandler_test.go
index 69ffbbe59a912b6acc363e390452b5bf6572a334..801e797526a2c47bb079d16dd66a0227b9d66bd5 100644
--- a/pkg/vm/engine/tpe/tuplecodec/indexhandler_test.go
+++ b/pkg/vm/engine/tpe/tuplecodec/indexhandler_test.go
@@ -115,9 +115,9 @@ func TestIndexHandlerImpl_ReadFromIndex(t *testing.T) {
 		convey.So(err,convey.ShouldBeNil)
 
 		wantAttr := []*descriptor.AttributeDesc{
-			&internalDescriptorTableDesc.Attributes[InternalDescriptorTableID_parentID_ID],
-			&internalDescriptorTableDesc.Attributes[InternalDescriptorTableID_id_ID],
-			&internalDescriptorTableDesc.Attributes[InternalDescriptorTableID_desc_ID],
+			&internalDescriptorTableDesc.Attributes[InternalDescriptorTable_parentID_ID],
+			&internalDescriptorTableDesc.Attributes[InternalDescriptorTable_id_ID],
+			&internalDescriptorTableDesc.Attributes[InternalDescriptorTable_desc_ID],
 		}
 
 		readCtx := &ReadContext{
diff --git a/pkg/vm/engine/tpe/tuplecodec/kv_types.go b/pkg/vm/engine/tpe/tuplecodec/kv_types.go
index 67a1391253cf292244c4cba7b270f3c59f2a9b4e..85fb44ca60ac76298a19888aac9a94a3b75df75e 100644
--- a/pkg/vm/engine/tpe/tuplecodec/kv_types.go
+++ b/pkg/vm/engine/tpe/tuplecodec/kv_types.go
@@ -42,6 +42,9 @@ type KVHandler interface {
 	// Delete deletes the key
 	Delete(key TupleKey) error
 
+	// DeleteWithPrefix keys with the prefix
+	DeleteWithPrefix(prefix TupleKey) error
+
 	// Get gets the value of the key
 	Get(key TupleKey)(TupleValue, error)
 
diff --git a/pkg/vm/engine/tpe/tuplecodec/memorykv.go b/pkg/vm/engine/tpe/tuplecodec/memorykv.go
index 188d50babb3cac584512112c42f0925cde21d25f..842b77126ef94686ba8d4dc4adca5781fd3f9edc 100644
--- a/pkg/vm/engine/tpe/tuplecodec/memorykv.go
+++ b/pkg/vm/engine/tpe/tuplecodec/memorykv.go
@@ -172,6 +172,31 @@ func (m *MemoryKV) Delete(key TupleKey) error {
 	return nil
 }
 
+func (m *MemoryKV) DeleteWithPrefix(prefix TupleKey) error {
+	m.rwLock.Lock()
+	defer m.rwLock.Unlock()
+
+	var keys []TupleKey
+	iter := func(i btree.Item) bool {
+		if x,ok := i.(*MemoryItem); ok {
+			if bytes.HasPrefix(x.key,prefix) {
+				keys = append(keys,x.key)
+			}
+		}else{
+			keys = append(keys,nil)
+		}
+		return true
+	}
+
+	m.container.Ascend(iter)
+
+	for _, key := range keys {
+		m.container.Delete(NewMemoryItem(key,nil))
+	}
+
+	return nil
+}
+
 func (m *MemoryKV) Get(key TupleKey) (TupleValue, error) {
 	m.rwLock.RLock()
 	defer m.rwLock.RUnlock()
diff --git a/pkg/vm/engine/tpe/tuplecodec/memorykv_test.go b/pkg/vm/engine/tpe/tuplecodec/memorykv_test.go
index bd5d58b8b2a5cb78d5f72e538f5d1a029b34029b..815e6b784f3f6035f2bbcf8e88638a905cde280f 100644
--- a/pkg/vm/engine/tpe/tuplecodec/memorykv_test.go
+++ b/pkg/vm/engine/tpe/tuplecodec/memorykv_test.go
@@ -15,6 +15,7 @@
 package tuplecodec
 
 import (
+	"bytes"
 	"fmt"
 	"github.com/smartystreets/goconvey/convey"
 	"reflect"
@@ -391,4 +392,55 @@ func TestMemoryKV_GetWithPrefix(t *testing.T) {
 			last = SuccessorOfKey(keys[len(keys) - 1])
 		}
 	})
+}
+
+func TestMemoryKV_DeletePrefix(t *testing.T) {
+	convey.Convey("delete prefix",t, func() {
+		kv := NewMemoryKV()
+		cnt := 10
+
+		genData := func(cnt int,handler KVHandler,prefix string) ([]TupleKey,[]TupleValue) {
+			var keys []TupleKey
+			var values []TupleValue
+			for i := 0; i < cnt; i++ {
+				key := fmt.Sprintf("%s%d",prefix,i)
+				value := fmt.Sprintf("v%d",i)
+				keys = append(keys,[]byte(key))
+				values = append(values,[]byte(value))
+				err := handler.Set([]byte(key), []byte(value))
+				convey.So(err,convey.ShouldBeNil)
+			}
+
+			return keys, values
+		}
+
+		wKeys1, wValues1 := genData(cnt,kv,"abc")
+		err := kv.SetBatch(wKeys1,wValues1)
+		for _, e := range err {
+			convey.So(e,convey.ShouldBeNil)
+		}
+
+		wKeys2, wValues2 := genData(cnt,kv,"cde")
+		err = kv.SetBatch(wKeys2,wValues2)
+		for _, e := range err {
+			convey.So(e,convey.ShouldBeNil)
+		}
+
+		err2 := kv.DeleteWithPrefix([]byte("abc"))
+		convey.So(err2,convey.ShouldBeNil)
+
+		resKeys, resValues, err3 := kv.GetWithPrefix(TupleKey("cde"), len("cde"), uint64(cnt))
+		convey.So(err3,convey.ShouldBeNil)
+
+		for i, key := range resKeys {
+			for j, wKey := range wKeys2 {
+				if bytes.Equal(key,wKey) {
+					convey.So(resValues[i],
+						convey.ShouldResemble,wValues2[j])
+				}else {
+					break
+				}
+			}
+		}
+	})
 }
\ No newline at end of file