diff --git a/go.mod b/go.mod
index ed3131e8f41836bb7c4120e14599fe47b17bca83..560b4ea8bbd85379cfcce23926d254e1065cf066 100644
--- a/go.mod
+++ b/go.mod
@@ -15,7 +15,7 @@ require (
 	github.com/golang/protobuf v1.5.2
 	github.com/google/btree v1.0.1
 	github.com/huandu/go-clone v1.3.0
-	github.com/matrixorigin/matrixcube v0.0.0-20211124114820-5a4415521df1
+	github.com/matrixorigin/matrixcube v0.0.0-20211203085052-d4ebc7b6a171
 	github.com/matrixorigin/simdcsv v0.0.0-20210926114300-591bf748a770
 	github.com/panjf2000/ants/v2 v2.4.5
 	github.com/pierrec/lz4 v2.6.0+incompatible
diff --git a/go.sum b/go.sum
index 238f529326b1dcd9d51632e62b86153fa4393dd3..befc6abec79715debe9e8f622f7d284dd5a3fb93 100644
--- a/go.sum
+++ b/go.sum
@@ -323,8 +323,8 @@ github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czP
 github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
 github.com/matrixorigin/etcd/raft/v3 v3.5.1-0.20210824022435-0203115049c2 h1:s7CQEsRxL8+/sAPW23uIqpwR+M8Aje81AH6w3/ZkxQw=
 github.com/matrixorigin/etcd/raft/v3 v3.5.1-0.20210824022435-0203115049c2/go.mod h1:UFOHSIvO/nKwd4lhkwabrTD3cqW5yVyYYf/KlD00Szc=
-github.com/matrixorigin/matrixcube v0.0.0-20211124114820-5a4415521df1 h1:2UTxSLrAwvZTB0qoD4sjCXFc48kFdGGB6ypQcGp7Fkk=
-github.com/matrixorigin/matrixcube v0.0.0-20211124114820-5a4415521df1/go.mod h1:5xrn3BfKKYJw9rDwO5E/RwWg0wPE3H8wnQyjpwIhyYk=
+github.com/matrixorigin/matrixcube v0.0.0-20211203085052-d4ebc7b6a171 h1:zQhmYqZJNPucdbkqxUsxrnLqpYAfTMIQ7+A4gzXPuLg=
+github.com/matrixorigin/matrixcube v0.0.0-20211203085052-d4ebc7b6a171/go.mod h1:5xrn3BfKKYJw9rDwO5E/RwWg0wPE3H8wnQyjpwIhyYk=
 github.com/matrixorigin/simdcsv v0.0.0-20210926114300-591bf748a770 h1:apc228jeCtUvvfkaydzJygk1jdH8y+THma2o73Yv4Vg=
 github.com/matrixorigin/simdcsv v0.0.0-20210926114300-591bf748a770/go.mod h1:A7O+LRuZcr/BbOLsMzM/q69ZmoLENUMpptYG0pPzTFQ=
 github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
diff --git a/pkg/catalog/catalog.go b/pkg/catalog/catalog.go
index dcce84a94e252f1308c35dfe0f475ee054af1dee..c700252580f1ef04a12a7cc94111f8a28e547564 100644
--- a/pkg/catalog/catalog.go
+++ b/pkg/catalog/catalog.go
@@ -691,7 +691,7 @@ func (c *Catalog) checkTableNotExists(dbId uint64, tableName string) (*aoe.Table
 
 //encodeTabletName encodes the groupId(the id of the shard) and tableId together to one string by calling codec.Bytes2String.
 func (c *Catalog) encodeTabletName(groupId, tableId uint64) string {
-	return strconv.Itoa(int(groupId)) + strconv.Itoa(int(tableId))
+	return strconv.Itoa(int(tableId))
 }
 
 //genGlobalUniqIDs generates a global unique id by calling c.Driver.AllocID.
diff --git a/pkg/vm/driver/aoe/storage.go b/pkg/vm/driver/aoe/storage.go
index 361a91a765460b9dfe3e8241f3266d893e23eb83..ce11e49039bb0ac739ac34751c4674643d0dafd3 100644
--- a/pkg/vm/driver/aoe/storage.go
+++ b/pkg/vm/driver/aoe/storage.go
@@ -18,7 +18,6 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
-	"os"
 	"strconv"
 	"strings"
 	"sync/atomic"
@@ -100,7 +99,7 @@ func (s *Storage) createIndex(index uint64, offset int, batchSize int, shardId u
 	}
 	t0 := time.Now()
 	defer func() {
-		logutil.Debugf("[logIndex:%d,%d]createIndex handler cost %d ms", index, offset, time.Since(t0).Milliseconds())
+		logutil.Debugf("[S-%d|logIndex:%d,%d]createIndex handler cost %d ms", shardId, index, offset, time.Since(t0).Milliseconds())
 	}()
 	customReq := &pb.CreateIndexRequest{}
 	protoc.MustUnmarshal(customReq, cmd)
@@ -139,7 +138,7 @@ func (s *Storage) dropIndex(index uint64, offset int, batchsize int, shardId uin
 	}
 	t0 := time.Now()
 	defer func() {
-		logutil.Debugf("[logIndex:%d,%d]dropIndex handler cost %d ms", index, offset, time.Since(t0).Milliseconds())
+		logutil.Debugf("[S-%d|logIndex:%d,%d]dropIndex handler cost %d ms", shardId, index, offset, time.Since(t0).Milliseconds())
 	}()
 	customReq := &pb.DropIndexRequest{}
 	protoc.MustUnmarshal(customReq, cmd)
@@ -174,7 +173,7 @@ func (s *Storage) Append(index uint64, offset int, batchSize int, shardId uint64
 	}
 	t0 := time.Now()
 	defer func() {
-		logutil.Debugf("[logIndex:%d,%d]append handler cost %d ms", index, offset, time.Since(t0).Milliseconds())
+		logutil.Debugf("[S-%d|logIndex:%d,%d]append handler cost %d ms", shardId, index, offset, time.Since(t0).Milliseconds())
 	}()
 	customReq := &pb.AppendRequest{}
 	protoc.MustUnmarshal(customReq, cmd)
@@ -257,7 +256,7 @@ func (s *Storage) createTable(index uint64, offset int, batchsize int, shardId u
 	}
 	t0 := time.Now()
 	defer func() {
-		logutil.Debugf("[logIndex:%d,%d]createTable handler cost %d ms", index, offset, time.Since(t0).Milliseconds())
+		logutil.Debugf("[S-%d|logIndex:%d,%d]createTable handler cost %d ms", shardId, index, offset, time.Since(t0).Milliseconds())
 	}()
 	customReq := &pb.CreateTabletRequest{}
 	protoc.MustUnmarshal(customReq, cmd)
@@ -301,7 +300,7 @@ func (s *Storage) dropTable(index uint64, offset, batchsize int, shardId uint64,
 	}
 	t0 := time.Now()
 	defer func() {
-		logutil.Debugf("[logIndex:%d,%d]dropTable handler cost %d ms", index, offset, time.Since(t0).Milliseconds())
+		logutil.Debugf("[S-%d|logIndex:%d,%d]dropTable handler cost %d ms", shardId, index, offset, time.Since(t0).Milliseconds())
 	}()
 	customReq := &pb.DropTabletRequest{}
 	protoc.MustUnmarshal(customReq, cmd)
@@ -353,24 +352,36 @@ func (s *Storage) tableNames() (names []string) {
 	return
 }
 
-//TODO
+//SplitCheck checks whether the shard needs to be split and prepares the split context.
 func (s *Storage) SplitCheck(shard meta.Shard, size uint64) (currentApproximateSize uint64,
 	currentApproximateKeys uint64, splitKeys [][]byte, ctx []byte, err error) {
-	return 0, 0, nil, nil, err
+	prepareSplitCtx := aoedb.PrepareSplitCtx{
+		DB:   aoedb.IdToNameFactory.Encode(shard.ID),
+		Size: size,
+	}
+	return s.DB.PrepareSplitDatabase(&prepareSplitCtx)
 }
 
-//TODO
-func (s *Storage) CreateSnapshot(shardID uint64, path string) (uint64, uint64, error) {
-	if _, err := os.Stat(path); err != nil {
-		os.MkdirAll(path, os.FileMode(0755))
+//CreateSnapshot creates a snapshot of the shard under the given path.
+func (s *Storage) CreateSnapshot(shardID uint64, path string) error {
+	ctx := aoedb.CreateSnapshotCtx{
+		DB:   aoedb.IdToNameFactory.Encode(shardID),
+		Path: path,
+		Sync: false,
 	}
-	return 0, 0, nil
+	_, err := s.DB.CreateSnapshot(&ctx)
+	return err
 }
 
-//TODO
+//ApplySnapshot applies the snapshot under the given path to the storage.
 func (s *Storage) ApplySnapshot(shardID uint64, path string) error {
-	return nil
+	ctx := aoedb.ApplySnapshotCtx{
+		DB:   aoedb.IdToNameFactory.Encode(shardID),
+		Path: path,
+	}
+	err := s.DB.ApplySnapshot(&ctx)
+	return err
 }
 
 //Close closes the storage.
@@ -488,19 +499,65 @@ func (s *Storage) Read(ctx storage.ReadContext) ([]byte, error) {
 
 func (s *Storage) GetPersistentLogIndex(shardID uint64) (uint64, error) {
 	db, _ := s.DB.Store.Catalog.SimpleGetDatabaseByName(aoedb.IdToNameFactory.Encode(shardID))
-	logutil.Infof("GetPersistentLogIndex, shard id is %v, storage is %v", shardID, s)
 	if db == nil {
+		logutil.Infof("GetPersistentLogIndex, shard id is %v, LogIndex is %v, storage is %v", shardID, 0, s)
 		return 0, nil
 	}
 	rsp := s.DB.GetShardCheckpointId(db.GetShardId())
 	if rsp == 0 {
 		rsp = 1
 	}
+	logutil.Infof("GetPersistentLogIndex, shard id is %v, LogIndex is %v, storage is %v", shardID, rsp, s)
 	return rsp, nil
 }
 
+func shardMetadataToBatch(metadata meta.ShardMetadata) (*batch.Batch, error) {
+	attrs := []string{sShardId, sLogIndex, sMetadata}
+	bat := batch.New(true, attrs)
+	vShardID := vector.New(types.Type{Oid: types.T_uint64, Size: 8})
+	vShardID.Ref = 1
+	vShardID.Col = []uint64{metadata.ShardID}
+	bat.Vecs[0] = vShardID
+	vLogIndex := vector.New(types.Type{Oid: types.T_uint64, Size: 8})
+	vLogIndex.Ref = 1
+	vLogIndex.Col = []uint64{metadata.LogIndex}
+	bat.Vecs[1] = vLogIndex
+	vMetadata := vector.New(types.Type{Oid: types.T_varchar, Size: int32(len(protoc.MustMarshal(&metadata.Metadata)))})
+	vMetadata.Ref = 1
+	vMetadata.Col = &types.Bytes{
+		Data:    protoc.MustMarshal(&metadata.Metadata),
+		Offsets: []uint32{0},
+		Lengths: []uint32{uint32(len(protoc.MustMarshal(&metadata.Metadata)))},
+	}
+	bat.Vecs[2] = vMetadata
+	return bat, nil
+}
+
+func createMetadataTableInfo(shardId uint64) *aoe.TableInfo {
+	tableName := sPrefix + strconv.Itoa(int(shardId))
+	metaTblInfo := aoe.TableInfo{
+		Name:    tableName,
+		Indices: make([]aoe.IndexInfo, 0),
+	}
+	ShardId := aoe.ColumnInfo{
+		Name: sShardId,
+	}
+	ShardId.Type = types.Type{Oid: types.T_uint64, Size: 8}
+	metaTblInfo.Columns = append(metaTblInfo.Columns, ShardId)
+	LogIndex := aoe.ColumnInfo{
+		Name: sLogIndex,
+	}
+	LogIndex.Type = types.Type{Oid: types.T_uint64, Size: 8}
+	metaTblInfo.Columns = append(metaTblInfo.Columns, LogIndex)
+	colInfo := aoe.ColumnInfo{
+		Name: sMetadata,
+	}
+	colInfo.Type = types.Type{Oid: types.T(types.T_varchar)}
+	metaTblInfo.Columns = append(metaTblInfo.Columns, colInfo)
+	return &metaTblInfo
+}
+
 func (s *Storage) SaveShardMetadata(metadatas []meta.ShardMetadata) error {
-	// var index uint64
 	for _, metadata := range metadatas {
 		tableName := sPrefix + strconv.Itoa(int(metadata.ShardID))
 		db, err := s.DB.Store.Catalog.SimpleGetDatabaseByName(aoedb.IdToNameFactory.Encode(metadata.ShardID))
@@ -528,33 +585,14 @@ func (s *Storage) SaveShardMetadata(metadatas []meta.ShardMetadata) error {
 		}
 		createTable := false
 		if tbl == nil {
-			mateTblInfo := aoe.TableInfo{
-				Name:    tableName,
-				Indices: make([]aoe.IndexInfo, 0),
-			}
-			ShardId := aoe.ColumnInfo{
-				Name: sShardId,
-			}
-			ShardId.Type = types.Type{Oid: types.T_uint64, Size: 8}
-			mateTblInfo.Columns = append(mateTblInfo.Columns, ShardId)
-			LogIndex := aoe.ColumnInfo{
-				Name: sLogIndex,
-			}
-			LogIndex.Type = types.Type{Oid: types.T_uint64, Size: 8}
-			mateTblInfo.Columns = append(mateTblInfo.Columns, LogIndex)
-			colInfo := aoe.ColumnInfo{
-				Name: sMetadata,
-			}
-			colInfo.Type = types.Type{Oid: types.T(types.T_varchar)}
-			mateTblInfo.Columns = append(mateTblInfo.Columns, colInfo)
+			metaTblInfo := createMetadataTableInfo(metadata.ShardID)
 			offset := 0
 			size := 2
 			if createDatabase {
-				// index = metadata.LogIndex + 1
 				offset = 1
 				size = 3
 			}
-			schema, indexSchema := adaptor.TableInfoToSchema(s.DB.Store.Catalog, &mateTblInfo)
+			schema, indexSchema := adaptor.TableInfoToSchema(s.DB.Store.Catalog, metaTblInfo)
 			ctx := aoedb.CreateTableCtx{
 				DBMutationCtx: aoedb.DBMutationCtx{
 					Id:     metadata.LogIndex,
@@ -572,34 +610,14 @@ func (s *Storage) SaveShardMetadata(metadatas []meta.ShardMetadata) error {
 			createTable = true
 		}
 
-		attrs := []string{sShardId, sLogIndex, sMetadata}
-		bat := batch.New(true, attrs)
-		vShardID := vector.New(types.Type{Oid: types.T_uint64, Size: 8})
-		vShardID.Ref = 1
-		vShardID.Col = []uint64{metadata.ShardID}
-		bat.Vecs[0] = vShardID
-		vLogIndex := vector.New(types.Type{Oid: types.T_uint64, Size: 8})
-		vLogIndex.Ref = 1
-		vLogIndex.Col = []uint64{metadata.LogIndex}
-		bat.Vecs[1] = vLogIndex
-		vMetadata := vector.New(types.Type{Oid: types.T_varchar, Size: int32(len(protoc.MustMarshal(&metadata.Metadata)))})
-		vMetadata.Ref = 1
-		vMetadata.Col = &types.Bytes{
-			Data:    protoc.MustMarshal(&metadata.Metadata),
-			Offsets: []uint32{0},
-			Lengths: []uint32{uint32(len(protoc.MustMarshal(&metadata.Metadata)))},
-		}
-		bat.Vecs[2] = vMetadata
-
+		bat, _ := shardMetadataToBatch(metadata)
 		offset := 0
 		size := 1
 		if createTable {
-			// index = metadata.LogIndex + 1
 			offset = 1
 			size = 2
 		}
 		if createDatabase {
-			// index = metadata.LogIndex + 2
 			offset = 2
 			size = 3
 		}
@@ -617,7 +635,7 @@ func (s *Storage) SaveShardMetadata(metadatas []meta.ShardMetadata) error {
 		}
 		err = s.DB.Append(&ctx)
 		if err != nil {
-			logutil.Errorf("SaveShardMetadata is failed: %v\n", err.Error())
+			logutil.Errorf("SaveShardMetadata failed: %v", err.Error())
 			return err
 		}
 	}
@@ -626,9 +644,13 @@ func (s *Storage) SaveShardMetadata(metadatas []meta.ShardMetadata) error {
 
 func (s *Storage) RemoveShard(shard meta.Shard, removeData bool) error {
 	var err error
-	if removeData {
+	t0 := time.Now()
+	defer func() {
+		logutil.Debugf("[S-%d|logIndex:%d,%d]removeShard handler cost %d ms", shard.ID, ^uint64(0), 0, time.Since(t0).Milliseconds())
+	}()
+	if removeData {
 		ctx := aoedb.DropDBCtx{
-			Id:     shard.ID,
+			Id:     ^uint64(0),
 			Offset: 0,
 			Size:   1,
 			DB:     aoedb.IdToNameFactory.Encode(shard.ID),
@@ -639,5 +661,67 @@ func (s *Storage) RemoveShard(shard meta.Shard, removeData bool) error {
 }
 
 func (s *Storage) Split(old meta.ShardMetadata, news []meta.ShardMetadata, ctx []byte) error {
-	return nil
+	newNames := make([]string, 0, len(news))
+	for _, shard := range news {
+		name := aoedb.IdToNameFactory.Encode(shard.ShardID)
+		newNames = append(newNames, name)
+	}
+	renameTable := func(oldName, dbName string) string {
+		return oldName
+	}
+	dropTableCtx := aoedb.DropTableCtx{
+		DBMutationCtx: aoedb.DBMutationCtx{
+			Id:     old.LogIndex,
+			Offset: 0,
+			Size:   2,
+			DB:     aoedb.IdToNameFactory.Encode(old.ShardID),
+		},
+		Table: sPrefix + strconv.Itoa(int(old.ShardID)),
+	}
+	_, err := s.DB.DropTable(&dropTableCtx)
+	if err != nil {
+		logutil.Errorf("Split: S-%d dropTable failed.", old.ShardID)
+		return err
+	}
+	execSplitCtx := aoedb.ExecSplitCtx{
+		DBMutationCtx: aoedb.DBMutationCtx{
+			Id:     old.LogIndex,
+			Offset: 1,
+			Size:   2,
+			DB:     aoedb.IdToNameFactory.Encode(old.ShardID),
+		},
+		NewNames:    newNames,
+		RenameTable: renameTable,
+		SplitCtx:    ctx,
+	}
+	err = s.DB.ExecSplitDatabase(&execSplitCtx)
+	if err != nil {
+		logutil.Errorf("Split: S-%d ExecSplitDatabase failed.", old.ShardID)
+		return err
+	}
+	for _, shard := range news {
+		tableName := sPrefix + strconv.Itoa(int(shard.ShardID))
+		bat, _ := shardMetadataToBatch(shard)
+
+		offset := 0
+		size := 1
+		ctx := aoedb.AppendCtx{
+			TableMutationCtx: aoedb.TableMutationCtx{
+				DBMutationCtx: aoedb.DBMutationCtx{
+					Id:     shard.LogIndex,
+					Offset: offset,
+					Size:   size,
+					DB:     aoedb.IdToNameFactory.Encode(shard.ShardID),
+				},
+				Table: tableName,
+			},
+			Data: bat,
+		}
+		err = s.DB.Append(&ctx)
+		if err != nil {
+			logutil.Errorf("Split: S-%d append failed.", shard.ShardID)
+			return err
+		}
+	}
+	return err
 }
diff --git a/pkg/vm/driver/aoe/storage_test.go b/pkg/vm/driver/aoe/storage_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..ad5b2f0677db13eb29a9d43863e521e227649ed2
--- /dev/null
+++ b/pkg/vm/driver/aoe/storage_test.go
@@ -0,0 +1,153 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package aoe
+
+import (
+	"bytes"
+	"os"
+	"testing"
+
+	"github.com/fagongzi/util/protoc"
+	"github.com/matrixorigin/matrixcube/pb/meta"
+	"github.com/matrixorigin/matrixone/pkg/container/batch"
+	"github.com/matrixorigin/matrixone/pkg/container/types"
+	"github.com/matrixorigin/matrixone/pkg/sql/protocol"
+	"github.com/matrixorigin/matrixone/pkg/vm/driver/pb"
+	"github.com/matrixorigin/matrixone/pkg/vm/engine/aoe"
+	"github.com/matrixorigin/matrixone/pkg/vm/engine/aoe/common/helper"
+	"github.com/matrixorigin/matrixone/pkg/vm/engine/aoe/storage/container/vector"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	ShardMetadataCount = 5
+	Rows               = uint64(40000)
+)
+
+var (
+	testMetadatas      []meta.ShardMetadata
+	testSplitMetadatas []meta.ShardMetadata
+	testPath           = "./test"
+	dbPath1            = "./test/1"
+	dbPath2            = "./test/2"
+	createTableCmd     []byte
+	createTableKey     []byte
+	testTableName      = "testTable"
+	testTableInfo      []byte
+	appendCmd          []byte
+)
+
+func init() {
+	for i := 0; i < ShardMetadataCount; i++ {
+		testMetadata := meta.ShardMetadata{
+			ShardID:  uint64(i),
+			LogIndex: 1,
+			Metadata: meta.ShardLocalState{}}
+		testMetadatas = append(testMetadatas, testMetadata)
+	}
+
+	for i := 0; i < 2; i++ {
+		testSplitMetadata := meta.ShardMetadata{
+			ShardID:  uint64(i + ShardMetadataCount),
+			LogIndex: 1,
+			Metadata: meta.ShardLocalState{}}
+		testSplitMetadatas = append(testSplitMetadatas, testSplitMetadata)
+	}
+
+	columns := aoe.ColumnInfo{
+		Name: "col",
+		Type: types.Type{Oid: types.T(types.T_varchar), Size: 24},
+	}
+	testTableInfo, _ = helper.EncodeTable(aoe.TableInfo{
+		Columns: []aoe.ColumnInfo{columns},
+	})
+
+	createTableCmd = protoc.MustMarshal(&pb.CreateTabletRequest{
+		Name:      testTableName,
+		TableInfo: testTableInfo,
+	})
+
+	attrs := []string{"col"}
+	bat := batch.New(true, attrs)
+	var err error
+	vec := vector.MockVector(types.Type{Oid: types.T(types.T_varchar), Size: 24}, Rows)
+	bat.Vecs[0], err = vec.CopyToVector()
+	if err != nil {
+		panic(err)
+	}
+	vec.Close()
+	var buf bytes.Buffer
+	protocol.EncodeBatch(bat, &buf)
+	testBatch := buf.Bytes()
+	appendCmd = protoc.MustMarshal(&pb.AppendRequest{
+		TabletName: testTableName,
+		Data:       testBatch,
+	})
+}
+
+func TestStorage(t *testing.T) {
+	os.RemoveAll(testPath)
+	db1, err := NewStorage(dbPath1)
+	require.Nil(t, err)
+	db2, err := NewStorage(dbPath2)
+	require.Nil(t, err)
+	err = db1.SaveShardMetadata(testMetadatas)
+	require.Nil(t, err)
+	err = db2.SaveShardMetadata(testMetadatas)
+	require.Nil(t, err)
+
+	initShardMetadata, err := db1.GetInitialStates()
+	require.Equal(t, ShardMetadataCount, len(initShardMetadata))
+	require.Nil(t, err)
+
+	tbls := db1.tableNames()
+	require.Equal(t, 5, len(tbls))
+
+	tbls = db2.tableNames()
+	require.Equal(t, 5, len(tbls))
+
+	db1.createTable(uint64(2), 0, 1, uint64(0), createTableCmd, createTableKey)
+	db1.Append(uint64(3), 0, 1, uint64(0), appendCmd, createTableKey)
+	db1.Sync([]uint64{uint64(0)})
+
+	tbls = db1.tableNames()
+	require.Equal(t, 6, len(tbls))
+
+	//test snapshot
+	err = db1.CreateSnapshot(uint64(0), testPath)
+	require.Nil(t, err)
+	err = db2.ApplySnapshot(uint64(0), testPath)
+	require.Nil(t, err)
+
+	// tbls = db2.tableNames()
+	// switch index {
+	// case 0:
+	// 	require.Equal(t, 4, len(tbls))
+	// case 1:
+	// 	require.Equal(t, 5, len(tbls))
+	// case 2:
+	// 	require.Equal(t, 6, len(tbls))
+	// default:
+	// 	require.Equal(t, 6, len(tbls))
+	// }
+
+	//test split
+	// currentApproximateSize, currentApproximateKeys, splitKeys, ctx, err := db1.SplitCheck(testMetadatas[0].Metadata.Shard, uint64(1))
+	// require.Nil(t, err)
+	// fmt.Printf("%v,%v,%v,%v", currentApproximateSize, currentApproximateKeys, splitKeys, ctx)
+	// testMetadatas[0].LogIndex = 4
+	// err = db1.Split(testMetadatas[0], testSplitMetadatas, ctx)
+	// require.Nil(t, err)
+}
diff --git a/pkg/vm/driver/aoe_shard_aware.go b/pkg/vm/driver/aoe_shard_aware.go
index 186fb7209c1a345dc888a1c0b2e5de061b63d421..773a711a8f01bd0ac4944033cbfe9ff0b323d5a5 100644
--- a/pkg/vm/driver/aoe_shard_aware.go
+++ b/pkg/vm/driver/aoe_shard_aware.go
@@ -26,6 +26,10 @@ func (h *driver) Created(shard meta.Shard) {
 
 }
 
+func (h *driver) Updated(meta.Shard) {
+
+}
+
 func (h *driver) Splited(shard meta.Shard) {
 
 }
diff --git a/pkg/vm/driver/cube_driver.go b/pkg/vm/driver/cube_driver.go
index 4ace84d24a9e6e013eefb6c661977b3b7babcdfa..3adf61426d59b7147174cfcffc660bb5de9e2b85 100644
--- a/pkg/vm/driver/cube_driver.go
+++ b/pkg/vm/driver/cube_driver.go
@@ -222,23 +222,6 @@ func NewCubeDriverWithFactory(
 		return h
 	}
 
-	c.CubeConfig.Customize.CustomAdjustCompactFuncFactory = func(group uint64) func(shard meta.Shard, compactIndex uint64) (newCompactIdx uint64, err error) {
-		return func(shard meta.Shard, compactIndex uint64) (newCompactIdx uint64, err error) {
-			defer func() {
-				logutil.Debugf("CompactIndex of [%d]shard-%d is adjusted from %d to %d", group, shard.ID, compactIndex, newCompactIdx)
-			}()
-			if group != uint64(pb.AOEGroup) {
-				newCompactIdx = compactIndex
-			} else {
-				newCompactIdx = h.aoeDB.GetShardCheckpointId(shard.ID)
-				if newCompactIdx == 0 {
-					newCompactIdx = compactIndex
-				}
-			}
-			return newCompactIdx, nil
-		}
-	}
-
 	store, err := raftStoreFactory(&c.CubeConfig)
 	if err != nil {
 		return nil, err
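Usage note: a minimal sketch (not part of the patch) of how the reworked CreateSnapshot/ApplySnapshot pair is expected to be driven. It mirrors the setup in TestStorage above; the test name, directory paths, and shard id are illustrative only.

package aoe

import (
	"os"
	"testing"

	"github.com/matrixorigin/matrixcube/pb/meta"
	"github.com/stretchr/testify/require"
)

// TestSnapshotRoundTrip is a sketch: it saves the metadata of one shard into a
// source storage, snapshots that shard, and applies the snapshot to a second
// storage, using only the signatures introduced by this patch.
func TestSnapshotRoundTrip(t *testing.T) {
	os.RemoveAll("./test-snap")
	src, err := NewStorage("./test-snap/src")
	require.Nil(t, err)
	dst, err := NewStorage("./test-snap/dst")
	require.Nil(t, err)

	// Both storages know about shard 0, as in TestStorage.
	md := []meta.ShardMetadata{{ShardID: 0, LogIndex: 1, Metadata: meta.ShardLocalState{}}}
	require.Nil(t, src.SaveShardMetadata(md))
	require.Nil(t, dst.SaveShardMetadata(md))

	// CreateSnapshot now returns only an error; the directory creation the old
	// stub did with os.MkdirAll is no longer done here.
	require.Nil(t, src.CreateSnapshot(0, "./test-snap/snapshot"))
	require.Nil(t, dst.ApplySnapshot(0, "./test-snap/snapshot"))
}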