diff --git a/Makefile b/Makefile index 12a3567ce7ce525e6be49e4aa8add0c698bd1ff0..83921606261248ad977c2a7bf52b82eedad50671 100644 --- a/Makefile +++ b/Makefile @@ -20,6 +20,10 @@ GENERATE_OVERLOAD_COMPARE := ./pkg/sql/colexec/extend/overload/eq.go ./pkg/sql/c GENERATE_OVERLOAD_OTHERS := ./pkg/sql/colexec/extend/overload/like.go ./pkg/sql/colexec/extend/overload/cast.go GENERATE_OVERLOAD_UNARYS := ./pkg/sql/colexec/extend/overload/unaryops.go +# files generated from cmd/generate-config +# they need to be deleted in cleaning +CONFIG_CODE_GENERATED := ./pkg/config/system_vars.go ./pkg/config/system_vars_test.go + # Creating build config .PHONY: config config: cmd/generate-config/main.go cmd/generate-config/config_template.go cmd/generate-config/system_vars_def.toml @@ -98,6 +102,7 @@ clean: @rm -f $(GENERATE_OVERLOAD_COMPARE) @rm -f $(GENERATE_OVERLOAD_OTHERS) @rm -f $(GENERATE_OVERLOAD_UNARYS) + @rm -f $(CONFIG_CODE_GENERATED) ifneq ($(wildcard $(BIN_NAME)),) $(info Remove file $(BIN_NAME)) @rm -f $(BIN_NAME) diff --git a/cmd/db-server/main.go b/cmd/db-server/main.go index 8d583b0e636ac4b78b6b6e20f0c963a386f078b6..582a501ecfba7fefbd50fcf6685ccce766ee1886 100644 --- a/cmd/db-server/main.go +++ b/cmd/db-server/main.go @@ -18,6 +18,7 @@ import ( "errors" "flag" "fmt" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tpe/tuplecodec" "math" "os" "os/signal" @@ -48,6 +49,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine" aoeEngine "github.com/matrixorigin/matrixone/pkg/vm/engine/aoe/engine" aoeStorage "github.com/matrixorigin/matrixone/pkg/vm/engine/aoe/storage" + tpeEngine "github.com/matrixorigin/matrixone/pkg/vm/engine/tpe/engine" "github.com/matrixorigin/matrixone/pkg/vm/mheap" "github.com/matrixorigin/matrixone/pkg/vm/mmu/guest" "github.com/matrixorigin/matrixone/pkg/vm/mmu/host" @@ -123,9 +125,12 @@ call the catalog service to remove the epoch */ func removeEpoch(epoch uint64) { //logutil.Infof("removeEpoch %d",epoch) - _, err := c.RemoveDeletedTable(epoch) - if err != nil { - fmt.Printf("catalog remove ddl failed. error :%v \n", err) + var err error + if c != nil { + _, err = c.RemoveDeletedTable(epoch) + if err != nil { + fmt.Printf("catalog remove ddl failed. error :%v \n", err) + } } /* if tpe, ok := config.StorageEngine.(*tpeEngine.TpeEngine); ok { @@ -137,6 +142,176 @@ func removeEpoch(epoch uint64) { */ } +type aoeHandler struct { + cube driver.CubeDriver + port int64 + kvStorage storage.DataStorage + aoeStorage storage.DataStorage +} + +type tpeHandler struct { + aoe *aoeHandler +} + +type taeHandler struct { +} + +func initAoe(configFilePath string) *aoeHandler { + targetDir := config.GlobalSystemVariables.GetStorePath() + if err := recreateDir(targetDir); err != nil { + logutil.Infof("Recreate dir error:%v\n", err) + os.Exit(RecreateDirExit) + } + + cfg := parseConfig(configFilePath, targetDir) + + //aoe : kvstorage config + _, kvStorage := getKVDataStorage(targetDir, cfg) + + //aoe : catalog + catalogListener := catalog.NewCatalogListener() + aoeStorage := getAOEDataStorage(configFilePath, targetDir, catalogListener, cfg) + + //aoe cube driver + a, err := driver.NewCubeDriverWithOptions(kvStorage, aoeStorage, &cfg) + if err != nil { + logutil.Infof("Create cube driver failed, %v", err) + os.Exit(CreateCubeExit) + } + err = a.Start() + if err != nil { + logutil.Infof("Start cube driver failed, %v", err) + os.Exit(StartCubeExit) + } + + //aoe & tpe: address for computation + addr := cfg.CubeConfig.AdvertiseClientAddr + if len(addr) != 0 { + logutil.Infof("compile init address from cube AdvertiseClientAddr %s", addr) + } else { + logutil.Infof("compile init address from cube ClientAddr %s", cfg.CubeConfig.ClientAddr) + addr = cfg.CubeConfig.ClientAddr + } + + //put the node info to the computation + compile.InitAddress(addr) + + //aoe: catalog + c = catalog.NewCatalog(a) + config.ClusterCatalog = c + catalogListener.UpdateCatalog(c) + cngineConfig := aoeEngine.EngineConfig{} + _, err = toml.DecodeFile(configFilePath, &cngineConfig) + if err != nil { + logutil.Infof("Decode cube config error:%v\n", err) + os.Exit(DecodeCubeConfigExit) + } + + eng := aoeEngine.New(c, &cngineConfig) + + err = waitClusterStartup(a, 300*time.Second, int(cfg.CubeConfig.Prophet.Replication.MaxReplicas), int(cfg.ClusterConfig.PreAllocatedGroupNum)) + + if err != nil { + logutil.Infof("wait cube cluster startup failed, %v", err) + os.Exit(WaitCubeStartExit) + } + + //test storage aoe_storage + config.StorageEngine = eng + + li := strings.LastIndex(cfg.CubeConfig.ClientAddr, ":") + if li == -1 { + logutil.Infof("There is no port in client addr") + os.Exit(LoadConfigExit) + } + cubePort, err := strconv.ParseInt(string(cfg.CubeConfig.ClientAddr[li+1:]), 10, 32) + if err != nil { + logutil.Infof("Invalid port") + os.Exit(LoadConfigExit) + } + return &aoeHandler{ + cube: a, + port: cubePort, + kvStorage: kvStorage, + aoeStorage: aoeStorage, + } +} + +func closeAoe(aoe *aoeHandler) { + aoe.kvStorage.Close() + aoe.aoeStorage.Close() + aoe.cube.Close() +} + +func initTpe(configFilePath string, args []string) *tpeHandler { + aoe := initAoe(configFilePath) + tpeConf := &tpeEngine.TpeConfig{} + tpeConf.PBKV = nil + tpeConf.KVLimit = uint64(config.GlobalSystemVariables.GetTpeKVLimit()) + tpeConf.ParallelReader = config.GlobalSystemVariables.GetTpeParallelReader() + tpeConf.MultiNode = config.GlobalSystemVariables.GetTpeMultiNode() + tpeConf.TpeDedupSetBatchTimeout = time.Duration(config.GlobalSystemVariables.GetTpeDedupSetBatchTimeout()) + tpeConf.TpeDedupSetBatchTrycount = int(config.GlobalSystemVariables.GetTpeDedupSetBatchTryCount()) + tpeConf.TpeScanTimeout = time.Duration(config.GlobalSystemVariables.GetTpeScanTimeout()) + tpeConf.TpeScanTryCount = int(config.GlobalSystemVariables.GetTpeScanTryCount()) + tpeConf.ValueLayoutSerializerType = config.GlobalSystemVariables.GetTpeValueLayoutSerializer() + configKvTyp := strings.ToLower(config.GlobalSystemVariables.GetTpeKVType()) + if configKvTyp == "memorykv" { + tpeConf.KvType = tuplecodec.KV_MEMORY + } else if configKvTyp == "cubekv" { + tpeConf.KvType = tuplecodec.KV_CUBE + tpeConf.Cube = aoe.cube + } else { + logutil.Infof("there is no such kvType %s \n", configKvTyp) + os.Exit(CreateTpeExit) + } + configSerializeTyp := strings.ToLower(config.GlobalSystemVariables.GetTpeSerializer()) + if configSerializeTyp == "concise" { + tpeConf.SerialType = tuplecodec.ST_CONCISE + } else if configSerializeTyp == "json" { + tpeConf.SerialType = tuplecodec.ST_JSON + } else if configSerializeTyp == "flat" { + tpeConf.SerialType = tuplecodec.ST_FLAT + } else { + logutil.Infof("there is no such serializerType %s \n", configSerializeTyp) + os.Exit(CreateTpeExit) + } + te, err := tpeEngine.NewTpeEngine(tpeConf) + if err != nil { + logutil.Infof("create tpe error:%v\n", err) + os.Exit(CreateTpeExit) + } + err = te.Open() + if err != nil { + logutil.Infof("open tpe error:%v\n", err) + os.Exit(CreateTpeExit) + } + + //test storage aoe_storage + config.StorageEngine = te + + //test cluster nodes + config.ClusterNodes = engine.Nodes{} + err = tpeEngine.DumpDatabaseInfo(config.StorageEngine, args) + if err != nil { + logutil.Errorf("%s", err) + } + + return &tpeHandler{aoe: aoe} +} + +func closeTpe(tpe *tpeHandler) { + closeAoe(tpe.aoe) +} + +func initTae() *taeHandler { + return nil +} + +func closeTae(tae *taeHandler) { + +} + func main() { // if the argument passed in is "--version", return version info and exit if len(os.Args) == 2 && os.Args[1] == "--version" { @@ -191,123 +366,32 @@ func main() { ppu := frontend.NewPDCallbackParameterUnit(int(config.GlobalSystemVariables.GetPeriodOfEpochTimer()), int(config.GlobalSystemVariables.GetPeriodOfPersistence()), int(config.GlobalSystemVariables.GetPeriodOfDDLDeleteTimer()), int(config.GlobalSystemVariables.GetTimeoutOfHeartbeat()), config.GlobalSystemVariables.GetEnableEpochLogging(), math.MaxInt64) + //aoe : epochgc ? pci = frontend.NewPDCallbackImpl(ppu) pci.Id = int(NodeId) - - targetDir := config.GlobalSystemVariables.GetStorePath() - if err := recreateDir(targetDir); err != nil { - logutil.Infof("Recreate dir error:%v\n", err) - os.Exit(RecreateDirExit) - } - - cfg := parseConfig(configFilePath, targetDir) - - //kvs, kvStorage := getKVDataStorage(targetDir, cfg) - _, kvStorage := getKVDataStorage(targetDir, cfg) - defer kvStorage.Close() - - catalogListener := catalog.NewCatalogListener() - aoeStorage := getAOEDataStorage(configFilePath, targetDir, catalogListener, cfg) - defer aoeStorage.Close() - - a, err := driver.NewCubeDriverWithOptions(kvStorage, aoeStorage, &cfg) - if err != nil { - logutil.Infof("Create cube driver failed, %v", err) - os.Exit(CreateCubeExit) - } - err = a.Start() - if err != nil { - logutil.Infof("Start cube driver failed, %v", err) - os.Exit(StartCubeExit) - } - defer a.Close() - - addr := cfg.CubeConfig.AdvertiseClientAddr - if len(addr) != 0 { - logutil.Infof("compile init address from cube AdvertiseClientAddr %s", addr) - } else { - logutil.Infof("compile init address from cube ClientAddr %s", cfg.CubeConfig.ClientAddr) - addr = cfg.CubeConfig.ClientAddr - } - - //put the node info to the computation - compile.InitAddress(addr) - - c = catalog.NewCatalog(a) - config.ClusterCatalog = c - catalogListener.UpdateCatalog(c) - cngineConfig := aoeEngine.EngineConfig{} - _, err = toml.DecodeFile(configFilePath, &cngineConfig) - if err != nil { - logutil.Infof("Decode cube config error:%v\n", err) - os.Exit(DecodeCubeConfigExit) - } - var eng engine.Engine - // enableTpe := config.GlobalSystemVariables.GetEnableTpe() - /* - if enableTpe { - tpeConf := &tpeEngine.TpeConfig{} - tpeConf.PBKV = kvs - tpeConf.KVLimit = uint64(config.GlobalSystemVariables.GetTpeKVLimit()) - tpeConf.ParallelReader = config.GlobalSystemVariables.GetTpeParallelReader() - tpeConf.MultiNode = config.GlobalSystemVariables.GetTpeMultiNode() - tpeConf.TpeDedupSetBatchTimeout = time.Duration(config.GlobalSystemVariables.GetTpeDedupSetBatchTimeout()) - tpeConf.TpeDedupSetBatchTrycount = int(config.GlobalSystemVariables.GetTpeDedupSetBatchTryCount()) - tpeConf.TpeScanTimeout = time.Duration(config.GlobalSystemVariables.GetTpeScanTimeout()) - tpeConf.TpeScanTryCount = int(config.GlobalSystemVariables.GetTpeScanTryCount()) - tpeConf.ValueLayoutSerializerType = config.GlobalSystemVariables.GetTpeValueLayoutSerializer() - configKvTyp := strings.ToLower(config.GlobalSystemVariables.GetTpeKVType()) - if configKvTyp == "memorykv" { - tpeConf.KvType = tuplecodec.KV_MEMORY - } else if configKvTyp == "cubekv" { - tpeConf.KvType = tuplecodec.KV_CUBE - tpeConf.Cube = a - } else { - logutil.Infof("there is no such kvType %s \n", configKvTyp) - os.Exit(CreateTpeExit) - } - configSerializeTyp := strings.ToLower(config.GlobalSystemVariables.GetTpeSerializer()) - if configSerializeTyp == "concise" { - tpeConf.SerialType = tuplecodec.ST_CONCISE - } else if configSerializeTyp == "json" { - tpeConf.SerialType = tuplecodec.ST_JSON - } else if configSerializeTyp == "flat" { - tpeConf.SerialType = tuplecodec.ST_FLAT - } else { - logutil.Infof("there is no such serializerType %s \n", configSerializeTyp) - os.Exit(CreateTpeExit) - } - te, err := tpeEngine.NewTpeEngine(tpeConf) - if err != nil { - logutil.Infof("create tpe error:%v\n", err) - os.Exit(CreateTpeExit) - } - err = te.Open() - if err != nil { - logutil.Infof("open tpe error:%v\n", err) - os.Exit(CreateTpeExit) - } - eng = te - } else { - eng = aoeEngine.New(c, &cngineConfig) - } - */ - eng = aoeEngine.New(c, &cngineConfig) - pci.SetRemoveEpoch(removeEpoch) - li := strings.LastIndex(cfg.CubeConfig.ClientAddr, ":") - if li == -1 { - logutil.Infof("There is no port in client addr") - os.Exit(LoadConfigExit) - } - cubePort, err := strconv.ParseInt(string(cfg.CubeConfig.ClientAddr[li+1:]), 10, 32) - if err != nil { - logutil.Infof("Invalid port") + engineName := config.GlobalSystemVariables.GetStorageEngine() + var port int64 + port = config.GlobalSystemVariables.GetPortOfRpcServerInComputationEngine() + + var aoe *aoeHandler + var tpe *tpeHandler + var tae *taeHandler + if engineName == "aoe" { + aoe = initAoe(configFilePath) + port = aoe.port + } else if engineName == "tae" { + tae = initTae() + } else if engineName == "tpe" { + tpe = initTpe(configFilePath, args) + port = tpe.aoe.port + } else { + logutil.Errorf("undefined engine %s", engineName) os.Exit(LoadConfigExit) } - srv, err := rpcserver.New(fmt.Sprintf("%s:%d", Host, cubePort+100), 1<<30, logutil.GetGlobalLogger()) + srv, err := rpcserver.New(fmt.Sprintf("%s:%d", Host, port+100), 1<<30, logutil.GetGlobalLogger()) if err != nil { logutil.Infof("Create rpcserver failed, %v", err) os.Exit(CreateRPCExit) @@ -315,33 +399,15 @@ func main() { hm := host.New(1 << 40) gm := guest.New(1<<40, hm) proc := process.New(mheap.New(gm)) - hp := handler.New(eng, proc) + hp := handler.New(config.StorageEngine, proc) srv.Register(hp.Process) - err = waitClusterStartup(a, 300*time.Second, int(cfg.CubeConfig.Prophet.Replication.MaxReplicas), int(cfg.ClusterConfig.PreAllocatedGroupNum)) - - if err != nil { - logutil.Infof("wait cube cluster startup failed, %v", err) - os.Exit(WaitCubeStartExit) - } - go func() { if err := srv.Run(); err != nil { logutil.Infof("Start rpcserver failed, %v", err) os.Exit(RunRPCExit) } }() - //test storage aoe_storage - config.StorageEngine = eng - - //test cluster nodes - config.ClusterNodes = engine.Nodes{} - /* - err = tpeEngine.DumpDatabaseInfo(eng, args) - if err != nil { - logutil.Errorf("%s", err) - } - */ createMOServer(pci) @@ -359,6 +425,14 @@ func main() { } cleanup() + + if engineName == "aoe" { + closeAoe(aoe) + } else if engineName == "tae" { + closeTae(tae) + } else if engineName == "tpe" { + closeTpe(tpe) + } } func waitClusterStartup(driver driver.CubeDriver, timeout time.Duration, maxReplicas int, minimalAvailableShard int) error { diff --git a/cmd/generate-config/system_vars_def.toml b/cmd/generate-config/system_vars_def.toml index 34756827c20f9d0e5d3d91556ac63591a0c31812..5f1a1da4486f913a8d80eac7a8575c6ae0047f93 100644 --- a/cmd/generate-config/system_vars_def.toml +++ b/cmd/generate-config/system_vars_def.toml @@ -498,6 +498,26 @@ values = ["compact","default"] comment = "default is 'compact'. default : attributes in value without offset array. compact: attributes in value with offset array" update-mode = "dynamic" +[[parameter]] +name = "storageEngine" +scope = ["global"] +access = ["file"] +type = "string" +domain-type = "set" +values = ["aoe","tae","tpe"] +comment = "default engine is 'aoe'." +update-mode = "dynamic" + +[[parameter]] +name = "portOfRpcServerInComputationEngine" +scope = ["global"] +access = ["file"] +type = "int64" +domain-type = "range" +values = ["20000", "20000", "65535"] +comment = "port defines which port the rpc server listens on" +update-mode = "dynamic" + # Cluster Configs pre-allocated-group-num = 20 max-group-num = 0 diff --git a/pkg/frontend/init_db.go b/pkg/frontend/init_db.go index 93bdd7485f37e7d3a99c006c8175f3db23aeb1df..f14c619a6da597408a736f81b3328ebdb9be6691 100644 --- a/pkg/frontend/init_db.go +++ b/pkg/frontend/init_db.go @@ -212,7 +212,7 @@ func PrepareInitialDataForMoTables() [][]string { hard code tables: mo_database,mo_tables,mo_columns - tabled created in the initdb step: + tables created in the initdb step: mo_global_variables,mo_user */ data := [][]string{ @@ -473,7 +473,7 @@ func FillInitialDataForMoColumns() *batch.Batch { return PrepareInitialDataForSchema(schema, data) } -// DefineSchemaForMoColumns decides the schema of the mo_global_variables +// DefineSchemaForMoGlobalVariables decides the schema of the mo_global_variables func DefineSchemaForMoGlobalVariables() *CatalogSchema { /* mo_global_variables schema @@ -523,3 +523,59 @@ func FillInitialDataForMoGlobalVariables() *batch.Batch { data := PrepareInitialDataForMoGlobalVariables() return PrepareInitialDataForSchema(schema, data) } + +// DefineSchemaForMoUser decides the schema of the mo_table +func DefineSchemaForMoUser() *CatalogSchema { + /* + mo_user schema + | Attribute | Type | Primary Key | Note | + | --------- | ------------ | ---- | --------- | + | user_host | varchar(256) | PK | user host | + | user_name | varchar(256) | PK | user name | + | authentication_string | varchar(4096) | | password | + */ + userHostAttr := &CatalogSchemaAttribute{ + AttributeName: "user_host", + AttributeType: types.T_varchar.ToType(), + IsPrimaryKey: true, + Comment: "user host", + } + userHostAttr.AttributeType.Width = 256 + + userNameAttr := &CatalogSchemaAttribute{ + AttributeName: "user_name", + AttributeType: types.T_varchar.ToType(), + IsPrimaryKey: true, + Comment: "user name", + } + userNameAttr.AttributeType.Width = 256 + + passwordAttr := &CatalogSchemaAttribute{ + AttributeName: "authentication_string", + AttributeType: types.T_varchar.ToType(), + IsPrimaryKey: true, + Comment: "password", + } + passwordAttr.AttributeType.Width = 256 + + attrs := []*CatalogSchemaAttribute{ + userHostAttr, + userNameAttr, + passwordAttr, + } + return &CatalogSchema{Name: "mo_user", Attributes: attrs} +} + +func PrepareInitialDataForMoUser() [][]string { + data := [][]string{ + {"localhost", "root", "''"}, + {"localhost", "dump", "111"}, + } + return data +} + +func FillInitialDataForMoUser() *batch.Batch { + schema := DefineSchemaForMoUser() + data := PrepareInitialDataForMoUser() + return PrepareInitialDataForSchema(schema, data) +} diff --git a/pkg/frontend/init_db_test.go b/pkg/frontend/init_db_test.go index 5f94e4812d03071c126f882d8845a5d1962a59b0..19b2a35eef51bce379f8b79e6767758be9493d43 100644 --- a/pkg/frontend/init_db_test.go +++ b/pkg/frontend/init_db_test.go @@ -88,4 +88,21 @@ func TestPrepareInitialData(t *testing.T) { convey.So(line, convey.ShouldResemble, s) } }) + + convey.Convey("mo_user", t, func() { + sch := DefineSchemaForMoUser() + data := PrepareInitialDataForMoUser() + bat := FillInitialDataForMoUser() + convey.So(bat, convey.ShouldNotBeNil) + convey.So(batch.Length(bat), convey.ShouldEqual, len(data)) + convey.So(len(bat.Vecs), convey.ShouldEqual, len(data[0])) + convey.So(len(bat.Vecs), convey.ShouldEqual, sch.Length()) + for i, attr := range sch.GetAttributes() { + convey.So(attr.AttributeType.Eq(bat.Vecs[i].Typ), convey.ShouldBeTrue) + } + for i, line := range data { + s := FormatLineInBatch(bat, i) + convey.So(line, convey.ShouldResemble, s) + } + }) }