Skip to content
Snippets Groups Projects
Unverified Commit c5241688 authored by daviszhen's avatar daviszhen Committed by GitHub
Browse files

Add the cn-service (#4535)

1. Connect the cn-service with the frontend and the computation engine.
2. Change the configuraion mechanism to the general method. Remove the auto-generated configuration.

Approved by: @zhangxu19830126, @yingfeng, @fengttt, @aptend, @reusee, @nnsgmsone, @lni
parent f395c45f
No related branches found
No related tags found
No related merge requests found
Showing
with 476 additions and 4316 deletions
......@@ -12,7 +12,6 @@ y.go
.vscode
*.iml
tags
system_vars_config.toml
coverage.txt
out.txt
vendor/
......
......@@ -81,23 +81,10 @@ vendor-build:
# code generation
###############################################################################
# files generated from cmd/generate-config
# they need to be deleted in the clean target
CONFIG_CODE_GENERATED := ./pkg/config/system_vars.go ./pkg/config/system_vars_test.go
CONFIG_DEPS=cmd/generate-config/main.go \
cmd/generate-config/config_template.go \
cmd/generate-config/system_vars_def.toml
.PHONY: config
config: $(CONFIG_DEPS)
config:
$(info [Create build config])
@go mod tidy
@go build -o $(BUILD_CFG) cmd/generate-config/main.go cmd/generate-config/config_template.go
@./$(BUILD_CFG) cmd/generate-config/system_vars_def.toml
@mv -f cmd/generate-config/system_vars_config.toml .
@mv -f cmd/generate-config/system_vars.go pkg/config
@mv -f cmd/generate-config/system_vars_test.go pkg/config
.PHONY: generate-pb
generate-pb:
......@@ -178,7 +165,7 @@ clean:
$(info [Clean up])
$(info Clean go test cache)
@go clean -testcache
rm -f $(CONFIG_CODE_GENERATED) $(BIN_NAME) $(SERVICE_BIN_NAME) $(BUILD_CFG)
rm -f $(BIN_NAME) $(SERVICE_BIN_NAME) $(BUILD_CFG)
rm -rf $(VENDOR_DIRECTORY)
$(MAKE) -C cgo clean
......
......@@ -18,6 +18,7 @@ import (
"context"
"flag"
"fmt"
"github.com/BurntSushi/toml"
"os"
"os/signal"
"syscall"
......@@ -38,9 +39,8 @@ import (
)
const (
InitialValuesExit = 1
LoadConfigExit = 2
RecreateDirExit = 3
LoadConfigExit = 2
RecreateDirExit = 3
// CreateRPCExit = 10
StartMOExit = 12
// RunRPCExit = 14
......@@ -61,9 +61,8 @@ var (
MoVersion = ""
)
func createMOServer(inputCtx context.Context) {
address := fmt.Sprintf("%s:%d", config.GlobalSystemVariables.GetHost(), config.GlobalSystemVariables.GetPort())
pu := config.NewParameterUnit(&config.GlobalSystemVariables, config.HostMmu, config.Mempool, config.StorageEngine, config.ClusterNodes)
func createMOServer(inputCtx context.Context, pu *config.ParameterUnit) {
address := fmt.Sprintf("%s:%d", pu.SV.Host, pu.SV.Port)
moServerCtx := context.WithValue(inputCtx, config.ParameterUnitKey, pu)
mo = frontend.NewMOServer(moServerCtx, address, pu)
......@@ -72,9 +71,9 @@ func createMOServer(inputCtx context.Context) {
if _, err := trace.Init(moServerCtx,
trace.WithMOVersion(MoVersion),
trace.WithNode(0, trace.NodeTypeNode),
trace.EnableTracer(config.GlobalSystemVariables.GetEnableTrace()),
trace.WithBatchProcessMode(config.GlobalSystemVariables.GetTraceBatchProcessor()),
trace.DebugMode(config.GlobalSystemVariables.GetEnableTraceDebug()),
trace.EnableTracer(!pu.SV.DisableTrace),
trace.WithBatchProcessMode(pu.SV.TraceBatchProcessor),
trace.DebugMode(pu.SV.EnableTraceDebug),
trace.WithSQLExecutor(func() ie.InternalExecutor {
return frontend.NewInternalExecutor(pu)
}),
......@@ -83,7 +82,7 @@ func createMOServer(inputCtx context.Context) {
}
}
if config.GlobalSystemVariables.GetEnableMetric() {
if !pu.SV.DisableMetric {
ieFactory := func() ie.InternalExecutor {
return frontend.NewInternalExecutor(pu)
}
......@@ -126,8 +125,8 @@ type taeHandler struct {
tae *db.DB
}
func initTae() *taeHandler {
targetDir := config.GlobalSystemVariables.GetStorePath()
func initTae(pu *config.ParameterUnit) *taeHandler {
targetDir := pu.SV.StorePath
if err := recreateDir(targetDir); err != nil {
logutil.Infof("Recreate dir error:%v\n", err)
os.Exit(RecreateDirExit)
......@@ -142,7 +141,7 @@ func initTae() *taeHandler {
eng := moengine.NewEngine(tae)
//test storage aoe_storage
config.StorageEngine = eng
pu.StorageEngine = eng
return &taeHandler{
eng: eng,
......@@ -179,25 +178,23 @@ func main() {
configFilePath := args[0]
//before anything using the configuration
if err := config.GlobalSystemVariables.LoadInitialValues(); err != nil {
logutil.Infof("Initial values error:%v\n", err)
os.Exit(InitialValuesExit)
}
params := &config.FrontendParameters{}
pu := config.NewParameterUnit(params, nil, nil, nil, nil)
if err := config.LoadvarsConfigFromFile(configFilePath, &config.GlobalSystemVariables); err != nil {
logutil.Infof("Load config error:%v\n", err)
//before anything using the configuration
_, err := toml.DecodeFile(configFilePath, params)
if err != nil {
os.Exit(LoadConfigExit)
}
logConf := logutil.LogConfig{
Level: config.GlobalSystemVariables.GetLogLevel(),
Format: config.GlobalSystemVariables.GetLogFormat(),
Filename: config.GlobalSystemVariables.GetLogFilename(),
MaxSize: int(config.GlobalSystemVariables.GetLogMaxSize()),
MaxDays: int(config.GlobalSystemVariables.GetLogMaxDays()),
MaxBackups: int(config.GlobalSystemVariables.GetLogMaxBackups()),
EnableStore: config.GlobalSystemVariables.GetEnableTrace(),
Level: params.LogLevel,
Format: params.LogFormat,
Filename: params.LogFilename,
MaxSize: int(params.LogMaxSize),
MaxDays: int(params.LogMaxDays),
MaxBackups: int(params.LogMaxBackups),
EnableStore: !params.DisableTrace,
}
logutil.SetupMOLogger(&logConf)
......@@ -208,7 +205,7 @@ func main() {
//just initialize the tae after configuration has been loaded
if len(args) == 2 && args[1] == "initdb" {
fmt.Println("Initialize the TAE engine ...")
taeWrapper := initTae()
taeWrapper := initTae(pu)
err := frontend.InitDB(cancelMoServerCtx, taeWrapper.eng)
if err != nil {
logutil.Infof("Initialize catalog failed. error:%v", err)
......@@ -229,35 +226,26 @@ func main() {
logutil.Infof("Shutdown The Server With Ctrl+C | Ctrl+\\.")
config.HostMmu = host.New(config.GlobalSystemVariables.GetHostMmuLimitation())
// Host := config.GlobalSystemVariables.GetHost()
engineName := config.GlobalSystemVariables.GetStorageEngine()
// port := config.GlobalSystemVariables.GetPortOfRpcServerInComputationEngine()
pu.HostMmu = host.New(params.HostMmuLimitation)
var tae *taeHandler
if engineName == "tae" {
fmt.Println("Initialize the TAE engine ...")
tae = initTae()
err := frontend.InitDB(cancelMoServerCtx, tae.eng)
if err != nil {
logutil.Infof("Initialize catalog failed. error:%v", err)
os.Exit(InitCatalogExit)
}
fmt.Println("Initialize the TAE engine Done")
} else {
logutil.Errorf("undefined engine %s", engineName)
os.Exit(LoadConfigExit)
fmt.Println("Initialize the TAE engine ...")
tae = initTae(pu)
err = frontend.InitDB(cancelMoServerCtx, tae.eng)
if err != nil {
logutil.Infof("Initialize catalog failed. error:%v", err)
os.Exit(InitCatalogExit)
}
fmt.Println("Initialize the TAE engine Done")
if err := agent.Listen(agent.Options{}); err != nil {
logutil.Errorf("listen gops agent failed: %s", err)
os.Exit(StartMOExit)
}
createMOServer(cancelMoServerCtx)
createMOServer(cancelMoServerCtx, pu)
err := runMOServer()
err = runMOServer()
if err != nil {
logutil.Infof("Start MOServer failed, %v", err)
os.Exit(StartMOExit)
......@@ -277,7 +265,5 @@ func main() {
cleanup()
if engineName == "tae" {
closeTae(tae)
}
closeTae(tae)
}
This diff is collapsed.
This diff is collapsed.
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"fmt"
"os"
)
func main() {
argCnt := len(os.Args)
if argCnt < 2 || argCnt > 3 {
fmt.Printf("usage: %s definitionFile [outputDiretory]\n", os.Args[0])
return
}
var gen ConfigurationFileGenerator
if argCnt == 2 {
gen = NewConfigurationFileGenerator(os.Args[1])
} else if argCnt == 3 {
gen = NewConfigurationFileGeneratorWithOutputDirectory(os.Args[1], os.Args[2])
}
if err := gen.Generate(); err != nil {
fmt.Printf("generate system variables failed. error:%v \n", err)
os.Exit(-1)
}
}
# do not change this part {
parameter-struct-name = "SystemVariables"
config-struct-name = "varsConfig"
operation-file-name = "system_vars"
config-file-name = "system_vars_config"
# }
[[parameter]]
name = "rootname"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = ["root"]
comment = "root name"
update-mode = "fix"
[[parameter]]
name = "rootpassword"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = [""]
comment = "root password"
update-mode = "dynamic"
[[parameter]]
name = "dumpuser"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = ["dump"]
comment = "dump user name"
update-mode = "fix"
[[parameter]]
name = "dumppassword"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = ["111"]
comment = "dump user password"
update-mode = "fix"
[[parameter]]
name = "dumpdatabase"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = ["default"]
comment = "dump database name"
update-mode = "dynamic"
[[parameter]]
name = "port"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "range"
values = ["6001", "6001", "6010"]
comment = "port defines which port the mo-server listens on and clients connect to"
update-mode = "dynamic"
[[parameter]]
name = "host"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = ["0.0.0.0","localhost","127.0.0.1"]
comment = "listening ip"
update-mode = "dynamic"
[[parameter]]
name = "hostMmuLimitation"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "set"
values = ["1099511627776"]
comment = "host mmu limitation. default: 1 << 40 = 1099511627776"
update-mode = "dynamic"
[[parameter]]
name = "guestMmuLimitation"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "set"
values = ["1099511627776"]
comment = "guest mmu limitation. default: 1 << 40 = 1099511627776"
update-mode = "dynamic"
[[parameter]]
name = "mempoolMaxSize"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "set"
values = ["1099511627776"]
comment = "mempool maxsize. default: 1 << 40 = 1099511627776"
update-mode = "dynamic"
[[parameter]]
name = "mempoolFactor"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "set"
values = ["8"]
comment = "mempool factor. default: 8"
update-mode = "dynamic"
[[parameter]]
name = "processLimitationSize"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "set"
values = ["42949672960"]
comment = "process.Limitation.Size. default: 10 << 32 = 42949672960"
update-mode = "dynamic"
[[parameter]]
name = "processLimitationBatchRows"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "set"
values = ["42949672960"]
comment = "process.Limitation.BatchRows. default: 10 << 32 = 42949672960"
update-mode = "dynamic"
[[parameter]]
name = "processLimitationBatchSize"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "set"
values = ["0"]
comment = "process.Limitation.BatchSize. default: 0"
update-mode = "dynamic"
[[parameter]]
name = "processLimitationPartitionRows"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "set"
values = ["42949672960"]
comment = "process.Limitation.PartitionRows. default: 10 << 32 = 42949672960"
update-mode = "dynamic"
[[parameter]]
name = "recordTimeElapsedOfSqlRequest"
scope = ["global"]
access = ["file"]
type = "bool"
domain-type = "set"
values = ["true"]
comment = "record the time elapsed of executing sql request"
update-mode = "dynamic"
[[parameter]]
name = "storePath"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = ["./store"]
comment = "the root directory of the storage and matrixcube's data. The actual dir is cubeDirPrefix + nodeID"
update-mode = "dynamic"
[[parameter]]
name = "lengthOfQueryPrinted"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "range"
values = ["50", "-1", "10000"]
comment = "the length of query printed into console. -1, complete string. 0, empty string. >0 , length of characters at the header of the string."
update-mode = "dynamic"
[[parameter]]
name = "batchSizeInLoadData"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "range"
values = ["40000","10","100000"]
comment = "the count of rows in vector of batch in load data"
update-mode = "dynamic"
[[parameter]]
name = "loadDataConcurrencyCount"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "range"
values = ["4","1","16"]
comment = "default is 4. The count of go routine writing batch into the storage."
update-mode = "dynamic"
[[parameter]]
name = "loadDataSkipWritingBatch"
scope = ["global"]
access = ["file"]
type = "bool"
domain-type = "set"
values = []
comment = "default is fase. Skip writing batch into the storage"
update-mode = "dynamic"
[[parameter]]
name = "enableProfileGetDataFromPipeline"
scope = ["global"]
access = ["file"]
type = "bool"
domain-type = "set"
values = []
comment = "defult is false. true for profiling the getDataFromPipeline"
update-mode = "dynamic"
[[parameter]]
name = "maxBytesInOutbufToFlush"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "range"
values = ["1024","32","3096"]
comment = "KB. When the number of bytes in the outbuffer exceeds the it,the outbuffer will be flushed."
update-mode = "dynamic"
[[parameter]]
name = "printLogInterVal"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "range"
values = ["10", "1", "1000"]
comment = "default printLog Interval is 10s."
update-mode = "dynamic"
[[parameter]]
name = "exportDataDefaultFlushSize"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "set"
values = ["1", "2", "4", "8"]
comment = "export data to csv file default flush size"
update-mode = "dynamic"
[[parameter]]
name = "storageEngine"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = ["tae"]
comment = "default engine is 'tae'."
update-mode = "dynamic"
[[parameter]]
name = "portOfRpcServerInComputationEngine"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "range"
values = ["20000", "20000", "65535"]
comment = "port defines which port the rpc server listens on"
update-mode = "dynamic"
[[parameter]]
name = "oneTxnPerBatchDuringLoad"
scope = ["global"]
access = ["file"]
type = "bool"
domain-type = "set"
values = ["true"]
comment = "default is false. true : one txn for an independent batch false : only one txn during loading data"
update-mode = "dynamic"
[[parameter]]
name = "statusPort"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "range"
values = ["7001", "7001", "7010"]
comment = "statusPort defines which port the mo status server (for metric etc.) listens on and clients connect to"
update-mode = "dynamic"
[[parameter]]
name = "metricToProm"
scope = ["global"]
access = ["file"]
type = "bool"
domain-type = "set"
values = ["true", "false"]
comment = "default is true. if true, metrics can be scraped through host:status/metrics endpoint"
update-mode = "dynamic"
[[parameter]]
name = "enableMetric"
scope = ["global"]
access = ["file"]
type = "bool"
domain-type = "set"
values = ["true", "false"]
comment = "default is true. if true, enable metric at booting"
update-mode = "dynamic"
[[parameter]]
name = "enableTrace"
scope = ["global"]
access = ["file"]
type = "bool"
domain-type = "set"
values = ["true", "false"]
comment = "default is true. if true, enable trace at booting"
update-mode = "dynamic"
[[parameter]]
name = "traceBatchProcessor"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = ["InternalExecutor", "FileService"]
comment = "default is InternalExecutor. if InternalExecutor, use internal sql executor, FileService will implement soon."
update-mode = "dynamic"
[[parameter]]
name = "enableTraceDebug"
scope = ["global"]
access = ["file"]
type = "bool"
domain-type = "set"
values = ["false", "true"]
comment = "default is false. With true, system will check all the children span is ended, which belong to the closing span."
update-mode = "dynamic"
[[parameter]]
name = "logLevel"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = ["debug", "info", "warn", "error", "fatal"]
comment = "default is 'debug'. the level of log."
update-mode = "dynamic"
[[parameter]]
name = "logFormat"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = ["json", "console"]
comment = "default is 'json'. the format of log."
update-mode = "dynamic"
[[parameter]]
name = "logFilename"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = []
comment = "default is ''. the filename of log file."
update-mode = "dynamic"
[[parameter]]
name = "logMaxSize"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "set"
values = ["512"]
comment = "default is 512MB. the maximum of log file size"
update-mode = "dynamic"
[[parameter]]
name = "logMaxDays"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "set"
values = ["0"]
comment = "default is 0. the maximum days of log file to be kept"
update-mode = "dynamic"
[[parameter]]
name = "logMaxBackups"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "set"
values = ["0"]
comment = "default is 0. the maximum numbers of log file to be retained"
update-mode = "dynamic"
\ No newline at end of file
parameter-struct-name = "AllParameters"
config-struct-name = "configuration"
#without ".go"
operation-file-name = "parameters"
#without ".toml"
config-file-name = "config"
[[parameter]]
name = "boolSet1"
scope = ["global"]
access = ["file"]
type = "bool"
domain-type = "set"
values = ["true"]
comment = "boolSet1"
update-mode = "dynamic"
[[parameter]]
name = "boolSet2"
scope = ["global"]
access = ["file"]
type = "bool"
domain-type = "set"
values = ["false"]
comment = "boolSet2"
update-mode = "hotload"
[[parameter]]
name = "boolSet3"
scope = ["global"]
access = ["file"]
type = "bool"
domain-type = "set"
values = []
comment = "boolSet3"
update-mode = "dynamic"
[[parameter]]
name = "stringSet1"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = ["ss1","ss2","ss3"]
comment = "stringSet1"
update-mode = "dynamic"
[[parameter]]
name = "stringSet2"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = []
comment = "stringSet2"
update-mode = "dynamic"
[[parameter]]
name = "int64set1"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "set"
values = ["1","2","3","4","5","6"]
comment = "int64Set1"
update-mode = "dynamic"
[[parameter]]
name = "int64set2"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "set"
values = ["1","3","5","7"]
comment = "int64Set2"
update-mode = "fix"
[[parameter]]
name = "int64set3"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "set"
values = []
comment = "int64Set3"
update-mode = "dynamic"
[[parameter]]
name = "int64Range1"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "range"
values = ["1000","0","10000"]
comment = "int64Range1"
update-mode = "dynamic"
[[parameter]]
name = "float64set1"
scope = ["global"]
access = ["file"]
type = "float64"
domain-type = "set"
values = ["1.0","2.","3.","4.00","5","6"]
comment = "float64Set1"
update-mode = "dynamic"
[[parameter]]
name = "float64set2"
scope = ["global"]
access = ["file"]
type = "float64"
domain-type = "set"
values = ["1.001","3.003","5.005","7.007"]
comment = "float64Set2"
update-mode = "fix"
[[parameter]]
name = "float64set3"
scope = ["global"]
access = ["file"]
type = "float64"
domain-type = "set"
values = []
comment = "float64Set3"
update-mode = "dynamic"
[[parameter]]
name = "float64Range1"
scope = ["global"]
access = ["file"]
type = "float64"
domain-type = "range"
values = ["1000.01","0.02","10000.03"]
comment = "float64Range1"
update-mode = "dynamic"
\ No newline at end of file
parameter-struct-name = "Variables"
config-struct-name = "vconfig"
#without ".go"
operation-file-name = "variables"
#without ".toml"
config-file-name = "varconfig"
[[parameter]]
name = "autoload"
scope = ["global"]
access = ["file"]
type = "bool"
domain-type = "set"
values = []
comment = "autoload something"
update-mode = "dynamic"
[[parameter]]
name = "rootname"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = ["root"]
comment = "root name"
update-mode = "dynamic"
[[parameter]]
name = "rootpassword"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = [""]
comment = "root password"
update-mode = "dynamic"
[[parameter]]
name = "dumpuser"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = ["dump"]
comment = "dump user name"
update-mode = "dynamic"
[[parameter]]
name = "dumppassword"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = ["111"]
comment = "dump user password"
update-mode = "dynamic"
[[parameter]]
name = "port"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "set"
values = ["9000"]
comment = "port"
update-mode = "dynamic"
[[parameter]]
name = "ip"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = ["localhost","127.0.0.1"]
comment = "listening ip"
update-mode = "dynamic"
\ No newline at end of file
# do not change this part {
parameter-struct-name = "SystemVariables"
config-struct-name = "varsConfig"
operation-file-name = "system_vars"
config-file-name = "system_vars_config"
# }
[[parameter]]
name = "rootname"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = ["root"]
comment = "root name"
update-mode = "fix"
[[parameter]]
name = "rootpassword"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = [""]
comment = "root password"
update-mode = "dynamic"
[[parameter]]
name = "dumpuser"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = ["dump"]
comment = "dump user name"
update-mode = "fix"
[[parameter]]
name = "dumppassword"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = ["111"]
comment = "dump user password"
update-mode = "fix"
[[parameter]]
name = "port"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "set"
values = ["6001"]
comment = "port"
update-mode = "fix"
[[parameter]]
name = "host"
scope = ["global"]
access = ["file"]
type = "string"
domain-type = "set"
values = ["localhost","127.0.0.1","0.0.0.0"]
comment = "listening ip"
update-mode = "fix"
\ No newline at end of file
parameter-struct-name = "AllParameters"
config-struct-name = "configuration"
operation-file-name = "parameters"
config-file-name = "config"
[[parameter]]
name = "autocommit"
scope = ["global","session"]
access = ["file"]
type = "bool"
domain-type = "set"
values = ["true"]
comment = "autocommit"
update-mode = "dynamic"
[[parameter]]
name = "backlog"
scope = ["global"]
access = ["file"]
type = "int64"
domain-type = "range"
values = ["2","1","65535"]
comment = "back-log"
update-mode = "fix"
\ No newline at end of file
parameter-struct-name = "AllParameters"
config-struct-name = "configuration"
operation-file-name = "parameters"
config-file-name = "config"
[[parameter]]
name = "autocommit~"
scope = ["global","session"]
access = ["file"]
type = "bool"
domain-type = "set"
values = ["true"]
comment = "autocommit"
update-mode = "dynamic"
\ No newline at end of file
......@@ -16,6 +16,7 @@ package main
import (
"fmt"
"github.com/matrixorigin/matrixone/pkg/cnservice"
"net"
"os"
"strings"
......@@ -62,6 +63,8 @@ type Config struct {
DN dnservice.Config `toml:"dn"`
// LogService is the config for log service
LogService logservice.Config `toml:"logservice"`
// CN cn service config
CN cnservice.Config `toml:"cn"`
}
func parseConfigFromFile(file string) (*Config, error) {
......
......@@ -18,6 +18,7 @@ import (
"context"
"flag"
"fmt"
"github.com/matrixorigin/matrixone/pkg/cnservice"
"math/rand"
"os"
"os/signal"
......@@ -70,7 +71,7 @@ func startService(cfg *Config, stopper *stopper.Stopper) error {
// TODO: start other service
switch strings.ToUpper(cfg.ServiceType) {
case cnServiceType:
panic("not implemented")
return startCNService(cfg, stopper)
case dnServiceType:
return startDNService(cfg, stopper)
case logServiceType:
......@@ -82,6 +83,24 @@ func startService(cfg *Config, stopper *stopper.Stopper) error {
}
}
func startCNService(cfg *Config, stopper *stopper.Stopper) error {
cfg.CN.Frontend.SetLogAndVersion(&cfg.Log, Version)
return stopper.RunNamedTask("cn-service", func(ctx context.Context) {
s, err := cnservice.NewService(&cfg.CN, ctx)
if err != nil {
panic(err)
}
if err := s.Start(); err != nil {
panic(err)
}
<-ctx.Done()
if err := s.Close(); err != nil {
panic(err)
}
})
}
func startDNService(cfg *Config, stopper *stopper.Stopper) error {
return stopper.RunNamedTask("dn-service", func(ctx context.Context) {
s, err := dnservice.NewService(&cfg.DN,
......
# service node type, [DN|CN|LOG]
service-type = "CN"
[log]
level = "debug"
format = "json"
max-size = 512
\ No newline at end of file
......@@ -15,6 +15,14 @@
package cnservice
import (
"context"
"fmt"
"github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/frontend"
ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor"
"github.com/matrixorigin/matrixone/pkg/util/metric"
"github.com/matrixorigin/matrixone/pkg/util/trace"
"github.com/matrixorigin/matrixone/pkg/vm/mmu/host"
"sync"
"github.com/fagongzi/goetty/v2"
......@@ -23,7 +31,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/pb/pipeline"
)
func NewService(cfg *Config) (Service, error) {
func NewService(cfg *Config, ctx context.Context) (Service, error) {
srv := &service{cfg: cfg}
srv.logger = logutil.Adjust(srv.logger)
srv.pool = &sync.Pool{
......@@ -39,14 +47,30 @@ func NewService(cfg *Config) (Service, error) {
}
server.RegisterRequestHandler(srv.handleRequest)
srv.server = server
pu := config.NewParameterUnit(&cfg.Frontend, nil, nil, nil, nil)
cfg.Frontend.SetDefaultValues()
err = srv.initMOServer(ctx, pu)
if err != nil {
return nil, err
}
return srv, nil
}
func (s *service) Start() error {
err := s.runMoServer()
if err != nil {
return err
}
return s.server.Start()
}
func (s *service) Close() error {
err := s.serverShutdown(true)
if err != nil {
return err
}
s.cancelMoServerFunc()
return s.server.Close()
}
......@@ -64,3 +88,76 @@ func (s *service) releaseMessage(msg *pipeline.Message) {
func (s *service) handleRequest(req morpc.Message, _ uint64, cs morpc.ClientSession) error {
return nil
}
func (s *service) initMOServer(ctx context.Context, pu *config.ParameterUnit) error {
var err error
logutil.Infof("Shutdown The Server With Ctrl+C | Ctrl+\\.")
cancelMoServerCtx, cancelMoServerFunc := context.WithCancel(ctx)
s.cancelMoServerFunc = cancelMoServerFunc
pu.HostMmu = host.New(pu.SV.HostMmuLimitation)
fmt.Println("Initialize the engine ...")
err = s.initEngine(ctx, pu)
if err != nil {
return err
}
err = frontend.InitDB(cancelMoServerCtx, s.engine)
if err != nil {
return err
}
fmt.Println("Initialize the engine Done")
s.createMOServer(cancelMoServerCtx, pu)
return nil
}
func (s *service) initEngine(ctx context.Context, pu *config.ParameterUnit) error {
//TODO: initialize the engine
pu.StorageEngine = nil
s.engine = nil
return nil
}
func (s *service) createMOServer(inputCtx context.Context, pu *config.ParameterUnit) {
address := fmt.Sprintf("%s:%d", pu.SV.Host, pu.SV.Port)
moServerCtx := context.WithValue(inputCtx, config.ParameterUnitKey, pu)
s.mo = frontend.NewMOServer(moServerCtx, address, pu)
{
// init trace/log/error framework
if _, err := trace.Init(moServerCtx,
trace.WithMOVersion(pu.SV.MoVersion),
trace.WithNode(0, trace.NodeTypeNode),
trace.EnableTracer(!pu.SV.DisableTrace),
trace.WithBatchProcessMode(pu.SV.TraceBatchProcessor),
trace.DebugMode(pu.SV.EnableTraceDebug),
trace.WithSQLExecutor(func() ie.InternalExecutor {
return frontend.NewInternalExecutor(pu)
}),
); err != nil {
panic(err)
}
}
if !pu.SV.DisableMetric {
ieFactory := func() ie.InternalExecutor {
return frontend.NewInternalExecutor(pu)
}
metric.InitMetric(moServerCtx, ieFactory, pu, 0, metric.ALL_IN_ONE_MODE)
}
frontend.InitServerVersion(pu.SV.MoVersion)
}
func (s *service) runMoServer() error {
return s.mo.Start()
}
func (s *service) serverShutdown(isgraceful bool) error {
// flush trace/log/error framework
if err := trace.Shutdown(trace.DefaultContext()); err != nil {
logutil.Errorf("Shutdown trace err: %v", err)
}
return s.mo.Stop()
}
......@@ -15,6 +15,10 @@
package cnservice
import (
"context"
"github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/frontend"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"sync"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
......@@ -51,11 +55,16 @@ type Config struct {
// BatchSize is the memory limit for one batch
BatchSize int64 `toml:"batch-size"`
}
//parameters for the frontend
Frontend config.FrontendParameters `toml:"frontend"`
}
type service struct {
cfg *Config
pool *sync.Pool
logger *zap.Logger
server morpc.RPCServer
cfg *Config
pool *sync.Pool
logger *zap.Logger
server morpc.RPCServer
cancelMoServerFunc context.CancelFunc
engine engine.Engine
mo *frontend.MOServer
}
......@@ -16,6 +16,8 @@ package config
import (
"context"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/util/toml"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/mempool"
"github.com/matrixorigin/matrixone/pkg/vm/mmu/host"
......@@ -27,22 +29,303 @@ const (
ParameterUnitKey ConfigurationKeyType = 1
)
var GlobalSystemVariables SystemVariables
var (
//root name
defaultRootName = "root"
// HostMmu host memory
var HostMmu *host.Mmu = nil
//root password
defaultRootPassword = ""
// Mempool memory pool
var Mempool *mempool.Mempool = nil
//dump user name
defaultDumpUser = "dump"
// StorageEngine Storage Engine
var StorageEngine engine.Engine
//dump user password
defaultDumpPassword = "111"
// ClusterNodes Cluster Nodes
var ClusterNodes engine.Nodes
//port defines which port the mo-server listens on and clients connect to
defaultPort = 6001
//listening ip
defaultHost = "0.0.0.0"
//host mmu limitation. 1 << 40 = 1099511627776
defaultHostMmuLimitation = 1099511627776
//guest mmu limitation. 1 << 40 = 1099511627776
defaultGuestMmuLimitation = 1099511627776
//mempool maxsize. 1 << 40 = 1099511627776
defaultMempoolMaxSize = 1099511627776
//mempool factor.
defaultMempoolFactor = 8
//process.Limitation.Size. 10 << 32 = 42949672960
defaultProcessLimitationSize = 42949672960
//process.Limitation.BatchRows. 10 << 32 = 42949672960
defaultProcessLimitationBatchRows = 42949672960
//process.Limitation.PartitionRows. 10 << 32 = 42949672960
defaultProcessLimitationPartitionRows = 42949672960
//the root directory of the storage
defaultStorePath = "./store"
//the length of query printed into console. -1, complete string. 0, empty string. >0 , length of characters at the header of the string.
defaultLengthOfQueryPrinted = 50
//the count of rows in vector of batch in load data
defaultBatchSizeInLoadData = 40000
//initial value is 4. The count of go routine writing batch into the storage.
defaultLoadDataConcurrencyCount = 4
//KB. When the number of bytes in the outbuffer exceeds the it,the outbuffer will be flushed.
defaultMaxBytesInOutbufToFlush = 1024
//printLog Interval is 10s.
defaultPrintLogInterVal = 10
//export data to csv file default flush size
defaultExportDataDefaultFlushSize = 1
//port defines which port the rpc server listens on
defaultPortOfRpcServerInComputationEngine = 20000
//statusPort defines which port the mo status server (for metric etc.) listens on and clients connect to
defaultStatusPort = 7001
//default is InternalExecutor. if InternalExecutor, use internal sql executor, FileService will implement soon.
defaultTraceBatchProcessor = "InternalExecutor"
)
// FrontendParameters of the frontend
type FrontendParameters struct {
MoVersion string
//root name
RootName string `toml:"rootname"`
//root password
RootPassword string `toml:"rootpassword"`
DumpUser string `toml:"dumpuser"`
DumpPassword string `toml:"dumppassword"`
//dump database
DumpDatabase string `toml:"dumpdatabase"`
//port defines which port the mo-server listens on and clients connect to
Port int64 `toml:"port"`
//listening ip
Host string `toml:"host"`
//host mmu limitation. default: 1 << 40 = 1099511627776
HostMmuLimitation int64 `toml:"hostMmuLimitation"`
//guest mmu limitation. default: 1 << 40 = 1099511627776
GuestMmuLimitation int64 `toml:"guestMmuLimitation"`
//mempool maxsize. default: 1 << 40 = 1099511627776
MempoolMaxSize int64 `toml:"mempoolMaxSize"`
//mempool factor. default: 8
MempoolFactor int64 `toml:"mempoolFactor"`
//process.Limitation.Size. default: 10 << 32 = 42949672960
ProcessLimitationSize int64 `toml:"processLimitationSize"`
//process.Limitation.BatchRows. default: 10 << 32 = 42949672960
ProcessLimitationBatchRows int64 `toml:"processLimitationBatchRows"`
//process.Limitation.BatchSize. default: 0
ProcessLimitationBatchSize int64 `toml:"processLimitationBatchSize"`
//process.Limitation.PartitionRows. default: 10 << 32 = 42949672960
ProcessLimitationPartitionRows int64 `toml:"processLimitationPartitionRows"`
//record the time elapsed of executing sql request
DisableRecordTimeElapsedOfSqlRequest bool `toml:"DisableRecordTimeElapsedOfSqlRequest"`
//the root directory of the storage and matrixcube's data. The actual dir is cubeDirPrefix + nodeID
StorePath string `toml:"storePath"`
//the length of query printed into console. -1, complete string. 0, empty string. >0 , length of characters at the header of the string.
LengthOfQueryPrinted int64 `toml:"lengthOfQueryPrinted"`
//the count of rows in vector of batch in load data
BatchSizeInLoadData int64 `toml:"batchSizeInLoadData"`
//default is 4. The count of go routine writing batch into the storage.
LoadDataConcurrencyCount int64 `toml:"loadDataConcurrencyCount"`
//default is false. Skip writing batch into the storage
LoadDataSkipWritingBatch bool `toml:"loadDataSkipWritingBatch"`
//default is false. true for profiling the getDataFromPipeline
EnableProfileGetDataFromPipeline bool `toml:"enableProfileGetDataFromPipeline"`
//KB. When the number of bytes in the outbuffer exceeds it,the outbuffer will be flushed.
MaxBytesInOutbufToFlush int64 `toml:"maxBytesInOutbufToFlush"`
//default printLog Interval is 10s.
PrintLogInterVal int64 `toml:"printLogInterVal"`
//export data to csv file default flush size
ExportDataDefaultFlushSize int64 `toml:"exportDataDefaultFlushSize"`
//port defines which port the rpc server listens on
PortOfRpcServerInComputationEngine int64 `toml:"portOfRpcServerInComputationEngine"`
//default is false. false : one txn for an independent batch true : only one txn during loading data
DisableOneTxnPerBatchDuringLoad bool `toml:"DisableOneTxnPerBatchDuringLoad"`
//statusPort defines which port the mo status server (for metric etc.) listens on and clients connect to
StatusPort int64 `toml:"statusPort"`
//default is false. if false, metrics can be scraped through host:status/metrics endpoint
DisableMetricToProm bool `toml:"disableMetricToProm"`
//default is false. if false, enable metric at booting
DisableMetric bool `toml:"disableMetric"`
//default is false. if false, enable trace at booting
DisableTrace bool `toml:"disableTrace"`
//default is InternalExecutor. if InternalExecutor, use internal sql executor, FileService will implement soon.
TraceBatchProcessor string `toml:"traceBatchProcessor"`
//default is false. With true, system will check all the children span is ended, which belong to the closing span.
EnableTraceDebug bool `toml:"enableTraceDebug"`
//default is 'debug'. the level of log.
LogLevel string `toml:"logLevel"`
//default is 'json'. the format of log.
LogFormat string `toml:"logFormat"`
//default is ''. the file
LogFilename string `toml:"logFilename"`
//default is 512MB. the maximum of log file size
LogMaxSize int64 `toml:"logMaxSize"`
//default is 0. the maximum days of log file to be kept
LogMaxDays int64 `toml:"logMaxDays"`
//default is 0. the maximum numbers of log file to be retained
LogMaxBackups int64 `toml:"logMaxBackups"`
}
func (fp *FrontendParameters) SetDefaultValues() {
if fp.RootName == "" {
fp.RootName = defaultRootName
}
if fp.RootPassword == "" {
fp.RootPassword = defaultRootPassword
}
if fp.DumpUser == "" {
fp.DumpUser = defaultDumpUser
}
if fp.DumpPassword == "" {
fp.DumpPassword = defaultDumpPassword
}
if fp.Port == 0 {
fp.Port = int64(defaultPort)
}
if fp.Host == "" {
fp.Host = defaultHost
}
if fp.HostMmuLimitation == 0 {
fp.HostMmuLimitation = int64(defaultHostMmuLimitation)
}
if fp.GuestMmuLimitation == 0 {
fp.GuestMmuLimitation = int64(toml.ByteSize(defaultGuestMmuLimitation))
}
if fp.MempoolMaxSize == 0 {
fp.MempoolMaxSize = int64(toml.ByteSize(defaultMempoolMaxSize))
}
if fp.MempoolFactor == 0 {
fp.MempoolFactor = int64(defaultMempoolFactor)
}
if fp.ProcessLimitationSize == 0 {
fp.ProcessLimitationSize = int64(toml.ByteSize(defaultProcessLimitationSize))
}
if fp.ProcessLimitationBatchRows == 0 {
fp.ProcessLimitationBatchRows = int64(toml.ByteSize(defaultProcessLimitationBatchRows))
}
if fp.ProcessLimitationPartitionRows == 0 {
fp.ProcessLimitationPartitionRows = int64(toml.ByteSize(defaultProcessLimitationPartitionRows))
}
if fp.StorePath == "" {
fp.StorePath = defaultStorePath
}
if fp.LengthOfQueryPrinted == 0 {
fp.LengthOfQueryPrinted = int64(defaultLengthOfQueryPrinted)
}
if fp.BatchSizeInLoadData == 0 {
fp.BatchSizeInLoadData = int64(defaultBatchSizeInLoadData)
}
if fp.LoadDataConcurrencyCount == 0 {
fp.LoadDataConcurrencyCount = int64(defaultLoadDataConcurrencyCount)
}
if fp.MaxBytesInOutbufToFlush == 0 {
fp.MaxBytesInOutbufToFlush = int64(defaultMaxBytesInOutbufToFlush)
}
if fp.PrintLogInterVal == 0 {
fp.PrintLogInterVal = int64(defaultPrintLogInterVal)
}
if fp.ExportDataDefaultFlushSize == 0 {
fp.ExportDataDefaultFlushSize = int64(defaultExportDataDefaultFlushSize)
}
if fp.PortOfRpcServerInComputationEngine == 0 {
fp.PortOfRpcServerInComputationEngine = int64(defaultPortOfRpcServerInComputationEngine)
}
if fp.StatusPort == 0 {
fp.StatusPort = int64(defaultStatusPort)
}
if fp.TraceBatchProcessor == "" {
fp.TraceBatchProcessor = defaultTraceBatchProcessor
}
}
func (fp *FrontendParameters) SetLogAndVersion(log *logutil.LogConfig, version string) {
fp.LogLevel = log.Level
fp.LogFormat = log.Format
fp.LogFilename = log.Filename
fp.LogMaxSize = int64(log.MaxSize)
fp.LogMaxDays = int64(log.MaxDays)
fp.LogMaxBackups = int64(log.MaxBackups)
fp.MoVersion = version
}
type ParameterUnit struct {
SV *SystemVariables
SV *FrontendParameters
//host memory
HostMmu *host.Mmu
......@@ -57,7 +340,7 @@ type ParameterUnit struct {
ClusterNodes engine.Nodes
}
func NewParameterUnit(sv *SystemVariables, hostMmu *host.Mmu, mempool *mempool.Mempool, storageEngine engine.Engine, clusterNodes engine.Nodes) *ParameterUnit {
func NewParameterUnit(sv *FrontendParameters, hostMmu *host.Mmu, mempool *mempool.Mempool, storageEngine engine.Engine, clusterNodes engine.Nodes) *ParameterUnit {
return &ParameterUnit{
SV: sv,
HostMmu: hostMmu,
......
......@@ -136,7 +136,7 @@ func (ie *internalExecutor) Query(ctx context.Context, sql string, opts ie.Sessi
}
func (ie *internalExecutor) newCmdSession(opts ie.SessionOverrideOptions) *Session {
sess := NewSession(ie.proto, guest.New(ie.pu.SV.GetGuestMmuLimitation(), ie.pu.HostMmu), ie.pu.Mempool, ie.pu, gSysVariables)
sess := NewSession(ie.proto, guest.New(ie.pu.SV.GuestMmuLimitation, ie.pu.HostMmu), ie.pu.Mempool, ie.pu, gSysVariables)
applyOverride(sess, ie.baseSessOpts)
applyOverride(sess, opts)
return sess
......
This diff is collapsed.
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment