Skip to content
Snippets Groups Projects
Unverified Commit e6c5e8f8 authored by nnsgmsone's avatar nnsgmsone Committed by GitHub
Browse files

Add writes and fileMap for txn (#4822)

Approved by: @reusee, @fengttt
parent 14caa985
No related branches found
No related tags found
No related merge requests found
// Copyright 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package disttae
import (
"github.com/matrixorigin/matrixone/pkg/pb/plan"
)
func (db *DB) NewReader(databaseName, tableName string, expr *plan.Expr) Reader {
return nil
}
......@@ -19,6 +19,7 @@ import (
"time"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/pb/logservice"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
......@@ -39,32 +40,40 @@ func New(
var _ engine.Engine = new(Engine)
func (e *Engine) Create(ctx context.Context, name string, op client.TxnOperator) error {
//TODO
e.newTransaction(op)
panic("unimplemented")
txn, err := e.getOrAddTransaction(op, e.getClusterDetails)
if err != nil {
return err
}
if err := txn.WriteBatch(INSERT, MO_CATALOG_ID, MO_DATABASE_ID, MO_CATALOG, MO_DATABASE, generateCreateDatabaseTuple(name)); err != nil {
return err
}
return nil
}
func (e *Engine) Database(ctx context.Context, name string, op client.TxnOperator) (engine.Database, error) {
//TODO
e.newTransaction(op)
panic("unimplemented")
}
func (e *Engine) Databases(ctx context.Context, op client.TxnOperator) ([]string, error) {
//TODO
e.newTransaction(op)
panic("unimplemented")
}
func (e *Engine) Delete(ctx context.Context, name string, op client.TxnOperator) error {
//TODO
e.newTransaction(op)
panic("unimplemented")
txn, err := e.getOrAddTransaction(op, e.getClusterDetails)
if err != nil {
return err
}
if err := txn.WriteBatch(DELETE, MO_CATALOG_ID, MO_DATABASE_ID, MO_CATALOG, MO_DATABASE, generateDropDatabaseTuple(name)); err != nil {
return err
}
return nil
}
// hasConflict used to detect if a transaction on a cn is in conflict,
// currently an empty implementation, assuming all transactions on a cn are conflict free
func (e *Engine) HasConflict(txn *Transaction) bool {
func (e *Engine) hasConflict(txn *Transaction) bool {
return false
}
......@@ -74,7 +83,7 @@ func (e *Engine) PreCommit(ctx context.Context, op client.TxnOperator) error {
return moerr.New(moerr.ErrTxnClosed, "the transaction has been committed or aborted")
}
defer e.delTransaction(txn)
if e.HasConflict(txn) {
if e.hasConflict(txn) {
return moerr.New(moerr.ErrTxnWriteConflict, "write conflict")
}
return nil
......@@ -112,30 +121,44 @@ func (e *Engine) Hints() (h engine.Hints) {
return
}
func (e *Engine) newTransaction(op client.TxnOperator) {
txn := &Transaction{
readOnly: false,
meta: op.Txn(),
func (e *Engine) getOrAddTransaction(op client.TxnOperator, getClusterDetails GetClusterDetailsFunc) (*Transaction, error) {
id := string(op.Txn().ID)
e.Lock()
defer e.Unlock()
txn, ok := e.txns[id]
if !ok {
cluster, err := getClusterDetails()
if err != nil {
return nil, err
}
txn = &Transaction{
readOnly: false,
meta: op.Txn(),
dnStores: cluster.DNStores,
fileMap: make(map[string]uint64),
}
txn.writes = append(txn.writes, make([]Entry, 0, 1))
e.txns[id] = txn
}
e.addTransaction(txn)
return txn, nil
}
func (e *Engine) getTransaction(op client.TxnOperator) *Transaction {
e.Lock()
defer e.Unlock()
e.RLock()
defer e.RUnlock()
return e.txns[string(op.Txn().ID)]
}
func (e *Engine) addTransaction(txn *Transaction) {
e.Lock()
defer e.Unlock()
if _, ok := e.txns[string(txn.meta.ID)]; !ok {
e.txns[string(txn.meta.ID)] = txn
}
}
func (e *Engine) delTransaction(txn *Transaction) {
e.Lock()
defer e.Unlock()
delete(e.txns, string(txn.meta.ID))
}
func generateCreateDatabaseTuple(name string) *batch.Batch {
return &batch.Batch{}
}
func generateDropDatabaseTuple(name string) *batch.Batch {
return &batch.Batch{}
}
......@@ -16,16 +16,115 @@ package disttae
import (
"context"
"math/rand"
"testing"
"time"
"github.com/google/uuid"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/pb/logservice"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/txn/rpc"
"github.com/stretchr/testify/require"
)
func TestNewEngine(t *testing.T) {
type testTxnOperator struct {
meta txn.TxnMeta
}
func TestDB(t *testing.T) {
db := new(DB)
db.readTs = newTimestamp(rand.Int63())
_ = db.NewReader("test", "test", nil)
}
func TestEngine(t *testing.T) {
ctx := context.Background()
getClusterDetails := func() (details logservice.ClusterDetails, err error) {
return
}
engine := New(ctx, getClusterDetails)
_ = engine
txnOp := newTestTxnOperator()
e := New(ctx, getClusterDetails)
err := e.Create(ctx, "test", txnOp)
require.NoError(t, err)
err = e.Delete(ctx, "test", txnOp)
require.NoError(t, err)
err = e.PreCommit(ctx, txnOp)
require.NoError(t, err)
err = e.Rollback(ctx, txnOp)
require.Equal(t, moerr.New(moerr.ErrTxnClosed, "the transaction has been committed or aborted"), err)
_, err = e.Nodes()
require.NoError(t, err)
hints := e.Hints()
require.Equal(t, time.Minute*5, hints.CommitOrRollbackTimeout)
}
func TestTransaction(t *testing.T) {
txn := &Transaction{
readOnly: false,
meta: newTxnMeta(rand.Int63()),
fileMap: make(map[string]uint64),
}
txn.writes = append(txn.writes, make([]Entry, 0, 1))
ro := txn.ReadOnly()
require.Equal(t, false, ro)
err := txn.WriteBatch(INSERT, 0, 0, "test", "test", new(batch.Batch))
require.NoError(t, err)
txn.IncStatementId()
txn.RegisterFile("test")
err = txn.WriteFile(DELETE, 0, 0, "test", "test", "test")
require.NoError(t, err)
}
func newTestTxnOperator() *testTxnOperator {
return &testTxnOperator{
meta: newTxnMeta(rand.Int63()),
}
}
func (op *testTxnOperator) Txn() txn.TxnMeta {
return op.meta
}
func (op *testTxnOperator) Snapshot() ([]byte, error) {
return nil, nil
}
func (op *testTxnOperator) ApplySnapshot(data []byte) error {
return nil
}
func (op *testTxnOperator) Read(ctx context.Context, ops []txn.TxnRequest) (*rpc.SendResult, error) {
return nil, nil
}
func (op *testTxnOperator) Write(ctx context.Context, ops []txn.TxnRequest) (*rpc.SendResult, error) {
return nil, nil
}
func (op *testTxnOperator) WriteAndCommit(ctx context.Context, ops []txn.TxnRequest) (*rpc.SendResult, error) {
return nil, nil
}
func (op *testTxnOperator) Commit(ctx context.Context) error {
return nil
}
func (op *testTxnOperator) Rollback(ctx context.Context) error {
return nil
}
func newTimestamp(v int64) timestamp.Timestamp {
return timestamp.Timestamp{PhysicalTime: v}
}
func newTxnMeta(snapshotTS int64) txn.TxnMeta {
id := uuid.New()
return txn.TxnMeta{
ID: id[:],
Status: txn.TxnStatus_Active,
SnapshotTS: newTimestamp(snapshotTS),
}
}
......@@ -24,20 +24,43 @@ func (txn *Transaction) ReadOnly() bool {
}
// use for solving halloween problem
func (txn *Transaction) SetStatementId(id [2]uint64) {
txn.statementId = id
func (txn *Transaction) IncStatementId() {
txn.statementId++
txn.writes = append(txn.writes, make([]Entry, 0, 1))
}
// Write used to write data to the transaction buffer
// insert/delete/update all use this api
func (txn *Transaction) WriteBatch(bat *batch.Batch) error {
func (txn *Transaction) WriteBatch(typ int, databaseId, tableId uint64, databaseName, tableName string, bat *batch.Batch) error {
txn.readOnly = true
txn.writes[txn.statementId] = append(txn.writes[txn.statementId], Entry{
typ: typ,
bat: bat,
tableId: tableId,
databaseId: databaseId,
tableName: tableName,
databaseName: databaseName,
})
return nil
}
func (txn *Transaction) RegisterFile(fileName string) {
txn.fileMap[fileName] = txn.blockId
txn.blockId++
}
// WriteFile used to add a s3 file information to the transaction buffer
// insert/delete/update all use this api
func (txn *Transaction) WriteFile(file string) error {
func (txn *Transaction) WriteFile(typ int, databaseId, tableId uint64, databaseName, tableName string, fileName string) error {
txn.readOnly = true
txn.writes[txn.statementId] = append(txn.writes[txn.statementId], Entry{
typ: typ,
tableId: tableId,
databaseId: databaseId,
tableName: tableName,
databaseName: databaseName,
fileName: fileName,
blockId: txn.fileMap[fileName],
})
return nil
}
......@@ -17,19 +17,74 @@ package disttae
import (
"sync"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/pb/logservice"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
)
const (
INSERT = iota
DELETE
)
const (
// default database name for catalog
MO_CATALOG = "mo_catalog"
MO_DATABASE = "mo_database"
MO_TABLES = "mo_tables"
MO_COLUMNS = "mo_columns"
)
const (
// default database id for catalog
MO_CATALOG_ID = 0
MO_DATABASE_ID = 1
MO_TABLES_ID = 2
MO_COLUMNS_ID = 3
)
type Reader interface {
}
type Engine struct {
sync.Mutex
sync.RWMutex
getClusterDetails GetClusterDetailsFunc
txns map[string]*Transaction
}
// ReadOnly DB cache for tae
type DB struct {
readTs timestamp.Timestamp
}
// Transaction represents a transaction
type Transaction struct {
// readOnly default value is true, once a write happen, then set to false
readOnly bool
statementId [2]uint64
readOnly bool
// db *DB
// blockId starts at 0 and keeps incrementing,
// this is used to name the file on s3 and then give it to tae to use
blockId uint64
// use for solving halloween problem
statementId uint64
meta txn.TxnMeta
// fileMaps used to store the mapping relationship between s3 filenames and blockId
fileMap map[string]uint64
// writes cache stores any writes done by txn
// every statement is an element
writes [][]Entry
dnStores []logservice.DNStore
}
// Entry represents a delete/insert
type Entry struct {
typ int
tableId uint64
databaseId uint64
tableName string
databaseName string
fileName string // blockName for s3 file
blockId uint64 // blockId for s3 file
bat *batch.Batch // update or delete
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment