Unverified commit 46d6f05f authored by fagongzi, committed by GitHub

dn: add dn store (#4099)

parent cc879e7f
Showing changes with 7023 additions and 5346 deletions
@@ -124,7 +124,7 @@ require (
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/multierr v1.8.0
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
golang.org/x/exp v0.0.0-20220414153411-bcd21879b8fd
golang.org/x/net v0.0.0-20211216030914-fe4d6282115f // indirect
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dnservice
import (
"fmt"
"strings"
"time"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/logservice"
"github.com/matrixorigin/matrixone/pkg/util/toml"
)
var (
defaultListenAddress = "unix:///tmp/dn.sock"
defaultMaxConnections = 400
defaultSendQueueSize = 10240
defaultMaxClockOffset = time.Millisecond * 500
defaultZombieTimeout = time.Hour
defaultDiscoveryTimeout = time.Second * 30
defaultHeatbeatDuration = time.Second
defaultConnectTimeout = time.Second * 30
defaultHeatbeatTimeout = time.Millisecond * 500
defaultBufferSize = 1024
)
// Config dn store configuration
type Config struct {
// UUID dn store uuid
UUID string `toml:"uuid"`
// DataDir storage directory for local data. Includes DNShard metadata and TAE data.
DataDir string `toml:"data-dir"`
// ListenAddress listening address for receiving external requests.
ListenAddress string `toml:"listen-address"`
// ServiceAddress service address for communication; if not set, ListenAddress is used
// as the communication address.
ServiceAddress string `toml:"service-address"`
// HAKeeper configuration
HAKeeper struct {
// HeatbeatDuration interval at which heartbeat messages are sent to HAKeeper. Default is 1s.
HeatbeatDuration toml.Duration `toml:"hakeeper-heartbeat-duration"`
// HeatbeatTimeout timeout for a single heartbeat request. Default is 500ms.
HeatbeatTimeout toml.Duration `toml:"hakeeper-heartbeat-timeout"`
// DiscoveryTimeout timeout for discovering the HAKeeper service. Default is 30s.
DiscoveryTimeout toml.Duration `toml:"hakeeper-discovery-timeout"`
// ClientConfig hakeeper client configuration
ClientConfig logservice.HAKeeperClientConfig `toml:"hakeeper-client"`
}
// LogService log service configuration
LogService struct {
// ConnectTimeout timeout for connecting to the log service. Default is 30s.
ConnectTimeout toml.Duration `toml:"connect-timeout"`
}
// FileService file service configuration
FileService struct {
// Backend file service backend implementation. [MEM|DISK|S3|MINIO]. Default is DISK.
Backend string `toml:"backend"`
// S3 s3 configuration
S3 fileservice.S3Config `toml:"s3"`
}
// RPC configuration
RPC struct {
// MaxConnections maximum number of connections to communicate with each DNStore.
// Default is 400.
MaxConnections int `toml:"max-connections"`
// SendQueueSize maximum capacity of the send request queue per connection; when the
// queue is full, send requests block. Default is 10240.
SendQueueSize int `toml:"send-queue-size"`
// BusyQueueSize once the send queue of a connection reaches this value, the connection
// is considered busy. While any connection is busy, new connections are created until
// MaxConnections is reached. Default is 3/4 of SendQueueSize.
BusyQueueSize int `toml:"busy-queue-size"`
// WriteBufferSize buffer size for writing messages per connection. Default is 1KB.
WriteBufferSize toml.ByteSize `toml:"send-buffer-size"`
// ReadBufferSize buffer size for reading messages per connection. Default is 1KB.
ReadBufferSize toml.ByteSize `toml:"read-buffer-size"`
}
// Txn transactions configuration
Txn struct {
// ZombieTimeout transaction timeout. If an active transaction performs no operation for
// longer than this value, it is considered a zombie transaction and the backend rolls
// it back.
ZombieTimeout toml.Duration `toml:"zombie-timeout"`
// Storage txn storage config
Storage struct {
// Backend txn storage backend implementation. [TAE|MEM]. Default is TAE.
Backend string `toml:"backend"`
// TAE tae storage configuration
TAE struct {
}
// Mem mem storage configuration
Mem struct {
}
}
// Clock txn clock type. [LOCAL|HLC]. Default is LOCAL.
Clock struct {
// Backend clock backend implementation. [LOCAL|HLC], default LOCAL.
Backend string `toml:"source"`
// MaxClockOffset max clock offset between two nodes. Default is 500ms
MaxClockOffset toml.Duration `toml:"max-clock-offset"`
}
}
}
func (c *Config) validate() error {
if c.UUID == "" {
return fmt.Errorf("Config.UUID not set")
}
if c.DataDir == "" {
return fmt.Errorf("Config.DataDir not set")
}
if c.ListenAddress == "" {
c.ListenAddress = defaultListenAddress
}
if c.ServiceAddress == "" {
c.ServiceAddress = c.ListenAddress
}
if c.RPC.MaxConnections == 0 {
c.RPC.MaxConnections = defaultMaxConnections
}
if c.RPC.SendQueueSize == 0 {
c.RPC.SendQueueSize = defaultSendQueueSize
}
if c.RPC.BusyQueueSize == 0 {
c.RPC.BusyQueueSize = c.RPC.SendQueueSize * 3 / 4
}
if c.RPC.WriteBufferSize == 0 {
c.RPC.WriteBufferSize = toml.ByteSize(defaultBufferSize)
}
if c.RPC.ReadBufferSize == 0 {
c.RPC.ReadBufferSize = toml.ByteSize(defaultBufferSize)
}
if c.Txn.Clock.MaxClockOffset.Duration == 0 {
c.Txn.Clock.MaxClockOffset.Duration = defaultMaxClockOffset
}
if c.Txn.Clock.Backend == "" {
c.Txn.Clock.Backend = localClockBackend
}
if _, ok := supportTxnClockBackends[strings.ToUpper(c.Txn.Clock.Backend)]; !ok {
return fmt.Errorf("%s clock backend not support", c.Txn.Storage)
}
if c.Txn.Storage.Backend == "" {
c.Txn.Storage.Backend = taeStorageBackend
}
if _, ok := supportTxnStorageBackends[strings.ToUpper(c.Txn.Storage.Backend)]; !ok {
return fmt.Errorf("%s txn storage backend not support", c.Txn.Storage)
}
if c.Txn.ZombieTimeout.Duration == 0 {
c.Txn.ZombieTimeout.Duration = defaultZombieTimeout
}
if c.HAKeeper.DiscoveryTimeout.Duration == 0 {
c.HAKeeper.DiscoveryTimeout.Duration = defaultDiscoveryTimeout
}
if c.HAKeeper.HeatbeatDuration.Duration == 0 {
c.HAKeeper.HeatbeatDuration.Duration = defaultHeatbeatDuration
}
if c.HAKeeper.HeatbeatTimeout.Duration == 0 {
c.HAKeeper.HeatbeatTimeout.Duration = defaultHeatbeatTimeout
}
if c.LogService.ConnectTimeout.Duration == 0 {
c.LogService.ConnectTimeout.Duration = defaultConnectTimeout
}
if c.FileService.Backend == "" {
c.FileService.Backend = diskFileServiceBackend
}
if _, ok := supportFileServiceBackends[strings.ToUpper(c.FileService.Backend)]; !ok {
return fmt.Errorf("%s file service backend not support", c.Txn.Storage)
}
return nil
}
func (c Config) getMetadataDir() string {
return fmt.Sprintf("%s/metadata", c.DataDir)
}
func (c Config) getDataDir() string {
return fmt.Sprintf("%s/data", c.DataDir)
}
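
For orientation, here is a minimal sketch of how validate() fills in defaults: only UUID and DataDir are mandatory, and every other field falls back to the defaults defined at the top of this file. The helper name is hypothetical and the snippet is illustrative only, not part of the commit.

func exampleConfigDefaults() *Config {
	// Hypothetical helper, for illustration only: exercises the defaulting
	// behavior of Config.validate() defined above.
	cfg := &Config{
		UUID:    "dn1",
		DataDir: "/tmp/dn",
	}
	if err := cfg.validate(); err != nil {
		panic(err)
	}
	// After validate():
	//   cfg.ListenAddress       == defaultListenAddress ("unix:///tmp/dn.sock")
	//   cfg.ServiceAddress      == cfg.ListenAddress
	//   cfg.RPC.MaxConnections  == 400, cfg.RPC.SendQueueSize == 10240
	//   cfg.RPC.BusyQueueSize   == cfg.RPC.SendQueueSize * 3 / 4
	//   cfg.Txn.Storage.Backend == taeStorageBackend, cfg.Txn.Clock.Backend == localClockBackend
	//   cfg.FileService.Backend == diskFileServiceBackend
	return cfg
}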
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dnservice
import (
"testing"
"github.com/matrixorigin/matrixone/pkg/util/toml"
"github.com/stretchr/testify/assert"
)
func TestValidate(t *testing.T) {
c := &Config{}
assert.Error(t, c.validate())
c.UUID = "dn1"
assert.Error(t, c.validate())
c.DataDir = "/tmp"
assert.NoError(t, c.validate())
assert.Equal(t, defaultListenAddress, c.ListenAddress)
assert.Equal(t, c.ListenAddress, c.ServiceAddress)
assert.Equal(t, defaultMaxConnections, c.RPC.MaxConnections)
assert.Equal(t, defaultSendQueueSize, c.RPC.SendQueueSize)
assert.Equal(t, toml.ByteSize(defaultBufferSize), c.RPC.WriteBufferSize)
assert.Equal(t, toml.ByteSize(defaultBufferSize), c.RPC.ReadBufferSize)
assert.Equal(t, defaultMaxClockOffset, c.Txn.Clock.MaxClockOffset.Duration)
assert.Equal(t, localClockBackend, c.Txn.Clock.Backend)
assert.Equal(t, taeStorageBackend, c.Txn.Storage.Backend)
assert.Equal(t, defaultZombieTimeout, c.Txn.ZombieTimeout.Duration)
assert.Equal(t, defaultDiscoveryTimeout, c.HAKeeper.DiscoveryTimeout.Duration)
assert.Equal(t, defaultHeatbeatDuration, c.HAKeeper.HeatbeatDuration.Duration)
assert.Equal(t, defaultHeatbeatTimeout, c.HAKeeper.HeatbeatTimeout.Duration)
assert.Equal(t, defaultConnectTimeout, c.LogService.ConnectTimeout.Duration)
}
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dnservice
import (
"context"
"fmt"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/logservice"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/matrixorigin/matrixone/pkg/txn/clock"
"github.com/matrixorigin/matrixone/pkg/txn/storage"
"github.com/matrixorigin/matrixone/pkg/txn/storage/mem"
)
const (
memStorageBackend = "MEM"
taeStorageBackend = "TAE"
localClockBackend = "LOCAL"
hlcClockBackend = "HLC"
memFileServiceBackend = "MEM"
diskFileServiceBackend = "DISK"
s3FileServiceBackend = "S3"
minioFileServiceBackend = "MINIO"
)
var (
supportTxnStorageBackends = map[string]struct{}{
memStorageBackend: {},
taeStorageBackend: {},
}
supportTxnClockBackends = map[string]struct{}{
localClockBackend: {},
hlcClockBackend: {},
}
supportFileServiceBackends = map[string]struct{}{
memFileServiceBackend: {},
diskFileServiceBackend: {},
s3FileServiceBackend: {},
minioFileServiceBackend: {},
}
)
func (s *store) createClock() (clock.Clock, error) {
switch s.cfg.Txn.Clock.Backend {
case localClockBackend:
return s.newLocalClock(), nil
default:
return nil, fmt.Errorf("not implment for %s", s.cfg.Txn.Clock.Backend)
}
}
func (s *store) createTxnStorage(shard metadata.DNShard) (storage.TxnStorage, error) {
logClient, err := s.createLogServiceClient(shard)
if err != nil {
return nil, err
}
switch s.cfg.Txn.Storage.Backend {
case memStorageBackend:
return s.newMemTxnStorage(shard, logClient)
default:
return nil, fmt.Errorf("not implment for %s", s.cfg.Txn.Storage.Backend)
}
}
func (s *store) createLogServiceClient(shard metadata.DNShard) (logservice.Client, error) {
if s.options.logServiceClientFactory != nil {
return s.options.logServiceClientFactory(shard)
}
return s.newLogServiceClient(shard)
}
func (s *store) createFileService() (fileservice.FileService, error) {
switch s.cfg.FileService.Backend {
case memFileServiceBackend:
return s.newMemFileService()
case diskFileServiceBackend:
return s.newDiskFileService()
case minioFileServiceBackend:
return s.newMinioFileService()
case s3FileServiceBackend:
return s.newS3FileService()
default:
return nil, fmt.Errorf("not implment for %s", s.cfg.FileService.Backend)
}
}
func (s *store) newLogServiceClient(shard metadata.DNShard) (logservice.Client, error) {
ctx, cancel := context.WithTimeout(context.Background(), s.cfg.LogService.ConnectTimeout.Duration)
defer cancel()
return logservice.NewClient(ctx, logservice.ClientConfig{
ReadOnly: false,
LogShardID: shard.LogShardID,
DNReplicaID: shard.ReplicaID,
})
}
func (s *store) newLocalClock() clock.Clock {
return clock.NewUnixNanoHLCClockWithStopper(s.stopper, s.cfg.Txn.Clock.MaxClockOffset.Duration)
}
func (s *store) newMemTxnStorage(shard metadata.DNShard, logClient logservice.Client) (storage.TxnStorage, error) {
return mem.NewKVTxnStorage(0, logClient), nil
}
func (s *store) newMemFileService() (fileservice.FileService, error) {
return fileservice.NewMemoryFS()
}
func (s *store) newDiskFileService() (fileservice.FileService, error) {
return fileservice.NewLocalFS(s.cfg.getDataDir())
}
func (s *store) newMinioFileService() (fileservice.FileService, error) {
return fileservice.NewS3FSMinio(s.cfg.FileService.S3)
}
func (s *store) newS3FileService() (fileservice.FileService, error) {
return fileservice.NewS3FS(s.cfg.FileService.S3.Endpoint,
s.cfg.FileService.S3.Bucket,
s.cfg.FileService.S3.KeyPrefix)
}
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dnservice
import (
"testing"
"github.com/matrixorigin/matrixone/pkg/common/stopper"
"github.com/matrixorigin/matrixone/pkg/logservice"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/matrixorigin/matrixone/pkg/txn/storage/mem"
"github.com/stretchr/testify/assert"
)
func TestCreateClock(t *testing.T) {
s := &store{cfg: &Config{}, stopper: stopper.NewStopper("")}
s.cfg.Txn.Clock.Backend = localClockBackend
s.cfg.Txn.Clock.MaxClockOffset.Duration = defaultMaxClockOffset
v, err := s.createClock()
assert.NoError(t, err)
assert.NotNil(t, v)
s.cfg.Txn.Clock.Backend = "error"
v, err = s.createClock()
assert.Error(t, err)
assert.Nil(t, v)
}
func TestCreateLogServiceClient(t *testing.T) {
s := &store{cfg: &Config{}, stopper: stopper.NewStopper("")}
s.options.logServiceClientFactory = func(d metadata.DNShard) (logservice.Client, error) {
return mem.NewMemLog(), nil
}
v, err := s.createLogServiceClient(metadata.DNShard{})
assert.NoError(t, err)
assert.NotNil(t, v)
}
func TestCreateFileService(t *testing.T) {
s := &store{cfg: &Config{}, stopper: stopper.NewStopper("")}
s.cfg.FileService.Backend = memFileServiceBackend
v, err := s.createFileService()
assert.NoError(t, err)
assert.NotNil(t, v)
s.cfg.FileService.Backend = "error"
v, err = s.createFileService()
assert.Error(t, err)
assert.Nil(t, v)
}
func TestCreateTxnStorage(t *testing.T) {
s := &store{cfg: &Config{}, stopper: stopper.NewStopper("")}
s.options.logServiceClientFactory = func(d metadata.DNShard) (logservice.Client, error) {
return mem.NewMemLog(), nil
}
s.cfg.Txn.Storage.Backend = memStorageBackend
v, err := s.createTxnStorage(metadata.DNShard{})
assert.NoError(t, err)
assert.NotNil(t, v)
s.cfg.Txn.Storage.Backend = "error"
v, err = s.createTxnStorage(metadata.DNShard{})
assert.Error(t, err)
assert.Nil(t, v)
}
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dnservice
import (
"context"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/txn/service"
"github.com/matrixorigin/matrixone/pkg/txn/util"
"go.uber.org/zap"
)
// replica dn shard replica.
type replica struct {
logger *zap.Logger
shard metadata.DNShard
service service.TxnService
startedC chan struct{}
}
func newReplica(shard metadata.DNShard, logger *zap.Logger) *replica {
return &replica{
shard: shard,
logger: logutil.Adjust(logger).With(util.TxnDNShardField(shard)),
startedC: make(chan struct{}),
}
}
func (r *replica) start(txnService service.TxnService) error {
defer close(r.startedC)
r.service = txnService
return r.service.Start()
}
func (r *replica) close() error {
r.waitStarted()
return r.service.Close()
}
func (r *replica) handleLocalRequest(ctx context.Context, request *txn.TxnRequest, response *txn.TxnResponse) error {
r.waitStarted()
prepareResponse(request, response)
switch request.Method {
case txn.TxnMethod_GetStatus:
return r.service.GetStatus(ctx, request, response)
case txn.TxnMethod_Prepare:
return r.service.Prepare(ctx, request, response)
case txn.TxnMethod_CommitDNShard:
return r.service.CommitDNShard(ctx, request, response)
case txn.TxnMethod_RollbackDNShard:
return r.service.RollbackDNShard(ctx, request, response)
default:
panic("cannot handle local CN request")
}
}
func (r *replica) waitStarted() {
<-r.startedC
}
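
As a summary of the lifecycle above, a minimal sketch that takes its dependencies as parameters; the helper name is hypothetical and it is not part of the commit. Note that handleLocalRequest serves only the DN-to-DN methods and panics for CN-facing ones.

func exampleReplicaLifecycle(logger *zap.Logger, shard metadata.DNShard, ts service.TxnService) error {
	// Hypothetical helper, for illustration only.
	r := newReplica(shard, logger)
	// start closes r.startedC, which unblocks any goroutine parked in waitStarted.
	if err := r.start(ts); err != nil {
		return err
	}
	// handleLocalRequest only serves DN-to-DN methods (GetStatus, Prepare,
	// CommitDNShard, RollbackDNShard); CN-facing methods such as Read or Write
	// panic here and must go through the store's RPC handlers instead.
	return r.close()
}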
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dnservice
import (
"context"
"testing"
"time"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/txn/service"
"github.com/stretchr/testify/assert"
)
func TestNewReplica(t *testing.T) {
r := newReplica(newTestDNShard(1, 2, 3), nil)
select {
case <-r.startedC:
assert.Fail(t, "cannot started")
default:
}
}
func TestWaitStarted(t *testing.T) {
r := newReplica(newTestDNShard(1, 2, 3), nil)
c := make(chan struct{})
go func() {
r.waitStarted()
c <- struct{}{}
}()
ts := service.NewTestTxnService(t, 1, service.NewTestSender(), service.NewTestClock(1))
defer func() {
assert.NoError(t, ts.Close())
}()
assert.NoError(t, r.start(ts))
defer func() {
assert.NoError(t, r.close())
}()
select {
case <-c:
case <-time.After(time.Minute):
assert.Fail(t, "wait started failed")
}
}
func TestHandleLocalCNRequestsWillPanic(t *testing.T) {
defer func() {
if err := recover(); err != nil {
return
}
assert.Fail(t, "must panic")
}()
r := newReplica(newTestDNShard(1, 2, 3), nil)
ts := service.NewTestTxnService(t, 1, service.NewTestSender(), service.NewTestClock(1))
defer func() {
assert.NoError(t, ts.Close())
}()
assert.NoError(t, r.start(ts))
defer func() {
assert.NoError(t, r.close())
}()
req := service.NewTestReadRequest(1, txn.TxnMeta{}, 1)
assert.NoError(t, r.handleLocalRequest(context.Background(), &req, &txn.TxnResponse{}))
}
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dnservice
import (
"context"
"sync"
"time"
"github.com/fagongzi/goetty/v2"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/common/stopper"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/logservice"
"github.com/matrixorigin/matrixone/pkg/logutil"
logservicepb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/txn/clock"
"github.com/matrixorigin/matrixone/pkg/txn/rpc"
"github.com/matrixorigin/matrixone/pkg/txn/service"
"github.com/matrixorigin/matrixone/pkg/txn/util"
"go.uber.org/multierr"
"go.uber.org/zap"
)
// WithLogger set logger
func WithLogger(logger *zap.Logger) Option {
return func(s *store) {
s.logger = logger
}
}
// WithConfigAdjust set adjust config func
func WithConfigAdjust(adjustConfigFunc func(c *Config)) Option {
return func(s *store) {
s.options.adjustConfigFunc = adjustConfigFunc
}
}
// WithRequestFilter set a filter for txn.TxnRequest messages sent to other DNShards
func WithRequestFilter(filter func(*txn.TxnRequest) bool) Option {
return func(s *store) {
s.options.requestFilter = filter
}
}
// WithMetadataFSFactory set metadata file service factory
func WithMetadataFSFactory(factory func() fileservice.ReplaceableFileService) Option {
return func(s *store) {
s.options.metadataFSFactory = factory
}
}
// WithHAKeeperClientFactory set hakeeper client factory
func WithHAKeeperClientFactory(factory func() (logservice.DNHAKeeperClient, error)) Option {
return func(s *store) {
s.options.hakeekerClientFactory = factory
}
}
// WithLogServiceClientFactory set log service client factory
func WithLogServiceClientFactory(factory func(metadata.DNShard) (logservice.Client, error)) Option {
return func(s *store) {
s.options.logServiceClientFactory = factory
}
}
type store struct {
cfg *Config
logger *zap.Logger
clock clock.Clock
sender rpc.TxnSender
server rpc.TxnServer
hakeeperClient logservice.DNHAKeeperClient
metadataFS fileservice.ReplaceableFileService
dataFS fileservice.FileService
replicas *sync.Map
stopper *stopper.Stopper
options struct {
logServiceClientFactory func(metadata.DNShard) (logservice.Client, error)
hakeekerClientFactory func() (logservice.DNHAKeeperClient, error)
metadataFSFactory func() fileservice.ReplaceableFileService
requestFilter func(*txn.TxnRequest) bool
adjustConfigFunc func(c *Config)
}
mu struct {
sync.RWMutex
metadata metadata.DNStore
}
}
// NewService create DN Service
func NewService(cfg *Config, opts ...Option) (Service, error) {
if err := cfg.validate(); err != nil {
return nil, err
}
s := &store{
cfg: cfg,
}
for _, opt := range opts {
opt(s)
}
s.logger = logutil.Adjust(s.logger).With(zap.String("dn-store", cfg.UUID))
s.replicas = &sync.Map{}
s.stopper = stopper.NewStopper("dn-store", stopper.WithLogger(s.logger))
s.mu.metadata = metadata.DNStore{UUID: cfg.UUID}
if s.options.adjustConfigFunc != nil {
s.options.adjustConfigFunc(s.cfg)
}
if err := s.initClocker(); err != nil {
return nil, err
}
if err := s.initHAKeeperClient(); err != nil {
return nil, err
}
if err := s.initTxnSender(); err != nil {
return nil, err
}
if err := s.initTxnServer(); err != nil {
return nil, err
}
if err := s.initFileService(); err != nil {
return nil, err
}
if err := s.initMetadata(); err != nil {
return nil, err
}
return s, nil
}
func (s *store) Start() error {
if err := s.startDNShards(); err != nil {
return err
}
if err := s.server.Start(); err != nil {
return err
}
return s.stopper.RunTask(s.heartbeatTask)
}
func (s *store) Close() error {
var err error
if e := s.hakeeperClient.Close(); e != nil {
err = multierr.Append(e, err)
}
if e := s.sender.Close(); e != nil {
err = multierr.Append(e, err)
}
if e := s.server.Close(); e != nil {
err = multierr.Append(e, err)
}
s.replicas.Range(func(_, value any) bool {
r := value.(*replica)
if e := r.close(); e != nil {
err = multierr.Append(e, err)
}
return true
})
s.stopper.Stop()
return err
}
func (s *store) StartDNReplica(shard metadata.DNShard) error {
return s.createReplica(shard)
}
func (s *store) CloseDNReplica(shard metadata.DNShard) error {
return s.removeReplica(shard.ShardID)
}
func (s *store) startDNShards() error {
s.mu.Lock()
defer s.mu.Unlock()
for _, shard := range s.mu.metadata.Shards {
if err := s.createReplica(shard); err != nil {
return err
}
}
return nil
}
func (s *store) getDNShardInfo() []logservicepb.DNShardInfo {
var shards []logservicepb.DNShardInfo
s.replicas.Range(func(_, value any) bool {
r := value.(*replica)
shards = append(shards, logservicepb.DNShardInfo{
ShardID: r.shard.ShardID,
ReplicaID: r.shard.ReplicaID,
})
return true
})
return shards
}
func (s *store) heartbeatTask(ctx context.Context) {
ticker := time.NewTicker(s.cfg.HAKeeper.HeatbeatDuration.Duration)
defer ticker.Stop()
s.logger.Info("DNShard heartbeat started")
for {
select {
case <-ctx.Done():
s.logger.Info("DNShard heartbeat stopped")
return
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), s.cfg.HAKeeper.HeatbeatTimeout.Duration)
commands, err := s.hakeeperClient.SendDNHeartbeat(ctx, logservicepb.DNStoreHeartbeat{
UUID: s.cfg.UUID,
ServiceAddress: s.cfg.ServiceAddress,
Shards: s.getDNShardInfo(),
})
cancel()
if err != nil {
s.logger.Error("send DNShard heartbeat request failed",
zap.Error(err))
continue
}
for _, cmd := range commands.Commands {
if cmd.ServiceType != logservicepb.DnService {
s.logger.Fatal("receive invalid schedule command",
zap.String("type", cmd.ServiceType.String()))
}
if cmd.ConfigChange != nil {
var err error
switch cmd.ConfigChange.ChangeType {
case logservicepb.AddReplica, logservicepb.StartReplica:
err = s.createReplica(metadata.DNShard{
DNShardRecord: metadata.DNShardRecord{
ShardID: cmd.ConfigChange.Replica.ShardID,
LogShardID: cmd.ConfigChange.Replica.LogShardID,
},
ReplicaID: cmd.ConfigChange.Replica.ReplicaID,
Address: s.cfg.ServiceAddress,
})
case logservicepb.RemoveReplica, logservicepb.StopReplica:
err = s.removeReplica(cmd.ConfigChange.Replica.ShardID)
}
if err != nil {
s.logger.Error("handle schedule command failed",
zap.String("command", cmd.String()),
zap.Error(err))
}
}
}
}
}
}
func (s *store) createReplica(shard metadata.DNShard) error {
r := newReplica(shard, s.logger.With(util.TxnDNShardField(shard)))
_, ok := s.replicas.LoadOrStore(shard.ShardID, r)
if ok {
return nil
}
storage, err := s.createTxnStorage(shard)
if err != nil {
return err
}
err = s.stopper.RunTask(func(ctx context.Context) {
select {
case <-ctx.Done():
return
default:
err := r.start(service.NewTxnService(r.logger,
shard,
storage,
s.sender,
s.clock,
s.cfg.Txn.ZombieTimeout.Duration))
if err != nil {
r.logger.Fatal("start DNShard failed",
zap.Error(err))
}
}
})
if err != nil {
return err
}
s.addDNShard(shard)
return nil
}
func (s *store) removeReplica(dnShardID uint64) error {
if r := s.getReplica(dnShardID); r != nil {
err := r.close()
s.replicas.Delete(dnShardID)
s.removeDNShard(dnShardID)
return err
}
return nil
}
func (s *store) getReplica(id uint64) *replica {
v, ok := s.replicas.Load(id)
if !ok {
return nil
}
return v.(*replica)
}
func (s *store) initTxnSender() error {
sender, err := rpc.NewSender(s.logger,
rpc.WithSenderBackendOptions(s.getBackendOptions()...),
rpc.WithSenderClientOptions(s.getClientOptions()...),
rpc.WithSenderLocalDispatch(s.dispatchLocalRequest))
if err != nil {
return err
}
s.sender = sender
return nil
}
func (s *store) initTxnServer() error {
server, err := rpc.NewTxnServer(s.cfg.ListenAddress, s.logger)
if err != nil {
return err
}
s.server = server
s.registerRPCHandlers()
return nil
}
func (s *store) initClocker() error {
v, err := s.createClock()
if err != nil {
return err
}
s.clock = v
return nil
}
func (s *store) initHAKeeperClient() error {
if s.options.hakeekerClientFactory != nil {
client, err := s.options.hakeekerClientFactory()
if err != nil {
return err
}
s.hakeeperClient = client
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), s.cfg.HAKeeper.DiscoveryTimeout.Duration)
defer cancel()
client, err := logservice.NewDNHAKeeperClient(ctx, s.cfg.HAKeeper.ClientConfig)
if err != nil {
return err
}
s.hakeeperClient = client
return nil
}
func (s *store) initFileService() error {
if s.options.metadataFSFactory != nil {
s.metadataFS = s.options.metadataFSFactory()
} else {
fs, err := fileservice.NewLocalFS(s.cfg.getMetadataDir())
if err != nil {
return err
}
s.metadataFS = fs
}
fs, err := s.createFileService()
if err != nil {
return err
}
s.dataFS = fs
return nil
}
func (s *store) getBackendOptions() []morpc.BackendOption {
return []morpc.BackendOption{
morpc.WithBackendLogger(s.logger),
morpc.WithBackendFilter(func(m morpc.Message) bool {
return s.options.requestFilter == nil || s.options.requestFilter(m.(*txn.TxnRequest))
}),
morpc.WithBackendBusyBufferSize(s.cfg.RPC.BusyQueueSize),
morpc.WithBackendBufferSize(s.cfg.RPC.SendQueueSize),
morpc.WithBackendGoettyOptions(goetty.WithBufSize(int(s.cfg.RPC.ReadBufferSize), int(s.cfg.RPC.WriteBufferSize))),
}
}
func (s *store) getClientOptions() []morpc.ClientOption {
return []morpc.ClientOption{
morpc.WithClientLogger(s.logger),
morpc.WithClientMaxBackendPerHost(s.cfg.RPC.MaxConnections),
}
}
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dnservice
import (
"context"
"time"
"github.com/fagongzi/util/protoc"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"go.uber.org/zap"
)
const (
metadataFile = "metadata.data"
)
func (s *store) initMetadata() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
vec := &fileservice.IOVector{
FilePath: metadataFile,
Entries: []fileservice.IOEntry{
{
Offset: 0,
Size: -1,
},
},
}
if err := s.metadataFS.Read(ctx, vec); err != nil {
if err == fileservice.ErrFileNotFound {
return nil
}
return err
}
if len(vec.Entries[0].Data) == 0 {
return nil
}
v := &metadata.DNStore{}
protoc.MustUnmarshal(v, vec.Entries[0].Data)
if v.UUID != s.mu.metadata.UUID {
s.logger.Fatal("BUG: disk DNStore and start DNStore not match",
zap.String("disk-store", v.UUID))
}
s.mu.metadata = *v
return nil
}
func (s *store) addDNShard(shard metadata.DNShard) {
s.mu.Lock()
defer s.mu.Unlock()
for _, dn := range s.mu.metadata.Shards {
if dn.ShardID == shard.ShardID {
return
}
}
s.mu.metadata.Shards = append(s.mu.metadata.Shards, shard)
s.mustUpdateMetadataLocked()
}
func (s *store) removeDNShard(id uint64) {
s.mu.Lock()
defer s.mu.Unlock()
var newShards []metadata.DNShard
for _, dn := range s.mu.metadata.Shards {
if dn.ShardID != id {
newShards = append(newShards, dn)
}
}
s.mu.metadata.Shards = newShards
s.mustUpdateMetadataLocked()
}
func (s *store) mustUpdateMetadataLocked() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
vec := fileservice.IOVector{
FilePath: metadataFile,
Entries: []fileservice.IOEntry{
{
Offset: 0,
Size: s.mu.metadata.Size(),
Data: protoc.MustMarshal(&s.mu.metadata),
},
},
}
if err := s.metadataFS.Replace(ctx, vec); err != nil {
s.logger.Fatal("update metadata to local file failed",
zap.Error(err))
}
}
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dnservice
import (
"context"
"testing"
"github.com/fagongzi/util/protoc"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/stretchr/testify/assert"
)
func TestInitMetadata(t *testing.T) {
fs, err := fileservice.NewMemoryFS()
assert.NoError(t, err)
s := &store{logger: logutil.GetPanicLogger(), metadataFS: fs}
assert.NoError(t, s.initMetadata())
}
func TestInitMetadataWithExistData(t *testing.T) {
fs, err := fileservice.NewMemoryFS()
assert.NoError(t, err)
value := metadata.DNStore{
UUID: "dn1",
Shards: []metadata.DNShard{
{
DNShardRecord: metadata.DNShardRecord{ShardID: 1},
},
{
DNShardRecord: metadata.DNShardRecord{ShardID: 2},
},
},
}
assert.NoError(t, fs.Write(context.Background(), fileservice.IOVector{
FilePath: metadataFile,
Entries: []fileservice.IOEntry{
{
Offset: 0,
Size: value.Size(),
Data: protoc.MustMarshal(&value),
},
},
}))
s := &store{logger: logutil.GetPanicLogger(), metadataFS: fs}
s.mu.metadata.UUID = "dn1"
assert.NoError(t, s.initMetadata())
assert.Equal(t, value, s.mu.metadata)
}
func TestInitMetadataWithInvalidUUIDWillPanic(t *testing.T) {
defer func() {
if err := recover(); err != nil {
return
}
assert.Fail(t, "must panic")
}()
fs, err := fileservice.NewMemoryFS()
assert.NoError(t, err)
value := metadata.DNStore{
UUID: "dn1",
}
assert.NoError(t, fs.Write(context.Background(), fileservice.IOVector{
FilePath: metadataFile,
Entries: []fileservice.IOEntry{
{
Offset: 0,
Size: value.Size(),
Data: protoc.MustMarshal(&value),
},
},
}))
s := &store{logger: logutil.GetPanicLogger(), metadataFS: fs}
assert.NoError(t, s.initMetadata())
}
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dnservice
import (
"context"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/txn/rpc"
)
func (s *store) registerRPCHandlers() {
// request from CN node
s.server.RegisterMethodHandler(txn.TxnMethod_Read, s.handleRead)
s.server.RegisterMethodHandler(txn.TxnMethod_Write, s.handleWrite)
s.server.RegisterMethodHandler(txn.TxnMethod_Commit, s.handleCommit)
s.server.RegisterMethodHandler(txn.TxnMethod_Rollback, s.handleRollback)
// request from other DN node
s.server.RegisterMethodHandler(txn.TxnMethod_Prepare, s.handlePrepare)
s.server.RegisterMethodHandler(txn.TxnMethod_CommitDNShard, s.handleCommitDNShard)
s.server.RegisterMethodHandler(txn.TxnMethod_RollbackDNShard, s.handleRollbackDNShard)
s.server.RegisterMethodHandler(txn.TxnMethod_GetStatus, s.handleGetStatus)
}
func (s *store) dispatchLocalRequest(shard metadata.DNShard) rpc.TxnRequestHandleFunc {
r := s.getReplica(shard.ShardID)
if r == nil {
return nil
}
return r.handleLocalRequest
}
func (s *store) handleRead(ctx context.Context, request *txn.TxnRequest, response *txn.TxnResponse) error {
shard := request.GetTargetDN()
r := s.getReplica(shard.ShardID)
if r == nil {
return nil
}
r.waitStarted()
prepareResponse(request, response)
return r.service.Read(ctx, request, response)
}
func (s *store) handleWrite(ctx context.Context, request *txn.TxnRequest, response *txn.TxnResponse) error {
shard := request.GetTargetDN()
r := s.getReplica(shard.ShardID)
if r == nil {
return nil
}
r.waitStarted()
prepareResponse(request, response)
return r.service.Write(ctx, request, response)
}
func (s *store) handleCommit(ctx context.Context, request *txn.TxnRequest, response *txn.TxnResponse) error {
shard := request.GetTargetDN()
r := s.getReplica(shard.ShardID)
if r == nil {
return nil
}
r.waitStarted()
prepareResponse(request, response)
return r.service.Commit(ctx, request, response)
}
func (s *store) handleRollback(ctx context.Context, request *txn.TxnRequest, response *txn.TxnResponse) error {
shard := request.GetTargetDN()
r := s.getReplica(shard.ShardID)
if r == nil {
return nil
}
r.waitStarted()
prepareResponse(request, response)
return r.service.Rollback(ctx, request, response)
}
func (s *store) handlePrepare(ctx context.Context, request *txn.TxnRequest, response *txn.TxnResponse) error {
shard := request.GetTargetDN()
r := s.getReplica(shard.ShardID)
if r == nil {
return nil
}
r.waitStarted()
prepareResponse(request, response)
return r.service.Prepare(ctx, request, response)
}
func (s *store) handleCommitDNShard(ctx context.Context, request *txn.TxnRequest, response *txn.TxnResponse) error {
shard := request.GetTargetDN()
r := s.getReplica(shard.ShardID)
if r == nil {
return nil
}
r.waitStarted()
prepareResponse(request, response)
return r.service.CommitDNShard(ctx, request, response)
}
func (s *store) handleRollbackDNShard(ctx context.Context, request *txn.TxnRequest, response *txn.TxnResponse) error {
shard := request.GetTargetDN()
r := s.getReplica(shard.ShardID)
if r == nil {
return nil
}
r.waitStarted()
prepareResponse(request, response)
return r.service.RollbackDNShard(ctx, request, response)
}
func (s *store) handleGetStatus(ctx context.Context, request *txn.TxnRequest, response *txn.TxnResponse) error {
shard := request.GetTargetDN()
r := s.getReplica(shard.ShardID)
if r == nil {
return nil
}
r.waitStarted()
prepareResponse(request, response)
return r.service.GetStatus(ctx, request, response)
}
func prepareResponse(request *txn.TxnRequest, response *txn.TxnResponse) {
response.Method = request.Method
response.Flag = request.Flag
response.RequestID = request.RequestID
}
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dnservice
import (
"context"
"testing"
"time"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/txn/service"
"github.com/stretchr/testify/assert"
)
func TestHandleRead(t *testing.T) {
runDNStoreTest(t, func(s *store) {
shard := newTestDNShard(1, 2, 3)
assert.NoError(t, s.StartDNReplica(shard))
req := service.NewTestReadRequest(1, service.NewTestTxn(1, 1, 1), 1)
req.CNRequest.Target.ReplicaID = 2
assert.NoError(t, s.handleRead(context.Background(), &req, &txn.TxnResponse{}))
})
}
func TestHandleWrite(t *testing.T) {
runDNStoreTest(t, func(s *store) {
shard := newTestDNShard(1, 2, 3)
assert.NoError(t, s.StartDNReplica(shard))
req := service.NewTestWriteRequest(1, service.NewTestTxn(1, 1, 1), 1)
req.CNRequest.Target.ReplicaID = 2
assert.NoError(t, s.handleWrite(context.Background(), &req, &txn.TxnResponse{}))
})
}
func TestHandleCommit(t *testing.T) {
runDNStoreTest(t, func(s *store) {
shard := newTestDNShard(1, 2, 3)
assert.NoError(t, s.StartDNReplica(shard))
req := service.NewTestCommitRequest(service.NewTestTxn(1, 1, 1))
req.Txn.DNShards[0].ReplicaID = 2
assert.NoError(t, s.handleCommit(context.Background(), &req, &txn.TxnResponse{}))
})
}
func TestHandleRollback(t *testing.T) {
runDNStoreTest(t, func(s *store) {
shard := newTestDNShard(1, 2, 3)
assert.NoError(t, s.StartDNReplica(shard))
req := service.NewTestRollbackRequest(service.NewTestTxn(1, 1, 1))
req.Txn.DNShards[0].ReplicaID = 2
assert.NoError(t, s.handleRollback(context.Background(), &req, &txn.TxnResponse{}))
})
}
func TestHandlePrepare(t *testing.T) {
runDNStoreTest(t, func(s *store) {
shard := newTestDNShard(1, 2, 3)
assert.NoError(t, s.StartDNReplica(shard))
req := service.NewTestPrepareRequest(service.NewTestTxn(1, 1, 1), 1)
req.PrepareRequest.DNShard.ReplicaID = 2
assert.NoError(t, s.handlePrepare(context.Background(), &req, &txn.TxnResponse{}))
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
_, err := s.sender.Send(ctx, []txn.TxnRequest{req})
assert.NoError(t, err)
})
}
func TestHandleGetStatus(t *testing.T) {
runDNStoreTest(t, func(s *store) {
shard := newTestDNShard(1, 2, 3)
assert.NoError(t, s.StartDNReplica(shard))
req := service.NewTestGetStatusRequest(service.NewTestTxn(1, 1, 1), 1)
req.GetStatusRequest.DNShard.ReplicaID = 2
assert.NoError(t, s.handleGetStatus(context.Background(), &req, &txn.TxnResponse{}))
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
_, err := s.sender.Send(ctx, []txn.TxnRequest{req})
assert.NoError(t, err)
})
}
func TestHandleCommitDNShard(t *testing.T) {
runDNStoreTest(t, func(s *store) {
shard := newTestDNShard(1, 2, 3)
assert.NoError(t, s.StartDNReplica(shard))
req := service.NewTestCommitShardRequest(service.NewTestTxn(1, 1, 1))
req.CommitDNShardRequest.DNShard.ReplicaID = 2
assert.NoError(t, s.handleCommitDNShard(context.Background(), &req, &txn.TxnResponse{}))
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
_, err := s.sender.Send(ctx, []txn.TxnRequest{req})
assert.NoError(t, err)
})
}
func TestHandleRollbackDNShard(t *testing.T) {
runDNStoreTest(t, func(s *store) {
shard := newTestDNShard(1, 2, 3)
assert.NoError(t, s.StartDNReplica(shard))
req := service.NewTestRollbackShardRequest(service.NewTestTxn(1, 1, 1))
req.RollbackDNShardRequest.DNShard.ReplicaID = 2
assert.NoError(t, s.handleRollbackDNShard(context.Background(), &req, &txn.TxnResponse{}))
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
_, err := s.sender.Send(ctx, []txn.TxnRequest{req})
assert.NoError(t, err)
})
}
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dnservice
import (
"context"
"math"
"os"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/matrixorigin/matrixone/pkg/logservice"
logservicepb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/matrixorigin/matrixone/pkg/txn/service"
"github.com/matrixorigin/matrixone/pkg/txn/storage/mem"
"github.com/stretchr/testify/assert"
)
var (
testDNStoreAddr = "unix:///tmp/test-dnstore.sock"
testDataDir = "/tmp/mo/dn"
)
func TestNewAndStartAndCloseService(t *testing.T) {
runDNStoreTest(t, func(s *store) {
thc := s.hakeeperClient.(*testHAKeeperClient)
for {
if v := thc.getCount(); v > 0 {
return
}
}
})
}
func TestAddReplica(t *testing.T) {
runDNStoreTest(t, func(s *store) {
thc := s.hakeeperClient.(*testHAKeeperClient)
thc.setCommandBatch(logservicepb.CommandBatch{
Commands: []logservicepb.ScheduleCommand{
{
ServiceType: logservicepb.DnService,
ConfigChange: &logservicepb.ConfigChange{
ChangeType: logservicepb.AddReplica,
Replica: logservicepb.Replica{
LogShardID: 3,
ReplicaID: 2,
ShardID: 1,
},
},
},
},
})
for {
r := s.getReplica(1)
if r != nil {
r.waitStarted()
assert.Equal(t, newTestDNShard(1, 2, 3), r.shard)
return
}
time.Sleep(time.Millisecond * 10)
}
})
}
func TestStartReplica(t *testing.T) {
runDNStoreTest(t, func(s *store) {
assert.NoError(t, s.StartDNReplica(newTestDNShard(1, 2, 3)))
r := s.getReplica(1)
r.waitStarted()
assert.Equal(t, newTestDNShard(1, 2, 3), r.shard)
})
}
func TestRemoveReplica(t *testing.T) {
runDNStoreTest(t, func(s *store) {
assert.NoError(t, s.StartDNReplica(newTestDNShard(1, 2, 3)))
r := s.getReplica(1)
r.waitStarted()
thc := s.hakeeperClient.(*testHAKeeperClient)
thc.setCommandBatch(logservicepb.CommandBatch{
Commands: []logservicepb.ScheduleCommand{
{
ServiceType: logservicepb.DnService,
ConfigChange: &logservicepb.ConfigChange{
ChangeType: logservicepb.RemoveReplica,
Replica: logservicepb.Replica{
LogShardID: 3,
ReplicaID: 2,
ShardID: 1,
},
},
},
},
})
for {
r := s.getReplica(1)
if r == nil {
return
}
time.Sleep(time.Millisecond * 10)
}
})
}
func TestCloseReplica(t *testing.T) {
runDNStoreTest(t, func(s *store) {
shard := newTestDNShard(1, 2, 3)
assert.NoError(t, s.StartDNReplica(shard))
r := s.getReplica(1)
r.waitStarted()
assert.Equal(t, shard, r.shard)
assert.NoError(t, s.CloseDNReplica(shard))
assert.Nil(t, s.getReplica(1))
})
}
func runDNStoreTest(t *testing.T, testFn func(*store), opts ...Option) {
thc := newTestHAKeeperClient()
opts = append(opts, WithHAKeeperClientFactory(func() (logservice.DNHAKeeperClient, error) {
return thc, nil
}),
WithLogServiceClientFactory(func(d metadata.DNShard) (logservice.Client, error) {
return mem.NewMemLog(), nil
}),
WithConfigAdjust(func(c *Config) {
c.HAKeeper.HeatbeatDuration.Duration = time.Millisecond * 10
c.Txn.Storage.Backend = memStorageBackend
}))
s := newTestStore(t, "u1", opts...)
defer func() {
assert.NoError(t, s.Close())
}()
assert.NoError(t, s.Start())
testFn(s)
}
func newTestStore(t *testing.T, uuid string, options ...Option) *store {
assert.NoError(t, os.RemoveAll(testDataDir))
assert.NoError(t, os.RemoveAll(testDNStoreAddr[7:]))
assert.NoError(t, os.MkdirAll(testDataDir, 0755))
c := &Config{
UUID: uuid,
DataDir: testDataDir,
ListenAddress: testDNStoreAddr,
}
c.Txn.Clock.MaxClockOffset.Duration = time.Duration(math.MaxInt64)
s, err := NewService(c, options...)
assert.NoError(t, err)
return s.(*store)
}
func newTestDNShard(shardID, replicaID, logShardID uint64) metadata.DNShard {
dnShard := service.NewTestDNShard(shardID)
dnShard.ReplicaID = replicaID
dnShard.LogShardID = logShardID
dnShard.Address = testDNStoreAddr
return dnShard
}
type testHAKeeperClient struct {
mu struct {
sync.RWMutex
commandBatch logservicepb.CommandBatch
}
atomic struct {
count uint64
}
}
func newTestHAKeeperClient() *testHAKeeperClient {
return &testHAKeeperClient{}
}
func (thc *testHAKeeperClient) setCommandBatch(commandBatch logservicepb.CommandBatch) {
thc.mu.Lock()
defer thc.mu.Unlock()
thc.mu.commandBatch = commandBatch
}
func (thc *testHAKeeperClient) getCount() uint64 {
return atomic.LoadUint64(&thc.atomic.count)
}
func (thc *testHAKeeperClient) Close() error {
return nil
}
func (thc *testHAKeeperClient) SendDNHeartbeat(ctx context.Context, hb logservicepb.DNStoreHeartbeat) (logservicepb.CommandBatch, error) {
atomic.AddUint64(&thc.atomic.count, 1)
thc.mu.RLock()
defer thc.mu.RUnlock()
return thc.mu.commandBatch, nil
}
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dnservice
import (
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
)
// Option store option
type Option func(*store)
// Service DN Service
type Service interface {
// Start starts the dn store: starts all DNShards currently managed by the store and
// listens for and processes requests from CN and other DN nodes.
Start() error
// Close closes the dn store.
Close() error
// StartDNReplica starts the given DNShard replica.
StartDNReplica(metadata.DNShard) error
// CloseDNReplica closes the given DNShard replica.
CloseDNReplica(shard metadata.DNShard) error
}
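
A minimal usage sketch of this interface, reusing pieces shown elsewhere in this commit (the option functions, metadata.DNShard, mem.NewMemLog); the helper name is hypothetical and the sketch is not part of the change. NewService also needs HAKeeper: either a reachable cluster configured via cfg.HAKeeper.ClientConfig, or a client injected through WithHAKeeperClientFactory as the tests do.

func exampleRunService(cfg *Config) error {
	// Hypothetical helper, for illustration only.
	s, err := NewService(cfg,
		WithLogServiceClientFactory(func(d metadata.DNShard) (logservice.Client, error) {
			return mem.NewMemLog(), nil
		}),
		WithConfigAdjust(func(c *Config) {
			c.Txn.Storage.Backend = memStorageBackend
		}))
	if err != nil {
		return err
	}
	if err := s.Start(); err != nil {
		return err
	}
	// Start a single DNShard replica on this store, then shut everything down.
	shard := metadata.DNShard{
		DNShardRecord: metadata.DNShardRecord{ShardID: 1, LogShardID: 3},
		ReplicaID:     2,
		Address:       cfg.ServiceAddress,
	}
	if err := s.StartDNReplica(shard); err != nil {
		return err
	}
	return s.Close()
}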
@@ -22,7 +22,7 @@ var _ = math.Inf
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
// DNShardRecord is DN shard metadata describing what is a DN shard. It
// is internally used by HAKeeper to maintain how many DNs available in
@@ -51,7 +51,7 @@ func (m *DNShardRecord) XXX_Marshal(b []byte, deterministic bool) ([]byte, error
return xxx_messageInfo_DNShardRecord.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
@@ -113,7 +113,7 @@ func (m *DNShard) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_DNShard.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
@@ -175,7 +175,7 @@ func (m *LogShardRecord) XXX_Marshal(b []byte, deterministic bool) ([]byte, erro
return xxx_messageInfo_LogShardRecord.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
@@ -215,40 +215,101 @@ func (m *LogShardRecord) GetName() string {
return ""
}
// DNStore DN store metadata
type DNStore struct {
// UUID DNStore uuid id
UUID string `protobuf:"bytes,1,opt,name=UUID,proto3" json:"UUID,omitempty"`
// Shards DNShards
Shards []DNShard `protobuf:"bytes,2,rep,name=Shards,proto3" json:"Shards"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *DNStore) Reset() { *m = DNStore{} }
func (m *DNStore) String() string { return proto.CompactTextString(m) }
func (*DNStore) ProtoMessage() {}
func (*DNStore) Descriptor() ([]byte, []int) {
return fileDescriptor_56d9f74966f40d04, []int{3}
}
func (m *DNStore) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *DNStore) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_DNStore.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *DNStore) XXX_Merge(src proto.Message) {
xxx_messageInfo_DNStore.Merge(m, src)
}
func (m *DNStore) XXX_Size() int {
return m.Size()
}
func (m *DNStore) XXX_DiscardUnknown() {
xxx_messageInfo_DNStore.DiscardUnknown(m)
}
var xxx_messageInfo_DNStore proto.InternalMessageInfo
func (m *DNStore) GetUUID() string {
if m != nil {
return m.UUID
}
return ""
}
func (m *DNStore) GetShards() []DNShard {
if m != nil {
return m.Shards
}
return nil
}
func init() {
proto.RegisterType((*DNShardRecord)(nil), "metadata.DNShardRecord")
proto.RegisterType((*DNShard)(nil), "metadata.DNShard")
proto.RegisterType((*LogShardRecord)(nil), "metadata.LogShardRecord")
proto.RegisterType((*DNStore)(nil), "metadata.DNStore")
}
func init() { proto.RegisterFile("metadata.proto", fileDescriptor_56d9f74966f40d04) }
var fileDescriptor_56d9f74966f40d04 = []byte{
// 279 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcb, 0x4d, 0x2d, 0x49,
0x4c, 0x49, 0x2c, 0x49, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x80, 0xf1, 0xa5, 0x74,
0xd3, 0x33, 0x4b, 0x32, 0x4a, 0x93, 0xf4, 0x92, 0xf3, 0x73, 0xf5, 0xd3, 0xf3, 0xd3, 0xf3, 0xf5,
0xc1, 0x0a, 0x92, 0x4a, 0xd3, 0xc0, 0x3c, 0x30, 0x07, 0xcc, 0x82, 0x68, 0x54, 0xf2, 0xe4, 0xe2,
0x75, 0xf1, 0x0b, 0xce, 0x48, 0x2c, 0x4a, 0x09, 0x4a, 0x4d, 0xce, 0x2f, 0x4a, 0x11, 0x92, 0xe0,
0x62, 0x07, 0x73, 0x3d, 0x5d, 0x24, 0x18, 0x15, 0x18, 0x35, 0x58, 0x82, 0x60, 0x5c, 0x21, 0x39,
0x2e, 0x2e, 0x9f, 0xfc, 0x74, 0x98, 0x24, 0x13, 0x58, 0x12, 0x49, 0x44, 0xa9, 0x8b, 0x91, 0x8b,
0x1d, 0x6a, 0x96, 0x90, 0x3b, 0x9a, 0xb1, 0x60, 0xb3, 0xb8, 0x8d, 0xc4, 0xf5, 0xe0, 0xee, 0x46,
0x91, 0x76, 0xe2, 0x38, 0x71, 0x4f, 0x9e, 0xe1, 0xc2, 0x3d, 0x79, 0xc6, 0x20, 0x34, 0xe7, 0xc8,
0x70, 0x71, 0x06, 0xa5, 0x16, 0xe4, 0x64, 0x26, 0x27, 0xc2, 0xed, 0x44, 0x08, 0x80, 0x1c, 0xeb,
0x98, 0x92, 0x52, 0x94, 0x5a, 0x5c, 0x2c, 0xc1, 0xac, 0xc0, 0xa8, 0xc1, 0x19, 0x04, 0xe3, 0x2a,
0x65, 0x71, 0xf1, 0xc1, 0x9c, 0x46, 0xd0, 0x63, 0x5a, 0x5c, 0x02, 0x7e, 0xa5, 0xb9, 0x49, 0xa9,
0x45, 0xfe, 0x69, 0x50, 0xa3, 0x8b, 0xa1, 0x56, 0x61, 0x88, 0x0b, 0x09, 0x71, 0xb1, 0xf8, 0x25,
0xe6, 0xa6, 0x42, 0xad, 0x03, 0xb3, 0x9d, 0xec, 0x4f, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e,
0xf1, 0xc1, 0x23, 0x39, 0xc6, 0x28, 0x43, 0xa4, 0x08, 0xc8, 0x4d, 0x2c, 0x29, 0xca, 0xac, 0xc8,
0x2f, 0xca, 0x4c, 0xcf, 0xcc, 0x83, 0x71, 0xf2, 0x52, 0xf5, 0x0b, 0xb2, 0xd3, 0xf5, 0x0b, 0x92,
0xf4, 0x61, 0x61, 0x91, 0xc4, 0x06, 0x8e, 0x0b, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0x56,
0xc5, 0x4d, 0x2e, 0xd6, 0x01, 0x00, 0x00,
// 317 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x51, 0xcd, 0x4a, 0x03, 0x31,
0x10, 0x36, 0xb6, 0xf4, 0x27, 0xc5, 0xa2, 0xb9, 0x58, 0x44, 0xb6, 0x65, 0x4f, 0x45, 0xb0, 0xc1,
0xfa, 0x00, 0x62, 0x29, 0x48, 0x41, 0x56, 0x88, 0xf4, 0xe2, 0x2d, 0xdb, 0x4d, 0xd3, 0x55, 0xb7,
0x59, 0xb2, 0x59, 0xf0, 0x19, 0x7c, 0xb2, 0x1e, 0xfb, 0x04, 0x45, 0xf6, 0x49, 0x64, 0xc7, 0xa4,
0xd6, 0xf6, 0xe0, 0x6d, 0xbe, 0xf9, 0x26, 0xdf, 0xf7, 0x4d, 0x06, 0xb7, 0x13, 0x61, 0x78, 0xc4,
0x0d, 0x1f, 0xa4, 0x5a, 0x19, 0x45, 0x1a, 0x0e, 0x5f, 0x5c, 0xcb, 0xd8, 0x2c, 0xf2, 0x70, 0x30,
0x53, 0x09, 0x95, 0x4a, 0x2a, 0x0a, 0x03, 0x61, 0x3e, 0x07, 0x04, 0x00, 0xaa, 0x9f, 0x87, 0xfe,
0x04, 0x9f, 0x8c, 0x83, 0xe7, 0x05, 0xd7, 0x11, 0x13, 0x33, 0xa5, 0x23, 0xd2, 0xc1, 0x75, 0x80,
0x93, 0x71, 0x07, 0xf5, 0x50, 0xbf, 0xca, 0x1c, 0x24, 0x1e, 0xc6, 0x8f, 0x4a, 0x3a, 0xf2, 0x18,
0xc8, 0x9d, 0x8e, 0xff, 0x89, 0x70, 0xdd, 0x6a, 0x91, 0x87, 0x3d, 0x59, 0xd0, 0x6a, 0x0d, 0xcf,
0x07, 0xdb, 0xdc, 0x7f, 0xe8, 0x51, 0x63, 0xb5, 0xe9, 0x1e, 0xad, 0x37, 0x5d, 0xc4, 0xf6, 0xe2,
0x5c, 0xe2, 0x26, 0x13, 0xe9, 0x7b, 0x3c, 0xe3, 0x5b, 0xcf, 0xdf, 0x46, 0x19, 0xf6, 0x3e, 0x8a,
0xb4, 0xc8, 0xb2, 0x4e, 0xa5, 0x87, 0xfa, 0x4d, 0xe6, 0xa0, 0xff, 0x8a, 0xdb, 0x2e, 0xda, 0xbf,
0x8b, 0x5d, 0xe1, 0xd3, 0x20, 0x4f, 0x42, 0xa1, 0x9f, 0xe6, 0x56, 0x3a, 0xb3, 0x56, 0x07, 0x7d,
0x42, 0x70, 0x35, 0xe0, 0x89, 0xb0, 0x76, 0x50, 0xfb, 0x01, 0xec, 0x6d, 0x94, 0x16, 0x25, 0x3d,
0x9d, 0x5a, 0x87, 0x26, 0x83, 0x9a, 0x50, 0x5c, 0x03, 0xa7, 0x52, 0xb4, 0xd2, 0x6f, 0x0d, 0xcf,
0x0e, 0x3e, 0x61, 0x54, 0x2d, 0xd7, 0x67, 0x76, 0x6c, 0x74, 0xb7, 0x2a, 0x3c, 0xb4, 0x2e, 0x3c,
0xf4, 0x55, 0x78, 0xe8, 0xe5, 0x66, 0xe7, 0xa0, 0x09, 0x37, 0x3a, 0xfe, 0x50, 0x3a, 0x96, 0xf1,
0xd2, 0x81, 0xa5, 0xa0, 0xe9, 0x9b, 0xa4, 0x69, 0x48, 0x9d, 0x6c, 0x58, 0x83, 0xdb, 0xde, 0x7e,
0x07, 0x00, 0x00, 0xff, 0xff, 0x5b, 0xd0, 0xb2, 0x72, 0x26, 0x02, 0x00, 0x00,
}
func (m *DNShardRecord) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
@@ -256,30 +317,36 @@ func (m *DNShardRecord) Marshal() (dAtA []byte, err error) {
}
func (m *DNShardRecord) MarshalTo(dAtA []byte) (int, error) {
var i int
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *DNShardRecord) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.ShardID != 0 {
dAtA[i] = 0x8
i++
i = encodeVarintMetadata(dAtA, i, uint64(m.ShardID))
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.LogShardID != 0 {
dAtA[i] = 0x10
i++
i = encodeVarintMetadata(dAtA, i, uint64(m.LogShardID))
i--
dAtA[i] = 0x10
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
if m.ShardID != 0 {
i = encodeVarintMetadata(dAtA, i, uint64(m.ShardID))
i--
dAtA[i] = 0x8
}
return i, nil
return len(dAtA) - i, nil
}
func (m *DNShard) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
......@@ -287,39 +354,48 @@ func (m *DNShard) Marshal() (dAtA []byte, err error) {
}
func (m *DNShard) MarshalTo(dAtA []byte) (int, error) {
var i int
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *DNShard) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
dAtA[i] = 0xa
i++
i = encodeVarintMetadata(dAtA, i, uint64(m.DNShardRecord.Size()))
n1, err1 := m.DNShardRecord.MarshalTo(dAtA[i:])
if err1 != nil {
return 0, err1
}
i += n1
if m.ReplicaID != 0 {
dAtA[i] = 0x10
i++
i = encodeVarintMetadata(dAtA, i, uint64(m.ReplicaID))
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Address) > 0 {
dAtA[i] = 0x1a
i++
i -= len(m.Address)
copy(dAtA[i:], m.Address)
i = encodeVarintMetadata(dAtA, i, uint64(len(m.Address)))
i += copy(dAtA[i:], m.Address)
i--
dAtA[i] = 0x1a
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
if m.ReplicaID != 0 {
i = encodeVarintMetadata(dAtA, i, uint64(m.ReplicaID))
i--
dAtA[i] = 0x10
}
return i, nil
{
size, err := m.DNShardRecord.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintMetadata(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
return len(dAtA) - i, nil
}
func (m *LogShardRecord) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
......@@ -327,40 +403,97 @@ func (m *LogShardRecord) Marshal() (dAtA []byte, err error) {
}
func (m *LogShardRecord) MarshalTo(dAtA []byte) (int, error) {
var i int
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *LogShardRecord) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.ShardID != 0 {
dAtA[i] = 0x8
i++
i = encodeVarintMetadata(dAtA, i, uint64(m.ShardID))
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Name) > 0 {
i -= len(m.Name)
copy(dAtA[i:], m.Name)
i = encodeVarintMetadata(dAtA, i, uint64(len(m.Name)))
i--
dAtA[i] = 0x1a
}
if m.NumberOfReplicas != 0 {
dAtA[i] = 0x10
i++
i = encodeVarintMetadata(dAtA, i, uint64(m.NumberOfReplicas))
i--
dAtA[i] = 0x10
}
if len(m.Name) > 0 {
dAtA[i] = 0x1a
i++
i = encodeVarintMetadata(dAtA, i, uint64(len(m.Name)))
i += copy(dAtA[i:], m.Name)
if m.ShardID != 0 {
i = encodeVarintMetadata(dAtA, i, uint64(m.ShardID))
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func (m *DNStore) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *DNStore) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *DNStore) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Shards) > 0 {
for iNdEx := len(m.Shards) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Shards[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintMetadata(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
}
if len(m.UUID) > 0 {
i -= len(m.UUID)
copy(dAtA[i:], m.UUID)
i = encodeVarintMetadata(dAtA, i, uint64(len(m.UUID)))
i--
dAtA[i] = 0xa
}
return i, nil
return len(dAtA) - i, nil
}
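// Illustrative sketch (not part of the generated file): DNStore is a new
// message in this change, and its MarshalToSizedBuffer above fills an
// exactly-sized buffer from the end toward the front, so ascending field
// numbers still come out on the wire. For a hypothetical store with UUID
// "dn1" and no shards, Size() is 1+1+3 = 5 and the wire bytes are
// 0x0a 0x03 'd' 'n' '1' (field 1, length-delimited, length 3, UUID bytes):
//
//	m := DNStore{UUID: "dn1"}
//	buf, err := m.Marshal() // buf == []byte{0x0a, 0x03, 'd', 'n', '1'}, err == nil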
func encodeVarintMetadata(dAtA []byte, offset int, v uint64) int {
offset -= sovMetadata(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return offset + 1
return base
}
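// Sketch of the varint helper above (illustrative, not generated code):
// encodeVarintMetadata steps `offset` back by sovMetadata(v) bytes, writes the
// 7-bit groups low-to-high, and returns the new start position so callers can
// keep prepending earlier fields. For v = 300, which needs two bytes on the
// wire (0xac 0x02):
//
//	buf := make([]byte, 8)
//	off := encodeVarintMetadata(buf, 8, 300)
//	// off == 6, buf[6] == 0xac, buf[7] == 0x02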
func (m *DNShardRecord) Size() (n int) {
if m == nil {
......@@ -423,6 +556,28 @@ func (m *LogShardRecord) Size() (n int) {
return n
}
func (m *DNStore) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.UUID)
if l > 0 {
n += 1 + l + sovMetadata(uint64(l))
}
if len(m.Shards) > 0 {
for _, e := range m.Shards {
l = e.Size()
n += 1 + l + sovMetadata(uint64(l))
}
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovMetadata(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
......@@ -502,10 +657,7 @@ func (m *DNShardRecord) Unmarshal(dAtA []byte) error {
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthMetadata
}
if (iNdEx + skippy) < 0 {
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthMetadata
}
if (iNdEx + skippy) > l {
......@@ -640,10 +792,7 @@ func (m *DNShard) Unmarshal(dAtA []byte) error {
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthMetadata
}
if (iNdEx + skippy) < 0 {
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthMetadata
}
if (iNdEx + skippy) > l {
......@@ -764,10 +913,124 @@ func (m *LogShardRecord) Unmarshal(dAtA []byte) error {
if err != nil {
return err
}
if skippy < 0 {
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthMetadata
}
if (iNdEx + skippy) < 0 {
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *DNStore) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetadata
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: DNStore: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: DNStore: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field UUID", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetadata
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthMetadata
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthMetadata
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.UUID = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Shards", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetadata
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthMetadata
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthMetadata
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Shards = append(m.Shards, DNShard{})
if err := m.Shards[len(m.Shards)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipMetadata(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthMetadata
}
if (iNdEx + skippy) > l {
......@@ -786,6 +1049,7 @@ func (m *LogShardRecord) Unmarshal(dAtA []byte) error {
func skipMetadata(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
......@@ -817,10 +1081,8 @@ func skipMetadata(dAtA []byte) (n int, err error) {
break
}
}
return iNdEx, nil
case 1:
iNdEx += 8
return iNdEx, nil
case 2:
var length int
for shift := uint(0); ; shift += 7 {
......@@ -841,55 +1103,30 @@ func skipMetadata(dAtA []byte) (n int, err error) {
return 0, ErrInvalidLengthMetadata
}
iNdEx += length
if iNdEx < 0 {
return 0, ErrInvalidLengthMetadata
}
return iNdEx, nil
case 3:
for {
var innerWire uint64
var start int = iNdEx
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowMetadata
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
innerWire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
innerWireType := int(innerWire & 0x7)
if innerWireType == 4 {
break
}
next, err := skipMetadata(dAtA[start:])
if err != nil {
return 0, err
}
iNdEx = start + next
if iNdEx < 0 {
return 0, ErrInvalidLengthMetadata
}
}
return iNdEx, nil
depth++
case 4:
return iNdEx, nil
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupMetadata
}
depth--
case 5:
iNdEx += 4
return iNdEx, nil
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthMetadata
}
if depth == 0 {
return iNdEx, nil
}
}
panic("unreachable")
return 0, io.ErrUnexpectedEOF
}
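// Sketch of the reworked group handling (illustrative, not generated code):
// skipMetadata now tracks `depth` instead of recursing, so it only returns
// once every start-group marker (wire type 3) has been matched by an
// end-group marker (wire type 4), and an unmatched end-group yields
// ErrUnexpectedEndOfGroupMetadata. For the bytes below (field 1 start-group,
// field 2 varint 1, field 1 end-group), the whole group is skipped as one unit:
//
//	n, err := skipMetadata([]byte{0x0b, 0x10, 0x01, 0x0c})
//	// n == 4, err == nil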
var (
ErrInvalidLengthMetadata = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowMetadata = fmt.Errorf("proto: integer overflow")
ErrInvalidLengthMetadata = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowMetadata = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupMetadata = fmt.Errorf("proto: unexpected end of group")
)
......@@ -22,7 +22,7 @@ var _ = math.Inf
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type MetricType int32
......@@ -74,7 +74,7 @@ func (m *LabelPair) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_LabelPair.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
......@@ -128,7 +128,7 @@ func (m *Gauge) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Gauge.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
......@@ -175,7 +175,7 @@ func (m *Counter) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Counter.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
......@@ -223,7 +223,7 @@ func (m *Sample) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Sample.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
......@@ -277,7 +277,7 @@ func (m *RawHist) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_RawHist.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
......@@ -328,7 +328,7 @@ func (m *Metric) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Metric.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
......@@ -408,7 +408,7 @@ func (m *MetricFamily) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
return xxx_messageInfo_MetricFamily.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
......@@ -517,7 +517,7 @@ var fileDescriptor_da41641f55bff5df = []byte{
func (m *LabelPair) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
......@@ -525,32 +525,40 @@ func (m *LabelPair) Marshal() (dAtA []byte, err error) {
}
func (m *LabelPair) MarshalTo(dAtA []byte) (int, error) {
var i int
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *LabelPair) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Name) > 0 {
dAtA[i] = 0xa
i++
i = encodeVarintMetric(dAtA, i, uint64(len(m.Name)))
i += copy(dAtA[i:], m.Name)
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Value) > 0 {
dAtA[i] = 0x12
i++
i -= len(m.Value)
copy(dAtA[i:], m.Value)
i = encodeVarintMetric(dAtA, i, uint64(len(m.Value)))
i += copy(dAtA[i:], m.Value)
i--
dAtA[i] = 0x12
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
if len(m.Name) > 0 {
i -= len(m.Name)
copy(dAtA[i:], m.Name)
i = encodeVarintMetric(dAtA, i, uint64(len(m.Name)))
i--
dAtA[i] = 0xa
}
return i, nil
return len(dAtA) - i, nil
}
func (m *Gauge) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
......@@ -558,26 +566,32 @@ func (m *Gauge) Marshal() (dAtA []byte, err error) {
}
func (m *Gauge) MarshalTo(dAtA []byte) (int, error) {
var i int
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Gauge) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.Value != 0 {
dAtA[i] = 0x9
i++
i -= 8
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Value))))
i += 8
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
i--
dAtA[i] = 0x9
}
return i, nil
return len(dAtA) - i, nil
}
func (m *Counter) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
......@@ -585,26 +599,32 @@ func (m *Counter) Marshal() (dAtA []byte, err error) {
}
func (m *Counter) MarshalTo(dAtA []byte) (int, error) {
var i int
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Counter) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.Value != 0 {
dAtA[i] = 0x9
i++
i -= 8
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Value))))
i += 8
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
i--
dAtA[i] = 0x9
}
return i, nil
return len(dAtA) - i, nil
}
func (m *Sample) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
......@@ -612,31 +632,37 @@ func (m *Sample) Marshal() (dAtA []byte, err error) {
}
func (m *Sample) MarshalTo(dAtA []byte) (int, error) {
var i int
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Sample) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.Datetime != 0 {
dAtA[i] = 0x8
i++
i = encodeVarintMetric(dAtA, i, uint64(m.Datetime))
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.Value != 0 {
dAtA[i] = 0x11
i++
i -= 8
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Value))))
i += 8
i--
dAtA[i] = 0x11
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
if m.Datetime != 0 {
i = encodeVarintMetric(dAtA, i, uint64(m.Datetime))
i--
dAtA[i] = 0x8
}
return i, nil
return len(dAtA) - i, nil
}
func (m *RawHist) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
......@@ -644,32 +670,40 @@ func (m *RawHist) Marshal() (dAtA []byte, err error) {
}
func (m *RawHist) MarshalTo(dAtA []byte) (int, error) {
var i int
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *RawHist) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Samples) > 0 {
for _, msg := range m.Samples {
dAtA[i] = 0xa
i++
i = encodeVarintMetric(dAtA, i, uint64(msg.Size()))
n, err := msg.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Samples[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintMetric(dAtA, i, uint64(size))
}
i += n
i--
dAtA[i] = 0xa
}
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
}
return i, nil
return len(dAtA) - i, nil
}
func (m *Metric) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
......@@ -677,67 +711,81 @@ func (m *Metric) Marshal() (dAtA []byte, err error) {
}
func (m *Metric) MarshalTo(dAtA []byte) (int, error) {
var i int
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Metric) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Label) > 0 {
for _, msg := range m.Label {
dAtA[i] = 0xa
i++
i = encodeVarintMetric(dAtA, i, uint64(msg.Size()))
n, err := msg.MarshalTo(dAtA[i:])
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.Collecttime != 0 {
i = encodeVarintMetric(dAtA, i, uint64(m.Collecttime))
i--
dAtA[i] = 0x28
}
if m.RawHist != nil {
{
size, err := m.RawHist.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i += n
i -= size
i = encodeVarintMetric(dAtA, i, uint64(size))
}
}
if m.Gauge != nil {
dAtA[i] = 0x12
i++
i = encodeVarintMetric(dAtA, i, uint64(m.Gauge.Size()))
n1, err1 := m.Gauge.MarshalTo(dAtA[i:])
if err1 != nil {
return 0, err1
}
i += n1
i--
dAtA[i] = 0x22
}
if m.Counter != nil {
dAtA[i] = 0x1a
i++
i = encodeVarintMetric(dAtA, i, uint64(m.Counter.Size()))
n2, err2 := m.Counter.MarshalTo(dAtA[i:])
if err2 != nil {
return 0, err2
{
size, err := m.Counter.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintMetric(dAtA, i, uint64(size))
}
i += n2
i--
dAtA[i] = 0x1a
}
if m.RawHist != nil {
dAtA[i] = 0x22
i++
i = encodeVarintMetric(dAtA, i, uint64(m.RawHist.Size()))
n3, err3 := m.RawHist.MarshalTo(dAtA[i:])
if err3 != nil {
return 0, err3
if m.Gauge != nil {
{
size, err := m.Gauge.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintMetric(dAtA, i, uint64(size))
}
i += n3
}
if m.Collecttime != 0 {
dAtA[i] = 0x28
i++
i = encodeVarintMetric(dAtA, i, uint64(m.Collecttime))
i--
dAtA[i] = 0x12
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
if len(m.Label) > 0 {
for iNdEx := len(m.Label) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Label[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintMetric(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return i, nil
return len(dAtA) - i, nil
}
func (m *MetricFamily) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
......@@ -745,64 +793,77 @@ func (m *MetricFamily) Marshal() (dAtA []byte, err error) {
}
func (m *MetricFamily) MarshalTo(dAtA []byte) (int, error) {
var i int
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *MetricFamily) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Name) > 0 {
dAtA[i] = 0xa
i++
i = encodeVarintMetric(dAtA, i, uint64(len(m.Name)))
i += copy(dAtA[i:], m.Name)
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Help) > 0 {
dAtA[i] = 0x12
i++
i = encodeVarintMetric(dAtA, i, uint64(len(m.Help)))
i += copy(dAtA[i:], m.Help)
if len(m.Role) > 0 {
i -= len(m.Role)
copy(dAtA[i:], m.Role)
i = encodeVarintMetric(dAtA, i, uint64(len(m.Role)))
i--
dAtA[i] = 0x32
}
if m.Type != 0 {
dAtA[i] = 0x18
i++
i = encodeVarintMetric(dAtA, i, uint64(m.Type))
if m.Node != 0 {
i = encodeVarintMetric(dAtA, i, uint64(m.Node))
i--
dAtA[i] = 0x28
}
if len(m.Metric) > 0 {
for _, msg := range m.Metric {
dAtA[i] = 0x22
i++
i = encodeVarintMetric(dAtA, i, uint64(msg.Size()))
n, err := msg.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
for iNdEx := len(m.Metric) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Metric[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintMetric(dAtA, i, uint64(size))
}
i += n
i--
dAtA[i] = 0x22
}
}
if m.Node != 0 {
dAtA[i] = 0x28
i++
i = encodeVarintMetric(dAtA, i, uint64(m.Node))
if m.Type != 0 {
i = encodeVarintMetric(dAtA, i, uint64(m.Type))
i--
dAtA[i] = 0x18
}
if len(m.Role) > 0 {
dAtA[i] = 0x32
i++
i = encodeVarintMetric(dAtA, i, uint64(len(m.Role)))
i += copy(dAtA[i:], m.Role)
if len(m.Help) > 0 {
i -= len(m.Help)
copy(dAtA[i:], m.Help)
i = encodeVarintMetric(dAtA, i, uint64(len(m.Help)))
i--
dAtA[i] = 0x12
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
if len(m.Name) > 0 {
i -= len(m.Name)
copy(dAtA[i:], m.Name)
i = encodeVarintMetric(dAtA, i, uint64(len(m.Name)))
i--
dAtA[i] = 0xa
}
return i, nil
return len(dAtA) - i, nil
}
func encodeVarintMetric(dAtA []byte, offset int, v uint64) int {
offset -= sovMetric(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return offset + 1
return base
}
func (m *LabelPair) Size() (n int) {
if m == nil {
......@@ -1064,10 +1125,7 @@ func (m *LabelPair) Unmarshal(dAtA []byte) error {
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthMetric
}
if (iNdEx + skippy) < 0 {
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthMetric
}
if (iNdEx + skippy) > l {
......@@ -1129,10 +1187,7 @@ func (m *Gauge) Unmarshal(dAtA []byte) error {
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthMetric
}
if (iNdEx + skippy) < 0 {
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthMetric
}
if (iNdEx + skippy) > l {
......@@ -1194,10 +1249,7 @@ func (m *Counter) Unmarshal(dAtA []byte) error {
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthMetric
}
if (iNdEx + skippy) < 0 {
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthMetric
}
if (iNdEx + skippy) > l {
......@@ -1278,10 +1330,7 @@ func (m *Sample) Unmarshal(dAtA []byte) error {
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthMetric
}
if (iNdEx + skippy) < 0 {
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthMetric
}
if (iNdEx + skippy) > l {
......@@ -1366,10 +1415,7 @@ func (m *RawHist) Unmarshal(dAtA []byte) error {
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthMetric
}
if (iNdEx + skippy) < 0 {
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthMetric
}
if (iNdEx + skippy) > l {
......@@ -1581,10 +1627,7 @@ func (m *Metric) Unmarshal(dAtA []byte) error {
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthMetric
}
if (iNdEx + skippy) < 0 {
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthMetric
}
if (iNdEx + skippy) > l {
......@@ -1803,10 +1846,7 @@ func (m *MetricFamily) Unmarshal(dAtA []byte) error {
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthMetric
}
if (iNdEx + skippy) < 0 {
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthMetric
}
if (iNdEx + skippy) > l {
......@@ -1825,6 +1865,7 @@ func (m *MetricFamily) Unmarshal(dAtA []byte) error {
func skipMetric(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
......@@ -1856,10 +1897,8 @@ func skipMetric(dAtA []byte) (n int, err error) {
break
}
}
return iNdEx, nil
case 1:
iNdEx += 8
return iNdEx, nil
case 2:
var length int
for shift := uint(0); ; shift += 7 {
......@@ -1880,55 +1919,30 @@ func skipMetric(dAtA []byte) (n int, err error) {
return 0, ErrInvalidLengthMetric
}
iNdEx += length
if iNdEx < 0 {
return 0, ErrInvalidLengthMetric
}
return iNdEx, nil
case 3:
for {
var innerWire uint64
var start int = iNdEx
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowMetric
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
innerWire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
innerWireType := int(innerWire & 0x7)
if innerWireType == 4 {
break
}
next, err := skipMetric(dAtA[start:])
if err != nil {
return 0, err
}
iNdEx = start + next
if iNdEx < 0 {
return 0, ErrInvalidLengthMetric
}
}
return iNdEx, nil
depth++
case 4:
return iNdEx, nil
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupMetric
}
depth--
case 5:
iNdEx += 4
return iNdEx, nil
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthMetric
}
if depth == 0 {
return iNdEx, nil
}
}
panic("unreachable")
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthMetric = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowMetric = fmt.Errorf("proto: integer overflow")
ErrInvalidLengthMetric = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowMetric = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupMetric = fmt.Errorf("proto: unexpected end of group")
)
......@@ -21,7 +21,7 @@ var _ = math.Inf
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
// Timestamp is an HLC time value. All its fields should never be accessed
// directly by its users.
......@@ -53,7 +53,7 @@ func (m *Timestamp) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Timestamp.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
......@@ -109,7 +109,7 @@ var fileDescriptor_edac929d8ae1e24f = []byte{
func (m *Timestamp) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
......@@ -117,34 +117,42 @@ func (m *Timestamp) Marshal() (dAtA []byte, err error) {
}
func (m *Timestamp) MarshalTo(dAtA []byte) (int, error) {
var i int
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Timestamp) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.PhysicalTime != 0 {
dAtA[i] = 0x8
i++
i = encodeVarintTimestamp(dAtA, i, uint64(m.PhysicalTime))
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.LogicalTime != 0 {
dAtA[i] = 0x10
i++
i = encodeVarintTimestamp(dAtA, i, uint64(m.LogicalTime))
i--
dAtA[i] = 0x10
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
if m.PhysicalTime != 0 {
i = encodeVarintTimestamp(dAtA, i, uint64(m.PhysicalTime))
i--
dAtA[i] = 0x8
}
return i, nil
return len(dAtA) - i, nil
}
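// Illustrative sketch (not generated code): the HLC Timestamp is two varint
// fields, PhysicalTime (field 1) and LogicalTime (field 2). For a
// hypothetical Timestamp{PhysicalTime: 100, LogicalTime: 2} the wire bytes
// are 0x08 0x64 0x10 0x02:
//
//	buf, err := (&Timestamp{PhysicalTime: 100, LogicalTime: 2}).Marshal()
//	// buf == []byte{0x08, 0x64, 0x10, 0x02}, err == nil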
func encodeVarintTimestamp(dAtA []byte, offset int, v uint64) int {
offset -= sovTimestamp(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return offset + 1
return base
}
func (m *Timestamp) Size() (n int) {
if m == nil {
......@@ -243,10 +251,7 @@ func (m *Timestamp) Unmarshal(dAtA []byte) error {
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthTimestamp
}
if (iNdEx + skippy) < 0 {
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthTimestamp
}
if (iNdEx + skippy) > l {
......@@ -265,6 +270,7 @@ func (m *Timestamp) Unmarshal(dAtA []byte) error {
func skipTimestamp(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
......@@ -296,10 +302,8 @@ func skipTimestamp(dAtA []byte) (n int, err error) {
break
}
}
return iNdEx, nil
case 1:
iNdEx += 8
return iNdEx, nil
case 2:
var length int
for shift := uint(0); ; shift += 7 {
......@@ -320,55 +324,30 @@ func skipTimestamp(dAtA []byte) (n int, err error) {
return 0, ErrInvalidLengthTimestamp
}
iNdEx += length
if iNdEx < 0 {
return 0, ErrInvalidLengthTimestamp
}
return iNdEx, nil
case 3:
for {
var innerWire uint64
var start int = iNdEx
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowTimestamp
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
innerWire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
innerWireType := int(innerWire & 0x7)
if innerWireType == 4 {
break
}
next, err := skipTimestamp(dAtA[start:])
if err != nil {
return 0, err
}
iNdEx = start + next
if iNdEx < 0 {
return 0, ErrInvalidLengthTimestamp
}
}
return iNdEx, nil
depth++
case 4:
return iNdEx, nil
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupTimestamp
}
depth--
case 5:
iNdEx += 4
return iNdEx, nil
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthTimestamp
}
if depth == 0 {
return iNdEx, nil
}
}
panic("unreachable")
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthTimestamp = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowTimestamp = fmt.Errorf("proto: integer overflow")
ErrInvalidLengthTimestamp = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowTimestamp = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupTimestamp = fmt.Errorf("proto: unexpected end of group")
)