Skip to content
Snippets Groups Projects
Unverified Commit 7c26ae49 authored by fagongzi's avatar fagongzi Committed by GitHub
Browse files

cnservice: add load and save CNStore metadata (#4864)

Manage CNStore metadata at the CN node. Provide data for CN and HAKeeper's heartbeat process

Approved by: @nnsgmsone, @daviszhen, @reusee, @lni
parent 170bf626
No related branches found
No related tags found
No related merge requests found
......@@ -12,6 +12,9 @@ service-addresses = [
"2"
]
[cn]
uuid = "cn1"
[cn.Engine]
type = "tae"
......
......@@ -10,6 +10,9 @@ service-addresses = [
"127.0.0.1:32000",
]
[cn]
uuid = "cn1"
[cn.Engine]
type = "distributed-tae"
......
......@@ -10,6 +10,9 @@ service-addresses = [
"127.0.0.1:32000",
]
[cn]
uuid = "cn1"
[cn.Engine]
type = "memory"
......
......@@ -20,6 +20,9 @@ backend = "MEM"
name = "S3"
backend = "MEM"
[cn]
uuid = "cn1"
[cn.Engine]
type = "tae"
......
......@@ -34,6 +34,7 @@ import (
"github.com/fagongzi/goetty/v2"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/matrixorigin/matrixone/pkg/pb/pipeline"
)
......@@ -45,16 +46,30 @@ func NewService(
fileService fileservice.FileService,
options ...Options,
) (Service, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}
// get metadata fs
fs, err := fileservice.Get[fileservice.ReplaceableFileService](fileService, "LOCAL")
if err != nil {
return nil, err
}
srv := &service{
logger: logutil.GetGlobalLogger().Named("cnservice"),
metadata: metadata.CNStore{
UUID: cfg.UUID,
Role: metadata.MustParseCNRole(cfg.Role),
},
cfg: cfg,
metadataFS: fs,
fileService: fileService,
}
srv.logger = logutil.Adjust(srv.logger)
if err := srv.initMetadata(); err != nil {
return nil, err
}
srv.responsePool = &sync.Pool{
New: func() any {
return &pipeline.Message{}
......@@ -63,8 +78,7 @@ func NewService(
pu := config.NewParameterUnit(&cfg.Frontend, nil, nil, nil, nil, nil)
cfg.Frontend.SetDefaultValues()
err := srv.initMOServer(ctx, pu)
if err != nil {
if err = srv.initMOServer(ctx, pu); err != nil {
return nil, err
}
......
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cnservice
import (
"github.com/fagongzi/util/protoc"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/matrixorigin/matrixone/pkg/util/file"
"go.uber.org/zap"
)
const (
metadataFile = "cnstore/metadata.data"
)
func (s *service) initMetadata() error {
data, err := file.ReadFile(s.metadataFS, metadataFile)
if err != nil {
return err
}
if len(data) == 0 {
s.mustUpdateMetadata()
return nil
}
v := &metadata.CNStore{}
protoc.MustUnmarshal(v, data)
if v.UUID != s.metadata.UUID {
s.logger.Fatal("BUG: disk CNStore and start CNStore not match",
zap.String("disk-store", v.UUID))
}
s.metadata = *v
s.logger.Info("local CNStore loaded",
zap.String("metadata", s.metadata.DebugString()))
return nil
}
func (s *service) mustUpdateMetadata() {
if err := file.WriteFile(s.metadataFS, metadataFile, protoc.MustMarshal(&s.metadata)); err != nil {
s.logger.Fatal("update metadata to local file failed",
zap.Error(err))
}
}
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cnservice
import (
"testing"
"github.com/fagongzi/util/protoc"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/matrixorigin/matrixone/pkg/util/file"
"github.com/stretchr/testify/assert"
)
func TestInitMetadata(t *testing.T) {
fs, err := fileservice.NewMemoryFS("LOCAL")
assert.NoError(t, err)
s := &service{logger: logutil.GetPanicLogger(), metadataFS: fs}
s.metadata.UUID = "1"
assert.NoError(t, s.initMetadata())
v, err := file.ReadFile(s.metadataFS, metadataFile)
assert.NoError(t, err)
assert.NotEmpty(t, v)
}
func TestInitMetadataWithExistData(t *testing.T) {
fs, err := fileservice.NewMemoryFS("LOCAL")
assert.NoError(t, err)
value := metadata.CNStore{
UUID: "cn1",
}
assert.NoError(t, file.WriteFile(fs, metadataFile, protoc.MustMarshal(&value)))
s := &service{logger: logutil.GetPanicLogger(), metadataFS: fs}
s.metadata.UUID = "cn1"
assert.NoError(t, s.initMetadata())
assert.Equal(t, value, s.metadata)
}
func TestInitMetadataWithInvalidUUIDWillPanic(t *testing.T) {
defer func() {
if err := recover(); err != nil {
return
}
assert.Fail(t, "must panic")
}()
fs, err := fileservice.NewMemoryFS("LOCAL")
assert.NoError(t, err)
value := metadata.CNStore{
UUID: "cn1",
}
assert.NoError(t, file.WriteFile(fs, metadataFile, protoc.MustMarshal(&value)))
s := &service{logger: logutil.GetPanicLogger(), metadataFS: fs}
assert.NoError(t, s.initMetadata())
}
......@@ -23,6 +23,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/frontend"
"github.com/matrixorigin/matrixone/pkg/logservice"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/txn/rpc"
"github.com/matrixorigin/matrixone/pkg/util/toml"
......@@ -46,6 +47,11 @@ const (
// Config cn service
type Config struct {
// UUID cn store uuid
UUID string `toml:"uuid"`
// Role cn node role, [AP|TP]
Role string `toml:"role"`
// ListenAddress listening address for receiving external requests
ListenAddress string `toml:"listen-address"`
// FileService file service configuration
......@@ -93,6 +99,12 @@ type Config struct {
}
func (c *Config) Validate() error {
if c.UUID == "" {
panic("missing cn store UUID")
}
if c.Role == "" {
c.Role = metadata.CNRole_TP.String()
}
if c.HAKeeper.DiscoveryTimeout.Duration == 0 {
c.HAKeeper.DiscoveryTimeout.Duration = time.Second * 30
}
......@@ -106,6 +118,7 @@ func (c *Config) Validate() error {
}
type service struct {
metadata metadata.CNStore
cfg *Config
responsePool *sync.Pool
logger *zap.Logger
......@@ -119,5 +132,6 @@ type service struct {
_txnSender rpc.TxnSender
initTxnClientOnce sync.Once
_txnClient client.TxnClient
metadataFS fileservice.ReplaceableFileService
fileService fileservice.FileService
}
......@@ -17,6 +17,7 @@ package metadata
import (
"bytes"
"fmt"
"strings"
)
// IsEmpty return true if is a empty DNShard
......@@ -51,3 +52,16 @@ func (m DNStore) DebugString() string {
buf.WriteString("]")
return buf.String()
}
// DebugString returns debug string
func (m CNStore) DebugString() string {
return fmt.Sprintf("%s/%s", m.UUID, m.Role.String())
}
// MustParseCNRole parse CN Role from role string
func MustParseCNRole(role string) CNRole {
if v, ok := CNRole_value[strings.ToUpper(role)]; ok {
return CNRole(v)
}
panic(fmt.Sprintf("invalid CN Role %s", role))
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment