diff --git a/cmd/db-server/main.go b/cmd/db-server/main.go index 454f8977eab0681bb3fbde79cceaaf45dac0a595..2b38eb66caa71bb79a17cf4626d65d90eef44a3b 100644 --- a/cmd/db-server/main.go +++ b/cmd/db-server/main.go @@ -215,6 +215,8 @@ func main() { os.Exit(DecodeAoeConfigExit) } + catalogListener := catalog.NewCatalogListener() + opt.EventListener = catalogListener aoeDataStorage, err = aoeDriver.NewStorageWithOptions(targetDir+"/aoe", &opt) if err != nil { logutil.Infof("Create aoe driver error, %v\n", err) @@ -251,6 +253,7 @@ func main() { } c = catalog.NewCatalog(a) config.ClusterCatalog = c + catalogListener.UpdateCatalog(c) eng := aoeEngine.New(c) pci.SetRemoveEpoch(removeEpoch) diff --git a/pkg/catalog/catalog.go b/pkg/catalog/catalog.go index c700252580f1ef04a12a7cc94111f8a28e547564..6417f41db1e346b92b48bab8c8f343ef00e82a05 100644 --- a/pkg/catalog/catalog.go +++ b/pkg/catalog/catalog.go @@ -30,8 +30,8 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/driver" "github.com/matrixorigin/matrixone/pkg/vm/driver/pb" "github.com/matrixorigin/matrixone/pkg/vm/engine/aoe" - "github.com/matrixorigin/matrixone/pkg/vm/engine/aoe/common/codec" - "github.com/matrixorigin/matrixone/pkg/vm/engine/aoe/common/helper" + "github.com/matrixorigin/matrixone/pkg/vm/engine/aoe/storage/aoedb/v1" + "github.com/matrixorigin/matrixone/pkg/vm/engine/aoe/storage/event" ) const ( @@ -42,6 +42,8 @@ const ( cTablePrefix = "Table" cTableIDPrefix = "TID" cRoutePrefix = "Route" + cPreSplitPrefix = "PreSplit" + cSplitPrefix = "Split" cDeletedTablePrefix = "DeletedTableQueue" timeout = 2000 * time.Millisecond idPoolSize = 20 @@ -56,19 +58,131 @@ type Catalog struct { tidEnd uint64 pLock int32 } +type CatalogListener struct { + event.NoopListener + catalog *Catalog +} + +type SplitEvent struct { + Old uint64 + News map[uint64][]uint64 +} + +func NewCatalogListener() *CatalogListener { + return &CatalogListener{} +} +func (l *CatalogListener) UpdateCatalog(catalog *Catalog) { + l.catalog = catalog +} + +//OnPreSplit set presplit key. +func (l *CatalogListener) OnPreSplit(event *event.SplitEvent) error { + catalogSplitEvent, err := l.catalog.decodeSplitEvent(event) + if err != nil { + panic(err) + } + key := l.catalog.preSplitKey(catalogSplitEvent.Old) + value, err := json.Marshal(catalogSplitEvent) + if err != nil { + panic(err) + } + err = l.catalog.Driver.Set(key, value) + if err != nil { + panic(err) + } + return nil +} + +//OnPostSplit update route info and delete presplit key. +func (l *CatalogListener) OnPostSplit(res error, event *event.SplitEvent) error { + catalogSplitEvent, err := l.catalog.decodeSplitEvent(event) + if err != nil { + panic(err) + } + postKey := l.catalog.splitKey(catalogSplitEvent.Old) + value, err := json.Marshal(catalogSplitEvent) + if err != nil { + panic(err) + } + err = l.catalog.Driver.Set(postKey, value) + if err != nil { + panic(err) + } + go l.catalog.OnDatabaseSplitted() + preKey := l.catalog.preSplitKey(catalogSplitEvent.Old) + err = l.catalog.Driver.Delete(preKey) + if err != nil { + panic(err) + } + return nil +} +func (c *Catalog) updateRouteInfo(tid, oldSid, newSid uint64) error { + routePrefix := c.routePrefix(tid) + shardIds, err := c.Driver.PrefixKeys(routePrefix, 0) + if err != nil { + logutil.Errorf("PrefixKeys fails, err:%v", err) + return err + } + for _, sidByte := range shardIds { + sid, err := Bytes2Uint64(sidByte[len(c.routePrefix(tid)):]) + if err != nil { + logutil.Errorf("Bytes2Uint64 fails, err:%v", err) + return err + } + if sid == oldSid { + oldKey := c.routeKey(tid, sid) + newKey := c.routeKey(tid, newSid) + err = c.Driver.Set(newKey, []byte(strconv.Itoa(int(tid)))) + if err != nil { + logutil.Errorf("Set fails, err:%v", err) + return err + } + err = c.Driver.Delete(oldKey) + if err != nil { + logutil.Errorf("Delete fails, err:%v", err) + return err + } + } + } + return nil +} +func (c *Catalog) OnDatabaseSplitted() error { + splitEvents, err := c.Driver.PrefixScan(c.splitPrefix(), 0) + if err != nil { + return err + } + for i := 1; i < len(splitEvents); i += 2 { + splitEvent := SplitEvent{} + splitEventByte := splitEvents[i] + err = json.Unmarshal(splitEventByte, &splitEvent) + if err != nil { + logutil.Errorf("Unmarshal fails, err:%v", err) + return err + } + for newShard, tbls := range splitEvent.News { + for _, tid := range tbls { + err := c.updateRouteInfo(tid, splitEvent.Old, newShard) + if err != nil { + return err + } + } + } + } + return nil +} // NewCatalog creates a Catalog. func NewCatalog(store driver.CubeDriver) *Catalog { catalog := Catalog{ Driver: store, } - tmpId, err := store.AllocID(codec.String2Bytes(cDBIDPrefix), idPoolSize) + tmpId, err := store.AllocID(String2Bytes(cDBIDPrefix), idPoolSize) if err != nil { panic(fmt.Sprintf("init db id pool failed. %v", err)) } catalog.dbIdEnd = tmpId catalog.dbIdStart = tmpId - idPoolSize + 1 - tmpId, err = store.AllocID(codec.String2Bytes(cTableIDPrefix), idPoolSize) + tmpId, err = store.AllocID(String2Bytes(cTableIDPrefix), idPoolSize) if err != nil { panic(fmt.Sprintf("init table id pool failed. %v", err)) } @@ -92,7 +206,7 @@ func (c *Catalog) CreateDatabase(epoch uint64, dbName string, typ int) (dbid uin if err != nil { return 0, err } - if err = c.Driver.SetIfNotExist(c.dbIDKey(dbName), codec.Uint642Bytes(dbid)); err != nil { + if err = c.Driver.SetIfNotExist(c.dbIDKey(dbName), Uint642Bytes(dbid)); err != nil { return 0, err } info := aoe.SchemaInfo{ @@ -220,7 +334,7 @@ func (c *Catalog) SetPrimaryKey(epoch, dbId uint64, tableName, columnName string return ErrColumnNotExist } func (c *Catalog) updateTableInfo(dbId uint64, tbl *aoe.TableInfo) (err error) { - meta, err := helper.EncodeTable(*tbl) + meta, err := EncodeTable(*tbl) if err != nil { return err } @@ -257,7 +371,7 @@ func (c *Catalog) CreateTable(epoch, dbId uint64, tbl aoe.TableInfo) (tid uint64 return tid, err } tbl.Id = tid - if err = c.Driver.SetIfNotExist(c.tableIDKey(dbId, tbl.Name), codec.Uint642Bytes(tbl.Id)); err != nil { + if err = c.Driver.SetIfNotExist(c.tableIDKey(dbId, tbl.Name), Uint642Bytes(tbl.Id)); err != nil { return tid, ErrTableCreateExists } @@ -265,7 +379,7 @@ func (c *Catalog) CreateTable(epoch, dbId uint64, tbl aoe.TableInfo) (tid uint64 tbl.Epoch = epoch tbl.SchemaId = dbId if shardId, err := c.getAvailableShard(tbl.Id); err == nil { - rkey := c.routeKey(dbId, tbl.Id, shardId) + rkey := c.routeKey(tbl.Id, shardId) tableName := tbl.Name aoeTableName := c.encodeTabletName(shardId, tbl.Id) if err := c.Driver.CreateTablet(aoeTableName, shardId, &tbl); err != nil { @@ -274,7 +388,7 @@ func (c *Catalog) CreateTable(epoch, dbId uint64, tbl aoe.TableInfo) (tid uint64 } tbl.Name = tableName tbl.State = aoe.StatePublic - meta, err := helper.EncodeTable(tbl) + meta, err := EncodeTable(tbl) if err != nil { logutil.Errorf("ErrTableCreateFailed, %v", err) return tid, err @@ -320,7 +434,7 @@ func (c *Catalog) DropTable(epoch, dbId uint64, tableName string) (tid uint64, e tid = tb.Id tb.State = aoe.StateDeleteOnly tb.Epoch = epoch - value, _ := helper.EncodeTable(*tb) + value, _ := EncodeTable(*tb) if err = c.Driver.Set(c.deletedTableKey(epoch, dbId, tb.Id), value); err != nil { return tid, err } @@ -338,7 +452,7 @@ func (c *Catalog) ListTablesByName(dbName string) ([]aoe.TableInfo, error) { if value, err := c.Driver.Get(c.dbIDKey(dbName)); err != nil || value == nil { return nil, ErrDBNotExists } else { - id, _ := codec.Bytes2Uint64(value) + id, _ := Bytes2Uint64(value) return c.ListTables(id) } } @@ -386,12 +500,12 @@ func (c *Catalog) CreateIndex(epoch uint64, idxInfo aoe.IndexInfo) error { if idxInfo.Type == aoe.Bsi { idxInfo.Type = aoe.NumBsi } - shardIds, err := c.Driver.PrefixKeys(c.routePrefix(tbl.SchemaId, tbl.Id), 0) + shardIds, err := c.Driver.PrefixKeys(c.routePrefix(tbl.Id), 0) if err != nil { return err } for _, shardId := range shardIds { - sid, err := codec.Bytes2Uint64(shardId[len(c.routePrefix(tbl.SchemaId, tbl.Id)):]) + sid, err := Bytes2Uint64(shardId[len(c.routePrefix(tbl.Id)):]) if err != nil { logutil.Errorf("convert shardid failed, %v", err) break @@ -426,12 +540,12 @@ func (c *Catalog) DropIndex(epoch, tid, dbid uint64, idxName string) error { if err != nil { return err } - shardIds, err := c.Driver.PrefixKeys(c.routePrefix(tbl.SchemaId, tbl.Id), 0) + shardIds, err := c.Driver.PrefixKeys(c.routePrefix(tbl.Id), 0) if err != nil { return err } for _, shardId := range shardIds { - sid, err := codec.Bytes2Uint64(shardId[len(c.routePrefix(tbl.SchemaId, tbl.Id)):]) + sid, err := Bytes2Uint64(shardId[len(c.routePrefix(tbl.Id)):]) if err != nil { logutil.Errorf("convert shardid failed, %v", err) break @@ -473,7 +587,7 @@ func (c *Catalog) ListTables(dbId uint64) ([]aoe.TableInfo, error) { } var tables []aoe.TableInfo for i := 1; i < len(values); i = i + 2 { - t, _ := helper.DecodeTable(values[i]) + t, _ := DecodeTable(values[i]) if t.State != aoe.StatePublic { continue } @@ -521,25 +635,85 @@ func (c *Catalog) GetTablets(dbId uint64, tableName string) (tablets []aoe.Table if tb.State != aoe.StatePublic { return nil, ErrTableNotExists } - shardIds, err := c.Driver.PrefixKeys(c.routePrefix(dbId, tb.Id), 0) + sids, err := c.getShardidsWithTimeout(tb.Id) if err != nil { return nil, err } - for _, shardId := range shardIds { - if sid, err := codec.Bytes2Uint64(shardId[len(c.routePrefix(dbId, tb.Id)):]); err != nil { - logutil.Errorf("convert shardid failed, %v, shardid is %d, prefix length is %d", err, len(shardId), len(c.routePrefix(dbId, tb.Id))) - continue - } else { - tablets = append(tablets, aoe.TabletInfo{ - Name: c.encodeTabletName(sid, tb.Id), - ShardId: sid, - Table: *tb, - }) - } + for _, sid := range sids { + tablets = append(tablets, aoe.TabletInfo{ + Name: c.encodeTabletName(sid, tb.Id), + ShardId: sid, + Table: *tb, + }) } return tablets, nil } } +func (c *Catalog) getShardidsWithTimeout(tid uint64) (shardids []uint64, err error) { + t0 := time.Now() + defer func() { + logutil.Infof("[getShardidsWithTimeout] get shard for %d, returns %d, %v, cost %d ms", tid, shardids, err, time.Since(t0).Milliseconds()) + }() + timeoutC := time.After(timeout) + for { + select { + case <-timeoutC: + logutil.Error("wait for available shard timeout") + return nil, ErrTableCreateTimeout + default: + shards, err := c.getShardids(tid) + if err == nil { + return shards, nil + } + time.Sleep(time.Millisecond * 10) + } + } +} +func (c *Catalog) getShardids(tid uint64) ([]uint64, error) { + sids := make([]uint64, 0) + pendingSids, err := c.GetPendingShards() + logutil.Infof("pending shards are %v",pendingSids) + if err != nil { + return nil, err + } + shardIds, err := c.Driver.PrefixKeys(c.routePrefix(tid), 0) + if err != nil { + return nil, err + } + for _, shardId := range shardIds { + sid, err := Bytes2Uint64(shardId[len(c.routePrefix(tid)):]) + if err != nil { + logutil.Errorf("convert shardid failed, %v, shardid is %d, prefix length is %d", err, len(shardId), len(c.routePrefix(tid))) + continue + } + for _, pendingSid := range pendingSids { + if sid == pendingSid { + logutil.Infof("shard %v is pending", sid) + return nil, ErrShardPending + } + } + sids = append(sids, sid) + } + return sids, nil +} +func (c *Catalog) GetPendingShards() (sids []uint64, err error) { + splitEvents, err := c.Driver.PrefixScan(c.preSplitPrefix(), 0) + if err != nil { + logutil.Errorf("PrefixScan fails, err: %v", err) + return nil, err + } + for i := 1; i < len(splitEvents); i += 2 { + splitEvent := SplitEvent{} + splitEventByte := splitEvents[i] + err = json.Unmarshal(splitEventByte, &splitEvent) + if err != nil { + logutil.Errorf("Unmarshal fails, err: %v", err) + return nil, err + } + sids = append(sids, splitEvent.Old) + } + return +} // RemoveDeletedTable trigger gc // TODO: handle duplicated remove @@ -554,18 +728,18 @@ func (c *Catalog) RemoveDeletedTable(epoch uint64) (cnt int, err error) { return cnt, err } for i := 1; i < len(rsp); i += 2 { - if tbl, err := helper.DecodeTable(rsp[i]); err != nil { + if tbl, err := DecodeTable(rsp[i]); err != nil { logutil.Errorf("Decode err for table info, %v, %v", err, rsp[i]) continue } else { - shardIds, err := c.Driver.PrefixKeys(c.routePrefix(tbl.SchemaId, tbl.Id), 0) + shardIds, err := c.Driver.PrefixKeys(c.routePrefix(tbl.Id), 0) if err != nil { logutil.Errorf("Failed to get shards for table %v, %v", rsp[i], err) continue } success := true for _, shardId := range shardIds { - if sid, err := codec.Bytes2Uint64(shardId[len(c.routePrefix(tbl.SchemaId, tbl.Id)):]); err != nil { + if sid, err := Bytes2Uint64(shardId[len(c.routePrefix(tbl.Id)):]); err != nil { logutil.Errorf("convert shardid failed, %v", err) success = false break @@ -625,7 +799,7 @@ func (c *Catalog) dropTables(epoch, dbId uint64) (err error) { for _, tbl := range tbs { tbl.State = aoe.StateDeleteOnly tbl.Epoch = epoch - value, _ := helper.EncodeTable(tbl) + value, _ := EncodeTable(tbl) if err = c.Driver.Set(c.deletedTableKey(epoch, dbId, tbl.Id), value); err != nil { return err } @@ -646,7 +820,7 @@ func (c *Catalog) checkDBNotExists(dbName string) (*aoe.SchemaInfo, error) { if value, err := c.Driver.Get(c.dbIDKey(dbName)); err != nil || value == nil { return nil, nil } else { - id, _ := codec.Bytes2Uint64(value) + id, _ := Bytes2Uint64(value) db, err := c.checkDBExists(id) if err == ErrDBNotExists { return nil, nil @@ -662,7 +836,7 @@ func (c *Catalog) checkTableExists(dbId, id uint64) (*aoe.TableInfo, error) { if v, err := c.Driver.Get(c.tableKey(dbId, id)); err != nil { return nil, ErrTableNotExists } else { - if table, err := helper.DecodeTable(v); err != nil { + if table, err := DecodeTable(v); err != nil { return nil, ErrTableNotExists } else { if table.State == aoe.StateDeleteOnly { @@ -680,7 +854,7 @@ func (c *Catalog) checkTableNotExists(dbId uint64, tableName string) (*aoe.Table if value, err := c.Driver.Get(c.tableIDKey(dbId, tableName)); err != nil || value == nil { return nil, nil } else { - id, _ := codec.Bytes2Uint64(value) + id, _ := Bytes2Uint64(value) tb, err := c.checkTableExists(dbId, id) if err == ErrTableNotExists { return nil, nil @@ -694,6 +868,11 @@ func (c *Catalog) encodeTabletName(groupId, tableId uint64) string { return strconv.Itoa(int(tableId)) } +func (c *Catalog) decodeTabletName(tbl string) uint64 { + tid, _ := strconv.Atoi(tbl) + return uint64(tid) +} + //genGlobalUniqIDs generates a global unique id by calling c.Driver.AllocID. func (c *Catalog) genGlobalUniqIDs(idKey []byte) (uint64, error) { id, err := c.Driver.AllocID(idKey, 1) @@ -705,57 +884,105 @@ func (c *Catalog) genGlobalUniqIDs(idKey []byte) (uint64, error) { //dbIDKey returns encoded dbName with prefix "meta1DBID" func (c *Catalog) dbIDKey(dbName string) []byte { - return codec.EncodeKey(cPrefix, defaultCatalogId, cDBIDPrefix, dbName) + return EncodeKey(cPrefix, defaultCatalogId, cDBIDPrefix, dbName) } //dbKey returns encoded id with prefix "meta1DBINFO" func (c *Catalog) dbKey(id uint64) []byte { - return codec.EncodeKey(cPrefix, defaultCatalogId, cDBPrefix, id) + return EncodeKey(cPrefix, defaultCatalogId, cDBPrefix, id) } //dbPrefix returns the prefix "meta1DBINFO" func (c *Catalog) dbPrefix() []byte { - return codec.EncodeKey(cPrefix, defaultCatalogId, cDBPrefix) + return EncodeKey(cPrefix, defaultCatalogId, cDBPrefix) } //tableIDKey returns the encoded tableName with prefix "meta1TID$dbId$" func (c *Catalog) tableIDKey(dbId uint64, tableName string) []byte { - return codec.EncodeKey(cPrefix, defaultCatalogId, cTableIDPrefix, dbId, tableName) + return EncodeKey(cPrefix, defaultCatalogId, cTableIDPrefix, dbId, tableName) } //tableKey returns the encoded tID with prefix "meta1Table$dbId$" func (c *Catalog) tableKey(dbId, tId uint64) []byte { - return codec.EncodeKey(cPrefix, defaultCatalogId, cTablePrefix, dbId, tId) + return EncodeKey(cPrefix, defaultCatalogId, cTablePrefix, dbId, tId) } //tablePrefix returns the prefix "meta1Table$dbId$" func (c *Catalog) tablePrefix(dbId uint64) []byte { - return codec.EncodeKey(cPrefix, defaultCatalogId, cTablePrefix, dbId) + return EncodeKey(cPrefix, defaultCatalogId, cTablePrefix, dbId) +} + +//routeKey returns the encoded gId with prefix "meta1Route$tId$" +func (c *Catalog) routeKey(tId, gId uint64) []byte { + return EncodeKey(cPrefix, defaultCatalogId, cRoutePrefix, tId, gId) +} + +//routePrefix returns the prefix "meta1Route$$tId" +func (c *Catalog) routePrefix(tId uint64) []byte { + return EncodeKey(cPrefix, defaultCatalogId, cRoutePrefix, tId) +} + +func (c *Catalog) splitPrefix() []byte { + return EncodeKey(cPrefix, defaultCatalogId, cSplitPrefix) +} + +func (c *Catalog) splitKey(db uint64) []byte { + return EncodeKey(cPrefix, defaultCatalogId, cSplitPrefix, db) } -//routeKey returns the encoded gId with prefix "meta1Route$dbId$$tId$" -func (c *Catalog) routeKey(dbId, tId, gId uint64) []byte { - return codec.EncodeKey(cPrefix, defaultCatalogId, cRoutePrefix, dbId, tId, gId) +func (c *Catalog) preSplitPrefix() []byte { + return EncodeKey(cPrefix, defaultCatalogId, cPreSplitPrefix) } -//routePrefix returns the prefix "meta1Route$dbId$$tId" -func (c *Catalog) routePrefix(dbId, tId uint64) []byte { - return codec.EncodeKey(cPrefix, defaultCatalogId, cRoutePrefix, dbId, tId) +func (c *Catalog) preSplitKey(db uint64) []byte { + return EncodeKey(cPrefix, defaultCatalogId, cPreSplitPrefix, db) +} + +func (c *Catalog) decodeSplitEvent(aoeSplitEvent *event.SplitEvent) (*SplitEvent, error) { + oldInterface, err := aoedb.IdToNameFactory.Decode(aoeSplitEvent.DB) + if err != nil { + return nil, err + } + old, ok := oldInterface.(uint64) + if !ok { + return nil, errors.New("invalid old shard id") + } + news := make(map[uint64][]uint64) + for newshard, tbls := range aoeSplitEvent.Names { + newInterface, err := aoedb.IdToNameFactory.Decode(newshard) + if err != nil { + return nil, err + } + new, ok := newInterface.(uint64) + if !ok { + return nil, errors.New("invalid new shard id") + } + news[new] = make([]uint64, len(tbls)) + for i, tbl := range tbls { + tid := c.decodeTabletName(tbl) + news[new][i] = tid + } + } + catalogSplitEvent := SplitEvent{ + Old: old, + News: news, + } + return &catalogSplitEvent, nil } //deletedTableKey returns the encoded tId with the prefix "DeletedTableQueue$epoch$$dbId$" func (c *Catalog) deletedTableKey(epoch, dbId, tId uint64) []byte { - return codec.EncodeKey(cDeletedTablePrefix, epoch, dbId, tId) + return EncodeKey(cDeletedTablePrefix, epoch, dbId, tId) } //deletedEpochPrefix returns the prefix "DeletedTableQueue$epoch$" func (c *Catalog) deletedEpochPrefix(epoch uint64) []byte { - return codec.EncodeKey(cDeletedTablePrefix, epoch) + return EncodeKey(cDeletedTablePrefix, epoch) } //deletedPrefix returns the prefix "DeletedTableQueue" func (c *Catalog) deletedPrefix() []byte { - return codec.EncodeKey(cDeletedTablePrefix) + return EncodeKey(cDeletedTablePrefix) } //getAvailableShard get a shard from the shard pool and returns its id. @@ -771,7 +998,7 @@ func (c *Catalog) getAvailableShard(tid uint64) (shardid uint64, err error) { logutil.Error("wait for available shard timeout") return shardid, ErrTableCreateTimeout default: - shard, err := c.Driver.GetShardPool().Alloc(uint64(pb.AOEGroup), codec.Uint642Bytes(tid)) + shard, err := c.Driver.GetShardPool().Alloc(uint64(pb.AOEGroup), Uint642Bytes(tid)) if err == nil { return shard.ShardID, err } @@ -855,13 +1082,13 @@ func (c *Catalog) refreshTableIDCache() { wg := sync.WaitGroup{} wg.Add(1) - c.Driver.AsyncAllocID(codec.String2Bytes(cTableIDPrefix), idPoolSize, func(i server.CustomRequest, data []byte, err error) { + c.Driver.AsyncAllocID(String2Bytes(cTableIDPrefix), idPoolSize, func(i server.CustomRequest, data []byte, err error) { defer wg.Done() if err != nil { logutil.Errorf("refresh table id failed, checkpoint is %d, %d", c.tidStart, c.tidEnd) return } - id, err := codec.Bytes2Uint64(data) + id, err := Bytes2Uint64(data) if err != nil { logutil.Errorf("get result of AllocId failed, %v\n", err) return @@ -891,13 +1118,13 @@ func (c *Catalog) refreshDBIDCache() { wg := sync.WaitGroup{} wg.Add(1) - c.Driver.AsyncAllocID(codec.String2Bytes(cDBIDPrefix), idPoolSize, func(i server.CustomRequest, data []byte, err error) { + c.Driver.AsyncAllocID(String2Bytes(cDBIDPrefix), idPoolSize, func(i server.CustomRequest, data []byte, err error) { defer wg.Done() if err != nil { logutil.Errorf("refresh db id failed, checkpoint is %d, %d", c.dbIdStart, c.dbIdEnd) return } - id, err := codec.Bytes2Uint64(data) + id, err := Bytes2Uint64(data) if err != nil { logutil.Errorf("get result of AllocId failed, %v\n", err) return diff --git a/pkg/catalog/errors.go b/pkg/catalog/errors.go index e9f5d0a87f48ead1d9f424ff89110c2dbb9b82fc..c4daefe08211ed1614b139bb7fd685273853b813 100644 --- a/pkg/catalog/errors.go +++ b/pkg/catalog/errors.go @@ -49,4 +49,6 @@ var ( ErrIndexExist = errors.New("index already exist") //ErrIndexNotExist is the error for trying to drop an index that doesn't exit. ErrIndexNotExist = errors.New("index not exist") + //ErrShardPending is for pending shards + ErrShardPending = errors.New("shard is pending") ) diff --git a/pkg/catalog/helper.go b/pkg/catalog/helper.go new file mode 100644 index 0000000000000000000000000000000000000000..6e0d2e45414049708210d38dd526917d545fbc42 --- /dev/null +++ b/pkg/catalog/helper.go @@ -0,0 +1,113 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package catalog + +import ( + "bytes" + "encoding/binary" + "sync" + "reflect" + "unsafe" + "fmt" + + "github.com/matrixorigin/matrixone/pkg/encoding" + "github.com/matrixorigin/matrixone/pkg/vm/engine/aoe"//Index/TableInfo +) +func EncodeTable(tbl aoe.TableInfo) ([]byte, error) { + return encoding.Encode(tbl) +} + +func DecodeTable(data []byte) (aoe.TableInfo, error) { + var tbl aoe.TableInfo + + err := encoding.Decode(data, &tbl) + return tbl, err +} +func EncodeIndex(idx aoe.IndexInfo) ([]byte, error) { + return encoding.Encode(idx) +} + +func DecodeIndex(data []byte) (aoe.IndexInfo, error) { + var idx aoe.IndexInfo + + err := encoding.Decode(data, &idx) + return idx, err +} + +var pool = sync.Pool{ + New: func() interface{} { + return make([]byte, 8) + }, +} + +func EncodeKey(ks ...interface{}) []byte { + var buf bytes.Buffer + + for i := 0; i < len(ks); i++ { + k := ks[i] + switch v := k.(type) { + case uint8: + buf.Write(encoding.EncodeUint8(v)) + case int: + data := pool.Get().([]byte) + binary.BigEndian.PutUint64(data, uint64(v)) + buf.Write(data) + pool.Put(data) + case uint64: + data := pool.Get().([]byte) + binary.BigEndian.PutUint64(data, v) + buf.Write(data) + pool.Put(data) + case []byte: + buf.Write(v) + case string: + buf.WriteString(v) + } + } + + return buf.Bytes() +} + +func String2Bytes(v string) []byte { + if v == "" { + return nil + } + return unsafe.Slice( + (*byte)(unsafe.Pointer( + (*reflect.StringHeader)(unsafe.Pointer(&v)).Data, + )), + len(v), + ) +} + +func Bytes2String(v []byte) string { + return *(*string)(unsafe.Pointer(&v)) +} + +func Uint642Bytes(v uint64) []byte { + var buf bytes.Buffer + data := pool.Get().([]byte) + binary.BigEndian.PutUint64(data, v) + buf.Write(data) + pool.Put(data) + return buf.Bytes() +} + +func Bytes2Uint64(v []byte) (b uint64, err error) { + if len(v) != 8 { + return b, fmt.Errorf("invalid data, must 8 bytes, but %d", len(v)) + } + return binary.BigEndian.Uint64(v), nil +} diff --git a/pkg/vm/driver/aoe/storage.go b/pkg/vm/driver/aoe/storage.go index ce11e49039bb0ac739ac34751c4674643d0dafd3..88cee8dd85e97fbbc3253b2b1f00a1c634abaaca 100644 --- a/pkg/vm/driver/aoe/storage.go +++ b/pkg/vm/driver/aoe/storage.go @@ -669,32 +669,32 @@ func (s *Storage) Split(old meta.ShardMetadata, news []meta.ShardMetadata, ctx [ renameTable := func(oldName, dbName string) string { return oldName } - dropTableCtx := aoedb.DropTableCtx{ - DBMutationCtx: aoedb.DBMutationCtx{ - Id: old.LogIndex, - Offset: 0, - Size: 2, - DB: aoedb.IdToNameFactory.Encode(old.ShardID), - }, - Table: sPrefix + strconv.Itoa(int(old.ShardID)), - } - _, err := s.DB.DropTable(&dropTableCtx) - if err != nil { - logutil.Errorf("Split:S-%d dropTable fail.",old.ShardID) - return err - } + // dropTableCtx := aoedb.DropTableCtx{ + // DBMutationCtx: aoedb.DBMutationCtx{ + // Id: old.LogIndex, + // Offset: 0, + // Size: 2, + // DB: aoedb.IdToNameFactory.Encode(old.ShardID), + // }, + // Table: sPrefix + strconv.Itoa(int(old.ShardID)), + // } + // _, err := s.DB.DropTable(&dropTableCtx) + // if err != nil { + // logutil.Errorf("Split:S-%d dropTable fail.",old.ShardID) + // return err + // } execSplitCtx := aoedb.ExecSplitCtx{ DBMutationCtx: aoedb.DBMutationCtx{ Id: old.LogIndex, - Offset: 1, - Size: 2, + Offset: 0, + Size: 1, DB: aoedb.IdToNameFactory.Encode(old.ShardID), }, NewNames: newNames, RenameTable: renameTable, SplitCtx: ctx, } - err = s.DB.ExecSplitDatabase(&execSplitCtx) + err := s.DB.ExecSplitDatabase(&execSplitCtx) if err != nil { logutil.Errorf("Split:S-%d ExecSplitDatabase fail.",old.ShardID) return err diff --git a/pkg/vm/driver/helper.go b/pkg/vm/driver/helper.go new file mode 100644 index 0000000000000000000000000000000000000000..a17c189dcab142798ca902a859fbc888714add6e --- /dev/null +++ b/pkg/vm/driver/helper.go @@ -0,0 +1,113 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package driver + +import ( + "bytes" + "encoding/binary" + "sync" + "reflect" + "unsafe" + "fmt" + + "github.com/matrixorigin/matrixone/pkg/encoding" + "github.com/matrixorigin/matrixone/pkg/vm/engine/aoe" +) +func EncodeTable(tbl aoe.TableInfo) ([]byte, error) { + return encoding.Encode(tbl) +} + +func DecodeTable(data []byte) (aoe.TableInfo, error) { + var tbl aoe.TableInfo + + err := encoding.Decode(data, &tbl) + return tbl, err +} +func EncodeIndex(idx aoe.IndexInfo) ([]byte, error) { + return encoding.Encode(idx) +} + +func DecodeIndex(data []byte) (aoe.IndexInfo, error) { + var idx aoe.IndexInfo + + err := encoding.Decode(data, &idx) + return idx, err +} + +var pool = sync.Pool{ + New: func() interface{} { + return make([]byte, 8) + }, +} + +func EncodeKey(ks ...interface{}) []byte { + var buf bytes.Buffer + + for i := 0; i < len(ks); i++ { + k := ks[i] + switch v := k.(type) { + case uint8: + buf.Write(encoding.EncodeUint8(v)) + case int: + data := pool.Get().([]byte) + binary.BigEndian.PutUint64(data, uint64(v)) + buf.Write(data) + pool.Put(data) + case uint64: + data := pool.Get().([]byte) + binary.BigEndian.PutUint64(data, v) + buf.Write(data) + pool.Put(data) + case []byte: + buf.Write(v) + case string: + buf.WriteString(v) + } + } + + return buf.Bytes() +} + +func String2Bytes(v string) []byte { + if v == "" { + return nil + } + return unsafe.Slice( + (*byte)(unsafe.Pointer( + (*reflect.StringHeader)(unsafe.Pointer(&v)).Data, + )), + len(v), + ) +} + +func Bytes2String(v []byte) string { + return *(*string)(unsafe.Pointer(&v)) +} + +func Uint642Bytes(v uint64) []byte { + var buf bytes.Buffer + data := pool.Get().([]byte) + binary.BigEndian.PutUint64(data, v) + buf.Write(data) + pool.Put(data) + return buf.Bytes() +} + +func Bytes2Uint64(v []byte) (b uint64, err error) { + if len(v) != 8 { + return b, fmt.Errorf("invalid data, must 8 bytes, but %d", len(v)) + } + return binary.BigEndian.Uint64(v), nil +} diff --git a/pkg/vm/driver/kv/executor.go b/pkg/vm/driver/kv/executor.go index 3a7d7f9adf9910f6600183f40eaaf29d1687094b..7df0afeb46a9273152e6085f2bc8a0819b1eb6a6 100644 --- a/pkg/vm/driver/kv/executor.go +++ b/pkg/vm/driver/kv/executor.go @@ -22,7 +22,7 @@ import ( errDriver "github.com/matrixorigin/matrixone/pkg/vm/driver/error" pb3 "github.com/matrixorigin/matrixone/pkg/vm/driver/pb" - "github.com/matrixorigin/matrixone/pkg/vm/engine/aoe/common/codec" + "github.com/matrixorigin/matrixone/pkg/vm/driver" "github.com/fagongzi/util/protoc" "github.com/matrixorigin/matrixcube/pb/meta" @@ -32,7 +32,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/driver/pb" "github.com/matrixorigin/matrixcube/util" - // "github.com/matrixorigin/matrixcube/pb/rpc" ) // Storage memory storage @@ -150,7 +149,7 @@ func (ce *kvExecutor) incr(wb util.WriteBatch, req storage.Request) (uint64, []b id := uint64(0) if v, ok := ce.attrs[string(req.Key)]; ok { - id, _ = codec.Bytes2Uint64(v.([]byte)) + id, _ = driver.Bytes2Uint64(v.([]byte)) } else { value, err := ce.kv.Get(req.Key) if err != nil { @@ -158,7 +157,7 @@ func (ce *kvExecutor) incr(wb util.WriteBatch, req storage.Request) (uint64, []b return 0, rep } if len(value) > 0 { - id, _ = codec.Bytes2Uint64(value) + id, _ = driver.Bytes2Uint64(value) } } @@ -168,7 +167,7 @@ func (ce *kvExecutor) incr(wb util.WriteBatch, req storage.Request) (uint64, []b id += customReq.Batch } - newV := codec.Uint642Bytes(id) + newV := driver.Uint642Bytes(id) ce.attrs[string(req.Key)] = newV wb.Set(req.Key, newV) @@ -190,11 +189,11 @@ func (ce *kvExecutor) setIfNotExist(wb util.WriteBatch, req storage.Request) (ui var rep []byte - if _, ok := ce.attrs[codec.Bytes2String(req.Key)]; ok { + if _, ok := ce.attrs[driver.Bytes2String(req.Key)]; ok { rep = errDriver.ErrorResp(errors.New("key is already existed")) return 0, rep } else { - ce.attrs[codec.Bytes2String(req.Key)] = "1" + ce.attrs[driver.Bytes2String(req.Key)] = "1" } value, err := ce.kv.Get(req.Key)