Skip to content
Snippets Groups Projects
Unverified Commit 63a75c8a authored by nnsgmsone's avatar nnsgmsone Committed by GitHub
Browse files

Add empty workspace for cn (#4759)

Approved by: @reusee
parent d5a2c03d
No related branches found
No related tags found
No related merge requests found
......@@ -18,15 +18,12 @@ import (
"context"
"time"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/pb/logservice"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
)
type Engine struct {
getClusterDetails GetClusterDetailsFunc
}
type GetClusterDetailsFunc = func() (logservice.ClusterDetails, error)
func New(
......@@ -35,31 +32,63 @@ func New(
) *Engine {
return &Engine{
getClusterDetails: getClusterDetails,
txns: make(map[string]*Transaction),
}
}
var _ engine.Engine = new(Engine)
func (e *Engine) Create(context.Context, string, client.TxnOperator) error {
func (e *Engine) Create(ctx context.Context, name string, op client.TxnOperator) error {
//TODO
e.newTransaction(op)
panic("unimplemented")
}
func (e *Engine) Database(context.Context, string, client.TxnOperator) (engine.Database, error) {
func (e *Engine) Database(ctx context.Context, name string, op client.TxnOperator) (engine.Database, error) {
//TODO
e.newTransaction(op)
panic("unimplemented")
}
func (e *Engine) Databases(context.Context, client.TxnOperator) ([]string, error) {
func (e *Engine) Databases(ctx context.Context, op client.TxnOperator) ([]string, error) {
//TODO
e.newTransaction(op)
panic("unimplemented")
}
func (e *Engine) Delete(context.Context, string, client.TxnOperator) error {
func (e *Engine) Delete(ctx context.Context, name string, op client.TxnOperator) error {
//TODO
e.newTransaction(op)
panic("unimplemented")
}
// hasConflict used to detect if a transaction on a cn is in conflict,
// currently an empty implementation, assuming all transactions on a cn are conflict free
func (e *Engine) HasConflict(txn *Transaction) bool {
return false
}
func (e *Engine) PreCommit(ctx context.Context, op client.TxnOperator) error {
txn := e.getTransaction(op)
if txn == nil {
return moerr.New(moerr.ErrTxnClosed, "the transaction has been committed or aborted")
}
defer e.delTransaction(txn)
if e.HasConflict(txn) {
return moerr.New(moerr.ErrTxnWriteConflict, "write conflict")
}
return nil
}
func (e *Engine) Rollback(ctx context.Context, op client.TxnOperator) error {
txn := e.getTransaction(op)
if txn == nil {
return moerr.New(moerr.ErrTxnClosed, "the transaction has been committed or aborted")
}
defer e.delTransaction(txn)
return nil
}
func (e *Engine) Nodes() (engine.Nodes, error) {
clusterDetails, err := e.getClusterDetails()
if err != nil {
......@@ -82,3 +111,31 @@ func (e *Engine) Hints() (h engine.Hints) {
h.CommitOrRollbackTimeout = time.Minute * 5
return
}
func (e *Engine) newTransaction(op client.TxnOperator) {
txn := &Transaction{
readOnly: false,
meta: op.Txn(),
}
e.addTransaction(txn)
}
func (e *Engine) getTransaction(op client.TxnOperator) *Transaction {
e.Lock()
defer e.Unlock()
return e.txns[string(op.Txn().ID)]
}
func (e *Engine) addTransaction(txn *Transaction) {
e.Lock()
defer e.Unlock()
if _, ok := e.txns[string(txn.meta.ID)]; !ok {
e.txns[string(txn.meta.ID)] = txn
}
}
func (e *Engine) delTransaction(txn *Transaction) {
e.Lock()
defer e.Unlock()
delete(e.txns, string(txn.meta.ID))
}
// Copyright 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package disttae
import (
"github.com/matrixorigin/matrixone/pkg/container/batch"
)
// detecting whether a transaction is a read-only transaction
func (txn *Transaction) ReadOnly() bool {
return txn.readOnly
}
// use for solving halloween problem
func (txn *Transaction) SetStatementId(id [2]uint64) {
txn.statementId = id
}
// Write used to write data to the transaction buffer
// insert/delete/update all use this api
func (txn *Transaction) WriteBatch(bat *batch.Batch) error {
txn.readOnly = true
return nil
}
// WriteFile used to add a s3 file information to the transaction buffer
// insert/delete/update all use this api
func (txn *Transaction) WriteFile(file string) error {
txn.readOnly = true
return nil
}
// Copyright 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package disttae
import (
"sync"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
)
type Engine struct {
sync.Mutex
getClusterDetails GetClusterDetailsFunc
txns map[string]*Transaction
}
// Transaction represents a transaction
type Transaction struct {
// readOnly default value is true, once a write happen, then set to false
readOnly bool
statementId [2]uint64
meta txn.TxnMeta
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment