Skip to content
Snippets Groups Projects
Unverified Commit 8705c0ba authored by reusee's avatar reusee Committed by GitHub
Browse files

mo-service, cnservice, etc: codes for starting standalone mem engine and...

    mo-service, cnservice, etc: codes for starting standalone mem engine and distributed TAE (#4700)

mo-service, cnservice, etc: codes for starting standalone mem engine and distributed TAE

Approved by: @zhangxu19830126, @lni, @nnsgmsone
parent be9de06e
No related branches found
Tags v4.0-rc6
No related merge requests found
......@@ -16,6 +16,7 @@ package main
import (
"context"
"errors"
"flag"
"fmt"
"math/rand"
......@@ -78,7 +79,7 @@ func startService(cfg *Config, stopper *stopper.Stopper) error {
case logServiceType:
return startLogService(cfg, stopper)
case standaloneServiceType:
panic("not implemented")
return startStandalone(cfg, stopper)
default:
panic("unknown service type")
}
......@@ -145,3 +146,65 @@ func startLogService(cfg *Config, stopper *stopper.Stopper) error {
}
})
}
func startStandalone(cfg *Config, stopper *stopper.Stopper) error {
// start log service
if err := startLogService(cfg, stopper); err != nil {
return err
}
// wait hakeeper ready
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
defer cancel()
var client logservice.CNHAKeeperClient
for {
var err error
client, err = logservice.NewCNHAKeeperClient(ctx, cfg.HAKeeperClient)
if errors.Is(err, logservice.ErrNotHAKeeper) {
// not ready
logutil.Info("hakeeper not ready, retry")
time.Sleep(time.Second)
continue
}
if err != nil {
return err
}
break
}
// start DN
if err := startDNService(cfg, stopper); err != nil {
return err
}
// wait shard ready
for {
if ok, err := func() (bool, error) {
details, err := client.GetClusterDetails(ctx)
if err != nil {
return false, err
}
for _, store := range details.DNStores {
if len(store.Shards) > 0 {
return true, nil
}
}
logutil.Info("shard not ready")
return false, nil
}(); err != nil {
return err
} else if ok {
logutil.Info("shard ready")
break
}
time.Sleep(time.Second)
}
// start CN
if err := startCNService(cfg, stopper); err != nil {
return err
}
return nil
}
service-type = "STANDALONE"
[log]
level = "debug"
format = "json"
max-size = 512
[hakeeper-client]
service-addresses = [
"127.0.0.1:32000",
]
[cn.Engine]
type = "distributed-tae"
[dn]
uuid = "42"
[dn.Txn.Storage]
backend = "TAE"
[logservice]
deployment-id = 1
uuid = "9c4dccb4-4d3c-41f8-b482-5251dc7a41bf"
gossip-seed-addresses = [
"127.0.0.1:32002",
]
[logservice.BootstrapConfig]
bootstrap-cluster = true
num-of-log-shards = 1
num-of-dn-shards = 1
num-of-log-shard-replicas = 1
init-hakeeper-members = [
"131072:9c4dccb4-4d3c-41f8-b482-5251dc7a41bf",
]
[[fileservice]]
name = "LOCAL"
backend = "MEM"
[[fileservice]]
name = "S3"
backend = "MEM"
service-type = "STANDALONE"
[log]
level = "debug"
format = "json"
max-size = 512
[hakeeper-client]
service-addresses = [
"127.0.0.1:32000",
]
[cn.Engine]
type = "memory"
[dn]
uuid = "42"
[dn.Txn.Storage]
backend = "MEM"
[logservice]
deployment-id = 1
uuid = "9c4dccb4-4d3c-41f8-b482-5251dc7a41bf"
gossip-seed-addresses = [
"127.0.0.1:32002",
]
[logservice.BootstrapConfig]
bootstrap-cluster = true
num-of-log-shards = 1
num-of-dn-shards = 1
num-of-log-shard-replicas = 1
init-hakeeper-members = [
"131072:9c4dccb4-4d3c-41f8-b482-5251dc7a41bf",
]
[[fileservice]]
name = "LOCAL"
backend = "MEM"
[[fileservice]]
name = "S3"
backend = "MEM"
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cnservice
import (
"context"
"github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae"
txnengine "github.com/matrixorigin/matrixone/pkg/vm/engine/txn"
)
func (s *service) initDistributedTAE(
ctx context.Context,
pu *config.ParameterUnit,
) error {
// txn client
client, err := s.getTxnClient()
if err != nil {
return err
}
pu.TxnClient = client
// hakeeper
hakeeper, err := s.getHAKeeperClient()
if err != nil {
return err
}
// engine
pu.StorageEngine = disttae.New(
ctx,
txnengine.GetClusterDetailsFromHAKeeper(
ctx,
hakeeper,
),
)
return nil
}
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cnservice
import (
"context"
"github.com/matrixorigin/matrixone/pkg/config"
txnengine "github.com/matrixorigin/matrixone/pkg/vm/engine/txn"
)
func (s *service) initMemoryEngine(
ctx context.Context,
pu *config.ParameterUnit,
) error {
// txn client
client, err := s.getTxnClient()
if err != nil {
return err
}
pu.TxnClient = client
// hakeeper
hakeeper, err := s.getHAKeeperClient()
if err != nil {
return err
}
// engine
pu.StorageEngine = txnengine.New(
ctx,
new(txnengine.ShardToSingleStatic), //TODO use hashing shard policy
txnengine.GetClusterDetailsFromHAKeeper(
ctx,
hakeeper,
),
)
return nil
}
......@@ -28,7 +28,6 @@ import (
ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor"
"github.com/matrixorigin/matrixone/pkg/util/metric"
"github.com/matrixorigin/matrixone/pkg/util/trace"
txnengine "github.com/matrixorigin/matrixone/pkg/vm/engine/txn"
"github.com/matrixorigin/matrixone/pkg/vm/mmu/host"
"github.com/fagongzi/goetty/v2"
......@@ -39,6 +38,10 @@ import (
func NewService(cfg *Config, ctx context.Context) (Service, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}
srv := &service{cfg: cfg}
srv.logger = logutil.Adjust(srv.logger)
srv.pool = &sync.Pool{
......@@ -127,26 +130,14 @@ func (s *service) initEngine(
}
case EngineDistributedTAE:
//TODO
case EngineMemory:
client, err := s.getTxnClient()
if err != nil {
if err := s.initDistributedTAE(cancelMoServerCtx, pu); err != nil {
return err
}
pu.TxnClient = client
hakeeper, err := s.getHAKeeperClient()
if err != nil {
case EngineMemory:
if err := s.initMemoryEngine(cancelMoServerCtx, pu); err != nil {
return err
}
pu.StorageEngine = txnengine.New(
ctx,
new(txnengine.ShardToSingleStatic), //TODO use hashing shard policy
txnengine.GetClusterDetailsFromHAKeeper(
ctx,
hakeeper,
),
)
default:
return fmt.Errorf("unknown engine type: %s", s.cfg.Engine.Type)
......
......@@ -17,8 +17,10 @@ package cnservice
import (
"context"
"sync"
"time"
"github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/frontend"
"github.com/matrixorigin/matrixone/pkg/logservice"
"github.com/matrixorigin/matrixone/pkg/txn/client"
......@@ -26,7 +28,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/util/toml"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"go.uber.org/zap"
)
......@@ -93,6 +94,19 @@ type Config struct {
RPC rpc.Config `toml:"rpc"`
}
func (c *Config) Validate() error {
if c.HAKeeper.DiscoveryTimeout.Duration == 0 {
c.HAKeeper.DiscoveryTimeout.Duration = time.Second * 30
}
if c.HAKeeper.HeatbeatDuration.Duration == 0 {
c.HAKeeper.HeatbeatDuration.Duration = time.Second
}
if c.HAKeeper.HeatbeatTimeout.Duration == 0 {
c.HAKeeper.HeatbeatTimeout.Duration = time.Millisecond * 500
}
return nil
}
type service struct {
cfg *Config
pool *sync.Pool
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment