Skip to content
Snippets Groups Projects
Commit ec83b0c9 authored by sukki37's avatar sukki37 Committed by GitHub
Browse files

(AOE): Add distributed kv storage & Implements global metadata interface (#138)


* Init catalog pkg

* Add incr & test

* Add wr ops

* Update catalog kv args definition & modify catalog test

* Add batchset

* Add async call test

* Add prefix scan test

* Re-org 

* Impl dist storage interface 

* Cluster test case

* Init catalog pkg

* Update catalog kv args definition & modify catalog test

* Dist storage interface 

* Callback when dynamic shard created

* Update cb when dynamic group created

* Bug fix for prefixscan

* Fix bug occurred by shared pebble instance.

* Add dispatch interface

* Add create statement

* Remove debug code

* Change data type

* Update cube version

* Add codecs for table def

* Add codecs for table

* Return node address in Dispatch method

* Chaneg info structure and add serialization method

Co-authored-by: default avatarnnsgmsone <nnsmgsone@outlook.com>
parent cd9dceaf
No related branches found
No related tags found
No related merge requests found
Showing
with 11754 additions and 9803 deletions
...@@ -5,20 +5,25 @@ go 1.15 ...@@ -5,20 +5,25 @@ go 1.15
require ( require (
github.com/BurntSushi/toml v0.3.1 github.com/BurntSushi/toml v0.3.1
github.com/cockroachdb/pebble v0.0.0-20210526183633-dd2a545f5d75 github.com/cockroachdb/pebble v0.0.0-20210526183633-dd2a545f5d75
github.com/fagongzi/goetty v2.0.2+incompatible github.com/fagongzi/goetty v1.8.0
github.com/fagongzi/util v0.0.0-20201116094402-221cc40c4593 github.com/fagongzi/log v0.0.0-20201106014031-b41ebf3bd287 // indirect
github.com/fagongzi/util v0.0.0-20210409031311-a10fdf8fbd7a
github.com/frankban/quicktest v1.11.3 // indirect github.com/frankban/quicktest v1.11.3 // indirect
github.com/golang/protobuf v1.4.2 github.com/golang/protobuf v1.4.2
github.com/google/uuid v1.2.0 github.com/google/uuid v1.2.0
github.com/matrixorigin/matrixcube v0.0.0-20210619015613-2f9ba0b3d83c
github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc
github.com/panjf2000/ants/v2 v2.4.5 github.com/panjf2000/ants/v2 v2.4.5
github.com/pierrec/lz4 v2.6.0+incompatible github.com/pierrec/lz4 v2.6.0+incompatible
github.com/pilosa/pilosa v1.4.0 github.com/pilosa/pilosa v1.4.1
github.com/pingcap/parser v0.0.0-20210310110710-c7333a4927e6 github.com/pingcap/parser v0.0.0-20210310110710-c7333a4927e6
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.2.0 github.com/sirupsen/logrus v1.4.2
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.7.0
github.com/yireyun/go-queue v0.0.0-20210520035143-72b190eafcba github.com/yireyun/go-queue v0.0.0-20210520035143-72b190eafcba
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b // indirect golang.org/x/net v0.0.0-20201110031124-69a78807bb2b // indirect
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f
golang.org/x/tools v0.0.0-20201105001634-bc3cf281b174 // indirect golang.org/x/tools v0.0.0-20201105001634-bc3cf281b174 // indirect
) )
replace go.etcd.io/etcd => github.com/deepfabric/etcd v1.4.15
source diff could not be displayed: it is too large. Options to address this: view the blob.
...@@ -548,7 +548,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ...@@ -548,7 +548,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrColCountDoesntMatchPleaseUpdate: mysql.Message("Column count of mysql.%s is wrong. Expected %d, found %d. Created with MySQL %d, now running %d. Please use mysqlUpgrade to fix this error.", nil), ErrColCountDoesntMatchPleaseUpdate: mysql.Message("Column count of mysql.%s is wrong. Expected %d, found %d. Created with MySQL %d, now running %d. Please use mysqlUpgrade to fix this error.", nil),
ErrTempTablePreventsSwitchOutOfRbr: mysql.Message("Cannot switch out of the row-based binary log format when the session has open temporary tables", nil), ErrTempTablePreventsSwitchOutOfRbr: mysql.Message("Cannot switch out of the row-based binary log format when the session has open temporary tables", nil),
ErrStoredFunctionPreventsSwitchBinlogFormat: mysql.Message("Cannot change the binary logging format inside a stored function or trigger", nil), ErrStoredFunctionPreventsSwitchBinlogFormat: mysql.Message("Cannot change the binary logging format inside a stored function or trigger", nil),
ErrNdbCantSwitchBinlogFormat: mysql.Message("The NDB cluster engine does not support changing the binlog format on the fly yet", nil), ErrNdbCantSwitchBinlogFormat: mysql.Message("The NDB engine engine does not support changing the binlog format on the fly yet", nil),
ErrPartitionNoTemporary: mysql.Message("Cannot create temporary table with partitions", nil), ErrPartitionNoTemporary: mysql.Message("Cannot create temporary table with partitions", nil),
ErrPartitionConstDomain: mysql.Message("Partition constant is out of partition function domain", nil), ErrPartitionConstDomain: mysql.Message("Partition constant is out of partition function domain", nil),
ErrPartitionFunctionIsNotAllowed: mysql.Message("This partition function is not allowed", nil), ErrPartitionFunctionIsNotAllowed: mysql.Message("This partition function is not allowed", nil),
......
package aoe
import (
"bytes"
"encoding/gob"
"matrixone/pkg/container/types"
"matrixone/pkg/encoding"
"matrixone/pkg/sql/protocol"
"matrixone/pkg/vm/engine"
)
func init() {
gob.Register(TableInfo{})
gob.Register(types.Type{})
gob.Register(IndexInfo{})
gob.Register(ColumnInfo{})
}
func Transfer(sid, tid uint64, name, typ, comment string,
defs []engine.TableDef, pdef *engine.PartitionBy) (TableInfo, error) {
var tbl TableInfo
tbl.SchemaId = sid
tbl.Id = tid
tbl.Name = name
tbl.Type = typ
tbl.Comment = []byte(comment)
tbl.Columns = ColumnDefs(sid, tid, defs)
mp := make(map[string]uint64)
{
for _, col := range tbl.Columns {
mp[col.Name] = col.Id
}
}
tbl.Indexs = IndexDefs(sid, tid, mp, defs)
if pdef != nil {
data, err := PartitionDef(pdef)
if err != nil {
return tbl, err
}
tbl.Partition = data
}
return tbl, nil
}
func EncodeTable(tbl TableInfo) ([]byte, error) {
return encoding.Encode(tbl)
}
func DecodeTable(data []byte) (TableInfo, error) {
var tbl TableInfo
err := encoding.Decode(data, &tbl)
return tbl, err
}
func IndexDefs(sid, tid uint64, mp map[string]uint64, defs []engine.TableDef) []IndexInfo {
var id uint64
var idxs []IndexInfo
for _, def := range defs {
if v, ok := def.(*engine.IndexTableDef); ok {
idx := IndexInfo{
SchemaId: sid,
TableId: tid,
Id: id,
Type: uint64(v.Typ),
}
for _, name := range v.Names {
idx.Names = append(idx.Names, name)
idx.Columns = append(idx.Columns, mp[name])
}
idxs = append(idxs, idx)
id++
}
}
return idxs
}
func ColumnDefs(sid, tid uint64, defs []engine.TableDef) []ColumnInfo {
var id uint64
var cols []ColumnInfo
for _, def := range defs {
if v, ok := def.(*engine.AttributeDef); ok {
cols = append(cols, ColumnInfo{
SchemaId: sid,
TableID: tid,
Id: id,
Name: v.Attr.Name,
Alg: v.Attr.Alg,
Type: v.Attr.Type,
})
id++
}
}
return cols
}
func PartitionDef(def *engine.PartitionBy) ([]byte, error) {
var buf bytes.Buffer
if err := protocol.EncodePartition(def, &buf); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
package aoe
import (
"encoding/json"
"fmt"
"log"
"matrixone/pkg/container/types"
"matrixone/pkg/container/vector"
"matrixone/pkg/sql/colexec/extend"
"matrixone/pkg/sql/protocol"
"matrixone/pkg/vm/engine"
"matrixone/pkg/vm/metadata"
"testing"
)
func TestAoe(t *testing.T) {
data, err := EncodeTable(0, 0, "R", "table", "", NewTableDefs(), NewPartition())
if err != nil {
log.Fatal(err)
}
tbl, err := DecodeTable(data)
if err != nil {
log.Fatal(err)
}
{
data, err := json.Marshal(tbl)
if err != nil {
log.Fatal(err)
}
fmt.Printf("tbl: %v\n", string(data))
}
pdef, _, err := protocol.DecodePartition(tbl.Partition)
if err != nil {
log.Fatal(err)
}
fmt.Printf("partition: %v\n", pdef)
}
func NewTableDefs() []engine.TableDef {
var defs []engine.TableDef
defs = append(defs, &engine.IndexTableDef{
Typ: 0,
Names: []string{"a", "b"},
})
defs = append(defs, &engine.AttributeDef{
Attr: metadata.Attribute{
Alg: 0,
Name: "a",
Type: types.Type{Oid: types.T_float64, Size: 8},
},
})
defs = append(defs, &engine.AttributeDef{
Attr: metadata.Attribute{
Alg: 0,
Name: "b",
Type: types.Type{Oid: types.T_varchar, Size: 8},
},
})
return defs
}
func NewPartition() *engine.PartitionBy {
def := new(engine.PartitionBy)
def.Fields = []string{"a", "b"}
{
def.List = append(def.List, NewListPartition())
def.List = append(def.List, NewListPartition())
}
return def
}
func NewListPartition() engine.ListPartition {
var def engine.ListPartition
def.Name = "x"
def.Extends = append(def.Extends, NewValue(1.2))
def.Extends = append(def.Extends, NewValue(3.2))
return def
}
func NewValue(v float64) extend.Extend {
vec := vector.New(types.Type{types.T(types.T_float64), 8, 0, 0})
vec.Append([]float64{v})
return &extend.ValueExtend{vec}
}
package catalog
import (
"bytes"
"encoding/json"
"fmt"
"github.com/fagongzi/util/format"
"github.com/matrixorigin/matrixcube/pb/bhmetapb"
"github.com/matrixorigin/matrixcube/raftstore"
cmap "github.com/orcaman/concurrent-map"
"matrixone/pkg/vm/engine"
"matrixone/pkg/vm/engine/aoe"
"matrixone/pkg/vm/engine/aoe/dist"
"sync"
)
const (
DefaultCatalogId = uint64(1)
)
var (
cPrefix = "/meta"
cDBPrefix = "DB"
cDBIDPrefix = "DBID"
cTablePrefix = "Table"
cTableIDPrefix = "TID"
cStatePrefix = "State"
cRoutePrefix = "Route"
cSegmentPrefix = "Seg"
)
type Catalog struct {
gMutex cmap.ConcurrentMap
store dist.Storage
}
var gCatalog Catalog
var gInitOnce sync.Once
func DefaultCatalog(store dist.Storage) Catalog {
gInitOnce.Do(func() {
gCatalog = Catalog{
gMutex: cmap.New(),
store: store,
}
})
return gCatalog
}
func (c *Catalog) CreateDatabase(dbName string) (uint64, error) {
if _, err := c.checkDBNotExists(dbName); err != nil {
return 0, err
}
v, ok := c.gMutex.Get(string(c.dbIDKey(dbName)))
if !ok {
v = &sync.RWMutex{}
c.gMutex.Set(string(c.dbIDKey(dbName)), v)
}
lock := v.(*sync.RWMutex)
lock.Lock()
defer lock.Unlock()
id, err := c.genGlobalUniqIDs([]byte(cDBIDPrefix))
if err != nil {
return 0, err
}
if err = c.store.Set(c.dbIDKey(dbName), format.Uint64ToBytes(id)); err != nil {
return 0, err
}
info := aoe.SchemaInfo{
State: aoe.StatePublic,
Name: dbName,
Id: id,
CatalogId: 1,
}
value, _ := json.Marshal(info)
if err = c.store.Set(c.dbKey(id), value); err != nil {
return 0, err
}
return id, nil
}
func (c *Catalog) DelDatabase(dbName string) (uint64, error) {
if id, err := c.checkDBExists(dbName); err != nil {
return 0, err
} else {
v, ok := c.gMutex.Get(string(c.dbIDKey(dbName)))
if !ok {
return 0, nil
}
lock := v.(*sync.RWMutex)
lock.Lock()
defer func() {
c.gMutex.Remove(string(c.dbIDKey(dbName)))
lock.Unlock()
}()
value, err := c.store.Get(c.dbKey(id))
if err != nil {
return 0, err
}
db := aoe.SchemaInfo{}
_ = json.Unmarshal(value, &db)
db.State = aoe.StateDeleteOnly
value, _ = json.Marshal(db)
if err = c.store.Set(c.dbKey(id), value); err != nil {
return 0, err
}
err = c.store.Delete(c.dbIDKey(dbName))
// TODO ADD Async Deletion Task Queue (Drop tables & related within deleted db)
return id, err
}
}
func (c *Catalog) GetDBs() ([]aoe.SchemaInfo, error) {
values, err := c.store.PrefixScan(c.dbPrefix(), 0)
if err != nil {
return nil, err
}
var dbs []aoe.SchemaInfo
for i := 1; i < len(values); i = i + 2 {
db := aoe.SchemaInfo{}
_ = json.Unmarshal(values[i], &db)
dbs = append(dbs, db)
}
return dbs, nil
}
func (c *Catalog) GetDB(dbName string) (*aoe.SchemaInfo, error) {
id, err := c.checkDBExists(dbName)
if err != nil {
return nil, err
}
v, ok := c.gMutex.Get(string(c.dbIDKey(dbName)))
if !ok {
return nil, ErrDBNotExists
}
lock := v.(*sync.RWMutex)
lock.RLock()
defer lock.RUnlock()
resp, err := c.store.Get(c.dbKey(id))
if err != nil || resp == nil {
return nil, ErrDBNotExists
}
db := aoe.SchemaInfo{}
_ = json.Unmarshal(resp, &db)
if db.State != aoe.StatePublic {
return nil, ErrDBNotExists
}
return &db, nil
}
func (c *Catalog) CreateTable(dbName string, tableName string, typ string, comment string, tableDefs []engine.TableDef, pdef *engine.PartitionBy, createStatement []byte) (uint64, error) {
dbId, err := c.checkDBExists(dbName)
if err != nil {
return 0, err
}
err = c.checkTableNotExists(dbId, tableName)
if err != nil {
return 0, err
}
v, ok := c.gMutex.Get(string(c.tableIDKey(dbId, tableName)))
if !ok {
v = &sync.RWMutex{}
c.gMutex.Set(string(c.tableIDKey(dbId, tableName)), v)
}
lock := v.(*sync.RWMutex)
lock.RLock()
defer lock.RUnlock()
tid, err := c.genGlobalUniqIDs([]byte(cTableIDPrefix))
if err != nil {
return 0, err
}
err = c.store.Set(c.tableIDKey(dbId, tableName), format.Uint64ToBytes(tid))
if err != nil {
return 0, err
}
tInfo, err := aoe.Transfer(dbId, tid, tableName, typ, comment, tableDefs, pdef)
if err != nil {
return 0, ErrTableCreateFailed
}
tInfo.State = aoe.StateNone
meta, err := aoe.EncodeTable(tInfo)
if err != nil {
return 0, ErrTableCreateFailed
}
//save metadata to kv
err = c.store.Set(c.tableKey(dbId, tid), meta)
if err != nil {
return 0, err
}
// TODO: support shared tables
// created shards
client := c.store.RaftStore().Prophet().GetClient()
tKey := c.tableKey(dbId, tid)
rKey := c.routePrefix(dbId, tid)
header := format.Uint64ToBytes(uint64(len(tKey) + len(rKey) + len([]byte("#"))))
buf := bytes.Buffer{}
buf.Write(header)
buf.Write(tKey)
buf.Write([]byte("#"))
buf.Write(rKey)
buf.Write(meta)
err = client.AsyncAddResources(raftstore.NewResourceAdapterWithShard(
bhmetapb.Shard{
Start: format.Uint64ToBytes(tid),
End: format.Uint64ToBytes(tid + 1),
Unique: "gTable-" + string(format.UInt64ToString(tid)),
Group: uint64(aoe.AOEGroup),
Data: buf.Bytes(),
}))
// TODO: wait table meta state changed?
if err != nil {
return 0, err
}
return tid, nil
}
func (c *Catalog) GetTables(dbName string) ([]aoe.TableInfo, error) {
if id, err := c.checkDBExists(dbName); err != nil {
return nil, err
} else {
v, ok := c.gMutex.Get(string(c.dbIDKey(dbName)))
if !ok {
return nil, ErrDBNotExists
}
lock := v.(*sync.RWMutex)
lock.RLock()
defer lock.RUnlock()
values, err := c.store.PrefixScan(c.tablePrefix(id), 0)
if err != nil {
return nil, err
}
var tables []aoe.TableInfo
for i := 1; i < len(values); i = i + 2 {
t, _ := aoe.DecodeTable(values[i])
if t.State != aoe.StatePublic {
continue
}
tables = append(tables, t)
}
return tables, nil
}
}
func (c *Catalog) GetTable(dbName string, tableName string) (*aoe.TableInfo, error) {
if dbId, err := c.checkDBExists(dbName); err != nil {
return nil, err
} else {
tid, err := c.checkTableExists(dbId, tableName)
if err != nil {
return nil, err
}
v, ok := c.gMutex.Get(string(c.tableIDKey(dbId, tableName)))
if !ok {
return nil, ErrTableNotExists
}
lock := v.(*sync.RWMutex)
lock.RLock()
defer lock.RUnlock()
value, err := c.store.Get(c.tableKey(dbId, tid))
if err != nil || value == nil {
return nil, ErrTableNotExists
}
t, _ := aoe.DecodeTable(value)
if t.State != aoe.StatePublic {
return nil, err
}
return &t, nil
}
}
func (c *Catalog) DispatchQueries(dbName string, tableName string) ([]aoe.RouteInfo, error) {
items := make(map[uint64]map[uint64][]aoe.SegmentInfo)
if dbId, err := c.checkDBExists(dbName); err != nil {
return nil, err
} else {
tid, err := c.checkTableExists(dbId, tableName)
if err != nil {
return nil, err
}
v, ok := c.gMutex.Get(string(c.tableIDKey(dbId, tableName)))
if !ok {
return nil, ErrTableNotExists
}
lock := v.(*sync.RWMutex)
lock.RLock()
defer lock.RUnlock()
values, err := c.store.PrefixScan(c.routePrefix(dbId, tid), 0)
if err != nil {
return nil, err
}
for i := 0; i < len(values); i = i + 2 {
keys := bytes.Split(values[i], []byte("/"))
pId := format.MustBytesToUint64(keys[len(keys)-1])
gId := format.MustBytesToUint64(keys[len(keys)-2])
value := values[i+1]
seg := aoe.SegmentInfo{}
_ = json.Unmarshal(value, &seg)
items[gId][pId] = append(items[gId][pId], seg)
}
}
var resp []aoe.RouteInfo
for gId, p := range items {
resp = append(resp, aoe.RouteInfo{
Node: []byte(c.store.RaftStore().GetRouter().LeaderAddress(gId)),
GroupId: gId,
Segments: p,
})
}
return resp, nil
}
func (c *Catalog) checkDBExists(dbName string) (uint64, error) {
v, ok := c.gMutex.Get(string(c.dbIDKey(dbName)))
if !ok {
return 0, ErrDBNotExists
}
lock := v.(*sync.RWMutex)
lock.RLock()
defer lock.RUnlock()
if value, err := c.store.Get(c.dbIDKey(dbName)); err != nil || value == nil {
return 0, ErrDBNotExists
} else {
id := format.MustBytesToUint64(value)
if v, err := c.store.Get(c.dbKey(id)); err != nil || value == nil {
return 0, ErrDBNotExists
} else {
db := aoe.SchemaInfo{}
if err = json.Unmarshal(v, &db); err != nil {
return 0, ErrDBNotExists
}
if db.State == aoe.StateDeleteOnly {
return 0, ErrDBNotExists
}
}
return id, nil
}
}
func (c *Catalog) checkDBNotExists(dbName string) (bool, error) {
_, err := c.checkDBExists(dbName)
if err == ErrDBNotExists {
return true, nil
}
return false, ErrDBCreateExists
}
func (c *Catalog) checkTableExists(dbId uint64, tableName string) (uint64, error) {
v, ok := c.gMutex.Get(string(c.tableIDKey(dbId, tableName)))
if !ok {
return 0, ErrTableNotExists
}
lock := v.(*sync.RWMutex)
lock.RLock()
defer lock.RUnlock()
if value, err := c.store.Get(c.tableIDKey(dbId, tableName)); err != nil || value == nil {
return 0, ErrTableNotExists
} else {
id := format.MustBytesToUint64(value)
if v, err := c.store.Get(c.tableKey(dbId, id)); err != nil || value == nil {
return 0, ErrTableNotExists
} else {
if table, err := aoe.DecodeTable(v); err != nil {
return 0, ErrTableNotExists
}else {
if table.State == aoe.StateDeleteOnly {
return 0, ErrTableNotExists
}
}
}
return id, nil
}
}
func (c *Catalog) checkTableNotExists(dbId uint64, tableName string) error {
_, err := c.checkTableExists(dbId, tableName)
if err == ErrTableNotExists {
return nil
}
return ErrTableCreateExists
}
func (c *Catalog) genGlobalUniqIDs(idKey []byte) (uint64, error) {
id, err := c.store.AllocID(idKey)
if err != nil {
return 0, err
}
return id, nil
}
//where to generate id
func (c *Catalog) dbIDKey(dbName string) []byte {
return []byte(fmt.Sprintf("%s/%d/%s/%s", cPrefix, DefaultCatalogId, cDBIDPrefix, dbName))
}
func (c *Catalog) dbKey(id uint64) []byte {
return []byte(fmt.Sprintf("%s/%d/%s/%d", cPrefix, DefaultCatalogId, cDBPrefix, id))
}
func (c *Catalog) dbPrefix() []byte {
return []byte(fmt.Sprintf("%s/%d/%s/", cPrefix, DefaultCatalogId, cDBPrefix))
}
func (c *Catalog) tableIDKey(dbId uint64, tableName string) []byte {
return []byte(fmt.Sprintf("%s/%d/%s/%d/%s", cPrefix, DefaultCatalogId, cTableIDPrefix, dbId, tableName))
}
func (c *Catalog) tableKey(dbId uint64, tId uint64) []byte {
return []byte(fmt.Sprintf("%s/%d/%s/%d/%d", cPrefix, DefaultCatalogId, cTablePrefix, dbId, tId))
}
func (c *Catalog) tablePrefix(dbId uint64) []byte {
return []byte(fmt.Sprintf("%s/%d/%s/%d/", cPrefix, DefaultCatalogId, cTablePrefix, dbId))
}
func (c *Catalog) routeKey(dbId uint64, tId uint64, gId uint64, pId uint64) []byte {
return []byte(fmt.Sprintf("%s/%d/%s/%d/%d/%d/%d", cPrefix, DefaultCatalogId, cRoutePrefix, dbId, tId, gId, pId))
}
func (c *Catalog) routePrefix(dbId uint64, tId uint64) []byte {
return []byte(fmt.Sprintf("%s/%d/%s/%d/%d/", cPrefix, DefaultCatalogId, cRoutePrefix, dbId, tId))
}
package catalog
import (
"encoding/json"
"fmt"
"github.com/matrixorigin/matrixcube/components/prophet/util"
"github.com/matrixorigin/matrixcube/components/prophet/util/typeutil"
"github.com/matrixorigin/matrixcube/config"
"github.com/matrixorigin/matrixcube/server"
"github.com/matrixorigin/matrixcube/storage/mem"
"github.com/matrixorigin/matrixcube/storage/pebble"
"github.com/stretchr/testify/require"
stdLog "log"
"matrixone/pkg/container/types"
"matrixone/pkg/vm/engine"
"matrixone/pkg/vm/engine/aoe"
"matrixone/pkg/vm/engine/aoe/dist"
"matrixone/pkg/vm/metadata"
"os"
"testing"
"time"
)
var (
tmpDir = "./cube-test"
dbName = "test_db1"
tableName = "test_tb"
cols = []engine.TableDef{
&engine.AttributeDef{
Attr: metadata.Attribute{
Name: "col1",
Type: types.Type{},
Alg: 0,
},
},
&engine.AttributeDef{
Attr: metadata.Attribute{
Name: "col2",
Type: types.Type{},
Alg: 0,
},
},
}
)
func recreateTestTempDir() (err error) {
err = os.RemoveAll(tmpDir)
if err != nil {
return err
}
err = os.MkdirAll(tmpDir, os.ModeDir)
return err
}
func cleanupTmpDir() error {
return os.RemoveAll(tmpDir)
}
type testCluster struct {
t *testing.T
applications []dist.Storage
}
func newTestClusterStore(t *testing.T) (*testCluster, error) {
if err := recreateTestTempDir(); err != nil {
return nil, err
}
util.SetLogger(&emptyLog{})
c := &testCluster{t: t}
for i := 0; i < 3; i++ {
metaStorage, err := pebble.NewStorage(fmt.Sprintf("%s/pebble/meta-%d", tmpDir, i))
if err != nil {
return nil, err
}
pebbleDataStorage, err := pebble.NewStorage(fmt.Sprintf("%s/pebble/data-%d", tmpDir, i))
if err != nil {
return nil, err
}
memDataStorage := mem.NewStorage()
if err != nil {
return nil, err
}
a, err := dist.NewStorageWithOptions(metaStorage, pebbleDataStorage, memDataStorage, func(cfg *config.Config) {
cfg.DataPath = fmt.Sprintf("%s/node-%d", tmpDir, i)
cfg.RaftAddr = fmt.Sprintf("127.0.0.1:1000%d", i)
cfg.ClientAddr = fmt.Sprintf("127.0.0.1:2000%d", i)
cfg.Replication.ShardHeartbeatDuration = typeutil.NewDuration(time.Millisecond * 100)
cfg.Replication.StoreHeartbeatDuration = typeutil.NewDuration(time.Second)
cfg.Raft.TickInterval = typeutil.NewDuration(time.Millisecond * 100)
cfg.Prophet.Name = fmt.Sprintf("node-%d", i)
cfg.Prophet.StorageNode = true
cfg.Prophet.RPCAddr = fmt.Sprintf("127.0.0.1:3000%d", i)
if i != 0 {
cfg.Prophet.EmbedEtcd.Join = "http://127.0.0.1:40000"
}
cfg.Prophet.EmbedEtcd.ClientUrls = fmt.Sprintf("http://127.0.0.1:4000%d", i)
cfg.Prophet.EmbedEtcd.PeerUrls = fmt.Sprintf("http://127.0.0.1:5000%d", i)
cfg.Prophet.Schedule.EnableJointConsensus = true
}, server.Cfg{
Addr: fmt.Sprintf("127.0.0.1:809%d", i),
})
if err != nil {
return nil, err
}
c.applications = append(c.applications, a)
}
return c, nil
}
func (c *testCluster) stop() {
for _, s := range c.applications {
s.Close()
}
}
func TestClusterStartAndStop(t *testing.T) {
defer cleanupTmpDir()
c, err := newTestClusterStore(t)
defer c.stop()
time.Sleep(2 * time.Second)
require.NoError(t, err)
stdLog.Printf("app all started.")
catalog := DefaultCatalog(c.applications[0])
//testDBDDL(t, catalog)
testTableDDL(t, catalog)
}
func testTableDDL(t *testing.T, c Catalog) {
tbs, err := c.GetTables(dbName)
require.Error(t, ErrDBNotExists, err)
id, err := c.CreateDatabase(dbName)
require.NoError(t, err)
require.Less(t, uint64(0), id)
tbs, err = c.GetTables(dbName)
require.NoError(t, err)
require.Nil(t, tbs)
tid, err := c.CreateTable(dbName, tableName, "", "", cols, nil, nil)
require.NoError(t, err)
require.Less(t, uint64(0), tid)
completedC := make(chan *aoe.TableInfo, 1)
defer close(completedC)
go func() {
for {
tb, _ := c.GetTable(dbName, tableName)
if tb != nil {
completedC <- tb
break
}
}
}()
select {
case <-completedC:
stdLog.Printf("create %s finished", tableName)
break
case <-time.After(3 * time.Second):
stdLog.Printf("create %s failed, timeout", tableName)
}
tid, err = c.CreateTable(dbName, tableName, "", "", cols, nil, nil)
require.Equal(t, ErrTableCreateExists, err)
for i := 1; i < 10; i++ {
tid2, err := c.CreateTable(dbName, fmt.Sprintf("%s%d", tableName, i), "", "", cols, nil, nil)
require.NoError(t, err)
require.Less(t, tid, tid2)
}
time.Sleep(5 * time.Second)
tbs, err = c.GetTables(dbName)
for _, tb := range tbs {
s, _ := json.Marshal(tb)
stdLog.Println(string(s))
}
require.NoError(t, err)
require.Equal(t, 10, len(tbs))
}
func testDBDDL(t *testing.T, c Catalog) {
dbs, err := c.GetDBs()
require.NoError(t, err)
require.Nil(t, dbs)
id, err := c.CreateDatabase(dbName)
require.NoError(t, err)
require.Equal(t, uint64(1), id)
id, err = c.CreateDatabase(dbName)
require.Equal(t, ErrDBCreateExists, err)
dbs, err = c.GetDBs()
require.NoError(t, err)
require.Equal(t, 1, len(dbs))
db, err := c.GetDB(dbName)
require.NoError(t, err)
require.Equal(t, dbName, db.Name)
id, err = c.DelDatabase(dbName)
require.NoError(t, err)
require.Equal(t, uint64(1), id)
db, err = c.GetDB(dbName)
require.Error(t, ErrDBNotExists, err)
}
type emptyLog struct{}
func (l *emptyLog) Info(v ...interface{}) {
}
func (l *emptyLog) Infof(format string, v ...interface{}) {
stdLog.Printf(format, v...)
}
func (l *emptyLog) Debug(v ...interface{}) {
}
func (l *emptyLog) Debugf(format string, v ...interface{}) {
}
func (l *emptyLog) Warning(v ...interface{}) {
}
func (l *emptyLog) Warningf(format string, v ...interface{}) {
}
func (l *emptyLog) Error(v ...interface{}) {
}
func (l *emptyLog) Errorf(format string, v ...interface{}) {
stdLog.Printf(format, v...)
}
func (l *emptyLog) Fatal(v ...interface{}) {
stdLog.Panic(v...)
}
func (l *emptyLog) Fatalf(format string, v ...interface{}) {
stdLog.Panicf(format, v...)
}
package catalog
import (
"errors"
)
var (
// ErrDBCreate is the err
ErrDBCreate = errors.New("db create failed")
// ErrDBCreateExists is the error for db exists.
ErrDBCreateExists = errors.New("db already exists")
// ErrDBNotExists is the error for db not exists.
ErrDBNotExists = errors.New("db not exist")
// ErrTableCreateExists is the error for table exists.
ErrTableCreateExists = errors.New("table already exists")
// ErrTableNotExists is the error for table not exists.
ErrTableNotExists = errors.New("table not exist")
ErrTableCreateFailed = errors.New("create table failed")
)
package cmd
package dist
import (
"github.com/matrixorigin/matrixcube/pb/bhmetapb"
)
func (h *aoeStorage) addShardCallback(shard bhmetapb.Shard) error {
return nil
}
func (h *aoeStorage) Created(shard bhmetapb.Shard) {
}
func (h *aoeStorage) Splited(shard bhmetapb.Shard) {
}
func (h *aoeStorage) Destory(shard bhmetapb.Shard) {
}
func (h *aoeStorage) BecomeLeader(shard bhmetapb.Shard) {
}
func (h *aoeStorage) BecomeFollower(shard bhmetapb.Shard) {
}
func (h *aoeStorage) SnapshotApplied(shard bhmetapb.Shard) {
}
package dist
import "errors"
var (
ErrCMDNotSupport = errors.New("command is not support")
ErrMarshalFailed = errors.New("request marshal has failed")
ErrInvalidValue = errors.New("value is invalid")
ErrShardNotExisted = errors.New("shard is not existed")
)
func errorResp(err error) []byte {
return []byte(err.Error())
}
package dist
import (
"encoding/json"
"github.com/fagongzi/goetty/codec"
"github.com/matrixorigin/matrixcube/command"
"github.com/matrixorigin/matrixcube/pb/raftcmdpb"
)
type cmdType uint64
const (
Set cmdType = iota
SetWithTTL
Incr
Del
Get
PrefixScan
Scan
)
func (h *aoeStorage) init() {
h.AddWriteFunc(uint64(Set), h.set)
h.AddWriteFunc(uint64(Del), h.del)
h.AddWriteFunc(uint64(Incr), h.incr)
h.AddReadFunc(uint64(Get), h.get)
h.AddReadFunc(uint64(PrefixScan), h.prefixScan)
h.AddReadFunc(uint64(Scan), h.scan)
}
func (h *aoeStorage) BuildRequest(req *raftcmdpb.Request, i interface{}) error {
op := i.(Args)
if _, ok := h.cmds[op.Op]; !ok {
return ErrCMDNotSupport
}
req.Key = op.Args[0]
req.CustemType = op.Op
req.Type = h.cmds[op.Op]
cmd, err := json.Marshal(op)
if err != nil {
return err
}
req.Cmd = cmd
return nil
}
func (h *aoeStorage) Codec() (codec.Encoder, codec.Decoder) {
return nil, nil
}
// AddReadFunc add read handler func
func (h *aoeStorage) AddReadFunc(cmdType uint64, cb command.ReadCommandFunc) {
h.cmds[cmdType] = raftcmdpb.CMDType_Read
h.store.RegisterReadFunc(cmdType, cb)
}
// AddWriteFunc add write handler func
func (h *aoeStorage) AddWriteFunc(cmdType uint64, cb command.WriteCommandFunc) {
h.cmds[cmdType] = raftcmdpb.CMDType_Write
h.store.RegisterWriteFunc(cmdType, cb)
}
package dist
import (
"bytes"
"encoding/json"
"github.com/fagongzi/util/format"
"github.com/matrixorigin/matrixcube/command"
"github.com/matrixorigin/matrixcube/pb"
"github.com/matrixorigin/matrixcube/pb/bhmetapb"
"github.com/matrixorigin/matrixcube/pb/raftcmdpb"
"github.com/matrixorigin/matrixcube/raftstore"
"github.com/sirupsen/logrus"
)
func (h *aoeStorage) set(shard bhmetapb.Shard, req *raftcmdpb.Request, ctx command.Context) (uint64, int64, *raftcmdpb.Response) {
resp := pb.AcquireResponse()
args := &Args{}
err := json.Unmarshal(req.Cmd, &args)
if err != nil {
resp.Value = errorResp(err)
return 0, 0, resp
}
if len(args.Args) <= 1 {
resp.Value = errorResp(ErrInvalidValue)
return 0, 0, resp
}
err = h.getStoreByGroup(shard.Group, shard.ID).Set(req.Key, args.Args[1])
if err != nil {
resp.Value = errorResp(err)
return 0, 0, resp
}
writtenBytes := uint64(len(req.Key) + len(args.Args[1]))
changedBytes := int64(writtenBytes)
resp.Value = []byte("OK")
return writtenBytes, changedBytes, resp
}
func (h *aoeStorage) del(shard bhmetapb.Shard, req *raftcmdpb.Request, ctx command.Context) (uint64, int64, *raftcmdpb.Response) {
resp := pb.AcquireResponse()
err := h.getStoreByGroup(shard.Group, shard.ID).Delete(req.Key)
if err != nil {
resp.Value = errorResp(err)
return 0, 0, resp
}
writtenBytes := uint64(len(req.Key))
changedBytes := int64(writtenBytes)
resp.Value = []byte("OK")
return writtenBytes, changedBytes, resp
}
func (h *aoeStorage) batchSet(shard bhmetapb.Shard, req *raftcmdpb.Request, ctx command.Context) (uint64, int64, *raftcmdpb.Response) {
resp := pb.AcquireResponse()
args := &Args{}
err := json.Unmarshal(req.Cmd, &args)
if err != nil {
resp.Value = errorResp(err)
return 0, 0, resp
}
if len(args.Args)%2 != 0 {
resp.Value = errorResp(ErrInvalidValue)
return 0, 0, resp
}
writtenBytes := uint64(0)
for i := 0; i < len(args.Args)/2; i++ {
key := raftstore.EncodeDataKey(shard.Group, args.Args[2*i])
err = ctx.WriteBatch().Set(key, args.Args[2*i+1])
writtenBytes += uint64(len(key))
writtenBytes += uint64(len(args.Args[2*i+1]))
}
changedBytes := int64(writtenBytes)
resp.Value = []byte("OK")
return writtenBytes, changedBytes, resp
}
func (h *aoeStorage) get(shard bhmetapb.Shard, req *raftcmdpb.Request, ctx command.Context) (*raftcmdpb.Response, uint64) {
resp := pb.AcquireResponse()
value, err := h.getStoreByGroup(shard.Group, req.ToShard).Get(req.Key)
if err != nil {
resp.Value = errorResp(err)
return resp, 500
}
resp.Value = value
return resp, 0
}
func (h *aoeStorage) prefixScan(shard bhmetapb.Shard, req *raftcmdpb.Request, ctx command.Context) (*raftcmdpb.Response, uint64) {
logrus.Infof("prefixScan, req.key is %s, shard.Start is %s, shard.End is %s", string(req.Key), string(shard.Start), string(shard.End))
logrus.Infof("prefixScan, %d, %d", bytes.Compare(shard.Start, req.Key), bytes.Compare(shard.End, req.Key))
logrus.Infof("prefixScan, %d, %d", bytes.Compare(raftstore.EncodeDataKey(shard.Group, shard.Start), req.Key), bytes.Compare(raftstore.EncodeDataKey(shard.Group, shard.End), req.Key))
resp := pb.AcquireResponse()
args := &Args{}
err := json.Unmarshal(req.Cmd, &args)
if err != nil {
resp.Value = errorResp(err)
return resp, 500
}
prefix := raftstore.EncodeDataKey(shard.Group, args.Args[1])
var data [][]byte
err = h.getStoreByGroup(shard.Group, req.ToShard).PrefixScan(prefix, func(key, value []byte) (bool, error) {
if (shard.Start != nil && bytes.Compare(shard.Start, raftstore.DecodeDataKey(key)) > 0) ||
(shard.End != nil && bytes.Compare(shard.End, raftstore.DecodeDataKey(key)) <= 0) {
return true, nil
}
data = append(data, key)
data = append(data, value)
return true, nil
}, false)
if err != nil {
resp.Value = errorResp(err)
return resp, 500
}
if data != nil && shard.End != nil {
data = append(data, shard.End)
}
if data != nil {
resp.Value, err = json.Marshal(data)
}
if err != nil {
resp.Value = errorResp(err)
return resp, 500
}
return resp, 0
}
func (h *aoeStorage) scan(shard bhmetapb.Shard, req *raftcmdpb.Request, ctx command.Context) (*raftcmdpb.Response, uint64) {
resp := pb.AcquireResponse()
var data [][]byte
err := h.getStoreByGroup(shard.Group, req.ToShard).PrefixScan(req.Key, func(key, value []byte) (bool, error) {
data = append(data, key)
data = append(data, value)
return true, nil
}, false)
if err != nil {
resp.Value = errorResp(err)
return resp, 500
}
if data != nil && shard.End != nil {
/* lastKey := data[len(data)-2]
if */
}
resp.Value, err = json.Marshal(data)
if err != nil {
resp.Value = errorResp(err)
return resp, 500
}
return resp, 0
}
func (h *aoeStorage) incr(shard bhmetapb.Shard, req *raftcmdpb.Request, ctx command.Context) (uint64, int64, *raftcmdpb.Response) {
resp := pb.AcquireResponse()
args := &Args{}
err := json.Unmarshal(req.Cmd, &args)
if err != nil {
resp.Value = []byte(err.Error())
return 0, 0, resp
}
id := uint64(0)
if v, ok := ctx.Attrs()[string(req.Key)]; ok {
id = format.MustBytesToUint64(v.([]byte))
} else {
value, err := h.getStoreByGroup(shard.Group, req.ToShard).Get(req.Key)
if err != nil {
return 0, 0, resp
}
if len(value) > 0 {
id = format.MustBytesToUint64(value)
}
}
id++
newV := format.Uint64ToBytes(id)
ctx.Attrs()[string(req.Key)] = newV
err = ctx.WriteBatch().Set(req.Key, newV)
if err != nil {
return 0, 0, resp
}
writtenBytes := uint64(len(req.Key))
changedBytes := int64(writtenBytes)
resp.Value = newV
return writtenBytes, changedBytes, resp
}
package dist
import (
"fmt"
"github.com/matrixorigin/matrixcube/components/prophet/util"
"github.com/matrixorigin/matrixcube/components/prophet/util/typeutil"
"github.com/matrixorigin/matrixcube/config"
"github.com/matrixorigin/matrixcube/pb/bhmetapb"
"github.com/matrixorigin/matrixcube/raftstore"
"github.com/matrixorigin/matrixcube/server"
"github.com/matrixorigin/matrixcube/storage/mem"
"github.com/matrixorigin/matrixcube/storage/pebble"
"github.com/stretchr/testify/assert"
stdLog "log"
"matrixone/pkg/vm/engine/aoe"
"os"
"testing"
"time"
)
var (
tmpDir = "./cube-test"
)
func recreateTestTempDir() (err error) {
err = os.RemoveAll(tmpDir)
if err != nil {
return err
}
err = os.MkdirAll(tmpDir, os.ModeDir)
return err
}
func cleanupTmpDir() error {
return os.RemoveAll(tmpDir)
}
type testCluster struct {
t *testing.T
applications []Storage
}
func newTestClusterStore(t *testing.T) (*testCluster, error) {
if err := recreateTestTempDir(); err != nil {
return nil, err
}
util.SetLogger(&emptyLog{})
c := &testCluster{t: t}
for i := 0; i < 3; i++ {
metaStorage, err := pebble.NewStorage(fmt.Sprintf("%s/pebble/meta-%d", tmpDir, i))
if err != nil {
return nil, err
}
pebbleDataStorage, err := pebble.NewStorage(fmt.Sprintf("%s/pebble/data-%d", tmpDir, i))
if err != nil {
return nil, err
}
memDataStorage := mem.NewStorage()
if err != nil {
return nil, err
}
a, err := NewStorageWithOptions(metaStorage, pebbleDataStorage, memDataStorage, func(cfg *config.Config) {
cfg.DataPath = fmt.Sprintf("%s/node-%d", tmpDir, i)
cfg.RaftAddr = fmt.Sprintf("127.0.0.1:1000%d", i)
cfg.ClientAddr = fmt.Sprintf("127.0.0.1:2000%d", i)
cfg.Replication.ShardHeartbeatDuration = typeutil.NewDuration(time.Millisecond * 100)
cfg.Replication.StoreHeartbeatDuration = typeutil.NewDuration(time.Second)
cfg.Raft.TickInterval = typeutil.NewDuration(time.Millisecond * 100)
cfg.Prophet.Name = fmt.Sprintf("node-%d", i)
cfg.Prophet.StorageNode = true
cfg.Prophet.RPCAddr = fmt.Sprintf("127.0.0.1:3000%d", i)
if i != 0 {
cfg.Prophet.EmbedEtcd.Join = "http://127.0.0.1:40000"
}
cfg.Prophet.EmbedEtcd.ClientUrls = fmt.Sprintf("http://127.0.0.1:4000%d", i)
cfg.Prophet.EmbedEtcd.PeerUrls = fmt.Sprintf("http://127.0.0.1:5000%d", i)
cfg.Prophet.Schedule.EnableJointConsensus = true
}, server.Cfg{
Addr: fmt.Sprintf("127.0.0.1:808%d", i),
})
if err != nil {
return nil, err
}
c.applications = append(c.applications, a)
}
return c, nil
}
func (c *testCluster) stop() {
for _, s := range c.applications {
s.Close()
}
}
func TestClusterStartAndStop(t *testing.T) {
defer cleanupTmpDir()
c, err := newTestClusterStore(t)
defer c.stop()
time.Sleep(2 * time.Second)
assert.NoError(t, err)
stdLog.Printf("app all started.")
//Set Test
resp, err := c.applications[0].Exec(Args{
Op: uint64(Set),
Args: [][]byte{
[]byte("hello"),
[]byte("world"),
},
})
assert.NoError(t, err)
assert.Equal(t, "OK", string(resp))
//Get Test
value, err := c.applications[0].Exec(Args{
Op: uint64(Get),
Args: [][]byte{
[]byte("hello"),
},
})
assert.NoError(t, err)
assert.Equal(t, value, []byte("world"))
// To Shard(Not Existed) Get Test
gValue, err := c.applications[0].ExecWithGroup(Args{
Op: uint64(Get),
Args: [][]byte{
[]byte("hello"),
},
ShardId: 13,
}, aoe.AOEGroup)
assert.Error(t, err, ErrShardNotExisted)
assert.Nil(t, gValue)
// Dynamic Create Shard Test
client := c.applications[0].RaftStore().Prophet().GetClient()
err = client.AsyncAddResources(raftstore.NewResourceAdapterWithShard(
bhmetapb.Shard{
Start: []byte("2"),
End: []byte("3"),
Unique: "gTable1",
Group: uint64(aoe.AOEGroup),
}))
//
assert.NoError(t, err)
time.Sleep(5 * time.Second)
// Get With Group Test
gValue, err = c.applications[0].ExecWithGroup(Args{
Op: uint64(Get),
Args: [][]byte{
[]byte("hello"),
},
ShardId: 13,
}, aoe.AOEGroup)
assert.NoError(t, err)
assert.Nil(t, gValue)
// Set With Group Test
resp, err = c.applications[0].ExecWithGroup(Args{
Op: uint64(Set),
Args: [][]byte{
[]byte("hello"),
[]byte("world"),
},
ShardId: 13,
}, aoe.AOEGroup)
assert.NoError(t, err)
assert.Equal(t, "OK", string(resp))
// Get With Group Test
gValue, err = c.applications[0].ExecWithGroup(Args{
Op: uint64(Get),
Args: [][]byte{
[]byte("hello"),
},
ShardId: 13,
}, aoe.AOEGroup)
assert.NoError(t, err)
assert.Equal(t, gValue, []byte("world"))
}
type emptyLog struct{}
func (l *emptyLog) Info(v ...interface{}) {
}
func (l *emptyLog) Infof(format string, v ...interface{}) {
stdLog.Printf(format, v...)
}
func (l *emptyLog) Debug(v ...interface{}) {
}
func (l *emptyLog) Debugf(format string, v ...interface{}) {
}
func (l *emptyLog) Warning(v ...interface{}) {
}
func (l *emptyLog) Warningf(format string, v ...interface{}) {
}
func (l *emptyLog) Error(v ...interface{}) {
}
func (l *emptyLog) Errorf(format string, v ...interface{}) {
stdLog.Printf(format, v...)
}
func (l *emptyLog) Fatal(v ...interface{}) {
stdLog.Panic(v...)
}
func (l *emptyLog) Fatalf(format string, v ...interface{}) {
stdLog.Panicf(format, v...)
}
package dist
import (
"fmt"
"github.com/matrixorigin/matrixcube/storage/pebble"
"github.com/stretchr/testify/assert"
"strings"
"testing"
)
func TestPebbleStorage(t *testing.T) {
a := "hello world"
p := "hello"
println(strings.HasPrefix(a, p))
s, err := pebble.NewStorage("./pebble-test/data")
defer s.Close()
assert.NoError(t, err)
t.Run("prefix", func(t *testing.T) {
prefix := "/m/db"
for i := 1; i <= 3; i++ {
key := []byte(fmt.Sprintf("%v/%v/%d", prefix, "defaultdb", i))
err := s.Set(key, []byte{byte(0)})
assert.NoError(t, err)
}
err := s.PrefixScan([]byte(fmt.Sprintf("%v/%v", prefix, "defaultdb")),
func(key, value []byte) (bool, error) {
println(string(key), value[0])
return true, nil
}, false)
assert.NoError(t, err)
})
t.Run("scan", func(t *testing.T) {
prefix := "/m/db"
for i := 1; i <= 3; i++ {
key := []byte(fmt.Sprintf("%v/%v/%d", prefix, "defaultdb", i))
err := s.Set(key, []byte{byte(1)})
assert.NoError(t, err)
}
err := s.Scan([]byte(fmt.Sprintf("%v/%v/%d", prefix, "defaultdb", 1)),
[]byte(fmt.Sprintf("%v/%v/%d", prefix, "defaultdb", 4)),
func(key, value []byte) (bool, error) {
println(string(key), value[0])
return true, nil
},
false)
assert.NoError(t, err)
})
t.Run("get set", func(t *testing.T) {
pairs := [][]byte{
[]byte("k1"),
[]byte("v1"),
[]byte("k2"),
[]byte("v2"),
}
err := s.BatchSet(pairs...)
assert.NoError(t, err)
defer func() {
assert.NoError(t, s.BatchDelete(pairs...))
value, err := s.Get([]byte("k1"))
assert.NoError(t, err)
assert.Nil(t, value)
}()
value, err := s.Get([]byte("k1"))
assert.NoError(t, err)
assert.Equal(t, []byte("v1"), value)
})
}
package dist
import (
"bytes"
"encoding/json"
"fmt"
"github.com/fagongzi/util/format"
"github.com/matrixorigin/matrixcube/aware"
"github.com/matrixorigin/matrixcube/components/prophet/metadata"
"github.com/matrixorigin/matrixcube/components/prophet/pb/metapb"
"github.com/matrixorigin/matrixcube/config"
"github.com/matrixorigin/matrixcube/pb/bhmetapb"
"github.com/matrixorigin/matrixcube/pb/raftcmdpb"
"github.com/matrixorigin/matrixcube/proxy"
"github.com/matrixorigin/matrixcube/raftstore"
"github.com/matrixorigin/matrixcube/server"
cstorage "github.com/matrixorigin/matrixcube/storage"
"matrixone/pkg/vm/engine/aoe"
"sync"
"time"
)
const (
defaultRPCTimeout = time.Second * 3
)
// Storage storage
type Storage interface {
// Start the storage
Start() error
// Close close the storage
Close()
// Set set key value
Set([]byte, []byte) error
// SetWithTTL Set set key value with a TTL in seconds
SetWithTTL([]byte, []byte, int64) error
// SetWithGroupWithTTL set key value with a TTL in seconds
SetWithGroupWithTTL([]byte, []byte, aoe.Group, int64) error
// SetWithGroup set key value
SetWithGroup([]byte, []byte, aoe.Group) error
// Get returns the value of key
Get([]byte) ([]byte, error)
// GetWithGroup returns the value of key
GetWithGroup([]byte, aoe.Group) ([]byte, error)
// Delete remove the key from the store
Delete([]byte) error
// Scan scan [start,end) data
Scan([]byte, []byte, uint64) ([][]byte, error)
// ScanWithGroup Scan scan [start,end) data
ScanWithGroup([]byte, []byte, uint64, aoe.Group) ([][]byte, error)
// PrefixScan scan k-vs which k starts with prefix
PrefixScan([]byte, uint64) ([][]byte, error)
// PrefixScanWithGroup scan k-vs which k starts with prefix
PrefixScanWithGroup([]byte, uint64, aoe.Group) ([][]byte, error)
AllocID([]byte) (uint64, error)
// Exec exec command
Exec(cmd interface{}) ([]byte, error)
// AsyncExec async exec command
AsyncExec(interface{}, func(interface{}, []byte, error), interface{})
// ExecWithGroup exec command with group
ExecWithGroup(interface{}, aoe.Group) ([]byte, error)
// AsyncExecWithGroup async exec command with group
AsyncExecWithGroup(interface{}, aoe.Group, func(interface{}, []byte, error), interface{})
// RaftStore returns the raft store
RaftStore() raftstore.Store
}
type aoeStorage struct {
app *server.Application
store raftstore.Store
locks sync.Map // key -> lock
cmds map[uint64]raftcmdpb.CMDType
}
func (h *aoeStorage) Start() error {
return h.app.Start()
}
func (h *aoeStorage) Close() {
h.app.Stop()
}
// NewStorage returns a beehive request handler
func NewStorage(
metadataStorage cstorage.MetadataStorage,
kvDataStorage cstorage.DataStorage,
aoeDataStorage cstorage.DataStorage) (Storage, error) {
return NewStorageWithOptions(metadataStorage, kvDataStorage, aoeDataStorage, nil, server.Cfg{})
}
// NewStorageWithOptions returns a beehive request handler
func NewStorageWithOptions(
metaStorage cstorage.MetadataStorage,
kvDataStorage cstorage.DataStorage,
aoeDataStorage cstorage.DataStorage,
adjustFunc func(cfg *config.Config),
scfg server.Cfg) (Storage, error) {
h := &aoeStorage{
cmds: make(map[uint64]raftcmdpb.CMDType),
}
cfg := &config.Config{}
cfg.Customize.CustomSplitCompletedFuncFactory = func(group uint64) func(old *bhmetapb.Shard, news []bhmetapb.Shard) {
switch group {
case uint64(aoe.AOEGroup):
return func(old *bhmetapb.Shard, news []bhmetapb.Shard) {
//panic("not impl")
}
default:
return func(old *bhmetapb.Shard, news []bhmetapb.Shard) {
}
}
}
cfg.Storage.MetaStorage = metaStorage
cfg.Storage.DataStorageFactory = func(group, shardID uint64) cstorage.DataStorage {
switch group {
case uint64(aoe.KVGroup):
return kvDataStorage
case uint64(aoe.AOEGroup):
return aoeDataStorage
}
return nil
}
cfg.Storage.ForeachDataStorageFunc = func(cb func(cstorage.DataStorage)) {
cb(kvDataStorage)
cb(aoeDataStorage)
}
cfg.Prophet.Replication.Groups = []uint64{uint64(aoe.KVGroup), uint64(aoe.AOEGroup)}
cfg.ShardGroups = 2
cfg.Customize.CustomInitShardsFactory = func() []bhmetapb.Shard {
return []bhmetapb.Shard{
{
Group: uint64(aoe.KVGroup),
},
{
Group: uint64(aoe.AOEGroup),
Start: []byte("0"),
End: []byte("1"),
},
}
}
cfg.Prophet.ResourceStateChangedHandler = func(res metadata.Resource, from metapb.ResourceState, to metapb.ResourceState) {
if from == metapb.ResourceState_WaittingCreate && to == metapb.ResourceState_Running {
if res.Data() == nil {
return
}
header := format.MustBytesToUint64(res.Data()[0:8])
keys := bytes.Split(res.Data()[8:8+header], []byte("#"))
tKey := keys[0]
rKey := []byte(fmt.Sprintf("%s%d", string(keys[1]), res.ID()))
// TODO: Call local interface to create new tablet
// TODO: Re-design group store and set value to <partition, segment_ids>
_ = h.Set(rKey, []byte(res.Unique()))
t, _ := aoe.DecodeTable(res.Data()[8+header:])
t.State = aoe.StatePublic
meta, _ := aoe.EncodeTable(t)
_ = h.Set(tKey, meta)
}
}
cfg.Customize.CustomShardStateAwareFactory = func() aware.ShardStateAware {
return h
}
if adjustFunc != nil {
adjustFunc(cfg)
}
h.store = raftstore.NewStore(cfg)
scfg.Store = h.store
scfg.Handler = h
h.app = server.NewApplicationWithDispatcher(scfg, func(req *raftcmdpb.Request, cmd interface{}, proxy proxy.ShardsProxy) error {
if req.Group == uint64(aoe.KVGroup) {
return proxy.Dispatch(req)
}
args := cmd.(Args)
if args.Node == nil {
return proxy.Dispatch(req)
}
return proxy.DispatchTo(req, args.ShardId, string(args.Node))
})
h.init()
if err := h.app.Start(); err != nil {
return nil, err
}
return h, nil
}
func (h *aoeStorage) Set(key, value []byte) error {
return h.SetWithTTL(key, value, 0)
}
func (h *aoeStorage) SetWithTTL(key, value []byte, ttl int64) error {
req := Args{
Op: uint64(Set),
Args: [][]byte{
key,
value,
},
}
_, err := h.Exec(req)
return err
}
func (h *aoeStorage) SetWithGroup(key, value []byte, group aoe.Group) error {
return h.SetWithGroupWithTTL(key, value, group, 0)
}
func (h *aoeStorage) SetWithGroupWithTTL(key, value []byte, group aoe.Group, ttl int64) error {
req := Args{
Op: uint64(Set),
Args: [][]byte{
key,
value,
},
}
_, err := h.ExecWithGroup(req, group)
return err
}
func (h *aoeStorage) Get(key []byte) ([]byte, error) {
return h.GetWithGroup(key, aoe.KVGroup)
}
// GetWithGroup returns the value of key
func (h *aoeStorage) GetWithGroup(key []byte, group aoe.Group) ([]byte, error) {
req := Args{
Op: uint64(Get),
Args: [][]byte{
key,
},
}
value, err := h.ExecWithGroup(req, group)
return value, err
}
func (h *aoeStorage) Delete(key []byte) error {
req := Args{
Op: uint64(Del),
Args: [][]byte{
key,
},
}
_, err := h.Exec(req)
return err
}
func (h *aoeStorage) Scan(start []byte, end []byte, limit uint64) ([][]byte, error) {
return h.ScanWithGroup(start, end, limit, aoe.KVGroup)
}
func (h *aoeStorage) ScanWithGroup(start []byte, end []byte, limit uint64, group aoe.Group) ([][]byte, error) {
req := Args{
Op: uint64(Scan),
Args: [][]byte{
start,
end,
},
Limit: limit,
}
data, err := h.ExecWithGroup(req, group)
if err != nil {
return nil, err
}
var pairs [][]byte
err = json.Unmarshal(data, &pairs)
if err != nil {
return nil, err
}
return pairs, nil
}
func (h *aoeStorage) PrefixScan(prefix []byte, limit uint64) ([][]byte, error) {
return h.PrefixScanWithGroup(prefix, limit, aoe.KVGroup)
}
func (h *aoeStorage) PrefixScanWithGroup(prefix []byte, limit uint64, group aoe.Group) ([][]byte, error) {
startKey := prefix
req := Args{
Op: uint64(PrefixScan),
Args: [][]byte{
startKey,
prefix,
},
Limit: limit,
}
var pairs [][]byte
var err error
var data []byte
i := 0
for {
i = i + 1
data, err = h.ExecWithGroup(req, group)
if data == nil || err != nil {
break
}
var kvs [][]byte
err = json.Unmarshal(data, &kvs)
if err != nil || kvs == nil || len(kvs) == 0 {
break
}
if len(kvs)%2 == 0 {
pairs = append(pairs, kvs...)
break
}
pairs = append(pairs, kvs[0:len(kvs)-1]...)
req.Args[0] = kvs[len(kvs)-1]
}
return pairs, err
}
func (h *aoeStorage) AllocID(idkey []byte) (uint64, error) {
req := Args{
Op: uint64(Incr),
Args: [][]byte{
idkey,
},
}
data, err := h.Exec(req)
if err != nil {
return 0, err
}
resp := format.MustBytesToUint64(data)
return resp, nil
}
func (h *aoeStorage) Exec(cmd interface{}) ([]byte, error) {
return h.app.Exec(cmd, defaultRPCTimeout)
}
func (h *aoeStorage) AsyncExec(cmd interface{}, cb func(interface{}, []byte, error), arg interface{}) {
h.app.AsyncExecWithTimeout(cmd, cb, defaultRPCTimeout, arg)
}
func (h *aoeStorage) AsyncExecWithGroup(cmd interface{}, group aoe.Group, cb func(interface{}, []byte, error), arg interface{}) {
h.app.AsyncExecWithGroupAndTimeout(cmd, uint64(group), cb, defaultRPCTimeout, arg)
}
func (h *aoeStorage) ExecWithGroup(cmd interface{}, group aoe.Group) ([]byte, error) {
return h.app.ExecWithGroup(cmd, uint64(group), defaultRPCTimeout)
}
func (h *aoeStorage) RaftStore() raftstore.Store {
return h.store
}
func (h *aoeStorage) getStoreByGroup(group uint64, shard uint64) cstorage.DataStorage {
return h.store.DataStorageByGroup(group, shard)
}
package dist
type Args struct {
Op uint64 `json:"op"`
Args [][]byte `json:"args,omitempty"`
Limit uint64 `json:"limit"`
ShardId uint64 `json:"shard_id"`
Node []byte `json:"node"`
}
package aoe
import "matrixone/pkg/container/types"
type Group uint64
const (
KVGroup Group = iota
AOEGroup
)
type SchemaState byte
const (
// StateNone means this schema element is absent and can't be used.
StateNone SchemaState = iota
// StateDeleteOnly means we can only delete items for this schema element.
StateDeleteOnly
// StatePublic means this schema element is ok for all write and read operations.
StatePublic
)
const (
SharedShardUnique = "###shared"
)
type CatalogInfo struct {
Id uint64
Name string
}
// SchemaInfo stores the information of a schema(database).
type SchemaInfo struct {
CatalogId uint64 `json:"catalog_id"`
Id uint64 `json:"id"`
Name string `json:"name"`
Tables []*TableInfo `json:"tables"` // Tables in the DB.
State SchemaState `json:"state"`
}
// TableInfo stores the information of a table or view.
type TableInfo struct {
SchemaId uint64 `json:"schema_id"`
Id uint64 `json:"id"`
Name string `json:"name"`
// Type of the table: BASE TABLE for a normal table, VIEW for a view, etc.
Type string `json:"type"`
// Column is listed in order in which they appear in schema
Indexs []IndexInfo `json:"indexs"`
Columns []ColumnInfo `json:"columns"`
Comment []byte `json:"comment"`
State SchemaState `json:"state"`
Partition []byte `json:"partition"`
}
// ColumnInfo stores the information of a column.
type ColumnInfo struct {
SchemaId uint64 `json:"schema_id"`
TableID uint64 `json:"table_id"`
Id uint64 `json:"column_id"`
Name string `json:"name"`
Type types.Type `json:"type"`
Alg int `json:"alg"`
}
type IndexInfo struct {
SchemaId uint64 `json:"schema_id"`
TableId uint64 `json:"table_id"`
Columns []uint64 `json:"columns"`
Id uint64 `json:"id"`
Names []string `json:"column_names"`
Type uint64 `json:"type"`
}
type SegmentInfo struct {
TableId uint64 `json:"table_id"`
Id uint64 `json:"id"`
GroupId uint64 `json:"group_id"`
TabletId string `json:"tablet_id"`
PartitionId string `json:"partition_id"`
}
type RouteInfo struct {
GroupId uint64 `json:"group_id"`
Node []byte `json:"node"`
Segments map[uint64][]SegmentInfo
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment