diff --git a/pkg/vm/engine/tae/catalog/base.go b/pkg/vm/engine/tae/catalog/base.go index fc44b1d91498047787cb8752b8eb9a54380a5453..827f7945ce814a20a930cd8c2b3483c60fdffe78 100644 --- a/pkg/vm/engine/tae/catalog/base.go +++ b/pkg/vm/engine/tae/catalog/base.go @@ -187,19 +187,19 @@ func (be *BaseEntry) GetUpdateNodeLocked() *UpdateNode { } func (be *BaseEntry) GetCommittedNode() (node *UpdateNode) { - be.MVCC.Loop((func(n *common.DLNode) bool { + be.MVCC.Loop(func(n *common.DLNode) bool { un := n.GetPayload().(*UpdateNode) if !un.IsActive() { node = un return false } return true - }), false) + }, false) return } func (be *BaseEntry) GetNodeToRead(startts types.TS) (node *UpdateNode) { - be.MVCC.Loop((func(n *common.DLNode) bool { + be.MVCC.Loop(func(n *common.DLNode) bool { un := n.GetPayload().(*UpdateNode) if un.IsActive() { if un.IsSameTxn(startts) { @@ -213,7 +213,7 @@ func (be *BaseEntry) GetNodeToRead(startts types.TS) (node *UpdateNode) { return false } return true - }), false) + }, false) return } func (be *BaseEntry) DeleteBefore(ts types.TS) bool { @@ -379,8 +379,11 @@ func (be *BaseEntry) IsEmpty() bool { head := be.MVCC.GetHead() return head == nil } -func (be *BaseEntry) ApplyRollback() error { - return nil +func (be *BaseEntry) ApplyRollback(index *wal.Index) error { + be.Lock() + defer be.Unlock() + return be.GetUpdateNodeLocked().ApplyRollback(index) + } func (be *BaseEntry) ApplyCommit(index *wal.Index) error { @@ -484,6 +487,12 @@ func (be *BaseEntry) GetDeleted() bool { return be.GetUpdateNodeLocked().Deleted } +func (be *BaseEntry) Prepare2PCPrepare() error { + be.Lock() + defer be.Unlock() + return be.GetUpdateNodeLocked().Prepare2PCPrepare() +} + func (be *BaseEntry) PrepareCommit() error { be.Lock() defer be.Unlock() diff --git a/pkg/vm/engine/tae/catalog/updatenode.go b/pkg/vm/engine/tae/catalog/updatenode.go index 7e16aa2ca94234c54180b80517de99677de20932..2a8275c5096be22773095825dc73c270a1a0843d 100644 --- a/pkg/vm/engine/tae/catalog/updatenode.go +++ b/pkg/vm/engine/tae/catalog/updatenode.go @@ -195,7 +195,19 @@ func (e *UpdateNode) ApplyCommit(index *wal.Index) (err error) { return } -func (e *UpdateNode) ApplyRollback() (err error) { +func (e *UpdateNode) ApplyRollback(index *wal.Index) (err error) { + e.AddLogIndex(index) + return +} + +func (e *UpdateNode) Prepare2PCPrepare() (err error) { + if e.CreatedAt.IsEmpty() { + e.CreatedAt = e.Txn.GetPrepareTS() + } + if e.Deleted { + e.DeletedAt = e.Txn.GetPrepareTS() + } + e.End = e.Txn.GetPrepareTS() return } diff --git a/pkg/vm/engine/tae/iface/txnif/consts.go b/pkg/vm/engine/tae/iface/txnif/consts.go index ef29db18204babc27a865f46d8337be4492b8920..5ff63ae365da2815cbe039e08491a0a0bb25beb9 100644 --- a/pkg/vm/engine/tae/iface/txnif/consts.go +++ b/pkg/vm/engine/tae/iface/txnif/consts.go @@ -16,10 +16,6 @@ package txnif import "github.com/matrixorigin/matrixone/pkg/container/types" -const ( -// UncommitTS = ^uint64(0) -) - var UncommitTS types.TS func init() { @@ -30,19 +26,40 @@ type TxnState int32 const ( TxnStateActive TxnState = iota + //TxnStateCommitting only for 1PC TxnStateCommitting + //TxnStatePreparing only for 2PC + TxnStatePreparing + //TxnStatePrepared only for 2PC + TxnStatePrepared + //TxnStateCommittingFinished only for 2PC + TxnStateCommittingFinished TxnStateRollbacking TxnStateCommitted TxnStateRollbacked TxnStateUnknown ) +type TxnStatus int32 + +const ( +// TxnStatusActive TxnStatus = iota +// TxnStatusPrepared +// TxnStatusCommittingFinished +// TxnStatusCommitted +// TxnStatusRollbacked +) + func TxnStrState(state TxnState) string { switch state { case TxnStateActive: return "Active" case TxnStateCommitting: return "Committing" + case TxnStatePrepared: + return "Prepared" + case TxnStateCommittingFinished: + return "CommittingFinished" case TxnStateRollbacking: return "Rollbacking" case TxnStateCommitted: diff --git a/pkg/vm/engine/tae/iface/txnif/types.go b/pkg/vm/engine/tae/iface/txnif/types.go index c89dc5d2bd64eb721331a0b347e7792d0c2a8353..3fb50dd79caba340dc90676c3ac1e73f603e3d8a 100644 --- a/pkg/vm/engine/tae/iface/txnif/types.go +++ b/pkg/vm/engine/tae/iface/txnif/types.go @@ -29,9 +29,12 @@ import ( type Txn2PC interface { PrepareRollback() error ApplyRollback() error - PreCommit() error + PreCommitOr2PCPrepare() error PrepareCommit() error + Prepare2PCPrepare() error PreApplyCommit() error + PreApply2PCPrepare() error + Apply2PCPrepare() error ApplyCommit() error } @@ -42,6 +45,7 @@ type TxnReader interface { GetCtx() []byte GetStartTS() types.TS GetCommitTS() types.TS + GetPrepareTS() types.TS GetInfo() []byte IsTerminated(bool) bool IsVisible(o TxnReader) bool @@ -51,6 +55,7 @@ type TxnReader interface { String() string Repr() string GetLSN() uint64 + Event() int SameTxn(startTs types.TS) bool CommitBefore(startTs types.TS) bool @@ -73,9 +78,12 @@ type TxnChanger interface { RUnlock() ToCommittedLocked() error ToCommittingLocked(ts types.TS) error + ToPreparingLocked(ts types.TS) error ToRollbackedLocked() error ToRollbackingLocked(ts types.TS) error ToUnknownLocked() + Prepare() error + Committing() error Commit() error Rollback() error SetError(error) @@ -227,8 +235,9 @@ type TxnEntry interface { RLock() RUnlock() PrepareCommit() error + Prepare2PCPrepare() error PrepareRollback() error ApplyCommit(index *wal.Index) error - ApplyRollback() error + ApplyRollback(index *wal.Index) error MakeCommand(uint32) (TxnCmd, error) } diff --git a/pkg/vm/engine/tae/tables/txnentries/compactblk.go b/pkg/vm/engine/tae/tables/txnentries/compactblk.go index c8a3268e9d1f239461c59ebb7613369c7c4afccb..6fab912118b360f732b3de82bda2bbbf6d3db6b5 100644 --- a/pkg/vm/engine/tae/tables/txnentries/compactblk.go +++ b/pkg/vm/engine/tae/tables/txnentries/compactblk.go @@ -59,7 +59,10 @@ func (entry *compactBlockEntry) PrepareRollback() (err error) { // TODO: remove block file? (should be scheduled and executed async) return } -func (entry *compactBlockEntry) ApplyRollback() (err error) { return } +func (entry *compactBlockEntry) ApplyRollback(index *wal.Index) (err error) { + //TODO:? + return +} func (entry *compactBlockEntry) ApplyCommit(index *wal.Index) (err error) { if err = entry.scheduler.Checkpoint([]*wal.Index{index}); err != nil { // TODO: @@ -86,6 +89,10 @@ func (entry *compactBlockEntry) MakeCommand(csn uint32) (cmd txnif.TxnCmd, err e return } +func (entry *compactBlockEntry) Prepare2PCPrepare() (err error) { + return +} + func (entry *compactBlockEntry) PrepareCommit() (err error) { dataBlock := entry.from.GetMeta().(*catalog.BlockEntry).GetBlockData() view, err := dataBlock.CollectChangesInRange(entry.txn.GetStartTS(), entry.txn.GetCommitTS()) diff --git a/pkg/vm/engine/tae/tables/txnentries/mergeblocks.go b/pkg/vm/engine/tae/tables/txnentries/mergeblocks.go index 698fff7d9a83a91e9ec872e6ce82e9230105350c..25455946c885bde87015d59570179382619ae137 100644 --- a/pkg/vm/engine/tae/tables/txnentries/mergeblocks.go +++ b/pkg/vm/engine/tae/tables/txnentries/mergeblocks.go @@ -64,7 +64,10 @@ func (entry *mergeBlocksEntry) PrepareRollback() (err error) { // TODO: remove block file? (should be scheduled and executed async) return } -func (entry *mergeBlocksEntry) ApplyRollback() (err error) { return } +func (entry *mergeBlocksEntry) ApplyRollback(index *wal.Index) (err error) { + //TODO::? + return +} func (entry *mergeBlocksEntry) ApplyCommit(index *wal.Index) (err error) { if err = entry.scheduler.Checkpoint([]*wal.Index{index}); err != nil { // TODO: @@ -141,6 +144,10 @@ func (entry *mergeBlocksEntry) resolveAddr(fromPos int, fromOffset uint32) (toPo return } +func (entry *mergeBlocksEntry) Prepare2PCPrepare() (err error) { + return +} + func (entry *mergeBlocksEntry) PrepareCommit() (err error) { blks := make([]handle.Block, len(entry.createdBlks)) for i, meta := range entry.createdBlks { diff --git a/pkg/vm/engine/tae/tables/updates/append.go b/pkg/vm/engine/tae/tables/updates/append.go index afd4a1b83377492bfc882837fcd6d6521aa2af26..33ad5514e97447da4c9b13ae12d4b738d87483f8 100644 --- a/pkg/vm/engine/tae/tables/updates/append.go +++ b/pkg/vm/engine/tae/tables/updates/append.go @@ -30,13 +30,14 @@ import ( type AppendNode struct { sync.RWMutex - commitTs types.TS - txn txnif.AsyncTxn - logIndex *wal.Index - startRow uint32 - maxRow uint32 - mvcc *MVCCHandle - id *common.ID + commitTs types.TS + prepareTs types.TS + txn txnif.AsyncTxn + logIndex *wal.Index + startRow uint32 + maxRow uint32 + mvcc *MVCCHandle + id *common.ID } func MockAppendNode(ts types.TS, startRow, maxRow uint32, mvcc *MVCCHandle) *AppendNode { @@ -63,7 +64,6 @@ func NewAppendNode( txn txnif.AsyncTxn, startRow, maxRow uint32, mvcc *MVCCHandle) *AppendNode { - //ts := uint64(0) var ts types.TS if txn != nil { ts = txn.GetCommitTS() @@ -106,9 +106,18 @@ func (node *AppendNode) GetStartRow() uint32 { return node.startRow } func (node *AppendNode) GetMaxRow() uint32 { return node.maxRow } func (node *AppendNode) SetMaxRow(row uint32) { node.maxRow = row } +func (node *AppendNode) Prepare2PCPrepare() error { + node.Lock() + defer node.Unlock() + node.prepareTs = node.txn.GetPrepareTS() + node.commitTs = node.txn.GetPrepareTS() + return nil +} + func (node *AppendNode) PrepareCommit() error { node.Lock() defer node.Unlock() + node.prepareTs = node.txn.GetCommitTS() node.commitTs = node.txn.GetCommitTS() return nil } @@ -133,6 +142,13 @@ func (node *AppendNode) ApplyCommit(index *wal.Index) error { return listener(node) } +func (node *AppendNode) ApplyRollback(index *wal.Index) (err error) { + node.Lock() + defer node.Unlock() + node.logIndex = index + return +} + func (node *AppendNode) WriteTo(w io.Writer) (n int64, err error) { cn, err := w.Write(txnbase.MarshalID(node.mvcc.GetID())) if err != nil { @@ -183,7 +199,6 @@ func (node *AppendNode) PrepareRollback() (err error) { node.mvcc.DeleteAppendNodeLocked(node) return } -func (node *AppendNode) ApplyRollback() (err error) { return } func (node *AppendNode) MakeCommand(id uint32) (cmd txnif.TxnCmd, err error) { cmd = NewAppendCmd(id, node) return diff --git a/pkg/vm/engine/tae/tables/updates/colupdate.go b/pkg/vm/engine/tae/tables/updates/colupdate.go index de517815b2c65f8e59879a2652b7a817c3a696f1..d7325800bca669b476897bb1a056b7346854b1e3 100644 --- a/pkg/vm/engine/tae/tables/updates/colupdate.go +++ b/pkg/vm/engine/tae/tables/updates/colupdate.go @@ -37,12 +37,13 @@ type ColumnUpdateNode struct { mask *roaring.Bitmap vals map[uint32]any // nulls *roaring.Bitmap - chain *ColumnChain - startTs types.TS - commitTs types.TS - txn txnif.AsyncTxn - logIndex *wal.Index - id *common.ID + chain *ColumnChain + startTs types.TS + commitTs types.TS + prepareTs types.TS + txn txnif.AsyncTxn + logIndex *wal.Index + id *common.ID } func NewSimpleColumnUpdateNode() *ColumnUpdateNode { @@ -330,6 +331,22 @@ func (node *ColumnUpdateNode) StringLocked() string { return s } +func (node *ColumnUpdateNode) Prepare2PCPrepare() (err error) { + node.chain.Lock() + defer node.chain.Unlock() + if node.commitTs != txnif.UncommitTS { + return + } + // if node.commitTs != txnif.UncommitTS { + // panic("logic error") + // } + node.commitTs = node.txn.GetPrepareTS() + node.prepareTs = node.txn.GetPrepareTS() + node.chain.UpdateLocked(node) + // TODO: merge updates + return +} + func (node *ColumnUpdateNode) PrepareCommit() (err error) { node.chain.Lock() defer node.chain.Unlock() @@ -340,6 +357,7 @@ func (node *ColumnUpdateNode) PrepareCommit() (err error) { // panic("logic error") // } node.commitTs = node.txn.GetCommitTS() + node.prepareTs = node.txn.GetCommitTS() node.chain.UpdateLocked(node) // TODO: merge updates return @@ -363,4 +381,13 @@ func (node *ColumnUpdateNode) PrepareRollback() (err error) { return } -func (node *ColumnUpdateNode) ApplyRollback() (err error) { return } +func (node *ColumnUpdateNode) ApplyRollback(index *wal.Index) (err error) { + node.Lock() + defer node.Unlock() + if node.txn == nil { + panic("ColumnUpdateNode | ApplyCommit | LogicErr") + } + node.logIndex = index + return + +} diff --git a/pkg/vm/engine/tae/tables/updates/delete.go b/pkg/vm/engine/tae/tables/updates/delete.go index 8771bcb3fcd22a33f7746073cc55e1bf702eccb4..3e5dc90cd0eda37730912c7924447bd9968e719c 100644 --- a/pkg/vm/engine/tae/tables/updates/delete.go +++ b/pkg/vm/engine/tae/tables/updates/delete.go @@ -46,6 +46,7 @@ type DeleteNode struct { mask *roaring.Bitmap startTs types.TS commitTs types.TS + prepareTs types.TS nt NodeType id *common.ID dt handle.DeleteType @@ -164,6 +165,18 @@ func (node *DeleteNode) RangeDeleteLocked(start, end uint32) { } func (node *DeleteNode) GetCardinalityLocked() uint32 { return uint32(node.mask.GetCardinality()) } +func (node *DeleteNode) Prepare2PCPrepare() (err error) { + node.chain.mvcc.Lock() + defer node.chain.mvcc.Unlock() + if node.commitTs != txnif.UncommitTS { + return + } + node.commitTs = node.txn.GetPrepareTS() + node.prepareTs = node.txn.GetPrepareTS() + node.chain.UpdateLocked(node) + return +} + func (node *DeleteNode) PrepareCommit() (err error) { node.chain.mvcc.Lock() defer node.chain.mvcc.Unlock() @@ -171,6 +184,7 @@ func (node *DeleteNode) PrepareCommit() (err error) { return } node.commitTs = node.txn.GetCommitTS() + node.prepareTs = node.txn.GetCommitTS() node.chain.UpdateLocked(node) return } @@ -191,6 +205,15 @@ func (node *DeleteNode) ApplyCommit(index *wal.Index) (err error) { return node.OnApply() } +func (node *DeleteNode) ApplyRollback(index *wal.Index) (err error) { + node.Lock() + if node.txn == nil { + panic("DeleteNode | ApplyCommit | LogicErr") + } + node.logIndex = index + return +} + func (node *DeleteNode) GeneralString() string { return fmt.Sprintf("TS=%d;Cnt=%d;LogIndex%v", node.commitTs, node.mask.GetCardinality(), node.logIndex) } @@ -288,8 +311,6 @@ func (node *DeleteNode) PrepareRollback() (err error) { return } -func (node *DeleteNode) ApplyRollback() (err error) { return } - func (node *DeleteNode) OnApply() (err error) { if node.dt == handle.DT_Normal { listener := node.chain.mvcc.GetDeletesListener() diff --git a/pkg/vm/engine/tae/tables/updates/mvcc.go b/pkg/vm/engine/tae/tables/updates/mvcc.go index 5bc48029a444f8f96ef83f482f3c90a3c5dfeb9c..9905dabd776c9f9700f16aae2bd12a971f67e936 100644 --- a/pkg/vm/engine/tae/tables/updates/mvcc.go +++ b/pkg/vm/engine/tae/tables/updates/mvcc.go @@ -306,6 +306,9 @@ func (n *MVCCHandle) GetTotalRow() uint32 { return n.appends[len(n.appends)-1].maxRow - delets } +// TODO::it will be rewritten in V0.6,since maxVisible of MVCC handel +// +// would not be increased monotonically. func (n *MVCCHandle) getMaxVisibleRowLocked(ts types.TS) (int, uint32, bool, error) { if len(n.appends) == 0 { return 0, 0, false, nil diff --git a/pkg/vm/engine/tae/txn/txnbase/errors.go b/pkg/vm/engine/tae/txn/txnbase/errors.go index a6bf25fce41f424f8af5fec76bf19534c84d7bab..eddb7724ec4ddeec9e23f8ef89dbe8a2890e2c2b 100644 --- a/pkg/vm/engine/tae/txn/txnbase/errors.go +++ b/pkg/vm/engine/tae/txn/txnbase/errors.go @@ -24,4 +24,11 @@ var ( ErrTxnCannotRollback = errors.New("tae: txn cannot txn rollback") ErrDDLDropCreated = errors.New("tae: DDL cannot drop created in a txn") + + ErrTxnStateNotActive = errors.New("tae: txn is not in TxnStateActive") + ErrTxnStateNotPrepared = errors.New("tae: txn is not in TxnStatePrepared") + ErrTxnNotFound = errors.New("tae: txn is not found in txn manager") + + ErrTxnStateCannotRollback = errors.New("tae: txn is not in right state,can not rollback") + ErrTxnStateCannotCommit = errors.New("tae: txn is not is right state, can not Commit") ) diff --git a/pkg/vm/engine/tae/txn/txnbase/store.go b/pkg/vm/engine/tae/txn/txnbase/store.go index 8d2245a815c2d1d871eaba16cbb99092107529e8..6d48b1b76ab504cf5eea730468fcb736bcf1c585 100644 --- a/pkg/vm/engine/tae/txn/txnbase/store.go +++ b/pkg/vm/engine/tae/txn/txnbase/store.go @@ -30,11 +30,14 @@ func (store *NoopTxnStore) BindTxn(txn txnif.AsyncTxn) func (store *NoopTxnStore) Close() error { return nil } func (store *NoopTxnStore) Append(dbId, id uint64, data *containers.Batch) error { return nil } func (store *NoopTxnStore) PrepareRollback() error { return nil } -func (store *NoopTxnStore) PreCommit() error { return nil } +func (store *NoopTxnStore) PreCommitOr2PCPrepare() error { return nil } func (store *NoopTxnStore) PrepareCommit() error { return nil } +func (store *NoopTxnStore) Prepare2PCPrepare() error { return nil } func (store *NoopTxnStore) ApplyRollback() error { return nil } func (store *NoopTxnStore) PreApplyCommit() error { return nil } +func (store *NoopTxnStore) PreApply2PCPrepare() error { return nil } func (store *NoopTxnStore) ApplyCommit() error { return nil } +func (store *NoopTxnStore) Apply2PCPrepare() error { return nil } func (store *NoopTxnStore) AddTxnEntry(t txnif.TxnEntryType, entry txnif.TxnEntry) {} diff --git a/pkg/vm/engine/tae/txn/txnbase/txn.go b/pkg/vm/engine/tae/txn/txnbase/txn.go index 14696f5549023c18a8dfc58339b8eec0547deb53..14de94f8c7c3017b366425b1ea5b6cee3c7cef64 100644 --- a/pkg/vm/engine/tae/txn/txnbase/txn.go +++ b/pkg/vm/engine/tae/txn/txnbase/txn.go @@ -31,6 +31,15 @@ type OpType int8 const ( OpCommit = iota OpRollback + OpPrepare + OpCommitting + OpInvalid +) + +const ( + EventRollback = iota + 1 + EventCommitting + EventCommit ) type OpTxn struct { @@ -53,23 +62,26 @@ var DefaultTxnFactory = func(mgr *TxnManager, store txnif.TxnStore, id uint64, s type Txn struct { sync.WaitGroup *TxnCtx - Mgr *TxnManager - Store txnif.TxnStore - Err error - LSN uint64 - + Ch chan int + Mgr *TxnManager + Store txnif.TxnStore + Err error + LSN uint64 TenantID, UserID, RoleID uint32 - PrepareCommitFn func(txnif.AsyncTxn) error - PrepareRollbackFn func(txnif.AsyncTxn) error - ApplyCommitFn func(txnif.AsyncTxn) error - ApplyRollbackFn func(txnif.AsyncTxn) error + PrepareCommitFn func(txnif.AsyncTxn) error + Prepare2PCPrepareFn func(txnif.AsyncTxn) error + PrepareRollbackFn func(txnif.AsyncTxn) error + ApplyPrepareFn func(txnif.AsyncTxn) error + ApplyCommitFn func(txnif.AsyncTxn) error + ApplyRollbackFn func(txnif.AsyncTxn) error } func NewTxn(mgr *TxnManager, store txnif.TxnStore, txnId uint64, start types.TS, info []byte) *Txn { txn := &Txn{ Mgr: mgr, Store: store, + Ch: make(chan int, 1), } txn.TxnCtx = NewTxnCtx(txnId, start, info) return txn @@ -85,21 +97,44 @@ func (txn *Txn) SetPrepareRollbackFn(fn func(txnif.AsyncTxn) error) { txn.Prepar func (txn *Txn) SetApplyCommitFn(fn func(txnif.AsyncTxn) error) { txn.ApplyCommitFn = fn } func (txn *Txn) SetApplyRollbackFn(fn func(txnif.AsyncTxn) error) { txn.ApplyRollbackFn = fn } -func (txn *Txn) Commit() (err error) { - if txn.Store.IsReadonly() { - txn.Mgr.DeleteTxn(txn.GetID()) - return nil +//The state transition of transaction is as follows: +// 1PC: TxnStateActive--->TxnStateCommitting--->TxnStateCommitted/TxnStateRollbacked +// TxnStateActive--->TxnStateRollbacking--->TxnStateRollbacked +// 2PC running on Coordinator: TxnStateActive--->TxnStatePreparing-->TxnStatePrepared +// -->TxnStateCommittingFinished--->TxnStateCommitted or +// TxnStateActive--->TxnStatePreparing-->TxnStatePrepared-->TxnStateRollbacked or +// TxnStateActive--->TxnStateRollbacking--->TxnStateRollbacked. +// 2PC running on Participant: TxnStateActive--->TxnStatePrepared-->TxnStateCommitted or +// TxnStateActive--->TxnStatePrepared-->TxnStateRollbacked or +// TxnStateActive--->TxnStateRollbacking-->TxnStateRollbacked. + +// Prepare is used to pre-commit a 2PC distributed transaction. +// Notice that once any error happened, we should rollback the txn. +// TODO: 1. How to handle the case in which log service timed out? +// 2. For a 2pc transaction, Rollback message may arrive before Prepare message, +// should handle this case by TxnStorage? +func (txn *Txn) Prepare() (err error) { + //TODO::should handle this by TxnStorage? + if txn.Mgr.GetTxn(txn.GetID()) == nil { + logutil.Warn("tae : txn is not found in TxnManager") + //txn.Err = ErrTxnNotFound + return ErrTxnNotFound + } + state := (txnif.TxnState)(atomic.LoadInt32((*int32)(&txn.State))) + if state != txnif.TxnStateActive { + logutil.Warnf("unexpected txn status : %s", txnif.TxnStrState(txn.State)) + txn.Err = ErrTxnStateNotActive + return txn.Err } txn.Add(1) err = txn.Mgr.OnOpTxn(&OpTxn{ Txn: txn, - Op: OpCommit, + Op: OpPrepare, }) // TxnManager is closed if err != nil { txn.SetError(err) txn.Lock() - //_ = txn.ToRollbackingLocked(txn.GetStartTS() + 1) _ = txn.ToRollbackingLocked(txn.GetStartTS().Next()) txn.Unlock() _ = txn.PrepareRollback() @@ -107,21 +142,68 @@ func (txn *Txn) Commit() (err error) { txn.DoneWithErr(err) } txn.Wait() - txn.Mgr.DeleteTxn(txn.GetID()) + if txn.Err == nil { + //txn.State = txnif.TxnStatePrepared + atomic.StoreInt32((*int32)(&txn.State), (int32)(txnif.TxnStatePrepared)) + } else { + //txn.Status = txnif.TxnStatusRollbacked + txn.Mgr.DeleteTxn(txn.GetID()) + } return txn.GetError() } -func (txn *Txn) GetStore() txnif.TxnStore { - return txn.Store -} - -func (txn *Txn) GetLSN() uint64 { return txn.LSN } - +// Rollback is used to roll back a 1PC or 2PC transaction. +// rollback's idempotent is handled here, Although Prepare/Commit/Committing message's idempotent +// is handled by the transaction framework. +// Notice that there may be a such scenario in which a 2PC distributed transaction in ACTIVE will be rollbacked, +// since Rollback message may arrive before the Prepare message. Should handle this case by TxnStorage? func (txn *Txn) Rollback() (err error) { + //TODO:idempotent for rollback should be guaranteed by TxnStoage? + if txn.Mgr.GetTxn(txn.GetID()) == nil { + logutil.Warn("tae : txn is not found in TxnManager") + return + } + + state := (txnif.TxnState)(atomic.LoadInt32((*int32)(&txn.State))) + if (!txn.Is2PC && state != txnif.TxnStateActive) || + txn.Is2PC && state != txnif.TxnStateActive && + state != txnif.TxnStatePrepared { + logutil.Warnf("unexpected txn status : %s", txnif.TxnStrState(txn.State)) + return ErrTxnStateCannotRollback + } if txn.Store.IsReadonly() { txn.Mgr.DeleteTxn(txn.GetID()) return } + //2PC + if txn.Is2PC { + if state == txnif.TxnStateActive { + txn.Add(1) + err = txn.Mgr.OnOpTxn(&OpTxn{ + Txn: txn, + Op: OpRollback, + }) + if err != nil { + _ = txn.PrepareRollback() + _ = txn.ApplyRollback() + txn.DoneWithErr(err) + } + txn.Wait() + //txn.Status = txnif.TxnStatusRollbacked + //atomic.StoreInt32((*int32)(&txn.State), (int32)(txnif.TxnStateRollbacked)) + txn.Mgr.DeleteTxn(txn.GetID()) + } + if state == txnif.TxnStatePrepared { + txn.Add(1) + txn.Ch <- EventRollback + //Wait txn rollbacked + txn.Wait() + //txn.Status = txnif.TxnStatusRollbacked + txn.Mgr.DeleteTxn(txn.GetID()) + } + return txn.GetError() + } + //1PC txn.Add(1) err = txn.Mgr.OnOpTxn(&OpTxn{ Txn: txn, @@ -133,11 +215,106 @@ func (txn *Txn) Rollback() (err error) { txn.DoneWithErr(err) } txn.Wait() + //txn.Status = txnif.TxnStatusRollbacked + txn.Mgr.DeleteTxn(txn.GetID()) + return txn.Err +} + +// Committing is used to record a log for 2PC distributed transaction running in Coordinator. +// Notice that transaction must be committed once committing message arrives, since Preparing +// had already succeeded. +func (txn *Txn) Committing() (err error) { + state := (txnif.TxnState)(atomic.LoadInt32((*int32)(&txn.State))) + if state != txnif.TxnStatePrepared { + logutil.Warnf("unexpected txn status : %s", txnif.TxnStrState(txn.State)) + //txn.Err = ErrTxnStatusNotPrepared + return ErrTxnStateNotPrepared + } + txn.Add(1) + txn.Ch <- EventCommitting + txn.Wait() + //txn.Status = txnif.TxnStatusCommittingFinished + atomic.StoreInt32((*int32)(&txn.State), (int32)(txnif.TxnStateCommittingFinished)) + return txn.Err +} + +// Commit is used to commit a 1PC or 2PC transaction running in Coordinator or running in Participant. +// Notice that the Commit of a 2PC transaction must be success once the commit message arrives, +// since Preparing had already succeeded. +func (txn *Txn) Commit() (err error) { + state := (txnif.TxnState)(atomic.LoadInt32((*int32)(&txn.State))) + if (!txn.Is2PC && state != txnif.TxnStateActive) || + txn.Is2PC && state != txnif.TxnStateCommittingFinished && + state != txnif.TxnStatePrepared { + logutil.Warnf("unexpected txn state : %s", txnif.TxnStrState(txn.State)) + //txn.Err = ErrTxnStatusCannotCommit + return ErrTxnStateCannotCommit + } + if txn.Store.IsReadonly() { + txn.Mgr.DeleteTxn(txn.GetID()) + return nil + } + if txn.Is2PC { + //It's a 2PC transaction running in Coordinator + if state == txnif.TxnStateCommittingFinished { + //TODO:Append committed log entry into log service asynchronously + // for checkpointing the committing log entry + //txn.SetError(txn.LogTxnEntry()) + if txn.Err == nil { + //txn.State = txnif.TxnStateCommitted + atomic.StoreInt32((*int32)(&txn.State), (int32)(txnif.TxnStateCommitted)) + } + txn.Mgr.DeleteTxn(txn.GetID()) + } + //It's a 2PC transaction running in Participant. + //Notice that Commit must be success once the commit message arrives, + //since Preparing had already succeeded. + if state == txnif.TxnStatePrepared { + txn.Add(1) + txn.Ch <- EventCommit + txn.Wait() + //txn.Status = txnif.TxnStatusCommitted + atomic.StoreInt32((*int32)(&txn.State), (int32)(txnif.TxnStateCommitted)) + txn.Mgr.DeleteTxn(txn.GetID()) + } + return txn.GetError() + } + //It's a 1PC transaction + txn.Add(1) + err = txn.Mgr.OnOpTxn(&OpTxn{ + Txn: txn, + Op: OpCommit, + }) + // TxnManager is closed + if err != nil { + txn.SetError(err) + txn.Lock() + _ = txn.ToRollbackingLocked(txn.GetStartTS().Next()) + txn.Unlock() + _ = txn.PrepareRollback() + _ = txn.ApplyRollback() + txn.DoneWithErr(err) + } + txn.Wait() + //if txn.Err == nil { + //txn.Status = txnif.TxnStatusCommitted + //} txn.Mgr.DeleteTxn(txn.GetID()) - err = txn.Err + return txn.GetError() +} + +func (txn *Txn) GetStore() txnif.TxnStore { + return txn.Store +} + +func (txn *Txn) GetLSN() uint64 { return txn.LSN } + +func (txn *Txn) Event() (e int) { + e = <-txn.Ch return } +// TODO::need to take 2PC txn account into. func (txn *Txn) DoneWithErr(err error) { txn.DoneCond.L.Lock() if err != nil { @@ -175,13 +352,48 @@ func (txn *Txn) PrepareCommit() (err error) { return err } +func (txn *Txn) Prepare2PCPrepare() (err error) { + logutil.Debugf("Prepare Committing %d", txn.ID) + if txn.Prepare2PCPrepareFn != nil { + if err = txn.Prepare2PCPrepareFn(txn); err != nil { + return + } + } + err = txn.Store.Prepare2PCPrepare() + return err +} + func (txn *Txn) PreApplyCommit() (err error) { err = txn.Store.PreApplyCommit() return } +func (txn *Txn) PreApply2PCPrepare() (err error) { + err = txn.Store.PreApply2PCPrepare() + return +} + +// Apply2PCPrepare apply preparing for a 2PC distributed transaction +func (txn *Txn) Apply2PCPrepare() (err error) { + defer func() { + //Get the lsn of ETTxnRecord entry in GroupC + txn.LSN = txn.Store.GetLSN() + if err != nil { + txn.Store.Close() + } + }() + if txn.ApplyPrepareFn != nil { + if err = txn.ApplyPrepareFn(txn); err != nil { + return + } + } + err = txn.Store.Apply2PCPrepare() + return +} + func (txn *Txn) ApplyCommit() (err error) { defer func() { + //Get the lsn of ETTxnRecord entry in GroupC. txn.LSN = txn.Store.GetLSN() if err == nil { err = txn.Store.Close() @@ -216,8 +428,8 @@ func (txn *Txn) ApplyRollback() (err error) { return } -func (txn *Txn) PreCommit() error { - return txn.Store.PreCommit() +func (txn *Txn) PreCommitOr2PCPrepare() error { + return txn.Store.PreCommitOr2PCPrepare() } func (txn *Txn) PrepareRollback() (err error) { diff --git a/pkg/vm/engine/tae/txn/txnbase/txnctx.go b/pkg/vm/engine/tae/txn/txnbase/txnctx.go index aa22f3d6fff6aac5be81b017c901255f1b052d72..70151ee3913f84c28278d8d2f092c36ace1e1d99 100644 --- a/pkg/vm/engine/tae/txn/txnbase/txnctx.go +++ b/pkg/vm/engine/tae/txn/txnbase/txnctx.go @@ -36,12 +36,13 @@ func IDCtxToID(buf []byte) uint64 { type TxnCtx struct { sync.RWMutex - DoneCond sync.Cond - ID uint64 - IDCtx []byte - StartTS, CommitTS types.TS - Info []byte - State txnif.TxnState + DoneCond sync.Cond + ID uint64 + IDCtx []byte + StartTS, CommitTS, PrepareTS types.TS + Info []byte + State txnif.TxnState + Is2PC bool } func NewTxnCtx(id uint64, start types.TS, info []byte) *TxnCtx { @@ -84,6 +85,11 @@ func (ctx *TxnCtx) GetCommitTS() types.TS { defer ctx.RUnlock() return ctx.CommitTS } +func (ctx *TxnCtx) GetPrepareTS() types.TS { + ctx.RLock() + defer ctx.RUnlock() + return ctx.PrepareTS +} // Atomically returns the current txn state func (ctx *TxnCtx) getTxnState() txnif.TxnState { @@ -134,6 +140,19 @@ func (ctx *TxnCtx) IsActiveLocked() bool { return ctx.CommitTS == txnif.UncommitTS } +func (ctx *TxnCtx) ToPreparingLocked(ts types.TS) error { + if ts.LessEq(ctx.StartTS) { + panic(fmt.Sprintf("start ts %d should be less than commit ts %d", ctx.StartTS, ts)) + } + if !ctx.CommitTS.Equal(txnif.UncommitTS) { + return ErrTxnNotActive + } + ctx.PrepareTS = ts + ctx.CommitTS = ts + ctx.State = txnif.TxnStatePreparing + return nil +} + func (ctx *TxnCtx) ToCommittingLocked(ts types.TS) error { if ts.LessEq(ctx.StartTS) { panic(fmt.Sprintf("start ts %d should be less than commit ts %d", ctx.StartTS, ts)) @@ -142,6 +161,7 @@ func (ctx *TxnCtx) ToCommittingLocked(ts types.TS) error { return ErrTxnNotActive } ctx.CommitTS = ts + ctx.PrepareTS = ts ctx.State = txnif.TxnStateCommitting return nil } @@ -162,6 +182,7 @@ func (ctx *TxnCtx) ToRollbackingLocked(ts types.TS) error { return ErrTxnCannotRollback } ctx.CommitTS = ts + ctx.PrepareTS = ts ctx.State = txnif.TxnStateRollbacking return nil } diff --git a/pkg/vm/engine/tae/txn/txnbase/txnmgr.go b/pkg/vm/engine/tae/txn/txnbase/txnmgr.go index 130a5e24ce3f15c2c1dead59bdcfa67cbd9c5dbf..565c2aed4b782e5f71a331cb064701a4ebbdf3ba 100644 --- a/pkg/vm/engine/tae/txn/txnbase/txnmgr.go +++ b/pkg/vm/engine/tae/txn/txnbase/txnmgr.go @@ -34,7 +34,10 @@ type TxnFactory = func(*TxnManager, txnif.TxnStore, uint64, types.TS, []byte) tx type TxnManager struct { sync.RWMutex common.ClosedState - sm.StateMachine + PreparingSM sm.StateMachine + //Notice that prepared transactions would be enqueued into + //the receiveQueue of CommittingSM at run time or replay time. + CommittingSM sm.StateMachine IDMap map[uint64]txnif.AsyncTxn IdAlloc *common.IdAlloctor TsAlloc *types.TsAlloctor @@ -59,8 +62,12 @@ func NewTxnManager(txnStoreFactory TxnStoreFactory, txnFactory TxnFactory, clock }), Exception: new(atomic.Value)} pqueue := sm.NewSafeQueue(20000, 1000, mgr.onPreparing) - cqueue := sm.NewSafeQueue(20000, 1000, mgr.onCommit) - mgr.StateMachine = sm.NewStateMachine(new(sync.WaitGroup), mgr, pqueue, cqueue) + fqueue := sm.NewSafeQueue(20000, 1000, mgr.onFlushing) + mgr.PreparingSM = sm.NewStateMachine(new(sync.WaitGroup), mgr, pqueue, fqueue) + + cqueue := sm.NewSafeQueue(20000, 1000, mgr.onCommitting) + cfqueue := sm.NewSafeQueue(20000, 1000, mgr.onCFlushing) + mgr.CommittingSM = sm.NewStateMachine(new(sync.WaitGroup), mgr, cqueue, cfqueue) return mgr } @@ -115,7 +122,6 @@ func (mgr *TxnManager) DeleteTxn(id uint64) { defer mgr.Unlock() txn := mgr.IDMap[id] delete(mgr.IDMap, id) - //mgr.ActiveMask.Remove(txn.GetStartTS()) mgr.Active.Delete(txn.GetStartTS()) } @@ -129,14 +135,29 @@ func (mgr *TxnManager) GetTxn(id uint64) txnif.AsyncTxn { return mgr.IDMap[id] } +func (mgr *TxnManager) EnqueueFlushing(op any) (err error) { + _, err = mgr.PreparingSM.EnqueueCheckpoint(op) + return +} + +func (mgr *TxnManager) EnqueueCommitting(op any) (err error) { + _, err = mgr.CommittingSM.EnqueueRecevied(op) + return +} + +func (mgr *TxnManager) EnqueueCFlushing(op any) (err error) { + _, err = mgr.CommittingSM.EnqueueCheckpoint(op) + return +} + func (mgr *TxnManager) OnOpTxn(op *OpTxn) (err error) { - _, err = mgr.EnqueueRecevied(op) + _, err = mgr.PreparingSM.EnqueueRecevied(op) return } -func (mgr *TxnManager) onPreCommit(txn txnif.AsyncTxn) { +func (mgr *TxnManager) onPreCommitOr2PCPrepare(txn txnif.AsyncTxn) { now := time.Now() - txn.SetError(txn.PreCommit()) + txn.SetError(txn.PreCommitOr2PCPrepare()) logutil.Debug("[PreCommit]", TxnField(txn), common.DurationField(time.Since(now))) } @@ -144,6 +165,10 @@ func (mgr *TxnManager) onPreparCommit(txn txnif.AsyncTxn) { txn.SetError(txn.PrepareCommit()) } +func (mgr *TxnManager) onPrepar2PCPrepare(txn txnif.AsyncTxn) { + txn.SetError(txn.Prepare2PCPrepare()) +} + func (mgr *TxnManager) onPreApplyCommit(txn txnif.AsyncTxn) { if err := txn.PreApplyCommit(); err != nil { txn.SetError(err) @@ -151,18 +176,31 @@ func (mgr *TxnManager) onPreApplyCommit(txn txnif.AsyncTxn) { } } +func (mgr *TxnManager) onPreApply2PCPrepare(txn txnif.AsyncTxn) { + if err := txn.PreApply2PCPrepare(); err != nil { + txn.SetError(err) + mgr.OnException(err) + } +} + func (mgr *TxnManager) onPreparRollback(txn txnif.AsyncTxn) { _ = txn.PrepareRollback() } -// TODO +// onPreparing the commit of 1PC txn and prepare of 2PC txn +// must both enter into this queue for conflict check. +// OpCommit : the commit of 1PC txn +// OpPrepare: the prepare of 2PC txn +// OPRollback:the rollback of 2PC or 1PC func (mgr *TxnManager) onPreparing(items ...any) { now := time.Now() for _, item := range items { op := item.(*OpTxn) - if op.Op == OpCommit { - mgr.onPreCommit(op.Txn) + if op.Op == OpCommit || op.Op == OpPrepare { + //Mainly do conflict check for 1PC Commit or 2PC Prepare + mgr.onPreCommitOr2PCPrepare(op.Txn) } + //Before this moment, all mvcc nodes of a txn has been pushed into the MVCCHandle. mgr.Lock() ts := mgr.TsAlloc.Alloc() op.Txn.Lock() @@ -175,9 +213,12 @@ func (mgr *TxnManager) onPreparing(items ...any) { } else if op.Op == OpRollback { // Should not fail here _ = op.Txn.ToRollbackingLocked(ts) + } else if op.Op == OpPrepare { + _ = op.Txn.ToPreparingLocked(ts) } op.Txn.Unlock() mgr.Unlock() + //for 1PC Commit if op.Op == OpCommit { mgr.onPreparCommit(op.Txn) if op.Txn.GetError() != nil { @@ -188,43 +229,150 @@ func (mgr *TxnManager) onPreparing(items ...any) { op.Txn.Unlock() mgr.onPreparRollback(op.Txn) } else { + //1. Appending the data into appendableNode of block + // 2. Collect redo log,append into WalDriver + //TODO::need to handle the error,instead of panic for simplicity mgr.onPreApplyCommit(op.Txn) + if op.Txn.GetError() != nil { + panic(op.Txn.GetID()) + } + } + //for 2PC Prepare + } else if op.Op == OpPrepare { + mgr.onPrepar2PCPrepare(op.Txn) + if op.Txn.GetError() != nil { + op.Op = OpRollback + op.Txn.Lock() + // Should not fail here + _ = op.Txn.ToRollbackingLocked(ts) + op.Txn.Unlock() + mgr.onPreparRollback(op.Txn) + } else { + //1.Appending the data into appendableNode of block + // 2. Collect redo log,append into WalDriver + //TODO::need to handle the error,instead of panic for simplicity + mgr.onPreApply2PCPrepare(op.Txn) + if op.Txn.GetError() != nil { + panic(op.Txn.GetID()) + } } } else { + //for 1PC or 2PC Rollback mgr.onPreparRollback(op.Txn) } - if _, err := mgr.EnqueueCheckpoint(op); err != nil { + if err := mgr.EnqueueFlushing(op); err != nil { panic(err) } } - logutil.Debug("[PrepareCommit]", + logutil.Debug("[onPreparing]", common.NameSpaceField("txns"), common.DurationField(time.Since(now)), common.CountField(len(items))) } -// TODO -func (mgr *TxnManager) onCommit(items ...any) { +func (mgr *TxnManager) onFlushing(items ...any) { var err error now := time.Now() for _, item := range items { op := item.(*OpTxn) switch op.Op { + //for 1PC Commit case OpCommit: if err = op.Txn.ApplyCommit(); err != nil { - mgr.OnException(err) - logutil.Warn("[ApplyCommit]", TxnField(op.Txn), common.ErrorField(err)) + panic(err) } + //for 2PC Prepare + case OpPrepare: + //wait for redo log synced. + if err = op.Txn.Apply2PCPrepare(); err != nil { + panic(err) + } + err = mgr.EnqueueCommitting(&OpTxn{ + Txn: op.Txn, + Op: OpInvalid, + }) + if err != nil { + panic(err) + } + //for 1PC or 2PC Rollback case OpRollback: if err = op.Txn.ApplyRollback(); err != nil { mgr.OnException(err) logutil.Warn("[ApplyRollback]", TxnField(op.Txn), common.ErrorField(err)) } } - // Here only wait the txn to be done. The err returned can be access via op.Txn.GetError() + // Here only notify the user txn have been done with err. + // The err returned can be access via op.Txn.GetError() _ = op.Txn.WaitDone(err) } - logutil.Debug("[Commit]", + logutil.Debug("[onFlushing]", + common.NameSpaceField("txns"), + common.CountField(len(items)), + common.DurationField(time.Since(now))) +} + +// wait for committing, commit, rollback events of 2PC distributed transactions +func (mgr *TxnManager) onCommitting(items ...any) { + var err error + now := time.Now() + for _, item := range items { + op := item.(OpTxn) + ev := op.Txn.Event() + switch ev { + //TODO:1. Set the commit ts passed by the Coordinator. + // 2. Apply Commit and append a committing log. + case EventCommitting: + //txn.LogTxnEntry() + + op.Op = OpCommitting + //TODO:1. Set the commit ts passed by the Coordinator. + // 2. Apply Commit and append a commit log + case EventCommit: + //txn.LogTxnEntry + op.Op = OpCommit + case EventRollback: + pts := op.Txn.GetPrepareTS() + op.Txn.Lock() + //FIXME::set commit ts of a rollbacking transaction to its prepare ts? + _ = op.Txn.ToRollbackingLocked(pts) + op.Txn.Unlock() + op.Op = OpRollback + //_ = op.Txn.ApplyRollback() + //_ = op.Txn.WaitDone(err) + + } + err = mgr.EnqueueCFlushing(op) + if err != nil { + panic(err) + } + } + logutil.Debug("[onCommitting]", + common.NameSpaceField("txns"), + common.CountField(len(items)), + common.DurationField(time.Since(now))) +} + +func (mgr *TxnManager) onCFlushing(items ...any) { + var err error + now := time.Now() + for _, item := range items { + op := item.(OpTxn) + switch op.Op { + //TODO::wait for commit log synced + case OpCommit: + + //TODO:wait for committing log synced. + case OpCommitting: + + case OpRollback: + //Notice that can't call op.Txn.PrepareRollback here, + //since data had been appended into the appendableNode of block + //_ = op.Txn.PrepareRollback() + _ = op.Txn.ApplyRollback() + } + _ = op.Txn.WaitDone(err) + } + logutil.Debug("[onCFlushing]", common.NameSpaceField("txns"), common.CountField(len(items)), common.DurationField(time.Since(now))) @@ -240,8 +388,14 @@ func (mgr *TxnManager) OnException(new error) { } } +func (mgr *TxnManager) Start() { + mgr.CommittingSM.Start() + mgr.PreparingSM.Start() +} + func (mgr *TxnManager) Stop() { - mgr.StateMachine.Stop() + mgr.PreparingSM.Stop() + mgr.CommittingSM.Stop() mgr.OnException(common.ErrClose) logutil.Info("[Stop]", TxnMgrField(mgr)) } diff --git a/pkg/vm/engine/tae/txn/txnimpl/store.go b/pkg/vm/engine/tae/txn/txnimpl/store.go index 95d6da04dd93c750e1b4a93644fb02ee4aad977b..f28f26e15235439e321b77a50d08ec43b2643f24 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/store.go +++ b/pkg/vm/engine/tae/txn/txnimpl/store.go @@ -349,6 +349,9 @@ func (store *txnStore) SoftDeleteSegment(dbId uint64, id *common.ID) (err error) } func (store *txnStore) ApplyRollback() (err error) { + if store.cmdMgr.GetCSN() == 0 { + return + } for _, db := range store.dbs { if err = db.ApplyRollback(); err != nil { break @@ -357,6 +360,22 @@ func (store *txnStore) ApplyRollback() (err error) { return } +// ApplyPrepare apply preparing for a 2PC distributed transaction +func (store *txnStore) Apply2PCPrepare() (err error) { + for _, e := range store.logs { + if err = e.WaitDone(); err != nil { + return + } + e.Free() + } + for _, db := range store.dbs { + if err = db.Apply2PCPrepare(); err != nil { + break + } + } + return +} + func (store *txnStore) ApplyCommit() (err error) { for _, e := range store.logs { if err = e.WaitDone(); err != nil { @@ -372,9 +391,9 @@ func (store *txnStore) ApplyCommit() (err error) { return } -func (store *txnStore) PreCommit() (err error) { +func (store *txnStore) PreCommitOr2PCPrepare() (err error) { for _, db := range store.dbs { - if err = db.PreCommit(); err != nil { + if err = db.PreCommitOr2PCPrepare(); err != nil { return } } @@ -396,6 +415,21 @@ func (store *txnStore) PrepareCommit() (err error) { return } +func (store *txnStore) Prepare2PCPrepare() (err error) { + if store.warChecker != nil { + if err = store.warChecker.check(); err != nil { + return err + } + } + for _, db := range store.dbs { + if err = db.Prepare2PCPrepare(); err != nil { + break + } + } + + return +} + func (store *txnStore) PreApplyCommit() (err error) { now := time.Now() for _, db := range store.dbs { @@ -422,6 +456,33 @@ func (store *txnStore) PreApplyCommit() (err error) { return } +func (store *txnStore) PreApply2PCPrepare() (err error) { + now := time.Now() + for _, db := range store.dbs { + if err = db.PreApply2PCPrepare(); err != nil { + return + } + } + if err = store.CollectCmd(); err != nil { + return + } + + if store.cmdMgr.GetCSN() == 0 { + return + } + //TODO:How to distinguish prepare log of 2PC entry from commit log entry of 1PC? + //logEntry, err := store.cmdMgr.ApplyTxnRecord(store.txn.GetID(), type) + logEntry, err := store.cmdMgr.ApplyTxnRecord(store.txn.GetID()) + if err != nil { + return + } + if logEntry != nil { + store.logs = append(store.logs, logEntry) + } + logutil.Debugf("Txn-%d PrepareCommit Takes %s", store.txn.GetID(), time.Since(now)) + return +} + func (store *txnStore) CollectCmd() (err error) { dbs := make([]*txnDB, len(store.dbs)) for _, db := range store.dbs { diff --git a/pkg/vm/engine/tae/txn/txnimpl/table.go b/pkg/vm/engine/tae/txn/txnimpl/table.go index 243ff8b0340d481bcef35206e6d45187975727d3..d7f7bf27943978903474018be49a6651d7d0ac5d 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/table.go +++ b/pkg/vm/engine/tae/txn/txnimpl/table.go @@ -540,7 +540,8 @@ func (tbl *txnTable) UncommittedRows() uint32 { return tbl.localSegment.Rows() } -func (tbl *txnTable) PreCommitDedup() (err error) { +// PreCommitOr2PCPrepareDedup do deduplication check for 1PC Commit or 2PC Prepare +func (tbl *txnTable) PreCommitOr2PCPrepareDedup() (err error) { if tbl.localSegment == nil || !tbl.schema.HasPK() { return } @@ -669,7 +670,7 @@ func (tbl *txnTable) ApplyAppend() (err error) { return } -func (tbl *txnTable) PreCommit() (err error) { +func (tbl *txnTable) PreCommitOr2PCPrepare() (err error) { if tbl.localSegment != nil { err = tbl.localSegment.PrepareApply() } @@ -685,10 +686,23 @@ func (tbl *txnTable) PrepareCommit() (err error) { return } +func (tbl *txnTable) Prepare2PCPrepare() (err error) { + for _, node := range tbl.txnEntries { + if err = node.Prepare2PCPrepare(); err != nil { + break + } + } + return +} + func (tbl *txnTable) PreApplyCommit() (err error) { return tbl.ApplyAppend() } +func (tbl *txnTable) PreApply2PCPrepare() (err error) { + return tbl.ApplyAppend() +} + func (tbl *txnTable) ApplyCommit() (err error) { csn := tbl.csnStart for _, node := range tbl.txnEntries { @@ -701,10 +715,12 @@ func (tbl *txnTable) ApplyCommit() (err error) { } func (tbl *txnTable) ApplyRollback() (err error) { + csn := tbl.csnStart for _, node := range tbl.txnEntries { - if err = node.ApplyRollback(); err != nil { + if err = node.ApplyRollback(tbl.store.cmdMgr.MakeLogIndex(csn)); err != nil { break } + csn++ } return } diff --git a/pkg/vm/engine/tae/txn/txnimpl/txndb.go b/pkg/vm/engine/tae/txn/txnimpl/txndb.go index 142aa60ea3edba18d8d5e68f09af4378e71f6600..f0848cc8a46be486325ce3fc2fbeacfabbf79e78 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/txndb.go +++ b/pkg/vm/engine/tae/txn/txnimpl/txndb.go @@ -303,7 +303,7 @@ func (db *txnDB) SoftDeleteSegment(id *common.ID) (err error) { func (db *txnDB) ApplyRollback() (err error) { if db.createEntry != nil { - if err = db.createEntry.ApplyRollback(); err != nil { + if err = db.createEntry.ApplyRollback(db.store.cmdMgr.MakeLogIndex(db.ddlCSN)); err != nil { return } } @@ -313,13 +313,23 @@ func (db *txnDB) ApplyRollback() (err error) { } } if db.dropEntry != nil { - if err = db.dropEntry.ApplyRollback(); err != nil { + if err = db.dropEntry.ApplyRollback(db.store.cmdMgr.MakeLogIndex(db.ddlCSN)); err != nil { return } } return } +// ApplyPrepare apply preparing for a 2PC distributed transaction +func (db *txnDB) Apply2PCPrepare() (err error) { + now := time.Now() + for _, table := range db.tables { + table.WaitSynced() + } + logutil.Debugf("Txn-%d ApplyCommit Takes %s", db.store.txn.GetID(), time.Since(now)) + return +} + func (db *txnDB) ApplyCommit() (err error) { now := time.Now() for _, table := range db.tables { @@ -344,14 +354,14 @@ func (db *txnDB) ApplyCommit() (err error) { return } -func (db *txnDB) PreCommit() (err error) { +func (db *txnDB) PreCommitOr2PCPrepare() (err error) { for _, table := range db.tables { - if err = table.PreCommitDedup(); err != nil { + if err = table.PreCommitOr2PCPrepareDedup(); err != nil { return } } for _, table := range db.tables { - if err = table.PreCommit(); err != nil { + if err = table.PreCommitOr2PCPrepare(); err != nil { panic(err) } } @@ -381,6 +391,39 @@ func (db *txnDB) PrepareCommit() (err error) { return } +func (db *txnDB) Prepare2PCPrepare() (err error) { + now := time.Now() + if db.createEntry != nil { + if err = db.createEntry.Prepare2PCPrepare(); err != nil { + return + } + } + for _, table := range db.tables { + if err = table.Prepare2PCPrepare(); err != nil { + break + } + } + if db.dropEntry != nil { + if err = db.dropEntry.Prepare2PCPrepare(); err != nil { + return + } + } + + logutil.Debugf("Txn-%d PrepareCommit Takes %s", db.store.txn.GetID(), time.Since(now)) + + return +} + +func (db *txnDB) PreApply2PCPrepare() (err error) { + for _, table := range db.tables { + // table.ApplyAppend() + if err = table.PreApply2PCPrepare(); err != nil { + return + } + } + return +} + func (db *txnDB) PreApplyCommit() (err error) { for _, table := range db.tables { // table.ApplyAppend()