diff --git a/pkg/vm/engine/txn/test/bvt_test.go b/pkg/vm/engine/txn/test/bvt_test.go deleted file mode 100644 index ca9edea3cc8b54af6b3d84c7b7fd40bb7e1ced46..0000000000000000000000000000000000000000 --- a/pkg/vm/engine/txn/test/bvt_test.go +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright 2022 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package testtxnengine - -import ( - "bytes" - "context" - "io/fs" - "os" - "path/filepath" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func testBVT(t *testing.T, dir string) { - - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - - env, err := newEnv(ctx) - assert.Nil(t, err) - defer func() { - if err := env.Close(); err != nil { - t.Fatal(err) - } - }() - - session := env.NewSession() - - err = filepath.WalkDir(dir, func(path string, entry fs.DirEntry, err error) error { - if err != nil { - return err - } - if entry.IsDir() { - return nil - } - - content, err := os.ReadFile(path) - assert.Nil(t, err) - //fmt.Printf("FILE: %s\n", path) - - // trim comment lines - lines := bytes.Split(content, []byte("\n")) - filtered := lines[:0] - for _, line := range lines { - if bytes.HasPrefix(line, []byte("--")) { - continue - } - if bytes.HasPrefix(line, []byte("#")) { - continue - } - filtered = append(filtered, line) - } - content = bytes.Join(filtered, []byte("\n")) - - err = session.Exec(ctx, path, string(content)) - if err != nil { - panic(err) - } - - return nil - }) - assert.Nil(t, err) - -} - -func TestTPCH(t *testing.T) { - testBVT( - t, - filepath.Join( - "..", "..", "..", "..", "..", - "test", "cases", "benchmark", "tpch", - ), - ) -} - -func TestDML(t *testing.T) { - testBVT( - t, - filepath.Join( - "..", "..", "..", "..", "..", - "test", "cases", "dml", - ), - ) -} - -func TestBVT(t *testing.T) { - testBVT( - t, - filepath.Join( - "..", "..", "..", "..", "..", - "test", "cases", - ), - ) -} diff --git a/pkg/vm/engine/txn/test/engine_test.go b/pkg/vm/engine/txn/test/engine_test.go deleted file mode 100644 index 04ee1b49bddc1439e8c5ff8d5cf85ab2ab85e0b5..0000000000000000000000000000000000000000 --- a/pkg/vm/engine/txn/test/engine_test.go +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright 2022 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package testtxnengine - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestEngine(t *testing.T) { - - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - - env, err := newEnv(ctx) - assert.Nil(t, err) - defer func() { - if err := env.Close(); err != nil { - t.Fatal(err) - } - }() - - session := env.NewSession() - - err = session.Exec(ctx, "", ` - create table foo ( - a int primary key, - b int - ) - `) - assert.Nil(t, err) - - err = session.Exec(ctx, "", `insert into foo (a, b) values (1, 2)`) - assert.Nil(t, err) - - err = session.Exec(ctx, "", `select a, b from foo where b = 2`) - assert.Nil(t, err) - - err = session.Exec(ctx, "", `update foo set b = 3 where a = 1`) - assert.Nil(t, err) - - err = session.Exec(ctx, "", `delete from foo where a = 1`) - assert.Nil(t, err) - - err = session.Exec(ctx, "", `drop table foo`) - assert.Nil(t, err) - -} diff --git a/pkg/vm/engine/txn/test/env.go b/pkg/vm/engine/txn/test/env.go deleted file mode 100644 index 41f888b3396726421206b8485ee9659965ebcfaf..0000000000000000000000000000000000000000 --- a/pkg/vm/engine/txn/test/env.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2022 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package testtxnengine - -import ( - "context" - "math" - "time" - - logservicepb "github.com/matrixorigin/matrixone/pkg/pb/logservice" - "github.com/matrixorigin/matrixone/pkg/txn/client" - "github.com/matrixorigin/matrixone/pkg/txn/clock" - txnengine "github.com/matrixorigin/matrixone/pkg/vm/engine/txn" -) - -const defaultDatabase = "db" - -type testEnv struct { - txnClient client.TxnClient - engine *txnengine.Engine - nodes []*Node - clock clock.Clock - sender *Sender -} - -func (t *testEnv) Close() error { - for _, node := range t.nodes { - if err := node.service.Close(true); err != nil { - return err - } - } - return nil -} - -func newEnv(ctx context.Context) (*testEnv, error) { - env := &testEnv{} - - sender := &Sender{ - env: env, - } - env.sender = sender - - env.clock = clock.NewHLCClock( - func() int64 { - return time.Now().Unix() - }, - math.MaxInt64, - ) - - env.txnClient = client.NewTxnClient(sender, - client.WithClock(env.clock), - ) - - env.nodes = []*Node{ - env.NewNode(1), - } - - env.engine = txnengine.New( - context.Background(), - new(txnengine.ShardToSingleStatic), - func() (details logservicepb.ClusterDetails, err error) { - for _, node := range env.nodes { - details.DNStores = append(details.DNStores, node.info) - } - return - }, - ) - - // create default database - op, err := env.txnClient.New() - if err != nil { - return nil, err - } - if err := env.engine.Create(ctx, defaultDatabase, op); err != nil { - return nil, err - } - if err := op.Commit(ctx); err != nil { - return nil, err - } - - return env, nil -} diff --git a/pkg/vm/engine/txn/test/execution.go b/pkg/vm/engine/txn/test/execution.go deleted file mode 100644 index ca19708f00b4a23977bf0215ff38eb56784c87bb..0000000000000000000000000000000000000000 --- a/pkg/vm/engine/txn/test/execution.go +++ /dev/null @@ -1,180 +0,0 @@ -// Copyright 2022 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package testtxnengine - -import ( - "context" - "errors" - - "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" - "github.com/matrixorigin/matrixone/pkg/sql/plan" - "github.com/matrixorigin/matrixone/pkg/vm/engine" - txnengine "github.com/matrixorigin/matrixone/pkg/vm/engine/txn" -) - -type Execution struct { - ctx context.Context - tx *Tx - stmt tree.Statement -} - -var _ plan.CompilerContext = new(Execution) - -func (e *Execution) Cost(obj *plan.ObjectRef, expr *plan.Expr) *plan.Cost { - return &plan.Cost{} -} - -func (e *Execution) DatabaseExists(name string) bool { - _, err := e.tx.session.env.engine.Database( - e.ctx, - name, - e.tx.operator, - ) - return err == nil -} - -func (e *Execution) DefaultDatabase() string { - return e.tx.session.currentDB -} - -func (e *Execution) GetRootSql() string { - return "" -} - -func (e *Execution) GetHideKeyDef(dbName string, tableName string) *plan.ColDef { - attrs, err := e.getTableAttrs(dbName, tableName) - if err != nil { - panic(err) - } - for i, attr := range attrs { - if attr.IsHidden { - return engineAttrToPlanColDef(i, attr) - } - } - return nil -} - -func (e *Execution) GetPrimaryKeyDef(dbName string, tableName string) (defs []*plan.ColDef) { - attrs, err := e.getTableAttrs(dbName, tableName) - if err != nil { - panic(err) - } - for i, attr := range attrs { - if !attr.Primary { - continue - } - defs = append(defs, engineAttrToPlanColDef(i, attr)) - } - return -} - -func (e *Execution) Resolve(schemaName string, tableName string) (objRef *plan.ObjectRef, tableDef *plan.TableDef) { - if schemaName == "" { - schemaName = e.tx.session.currentDB - } - - objRef = &plan.ObjectRef{ - SchemaName: schemaName, - ObjName: tableName, - } - - tableDef = &plan.TableDef{ - Name: tableName, - } - - attrs, err := e.getTableAttrs(schemaName, tableName) - var errDBNotFound txnengine.ErrDatabaseNotFound - if errors.As(err, &errDBNotFound) { - return nil, nil - } - var errRelNotFound txnengine.ErrRelationNotFound - if errors.As(err, &errRelNotFound) { - return nil, nil - } - if err != nil { - panic(err) - } - - for i, attr := range attrs { - - // return hidden columns for update or detete statement - if attr.IsHidden { - switch e.stmt.(type) { - case *tree.Update, *tree.Delete: - default: - continue - } - } - - tableDef.Cols = append(tableDef.Cols, engineAttrToPlanColDef(i, attr)) - } - - //TODO properties - //TODO view - - return -} - -func (e *Execution) ResolveVariable(varName string, isSystemVar bool, isGlobalVar bool) (interface{}, error) { - panic("unimplemented") -} - -func (e *Execution) getTableAttrs(dbName string, tableName string) (attrs []*engine.Attribute, err error) { - db, err := e.tx.session.env.engine.Database( - e.ctx, - dbName, - e.tx.operator, - ) - if err != nil { - return nil, err - } - table, err := db.Relation( - e.ctx, - tableName, - ) - if err != nil { - return nil, err - } - defs, err := table.TableDefs(e.ctx) - if err != nil { - return nil, err - } - for _, def := range defs { - attr, ok := def.(*engine.AttributeDef) - if !ok { - continue - } - attrs = append(attrs, &attr.Attr) - } - return -} - -func engineAttrToPlanColDef(idx int, attr *engine.Attribute) *plan.ColDef { - return &plan.ColDef{ - Name: attr.Name, - Typ: &plan.Type{ - Id: int32(attr.Type.Oid), - Nullable: attr.Default.NullAbility, - Width: attr.Type.Width, - Precision: attr.Type.Precision, - Size: attr.Type.Size, - Scale: attr.Type.Scale, - }, - Default: attr.Default, - Primary: attr.Primary, - Pkidx: int32(idx), - Comment: attr.Comment, - } -} diff --git a/pkg/vm/engine/txn/test/node.go b/pkg/vm/engine/txn/test/node.go deleted file mode 100644 index 29c7035c06756ccf4e6d06b10461c2ea55ee1889..0000000000000000000000000000000000000000 --- a/pkg/vm/engine/txn/test/node.go +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright 2022 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package testtxnengine - -import ( - "fmt" - "time" - - "github.com/google/uuid" - logservicepb "github.com/matrixorigin/matrixone/pkg/pb/logservice" - "github.com/matrixorigin/matrixone/pkg/pb/metadata" - "github.com/matrixorigin/matrixone/pkg/testutil" - "github.com/matrixorigin/matrixone/pkg/txn/service" - txnstorage "github.com/matrixorigin/matrixone/pkg/txn/storage/txn" - "go.uber.org/zap" -) - -type Node struct { - info logservicepb.DNStore - // one node, one shard, one service - service service.TxnService - shard metadata.DNShard -} - -func (t *testEnv) NewNode(id uint64) *Node { - - shard := metadata.DNShard{ - DNShardRecord: metadata.DNShardRecord{ - ShardID: id, - LogShardID: id, - }, - ReplicaID: id, - Address: fmt.Sprintf("shard-%d", id), - } - - storage, err := txnstorage.New( - txnstorage.NewMemHandler(testutil.NewMheap(), txnstorage.IsolationPolicy{ - Read: txnstorage.ReadCommitted, - }), - ) - if err != nil { - panic(err) - } - - nodeInfo := logservicepb.DNStore{ - UUID: uuid.NewString(), - ServiceAddress: shard.Address, - State: logservicepb.NormalState, - Shards: []logservicepb.DNShardInfo{ - { - ShardID: id, - ReplicaID: id, - }, - }, - } - - loggerConfig := zap.Config{ - Level: zap.NewAtomicLevelAt(zap.InfoLevel), - Development: true, - Encoding: "console", - EncoderConfig: zap.NewDevelopmentEncoderConfig(), - OutputPaths: []string{"stderr"}, - ErrorOutputPaths: []string{"stderr"}, - } - logger, err := loggerConfig.Build() - if err != nil { - panic(err) - } - - service := service.NewTxnService( - logger, - shard, - storage, - t.sender, - t.clock, - time.Second*61, - ) - if err := service.Start(); err != nil { - panic(err) - } - - node := &Node{ - info: nodeInfo, - service: service, - shard: shard, - } - - return node -} diff --git a/pkg/vm/engine/txn/test/session.go b/pkg/vm/engine/txn/test/session.go deleted file mode 100644 index 779ae8416dc6206ad6022efabb045a3f47f343ec..0000000000000000000000000000000000000000 --- a/pkg/vm/engine/txn/test/session.go +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright 2022 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package testtxnengine - -import ( - "context" - - "github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect" - "github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect/mysql" - "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" -) - -type Session struct { - env *testEnv - currentDB string - currentTx *Tx -} - -func (t *testEnv) NewSession() *Session { - session := &Session{ - env: t, - currentDB: defaultDatabase, - } - return session -} - -func (s *Session) Exec(ctx context.Context, filePath string, content string) (err error) { - - s.currentDB = defaultDatabase - - defer func() { - if err == nil { - return - } - - //sqlError, ok := err.(*errors.SqlError) - //if ok { - // fmt.Printf("ERROR: %s\n", sqlError.Error()) - // err = nil - //} - - //moError, ok := err.(*moerr.Error) - //if ok { - // fmt.Printf("ERROR: %s\n", moError.Error()) - // err = nil - //} - - //posErr, ok := err.(mysql.PositionedErr) - //if ok { - // fmt.Printf("ERROR: %s\n", content[posErr.Pos:]) - // err = nil - //} - - //TODO - //fmt.Printf("ERROR: %s\n", content[posErr.Pos:]) - err = nil - - }() - - stmts, err := mysql.Parse(content) - if err != nil { - return err - } - - for _, stmt := range stmts { - stmtText := tree.String(stmt, dialect.MYSQL) - //fmt.Printf("SQL: %s @%s\n", stmtText, filePath) - - if s.currentTx == nil { - tx, err := s.NewTx() - if err != nil { - return err - } - err = tx.Exec(ctx, stmtText, stmt) - if err != nil { - tx.Rollback(ctx) - return err - } else { - if err := tx.Commit(ctx); err != nil { - return err - } - } - continue - } - - if err := s.currentTx.Exec(ctx, stmtText, stmt); err != nil { - s.currentTx.Rollback(ctx) - s.currentTx = nil - return err - } - - } - - return nil -} diff --git a/pkg/vm/engine/txn/test/tx.go b/pkg/vm/engine/txn/test/tx.go deleted file mode 100644 index 746a5688f8fd2fda6d43e2b06814ca66a4276f73..0000000000000000000000000000000000000000 --- a/pkg/vm/engine/txn/test/tx.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright 2022 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package testtxnengine - -import ( - "context" - "time" - - "github.com/matrixorigin/matrixone/pkg/container/batch" - "github.com/matrixorigin/matrixone/pkg/sql/compile" - "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" - "github.com/matrixorigin/matrixone/pkg/sql/plan" - "github.com/matrixorigin/matrixone/pkg/testutil" - "github.com/matrixorigin/matrixone/pkg/txn/client" -) - -type Tx struct { - operator client.TxnOperator - session *Session -} - -func (s *Session) NewTx() (*Tx, error) { - operator, err := s.env.txnClient.New() - if err != nil { - return nil, err - } - tx := &Tx{ - operator: operator, - session: s, - } - return tx, nil -} - -func (t *Tx) Exec(ctx context.Context, stmtText string, stmt tree.Statement) (err error) { - - switch stmt := stmt.(type) { - - case *tree.Use: - t.session.currentDB = stmt.Name - return - - case *tree.ExplainStmt: - //TODO - return - - } - - proc := testutil.NewProcess() - proc.TxnOperator = t.operator - proc.SessionInfo.TimeZone = time.Local - compileCtx := compile.New( - t.session.currentDB, - stmtText, - "", - ctx, - t.session.env.engine, - proc, - stmt, - ) - - exec := &Execution{ - ctx: ctx, - tx: t, - stmt: stmt, - } - - var execPlan *plan.Plan - switch stmt := stmt.(type) { - - case *tree.Select, - *tree.ParenSelect, - *tree.Update, - *tree.Delete: - optimizer := plan.NewBaseOptimizer(exec) - query, err := optimizer.Optimize(stmt) - if err != nil { - return err - } - execPlan = &plan.Plan{ - Plan: &plan.Plan_Query{ - Query: query, - }, - } - - default: - var err error - execPlan, err = plan.BuildPlan(exec, stmt) - if err != nil { - return err - } - } - - err = compileCtx.Compile(execPlan, nil, func(i any, batch *batch.Batch) error { - //fmt.Printf("%v\n", batch) //TODO - return nil - }) - if err != nil { - return err - } - - err = compileCtx.Run(0) - if err != nil { - return err - } - - return nil -} - -func (t *Tx) Commit(ctx context.Context) error { - return t.operator.Commit(ctx) -} - -func (t *Tx) Rollback(ctx context.Context) error { - return t.operator.Rollback(ctx) -} diff --git a/pkg/vm/engine/txn/test/txn_sender.go b/pkg/vm/engine/txn/test/txn_sender.go deleted file mode 100644 index 5fd38d90f82d59aee09a8a3926b252230eeadc5e..0000000000000000000000000000000000000000 --- a/pkg/vm/engine/txn/test/txn_sender.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2022 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package testtxnengine - -import ( - "context" - "fmt" - - "github.com/matrixorigin/matrixone/pkg/pb/txn" - "github.com/matrixorigin/matrixone/pkg/txn/rpc" -) - -type Sender struct { - env *testEnv -} - -var _ rpc.TxnSender = new(Sender) - -func (s *Sender) Send(ctx context.Context, reqs []txn.TxnRequest) (*rpc.SendResult, error) { - result := &rpc.SendResult{} - - for _, req := range reqs { - req := req - - targetDN := req.GetTargetDN() - if targetDN.IsEmpty() { - panic(fmt.Errorf("target DN not specified: %+v", req)) - } - - var node *Node - for _, n := range s.env.nodes { - if n.shard.Equal(targetDN) { - node = n - break - } - } - if node == nil { - return nil, fmt.Errorf("node not found") - } - - fn := map[txn.TxnMethod]func(context.Context, *txn.TxnRequest, *txn.TxnResponse) error{ - txn.TxnMethod_Read: node.service.Read, - txn.TxnMethod_Write: node.service.Write, - txn.TxnMethod_Commit: node.service.Commit, - txn.TxnMethod_Rollback: node.service.Rollback, - txn.TxnMethod_Prepare: node.service.Prepare, - txn.TxnMethod_GetStatus: node.service.GetStatus, - txn.TxnMethod_CommitDNShard: node.service.CommitDNShard, - txn.TxnMethod_RollbackDNShard: node.service.RollbackDNShard, - }[req.Method] - - var resp txn.TxnResponse - if err := fn(ctx, &req, &resp); err != nil { - return nil, err - } - resp.Txn = &req.Txn - result.Responses = append(result.Responses, resp) - - } - - return result, nil -} - -func (s *Sender) Close() error { - return nil -}