From 23e781f29b286e808813143dc6752ba2fd3f1112 Mon Sep 17 00:00:00 2001 From: reusee <reusee@gmail.com> Date: Fri, 26 Aug 2022 14:31:14 +0800 Subject: [PATCH] vm/engine: add Engine.Hints (#4722) vm/engine: add Engine.Hints Approved by: @nnsgmsone, @XuPeng-SH, @daviszhen --- pkg/frontend/load_test.go | 9 +++++ pkg/frontend/mysql_cmd_executor_test.go | 10 ++++++ pkg/frontend/session.go | 10 ++++++ pkg/frontend/session_test.go | 14 ++++++++ pkg/frontend/test/engine_mock.go | 46 ++++++++++++++++--------- pkg/vm/engine/disttae/engine.go | 6 ++++ pkg/vm/engine/memEngine/engine.go | 6 ++++ pkg/vm/engine/tae/moengine/engine.go | 6 ++++ pkg/vm/engine/txn/engine.go | 6 ++++ pkg/vm/engine/types.go | 26 +++++++++++--- 10 files changed, 118 insertions(+), 21 deletions(-) diff --git a/pkg/frontend/load_test.go b/pkg/frontend/load_test.go index fbac5bd7b..04072b3e1 100644 --- a/pkg/frontend/load_test.go +++ b/pkg/frontend/load_test.go @@ -43,6 +43,9 @@ func Test_readTextFile(t *testing.T) { defer ctrl.Finish() eng := mock_frontend.NewMockTxnEngine(ctrl) + eng.EXPECT().Hints().Return(engine.Hints{ + CommitOrRollbackTimeout: time.Second, + }).AnyTimes() txn := mock_frontend.NewMockTxn(ctrl) txn.EXPECT().GetCtx().Return(nil).AnyTimes() txn.EXPECT().Commit().Return(nil).AnyTimes() @@ -156,6 +159,9 @@ func Test_readTextFile(t *testing.T) { defer ctrl.Finish() eng := mock_frontend.NewMockTxnEngine(ctrl) + eng.EXPECT().Hints().Return(engine.Hints{ + CommitOrRollbackTimeout: time.Second, + }).AnyTimes() txn := mock_frontend.NewMockTxn(ctrl) txn.EXPECT().GetCtx().Return(nil).AnyTimes() txn.EXPECT().GetID().Return(uint64(0)).AnyTimes() @@ -339,6 +345,9 @@ func Test_readTextFile(t *testing.T) { defer ctrl.Finish() eng := mock_frontend.NewMockTxnEngine(ctrl) + eng.EXPECT().Hints().Return(engine.Hints{ + CommitOrRollbackTimeout: time.Second, + }).AnyTimes() txn := mock_frontend.NewMockTxn(ctrl) txn.EXPECT().GetCtx().Return(nil).AnyTimes() txn.EXPECT().Commit().Return(nil).AnyTimes() diff --git a/pkg/frontend/mysql_cmd_executor_test.go b/pkg/frontend/mysql_cmd_executor_test.go index b3af8cca6..da91bea25 100644 --- a/pkg/frontend/mysql_cmd_executor_test.go +++ b/pkg/frontend/mysql_cmd_executor_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "testing" + "time" "github.com/fagongzi/goetty/v2/buf" "github.com/golang/mock/gomock" @@ -49,6 +50,9 @@ func Test_mce(t *testing.T) { defer ctrl.Finish() eng := mock_frontend.NewMockEngine(ctrl) + eng.EXPECT().Hints().Return(engine.Hints{ + CommitOrRollbackTimeout: time.Second, + }).AnyTimes() txnOperator := mock_frontend.NewMockTxnOperator(ctrl) eng.EXPECT().Database(ctx, gomock.Any(), txnOperator).Return(nil, nil).AnyTimes() @@ -264,6 +268,9 @@ func Test_mce_selfhandle(t *testing.T) { defer ctrl.Finish() eng := mock_frontend.NewMockEngine(ctrl) + eng.EXPECT().Hints().Return(engine.Hints{ + CommitOrRollbackTimeout: time.Second, + }).AnyTimes() cnt := 0 eng.EXPECT().Database(ctx, gomock.Any(), gomock.Any()).DoAndReturn( func(ctx2 context.Context, db string, dump interface{}) (engine.Database, error) { @@ -892,6 +899,9 @@ func Test_CMD_FIELD_LIST(t *testing.T) { table.EXPECT().TableDefs(ctx).Return(defs, nil).AnyTimes() eng.EXPECT().Database(ctx, gomock.Any(), nil).Return(db, nil).AnyTimes() + eng.EXPECT().Hints().Return(engine.Hints{ + CommitOrRollbackTimeout: time.Second, + }).AnyTimes() txnOperator := mock_frontend.NewMockTxnOperator(ctrl) txnOperator.EXPECT().Commit(ctx).Return(nil).AnyTimes() diff --git a/pkg/frontend/session.go b/pkg/frontend/session.go index 1debe9117..71fb39edb 100644 --- a/pkg/frontend/session.go +++ b/pkg/frontend/session.go @@ -639,6 +639,11 @@ func (th *TxnHandler) CommitTxn() error { if ctx == nil { panic("context should not be nil") } + ctx, cancel := context.WithTimeout( + ctx, + th.storage.Hints().CommitOrRollbackTimeout, + ) + defer cancel() err := th.txn.Commit(ctx) th.SetInvalid() return err @@ -652,6 +657,11 @@ func (th *TxnHandler) RollbackTxn() error { if ctx == nil { panic("context should not be nil") } + ctx, cancel := context.WithTimeout( + ctx, + th.storage.Hints().CommitOrRollbackTimeout, + ) + defer cancel() err := th.txn.Rollback(ctx) th.SetInvalid() return err diff --git a/pkg/frontend/session_test.go b/pkg/frontend/session_test.go index 1577079a2..cdad6f043 100644 --- a/pkg/frontend/session_test.go +++ b/pkg/frontend/session_test.go @@ -18,9 +18,11 @@ import ( "context" "errors" "testing" + "time" "github.com/matrixorigin/matrixone/pkg/config" "github.com/matrixorigin/matrixone/pkg/txn/client" + "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/fagongzi/goetty/v2/buf" "github.com/golang/mock/gomock" @@ -49,6 +51,9 @@ func TestTxnHandler_NewTxn(t *testing.T) { } }).AnyTimes() eng := mock_frontend.NewMockEngine(ctrl) + eng.EXPECT().Hints().Return(engine.Hints{ + CommitOrRollbackTimeout: time.Second, + }).AnyTimes() txn := InitTxnHandler(eng, txnClient) txn.ses = &Session{ @@ -83,6 +88,9 @@ func TestTxnHandler_CommitTxn(t *testing.T) { txnClient := mock_frontend.NewMockTxnClient(ctrl) eng := mock_frontend.NewMockEngine(ctrl) + eng.EXPECT().Hints().Return(engine.Hints{ + CommitOrRollbackTimeout: time.Second, + }).AnyTimes() txnClient.EXPECT().New().Return(txnOperator, nil).AnyTimes() @@ -121,6 +129,9 @@ func TestTxnHandler_RollbackTxn(t *testing.T) { txnClient := mock_frontend.NewMockTxnClient(ctrl) eng := mock_frontend.NewMockEngine(ctrl) + eng.EXPECT().Hints().Return(engine.Hints{ + CommitOrRollbackTimeout: time.Second, + }).AnyTimes() txnClient.EXPECT().New().Return(txnOperator, nil).AnyTimes() @@ -485,6 +496,9 @@ func TestSession_TxnCompilerContext(t *testing.T) { txnClient := mock_frontend.NewMockTxnClient(ctrl) txnClient.EXPECT().New().Return(txnOperator, nil).AnyTimes() eng := mock_frontend.NewMockEngine(ctrl) + eng.EXPECT().Hints().Return(engine.Hints{ + CommitOrRollbackTimeout: time.Second, + }).AnyTimes() db := mock_frontend.NewMockDatabase(ctrl) db.EXPECT().Relations(gomock.Any()).Return(nil, nil).AnyTimes() diff --git a/pkg/frontend/test/engine_mock.go b/pkg/frontend/test/engine_mock.go index 4b44fdb03..13abf49f9 100644 --- a/pkg/frontend/test/engine_mock.go +++ b/pkg/frontend/test/engine_mock.go @@ -471,61 +471,75 @@ func (m *MockEngine) EXPECT() *MockEngineMockRecorder { } // Create mocks base method. -func (m *MockEngine) Create(arg0 context.Context, arg1 string, arg2 client.TxnOperator) error { +func (m *MockEngine) Create(ctx context.Context, databaseName string, op client.TxnOperator) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Create", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "Create", ctx, databaseName, op) ret0, _ := ret[0].(error) return ret0 } // Create indicates an expected call of Create. -func (mr *MockEngineMockRecorder) Create(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *MockEngineMockRecorder) Create(ctx, databaseName, op interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockEngine)(nil).Create), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockEngine)(nil).Create), ctx, databaseName, op) } // Database mocks base method. -func (m *MockEngine) Database(arg0 context.Context, arg1 string, arg2 client.TxnOperator) (engine.Database, error) { +func (m *MockEngine) Database(ctx context.Context, databaseName string, op client.TxnOperator) (engine.Database, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Database", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "Database", ctx, databaseName, op) ret0, _ := ret[0].(engine.Database) ret1, _ := ret[1].(error) return ret0, ret1 } // Database indicates an expected call of Database. -func (mr *MockEngineMockRecorder) Database(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *MockEngineMockRecorder) Database(ctx, databaseName, op interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Database", reflect.TypeOf((*MockEngine)(nil).Database), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Database", reflect.TypeOf((*MockEngine)(nil).Database), ctx, databaseName, op) } // Databases mocks base method. -func (m *MockEngine) Databases(arg0 context.Context, arg1 client.TxnOperator) ([]string, error) { +func (m *MockEngine) Databases(ctx context.Context, op client.TxnOperator) ([]string, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Databases", arg0, arg1) + ret := m.ctrl.Call(m, "Databases", ctx, op) ret0, _ := ret[0].([]string) ret1, _ := ret[1].(error) return ret0, ret1 } // Databases indicates an expected call of Databases. -func (mr *MockEngineMockRecorder) Databases(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockEngineMockRecorder) Databases(ctx, op interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Databases", reflect.TypeOf((*MockEngine)(nil).Databases), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Databases", reflect.TypeOf((*MockEngine)(nil).Databases), ctx, op) } // Delete mocks base method. -func (m *MockEngine) Delete(arg0 context.Context, arg1 string, arg2 client.TxnOperator) error { +func (m *MockEngine) Delete(ctx context.Context, databaseName string, op client.TxnOperator) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Delete", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "Delete", ctx, databaseName, op) ret0, _ := ret[0].(error) return ret0 } // Delete indicates an expected call of Delete. -func (mr *MockEngineMockRecorder) Delete(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *MockEngineMockRecorder) Delete(ctx, databaseName, op interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockEngine)(nil).Delete), ctx, databaseName, op) +} + +// Hints mocks base method. +func (m *MockEngine) Hints() engine.Hints { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Hints") + ret0, _ := ret[0].(engine.Hints) + return ret0 +} + +// Hints indicates an expected call of Hints. +func (mr *MockEngineMockRecorder) Hints() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockEngine)(nil).Delete), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Hints", reflect.TypeOf((*MockEngine)(nil).Hints)) } // Nodes mocks base method. diff --git a/pkg/vm/engine/disttae/engine.go b/pkg/vm/engine/disttae/engine.go index 30f9ed15b..aca8551d7 100644 --- a/pkg/vm/engine/disttae/engine.go +++ b/pkg/vm/engine/disttae/engine.go @@ -16,6 +16,7 @@ package disttae import ( "context" + "time" "github.com/matrixorigin/matrixone/pkg/pb/logservice" "github.com/matrixorigin/matrixone/pkg/txn/client" @@ -76,3 +77,8 @@ func (e *Engine) Nodes() (engine.Nodes, error) { return nodes, nil } + +func (e *Engine) Hints() (h engine.Hints) { + h.CommitOrRollbackTimeout = time.Minute * 5 + return +} diff --git a/pkg/vm/engine/memEngine/engine.go b/pkg/vm/engine/memEngine/engine.go index fb06c35b1..611893cb9 100644 --- a/pkg/vm/engine/memEngine/engine.go +++ b/pkg/vm/engine/memEngine/engine.go @@ -17,6 +17,7 @@ package memEngine import ( "context" "fmt" + "time" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/txn/client" @@ -111,3 +112,8 @@ func (e *MemEngine) Cost(_ *plan.ObjectRef, _ *plan.Expr) *plan.Cost { func (e *MemEngine) GetRootSql() string { return "" } + +func (e *MemEngine) Hints() (h engine.Hints) { + h.CommitOrRollbackTimeout = time.Second + return +} diff --git a/pkg/vm/engine/tae/moengine/engine.go b/pkg/vm/engine/tae/moengine/engine.go index 185274932..5937c45a9 100644 --- a/pkg/vm/engine/tae/moengine/engine.go +++ b/pkg/vm/engine/tae/moengine/engine.go @@ -16,6 +16,7 @@ package moengine import ( "context" + "time" "github.com/matrixorigin/matrixone/pkg/txn/client" "github.com/matrixorigin/matrixone/pkg/vm/engine" @@ -87,3 +88,8 @@ func (e *txnEngine) Nodes() (engine.Nodes, error) { func (e *txnEngine) StartTxn(info []byte) (txn Txn, err error) { return e.impl.StartTxn(info) } + +func (e *txnEngine) Hints() (h engine.Hints) { + h.CommitOrRollbackTimeout = time.Minute + return +} diff --git a/pkg/vm/engine/txn/engine.go b/pkg/vm/engine/txn/engine.go index 9484aed66..34dd452de 100644 --- a/pkg/vm/engine/txn/engine.go +++ b/pkg/vm/engine/txn/engine.go @@ -17,6 +17,7 @@ package txnengine import ( "context" "strings" + "time" logservicepb "github.com/matrixorigin/matrixone/pkg/pb/logservice" "github.com/matrixorigin/matrixone/pkg/txn/client" @@ -151,3 +152,8 @@ func (e *Engine) Nodes() (engine.Nodes, error) { return nodes, nil } + +func (e *Engine) Hints() (h engine.Hints) { + h.CommitOrRollbackTimeout = time.Minute * 5 + return +} diff --git a/pkg/vm/engine/types.go b/pkg/vm/engine/types.go index 440afb83f..702a1e660 100644 --- a/pkg/vm/engine/types.go +++ b/pkg/vm/engine/types.go @@ -16,6 +16,7 @@ package engine import ( "context" + "time" "github.com/matrixorigin/matrixone/pkg/compress" "github.com/matrixorigin/matrixone/pkg/container/batch" @@ -161,12 +162,27 @@ type Database interface { } type Engine interface { - Delete(context.Context, string, client.TxnOperator) error + // Delete deletes a database + Delete(ctx context.Context, databaseName string, op client.TxnOperator) error - Create(context.Context, string, client.TxnOperator) error // Create Database - (name, engine type) + // Create creates a database + Create(ctx context.Context, databaseName string, op client.TxnOperator) error - Databases(context.Context, client.TxnOperator) ([]string, error) - Database(context.Context, string, client.TxnOperator) (Database, error) + // Databases returns all database names + Databases(ctx context.Context, op client.TxnOperator) (databaseNames []string, err error) - Nodes() (Nodes, error) + // Database creates a handle for a database + Database(ctx context.Context, databaseName string, op client.TxnOperator) (Database, error) + + // Nodes returns all nodes for worker jobs + Nodes() (cnNodes Nodes, err error) + + // Hints returns hints of engine features + // return value should not be cached + // since implementations may update hints after engine had initialized + Hints() Hints +} + +type Hints struct { + CommitOrRollbackTimeout time.Duration } -- GitLab