From 1da902e42c9f5eb3b901b650ecb50417bd5a5181 Mon Sep 17 00:00:00 2001 From: reusee <reusee@gmail.com> Date: Wed, 17 Aug 2022 21:55:25 +0800 Subject: [PATCH] txnengine, txnstorage: various changes (#4553) txnengine: add Errors and extract errors in doTxnRequest txnengine: add Errors.As; create default database; fix Execution.Resolve txnengine: add MemHandler.mheap; fix MemHandler.HandleRead txnengine: add TestTPCH txnengine: add Tx.databaseName txnengine: add file path param to Tx.Exec txnengine: add typed errors txnengine: fix Execution.Resolve txnengine: fix Tx.Exec to execute all statements txnengine: fix engineAttrToPlanColDef txnengine: fix plan types gob registeration txnengine: gob register plan types txnengine: save database and table name in lowercase; set default schema name in Execution.Resolve txnengine: set zap logger for NewTxnService txnengine: trim comment lines in bvt files txnstorage: add AttributeRow.Order txnstorage: add hidden __version column txnstorage: fix BatchIter txnstorage: fix MemHandler.HandleRead txnstorage: use req.TableID,DatabaseID in HandleDelTableDef,HandleGetRelations,HandleGetTableDefs,HandleGetTableDefs Approved by: @lni --- pkg/txn/storage/txn/batch.go | 142 ++++++++++++++++++---- pkg/txn/storage/txn/catalog.go | 1 + pkg/txn/storage/txn/mem_handler.go | 155 +++++++++++++++++------- pkg/txn/storage/txn/mem_handler_test.go | 8 +- pkg/txn/storage/txn/storage_test.go | 16 +-- pkg/vm/engine/txn/database.go | 10 +- pkg/vm/engine/txn/engine.go | 7 +- pkg/vm/engine/txn/error.go | 97 ++++++++++++++- pkg/vm/engine/txn/operations.go | 67 ++++++---- pkg/vm/engine/txn/test/bvt_test.go | 102 ++++++++++++++++ pkg/vm/engine/txn/test/engine_test.go | 21 ++-- pkg/vm/engine/txn/test/env.go | 15 ++- pkg/vm/engine/txn/test/execution.go | 42 ++++++- pkg/vm/engine/txn/test/node.go | 19 ++- pkg/vm/engine/txn/test/tx.go | 90 +++++++++----- pkg/vm/engine/txn/txn.go | 18 +++ 16 files changed, 650 insertions(+), 160 deletions(-) create mode 100644 pkg/vm/engine/txn/test/bvt_test.go diff --git a/pkg/txn/storage/txn/batch.go b/pkg/txn/storage/txn/batch.go index 17e1e2033..629fde1a8 100644 --- a/pkg/txn/storage/txn/batch.go +++ b/pkg/txn/storage/txn/batch.go @@ -22,78 +22,168 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/vector" ) -type BatchIter func() (cols []any) +type BatchIter func() (tuple []any) func NewBatchIter(b *batch.Batch) BatchIter { i := 0 - max := b.Vecs[0].Length - iter := func() (cols []any) { - if i >= max { - return nil - } + iter := func() (tuple []any) { for _, vec := range b.Vecs { switch vec.Typ.Oid { case types.T_bool: - cols = append(cols, vec.Col.([]bool)[i]) + col := vec.Col.([]bool) + if i < len(col) { + tuple = append(tuple, col[i]) + } else { + return + } case types.T_int8: - cols = append(cols, vec.Col.([]int8)[i]) + col := vec.Col.([]int8) + if i < len(col) { + tuple = append(tuple, col[i]) + } else { + return + } case types.T_int16: - cols = append(cols, vec.Col.([]int16)[i]) + col := vec.Col.([]int16) + if i < len(col) { + tuple = append(tuple, col[i]) + } else { + return + } case types.T_int32: - cols = append(cols, vec.Col.([]int32)[i]) + col := vec.Col.([]int32) + if i < len(col) { + tuple = append(tuple, col[i]) + } else { + return + } case types.T_int64: - cols = append(cols, vec.Col.([]int64)[i]) + col := vec.Col.([]int64) + if i < len(col) { + tuple = append(tuple, col[i]) + } else { + return + } case types.T_uint8: - cols = append(cols, vec.Col.([]uint8)[i]) + col := vec.Col.([]uint8) + if i < len(col) { + tuple = append(tuple, col[i]) + } else { + return + } case types.T_uint16: - cols = append(cols, vec.Col.([]uint16)[i]) + col := vec.Col.([]uint16) + if i < len(col) { + tuple = append(tuple, col[i]) + } else { + return + } case types.T_uint32: - cols = append(cols, vec.Col.([]uint32)[i]) + col := vec.Col.([]uint32) + if i < len(col) { + tuple = append(tuple, col[i]) + } else { + return + } case types.T_uint64: - cols = append(cols, vec.Col.([]uint64)[i]) + col := vec.Col.([]uint64) + if i < len(col) { + tuple = append(tuple, col[i]) + } else { + return + } case types.T_float32: - cols = append(cols, vec.Col.([]float32)[i]) + col := vec.Col.([]float32) + if i < len(col) { + tuple = append(tuple, col[i]) + } else { + return + } case types.T_float64: - cols = append(cols, vec.Col.([]float64)[i]) + col := vec.Col.([]float64) + if i < len(col) { + tuple = append(tuple, col[i]) + } else { + return + } case types.T_sel: - cols = append(cols, vec.Col.([]int64)[i]) + col := vec.Col.([]int64) + if i < len(col) { + tuple = append(tuple, col[i]) + } else { + return + } case types.T_tuple: - cols = append(cols, vec.Col.([][]any)[i]) + col := vec.Col.([][]any) + if i < len(col) { + tuple = append(tuple, col[i]) + } else { + return + } case types.T_char, types.T_varchar, types.T_json, types.T_blob: info := vec.Col.(*types.Bytes) - str := vec.Data[info.Offsets[i] : info.Offsets[i]+info.Lengths[i]] - cols = append(cols, str) + if i < len(info.Offsets) { + str := vec.Data[info.Offsets[i] : info.Offsets[i]+info.Lengths[i]] + tuple = append(tuple, str) + } else { + return + } case types.T_date: - cols = append(cols, vec.Col.([]types.Date)[i]) + col := vec.Col.([]types.Date) + if i < len(col) { + tuple = append(tuple, col[i]) + } else { + return + } case types.T_datetime: - cols = append(cols, vec.Col.([]types.Datetime)[i]) + col := vec.Col.([]types.Datetime) + if i < len(col) { + tuple = append(tuple, col[i]) + } else { + return + } case types.T_timestamp: - cols = append(cols, vec.Col.([]types.Timestamp)[i]) + col := vec.Col.([]types.Timestamp) + if i < len(col) { + tuple = append(tuple, col[i]) + } else { + return + } case types.T_decimal64: - cols = append(cols, vec.Col.([]types.Decimal64)[i]) + col := vec.Col.([]types.Decimal64) + if i < len(col) { + tuple = append(tuple, col[i]) + } else { + return + } case types.T_decimal128: - cols = append(cols, vec.Col.([]types.Decimal128)[i]) + col := vec.Col.([]types.Decimal128) + if i < len(col) { + tuple = append(tuple, col[i]) + } else { + return + } } } diff --git a/pkg/txn/storage/txn/catalog.go b/pkg/txn/storage/txn/catalog.go index a2772431f..13d85ae8b 100644 --- a/pkg/txn/storage/txn/catalog.go +++ b/pkg/txn/storage/txn/catalog.go @@ -44,6 +44,7 @@ func (r RelationRow) PrimaryKey() Text { type AttributeRow struct { ID string RelationID string + Order int engine.Attribute } diff --git a/pkg/txn/storage/txn/mem_handler.go b/pkg/txn/storage/txn/mem_handler.go index 3a8195d9d..183cb245c 100644 --- a/pkg/txn/storage/txn/mem_handler.go +++ b/pkg/txn/storage/txn/mem_handler.go @@ -18,15 +18,20 @@ import ( "database/sql" "errors" "fmt" + "math/rand" + "sort" "sync" "github.com/google/uuid" "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/pb/timestamp" "github.com/matrixorigin/matrixone/pkg/pb/txn" "github.com/matrixorigin/matrixone/pkg/vm/engine" txnengine "github.com/matrixorigin/matrixone/pkg/vm/engine/txn" + "github.com/matrixorigin/matrixone/pkg/vm/mheap" ) type MemHandler struct { @@ -56,6 +61,9 @@ type MemHandler struct { // iterator id -> iterator Map map[string]*Iter[AnyKey, *AnyRow] } + + // misc + mheap *mheap.Mheap } type Iter[ @@ -67,7 +75,9 @@ type Iter[ FirstCalled bool } -func NewMemHandler() *MemHandler { +func NewMemHandler( + mheap *mheap.Mheap, +) *MemHandler { h := &MemHandler{} h.transactions.Map = make(map[string]*Transaction) h.tables.Map = make(map[string]*Table[AnyKey, *AnyRow]) @@ -76,6 +86,7 @@ func NewMemHandler() *MemHandler { h.relations = NewTable[Text, RelationRow]() h.attributes = NewTable[Text, AttributeRow]() h.indexes = NewTable[Text, IndexRow]() + h.mheap = mheap return h } @@ -83,13 +94,26 @@ var _ Handler = new(MemHandler) func (m *MemHandler) HandleAddTableDef(meta txn.TxnMeta, req txnengine.AddTableDefReq, resp *txnengine.AddTableDefResp) error { tx := m.getTx(meta) + + maxAttributeOrder := 0 + iter := m.attributes.NewIter(tx) + for ok := iter.First(); ok; ok = iter.Next() { + _, row := iter.Read() + if row.RelationID != req.TableID { + continue + } + if row.Order > maxAttributeOrder { + maxAttributeOrder = row.Order + } + } + switch def := req.Def.(type) { case *engine.CommentDef: // update comments row, err := m.relations.Get(tx, Text(req.TableID)) if errors.Is(err, sql.ErrNoRows) { - resp.ErrTableNotFound = true + resp.ErrTableNotFound.ID = req.TableID return nil } if err != nil { @@ -117,6 +141,7 @@ func (m *MemHandler) HandleAddTableDef(meta txn.TxnMeta, req txnengine.AddTableD attrRow := AttributeRow{ ID: uuid.NewString(), RelationID: req.TableID, + Order: maxAttributeOrder + 1, Attribute: def.Attr, } if err := m.attributes.Insert(tx, attrRow); err != nil { @@ -150,7 +175,7 @@ func (m *MemHandler) HandleAddTableDef(meta txn.TxnMeta, req txnengine.AddTableD // update properties row, err := m.relations.Get(tx, Text(req.TableID)) if errors.Is(err, sql.ErrNoRows) { - resp.ErrTableNotFound = true + resp.ErrTableNotFound.ID = req.TableID return nil } for _, prop := range def.Properties { @@ -198,7 +223,7 @@ func (m *MemHandler) HandleCloseTableIter(meta txn.TxnMeta, req txnengine.CloseT defer m.iterators.Unlock() iter, ok := m.iterators.Map[req.IterID] if !ok { - resp.ErrIterNotFound = true + resp.ErrIterNotFound.ID = req.IterID return nil } delete(m.iterators.Map, req.IterID) @@ -285,8 +310,19 @@ func (m *MemHandler) HandleCreateRelation(meta txn.TxnMeta, req txnengine.Create } } + // add version column + relAttrs = append(relAttrs, engine.Attribute{ + IsHidden: true, + IsRowId: true, + Name: rowVersionColumnName, + Type: types.T_decimal128.ToType(), + Default: &plan.Default{ + NullAbility: true, + }, + }) + // insert relation attributes - for _, attr := range relAttrs { + for i, attr := range relAttrs { if len(primaryColumnNames) > 0 { isPrimary := false for _, name := range primaryColumnNames { @@ -300,6 +336,7 @@ func (m *MemHandler) HandleCreateRelation(meta txn.TxnMeta, req txnengine.Create attrRow := AttributeRow{ ID: uuid.NewString(), RelationID: row.ID, + Order: i + 1, Attribute: attr, } if err := m.attributes.Insert(tx, attrRow); err != nil { @@ -324,9 +361,16 @@ func (m *MemHandler) HandleCreateRelation(meta txn.TxnMeta, req txnengine.Create return err } + // create table + m.tables.Lock() + defer m.tables.Unlock() + m.tables.Map[row.ID] = NewTable[AnyKey, *AnyRow]() + return nil } +const rowVersionColumnName = "__version" + func (m *MemHandler) HandleDelTableDef(meta txn.TxnMeta, req txnengine.DelTableDefReq, resp *txnengine.DelTableDefResp) error { tx := m.getTx(meta) switch def := req.Def.(type) { @@ -335,7 +379,7 @@ func (m *MemHandler) HandleDelTableDef(meta txn.TxnMeta, req txnengine.DelTableD // del comments row, err := m.relations.Get(tx, Text(req.TableID)) if errors.Is(err, sql.ErrNoRows) { - resp.ErrTableNotFound = true + resp.ErrTableNotFound.ID = req.TableID return nil } if err != nil { @@ -378,7 +422,7 @@ func (m *MemHandler) HandleDelTableDef(meta txn.TxnMeta, req txnengine.DelTableD // delete properties row, err := m.relations.Get(tx, Text(req.TableID)) if errors.Is(err, sql.ErrNoRows) { - resp.ErrTableNotFound = true + resp.ErrTableNotFound.ID = req.TableID return nil } for _, prop := range def.Properties { @@ -394,6 +438,9 @@ func (m *MemHandler) HandleDelTableDef(meta txn.TxnMeta, req txnengine.DelTableD defer iter.Close() for ok := iter.First(); ok; ok = iter.Next() { _, attrRow := iter.Read() + if attrRow.RelationID != req.TableID { + continue + } if !attrRow.Primary { continue } @@ -416,7 +463,7 @@ func (m *MemHandler) HandleDelete(meta txn.TxnMeta, req txnengine.DeleteReq, res table, ok := m.tables.Map[req.TableID] m.tables.Unlock() if !ok { - resp.ErrTableNotFound = true + resp.ErrTableNotFound.ID = req.TableID return nil } @@ -447,7 +494,7 @@ func (m *MemHandler) HandleDeleteDatabase(meta txn.TxnMeta, req txnengine.Delete } return nil } - resp.ErrNotFound = true + resp.ErrNotFound.Name = req.Name return nil } @@ -466,7 +513,7 @@ func (m *MemHandler) HandleDeleteRelation(meta txn.TxnMeta, req txnengine.Delete } return nil } - resp.ErrNotFound = true + resp.ErrNotFound.Name = req.Name return nil } @@ -504,6 +551,9 @@ func (m *MemHandler) HandleGetRelations(meta txn.TxnMeta, req txnengine.GetRelat defer iter.Close() for ok := iter.First(); ok; ok = iter.Next() { _, row := iter.Read() + if row.DatabaseID != req.DatabaseID { + continue + } resp.Names = append(resp.Names, row.Name) } return nil @@ -514,7 +564,7 @@ func (m *MemHandler) HandleGetTableDefs(meta txn.TxnMeta, req txnengine.GetTable relRow, err := m.relations.Get(tx, Text(req.TableID)) if errors.Is(err, sql.ErrNoRows) { - resp.ErrTableNotFound = true + resp.ErrTableNotFound.ID = req.TableID return nil } if err != nil { @@ -531,11 +581,13 @@ func (m *MemHandler) HandleGetTableDefs(meta txn.TxnMeta, req txnengine.GetTable var primaryAttrNames []string iter := m.attributes.NewIter(tx) defer iter.Close() + var attrRows []*AttributeRow for ok := iter.First(); ok; ok = iter.Next() { _, attrRow := iter.Read() - resp.Defs = append(resp.Defs, &engine.AttributeDef{ - Attr: attrRow.Attribute, - }) + if attrRow.RelationID != req.TableID { + continue + } + attrRows = append(attrRows, attrRow) if attrRow.Primary { primaryAttrNames = append(primaryAttrNames, attrRow.Name) } @@ -545,6 +597,15 @@ func (m *MemHandler) HandleGetTableDefs(meta txn.TxnMeta, req txnengine.GetTable Names: primaryAttrNames, }) } + sort.Slice(attrRows, func(i, j int) bool { + return attrRows[i].Order < attrRows[j].Order + }) + for _, row := range attrRows { + resp.Defs = append(resp.Defs, &engine.AttributeDef{ + Attr: row.Attribute, + }) + + } } // indexes @@ -553,6 +614,9 @@ func (m *MemHandler) HandleGetTableDefs(meta txn.TxnMeta, req txnengine.GetTable defer iter.Close() for ok := iter.First(); ok; ok = iter.Next() { _, indexRow := iter.Read() + if indexRow.RelationID != req.TableID { + continue + } resp.Defs = append(resp.Defs, &indexRow.IndexTableDef) } } @@ -577,7 +641,7 @@ func (m *MemHandler) HandleNewTableIter(meta txn.TxnMeta, req txnengine.NewTable defer m.tables.Unlock() table, ok := m.tables.Map[req.TableID] if !ok { - resp.ErrTableNotFound = true + resp.ErrTableNotFound.ID = req.TableID return nil } @@ -617,7 +681,7 @@ func (m *MemHandler) HandleOpenDatabase(meta txn.TxnMeta, req txnengine.OpenData return nil } } - resp.ErrNotFound = true + resp.ErrNotFound.Name = req.Name return nil } @@ -634,7 +698,7 @@ func (m *MemHandler) HandleOpenRelation(meta txn.TxnMeta, req txnengine.OpenRela return nil } } - resp.ErrNotFound = true + resp.ErrNotFound.Name = req.Name return nil } @@ -643,11 +707,13 @@ func (m *MemHandler) HandleRead(meta txn.TxnMeta, req txnengine.ReadReq, resp *t iter, ok := m.iterators.Map[req.IterID] if !ok { m.iterators.Unlock() - resp.ErrIterNotFound = true + resp.ErrIterNotFound.ID = req.IterID return nil } m.iterators.Unlock() + //TODO handle system tables + b := batch.New(false, req.ColNames) for i, name := range req.ColNames { @@ -660,25 +726,30 @@ func (m *MemHandler) HandleRead(meta txn.TxnMeta, req txnengine.ReadReq, resp *t } else { iter.FirstCalled = true } - maxRows := 1024 + maxRows := 4096 rows := 0 for ok := fn(); ok; ok = iter.TableIter.Next() { - if rows > maxRows { - break - } - _, row := iter.TableIter.Read() for i, name := range req.ColNames { - value, ok := (*row).attributes[name] + value, ok := (*row).attributes[iter.AttrsMap[name].ID] if !ok { - resp.ErrColumnNotFound = name + resp.ErrColumnNotFound.Name = name return nil } - b.Vecs[i].Append(value, nil) + b.Vecs[i].Append(value, m.mheap) } rows++ + + if rows >= maxRows { + break + } + } + + if rows > 0 { + b.InitZsOne(rows) + resp.Batch = b } return nil @@ -688,7 +759,7 @@ func (m *MemHandler) HandleTruncate(meta txn.TxnMeta, req txnengine.TruncateReq, tx := m.getTx(meta) _, err := m.relations.Get(tx, Text(req.TableID)) if errors.Is(err, sql.ErrNoRows) { - resp.ErrTableNotFound = true + resp.ErrTableNotFound.ID = req.TableID return nil } m.tables.Lock() @@ -749,7 +820,7 @@ func (m *MemHandler) rangeBatchPhysicalRows( tx *Transaction, tableID string, b *batch.Batch, - errTableNotFound *bool, + errTableNotFound *txnengine.ErrRelationNotFound, fn func( *Table[AnyKey, *AnyRow], *AnyRow, @@ -769,12 +840,17 @@ func (m *MemHandler) rangeBatchPhysicalRows( } if len(nameToAttrs) == 0 { - *errTableNotFound = true + errTableNotFound.ID = tableID return nil } // write - table := m.ensureTable(tableID) + m.tables.Lock() + table, ok := m.tables.Map[tableID] + m.tables.Unlock() + if !ok { + panic("impossible") + } batchIter := NewBatchIter(b) for { row := batchIter() @@ -799,6 +875,12 @@ func (m *MemHandler) rangeBatchPhysicalRows( physicalRow.attributes[attr.ID] = col } + // add version + a := rand.Int63() + b := rand.Int63() + version := types.Decimal128FromInt64Raw(a, b) + physicalRow.attributes[nameToAttrs[rowVersionColumnName].ID] = version + if err := fn(table, physicalRow); err != nil { return err } @@ -820,17 +902,6 @@ func (m *MemHandler) getTx(meta txn.TxnMeta) *Transaction { return tx } -func (m *MemHandler) ensureTable(relationID string) *Table[AnyKey, *AnyRow] { - m.tables.Lock() - defer m.tables.Unlock() - table, ok := m.tables.Map[relationID] - if !ok { - table = NewTable[AnyKey, *AnyRow]() - m.tables.Map[relationID] = table - } - return table -} - func (*MemHandler) HandleClose() error { return nil } @@ -846,7 +917,7 @@ func (m *MemHandler) HandleCommitting(meta txn.TxnMeta) error { } func (m *MemHandler) HandleDestroy() error { - *m = *NewMemHandler() + *m = *NewMemHandler(m.mheap) return nil } diff --git a/pkg/txn/storage/txn/mem_handler_test.go b/pkg/txn/storage/txn/mem_handler_test.go index 8f2deef38..f59f35f5c 100644 --- a/pkg/txn/storage/txn/mem_handler_test.go +++ b/pkg/txn/storage/txn/mem_handler_test.go @@ -14,10 +14,14 @@ package txnstorage -import "testing" +import ( + "testing" + + "github.com/matrixorigin/matrixone/pkg/testutil" +) func TestMemHandler(t *testing.T) { testDatabase(t, func() (*Storage, error) { - return New(NewMemHandler()) + return New(NewMemHandler(testutil.NewMheap())) }) } diff --git a/pkg/txn/storage/txn/storage_test.go b/pkg/txn/storage/txn/storage_test.go index 71d5907ec..f180bd0ff 100644 --- a/pkg/txn/storage/txn/storage_test.go +++ b/pkg/txn/storage/txn/storage_test.go @@ -57,7 +57,7 @@ func testDatabase( Name: "foo", }, ) - assert.Equal(t, true, resp.ErrNotFound) + assert.Equal(t, "foo", resp.ErrNotFound.Name) } // create database @@ -69,7 +69,7 @@ func testDatabase( Name: "foo", }, ) - assert.Equal(t, false, resp.ErrExisted) + assert.Equal(t, txnengine.ErrExisted(false), resp.ErrExisted) } // get databases @@ -93,7 +93,7 @@ func testDatabase( Name: "foo", }, ) - assert.Equal(t, false, resp.ErrNotFound) + assert.Equal(t, "", resp.ErrNotFound.Name) assert.NotNil(t, resp.ID) dbID = resp.ID @@ -107,7 +107,7 @@ func testDatabase( Name: "foo", }, ) - assert.Equal(t, false, resp.ErrNotFound) + assert.Equal(t, "", resp.ErrNotFound.Name) } { resp := testRead[txnengine.GetDatabasesResp]( @@ -130,7 +130,7 @@ func testDatabase( Name: "table", }, ) - assert.Equal(t, true, resp.ErrNotFound) + assert.Equal(t, "table", resp.ErrNotFound.Name) } // create relation @@ -160,7 +160,7 @@ func testDatabase( }, }, ) - assert.Equal(t, false, resp.ErrExisted) + assert.Equal(t, txnengine.ErrExisted(false), resp.ErrExisted) } // get relations @@ -187,7 +187,7 @@ func testDatabase( Name: "table", }, ) - assert.Equal(t, false, resp.ErrNotFound) + assert.Equal(t, "", resp.ErrNotFound.Name) assert.NotNil(t, resp.ID) relID = resp.ID assert.Equal(t, txnengine.RelationTable, resp.Type) @@ -205,7 +205,7 @@ func testDatabase( Name: "table", }, ) - assert.Equal(t, false, resp.ErrNotFound) + assert.Equal(t, "", resp.ErrNotFound.Name) } { resp := testRead[txnengine.GetRelationsResp]( diff --git a/pkg/vm/engine/txn/database.go b/pkg/vm/engine/txn/database.go index 7fa2ad48f..65524a977 100644 --- a/pkg/vm/engine/txn/database.go +++ b/pkg/vm/engine/txn/database.go @@ -17,6 +17,7 @@ package txnengine import ( "context" "fmt" + "strings" "github.com/matrixorigin/matrixone/pkg/txn/client" "github.com/matrixorigin/matrixone/pkg/vm/engine" @@ -42,7 +43,7 @@ func (d *Database) Create(ctx context.Context, relName string, defs []engine.Tab CreateRelationReq{ DatabaseID: d.id, Type: RelationTable, - Name: relName, + Name: strings.ToLower(relName), Defs: defs, }, ) @@ -63,7 +64,7 @@ func (d *Database) Delete(ctx context.Context, relName string) error { OpDeleteRelation, DeleteRelationReq{ DatabaseID: d.id, - Name: relName, + Name: strings.ToLower(relName), }, ) if err != nil { @@ -83,7 +84,7 @@ func (d *Database) Relation(ctx context.Context, relName string) (engine.Relatio OpOpenRelation, OpenRelationReq{ DatabaseID: d.id, - Name: relName, + Name: strings.ToLower(relName), }, ) if err != nil { @@ -91,9 +92,6 @@ func (d *Database) Relation(ctx context.Context, relName string) (engine.Relatio } resp := resps[0] - if resp.ErrNotFound { - return nil, fmt.Errorf("relation not found: %s", relName) - } switch resp.Type { diff --git a/pkg/vm/engine/txn/engine.go b/pkg/vm/engine/txn/engine.go index 516a8eb1b..4ea4cb34f 100644 --- a/pkg/vm/engine/txn/engine.go +++ b/pkg/vm/engine/txn/engine.go @@ -16,6 +16,7 @@ package txnengine import ( "context" + "strings" logservicepb "github.com/matrixorigin/matrixone/pkg/pb/logservice" "github.com/matrixorigin/matrixone/pkg/txn/client" @@ -56,7 +57,7 @@ func (e *Engine) Create(ctx context.Context, dbName string, txnOperator client.T e.allNodesShards, OpCreateDatabase, CreateDatabaseReq{ - Name: dbName, + Name: strings.ToLower(dbName), }, ) if err != nil { @@ -76,7 +77,7 @@ func (e *Engine) Database(ctx context.Context, dbName string, txnOperator client e.firstNodeShard, OpOpenDatabase, OpenDatabaseReq{ - Name: dbName, + Name: strings.ToLower(dbName), }, ) if err != nil { @@ -127,7 +128,7 @@ func (e *Engine) Delete(ctx context.Context, dbName string, txnOperator client.T e.allNodesShards, OpDeleteDatabase, DeleteDatabaseReq{ - Name: dbName, + Name: strings.ToLower(dbName), }, ) if err != nil { diff --git a/pkg/vm/engine/txn/error.go b/pkg/vm/engine/txn/error.go index fb20a4e1c..2c1274d31 100644 --- a/pkg/vm/engine/txn/error.go +++ b/pkg/vm/engine/txn/error.go @@ -15,16 +15,20 @@ package txnengine import ( + "errors" + "fmt" + "strings" + "github.com/matrixorigin/matrixone/pkg/pb/txn" ) -type Error struct { +type TxnError struct { txnError *txn.TxnError } -var _ error = Error{} +var _ error = TxnError{} -func (e Error) Error() string { +func (e TxnError) Error() string { if e.txnError != nil { return e.txnError.DebugString() } @@ -34,10 +38,95 @@ func (e Error) Error() string { func errorFromTxnResponses(resps []txn.TxnResponse) error { for _, resp := range resps { if resp.TxnError != nil { - return Error{ + return TxnError{ txnError: resp.TxnError, } } } return nil } + +type ErrExisted bool + +var _ error = ErrExisted(true) + +func (e ErrExisted) Error() string { + return "existed" +} + +type ErrDatabaseNotFound struct { + ID string + Name string +} + +var _ error = ErrDatabaseNotFound{} + +func (e ErrDatabaseNotFound) Error() string { + return fmt.Sprintf("database not found: [%s] [%s]", e.Name, e.ID) +} + +type ErrRelationNotFound struct { + ID string + Name string +} + +var _ error = ErrRelationNotFound{} + +func (e ErrRelationNotFound) Error() string { + return fmt.Sprintf("relation not found: [%s] [%s]", e.Name, e.ID) +} + +type ErrDefNotFound struct { + ID string + Name string +} + +var _ error = ErrDefNotFound{} + +func (e ErrDefNotFound) Error() string { + return fmt.Sprintf("definition not found: [%s] [%s]", e.Name, e.ID) +} + +type ErrIterNotFound struct { + ID string +} + +var _ error = ErrIterNotFound{} + +func (e ErrIterNotFound) Error() string { + return fmt.Sprintf("iter not found: %s", e.ID) +} + +type ErrColumnNotFound struct { + Name string +} + +var _ error = ErrColumnNotFound{} + +func (e ErrColumnNotFound) Error() string { + return fmt.Sprintf("column not found: %s", e.Name) +} + +type Errors []error + +var _ error = Errors{} + +func (e Errors) Error() string { + buf := new(strings.Builder) + for i, err := range e { + if i > 0 { + buf.WriteRune('\n') + } + buf.WriteString(err.Error()) + } + return buf.String() +} + +func (e Errors) As(target any) bool { + for _, err := range e { + if errors.As(err, target) { + return true + } + } + return false +} diff --git a/pkg/vm/engine/txn/operations.go b/pkg/vm/engine/txn/operations.go index 8d885f69f..b39409560 100644 --- a/pkg/vm/engine/txn/operations.go +++ b/pkg/vm/engine/txn/operations.go @@ -84,6 +84,31 @@ func init() { gob.Register([]types.Timestamp{}) gob.Register([]types.Decimal64{}) gob.Register([]types.Decimal128{}) + + // plan types + gob.Register(&plan.Expr_C{}) + gob.Register(&plan.Expr_P{}) + gob.Register(&plan.Expr_V{}) + gob.Register(&plan.Expr_Col{}) + gob.Register(&plan.Expr_F{}) + gob.Register(&plan.Expr_Sub{}) + gob.Register(&plan.Expr_Corr{}) + gob.Register(&plan.Expr_T{}) + gob.Register(&plan.Expr_List{}) + gob.Register(&plan.Const_Ival{}) + gob.Register(&plan.Const_Dval{}) + gob.Register(&plan.Const_Sval{}) + gob.Register(&plan.Const_Bval{}) + gob.Register(&plan.Const_Uval{}) + gob.Register(&plan.Const_Fval{}) + gob.Register(&plan.Const_Dateval{}) + gob.Register(&plan.Const_Datetimeval{}) + gob.Register(&plan.Const_Decimal64Val{}) + gob.Register(&plan.Const_Decimal128Val{}) + gob.Register(&plan.Const_Timestampval{}) + gob.Register(&plan.Const_Jsonval{}) + gob.Register(&plan.Const_Defaultval{}) + } type CreateDatabaseReq struct { @@ -91,7 +116,7 @@ type CreateDatabaseReq struct { } type CreateDatabaseResp struct { - ErrExisted bool + ErrExisted ErrExisted } type OpenDatabaseReq struct { @@ -100,7 +125,7 @@ type OpenDatabaseReq struct { type OpenDatabaseResp struct { ID string - ErrNotFound bool + ErrNotFound ErrDatabaseNotFound } type GetDatabasesReq struct { @@ -115,7 +140,7 @@ type DeleteDatabaseReq struct { } type DeleteDatabaseResp struct { - ErrNotFound bool + ErrNotFound ErrDatabaseNotFound } type CreateRelationReq struct { @@ -126,7 +151,7 @@ type CreateRelationReq struct { } type CreateRelationResp struct { - ErrExisted bool + ErrExisted ErrExisted } type DeleteRelationReq struct { @@ -135,7 +160,7 @@ type DeleteRelationReq struct { } type DeleteRelationResp struct { - ErrNotFound bool + ErrNotFound ErrRelationNotFound } type OpenRelationReq struct { @@ -146,7 +171,7 @@ type OpenRelationReq struct { type OpenRelationResp struct { ID string Type RelationType - ErrNotFound bool + ErrNotFound ErrRelationNotFound } type GetRelationsReq struct { @@ -163,9 +188,9 @@ type AddTableDefReq struct { } type AddTableDefResp struct { - ErrTableNotFound bool - ErrExisted bool - ErrColumnNotFound string + ErrTableNotFound ErrRelationNotFound + ErrExisted ErrExisted + ErrColumnNotFound ErrColumnNotFound } type DelTableDefReq struct { @@ -174,8 +199,8 @@ type DelTableDefReq struct { } type DelTableDefResp struct { - ErrTableNotFound bool - ErrDefNotFound bool + ErrTableNotFound ErrRelationNotFound + ErrDefNotFound ErrDefNotFound } type DeleteReq struct { @@ -184,7 +209,7 @@ type DeleteReq struct { } type DeleteResp struct { - ErrTableNotFound bool + ErrTableNotFound ErrRelationNotFound } type GetPrimaryKeysReq struct { @@ -193,7 +218,7 @@ type GetPrimaryKeysReq struct { type GetPrimaryKeysResp struct { Attrs []*engine.Attribute - ErrTableNotFound bool + ErrTableNotFound ErrRelationNotFound } type GetTableDefsReq struct { @@ -202,7 +227,7 @@ type GetTableDefsReq struct { type GetTableDefsResp struct { Defs []engine.TableDef - ErrTableNotFound bool + ErrTableNotFound ErrRelationNotFound } type TruncateReq struct { @@ -211,7 +236,7 @@ type TruncateReq struct { type TruncateResp struct { AffectedRows int64 - ErrTableNotFound bool + ErrTableNotFound ErrRelationNotFound } type UpdateReq struct { @@ -220,7 +245,7 @@ type UpdateReq struct { } type UpdateResp struct { - ErrTableNotFound bool + ErrTableNotFound ErrRelationNotFound } type WriteReq struct { @@ -229,7 +254,7 @@ type WriteReq struct { } type WriteResp struct { - ErrTableNotFound bool + ErrTableNotFound ErrRelationNotFound } type NewTableIterReq struct { @@ -240,7 +265,7 @@ type NewTableIterReq struct { type NewTableIterResp struct { IterID string - ErrTableNotFound bool + ErrTableNotFound ErrRelationNotFound } type ReadReq struct { @@ -250,8 +275,8 @@ type ReadReq struct { type ReadResp struct { Batch *batch.Batch - ErrIterNotFound bool - ErrColumnNotFound string + ErrIterNotFound ErrIterNotFound + ErrColumnNotFound ErrColumnNotFound } type CloseTableIterReq struct { @@ -259,5 +284,5 @@ type CloseTableIterReq struct { } type CloseTableIterResp struct { - ErrIterNotFound bool + ErrIterNotFound ErrIterNotFound } diff --git a/pkg/vm/engine/txn/test/bvt_test.go b/pkg/vm/engine/txn/test/bvt_test.go new file mode 100644 index 000000000..68fca6791 --- /dev/null +++ b/pkg/vm/engine/txn/test/bvt_test.go @@ -0,0 +1,102 @@ +// Copyright 2022 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testtxnengine + +import ( + "bytes" + "context" + "os" + "path/filepath" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func testBVT(t *testing.T, globPattern string) { + files, err := filepath.Glob(globPattern) + assert.Nil(t, err) + assert.True(t, len(files) > 0) + sort.Strings(files) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + env, err := newEnv(ctx) + assert.Nil(t, err) + defer func() { + if err := env.Close(); err != nil { + t.Fatal(err) + } + }() + + tx := env.NewTx() + defer func() { + if err == nil { + err = tx.Commit(ctx) + } + }() + + for _, file := range files { + content, err := os.ReadFile(file) + assert.Nil(t, err) + + // trim comment lines + lines := bytes.Split(content, []byte("\n")) + filtered := lines[:0] + for _, line := range lines { + if bytes.HasPrefix(line, []byte("--")) { + continue + } + filtered = append(filtered, line) + } + content = bytes.Join(filtered, []byte("\n")) + + err = tx.Exec(ctx, file, string(content)) + assert.Nil(t, err) + } + +} + +func TestTPCH(t *testing.T) { + _ = testBVT + t.Skip() + testBVT( + t, + filepath.Join( + "..", "..", "..", "..", "..", + "test", "cases", "benchmark", "tpch", "*", "*", + ), + ) +} + +func TestDML(t *testing.T) { + t.Skip() + for _, dir := range []string{ + "select", + "insert", + "update", + "delete", + } { + testBVT( + t, + filepath.Join( + "..", "..", "..", "..", "..", + "test", "cases", "dml", dir, "*", + ), + ) + } +} diff --git a/pkg/vm/engine/txn/test/engine_test.go b/pkg/vm/engine/txn/test/engine_test.go index fac1e8800..3868c7f20 100644 --- a/pkg/vm/engine/txn/test/engine_test.go +++ b/pkg/vm/engine/txn/test/engine_test.go @@ -24,7 +24,11 @@ import ( func TestEngine(t *testing.T) { - env := newEnv() + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + env, err := newEnv(ctx) + assert.Nil(t, err) defer func() { if err := env.Close(); err != nil { t.Fatal(err) @@ -32,10 +36,7 @@ func TestEngine(t *testing.T) { }() tx := env.NewTx() - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - - err := tx.Exec(ctx, ` + err = tx.Exec(ctx, "", ` create table foo ( a int primary key, b int @@ -43,19 +44,19 @@ func TestEngine(t *testing.T) { `) assert.Nil(t, err) - err = tx.Exec(ctx, `insert into foo (a, b) values (1, 2)`) + err = tx.Exec(ctx, "", `insert into foo (a, b) values (1, 2)`) assert.Nil(t, err) - err = tx.Exec(ctx, `select a, b from foo where b = 2`) + err = tx.Exec(ctx, "", `select a, b from foo where b = 2`) assert.Nil(t, err) - err = tx.Exec(ctx, `update foo set b = 3 where a = 1`) + err = tx.Exec(ctx, "", `update foo set b = 3 where a = 1`) assert.Nil(t, err) - err = tx.Exec(ctx, `delete from foo where a = 1`) + err = tx.Exec(ctx, "", `delete from foo where a = 1`) assert.Nil(t, err) - err = tx.Exec(ctx, `drop table foo`) + err = tx.Exec(ctx, "", `drop table foo`) assert.Nil(t, err) err = tx.Commit(ctx) diff --git a/pkg/vm/engine/txn/test/env.go b/pkg/vm/engine/txn/test/env.go index e40d1f6dd..239577c08 100644 --- a/pkg/vm/engine/txn/test/env.go +++ b/pkg/vm/engine/txn/test/env.go @@ -25,6 +25,8 @@ import ( txnengine "github.com/matrixorigin/matrixone/pkg/vm/engine/txn" ) +const defaultDatabase = "db" + type testEnv struct { txnClient client.TxnClient engine *txnengine.Engine @@ -42,7 +44,7 @@ func (t *testEnv) Close() error { return nil } -func newEnv() *testEnv { +func newEnv(ctx context.Context) (*testEnv, error) { env := &testEnv{} sender := &Sender{ @@ -74,5 +76,14 @@ func newEnv() *testEnv { }, ) - return env + // create default database + op := env.txnClient.New() + if err := env.engine.Create(ctx, defaultDatabase, op); err != nil { + return nil, err + } + if err := op.Commit(ctx); err != nil { + return nil, err + } + + return env, nil } diff --git a/pkg/vm/engine/txn/test/execution.go b/pkg/vm/engine/txn/test/execution.go index d35a16acd..2f05a57c2 100644 --- a/pkg/vm/engine/txn/test/execution.go +++ b/pkg/vm/engine/txn/test/execution.go @@ -16,14 +16,18 @@ package testtxnengine import ( "context" + "errors" + "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/vm/engine" + txnengine "github.com/matrixorigin/matrixone/pkg/vm/engine/txn" ) type Execution struct { - tx *Tx - ctx context.Context + ctx context.Context + tx *Tx + stmt tree.Statement } var _ plan.CompilerContext = new(Execution) @@ -42,7 +46,7 @@ func (e *Execution) DatabaseExists(name string) bool { } func (e *Execution) DefaultDatabase() string { - return "test" + return e.tx.databaseName } func (e *Execution) GetRootSql() string { @@ -55,7 +59,7 @@ func (e *Execution) GetHideKeyDef(dbName string, tableName string) *plan.ColDef panic(err) } for i, attr := range attrs { - if attr.Primary { //TODO hide key? + if attr.IsHidden { return engineAttrToPlanColDef(i, attr) } } @@ -77,6 +81,9 @@ func (e *Execution) GetPrimaryKeyDef(dbName string, tableName string) (defs []*p } func (e *Execution) Resolve(schemaName string, tableName string) (objRef *plan.ObjectRef, tableDef *plan.TableDef) { + if schemaName == "" { + schemaName = e.tx.databaseName + } objRef = &plan.ObjectRef{ SchemaName: schemaName, @@ -86,14 +93,37 @@ func (e *Execution) Resolve(schemaName string, tableName string) (objRef *plan.O tableDef = &plan.TableDef{ Name: tableName, } + attrs, err := e.getTableAttrs(schemaName, tableName) + var errDBNotFound txnengine.ErrDatabaseNotFound + if errors.As(err, &errDBNotFound) { + return nil, nil + } + var errRelNotFound txnengine.ErrRelationNotFound + if errors.As(err, &errRelNotFound) { + return nil, nil + } if err != nil { panic(err) } + for i, attr := range attrs { + + // return hidden columns for update or detete statement + if attr.IsHidden { + switch e.stmt.(type) { + case *tree.Update, *tree.Delete: + default: + continue + } + } + tableDef.Cols = append(tableDef.Cols, engineAttrToPlanColDef(i, attr)) } + //TODO properties + //TODO view + return } @@ -136,13 +166,15 @@ func engineAttrToPlanColDef(idx int, attr *engine.Attribute) *plan.ColDef { Name: attr.Name, Typ: &plan.Type{ Id: int32(attr.Type.Oid), - Nullable: false, //TODO + Nullable: attr.Default.NullAbility, Width: attr.Type.Width, Precision: attr.Type.Precision, Size: attr.Type.Size, Scale: attr.Type.Scale, }, + Default: attr.Default, Primary: attr.Primary, Pkidx: int32(idx), + Comment: attr.Comment, } } diff --git a/pkg/vm/engine/txn/test/node.go b/pkg/vm/engine/txn/test/node.go index 399fb8e2b..91b283686 100644 --- a/pkg/vm/engine/txn/test/node.go +++ b/pkg/vm/engine/txn/test/node.go @@ -21,8 +21,10 @@ import ( "github.com/google/uuid" logservicepb "github.com/matrixorigin/matrixone/pkg/pb/logservice" "github.com/matrixorigin/matrixone/pkg/pb/metadata" + "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/txn/service" txnstorage "github.com/matrixorigin/matrixone/pkg/txn/storage/txn" + "go.uber.org/zap" ) type Node struct { @@ -44,7 +46,7 @@ func (t *testEnv) NewNode(id uint64) *Node { } storage, err := txnstorage.New( - txnstorage.NewMemHandler(), + txnstorage.NewMemHandler(testutil.NewMheap()), ) if err != nil { panic(err) @@ -56,8 +58,21 @@ func (t *testEnv) NewNode(id uint64) *Node { State: logservicepb.NormalState, } + loggerConfig := zap.Config{ + Level: zap.NewAtomicLevelAt(zap.InfoLevel), + Development: true, + Encoding: "console", + EncoderConfig: zap.NewDevelopmentEncoderConfig(), + OutputPaths: []string{"stderr"}, + ErrorOutputPaths: []string{"stderr"}, + } + logger, err := loggerConfig.Build() + if err != nil { + panic(err) + } + service := service.NewTxnService( - nil, + logger, shard, storage, t.sender, diff --git a/pkg/vm/engine/txn/test/tx.go b/pkg/vm/engine/txn/test/tx.go index 20ca9a586..766bea1db 100644 --- a/pkg/vm/engine/txn/test/tx.go +++ b/pkg/vm/engine/txn/test/tx.go @@ -16,9 +16,14 @@ package testtxnengine import ( "context" + "fmt" + "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/sql/compile" + "github.com/matrixorigin/matrixone/pkg/sql/errors" + "github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect" "github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect/mysql" + "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/txn/client" @@ -26,55 +31,82 @@ import ( ) type Tx struct { - operator client.TxnOperator - engine *txnengine.Engine + operator client.TxnOperator + engine *txnengine.Engine + databaseName string } func (t *testEnv) NewTx() *Tx { operator := t.txnClient.New() tx := &Tx{ - operator: operator, - engine: t.engine, + operator: operator, + engine: t.engine, + databaseName: defaultDatabase, } return tx } -func (t *Tx) Exec(ctx context.Context, stmtText string) error { +func (t *Tx) Exec(ctx context.Context, filePath string, stmtText string) error { stmts, err := mysql.Parse(stmtText) if err != nil { return err } - stmt := stmts[0] - proc := testutil.NewProcess() - proc.Snapshot = txnengine.OperatorToSnapshot(t.operator) //TODO remove this - compileCtx := compile.New("test", stmtText, "", ctx, t.engine, proc, stmt) + for _, stmt := range stmts { - //optimizer := plan.NewBaseOptimizer(t) - //query, err := optimizer.Optimize(stmt) - //if err != nil { - // return err - //} + stmtText := tree.String(stmt, dialect.MYSQL) + println(stmtText) - exec := &Execution{ - tx: t, - ctx: ctx, - } + switch stmt := stmt.(type) { - execPlan, err := plan.BuildPlan(exec, stmt) - if err != nil { - return err - } + case *tree.Use: + t.databaseName = stmt.Name + continue - err = compileCtx.Compile(execPlan, nil, nil) - if err != nil { - return err - } + } + + proc := testutil.NewProcess() + proc.Snapshot = txnengine.OperatorToSnapshot(t.operator) //TODO remove this + compileCtx := compile.New(t.databaseName, stmtText, "", ctx, t.engine, proc, stmt) + + //optimizer := plan.NewBaseOptimizer(t) + //query, err := optimizer.Optimize(stmt) + //if err != nil { + // return err + //} + + exec := &Execution{ + ctx: ctx, + tx: t, + stmt: stmt, + } + + execPlan, err := plan.BuildPlan(exec, stmt) + if err != nil { + return err + } + + err = compileCtx.Compile(execPlan, nil, func(i any, batch *batch.Batch) error { + fmt.Printf("%v\n", batch) //TODO + return nil + }) + if err != nil { + return err + } + + err = compileCtx.Run(0) + if err != nil { + + sqlError, ok := err.(*errors.SqlError) + if ok { + fmt.Printf("%s\n", sqlError.Error()) + return nil + } + + panic(err) + } - err = compileCtx.Run(0) - if err != nil { - return err } return nil diff --git a/pkg/vm/engine/txn/txn.go b/pkg/vm/engine/txn/txn.go index efa12bcda..ebf517839 100644 --- a/pkg/vm/engine/txn/txn.go +++ b/pkg/vm/engine/txn/txn.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "encoding/gob" + "reflect" "time" "github.com/matrixorigin/matrixone/pkg/pb/txn" @@ -65,13 +66,30 @@ func doTxnRequest[ return } + var respErrors Errors for _, res := range result.Responses { var resp Resp if err = gob.NewDecoder(bytes.NewReader(res.CNOpResponse.Payload)).Decode(&resp); err != nil { return } + + respValue := reflect.ValueOf(resp) + for i := 0; i < respValue.NumField(); i++ { + field := respValue.Field(i) + if field.Type().Implements(errorType) && + !field.IsZero() { + respErrors = append(respErrors, field.Interface().(error)) + } + } + resps = append(resps, resp) } + if len(respErrors) > 0 { + err = respErrors + } + return } + +var errorType = reflect.TypeOf((*error)(nil)).Elem() -- GitLab