Skip to content
Snippets Groups Projects
Unverified Commit e217d39e authored by daviszhen's avatar daviszhen Committed by GitHub
Browse files

Enable tae engine (#2566)

parent 574f471e
No related branches found
No related tags found
No related merge requests found
......@@ -18,6 +18,8 @@ import (
"errors"
"flag"
"fmt"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/moengine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tpe/tuplecodec"
"math"
"os"
......@@ -72,6 +74,8 @@ const (
CreateTpeExit = 13
RunRPCExit = 14
ShutdownExit = 15
CreateTaeExit = 16
InitCatalogExit = 17
)
var (
......@@ -147,6 +151,7 @@ type aoeHandler struct {
port int64
kvStorage storage.DataStorage
aoeStorage storage.DataStorage
eng engine.Engine
}
type tpeHandler struct {
......@@ -154,6 +159,8 @@ type tpeHandler struct {
}
type taeHandler struct {
eng engine.Engine
tae *db.DB
}
func initAoe(configFilePath string) *aoeHandler {
......@@ -234,6 +241,7 @@ func initAoe(configFilePath string) *aoeHandler {
port: cubePort,
kvStorage: kvStorage,
aoeStorage: aoeStorage,
eng: eng,
}
}
......@@ -305,11 +313,26 @@ func closeTpe(tpe *tpeHandler) {
}
func initTae() *taeHandler {
return nil
targetDir := config.GlobalSystemVariables.GetStorePath()
if err := recreateDir(targetDir); err != nil {
logutil.Infof("Recreate dir error:%v\n", err)
os.Exit(RecreateDirExit)
}
tae, err := db.Open(targetDir+"/tae", nil)
if err != nil {
logutil.Infof("Open tae failed. error:%v", err)
os.Exit(CreateTaeExit)
}
return &taeHandler{
eng: moengine.NewEngine(tae),
tae: tae,
}
}
func closeTae(tae *taeHandler) {
_ = tae.tae.Close()
}
func main() {
......@@ -349,6 +372,20 @@ func main() {
os.Exit(LoadConfigExit)
}
//just initialize the tae after configuration has been loaded
if len(args) == 2 && args[1] == "init_db" {
fmt.Println("Initialize the TAE engine ...")
taeWrapper := initTae()
err := frontend.InitDB(taeWrapper.eng)
if err != nil {
logutil.Infof("Initialize catalog failed. error:%v", err)
os.Exit(InitCatalogExit)
}
fmt.Println("Initialize the TAE engine Done")
closeTae(taeWrapper)
os.Exit(0)
}
if *cpuProfilePathFlag != "" {
stop := startCPUProfile()
defer stop()
......
......@@ -15,9 +15,23 @@
package frontend
import (
"errors"
"fmt"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/moengine"
"sort"
)
var (
errorIsNotTaeEngine = errors.New("the engine is not tae")
errorMissingCatalogTables = errors.New("missing catalog tables")
errorMissingCatalogDatabases = errors.New("missing catalog databases")
errorNoSuchAttribute = errors.New("no such attribute in the schema")
errorAttributeTypeIsDifferent = errors.New("attribute type is different with that in the schema")
errorAttributeIsNotPrimary = errors.New("attribute is not primary key")
)
// CatalogSchemaAttribute defines the attribute of the schema
......@@ -493,7 +507,7 @@ func DefineSchemaForMoGlobalVariables() *CatalogSchema {
gvVariableValueAttr := &CatalogSchemaAttribute{
AttributeName: "gv_variable_value",
AttributeType: types.T_varchar.ToType(),
IsPrimaryKey: true,
IsPrimaryKey: false,
Comment: "",
}
gvVariableNameAttr.AttributeType.Width = 1024
......@@ -553,7 +567,7 @@ func DefineSchemaForMoUser() *CatalogSchema {
passwordAttr := &CatalogSchemaAttribute{
AttributeName: "authentication_string",
AttributeType: types.T_varchar.ToType(),
IsPrimaryKey: true,
IsPrimaryKey: false,
Comment: "password",
}
passwordAttr.AttributeType.Width = 256
......@@ -579,3 +593,285 @@ func FillInitialDataForMoUser() *batch.Batch {
data := PrepareInitialDataForMoUser()
return PrepareInitialDataForSchema(schema, data)
}
// InitDB setups the initial catalog tables in tae
func InitDB(tae engine.Engine) error {
taeEngine, ok := tae.(moengine.TxnEngine)
if !ok {
return errorIsNotTaeEngine
}
txnCtx, err := taeEngine.StartTxn(nil)
if err != nil {
return err
}
/*
stage 1: create catalog tables
*/
//1.get database mo_catalog handler
//TODO: use mo_catalog after tae is ready
catalogDbName := "mo_catalog_tmp"
err = tae.Create(0, catalogDbName, 0, txnCtx.GetCtx())
if err != nil {
logutil.Infof("create database %v failed.error:%v", catalogDbName, err)
err2 := txnCtx.Rollback()
if err2 != nil {
logutil.Infof("txnCtx rollback failed. error:%v", err2)
return err2
}
return err
}
catalogDB, err := tae.Database(catalogDbName, txnCtx.GetCtx())
if err != nil {
logutil.Infof("get database %v failed.error:%v", catalogDbName, err)
err2 := txnCtx.Rollback()
if err2 != nil {
logutil.Infof("txnCtx rollback failed. error:%v", err2)
return err2
}
return err
}
//2. create table mo_global_variables
gvSch := DefineSchemaForMoGlobalVariables()
gvDefs := convertCatalogSchemaToTableDef(gvSch)
err = catalogDB.Create(0, gvSch.GetName(), gvDefs, txnCtx.GetCtx())
if err != nil {
logutil.Infof("create table %v failed.error:%v", gvSch.GetName(), err)
err2 := txnCtx.Rollback()
if err2 != nil {
logutil.Infof("txnCtx rollback failed. error:%v", err2)
return err2
}
return err
}
//3. create table mo_user
userSch := DefineSchemaForMoUser()
userDefs := convertCatalogSchemaToTableDef(userSch)
err = catalogDB.Create(0, userSch.GetName(), userDefs, txnCtx.GetCtx())
if err != nil {
logutil.Infof("create table %v failed.error:%v", userSch.GetName(), err)
err2 := txnCtx.Rollback()
if err2 != nil {
logutil.Infof("txnCtx rollback failed. error:%v", err2)
return err2
}
return err
}
/*
stage 2: create information_schema database.
Views in the information_schema need to created by 'create view'
*/
//1. create database information_schema
infoSchemaName := "information_schema"
err = tae.Create(0, infoSchemaName, 0, txnCtx.GetCtx())
if err != nil {
logutil.Infof("create database %v failed.error:%v", infoSchemaName, err)
err2 := txnCtx.Rollback()
if err2 != nil {
logutil.Infof("txnCtx rollback failed. error:%v", err2)
return err2
}
return err
}
//TODO: create views after the computation engine is ready
err = txnCtx.Commit()
if err != nil {
logutil.Infof("txnCtx commit failed.error:%v", err)
return err
}
return sanityCheck(tae)
}
// sanityCheck checks the catalog is ready or not
func sanityCheck(tae engine.Engine) error {
taeEngine, ok := tae.(moengine.TxnEngine)
if !ok {
return errorIsNotTaeEngine
}
txnCtx, err := taeEngine.StartTxn(nil)
if err != nil {
return err
}
// databases: mo_catalog,mo_catalog_tmp,information_schema
dbs := tae.Databases(txnCtx.GetCtx())
wantDbs := []string{"mo_catalog", "mo_catalog_tmp", "information_schema"}
if !isWanted(dbs, wantDbs) {
logutil.Infof("wantDbs %v,dbs %v", wantDbs, dbs)
return errorMissingCatalogDatabases
}
// database mo_catalog has tables:mo_database,mo_tables,mo_columns
//TODO:check tae.mo_catalog.mo_databases -> mo_database
//TODO:check tae.mo_catalog.mo_database.datName -> datname
wantTablesOfMoCatalog := []string{"mo_database", "mo_tables", "mo_columns"}
wantSchemasOfCatalog := []*CatalogSchema{
DefineSchemaForMoDatabase(),
DefineSchemaForMoTables(),
DefineSchemaForMoColumns(),
}
catalogDbName := "mo_catalog"
err = isWantedDatabase(taeEngine, txnCtx, catalogDbName, wantTablesOfMoCatalog, wantSchemasOfCatalog)
if err != nil {
return err
}
//TODO:fix it after tae is ready
// database mo_catalog_tmp has tables: mo_global_variables,mo_user
wantTablesOfMoCatalogTmp := []string{"mo_global_variables", "mo_user"}
wantSchemasOfCatalogTmp := []*CatalogSchema{
DefineSchemaForMoGlobalVariables(),
DefineSchemaForMoUser(),
}
catalogDbTmpName := "mo_catalog_tmp"
err = isWantedDatabase(taeEngine, txnCtx, catalogDbTmpName, wantTablesOfMoCatalogTmp, wantSchemasOfCatalogTmp)
if err != nil {
return err
}
err = txnCtx.Commit()
if err != nil {
logutil.Infof("txnCtx commit failed.error:%v", err)
return err
}
return nil
}
// isWanted checks the string slices are same
func isWanted(want, actual []string) bool {
w := make([]string, len(want))
copy(w, want)
a := make([]string, len(actual))
copy(a, actual)
sort.Strings(w)
sort.Strings(a)
if len(w) != len(a) {
return false
}
for i := 0; i < len(w); i++ {
if w[i] != a[i] {
return false
}
}
return true
}
// isWantedDatabase checks the database has the right tables
func isWantedDatabase(taeEngine moengine.TxnEngine, txnCtx moengine.Txn,
dbName string, tables []string, schemas []*CatalogSchema) error {
db, err := taeEngine.Database(dbName, txnCtx.GetCtx())
if err != nil {
logutil.Infof("get database %v failed.error:%v", dbName, err)
err2 := txnCtx.Rollback()
if err2 != nil {
logutil.Infof("txnCtx rollback failed. error:%v", err2)
return err2
}
return err
}
tablesOfMoCatalog := db.Relations(txnCtx.GetCtx())
if !isWanted(tablesOfMoCatalog, tables) {
logutil.Infof("wantTables %v, tables %v", tables, tablesOfMoCatalog)
return errorMissingCatalogTables
}
//check table attributes
for i, tableName := range tables {
err = isWantedTable(db, txnCtx, tableName, schemas[i])
if err != nil {
return err
}
}
return err
}
//isWantedTable checks the table has the right attributes
func isWantedTable(db engine.Database, txnCtx moengine.Txn,
tableName string, schema *CatalogSchema) error {
table, err := db.Relation(tableName, txnCtx.GetCtx())
if err != nil {
logutil.Infof("get table %v failed.error:%v", tableName, err)
err2 := txnCtx.Rollback()
if err2 != nil {
logutil.Infof("txnCtx rollback failed. error:%v", err2)
return err2
}
return err
}
defs := table.TableDefs(txnCtx.GetCtx())
attrs := make(map[string]*CatalogSchemaAttribute)
for _, attr := range schema.GetAttributes() {
attrs[attr.GetName()] = attr
}
for _, def := range defs {
if attr, ok := def.(*engine.AttributeDef); ok {
if schemaAttr, ok2 := attrs[attr.Attr.Name]; ok2 {
if attr.Attr.Name != schemaAttr.GetName() {
logutil.Infof("def name %v schema name %v", attr.Attr.Name, schemaAttr.GetName())
return errorNoSuchAttribute
}
if !attr.Attr.Type.Eq(schemaAttr.GetType()) {
return errorAttributeTypeIsDifferent
}
if !(attr.Attr.Primary && schemaAttr.GetIsPrimaryKey() ||
!attr.Attr.Primary && !schemaAttr.GetIsPrimaryKey()) {
return errorAttributeIsNotPrimary
}
} else {
logutil.Infof("def name 1 %v", attr.Attr.Name)
return errorNoSuchAttribute
}
} else if attr, ok2 := def.(*engine.PrimaryIndexDef); ok2 {
for _, name := range attr.Names {
if schemaAttr, ok2 := attrs[name]; ok2 {
if !schemaAttr.GetIsPrimaryKey() {
return errorAttributeIsNotPrimary
}
} else {
logutil.Infof("def name 2 %v", name)
return errorNoSuchAttribute
}
}
}
}
return nil
}
func convertCatalogSchemaToTableDef(sch *CatalogSchema) []engine.TableDef {
var defs []engine.TableDef
var primaryKeyName []string
for _, attr := range sch.GetAttributes() {
if attr.GetIsPrimaryKey() {
primaryKeyName = append(primaryKeyName, attr.GetName())
}
defs = append(defs, &engine.AttributeDef{Attr: engine.Attribute{
Name: attr.GetName(),
Alg: 0,
Type: attr.GetType(),
Default: engine.DefaultExpr{},
Primary: attr.GetIsPrimaryKey(),
}})
}
if len(primaryKeyName) != 0 {
defs = append(defs, &engine.PrimaryIndexDef{
Names: primaryKeyName,
})
}
return defs
}
......@@ -181,7 +181,7 @@ func Test_load(t *testing.T) {
guestMmu := guest.New(pu.SV.GetGuestMmuLimitation(), pu.HostMmu)
ses := NewSession(proto, epochgc, guestMmu, pu.Mempool, pu)
ses := NewSession(proto, epochgc, guestMmu, pu.Mempool, pu, nil)
mce := NewMysqlCmdExecutor()
......@@ -347,7 +347,7 @@ func Test_load(t *testing.T) {
guestMmu := guest.New(pu.SV.GetGuestMmuLimitation(), pu.HostMmu)
ses := NewSession(proto, epochgc, guestMmu, pu.Mempool, pu)
ses := NewSession(proto, epochgc, guestMmu, pu.Mempool, pu, nil)
mce := NewMysqlCmdExecutor()
......
......@@ -471,7 +471,7 @@ func Test_mce(t *testing.T) {
guestMmu := guest.New(pu.SV.GetGuestMmuLimitation(), pu.HostMmu)
ses := NewSession(proto, epochgc, guestMmu, pu.Mempool, pu)
ses := NewSession(proto, epochgc, guestMmu, pu.Mempool, pu, nil)
mce := NewMysqlCmdExecutor()
......@@ -566,7 +566,7 @@ func Test_mce_selfhandle(t *testing.T) {
guestMmu := guest.New(pu.SV.GetGuestMmuLimitation(), pu.HostMmu)
ses := NewSession(proto, epochgc, guestMmu, pu.Mempool, pu)
ses := NewSession(proto, epochgc, guestMmu, pu.Mempool, pu, nil)
mce := NewMysqlCmdExecutor()
mce.PrepareSessionBeforeExecRequest(ses)
......@@ -601,7 +601,7 @@ func Test_mce_selfhandle(t *testing.T) {
guestMmu := guest.New(pu.SV.GetGuestMmuLimitation(), pu.HostMmu)
ses := NewSession(proto, epochgc, guestMmu, pu.Mempool, pu)
ses := NewSession(proto, epochgc, guestMmu, pu.Mempool, pu, nil)
ses.Mrs = &MysqlResultSet{}
mce := NewMysqlCmdExecutor()
......@@ -674,7 +674,7 @@ func Test_getDataFromPipeline(t *testing.T) {
guestMmu := guest.New(pu.SV.GetGuestMmuLimitation(), pu.HostMmu)
ses := NewSession(proto, epochgc, guestMmu, pu.Mempool, pu)
ses := NewSession(proto, epochgc, guestMmu, pu.Mempool, pu, nil)
ses.Mrs = &MysqlResultSet{}
// mce := NewMysqlCmdExecutor()
......@@ -741,7 +741,7 @@ func Test_getDataFromPipeline(t *testing.T) {
proto := NewMysqlClientProtocol(0, ioses, 1024, pu.SV)
epochgc := getPCI()
guestMmu := guest.New(pu.SV.GetGuestMmuLimitation(), pu.HostMmu)
ses := NewSession(proto, epochgc, guestMmu, pu.Mempool, pu)
ses := NewSession(proto, epochgc, guestMmu, pu.Mempool, pu, nil)
ses.Mrs = &MysqlResultSet{}
convey.So(getDataFromPipeline(ses, nil), convey.ShouldBeNil)
......
......@@ -94,7 +94,7 @@ func (routine *Routine) Loop() {
mgr := routine.GetRoutineMgr()
routine.protocol.(*MysqlProtocolImpl).sequenceId = req.seq
ses := NewSession(routine.protocol, mgr.getEpochgc(), routine.guestMmu, routine.mempool, mgr.getParameterUnit())
ses := NewSession(routine.protocol, mgr.getEpochgc(), routine.guestMmu, routine.mempool, mgr.getParameterUnit(), nil)
routine.executor.PrepareSessionBeforeExecRequest(ses)
......
......@@ -17,6 +17,7 @@ package frontend
import (
"github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/moengine"
"github.com/matrixorigin/matrixone/pkg/vm/mempool"
"github.com/matrixorigin/matrixone/pkg/vm/mmu/guest"
)
......@@ -42,10 +43,12 @@ type Session struct {
ep *tree.ExportParam
closeRef *CloseExportData
//tae txn
taeTxn moengine.Txn
}
func NewSession(proto Protocol, pdHook *PDCallbackImpl,
gm *guest.Mmu, mp *mempool.Mempool, PU *config.ParameterUnit) *Session {
func NewSession(proto Protocol, pdHook *PDCallbackImpl, gm *guest.Mmu, mp *mempool.Mempool, PU *config.ParameterUnit, taeTxn moengine.Txn) *Session {
return &Session{
protocol: proto,
pdHook: pdHook,
......@@ -57,6 +60,7 @@ func NewSession(proto Protocol, pdHook *PDCallbackImpl,
Fields: &tree.Fields{},
Lines: &tree.Lines{},
},
taeTxn: taeTxn,
}
}
......
......@@ -16,6 +16,7 @@ package moengine
import (
"bytes"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/handle"
......@@ -31,6 +32,13 @@ type Txn interface {
GetError() error
}
type TxnEngine interface {
engine.Engine
StartTxn(info []byte) (txn Txn, err error)
}
var _ TxnEngine = &txnEngine{}
type txnEngine struct {
impl *db.DB
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment