Skip to content
Snippets Groups Projects
Unverified Commit 98b59cae authored by triump2020's avatar triump2020 Committed by GitHub
Browse files

Transaction pipeline new (#4718)

Approved by: @XuPeng-SH
parent 23e781f2
No related branches found
No related tags found
No related merge requests found
Showing
with 738 additions and 94 deletions
......@@ -187,19 +187,19 @@ func (be *BaseEntry) GetUpdateNodeLocked() *UpdateNode {
}
func (be *BaseEntry) GetCommittedNode() (node *UpdateNode) {
be.MVCC.Loop((func(n *common.DLNode) bool {
be.MVCC.Loop(func(n *common.DLNode) bool {
un := n.GetPayload().(*UpdateNode)
if !un.IsActive() {
node = un
return false
}
return true
}), false)
}, false)
return
}
func (be *BaseEntry) GetNodeToRead(startts types.TS) (node *UpdateNode) {
be.MVCC.Loop((func(n *common.DLNode) bool {
be.MVCC.Loop(func(n *common.DLNode) bool {
un := n.GetPayload().(*UpdateNode)
if un.IsActive() {
if un.IsSameTxn(startts) {
......@@ -213,7 +213,7 @@ func (be *BaseEntry) GetNodeToRead(startts types.TS) (node *UpdateNode) {
return false
}
return true
}), false)
}, false)
return
}
func (be *BaseEntry) DeleteBefore(ts types.TS) bool {
......@@ -379,8 +379,11 @@ func (be *BaseEntry) IsEmpty() bool {
head := be.MVCC.GetHead()
return head == nil
}
func (be *BaseEntry) ApplyRollback() error {
return nil
func (be *BaseEntry) ApplyRollback(index *wal.Index) error {
be.Lock()
defer be.Unlock()
return be.GetUpdateNodeLocked().ApplyRollback(index)
}
func (be *BaseEntry) ApplyCommit(index *wal.Index) error {
......@@ -484,6 +487,12 @@ func (be *BaseEntry) GetDeleted() bool {
return be.GetUpdateNodeLocked().Deleted
}
func (be *BaseEntry) Prepare2PCPrepare() error {
be.Lock()
defer be.Unlock()
return be.GetUpdateNodeLocked().Prepare2PCPrepare()
}
func (be *BaseEntry) PrepareCommit() error {
be.Lock()
defer be.Unlock()
......
......@@ -195,7 +195,19 @@ func (e *UpdateNode) ApplyCommit(index *wal.Index) (err error) {
return
}
func (e *UpdateNode) ApplyRollback() (err error) {
func (e *UpdateNode) ApplyRollback(index *wal.Index) (err error) {
e.AddLogIndex(index)
return
}
func (e *UpdateNode) Prepare2PCPrepare() (err error) {
if e.CreatedAt.IsEmpty() {
e.CreatedAt = e.Txn.GetPrepareTS()
}
if e.Deleted {
e.DeletedAt = e.Txn.GetPrepareTS()
}
e.End = e.Txn.GetPrepareTS()
return
}
......
......@@ -16,10 +16,6 @@ package txnif
import "github.com/matrixorigin/matrixone/pkg/container/types"
const (
// UncommitTS = ^uint64(0)
)
var UncommitTS types.TS
func init() {
......@@ -30,19 +26,40 @@ type TxnState int32
const (
TxnStateActive TxnState = iota
//TxnStateCommitting only for 1PC
TxnStateCommitting
//TxnStatePreparing only for 2PC
TxnStatePreparing
//TxnStatePrepared only for 2PC
TxnStatePrepared
//TxnStateCommittingFinished only for 2PC
TxnStateCommittingFinished
TxnStateRollbacking
TxnStateCommitted
TxnStateRollbacked
TxnStateUnknown
)
type TxnStatus int32
const (
// TxnStatusActive TxnStatus = iota
// TxnStatusPrepared
// TxnStatusCommittingFinished
// TxnStatusCommitted
// TxnStatusRollbacked
)
func TxnStrState(state TxnState) string {
switch state {
case TxnStateActive:
return "Active"
case TxnStateCommitting:
return "Committing"
case TxnStatePrepared:
return "Prepared"
case TxnStateCommittingFinished:
return "CommittingFinished"
case TxnStateRollbacking:
return "Rollbacking"
case TxnStateCommitted:
......
......@@ -29,9 +29,12 @@ import (
type Txn2PC interface {
PrepareRollback() error
ApplyRollback() error
PreCommit() error
PreCommitOr2PCPrepare() error
PrepareCommit() error
Prepare2PCPrepare() error
PreApplyCommit() error
PreApply2PCPrepare() error
Apply2PCPrepare() error
ApplyCommit() error
}
......@@ -42,6 +45,7 @@ type TxnReader interface {
GetCtx() []byte
GetStartTS() types.TS
GetCommitTS() types.TS
GetPrepareTS() types.TS
GetInfo() []byte
IsTerminated(bool) bool
IsVisible(o TxnReader) bool
......@@ -51,6 +55,7 @@ type TxnReader interface {
String() string
Repr() string
GetLSN() uint64
Event() int
SameTxn(startTs types.TS) bool
CommitBefore(startTs types.TS) bool
......@@ -73,9 +78,12 @@ type TxnChanger interface {
RUnlock()
ToCommittedLocked() error
ToCommittingLocked(ts types.TS) error
ToPreparingLocked(ts types.TS) error
ToRollbackedLocked() error
ToRollbackingLocked(ts types.TS) error
ToUnknownLocked()
Prepare() error
Committing() error
Commit() error
Rollback() error
SetError(error)
......@@ -227,8 +235,9 @@ type TxnEntry interface {
RLock()
RUnlock()
PrepareCommit() error
Prepare2PCPrepare() error
PrepareRollback() error
ApplyCommit(index *wal.Index) error
ApplyRollback() error
ApplyRollback(index *wal.Index) error
MakeCommand(uint32) (TxnCmd, error)
}
......@@ -59,7 +59,10 @@ func (entry *compactBlockEntry) PrepareRollback() (err error) {
// TODO: remove block file? (should be scheduled and executed async)
return
}
func (entry *compactBlockEntry) ApplyRollback() (err error) { return }
func (entry *compactBlockEntry) ApplyRollback(index *wal.Index) (err error) {
//TODO:?
return
}
func (entry *compactBlockEntry) ApplyCommit(index *wal.Index) (err error) {
if err = entry.scheduler.Checkpoint([]*wal.Index{index}); err != nil {
// TODO:
......@@ -86,6 +89,10 @@ func (entry *compactBlockEntry) MakeCommand(csn uint32) (cmd txnif.TxnCmd, err e
return
}
func (entry *compactBlockEntry) Prepare2PCPrepare() (err error) {
return
}
func (entry *compactBlockEntry) PrepareCommit() (err error) {
dataBlock := entry.from.GetMeta().(*catalog.BlockEntry).GetBlockData()
view, err := dataBlock.CollectChangesInRange(entry.txn.GetStartTS(), entry.txn.GetCommitTS())
......
......@@ -64,7 +64,10 @@ func (entry *mergeBlocksEntry) PrepareRollback() (err error) {
// TODO: remove block file? (should be scheduled and executed async)
return
}
func (entry *mergeBlocksEntry) ApplyRollback() (err error) { return }
func (entry *mergeBlocksEntry) ApplyRollback(index *wal.Index) (err error) {
//TODO::?
return
}
func (entry *mergeBlocksEntry) ApplyCommit(index *wal.Index) (err error) {
if err = entry.scheduler.Checkpoint([]*wal.Index{index}); err != nil {
// TODO:
......@@ -141,6 +144,10 @@ func (entry *mergeBlocksEntry) resolveAddr(fromPos int, fromOffset uint32) (toPo
return
}
func (entry *mergeBlocksEntry) Prepare2PCPrepare() (err error) {
return
}
func (entry *mergeBlocksEntry) PrepareCommit() (err error) {
blks := make([]handle.Block, len(entry.createdBlks))
for i, meta := range entry.createdBlks {
......
......@@ -30,13 +30,14 @@ import (
type AppendNode struct {
sync.RWMutex
commitTs types.TS
txn txnif.AsyncTxn
logIndex *wal.Index
startRow uint32
maxRow uint32
mvcc *MVCCHandle
id *common.ID
commitTs types.TS
prepareTs types.TS
txn txnif.AsyncTxn
logIndex *wal.Index
startRow uint32
maxRow uint32
mvcc *MVCCHandle
id *common.ID
}
func MockAppendNode(ts types.TS, startRow, maxRow uint32, mvcc *MVCCHandle) *AppendNode {
......@@ -63,7 +64,6 @@ func NewAppendNode(
txn txnif.AsyncTxn,
startRow, maxRow uint32,
mvcc *MVCCHandle) *AppendNode {
//ts := uint64(0)
var ts types.TS
if txn != nil {
ts = txn.GetCommitTS()
......@@ -106,9 +106,18 @@ func (node *AppendNode) GetStartRow() uint32 { return node.startRow }
func (node *AppendNode) GetMaxRow() uint32 { return node.maxRow }
func (node *AppendNode) SetMaxRow(row uint32) { node.maxRow = row }
func (node *AppendNode) Prepare2PCPrepare() error {
node.Lock()
defer node.Unlock()
node.prepareTs = node.txn.GetPrepareTS()
node.commitTs = node.txn.GetPrepareTS()
return nil
}
func (node *AppendNode) PrepareCommit() error {
node.Lock()
defer node.Unlock()
node.prepareTs = node.txn.GetCommitTS()
node.commitTs = node.txn.GetCommitTS()
return nil
}
......@@ -133,6 +142,13 @@ func (node *AppendNode) ApplyCommit(index *wal.Index) error {
return listener(node)
}
func (node *AppendNode) ApplyRollback(index *wal.Index) (err error) {
node.Lock()
defer node.Unlock()
node.logIndex = index
return
}
func (node *AppendNode) WriteTo(w io.Writer) (n int64, err error) {
cn, err := w.Write(txnbase.MarshalID(node.mvcc.GetID()))
if err != nil {
......@@ -183,7 +199,6 @@ func (node *AppendNode) PrepareRollback() (err error) {
node.mvcc.DeleteAppendNodeLocked(node)
return
}
func (node *AppendNode) ApplyRollback() (err error) { return }
func (node *AppendNode) MakeCommand(id uint32) (cmd txnif.TxnCmd, err error) {
cmd = NewAppendCmd(id, node)
return
......
......@@ -37,12 +37,13 @@ type ColumnUpdateNode struct {
mask *roaring.Bitmap
vals map[uint32]any
// nulls *roaring.Bitmap
chain *ColumnChain
startTs types.TS
commitTs types.TS
txn txnif.AsyncTxn
logIndex *wal.Index
id *common.ID
chain *ColumnChain
startTs types.TS
commitTs types.TS
prepareTs types.TS
txn txnif.AsyncTxn
logIndex *wal.Index
id *common.ID
}
func NewSimpleColumnUpdateNode() *ColumnUpdateNode {
......@@ -330,6 +331,22 @@ func (node *ColumnUpdateNode) StringLocked() string {
return s
}
func (node *ColumnUpdateNode) Prepare2PCPrepare() (err error) {
node.chain.Lock()
defer node.chain.Unlock()
if node.commitTs != txnif.UncommitTS {
return
}
// if node.commitTs != txnif.UncommitTS {
// panic("logic error")
// }
node.commitTs = node.txn.GetPrepareTS()
node.prepareTs = node.txn.GetPrepareTS()
node.chain.UpdateLocked(node)
// TODO: merge updates
return
}
func (node *ColumnUpdateNode) PrepareCommit() (err error) {
node.chain.Lock()
defer node.chain.Unlock()
......@@ -340,6 +357,7 @@ func (node *ColumnUpdateNode) PrepareCommit() (err error) {
// panic("logic error")
// }
node.commitTs = node.txn.GetCommitTS()
node.prepareTs = node.txn.GetCommitTS()
node.chain.UpdateLocked(node)
// TODO: merge updates
return
......@@ -363,4 +381,13 @@ func (node *ColumnUpdateNode) PrepareRollback() (err error) {
return
}
func (node *ColumnUpdateNode) ApplyRollback() (err error) { return }
func (node *ColumnUpdateNode) ApplyRollback(index *wal.Index) (err error) {
node.Lock()
defer node.Unlock()
if node.txn == nil {
panic("ColumnUpdateNode | ApplyCommit | LogicErr")
}
node.logIndex = index
return
}
......@@ -46,6 +46,7 @@ type DeleteNode struct {
mask *roaring.Bitmap
startTs types.TS
commitTs types.TS
prepareTs types.TS
nt NodeType
id *common.ID
dt handle.DeleteType
......@@ -164,6 +165,18 @@ func (node *DeleteNode) RangeDeleteLocked(start, end uint32) {
}
func (node *DeleteNode) GetCardinalityLocked() uint32 { return uint32(node.mask.GetCardinality()) }
func (node *DeleteNode) Prepare2PCPrepare() (err error) {
node.chain.mvcc.Lock()
defer node.chain.mvcc.Unlock()
if node.commitTs != txnif.UncommitTS {
return
}
node.commitTs = node.txn.GetPrepareTS()
node.prepareTs = node.txn.GetPrepareTS()
node.chain.UpdateLocked(node)
return
}
func (node *DeleteNode) PrepareCommit() (err error) {
node.chain.mvcc.Lock()
defer node.chain.mvcc.Unlock()
......@@ -171,6 +184,7 @@ func (node *DeleteNode) PrepareCommit() (err error) {
return
}
node.commitTs = node.txn.GetCommitTS()
node.prepareTs = node.txn.GetCommitTS()
node.chain.UpdateLocked(node)
return
}
......@@ -191,6 +205,15 @@ func (node *DeleteNode) ApplyCommit(index *wal.Index) (err error) {
return node.OnApply()
}
func (node *DeleteNode) ApplyRollback(index *wal.Index) (err error) {
node.Lock()
if node.txn == nil {
panic("DeleteNode | ApplyCommit | LogicErr")
}
node.logIndex = index
return
}
func (node *DeleteNode) GeneralString() string {
return fmt.Sprintf("TS=%d;Cnt=%d;LogIndex%v", node.commitTs, node.mask.GetCardinality(), node.logIndex)
}
......@@ -288,8 +311,6 @@ func (node *DeleteNode) PrepareRollback() (err error) {
return
}
func (node *DeleteNode) ApplyRollback() (err error) { return }
func (node *DeleteNode) OnApply() (err error) {
if node.dt == handle.DT_Normal {
listener := node.chain.mvcc.GetDeletesListener()
......
......@@ -306,6 +306,9 @@ func (n *MVCCHandle) GetTotalRow() uint32 {
return n.appends[len(n.appends)-1].maxRow - delets
}
// TODO::it will be rewritten in V0.6,since maxVisible of MVCC handel
//
// would not be increased monotonically.
func (n *MVCCHandle) getMaxVisibleRowLocked(ts types.TS) (int, uint32, bool, error) {
if len(n.appends) == 0 {
return 0, 0, false, nil
......
......@@ -24,4 +24,11 @@ var (
ErrTxnCannotRollback = errors.New("tae: txn cannot txn rollback")
ErrDDLDropCreated = errors.New("tae: DDL cannot drop created in a txn")
ErrTxnStateNotActive = errors.New("tae: txn is not in TxnStateActive")
ErrTxnStateNotPrepared = errors.New("tae: txn is not in TxnStatePrepared")
ErrTxnNotFound = errors.New("tae: txn is not found in txn manager")
ErrTxnStateCannotRollback = errors.New("tae: txn is not in right state,can not rollback")
ErrTxnStateCannotCommit = errors.New("tae: txn is not is right state, can not Commit")
)
......@@ -30,11 +30,14 @@ func (store *NoopTxnStore) BindTxn(txn txnif.AsyncTxn)
func (store *NoopTxnStore) Close() error { return nil }
func (store *NoopTxnStore) Append(dbId, id uint64, data *containers.Batch) error { return nil }
func (store *NoopTxnStore) PrepareRollback() error { return nil }
func (store *NoopTxnStore) PreCommit() error { return nil }
func (store *NoopTxnStore) PreCommitOr2PCPrepare() error { return nil }
func (store *NoopTxnStore) PrepareCommit() error { return nil }
func (store *NoopTxnStore) Prepare2PCPrepare() error { return nil }
func (store *NoopTxnStore) ApplyRollback() error { return nil }
func (store *NoopTxnStore) PreApplyCommit() error { return nil }
func (store *NoopTxnStore) PreApply2PCPrepare() error { return nil }
func (store *NoopTxnStore) ApplyCommit() error { return nil }
func (store *NoopTxnStore) Apply2PCPrepare() error { return nil }
func (store *NoopTxnStore) AddTxnEntry(t txnif.TxnEntryType, entry txnif.TxnEntry) {}
......
......@@ -31,6 +31,15 @@ type OpType int8
const (
OpCommit = iota
OpRollback
OpPrepare
OpCommitting
OpInvalid
)
const (
EventRollback = iota + 1
EventCommitting
EventCommit
)
type OpTxn struct {
......@@ -53,23 +62,26 @@ var DefaultTxnFactory = func(mgr *TxnManager, store txnif.TxnStore, id uint64, s
type Txn struct {
sync.WaitGroup
*TxnCtx
Mgr *TxnManager
Store txnif.TxnStore
Err error
LSN uint64
Ch chan int
Mgr *TxnManager
Store txnif.TxnStore
Err error
LSN uint64
TenantID, UserID, RoleID uint32
PrepareCommitFn func(txnif.AsyncTxn) error
PrepareRollbackFn func(txnif.AsyncTxn) error
ApplyCommitFn func(txnif.AsyncTxn) error
ApplyRollbackFn func(txnif.AsyncTxn) error
PrepareCommitFn func(txnif.AsyncTxn) error
Prepare2PCPrepareFn func(txnif.AsyncTxn) error
PrepareRollbackFn func(txnif.AsyncTxn) error
ApplyPrepareFn func(txnif.AsyncTxn) error
ApplyCommitFn func(txnif.AsyncTxn) error
ApplyRollbackFn func(txnif.AsyncTxn) error
}
func NewTxn(mgr *TxnManager, store txnif.TxnStore, txnId uint64, start types.TS, info []byte) *Txn {
txn := &Txn{
Mgr: mgr,
Store: store,
Ch: make(chan int, 1),
}
txn.TxnCtx = NewTxnCtx(txnId, start, info)
return txn
......@@ -85,21 +97,44 @@ func (txn *Txn) SetPrepareRollbackFn(fn func(txnif.AsyncTxn) error) { txn.Prepar
func (txn *Txn) SetApplyCommitFn(fn func(txnif.AsyncTxn) error) { txn.ApplyCommitFn = fn }
func (txn *Txn) SetApplyRollbackFn(fn func(txnif.AsyncTxn) error) { txn.ApplyRollbackFn = fn }
func (txn *Txn) Commit() (err error) {
if txn.Store.IsReadonly() {
txn.Mgr.DeleteTxn(txn.GetID())
return nil
//The state transition of transaction is as follows:
// 1PC: TxnStateActive--->TxnStateCommitting--->TxnStateCommitted/TxnStateRollbacked
// TxnStateActive--->TxnStateRollbacking--->TxnStateRollbacked
// 2PC running on Coordinator: TxnStateActive--->TxnStatePreparing-->TxnStatePrepared
// -->TxnStateCommittingFinished--->TxnStateCommitted or
// TxnStateActive--->TxnStatePreparing-->TxnStatePrepared-->TxnStateRollbacked or
// TxnStateActive--->TxnStateRollbacking--->TxnStateRollbacked.
// 2PC running on Participant: TxnStateActive--->TxnStatePrepared-->TxnStateCommitted or
// TxnStateActive--->TxnStatePrepared-->TxnStateRollbacked or
// TxnStateActive--->TxnStateRollbacking-->TxnStateRollbacked.
// Prepare is used to pre-commit a 2PC distributed transaction.
// Notice that once any error happened, we should rollback the txn.
// TODO: 1. How to handle the case in which log service timed out?
// 2. For a 2pc transaction, Rollback message may arrive before Prepare message,
// should handle this case by TxnStorage?
func (txn *Txn) Prepare() (err error) {
//TODO::should handle this by TxnStorage?
if txn.Mgr.GetTxn(txn.GetID()) == nil {
logutil.Warn("tae : txn is not found in TxnManager")
//txn.Err = ErrTxnNotFound
return ErrTxnNotFound
}
state := (txnif.TxnState)(atomic.LoadInt32((*int32)(&txn.State)))
if state != txnif.TxnStateActive {
logutil.Warnf("unexpected txn status : %s", txnif.TxnStrState(txn.State))
txn.Err = ErrTxnStateNotActive
return txn.Err
}
txn.Add(1)
err = txn.Mgr.OnOpTxn(&OpTxn{
Txn: txn,
Op: OpCommit,
Op: OpPrepare,
})
// TxnManager is closed
if err != nil {
txn.SetError(err)
txn.Lock()
//_ = txn.ToRollbackingLocked(txn.GetStartTS() + 1)
_ = txn.ToRollbackingLocked(txn.GetStartTS().Next())
txn.Unlock()
_ = txn.PrepareRollback()
......@@ -107,21 +142,68 @@ func (txn *Txn) Commit() (err error) {
txn.DoneWithErr(err)
}
txn.Wait()
txn.Mgr.DeleteTxn(txn.GetID())
if txn.Err == nil {
//txn.State = txnif.TxnStatePrepared
atomic.StoreInt32((*int32)(&txn.State), (int32)(txnif.TxnStatePrepared))
} else {
//txn.Status = txnif.TxnStatusRollbacked
txn.Mgr.DeleteTxn(txn.GetID())
}
return txn.GetError()
}
func (txn *Txn) GetStore() txnif.TxnStore {
return txn.Store
}
func (txn *Txn) GetLSN() uint64 { return txn.LSN }
// Rollback is used to roll back a 1PC or 2PC transaction.
// rollback's idempotent is handled here, Although Prepare/Commit/Committing message's idempotent
// is handled by the transaction framework.
// Notice that there may be a such scenario in which a 2PC distributed transaction in ACTIVE will be rollbacked,
// since Rollback message may arrive before the Prepare message. Should handle this case by TxnStorage?
func (txn *Txn) Rollback() (err error) {
//TODO:idempotent for rollback should be guaranteed by TxnStoage?
if txn.Mgr.GetTxn(txn.GetID()) == nil {
logutil.Warn("tae : txn is not found in TxnManager")
return
}
state := (txnif.TxnState)(atomic.LoadInt32((*int32)(&txn.State)))
if (!txn.Is2PC && state != txnif.TxnStateActive) ||
txn.Is2PC && state != txnif.TxnStateActive &&
state != txnif.TxnStatePrepared {
logutil.Warnf("unexpected txn status : %s", txnif.TxnStrState(txn.State))
return ErrTxnStateCannotRollback
}
if txn.Store.IsReadonly() {
txn.Mgr.DeleteTxn(txn.GetID())
return
}
//2PC
if txn.Is2PC {
if state == txnif.TxnStateActive {
txn.Add(1)
err = txn.Mgr.OnOpTxn(&OpTxn{
Txn: txn,
Op: OpRollback,
})
if err != nil {
_ = txn.PrepareRollback()
_ = txn.ApplyRollback()
txn.DoneWithErr(err)
}
txn.Wait()
//txn.Status = txnif.TxnStatusRollbacked
//atomic.StoreInt32((*int32)(&txn.State), (int32)(txnif.TxnStateRollbacked))
txn.Mgr.DeleteTxn(txn.GetID())
}
if state == txnif.TxnStatePrepared {
txn.Add(1)
txn.Ch <- EventRollback
//Wait txn rollbacked
txn.Wait()
//txn.Status = txnif.TxnStatusRollbacked
txn.Mgr.DeleteTxn(txn.GetID())
}
return txn.GetError()
}
//1PC
txn.Add(1)
err = txn.Mgr.OnOpTxn(&OpTxn{
Txn: txn,
......@@ -133,11 +215,106 @@ func (txn *Txn) Rollback() (err error) {
txn.DoneWithErr(err)
}
txn.Wait()
//txn.Status = txnif.TxnStatusRollbacked
txn.Mgr.DeleteTxn(txn.GetID())
return txn.Err
}
// Committing is used to record a log for 2PC distributed transaction running in Coordinator.
// Notice that transaction must be committed once committing message arrives, since Preparing
// had already succeeded.
func (txn *Txn) Committing() (err error) {
state := (txnif.TxnState)(atomic.LoadInt32((*int32)(&txn.State)))
if state != txnif.TxnStatePrepared {
logutil.Warnf("unexpected txn status : %s", txnif.TxnStrState(txn.State))
//txn.Err = ErrTxnStatusNotPrepared
return ErrTxnStateNotPrepared
}
txn.Add(1)
txn.Ch <- EventCommitting
txn.Wait()
//txn.Status = txnif.TxnStatusCommittingFinished
atomic.StoreInt32((*int32)(&txn.State), (int32)(txnif.TxnStateCommittingFinished))
return txn.Err
}
// Commit is used to commit a 1PC or 2PC transaction running in Coordinator or running in Participant.
// Notice that the Commit of a 2PC transaction must be success once the commit message arrives,
// since Preparing had already succeeded.
func (txn *Txn) Commit() (err error) {
state := (txnif.TxnState)(atomic.LoadInt32((*int32)(&txn.State)))
if (!txn.Is2PC && state != txnif.TxnStateActive) ||
txn.Is2PC && state != txnif.TxnStateCommittingFinished &&
state != txnif.TxnStatePrepared {
logutil.Warnf("unexpected txn state : %s", txnif.TxnStrState(txn.State))
//txn.Err = ErrTxnStatusCannotCommit
return ErrTxnStateCannotCommit
}
if txn.Store.IsReadonly() {
txn.Mgr.DeleteTxn(txn.GetID())
return nil
}
if txn.Is2PC {
//It's a 2PC transaction running in Coordinator
if state == txnif.TxnStateCommittingFinished {
//TODO:Append committed log entry into log service asynchronously
// for checkpointing the committing log entry
//txn.SetError(txn.LogTxnEntry())
if txn.Err == nil {
//txn.State = txnif.TxnStateCommitted
atomic.StoreInt32((*int32)(&txn.State), (int32)(txnif.TxnStateCommitted))
}
txn.Mgr.DeleteTxn(txn.GetID())
}
//It's a 2PC transaction running in Participant.
//Notice that Commit must be success once the commit message arrives,
//since Preparing had already succeeded.
if state == txnif.TxnStatePrepared {
txn.Add(1)
txn.Ch <- EventCommit
txn.Wait()
//txn.Status = txnif.TxnStatusCommitted
atomic.StoreInt32((*int32)(&txn.State), (int32)(txnif.TxnStateCommitted))
txn.Mgr.DeleteTxn(txn.GetID())
}
return txn.GetError()
}
//It's a 1PC transaction
txn.Add(1)
err = txn.Mgr.OnOpTxn(&OpTxn{
Txn: txn,
Op: OpCommit,
})
// TxnManager is closed
if err != nil {
txn.SetError(err)
txn.Lock()
_ = txn.ToRollbackingLocked(txn.GetStartTS().Next())
txn.Unlock()
_ = txn.PrepareRollback()
_ = txn.ApplyRollback()
txn.DoneWithErr(err)
}
txn.Wait()
//if txn.Err == nil {
//txn.Status = txnif.TxnStatusCommitted
//}
txn.Mgr.DeleteTxn(txn.GetID())
err = txn.Err
return txn.GetError()
}
func (txn *Txn) GetStore() txnif.TxnStore {
return txn.Store
}
func (txn *Txn) GetLSN() uint64 { return txn.LSN }
func (txn *Txn) Event() (e int) {
e = <-txn.Ch
return
}
// TODO::need to take 2PC txn account into.
func (txn *Txn) DoneWithErr(err error) {
txn.DoneCond.L.Lock()
if err != nil {
......@@ -175,13 +352,48 @@ func (txn *Txn) PrepareCommit() (err error) {
return err
}
func (txn *Txn) Prepare2PCPrepare() (err error) {
logutil.Debugf("Prepare Committing %d", txn.ID)
if txn.Prepare2PCPrepareFn != nil {
if err = txn.Prepare2PCPrepareFn(txn); err != nil {
return
}
}
err = txn.Store.Prepare2PCPrepare()
return err
}
func (txn *Txn) PreApplyCommit() (err error) {
err = txn.Store.PreApplyCommit()
return
}
func (txn *Txn) PreApply2PCPrepare() (err error) {
err = txn.Store.PreApply2PCPrepare()
return
}
// Apply2PCPrepare apply preparing for a 2PC distributed transaction
func (txn *Txn) Apply2PCPrepare() (err error) {
defer func() {
//Get the lsn of ETTxnRecord entry in GroupC
txn.LSN = txn.Store.GetLSN()
if err != nil {
txn.Store.Close()
}
}()
if txn.ApplyPrepareFn != nil {
if err = txn.ApplyPrepareFn(txn); err != nil {
return
}
}
err = txn.Store.Apply2PCPrepare()
return
}
func (txn *Txn) ApplyCommit() (err error) {
defer func() {
//Get the lsn of ETTxnRecord entry in GroupC.
txn.LSN = txn.Store.GetLSN()
if err == nil {
err = txn.Store.Close()
......@@ -216,8 +428,8 @@ func (txn *Txn) ApplyRollback() (err error) {
return
}
func (txn *Txn) PreCommit() error {
return txn.Store.PreCommit()
func (txn *Txn) PreCommitOr2PCPrepare() error {
return txn.Store.PreCommitOr2PCPrepare()
}
func (txn *Txn) PrepareRollback() (err error) {
......
......@@ -36,12 +36,13 @@ func IDCtxToID(buf []byte) uint64 {
type TxnCtx struct {
sync.RWMutex
DoneCond sync.Cond
ID uint64
IDCtx []byte
StartTS, CommitTS types.TS
Info []byte
State txnif.TxnState
DoneCond sync.Cond
ID uint64
IDCtx []byte
StartTS, CommitTS, PrepareTS types.TS
Info []byte
State txnif.TxnState
Is2PC bool
}
func NewTxnCtx(id uint64, start types.TS, info []byte) *TxnCtx {
......@@ -84,6 +85,11 @@ func (ctx *TxnCtx) GetCommitTS() types.TS {
defer ctx.RUnlock()
return ctx.CommitTS
}
func (ctx *TxnCtx) GetPrepareTS() types.TS {
ctx.RLock()
defer ctx.RUnlock()
return ctx.PrepareTS
}
// Atomically returns the current txn state
func (ctx *TxnCtx) getTxnState() txnif.TxnState {
......@@ -134,6 +140,19 @@ func (ctx *TxnCtx) IsActiveLocked() bool {
return ctx.CommitTS == txnif.UncommitTS
}
func (ctx *TxnCtx) ToPreparingLocked(ts types.TS) error {
if ts.LessEq(ctx.StartTS) {
panic(fmt.Sprintf("start ts %d should be less than commit ts %d", ctx.StartTS, ts))
}
if !ctx.CommitTS.Equal(txnif.UncommitTS) {
return ErrTxnNotActive
}
ctx.PrepareTS = ts
ctx.CommitTS = ts
ctx.State = txnif.TxnStatePreparing
return nil
}
func (ctx *TxnCtx) ToCommittingLocked(ts types.TS) error {
if ts.LessEq(ctx.StartTS) {
panic(fmt.Sprintf("start ts %d should be less than commit ts %d", ctx.StartTS, ts))
......@@ -142,6 +161,7 @@ func (ctx *TxnCtx) ToCommittingLocked(ts types.TS) error {
return ErrTxnNotActive
}
ctx.CommitTS = ts
ctx.PrepareTS = ts
ctx.State = txnif.TxnStateCommitting
return nil
}
......@@ -162,6 +182,7 @@ func (ctx *TxnCtx) ToRollbackingLocked(ts types.TS) error {
return ErrTxnCannotRollback
}
ctx.CommitTS = ts
ctx.PrepareTS = ts
ctx.State = txnif.TxnStateRollbacking
return nil
}
......
......@@ -34,7 +34,10 @@ type TxnFactory = func(*TxnManager, txnif.TxnStore, uint64, types.TS, []byte) tx
type TxnManager struct {
sync.RWMutex
common.ClosedState
sm.StateMachine
PreparingSM sm.StateMachine
//Notice that prepared transactions would be enqueued into
//the receiveQueue of CommittingSM at run time or replay time.
CommittingSM sm.StateMachine
IDMap map[uint64]txnif.AsyncTxn
IdAlloc *common.IdAlloctor
TsAlloc *types.TsAlloctor
......@@ -59,8 +62,12 @@ func NewTxnManager(txnStoreFactory TxnStoreFactory, txnFactory TxnFactory, clock
}),
Exception: new(atomic.Value)}
pqueue := sm.NewSafeQueue(20000, 1000, mgr.onPreparing)
cqueue := sm.NewSafeQueue(20000, 1000, mgr.onCommit)
mgr.StateMachine = sm.NewStateMachine(new(sync.WaitGroup), mgr, pqueue, cqueue)
fqueue := sm.NewSafeQueue(20000, 1000, mgr.onFlushing)
mgr.PreparingSM = sm.NewStateMachine(new(sync.WaitGroup), mgr, pqueue, fqueue)
cqueue := sm.NewSafeQueue(20000, 1000, mgr.onCommitting)
cfqueue := sm.NewSafeQueue(20000, 1000, mgr.onCFlushing)
mgr.CommittingSM = sm.NewStateMachine(new(sync.WaitGroup), mgr, cqueue, cfqueue)
return mgr
}
......@@ -115,7 +122,6 @@ func (mgr *TxnManager) DeleteTxn(id uint64) {
defer mgr.Unlock()
txn := mgr.IDMap[id]
delete(mgr.IDMap, id)
//mgr.ActiveMask.Remove(txn.GetStartTS())
mgr.Active.Delete(txn.GetStartTS())
}
......@@ -129,14 +135,29 @@ func (mgr *TxnManager) GetTxn(id uint64) txnif.AsyncTxn {
return mgr.IDMap[id]
}
func (mgr *TxnManager) EnqueueFlushing(op any) (err error) {
_, err = mgr.PreparingSM.EnqueueCheckpoint(op)
return
}
func (mgr *TxnManager) EnqueueCommitting(op any) (err error) {
_, err = mgr.CommittingSM.EnqueueRecevied(op)
return
}
func (mgr *TxnManager) EnqueueCFlushing(op any) (err error) {
_, err = mgr.CommittingSM.EnqueueCheckpoint(op)
return
}
func (mgr *TxnManager) OnOpTxn(op *OpTxn) (err error) {
_, err = mgr.EnqueueRecevied(op)
_, err = mgr.PreparingSM.EnqueueRecevied(op)
return
}
func (mgr *TxnManager) onPreCommit(txn txnif.AsyncTxn) {
func (mgr *TxnManager) onPreCommitOr2PCPrepare(txn txnif.AsyncTxn) {
now := time.Now()
txn.SetError(txn.PreCommit())
txn.SetError(txn.PreCommitOr2PCPrepare())
logutil.Debug("[PreCommit]", TxnField(txn), common.DurationField(time.Since(now)))
}
......@@ -144,6 +165,10 @@ func (mgr *TxnManager) onPreparCommit(txn txnif.AsyncTxn) {
txn.SetError(txn.PrepareCommit())
}
func (mgr *TxnManager) onPrepar2PCPrepare(txn txnif.AsyncTxn) {
txn.SetError(txn.Prepare2PCPrepare())
}
func (mgr *TxnManager) onPreApplyCommit(txn txnif.AsyncTxn) {
if err := txn.PreApplyCommit(); err != nil {
txn.SetError(err)
......@@ -151,18 +176,31 @@ func (mgr *TxnManager) onPreApplyCommit(txn txnif.AsyncTxn) {
}
}
func (mgr *TxnManager) onPreApply2PCPrepare(txn txnif.AsyncTxn) {
if err := txn.PreApply2PCPrepare(); err != nil {
txn.SetError(err)
mgr.OnException(err)
}
}
func (mgr *TxnManager) onPreparRollback(txn txnif.AsyncTxn) {
_ = txn.PrepareRollback()
}
// TODO
// onPreparing the commit of 1PC txn and prepare of 2PC txn
// must both enter into this queue for conflict check.
// OpCommit : the commit of 1PC txn
// OpPrepare: the prepare of 2PC txn
// OPRollback:the rollback of 2PC or 1PC
func (mgr *TxnManager) onPreparing(items ...any) {
now := time.Now()
for _, item := range items {
op := item.(*OpTxn)
if op.Op == OpCommit {
mgr.onPreCommit(op.Txn)
if op.Op == OpCommit || op.Op == OpPrepare {
//Mainly do conflict check for 1PC Commit or 2PC Prepare
mgr.onPreCommitOr2PCPrepare(op.Txn)
}
//Before this moment, all mvcc nodes of a txn has been pushed into the MVCCHandle.
mgr.Lock()
ts := mgr.TsAlloc.Alloc()
op.Txn.Lock()
......@@ -175,9 +213,12 @@ func (mgr *TxnManager) onPreparing(items ...any) {
} else if op.Op == OpRollback {
// Should not fail here
_ = op.Txn.ToRollbackingLocked(ts)
} else if op.Op == OpPrepare {
_ = op.Txn.ToPreparingLocked(ts)
}
op.Txn.Unlock()
mgr.Unlock()
//for 1PC Commit
if op.Op == OpCommit {
mgr.onPreparCommit(op.Txn)
if op.Txn.GetError() != nil {
......@@ -188,43 +229,150 @@ func (mgr *TxnManager) onPreparing(items ...any) {
op.Txn.Unlock()
mgr.onPreparRollback(op.Txn)
} else {
//1. Appending the data into appendableNode of block
// 2. Collect redo log,append into WalDriver
//TODO::need to handle the error,instead of panic for simplicity
mgr.onPreApplyCommit(op.Txn)
if op.Txn.GetError() != nil {
panic(op.Txn.GetID())
}
}
//for 2PC Prepare
} else if op.Op == OpPrepare {
mgr.onPrepar2PCPrepare(op.Txn)
if op.Txn.GetError() != nil {
op.Op = OpRollback
op.Txn.Lock()
// Should not fail here
_ = op.Txn.ToRollbackingLocked(ts)
op.Txn.Unlock()
mgr.onPreparRollback(op.Txn)
} else {
//1.Appending the data into appendableNode of block
// 2. Collect redo log,append into WalDriver
//TODO::need to handle the error,instead of panic for simplicity
mgr.onPreApply2PCPrepare(op.Txn)
if op.Txn.GetError() != nil {
panic(op.Txn.GetID())
}
}
} else {
//for 1PC or 2PC Rollback
mgr.onPreparRollback(op.Txn)
}
if _, err := mgr.EnqueueCheckpoint(op); err != nil {
if err := mgr.EnqueueFlushing(op); err != nil {
panic(err)
}
}
logutil.Debug("[PrepareCommit]",
logutil.Debug("[onPreparing]",
common.NameSpaceField("txns"),
common.DurationField(time.Since(now)),
common.CountField(len(items)))
}
// TODO
func (mgr *TxnManager) onCommit(items ...any) {
func (mgr *TxnManager) onFlushing(items ...any) {
var err error
now := time.Now()
for _, item := range items {
op := item.(*OpTxn)
switch op.Op {
//for 1PC Commit
case OpCommit:
if err = op.Txn.ApplyCommit(); err != nil {
mgr.OnException(err)
logutil.Warn("[ApplyCommit]", TxnField(op.Txn), common.ErrorField(err))
panic(err)
}
//for 2PC Prepare
case OpPrepare:
//wait for redo log synced.
if err = op.Txn.Apply2PCPrepare(); err != nil {
panic(err)
}
err = mgr.EnqueueCommitting(&OpTxn{
Txn: op.Txn,
Op: OpInvalid,
})
if err != nil {
panic(err)
}
//for 1PC or 2PC Rollback
case OpRollback:
if err = op.Txn.ApplyRollback(); err != nil {
mgr.OnException(err)
logutil.Warn("[ApplyRollback]", TxnField(op.Txn), common.ErrorField(err))
}
}
// Here only wait the txn to be done. The err returned can be access via op.Txn.GetError()
// Here only notify the user txn have been done with err.
// The err returned can be access via op.Txn.GetError()
_ = op.Txn.WaitDone(err)
}
logutil.Debug("[Commit]",
logutil.Debug("[onFlushing]",
common.NameSpaceField("txns"),
common.CountField(len(items)),
common.DurationField(time.Since(now)))
}
// wait for committing, commit, rollback events of 2PC distributed transactions
func (mgr *TxnManager) onCommitting(items ...any) {
var err error
now := time.Now()
for _, item := range items {
op := item.(OpTxn)
ev := op.Txn.Event()
switch ev {
//TODO:1. Set the commit ts passed by the Coordinator.
// 2. Apply Commit and append a committing log.
case EventCommitting:
//txn.LogTxnEntry()
op.Op = OpCommitting
//TODO:1. Set the commit ts passed by the Coordinator.
// 2. Apply Commit and append a commit log
case EventCommit:
//txn.LogTxnEntry
op.Op = OpCommit
case EventRollback:
pts := op.Txn.GetPrepareTS()
op.Txn.Lock()
//FIXME::set commit ts of a rollbacking transaction to its prepare ts?
_ = op.Txn.ToRollbackingLocked(pts)
op.Txn.Unlock()
op.Op = OpRollback
//_ = op.Txn.ApplyRollback()
//_ = op.Txn.WaitDone(err)
}
err = mgr.EnqueueCFlushing(op)
if err != nil {
panic(err)
}
}
logutil.Debug("[onCommitting]",
common.NameSpaceField("txns"),
common.CountField(len(items)),
common.DurationField(time.Since(now)))
}
func (mgr *TxnManager) onCFlushing(items ...any) {
var err error
now := time.Now()
for _, item := range items {
op := item.(OpTxn)
switch op.Op {
//TODO::wait for commit log synced
case OpCommit:
//TODO:wait for committing log synced.
case OpCommitting:
case OpRollback:
//Notice that can't call op.Txn.PrepareRollback here,
//since data had been appended into the appendableNode of block
//_ = op.Txn.PrepareRollback()
_ = op.Txn.ApplyRollback()
}
_ = op.Txn.WaitDone(err)
}
logutil.Debug("[onCFlushing]",
common.NameSpaceField("txns"),
common.CountField(len(items)),
common.DurationField(time.Since(now)))
......@@ -240,8 +388,14 @@ func (mgr *TxnManager) OnException(new error) {
}
}
func (mgr *TxnManager) Start() {
mgr.CommittingSM.Start()
mgr.PreparingSM.Start()
}
func (mgr *TxnManager) Stop() {
mgr.StateMachine.Stop()
mgr.PreparingSM.Stop()
mgr.CommittingSM.Stop()
mgr.OnException(common.ErrClose)
logutil.Info("[Stop]", TxnMgrField(mgr))
}
......@@ -349,6 +349,9 @@ func (store *txnStore) SoftDeleteSegment(dbId uint64, id *common.ID) (err error)
}
func (store *txnStore) ApplyRollback() (err error) {
if store.cmdMgr.GetCSN() == 0 {
return
}
for _, db := range store.dbs {
if err = db.ApplyRollback(); err != nil {
break
......@@ -357,6 +360,22 @@ func (store *txnStore) ApplyRollback() (err error) {
return
}
// ApplyPrepare apply preparing for a 2PC distributed transaction
func (store *txnStore) Apply2PCPrepare() (err error) {
for _, e := range store.logs {
if err = e.WaitDone(); err != nil {
return
}
e.Free()
}
for _, db := range store.dbs {
if err = db.Apply2PCPrepare(); err != nil {
break
}
}
return
}
func (store *txnStore) ApplyCommit() (err error) {
for _, e := range store.logs {
if err = e.WaitDone(); err != nil {
......@@ -372,9 +391,9 @@ func (store *txnStore) ApplyCommit() (err error) {
return
}
func (store *txnStore) PreCommit() (err error) {
func (store *txnStore) PreCommitOr2PCPrepare() (err error) {
for _, db := range store.dbs {
if err = db.PreCommit(); err != nil {
if err = db.PreCommitOr2PCPrepare(); err != nil {
return
}
}
......@@ -396,6 +415,21 @@ func (store *txnStore) PrepareCommit() (err error) {
return
}
func (store *txnStore) Prepare2PCPrepare() (err error) {
if store.warChecker != nil {
if err = store.warChecker.check(); err != nil {
return err
}
}
for _, db := range store.dbs {
if err = db.Prepare2PCPrepare(); err != nil {
break
}
}
return
}
func (store *txnStore) PreApplyCommit() (err error) {
now := time.Now()
for _, db := range store.dbs {
......@@ -422,6 +456,33 @@ func (store *txnStore) PreApplyCommit() (err error) {
return
}
func (store *txnStore) PreApply2PCPrepare() (err error) {
now := time.Now()
for _, db := range store.dbs {
if err = db.PreApply2PCPrepare(); err != nil {
return
}
}
if err = store.CollectCmd(); err != nil {
return
}
if store.cmdMgr.GetCSN() == 0 {
return
}
//TODO:How to distinguish prepare log of 2PC entry from commit log entry of 1PC?
//logEntry, err := store.cmdMgr.ApplyTxnRecord(store.txn.GetID(), type)
logEntry, err := store.cmdMgr.ApplyTxnRecord(store.txn.GetID())
if err != nil {
return
}
if logEntry != nil {
store.logs = append(store.logs, logEntry)
}
logutil.Debugf("Txn-%d PrepareCommit Takes %s", store.txn.GetID(), time.Since(now))
return
}
func (store *txnStore) CollectCmd() (err error) {
dbs := make([]*txnDB, len(store.dbs))
for _, db := range store.dbs {
......
......@@ -540,7 +540,8 @@ func (tbl *txnTable) UncommittedRows() uint32 {
return tbl.localSegment.Rows()
}
func (tbl *txnTable) PreCommitDedup() (err error) {
// PreCommitOr2PCPrepareDedup do deduplication check for 1PC Commit or 2PC Prepare
func (tbl *txnTable) PreCommitOr2PCPrepareDedup() (err error) {
if tbl.localSegment == nil || !tbl.schema.HasPK() {
return
}
......@@ -669,7 +670,7 @@ func (tbl *txnTable) ApplyAppend() (err error) {
return
}
func (tbl *txnTable) PreCommit() (err error) {
func (tbl *txnTable) PreCommitOr2PCPrepare() (err error) {
if tbl.localSegment != nil {
err = tbl.localSegment.PrepareApply()
}
......@@ -685,10 +686,23 @@ func (tbl *txnTable) PrepareCommit() (err error) {
return
}
func (tbl *txnTable) Prepare2PCPrepare() (err error) {
for _, node := range tbl.txnEntries {
if err = node.Prepare2PCPrepare(); err != nil {
break
}
}
return
}
func (tbl *txnTable) PreApplyCommit() (err error) {
return tbl.ApplyAppend()
}
func (tbl *txnTable) PreApply2PCPrepare() (err error) {
return tbl.ApplyAppend()
}
func (tbl *txnTable) ApplyCommit() (err error) {
csn := tbl.csnStart
for _, node := range tbl.txnEntries {
......@@ -701,10 +715,12 @@ func (tbl *txnTable) ApplyCommit() (err error) {
}
func (tbl *txnTable) ApplyRollback() (err error) {
csn := tbl.csnStart
for _, node := range tbl.txnEntries {
if err = node.ApplyRollback(); err != nil {
if err = node.ApplyRollback(tbl.store.cmdMgr.MakeLogIndex(csn)); err != nil {
break
}
csn++
}
return
}
......@@ -303,7 +303,7 @@ func (db *txnDB) SoftDeleteSegment(id *common.ID) (err error) {
func (db *txnDB) ApplyRollback() (err error) {
if db.createEntry != nil {
if err = db.createEntry.ApplyRollback(); err != nil {
if err = db.createEntry.ApplyRollback(db.store.cmdMgr.MakeLogIndex(db.ddlCSN)); err != nil {
return
}
}
......@@ -313,13 +313,23 @@ func (db *txnDB) ApplyRollback() (err error) {
}
}
if db.dropEntry != nil {
if err = db.dropEntry.ApplyRollback(); err != nil {
if err = db.dropEntry.ApplyRollback(db.store.cmdMgr.MakeLogIndex(db.ddlCSN)); err != nil {
return
}
}
return
}
// ApplyPrepare apply preparing for a 2PC distributed transaction
func (db *txnDB) Apply2PCPrepare() (err error) {
now := time.Now()
for _, table := range db.tables {
table.WaitSynced()
}
logutil.Debugf("Txn-%d ApplyCommit Takes %s", db.store.txn.GetID(), time.Since(now))
return
}
func (db *txnDB) ApplyCommit() (err error) {
now := time.Now()
for _, table := range db.tables {
......@@ -344,14 +354,14 @@ func (db *txnDB) ApplyCommit() (err error) {
return
}
func (db *txnDB) PreCommit() (err error) {
func (db *txnDB) PreCommitOr2PCPrepare() (err error) {
for _, table := range db.tables {
if err = table.PreCommitDedup(); err != nil {
if err = table.PreCommitOr2PCPrepareDedup(); err != nil {
return
}
}
for _, table := range db.tables {
if err = table.PreCommit(); err != nil {
if err = table.PreCommitOr2PCPrepare(); err != nil {
panic(err)
}
}
......@@ -381,6 +391,39 @@ func (db *txnDB) PrepareCommit() (err error) {
return
}
func (db *txnDB) Prepare2PCPrepare() (err error) {
now := time.Now()
if db.createEntry != nil {
if err = db.createEntry.Prepare2PCPrepare(); err != nil {
return
}
}
for _, table := range db.tables {
if err = table.Prepare2PCPrepare(); err != nil {
break
}
}
if db.dropEntry != nil {
if err = db.dropEntry.Prepare2PCPrepare(); err != nil {
return
}
}
logutil.Debugf("Txn-%d PrepareCommit Takes %s", db.store.txn.GetID(), time.Since(now))
return
}
func (db *txnDB) PreApply2PCPrepare() (err error) {
for _, table := range db.tables {
// table.ApplyAppend()
if err = table.PreApply2PCPrepare(); err != nil {
return
}
}
return
}
func (db *txnDB) PreApplyCommit() (err error) {
for _, table := range db.tables {
// table.ApplyAppend()
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment