Skip to content
Snippets Groups Projects
Unverified Commit 1ef08ad1 authored by fagongzi's avatar fagongzi Committed by GitHub
Browse files

misc: add launch multi-replica mo cluster in main (#5167)

parent 5a22dfb7
No related branches found
No related tags found
No related merge requests found
......@@ -50,6 +50,16 @@ var (
}
)
// LaunchConfig Start a MO cluster with launch
type LaunchConfig struct {
// LogServiceConfigFiles log service config files
LogServiceConfigFiles []string `toml:"logservices"`
// DNServiceConfigsFiles log service config files
DNServiceConfigsFiles []string `toml:"dnservices"`
// CNServiceConfigsFiles log service config files
CNServiceConfigsFiles []string `toml:"cnservices"`
}
// Config mo-service configuration
type Config struct {
// Log log config
......@@ -71,29 +81,22 @@ type Config struct {
Observability config.ObservabilityParameters `toml:"observability"`
}
func parseConfigFromFile(file string) (*Config, error) {
func parseConfigFromFile(file string, cfg any) error {
if file == "" {
return nil, fmt.Errorf("toml config file not set")
return fmt.Errorf("toml config file not set")
}
data, err := os.ReadFile(file)
if err != nil {
return nil, err
return err
}
return parseFromString(string(data))
return parseFromString(string(data), cfg)
}
func parseFromString(data string) (*Config, error) {
cfg := &Config{}
func parseFromString(data string, cfg any) error {
if _, err := toml.Decode(data, cfg); err != nil {
return nil, err
return err
}
if err := cfg.validate(); err != nil {
return nil, err
}
if err := cfg.resolveGossipSeedAddresses(); err != nil {
return nil, err
}
return cfg, nil
return nil
}
func (c *Config) validate() error {
......
......@@ -61,7 +61,8 @@ func TestParseDNConfig(t *testing.T) {
# txn storage backend implementation. [TAE|MEM]
backend = "MEM"
`
cfg, err := parseFromString(data)
cfg := &Config{}
err := parseFromString(data, cfg)
assert.NoError(t, err)
assert.Equal(t, "MEM", cfg.DN.Txn.Storage.Backend)
assert.Equal(t, 2, len(cfg.FileServices))
......@@ -168,7 +169,8 @@ service-addresses = [
"127.0.0.1:32000",
]
`
cfg, err := parseFromString(data)
cfg := &Config{}
err := parseFromString(data, cfg)
assert.NoError(t, err)
assert.Equal(t, 1, len(cfg.LogService.GossipSeedAddresses))
assert.Equal(t, "127.0.0.1:32002", cfg.LogService.GossipSeedAddresses[0])
......
// Copyright 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"time"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/stopper"
"github.com/matrixorigin/matrixone/pkg/logservice"
"github.com/matrixorigin/matrixone/pkg/logutil"
"go.uber.org/zap"
)
func startCluster(stopper *stopper.Stopper) error {
if *launchFile == "" {
panic("launch file not set")
}
cfg := &LaunchConfig{}
if err := parseConfigFromFile(*launchFile, cfg); err != nil {
return err
}
client, err := startLogServiceCluster(cfg.LogServiceConfigFiles, stopper)
if err != nil {
return err
}
defer func() {
if err := client.Close(); err != nil {
logutil.Error("close hakeeper client failed", zap.Error(err))
}
}()
if err := startDNServiceCluster(cfg.DNServiceConfigsFiles, stopper, client); err != nil {
return err
}
if err := startCNServiceCluster(cfg.CNServiceConfigsFiles, stopper); err != nil {
return err
}
return nil
}
func startLogServiceCluster(
files []string,
stopper *stopper.Stopper) (logservice.CNHAKeeperClient, error) {
if len(files) == 0 {
return nil, moerr.NewBadConfig("DN service config not set")
}
var cfg *Config
for _, file := range files {
cfg = &Config{}
if err := parseConfigFromFile(file, cfg); err != nil {
return nil, err
}
if err := startService(cfg, stopper); err != nil {
return nil, err
}
}
return waitHAKeeperReady(cfg.HAKeeperClient)
}
func startDNServiceCluster(
files []string,
stopper *stopper.Stopper,
client logservice.CNHAKeeperClient) error {
if len(files) == 0 {
return moerr.NewBadConfig("DN service config not set")
}
for _, file := range files {
cfg := &Config{}
if err := parseConfigFromFile(file, cfg); err != nil {
return err
}
if err := startService(cfg, stopper); err != nil {
return nil
}
}
return waitAnyShardReady(client)
}
func startCNServiceCluster(
files []string,
stopper *stopper.Stopper) error {
if len(files) == 0 {
return moerr.NewBadConfig("CN service config not set")
}
var cfg *Config
for _, file := range files {
cfg = &Config{}
if err := parseConfigFromFile(file, cfg); err != nil {
return err
}
if err := startService(cfg, stopper); err != nil {
return err
}
}
return nil
}
func waitHAKeeperReady(cfg logservice.HAKeeperClientConfig) (logservice.CNHAKeeperClient, error) {
// wait hakeeper ready
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
defer cancel()
for {
var err error
client, err := logservice.NewCNHAKeeperClient(ctx, cfg)
if moerr.IsMoErrCode(err, moerr.ErrNoHAKeeper) {
// not ready
logutil.Info("hakeeper not ready, retry")
time.Sleep(time.Second)
continue
}
return client, err
}
}
func waitAnyShardReady(client logservice.CNHAKeeperClient) error {
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
defer cancel()
// wait shard ready
for {
if ok, err := func() (bool, error) {
details, err := client.GetClusterDetails(ctx)
if err != nil {
return false, err
}
for _, store := range details.DNStores {
if len(store.Shards) > 0 {
return true, nil
}
}
logutil.Info("shard not ready")
return false, nil
}(); err != nil {
return err
} else if ok {
logutil.Info("shard ready")
return nil
}
time.Sleep(time.Second)
}
}
......@@ -22,9 +22,11 @@ import (
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
"github.com/google/uuid"
"github.com/matrixorigin/matrixone/pkg/cnservice"
"github.com/matrixorigin/matrixone/pkg/cnservice/cnclient"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
......@@ -40,12 +42,11 @@ import (
"github.com/matrixorigin/matrixone/pkg/util/metric"
"github.com/matrixorigin/matrixone/pkg/util/trace"
"go.uber.org/zap"
"github.com/google/uuid"
)
var (
configFile = flag.String("cfg", "./mo.toml", "toml configuration used to start mo-service")
launchFile = flag.String("launch", "", "toml configuration used to launch mo cluster")
version = flag.Bool("version", false, "print version information")
)
......@@ -60,24 +61,32 @@ func main() {
if *allocsProfilePathFlag != "" {
defer writeAllocsProfile()
}
rand.Seed(time.Now().UnixNano())
cfg, err := parseConfigFromFile(*configFile)
if err != nil {
panic(fmt.Sprintf("failed to parse config from %s, error: %s", *configFile, err.Error()))
}
setupLogger(cfg)
stopper := stopper.NewStopper("main", stopper.WithLogger(logutil.GetGlobalLogger()))
if err := startService(cfg, stopper); err != nil {
panic(err)
if *launchFile != "" {
if err := startCluster(stopper); err != nil {
panic(err)
}
} else if *configFile != "" {
cfg := &Config{}
if err := parseConfigFromFile(*configFile, cfg); err != nil {
panic(fmt.Sprintf("failed to parse config from %s, error: %s", *configFile, err.Error()))
}
if err := startService(cfg, stopper); err != nil {
panic(err)
}
}
waitSignalToStop(stopper)
}
var setupOnce sync.Once
func setupLogger(cfg *Config) {
logutil.SetupMOLogger(&cfg.Log)
setupOnce.Do(func() {
logutil.SetupMOLogger(&cfg.Log)
})
}
func waitSignalToStop(stopper *stopper.Stopper) {
......@@ -88,6 +97,15 @@ func waitSignalToStop(stopper *stopper.Stopper) {
}
func startService(cfg *Config, stopper *stopper.Stopper) error {
if err := cfg.validate(); err != nil {
return err
}
if err := cfg.resolveGossipSeedAddresses(); err != nil {
return err
}
// FIXME: Initialize the logger with the service's own logging configuration
setupLogger(cfg)
fs, err := cfg.createFileService(localFileServiceName)
if err != nil {
......
# service node type, [DN|CN|LOG]
service-type = "CN"
[log]
level = "debug"
format = "json"
max-size = 512
[hakeeper-client]
service-addresses = [
"127.0.0.1:32001",
"127.0.0.1:32011",
"127.0.0.1:32021",
]
[[fileservice]]
name = "LOCAL"
backend = "MEM"
[[fileservice]]
name = "S3"
backend = "MEM"
[[fileservice]]
name = "ETL"
backend = "DISK-ETL"
data-dir = "store"
[cn]
uuid = "dd1dccb4-4d3c-41f8-b482-5251dc7a41bf"
listen-address = "127.0.0.1:3307"
role = "TP"
[cn.Engine]
type = "memory"
[observability]
statusPort = 9001
\ No newline at end of file
# service node type, [DN|CN|LOG]
service-type = "DN"
[log]
level = "info"
format = "json"
max-size = 512
[hakeeper-client]
service-addresses = [
"127.0.0.1:32001",
"127.0.0.1:32011",
"127.0.0.1:32021",
]
[[fileservice]]
name = "LOCAL"
backend = "MEM"
[[fileservice]]
name = "S3"
backend = "MEM"
[[fileservice]]
name = "ETL"
backend = "DISK-ETL"
data-dir = "store"
[dn]
uuid = "dd4dccb4-4d3c-41f8-b482-5251dc7a41bf"
listen-address = "127.0.0.1:41010"
service-address = "127.0.0.1:41010"
[dn.Txn.Storage]
# txn storage backend implementation. [TAE|MEM]
backend = "MEM"
[observability]
statusPort = 8001
\ No newline at end of file
logservices = [
"./etc/launch/log1.toml",
"./etc/launch/log2.toml",
"./etc/launch/log3.toml"
]
dnservices = [
"./etc/launch/dn1.toml"
]
cnservices = [
"./etc/launch/cn1.toml"
]
\ No newline at end of file
# service node type, [DN|CN|LOG]
service-type = "LOG"
[log]
level = "info"
format = "json"
max-size = 512
[logservice]
deployment-id = 1
data-dir = "node-1-data"
uuid = "7c4dccb4-4d3c-41f8-b482-5251dc7a41bf"
raft-address = "127.0.0.1:32000"
logservice-address = "127.0.0.1:32001"
gossip-address = "127.0.0.1:32002"
gossip-seed-addresses = [
"127.0.0.1:32002",
"127.0.0.1:32012",
"127.0.0.1:32022",
]
[logservice.BootstrapConfig]
bootstrap-cluster = true
num-of-log-shards = 1
num-of-dn-shards = 1
num-of-log-shard-replicas = 3
init-hakeeper-members = [
"131072:7c4dccb4-4d3c-41f8-b482-5251dc7a41bf",
"131073:8c4dccb4-4d3c-41f8-b482-5251dc7a41bf",
"131074:9c4dccb4-4d3c-41f8-b482-5251dc7a41bf",
]
[logservice.HAKeeperConfig]
tick-per-second = 1
log-store-timeout = "20s"
dn-store-timeout = "10s"
[[fileservice]]
name = "LOCAL"
backend = "MEM"
[[fileservice]]
name = "S3"
backend = "MEM"
[[fileservice]]
name = "ETL"
backend = "DISK-ETL"
data-dir = "mo-data/log1"
[observability]
statusPort = 7001
[hakeeper-client]
service-addresses = [
"127.0.0.1:32001",
"127.0.0.1:32011",
"127.0.0.1:32021",
]
\ No newline at end of file
# service node type, [DN|CN|LOG]
service-type = "LOG"
[log]
level = "info"
format = "json"
max-size = 512
[logservice]
deployment-id = 1
data-dir = "node-2-data"
uuid = "8c4dccb4-4d3c-41f8-b482-5251dc7a41bf"
raft-address = "127.0.0.1:32010"
logservice-address = "127.0.0.1:32011"
gossip-address = "127.0.0.1:32012"
gossip-seed-addresses = [
"127.0.0.1:32002",
"127.0.0.1:32012",
"127.0.0.1:32022",
]
[logservice.BootstrapConfig]
bootstrap-cluster = true
num-of-log-shards = 1
num-of-dn-shards = 1
num-of-log-shard-replicas = 3
init-hakeeper-members = [
"131072:7c4dccb4-4d3c-41f8-b482-5251dc7a41bf",
"131073:8c4dccb4-4d3c-41f8-b482-5251dc7a41bf",
"131074:9c4dccb4-4d3c-41f8-b482-5251dc7a41bf",
]
[logservice.HAKeeperConfig]
tick-per-second = 1
log-store-timeout = "20s"
dn-store-timeout = "10s"
[[fileservice]]
name = "LOCAL"
backend = "MEM"
[[fileservice]]
name = "S3"
backend = "MEM"
[[fileservice]]
name = "ETL"
backend = "DISK-ETL"
data-dir = "mo-data/log2"
[hakeeper-client]
service-addresses = [
"127.0.0.1:32001",
"127.0.0.1:32011",
"127.0.0.1:32021",
]
[observability]
statusPort = 7002
\ No newline at end of file
# service node type, [DN|CN|LOG]
service-type = "LOG"
[log]
level = "info"
format = "json"
max-size = 512
[logservice]
deployment-id = 1
data-dir = "node-3-data"
uuid = "9c4dccb4-4d3c-41f8-b482-5251dc7a41bf"
raft-address = "127.0.0.1:32020"
logservice-address = "127.0.0.1:32021"
gossip-address = "127.0.0.1:32022"
gossip-seed-addresses = [
"127.0.0.1:32002",
"127.0.0.1:32012",
"127.0.0.1:32022",
]
[logservice.BootstrapConfig]
bootstrap-cluster = true
num-of-log-shards = 1
num-of-dn-shards = 1
num-of-log-shard-replicas = 3
init-hakeeper-members = [
"131072:7c4dccb4-4d3c-41f8-b482-5251dc7a41bf",
"131073:8c4dccb4-4d3c-41f8-b482-5251dc7a41bf",
"131074:9c4dccb4-4d3c-41f8-b482-5251dc7a41bf",
]
[logservice.HAKeeperConfig]
tick-per-second = 1
log-store-timeout = "20s"
dn-store-timeout = "10s"
[[fileservice]]
name = "LOCAL"
backend = "MEM"
[[fileservice]]
name = "S3"
backend = "MEM"
[[fileservice]]
name = "ETL"
backend = "DISK-ETL"
data-dir = "mo-data/log3"
[hakeeper-client]
service-addresses = [
"127.0.0.1:32001",
"127.0.0.1:32011",
"127.0.0.1:32021",
]
[observability]
statusPort = 7003
\ No newline at end of file
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment