Skip to content
Snippets Groups Projects
Unverified Commit adff1462 authored by fagongzi's avatar fagongzi Committed by GitHub
Browse files

tests: Add txn test framework (#5588)

We need a transactional integration testing framework to do some white-box transactional testing. Using this framework, we can easily build a cluster test environment, specify the number of CN, DN, LOG services we need and control the start and stop of these services, and even simulate some network partitioning and error injection

Approved by: @yingfeng, @reusee, @cnutshell, @fengttt
parent 0b18d897
No related branches found
No related tags found
No related merge requests found
......@@ -66,6 +66,7 @@
/pkg/sql/parsers @iamlinjunhong
/pkg/sql/plan @ouyuanning @aunjgr
/pkg/tests/service @zhangxu19830126 @cnutshell
/pkg/tests/txn @zhangxu19830126
/pkg/testutil @nnsgmsone
/pkg/txn @zhangxu19830126
/pkg/txn/storage/memorystorage @reusee
......
......@@ -41,8 +41,9 @@ const (
var (
supportTxnStorageBackends = map[string]struct{}{
memStorageBackend: {},
taeStorageBackend: {},
memKVStorageBackend: {},
memStorageBackend: {},
taeStorageBackend: {},
}
)
......
......@@ -38,7 +38,8 @@ type CNService interface {
// ID returns uuid of store
ID() string
// SQLAddress returns the sql listen address
SQLAddress() string
//GetTaskRunner returns the taskRunner.
GetTaskRunner() taskservice.TaskRunner
}
......@@ -49,8 +50,8 @@ type CNService interface {
type cnService struct {
sync.Mutex
status ServiceStatus
uuid string
svc cnservice.Service
cfg *cnservice.Config
}
func (c *cnService) Start() error {
......@@ -94,7 +95,11 @@ func (c *cnService) ID() string {
c.Lock()
defer c.Unlock()
return c.uuid
return c.cfg.UUID
}
func (c *cnService) SQLAddress() string {
return fmt.Sprintf("127.0.0.1:%d", c.cfg.Frontend.Port)
}
func (c *cnService) GetTaskRunner() taskservice.TaskRunner {
......@@ -119,8 +124,8 @@ func newCNService(
return &cnService{
status: ServiceInitialized,
uuid: cfg.UUID,
svc: srv,
cfg: cfg,
}, nil
}
......
......@@ -141,8 +141,8 @@ func newDNService(
}, nil
}
// buildDnConfig builds configuration for a dn service.
func buildDnConfig(
// buildDNConfig builds configuration for a dn service.
func buildDNConfig(
index int, opt Options, address serviceAddresses,
) *dnservice.Config {
cfg := &dnservice.Config{
......@@ -163,10 +163,10 @@ func buildDnConfig(
return cfg
}
// buildDnOptions builds options for a dn service.
// buildDNOptions builds options for a dn service.
//
// NB: We need the filled version of dnservice.Config.
func buildDnOptions(cfg *dnservice.Config, filter FilterFunc) dnOptions {
func buildDNOptions(cfg *dnservice.Config, filter FilterFunc) dnOptions {
// factory to construct client for hakeeper
hakeeperClientFactory := func() (logservice.DNHAKeeperClient, error) {
ctx, cancel := context.WithTimeout(
......
......@@ -15,9 +15,10 @@
package service
import (
"github.com/matrixorigin/matrixone/pkg/taskservice"
"time"
"github.com/matrixorigin/matrixone/pkg/taskservice"
"go.uber.org/zap/zapcore"
"github.com/matrixorigin/matrixone/pkg/hakeeper"
......@@ -235,8 +236,8 @@ func (opt Options) WithRootDataDir(root string) Options {
return opt
}
// WithDnStorage sets dn transaction storage.
func (opt Options) WithDnTxnStorage(s string) Options {
// WithDNStorage sets dn transaction storage.
func (opt Options) WithDNTxnStorage(s string) Options {
opt.dn.txnStorageBackend = s
return opt
}
......@@ -299,6 +300,11 @@ func (opt Options) WithTaskStorage(storage taskservice.TaskStorage) Options {
return opt
}
// GetTxnStorageBackend returns the txn storage backend
func (opt Options) GetTxnStorageBackend() string {
return opt.dn.txnStorageBackend
}
// gossipSeedNum calculates the count of gossip seed.
//
// Select gossip addresses of the first 3 log services.
......
......@@ -63,9 +63,9 @@ func TestWithRootDataDir(t *testing.T) {
require.Equal(t, root, opt.rootDataDir)
}
func TestWithDnTxnStorage(t *testing.T) {
func TestWithDNTxnStorage(t *testing.T) {
s := "MEM"
opt := Options{}.WithDnTxnStorage(s)
opt := Options{}.WithDNTxnStorage(s)
require.Equal(t, s, opt.dn.txnStorageBackend)
}
......
......@@ -51,6 +51,8 @@ type Cluster interface {
Start() error
// Close stops svcs sequentially
Close() error
// Options returns the adjusted options
Options() Options
ClusterOperation
ClusterAwareness
......@@ -196,6 +198,10 @@ type ClusterWaitState interface {
WaitDNStoreReported(ctx context.Context, uuid string)
// WaitDNStoreReportedIndexed waits dn store reported by index.
WaitDNStoreReportedIndexed(ctx context.Context, index int)
// WaitCNStoreReported waits cn store reported by uuid.
WaitCNStoreReported(ctx context.Context, uuid string)
// WaitCNStoreReportedIndexed waits cn store reported by index.
WaitCNStoreReportedIndexed(ctx context.Context, index int)
// WaitLogStoreTimeout waits log store timeout by uuid.
WaitLogStoreTimeout(ctx context.Context, uuid string)
......@@ -285,7 +291,7 @@ func NewCluster(t *testing.T, opt Options) (Cluster, error) {
c.log.cfgs, c.log.opts = c.buildLogConfigs(c.network.addresses)
// build dn service configurations
c.dn.cfgs, c.dn.opts = c.buildDnConfigs(c.network.addresses)
c.dn.cfgs, c.dn.opts = c.buildDNConfigs(c.network.addresses)
c.cn.cfgs, c.cn.opts = c.buildCNConfigs(c.network.addresses)
......@@ -326,6 +332,10 @@ func (c *testCluster) Start() error {
return nil
}
func (c *testCluster) Options() Options {
return c.opt
}
func (c *testCluster) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
......@@ -792,6 +802,41 @@ func (c *testCluster) WaitDNStoreReportedIndexed(ctx context.Context, index int)
c.WaitDNStoreReported(ctx, ds.ID())
}
func (c *testCluster) WaitCNStoreReported(ctx context.Context, uuid string) {
for {
select {
case <-ctx.Done():
assert.FailNow(
c.t,
"terminated when waiting cn store reported",
"cn store %s, error: %s", uuid, ctx.Err(),
)
default:
time.Sleep(defaultWaitInterval)
expired, err := c.CNStoreExpired(uuid)
if err != nil {
c.logger.Error("fail to check cn store expired or not",
zap.Error(err),
zap.String("uuid", uuid),
)
continue
}
if !expired {
return
}
}
}
}
func (c *testCluster) WaitCNStoreReportedIndexed(ctx context.Context, index int) {
ds, err := c.GetCNServiceIndexed(index)
require.NoError(c.t, err)
c.WaitCNStoreReported(ctx, ds.ID())
}
func (c *testCluster) WaitLogStoreTimeout(ctx context.Context, uuid string) {
for {
select {
......@@ -1113,8 +1158,8 @@ func (c *testCluster) buildFileServices() *fileServices {
return newFileServices(c.t, c.opt.initial.dnServiceNum, c.opt.initial.cnServiceNum)
}
// buildDnConfigs builds configurations for all dn services.
func (c *testCluster) buildDnConfigs(
// buildDNConfigs builds configurations for all dn services.
func (c *testCluster) buildDNConfigs(
address serviceAddresses,
) ([]*dnservice.Config, []dnOptions) {
batch := c.opt.initial.dnServiceNum
......@@ -1122,11 +1167,11 @@ func (c *testCluster) buildDnConfigs(
cfgs := make([]*dnservice.Config, 0, batch)
opts := make([]dnOptions, 0, batch)
for i := 0; i < batch; i++ {
cfg := buildDnConfig(i, c.opt, address)
cfg := buildDNConfig(i, c.opt, address)
cfgs = append(cfgs, cfg)
localAddr := cfg.ListenAddress
opt := buildDnOptions(cfg, c.backendFilterFactory(localAddr))
opt := buildDNOptions(cfg, c.backendFilterFactory(localAddr))
opts = append(opts, opt)
}
return cfgs, opts
......
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package txn
import (
"context"
"fmt"
"testing"
"time"
"github.com/matrixorigin/matrixone/pkg/common/stopper"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/logservice"
"github.com/matrixorigin/matrixone/pkg/tests/service"
"github.com/matrixorigin/matrixone/pkg/txn/clock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
var (
defaultTestTimeout = time.Minute
memKVTxnStorage = "MEMKV"
memTxnStorage = "MEM"
)
type cluster struct {
t *testing.T
logger *zap.Logger
clock clock.Clock
env service.Cluster
stopper *stopper.Stopper
}
// NewCluster new txn testing cluster based on the service.Cluster
func NewCluster(t *testing.T, options service.Options) (Cluster, error) {
env, err := service.NewCluster(t, options)
if err != nil {
return nil, err
}
stopper := stopper.NewStopper("test-env-stopper")
return &cluster{
t: t,
env: env,
logger: logutil.GetPanicLoggerWithLevel(zap.DebugLevel),
clock: clock.NewUnixNanoHLCClockWithStopper(stopper, 0),
stopper: stopper,
}, nil
}
func (c *cluster) Start() {
if err := c.env.Start(); err != nil {
assert.FailNow(c.t, "start testing cluster failed")
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
c.env.WaitHAKeeperState(ctx, logservice.HAKeeperRunning)
c.env.WaitHAKeeperLeader(ctx)
c.env.WaitDNShardsReported(ctx)
}
func (c *cluster) Stop() {
if err := c.env.Close(); err != nil {
assert.FailNow(c.t, "stop testing cluster failed")
}
}
func (c *cluster) Env() service.Cluster {
return c.env
}
func (c *cluster) NewClient() Client {
backend := c.env.Options().GetTxnStorageBackend()
switch backend {
case memKVTxnStorage:
cli, err := newKVClient(c.env, c.clock, c.logger)
require.NoError(c.t, err)
return cli
case memTxnStorage:
cli, err := newSQLClient(c.env)
require.NoError(c.t, err)
return cli
default:
panic(fmt.Sprintf("%s backend txn storage not support", backend))
}
}
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package txn
import (
"testing"
"github.com/matrixorigin/matrixone/pkg/tests/service"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/stretchr/testify/require"
)
var (
testBackends = []string{memKVTxnStorage, memTxnStorage}
)
func TestBasicSingleShard(t *testing.T) {
// this case will start a mo cluster with 1 CNService, 1 DNService and 3 LogService.
// A Txn read and write will success.
for _, backend := range testBackends {
t.Run(backend, func(t *testing.T) {
c, err := NewCluster(t,
getBasicClusterOptions(backend))
require.NoError(t, err)
c.Start()
defer c.Stop()
cli := c.NewClient()
key := "k"
value := "v"
checkRead(t, mustNewTxn(t, cli), key, "", nil, true)
checkWrite(t, mustNewTxn(t, cli), key, value, nil, true)
checkRead(t, mustNewTxn(t, cli), key, value, nil, true)
})
}
}
func TestBasicSingleShardCannotReadUncomittedValue(t *testing.T) {
// this case will start a mo cluster with 1 CNService, 1 DNService and 3 LogService.
// 1. start t1
// 2. start t2
// 3. t1 write
// 4. t2 can not read t1's write
for _, backend := range testBackends {
t.Run(backend, func(t *testing.T) {
c, err := NewCluster(t,
getBasicClusterOptions(memKVTxnStorage))
require.NoError(t, err)
c.Start()
defer c.Stop()
cli := c.NewClient()
key := "k"
value := "v"
t1 := mustNewTxn(t, cli)
t2 := mustNewTxn(t, cli)
checkWrite(t, t1, key, value, nil, false)
checkRead(t, t2, key, "", nil, true)
})
}
}
func checkRead(t *testing.T, txn Txn, key string, expectValue string, expectError error, commit bool) {
v, err := txn.Read(key)
defer func() {
if commit {
require.NoError(t, txn.Commit())
}
}()
require.Equal(t, expectError, err)
require.Equal(t, expectValue, v)
}
func checkWrite(t *testing.T, txn Txn, key, value string, expectError error, commit bool) {
defer func() {
if commit {
require.NoError(t, txn.Commit())
}
}()
require.Equal(t, expectError, txn.Write(key, value))
}
func getBasicClusterOptions(txnStorageBackend string) service.Options {
options := service.DefaultOptions().
WithDNShardNum(1).
WithLogShardNum(1).
WithDNServiceNum(1).
WithLogServiceNum(3).
WithCNShardNum(0).
WithCNServiceNum(0).
WithDNTxnStorage(txnStorageBackend)
if txnStorageBackend != memKVTxnStorage {
options = options.WithCNShardNum(1).
WithCNServiceNum(1)
}
return options
}
func mustNewTxn(t *testing.T, cli Client, options ...client.TxnOption) Txn {
txn, err := cli.NewTxn(options...)
require.NoError(t, err)
return txn
}
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
The current package is based on an integration testing framework to test
end-to-end transactions.
*/
package txn
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package txn
import (
"context"
"hash/fnv"
"sort"
"sync"
"time"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/tests/service"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/txn/clock"
"github.com/matrixorigin/matrixone/pkg/txn/rpc"
"github.com/matrixorigin/matrixone/pkg/txn/storage/mem"
"go.uber.org/zap"
)
// kvClient the kvClient use the pkg/txn/storage/mem as the txn storage. Sends transaction requests directly to the DN,
// bypassing the CN and assuming some of the CN's responsibilities, such as routing, etc.
type kvClient struct {
env service.Cluster
client client.TxnClient
mu struct {
sync.RWMutex
dnshards []metadata.DNShard
}
}
func newKVClient(env service.Cluster,
clock clock.Clock,
logger *zap.Logger) (Client, error) {
sender, err := rpc.NewSender(clock, logger)
if err != nil {
return nil, err
}
c := &kvClient{
env: env,
client: client.NewTxnClient(sender),
}
c.refreshDNShards()
return c, nil
}
func (c *kvClient) NewTxn(options ...client.TxnOption) (Txn, error) {
op, err := c.client.New(options...)
if err != nil {
return nil, err
}
return &kvTxn{op: op,
env: c.env,
router: c.getTargetDN,
refreshRouter: c.refreshDNShards}, nil
}
func (c *kvClient) getTargetDN(key string) metadata.DNShard {
h := fnv.New32()
if _, err := h.Write([]byte(key)); err != nil {
panic(err)
}
v := h.Sum32()
c.mu.RLock()
defer c.mu.RUnlock()
return c.mu.dnshards[v%uint32(len(c.mu.dnshards))]
}
func (c *kvClient) refreshDNShards() {
c.mu.Lock()
defer c.mu.Unlock()
c.refreshDNShardsLocked()
}
func (c *kvClient) refreshDNShardsLocked() {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
for {
state, err := c.env.GetClusterState(ctx)
if err != nil {
time.Sleep(time.Second)
continue
}
c.mu.dnshards = c.mu.dnshards[:0]
for _, store := range state.DNState.Stores {
for _, shard := range store.Shards {
c.mu.dnshards = append(c.mu.dnshards, metadata.DNShard{
DNShardRecord: metadata.DNShardRecord{
ShardID: shard.ShardID,
},
ReplicaID: shard.ReplicaID,
Address: store.ServiceAddress,
})
}
}
sort.Slice(c.mu.dnshards, func(i, j int) bool {
return c.mu.dnshards[i].ShardID < c.mu.dnshards[j].ShardID
})
break
}
}
type kvTxn struct {
op client.TxnOperator
env service.Cluster
router func(key string) metadata.DNShard
refreshRouter func()
mu struct {
sync.Mutex
closed bool
}
}
func (kop *kvTxn) Commit() error {
kop.mu.Lock()
defer kop.mu.Unlock()
kop.mu.closed = true
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
return kop.op.Commit(ctx)
}
func (kop *kvTxn) Rollback() error {
kop.mu.Lock()
defer kop.mu.Unlock()
if kop.mu.closed {
return nil
}
kop.mu.closed = true
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
return kop.op.Rollback(ctx)
}
func (kop *kvTxn) Read(key string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
for {
req := mem.NewGetTxnRequest([][]byte{[]byte(key)})
req.CNRequest.Target = kop.router(key)
result, err := kop.op.Read(ctx, []txn.TxnRequest{req})
if err != nil {
if moerr.IsMoErrCode(err, moerr.ErrDNShardNotFound) {
kop.refreshRouter()
continue
}
return "", err
}
defer result.Release()
values := mem.MustParseGetPayload(result.Responses[0].CNOpResponse.Payload)
if len(values) != 1 {
panic("invalid read responses")
}
if values[0] == nil {
return "", nil
}
return string(values[0]), nil
}
}
func (kop *kvTxn) Write(key, value string) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
for {
req := mem.NewSetTxnRequest([][]byte{[]byte(key)}, [][]byte{[]byte(value)})
req.CNRequest.Target = kop.router(key)
result, err := kop.op.Write(ctx,
[]txn.TxnRequest{req})
if err != nil {
if moerr.IsMoErrCode(err, moerr.ErrDNShardNotFound) {
kop.refreshRouter()
continue
}
return err
}
result.Release()
return nil
}
}
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package txn
import (
"context"
"database/sql"
"fmt"
"sync"
_ "github.com/go-sql-driver/mysql"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/tests/service"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"go.uber.org/multierr"
)
var (
createDB = `create database if not exists kv_test`
useDB = `use kv_test;`
createSql = `create table if not exists txn_test_kv (kv_key varchar(20) primary key, kv_value varchar(10))`
)
// sqlClient use sql client to connect to CN node and use a table to simulate rr test KV operations
type sqlClient struct {
cn service.CNService
}
func newSQLClient(env service.Cluster) (Client, error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
env.WaitCNStoreReportedIndexed(ctx, 0)
cn, err := env.GetCNServiceIndexed(0)
if err != nil {
return nil, err
}
db, err := sql.Open("mysql", fmt.Sprintf("dump:111@tcp(%s)/", cn.SQLAddress()))
if err != nil {
return nil, err
}
_, err = db.Exec(createDB)
if err != nil {
return nil, multierr.Append(err, db.Close())
}
_, err = db.Exec(useDB)
if err != nil {
return nil, multierr.Append(err, db.Close())
}
_, err = db.Exec(createSql)
if err != nil {
return nil, multierr.Append(err, db.Close())
}
return &sqlClient{
cn: cn,
}, multierr.Append(err, db.Close())
}
func (c *sqlClient) NewTxn(options ...client.TxnOption) (Txn, error) {
return newSQLTxn(c.cn)
}
type sqlTxn struct {
db *sql.DB
txn *sql.Tx
mu struct {
sync.Mutex
closed bool
}
}
func newSQLTxn(cn service.CNService) (Txn, error) {
db, err := sql.Open("mysql", fmt.Sprintf("dump:111@tcp(%s)/kv_test", cn.SQLAddress()))
if err != nil {
return nil, err
}
txn, err := db.Begin()
if err != nil {
return nil, multierr.Append(err, db.Close())
}
return &sqlTxn{
db: db,
txn: txn,
}, nil
}
func (kop *sqlTxn) Commit() error {
kop.mu.Lock()
defer kop.mu.Unlock()
if kop.mu.closed {
return moerr.NewTxnClosed()
}
kop.mu.closed = true
err := kop.txn.Commit()
if err != nil {
return multierr.Append(err, kop.db.Close())
}
return kop.db.Close()
}
func (kop *sqlTxn) Rollback() error {
kop.mu.Lock()
defer kop.mu.Unlock()
if kop.mu.closed {
return nil
}
err := kop.txn.Rollback()
if err != nil {
return multierr.Append(err, kop.db.Close())
}
return kop.db.Close()
}
func (kop *sqlTxn) Read(key string) (string, error) {
rows, err := kop.txn.Query(fmt.Sprintf("select kv_value from txn_test_kv where kv_key = '%s'", key))
if err != nil {
return "", err
}
if !rows.Next() {
return "", rows.Close()
}
v := ""
if err := rows.Scan(&v); err != nil {
return "", multierr.Append(err, rows.Close())
}
return v, multierr.Append(err, rows.Close())
}
func (kop *sqlTxn) Write(key, value string) error {
v, err := kop.Read(key)
if err != nil {
return err
}
if v == "" {
return kop.insert(key, value)
}
return kop.update(key, value)
}
func (kop *sqlTxn) insert(key, value string) error {
res, err := kop.txn.Exec(fmt.Sprintf("insert into txn_test_kv(kv_key, kv_value) values('%s', '%s')", key, value))
if err != nil {
return err
}
n, err := res.RowsAffected()
if err != nil {
panic(err)
}
if n != 1 {
panic(n)
}
return err
}
func (kop *sqlTxn) update(key, value string) error {
_, err := kop.txn.Exec(fmt.Sprintf("update txn_test_kv set kv_value = '%s' where kv_key = '%s'", key, value))
return err
}
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package txn
import (
"github.com/matrixorigin/matrixone/pkg/tests/service"
"github.com/matrixorigin/matrixone/pkg/txn/client"
)
// Cluster txn testing cluster
type Cluster interface {
// Start start the cluster, block until all service started and all DNShard created
Start()
// Stop stop the cluster
Stop()
// Env return the test cluster env
Env() service.Cluster
// NewClient create a test txn client
NewClient() Client
}
// Client used to execute read and write.
type Client interface {
// NewTxn create a txn to execute read and write command
NewTxn(options ...client.TxnOption) (Txn, error)
}
// Txn txn operation handler
type Txn interface {
// Commit commit the txn
Commit() error
// Rollback rollback the txn
Rollback() error
// Read read by key
Read(key string) (string, error)
// Write write key
Write(key, value string) error
}
......@@ -21,6 +21,7 @@ import (
"fmt"
"math"
"sync"
"time"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/logservice"
......@@ -430,9 +431,16 @@ func (kv *KVTxnStorage) getWriteKeysLocked(txnMeta txn.TxnMeta) [][]byte {
}
func (kv *KVTxnStorage) saveLog(log *KVLog) (logservice.Lsn, error) {
return kv.logClient.Append(context.Background(), logservice.LogRecord{
Data: log.MustMarshal(),
})
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
data := log.MustMarshal()
record := kv.logClient.GetLogRecord(len(data))
if len(record.Data) == 0 {
record.Data = data
} else {
copy(record.Data[len(record.Data)-len(data):], data)
}
return kv.logClient.Append(ctx, record)
}
func (kv *KVTxnStorage) commitWithKVLogLocked(klog *KVLog) {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment