Skip to content
Snippets Groups Projects
Unverified Commit ad9fae54 authored by jiangxinmeng1's avatar jiangxinmeng1 Committed by GitHub
Browse files

Define and implement whole split process: add catalog listener (#1321)

* catalog listener

* add catalog/helper.go

* add driver/helper.go

* update getTablets

* update OnPostSplit
parent 1107df4a
No related branches found
No related tags found
No related merge requests found
......@@ -215,6 +215,8 @@ func main() {
os.Exit(DecodeAoeConfigExit)
}
catalogListener := catalog.NewCatalogListener()
opt.EventListener = catalogListener
aoeDataStorage, err = aoeDriver.NewStorageWithOptions(targetDir+"/aoe", &opt)
if err != nil {
logutil.Infof("Create aoe driver error, %v\n", err)
......@@ -251,6 +253,7 @@ func main() {
}
c = catalog.NewCatalog(a)
config.ClusterCatalog = c
catalogListener.UpdateCatalog(c)
eng := aoeEngine.New(c)
pci.SetRemoveEpoch(removeEpoch)
......
......@@ -30,8 +30,8 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/driver"
"github.com/matrixorigin/matrixone/pkg/vm/driver/pb"
"github.com/matrixorigin/matrixone/pkg/vm/engine/aoe"
"github.com/matrixorigin/matrixone/pkg/vm/engine/aoe/common/codec"
"github.com/matrixorigin/matrixone/pkg/vm/engine/aoe/common/helper"
"github.com/matrixorigin/matrixone/pkg/vm/engine/aoe/storage/aoedb/v1"
"github.com/matrixorigin/matrixone/pkg/vm/engine/aoe/storage/event"
)
const (
......@@ -42,6 +42,8 @@ const (
cTablePrefix = "Table"
cTableIDPrefix = "TID"
cRoutePrefix = "Route"
cPreSplitPrefix = "PreSplit"
cSplitPrefix = "Split"
cDeletedTablePrefix = "DeletedTableQueue"
timeout = 2000 * time.Millisecond
idPoolSize = 20
......@@ -56,19 +58,131 @@ type Catalog struct {
tidEnd uint64
pLock int32
}
type CatalogListener struct {
event.NoopListener
catalog *Catalog
}
type SplitEvent struct {
Old uint64
News map[uint64][]uint64
}
func NewCatalogListener() *CatalogListener {
return &CatalogListener{}
}
func (l *CatalogListener) UpdateCatalog(catalog *Catalog) {
l.catalog = catalog
}
//OnPreSplit set presplit key.
func (l *CatalogListener) OnPreSplit(event *event.SplitEvent) error {
catalogSplitEvent, err := l.catalog.decodeSplitEvent(event)
if err != nil {
panic(err)
}
key := l.catalog.preSplitKey(catalogSplitEvent.Old)
value, err := json.Marshal(catalogSplitEvent)
if err != nil {
panic(err)
}
err = l.catalog.Driver.Set(key, value)
if err != nil {
panic(err)
}
return nil
}
//OnPostSplit update route info and delete presplit key.
func (l *CatalogListener) OnPostSplit(res error, event *event.SplitEvent) error {
catalogSplitEvent, err := l.catalog.decodeSplitEvent(event)
if err != nil {
panic(err)
}
postKey := l.catalog.splitKey(catalogSplitEvent.Old)
value, err := json.Marshal(catalogSplitEvent)
if err != nil {
panic(err)
}
err = l.catalog.Driver.Set(postKey, value)
if err != nil {
panic(err)
}
go l.catalog.OnDatabaseSplitted()
preKey := l.catalog.preSplitKey(catalogSplitEvent.Old)
err = l.catalog.Driver.Delete(preKey)
if err != nil {
panic(err)
}
return nil
}
func (c *Catalog) updateRouteInfo(tid, oldSid, newSid uint64) error {
routePrefix := c.routePrefix(tid)
shardIds, err := c.Driver.PrefixKeys(routePrefix, 0)
if err != nil {
logutil.Errorf("PrefixKeys fails, err:%v", err)
return err
}
for _, sidByte := range shardIds {
sid, err := Bytes2Uint64(sidByte[len(c.routePrefix(tid)):])
if err != nil {
logutil.Errorf("Bytes2Uint64 fails, err:%v", err)
return err
}
if sid == oldSid {
oldKey := c.routeKey(tid, sid)
newKey := c.routeKey(tid, newSid)
err = c.Driver.Set(newKey, []byte(strconv.Itoa(int(tid))))
if err != nil {
logutil.Errorf("Set fails, err:%v", err)
return err
}
err = c.Driver.Delete(oldKey)
if err != nil {
logutil.Errorf("Delete fails, err:%v", err)
return err
}
}
}
return nil
}
func (c *Catalog) OnDatabaseSplitted() error {
splitEvents, err := c.Driver.PrefixScan(c.splitPrefix(), 0)
if err != nil {
return err
}
for i := 1; i < len(splitEvents); i += 2 {
splitEvent := SplitEvent{}
splitEventByte := splitEvents[i]
err = json.Unmarshal(splitEventByte, &splitEvent)
if err != nil {
logutil.Errorf("Unmarshal fails, err:%v", err)
return err
}
for newShard, tbls := range splitEvent.News {
for _, tid := range tbls {
err := c.updateRouteInfo(tid, splitEvent.Old, newShard)
if err != nil {
return err
}
}
}
}
return nil
}
// NewCatalog creates a Catalog.
func NewCatalog(store driver.CubeDriver) *Catalog {
catalog := Catalog{
Driver: store,
}
tmpId, err := store.AllocID(codec.String2Bytes(cDBIDPrefix), idPoolSize)
tmpId, err := store.AllocID(String2Bytes(cDBIDPrefix), idPoolSize)
if err != nil {
panic(fmt.Sprintf("init db id pool failed. %v", err))
}
catalog.dbIdEnd = tmpId
catalog.dbIdStart = tmpId - idPoolSize + 1
tmpId, err = store.AllocID(codec.String2Bytes(cTableIDPrefix), idPoolSize)
tmpId, err = store.AllocID(String2Bytes(cTableIDPrefix), idPoolSize)
if err != nil {
panic(fmt.Sprintf("init table id pool failed. %v", err))
}
......@@ -92,7 +206,7 @@ func (c *Catalog) CreateDatabase(epoch uint64, dbName string, typ int) (dbid uin
if err != nil {
return 0, err
}
if err = c.Driver.SetIfNotExist(c.dbIDKey(dbName), codec.Uint642Bytes(dbid)); err != nil {
if err = c.Driver.SetIfNotExist(c.dbIDKey(dbName), Uint642Bytes(dbid)); err != nil {
return 0, err
}
info := aoe.SchemaInfo{
......@@ -220,7 +334,7 @@ func (c *Catalog) SetPrimaryKey(epoch, dbId uint64, tableName, columnName string
return ErrColumnNotExist
}
func (c *Catalog) updateTableInfo(dbId uint64, tbl *aoe.TableInfo) (err error) {
meta, err := helper.EncodeTable(*tbl)
meta, err := EncodeTable(*tbl)
if err != nil {
return err
}
......@@ -257,7 +371,7 @@ func (c *Catalog) CreateTable(epoch, dbId uint64, tbl aoe.TableInfo) (tid uint64
return tid, err
}
tbl.Id = tid
if err = c.Driver.SetIfNotExist(c.tableIDKey(dbId, tbl.Name), codec.Uint642Bytes(tbl.Id)); err != nil {
if err = c.Driver.SetIfNotExist(c.tableIDKey(dbId, tbl.Name), Uint642Bytes(tbl.Id)); err != nil {
return tid, ErrTableCreateExists
}
......@@ -265,7 +379,7 @@ func (c *Catalog) CreateTable(epoch, dbId uint64, tbl aoe.TableInfo) (tid uint64
tbl.Epoch = epoch
tbl.SchemaId = dbId
if shardId, err := c.getAvailableShard(tbl.Id); err == nil {
rkey := c.routeKey(dbId, tbl.Id, shardId)
rkey := c.routeKey(tbl.Id, shardId)
tableName := tbl.Name
aoeTableName := c.encodeTabletName(shardId, tbl.Id)
if err := c.Driver.CreateTablet(aoeTableName, shardId, &tbl); err != nil {
......@@ -274,7 +388,7 @@ func (c *Catalog) CreateTable(epoch, dbId uint64, tbl aoe.TableInfo) (tid uint64
}
tbl.Name = tableName
tbl.State = aoe.StatePublic
meta, err := helper.EncodeTable(tbl)
meta, err := EncodeTable(tbl)
if err != nil {
logutil.Errorf("ErrTableCreateFailed, %v", err)
return tid, err
......@@ -320,7 +434,7 @@ func (c *Catalog) DropTable(epoch, dbId uint64, tableName string) (tid uint64, e
tid = tb.Id
tb.State = aoe.StateDeleteOnly
tb.Epoch = epoch
value, _ := helper.EncodeTable(*tb)
value, _ := EncodeTable(*tb)
if err = c.Driver.Set(c.deletedTableKey(epoch, dbId, tb.Id), value); err != nil {
return tid, err
}
......@@ -338,7 +452,7 @@ func (c *Catalog) ListTablesByName(dbName string) ([]aoe.TableInfo, error) {
if value, err := c.Driver.Get(c.dbIDKey(dbName)); err != nil || value == nil {
return nil, ErrDBNotExists
} else {
id, _ := codec.Bytes2Uint64(value)
id, _ := Bytes2Uint64(value)
return c.ListTables(id)
}
}
......@@ -386,12 +500,12 @@ func (c *Catalog) CreateIndex(epoch uint64, idxInfo aoe.IndexInfo) error {
if idxInfo.Type == aoe.Bsi {
idxInfo.Type = aoe.NumBsi
}
shardIds, err := c.Driver.PrefixKeys(c.routePrefix(tbl.SchemaId, tbl.Id), 0)
shardIds, err := c.Driver.PrefixKeys(c.routePrefix(tbl.Id), 0)
if err != nil {
return err
}
for _, shardId := range shardIds {
sid, err := codec.Bytes2Uint64(shardId[len(c.routePrefix(tbl.SchemaId, tbl.Id)):])
sid, err := Bytes2Uint64(shardId[len(c.routePrefix(tbl.Id)):])
if err != nil {
logutil.Errorf("convert shardid failed, %v", err)
break
......@@ -426,12 +540,12 @@ func (c *Catalog) DropIndex(epoch, tid, dbid uint64, idxName string) error {
if err != nil {
return err
}
shardIds, err := c.Driver.PrefixKeys(c.routePrefix(tbl.SchemaId, tbl.Id), 0)
shardIds, err := c.Driver.PrefixKeys(c.routePrefix(tbl.Id), 0)
if err != nil {
return err
}
for _, shardId := range shardIds {
sid, err := codec.Bytes2Uint64(shardId[len(c.routePrefix(tbl.SchemaId, tbl.Id)):])
sid, err := Bytes2Uint64(shardId[len(c.routePrefix(tbl.Id)):])
if err != nil {
logutil.Errorf("convert shardid failed, %v", err)
break
......@@ -473,7 +587,7 @@ func (c *Catalog) ListTables(dbId uint64) ([]aoe.TableInfo, error) {
}
var tables []aoe.TableInfo
for i := 1; i < len(values); i = i + 2 {
t, _ := helper.DecodeTable(values[i])
t, _ := DecodeTable(values[i])
if t.State != aoe.StatePublic {
continue
}
......@@ -521,25 +635,85 @@ func (c *Catalog) GetTablets(dbId uint64, tableName string) (tablets []aoe.Table
if tb.State != aoe.StatePublic {
return nil, ErrTableNotExists
}
shardIds, err := c.Driver.PrefixKeys(c.routePrefix(dbId, tb.Id), 0)
sids, err := c.getShardidsWithTimeout(tb.Id)
if err != nil {
return nil, err
}
for _, shardId := range shardIds {
if sid, err := codec.Bytes2Uint64(shardId[len(c.routePrefix(dbId, tb.Id)):]); err != nil {
logutil.Errorf("convert shardid failed, %v, shardid is %d, prefix length is %d", err, len(shardId), len(c.routePrefix(dbId, tb.Id)))
continue
} else {
tablets = append(tablets, aoe.TabletInfo{
Name: c.encodeTabletName(sid, tb.Id),
ShardId: sid,
Table: *tb,
})
}
for _, sid := range sids {
tablets = append(tablets, aoe.TabletInfo{
Name: c.encodeTabletName(sid, tb.Id),
ShardId: sid,
Table: *tb,
})
}
return tablets, nil
}
}
func (c *Catalog) getShardidsWithTimeout(tid uint64) (shardids []uint64, err error) {
t0 := time.Now()
defer func() {
logutil.Infof("[getShardidsWithTimeout] get shard for %d, returns %d, %v, cost %d ms", tid, shardids, err, time.Since(t0).Milliseconds())
}()
timeoutC := time.After(timeout)
for {
select {
case <-timeoutC:
logutil.Error("wait for available shard timeout")
return nil, ErrTableCreateTimeout
default:
shards, err := c.getShardids(tid)
if err == nil {
return shards, nil
}
time.Sleep(time.Millisecond * 10)
}
}
}
func (c *Catalog) getShardids(tid uint64) ([]uint64, error) {
sids := make([]uint64, 0)
pendingSids, err := c.GetPendingShards()
logutil.Infof("pending shards are %v",pendingSids)
if err != nil {
return nil, err
}
shardIds, err := c.Driver.PrefixKeys(c.routePrefix(tid), 0)
if err != nil {
return nil, err
}
for _, shardId := range shardIds {
sid, err := Bytes2Uint64(shardId[len(c.routePrefix(tid)):])
if err != nil {
logutil.Errorf("convert shardid failed, %v, shardid is %d, prefix length is %d", err, len(shardId), len(c.routePrefix(tid)))
continue
}
for _, pendingSid := range pendingSids {
if sid == pendingSid {
logutil.Infof("shard %v is pending", sid)
return nil, ErrShardPending
}
}
sids = append(sids, sid)
}
return sids, nil
}
func (c *Catalog) GetPendingShards() (sids []uint64, err error) {
splitEvents, err := c.Driver.PrefixScan(c.preSplitPrefix(), 0)
if err != nil {
logutil.Errorf("PrefixScan fails, err: %v", err)
return nil, err
}
for i := 1; i < len(splitEvents); i += 2 {
splitEvent := SplitEvent{}
splitEventByte := splitEvents[i]
err = json.Unmarshal(splitEventByte, &splitEvent)
if err != nil {
logutil.Errorf("Unmarshal fails, err: %v", err)
return nil, err
}
sids = append(sids, splitEvent.Old)
}
return
}
// RemoveDeletedTable trigger gc
// TODO: handle duplicated remove
......@@ -554,18 +728,18 @@ func (c *Catalog) RemoveDeletedTable(epoch uint64) (cnt int, err error) {
return cnt, err
}
for i := 1; i < len(rsp); i += 2 {
if tbl, err := helper.DecodeTable(rsp[i]); err != nil {
if tbl, err := DecodeTable(rsp[i]); err != nil {
logutil.Errorf("Decode err for table info, %v, %v", err, rsp[i])
continue
} else {
shardIds, err := c.Driver.PrefixKeys(c.routePrefix(tbl.SchemaId, tbl.Id), 0)
shardIds, err := c.Driver.PrefixKeys(c.routePrefix(tbl.Id), 0)
if err != nil {
logutil.Errorf("Failed to get shards for table %v, %v", rsp[i], err)
continue
}
success := true
for _, shardId := range shardIds {
if sid, err := codec.Bytes2Uint64(shardId[len(c.routePrefix(tbl.SchemaId, tbl.Id)):]); err != nil {
if sid, err := Bytes2Uint64(shardId[len(c.routePrefix(tbl.Id)):]); err != nil {
logutil.Errorf("convert shardid failed, %v", err)
success = false
break
......@@ -625,7 +799,7 @@ func (c *Catalog) dropTables(epoch, dbId uint64) (err error) {
for _, tbl := range tbs {
tbl.State = aoe.StateDeleteOnly
tbl.Epoch = epoch
value, _ := helper.EncodeTable(tbl)
value, _ := EncodeTable(tbl)
if err = c.Driver.Set(c.deletedTableKey(epoch, dbId, tbl.Id), value); err != nil {
return err
}
......@@ -646,7 +820,7 @@ func (c *Catalog) checkDBNotExists(dbName string) (*aoe.SchemaInfo, error) {
if value, err := c.Driver.Get(c.dbIDKey(dbName)); err != nil || value == nil {
return nil, nil
} else {
id, _ := codec.Bytes2Uint64(value)
id, _ := Bytes2Uint64(value)
db, err := c.checkDBExists(id)
if err == ErrDBNotExists {
return nil, nil
......@@ -662,7 +836,7 @@ func (c *Catalog) checkTableExists(dbId, id uint64) (*aoe.TableInfo, error) {
if v, err := c.Driver.Get(c.tableKey(dbId, id)); err != nil {
return nil, ErrTableNotExists
} else {
if table, err := helper.DecodeTable(v); err != nil {
if table, err := DecodeTable(v); err != nil {
return nil, ErrTableNotExists
} else {
if table.State == aoe.StateDeleteOnly {
......@@ -680,7 +854,7 @@ func (c *Catalog) checkTableNotExists(dbId uint64, tableName string) (*aoe.Table
if value, err := c.Driver.Get(c.tableIDKey(dbId, tableName)); err != nil || value == nil {
return nil, nil
} else {
id, _ := codec.Bytes2Uint64(value)
id, _ := Bytes2Uint64(value)
tb, err := c.checkTableExists(dbId, id)
if err == ErrTableNotExists {
return nil, nil
......@@ -694,6 +868,11 @@ func (c *Catalog) encodeTabletName(groupId, tableId uint64) string {
return strconv.Itoa(int(tableId))
}
func (c *Catalog) decodeTabletName(tbl string) uint64 {
tid, _ := strconv.Atoi(tbl)
return uint64(tid)
}
//genGlobalUniqIDs generates a global unique id by calling c.Driver.AllocID.
func (c *Catalog) genGlobalUniqIDs(idKey []byte) (uint64, error) {
id, err := c.Driver.AllocID(idKey, 1)
......@@ -705,57 +884,105 @@ func (c *Catalog) genGlobalUniqIDs(idKey []byte) (uint64, error) {
//dbIDKey returns encoded dbName with prefix "meta1DBID"
func (c *Catalog) dbIDKey(dbName string) []byte {
return codec.EncodeKey(cPrefix, defaultCatalogId, cDBIDPrefix, dbName)
return EncodeKey(cPrefix, defaultCatalogId, cDBIDPrefix, dbName)
}
//dbKey returns encoded id with prefix "meta1DBINFO"
func (c *Catalog) dbKey(id uint64) []byte {
return codec.EncodeKey(cPrefix, defaultCatalogId, cDBPrefix, id)
return EncodeKey(cPrefix, defaultCatalogId, cDBPrefix, id)
}
//dbPrefix returns the prefix "meta1DBINFO"
func (c *Catalog) dbPrefix() []byte {
return codec.EncodeKey(cPrefix, defaultCatalogId, cDBPrefix)
return EncodeKey(cPrefix, defaultCatalogId, cDBPrefix)
}
//tableIDKey returns the encoded tableName with prefix "meta1TID$dbId$"
func (c *Catalog) tableIDKey(dbId uint64, tableName string) []byte {
return codec.EncodeKey(cPrefix, defaultCatalogId, cTableIDPrefix, dbId, tableName)
return EncodeKey(cPrefix, defaultCatalogId, cTableIDPrefix, dbId, tableName)
}
//tableKey returns the encoded tID with prefix "meta1Table$dbId$"
func (c *Catalog) tableKey(dbId, tId uint64) []byte {
return codec.EncodeKey(cPrefix, defaultCatalogId, cTablePrefix, dbId, tId)
return EncodeKey(cPrefix, defaultCatalogId, cTablePrefix, dbId, tId)
}
//tablePrefix returns the prefix "meta1Table$dbId$"
func (c *Catalog) tablePrefix(dbId uint64) []byte {
return codec.EncodeKey(cPrefix, defaultCatalogId, cTablePrefix, dbId)
return EncodeKey(cPrefix, defaultCatalogId, cTablePrefix, dbId)
}
//routeKey returns the encoded gId with prefix "meta1Route$tId$"
func (c *Catalog) routeKey(tId, gId uint64) []byte {
return EncodeKey(cPrefix, defaultCatalogId, cRoutePrefix, tId, gId)
}
//routePrefix returns the prefix "meta1Route$$tId"
func (c *Catalog) routePrefix(tId uint64) []byte {
return EncodeKey(cPrefix, defaultCatalogId, cRoutePrefix, tId)
}
func (c *Catalog) splitPrefix() []byte {
return EncodeKey(cPrefix, defaultCatalogId, cSplitPrefix)
}
func (c *Catalog) splitKey(db uint64) []byte {
return EncodeKey(cPrefix, defaultCatalogId, cSplitPrefix, db)
}
//routeKey returns the encoded gId with prefix "meta1Route$dbId$$tId$"
func (c *Catalog) routeKey(dbId, tId, gId uint64) []byte {
return codec.EncodeKey(cPrefix, defaultCatalogId, cRoutePrefix, dbId, tId, gId)
func (c *Catalog) preSplitPrefix() []byte {
return EncodeKey(cPrefix, defaultCatalogId, cPreSplitPrefix)
}
//routePrefix returns the prefix "meta1Route$dbId$$tId"
func (c *Catalog) routePrefix(dbId, tId uint64) []byte {
return codec.EncodeKey(cPrefix, defaultCatalogId, cRoutePrefix, dbId, tId)
func (c *Catalog) preSplitKey(db uint64) []byte {
return EncodeKey(cPrefix, defaultCatalogId, cPreSplitPrefix, db)
}
func (c *Catalog) decodeSplitEvent(aoeSplitEvent *event.SplitEvent) (*SplitEvent, error) {
oldInterface, err := aoedb.IdToNameFactory.Decode(aoeSplitEvent.DB)
if err != nil {
return nil, err
}
old, ok := oldInterface.(uint64)
if !ok {
return nil, errors.New("invalid old shard id")
}
news := make(map[uint64][]uint64)
for newshard, tbls := range aoeSplitEvent.Names {
newInterface, err := aoedb.IdToNameFactory.Decode(newshard)
if err != nil {
return nil, err
}
new, ok := newInterface.(uint64)
if !ok {
return nil, errors.New("invalid new shard id")
}
news[new] = make([]uint64, len(tbls))
for i, tbl := range tbls {
tid := c.decodeTabletName(tbl)
news[new][i] = tid
}
}
catalogSplitEvent := SplitEvent{
Old: old,
News: news,
}
return &catalogSplitEvent, nil
}
//deletedTableKey returns the encoded tId with the prefix "DeletedTableQueue$epoch$$dbId$"
func (c *Catalog) deletedTableKey(epoch, dbId, tId uint64) []byte {
return codec.EncodeKey(cDeletedTablePrefix, epoch, dbId, tId)
return EncodeKey(cDeletedTablePrefix, epoch, dbId, tId)
}
//deletedEpochPrefix returns the prefix "DeletedTableQueue$epoch$"
func (c *Catalog) deletedEpochPrefix(epoch uint64) []byte {
return codec.EncodeKey(cDeletedTablePrefix, epoch)
return EncodeKey(cDeletedTablePrefix, epoch)
}
//deletedPrefix returns the prefix "DeletedTableQueue"
func (c *Catalog) deletedPrefix() []byte {
return codec.EncodeKey(cDeletedTablePrefix)
return EncodeKey(cDeletedTablePrefix)
}
//getAvailableShard get a shard from the shard pool and returns its id.
......@@ -771,7 +998,7 @@ func (c *Catalog) getAvailableShard(tid uint64) (shardid uint64, err error) {
logutil.Error("wait for available shard timeout")
return shardid, ErrTableCreateTimeout
default:
shard, err := c.Driver.GetShardPool().Alloc(uint64(pb.AOEGroup), codec.Uint642Bytes(tid))
shard, err := c.Driver.GetShardPool().Alloc(uint64(pb.AOEGroup), Uint642Bytes(tid))
if err == nil {
return shard.ShardID, err
}
......@@ -855,13 +1082,13 @@ func (c *Catalog) refreshTableIDCache() {
wg := sync.WaitGroup{}
wg.Add(1)
c.Driver.AsyncAllocID(codec.String2Bytes(cTableIDPrefix), idPoolSize, func(i server.CustomRequest, data []byte, err error) {
c.Driver.AsyncAllocID(String2Bytes(cTableIDPrefix), idPoolSize, func(i server.CustomRequest, data []byte, err error) {
defer wg.Done()
if err != nil {
logutil.Errorf("refresh table id failed, checkpoint is %d, %d", c.tidStart, c.tidEnd)
return
}
id, err := codec.Bytes2Uint64(data)
id, err := Bytes2Uint64(data)
if err != nil {
logutil.Errorf("get result of AllocId failed, %v\n", err)
return
......@@ -891,13 +1118,13 @@ func (c *Catalog) refreshDBIDCache() {
wg := sync.WaitGroup{}
wg.Add(1)
c.Driver.AsyncAllocID(codec.String2Bytes(cDBIDPrefix), idPoolSize, func(i server.CustomRequest, data []byte, err error) {
c.Driver.AsyncAllocID(String2Bytes(cDBIDPrefix), idPoolSize, func(i server.CustomRequest, data []byte, err error) {
defer wg.Done()
if err != nil {
logutil.Errorf("refresh db id failed, checkpoint is %d, %d", c.dbIdStart, c.dbIdEnd)
return
}
id, err := codec.Bytes2Uint64(data)
id, err := Bytes2Uint64(data)
if err != nil {
logutil.Errorf("get result of AllocId failed, %v\n", err)
return
......
......@@ -49,4 +49,6 @@ var (
ErrIndexExist = errors.New("index already exist")
//ErrIndexNotExist is the error for trying to drop an index that doesn't exit.
ErrIndexNotExist = errors.New("index not exist")
//ErrShardPending is for pending shards
ErrShardPending = errors.New("shard is pending")
)
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package catalog
import (
"bytes"
"encoding/binary"
"sync"
"reflect"
"unsafe"
"fmt"
"github.com/matrixorigin/matrixone/pkg/encoding"
"github.com/matrixorigin/matrixone/pkg/vm/engine/aoe"//Index/TableInfo
)
func EncodeTable(tbl aoe.TableInfo) ([]byte, error) {
return encoding.Encode(tbl)
}
func DecodeTable(data []byte) (aoe.TableInfo, error) {
var tbl aoe.TableInfo
err := encoding.Decode(data, &tbl)
return tbl, err
}
func EncodeIndex(idx aoe.IndexInfo) ([]byte, error) {
return encoding.Encode(idx)
}
func DecodeIndex(data []byte) (aoe.IndexInfo, error) {
var idx aoe.IndexInfo
err := encoding.Decode(data, &idx)
return idx, err
}
var pool = sync.Pool{
New: func() interface{} {
return make([]byte, 8)
},
}
func EncodeKey(ks ...interface{}) []byte {
var buf bytes.Buffer
for i := 0; i < len(ks); i++ {
k := ks[i]
switch v := k.(type) {
case uint8:
buf.Write(encoding.EncodeUint8(v))
case int:
data := pool.Get().([]byte)
binary.BigEndian.PutUint64(data, uint64(v))
buf.Write(data)
pool.Put(data)
case uint64:
data := pool.Get().([]byte)
binary.BigEndian.PutUint64(data, v)
buf.Write(data)
pool.Put(data)
case []byte:
buf.Write(v)
case string:
buf.WriteString(v)
}
}
return buf.Bytes()
}
func String2Bytes(v string) []byte {
if v == "" {
return nil
}
return unsafe.Slice(
(*byte)(unsafe.Pointer(
(*reflect.StringHeader)(unsafe.Pointer(&v)).Data,
)),
len(v),
)
}
func Bytes2String(v []byte) string {
return *(*string)(unsafe.Pointer(&v))
}
func Uint642Bytes(v uint64) []byte {
var buf bytes.Buffer
data := pool.Get().([]byte)
binary.BigEndian.PutUint64(data, v)
buf.Write(data)
pool.Put(data)
return buf.Bytes()
}
func Bytes2Uint64(v []byte) (b uint64, err error) {
if len(v) != 8 {
return b, fmt.Errorf("invalid data, must 8 bytes, but %d", len(v))
}
return binary.BigEndian.Uint64(v), nil
}
......@@ -669,32 +669,32 @@ func (s *Storage) Split(old meta.ShardMetadata, news []meta.ShardMetadata, ctx [
renameTable := func(oldName, dbName string) string {
return oldName
}
dropTableCtx := aoedb.DropTableCtx{
DBMutationCtx: aoedb.DBMutationCtx{
Id: old.LogIndex,
Offset: 0,
Size: 2,
DB: aoedb.IdToNameFactory.Encode(old.ShardID),
},
Table: sPrefix + strconv.Itoa(int(old.ShardID)),
}
_, err := s.DB.DropTable(&dropTableCtx)
if err != nil {
logutil.Errorf("Split:S-%d dropTable fail.",old.ShardID)
return err
}
// dropTableCtx := aoedb.DropTableCtx{
// DBMutationCtx: aoedb.DBMutationCtx{
// Id: old.LogIndex,
// Offset: 0,
// Size: 2,
// DB: aoedb.IdToNameFactory.Encode(old.ShardID),
// },
// Table: sPrefix + strconv.Itoa(int(old.ShardID)),
// }
// _, err := s.DB.DropTable(&dropTableCtx)
// if err != nil {
// logutil.Errorf("Split:S-%d dropTable fail.",old.ShardID)
// return err
// }
execSplitCtx := aoedb.ExecSplitCtx{
DBMutationCtx: aoedb.DBMutationCtx{
Id: old.LogIndex,
Offset: 1,
Size: 2,
Offset: 0,
Size: 1,
DB: aoedb.IdToNameFactory.Encode(old.ShardID),
},
NewNames: newNames,
RenameTable: renameTable,
SplitCtx: ctx,
}
err = s.DB.ExecSplitDatabase(&execSplitCtx)
err := s.DB.ExecSplitDatabase(&execSplitCtx)
if err != nil {
logutil.Errorf("Split:S-%d ExecSplitDatabase fail.",old.ShardID)
return err
......
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package driver
import (
"bytes"
"encoding/binary"
"sync"
"reflect"
"unsafe"
"fmt"
"github.com/matrixorigin/matrixone/pkg/encoding"
"github.com/matrixorigin/matrixone/pkg/vm/engine/aoe"
)
func EncodeTable(tbl aoe.TableInfo) ([]byte, error) {
return encoding.Encode(tbl)
}
func DecodeTable(data []byte) (aoe.TableInfo, error) {
var tbl aoe.TableInfo
err := encoding.Decode(data, &tbl)
return tbl, err
}
func EncodeIndex(idx aoe.IndexInfo) ([]byte, error) {
return encoding.Encode(idx)
}
func DecodeIndex(data []byte) (aoe.IndexInfo, error) {
var idx aoe.IndexInfo
err := encoding.Decode(data, &idx)
return idx, err
}
var pool = sync.Pool{
New: func() interface{} {
return make([]byte, 8)
},
}
func EncodeKey(ks ...interface{}) []byte {
var buf bytes.Buffer
for i := 0; i < len(ks); i++ {
k := ks[i]
switch v := k.(type) {
case uint8:
buf.Write(encoding.EncodeUint8(v))
case int:
data := pool.Get().([]byte)
binary.BigEndian.PutUint64(data, uint64(v))
buf.Write(data)
pool.Put(data)
case uint64:
data := pool.Get().([]byte)
binary.BigEndian.PutUint64(data, v)
buf.Write(data)
pool.Put(data)
case []byte:
buf.Write(v)
case string:
buf.WriteString(v)
}
}
return buf.Bytes()
}
func String2Bytes(v string) []byte {
if v == "" {
return nil
}
return unsafe.Slice(
(*byte)(unsafe.Pointer(
(*reflect.StringHeader)(unsafe.Pointer(&v)).Data,
)),
len(v),
)
}
func Bytes2String(v []byte) string {
return *(*string)(unsafe.Pointer(&v))
}
func Uint642Bytes(v uint64) []byte {
var buf bytes.Buffer
data := pool.Get().([]byte)
binary.BigEndian.PutUint64(data, v)
buf.Write(data)
pool.Put(data)
return buf.Bytes()
}
func Bytes2Uint64(v []byte) (b uint64, err error) {
if len(v) != 8 {
return b, fmt.Errorf("invalid data, must 8 bytes, but %d", len(v))
}
return binary.BigEndian.Uint64(v), nil
}
......@@ -22,7 +22,7 @@ import (
errDriver "github.com/matrixorigin/matrixone/pkg/vm/driver/error"
pb3 "github.com/matrixorigin/matrixone/pkg/vm/driver/pb"
"github.com/matrixorigin/matrixone/pkg/vm/engine/aoe/common/codec"
"github.com/matrixorigin/matrixone/pkg/vm/driver"
"github.com/fagongzi/util/protoc"
"github.com/matrixorigin/matrixcube/pb/meta"
......@@ -32,7 +32,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/driver/pb"
"github.com/matrixorigin/matrixcube/util"
// "github.com/matrixorigin/matrixcube/pb/rpc"
)
// Storage memory storage
......@@ -150,7 +149,7 @@ func (ce *kvExecutor) incr(wb util.WriteBatch, req storage.Request) (uint64, []b
id := uint64(0)
if v, ok := ce.attrs[string(req.Key)]; ok {
id, _ = codec.Bytes2Uint64(v.([]byte))
id, _ = driver.Bytes2Uint64(v.([]byte))
} else {
value, err := ce.kv.Get(req.Key)
if err != nil {
......@@ -158,7 +157,7 @@ func (ce *kvExecutor) incr(wb util.WriteBatch, req storage.Request) (uint64, []b
return 0, rep
}
if len(value) > 0 {
id, _ = codec.Bytes2Uint64(value)
id, _ = driver.Bytes2Uint64(value)
}
}
......@@ -168,7 +167,7 @@ func (ce *kvExecutor) incr(wb util.WriteBatch, req storage.Request) (uint64, []b
id += customReq.Batch
}
newV := codec.Uint642Bytes(id)
newV := driver.Uint642Bytes(id)
ce.attrs[string(req.Key)] = newV
wb.Set(req.Key, newV)
......@@ -190,11 +189,11 @@ func (ce *kvExecutor) setIfNotExist(wb util.WriteBatch, req storage.Request) (ui
var rep []byte
if _, ok := ce.attrs[codec.Bytes2String(req.Key)]; ok {
if _, ok := ce.attrs[driver.Bytes2String(req.Key)]; ok {
rep = errDriver.ErrorResp(errors.New("key is already existed"))
return 0, rep
} else {
ce.attrs[codec.Bytes2String(req.Key)] = "1"
ce.attrs[driver.Bytes2String(req.Key)] = "1"
}
value, err := ce.kv.Get(req.Key)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment