diff --git a/pkg/vm/engine/disttae/engine.go b/pkg/vm/engine/disttae/engine.go new file mode 100644 index 0000000000000000000000000000000000000000..30f9ed15b43e2f240ac99f305f249eca9afb4882 --- /dev/null +++ b/pkg/vm/engine/disttae/engine.go @@ -0,0 +1,78 @@ +// Copyright 2022 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package disttae + +import ( + "context" + + "github.com/matrixorigin/matrixone/pkg/pb/logservice" + "github.com/matrixorigin/matrixone/pkg/txn/client" + "github.com/matrixorigin/matrixone/pkg/vm/engine" +) + +type Engine struct { + getClusterDetails GetClusterDetailsFunc +} + +type GetClusterDetailsFunc = func() (logservice.ClusterDetails, error) + +func New( + ctx context.Context, + getClusterDetails GetClusterDetailsFunc, +) *Engine { + return &Engine{ + getClusterDetails: getClusterDetails, + } +} + +var _ engine.Engine = new(Engine) + +func (e *Engine) Create(context.Context, string, client.TxnOperator) error { + //TODO + panic("unimplemented") +} + +func (e *Engine) Database(context.Context, string, client.TxnOperator) (engine.Database, error) { + //TODO + panic("unimplemented") +} + +func (e *Engine) Databases(context.Context, client.TxnOperator) ([]string, error) { + //TODO + panic("unimplemented") +} + +func (e *Engine) Delete(context.Context, string, client.TxnOperator) error { + //TODO + panic("unimplemented") +} + +func (e *Engine) Nodes() (engine.Nodes, error) { + clusterDetails, err := e.getClusterDetails() + if err != nil { + return nil, err + } + + var nodes engine.Nodes + for _, store := range clusterDetails.CNStores { + nodes = append(nodes, engine.Node{ + Mcpu: 1, + Id: store.UUID, + Addr: store.ServiceAddress, + }) + } + + return nodes, nil +} diff --git a/pkg/vm/engine/disttae/engine_test.go b/pkg/vm/engine/disttae/engine_test.go new file mode 100644 index 0000000000000000000000000000000000000000..f259d0b9f86cc138fbbdc79cff5c6973915b4a6a --- /dev/null +++ b/pkg/vm/engine/disttae/engine_test.go @@ -0,0 +1,31 @@ +// Copyright 2022 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package disttae + +import ( + "context" + "testing" + + "github.com/matrixorigin/matrixone/pkg/pb/logservice" +) + +func TestNewEngine(t *testing.T) { + ctx := context.Background() + getClusterDetails := func() (details logservice.ClusterDetails, err error) { + return + } + engine := New(ctx, getClusterDetails) + _ = engine +} diff --git a/pkg/vm/engine/txn/txn.go b/pkg/vm/engine/txn.go similarity index 66% rename from pkg/vm/engine/txn/txn.go rename to pkg/vm/engine/txn.go index ebf51783983d336137f041c47ba054663a0dc5a6..41d5e5e5cb6ec444d7420ed1b6f70ed8a3823ad3 100644 --- a/pkg/vm/engine/txn/txn.go +++ b/pkg/vm/engine/txn.go @@ -12,25 +12,31 @@ // See the License for the specific language governing permissions and // limitations under the License. -package txnengine +package engine import ( "bytes" "context" "encoding/gob" + "errors" "reflect" + "strings" "time" + "github.com/matrixorigin/matrixone/pkg/pb/metadata" "github.com/matrixorigin/matrixone/pkg/pb/txn" "github.com/matrixorigin/matrixone/pkg/txn/rpc" ) -func doTxnRequest[ +type Shard = metadata.DNShard + +func DoTxnRequest[ Resp any, Req any, ]( ctx context.Context, - e *Engine, + e Engine, + // TxnOperator.Read or TxnOperator.Write reqFunc func(context.Context, []txn.TxnRequest) (*rpc.SendResult, error), shardsFunc func() ([]Shard, error), op uint32, @@ -46,10 +52,14 @@ func doTxnRequest[ } requests := make([]txn.TxnRequest, 0, len(shards)) for _, shard := range shards { + buf := new(bytes.Buffer) + if err := gob.NewEncoder(buf).Encode(req); err != nil { + panic(err) + } requests = append(requests, txn.TxnRequest{ CNRequest: &txn.CNOpRequest{ OpCode: op, - Payload: mustEncodePayload(req), + Payload: buf.Bytes(), Target: shard, }, }) @@ -93,3 +103,51 @@ func doTxnRequest[ } var errorType = reflect.TypeOf((*error)(nil)).Elem() + +type TxnError struct { + txnError *txn.TxnError +} + +var _ error = TxnError{} + +func (e TxnError) Error() string { + if e.txnError != nil { + return e.txnError.DebugString() + } + panic("impossible") +} + +func errorFromTxnResponses(resps []txn.TxnResponse) error { + for _, resp := range resps { + if resp.TxnError != nil { + return TxnError{ + txnError: resp.TxnError, + } + } + } + return nil +} + +type Errors []error + +var _ error = Errors{} + +func (e Errors) Error() string { + buf := new(strings.Builder) + for i, err := range e { + if i > 0 { + buf.WriteRune('\n') + } + buf.WriteString(err.Error()) + } + return buf.String() +} + +func (e Errors) As(target any) bool { + for _, err := range e { + if errors.As(err, target) { + return true + } + } + return false +} diff --git a/pkg/vm/engine/txn/database.go b/pkg/vm/engine/txn/database.go index 65524a9771578a16c3c814f9e5dcd839fbdf231c..51c4b3afba42cd5a57f6bc81c0c3c1a56c1da316 100644 --- a/pkg/vm/engine/txn/database.go +++ b/pkg/vm/engine/txn/database.go @@ -34,7 +34,7 @@ var _ engine.Database = new(Database) func (d *Database) Create(ctx context.Context, relName string, defs []engine.TableDef) error { - _, err := doTxnRequest[CreateDatabaseResp]( + _, err := engine.DoTxnRequest[CreateDatabaseResp]( ctx, d.engine, d.txnOperator.Write, @@ -56,7 +56,7 @@ func (d *Database) Create(ctx context.Context, relName string, defs []engine.Tab func (d *Database) Delete(ctx context.Context, relName string) error { - _, err := doTxnRequest[DeleteRelationResp]( + _, err := engine.DoTxnRequest[DeleteRelationResp]( ctx, d.engine, d.txnOperator.Write, @@ -76,7 +76,7 @@ func (d *Database) Delete(ctx context.Context, relName string) error { func (d *Database) Relation(ctx context.Context, relName string) (engine.Relation, error) { - resps, err := doTxnRequest[OpenRelationResp]( + resps, err := engine.DoTxnRequest[OpenRelationResp]( ctx, d.engine, d.txnOperator.Read, @@ -111,7 +111,7 @@ func (d *Database) Relation(ctx context.Context, relName string) (engine.Relatio func (d *Database) Relations(ctx context.Context) ([]string, error) { - resps, err := doTxnRequest[GetRelationsResp]( + resps, err := engine.DoTxnRequest[GetRelationsResp]( ctx, d.engine, d.txnOperator.Read, diff --git a/pkg/vm/engine/txn/engine.go b/pkg/vm/engine/txn/engine.go index a75d14973a13e4016bd91b800a90dd402057d67c..9484aed6660fc9e4f7050e0ac717a8614ffe5528 100644 --- a/pkg/vm/engine/txn/engine.go +++ b/pkg/vm/engine/txn/engine.go @@ -49,7 +49,7 @@ var _ engine.Engine = new(Engine) func (e *Engine) Create(ctx context.Context, dbName string, txnOperator client.TxnOperator) error { - _, err := doTxnRequest[CreateDatabaseResp]( + _, err := engine.DoTxnRequest[CreateDatabaseResp]( ctx, e, txnOperator.Write, @@ -68,7 +68,7 @@ func (e *Engine) Create(ctx context.Context, dbName string, txnOperator client.T func (e *Engine) Database(ctx context.Context, dbName string, txnOperator client.TxnOperator) (engine.Database, error) { - resps, err := doTxnRequest[OpenDatabaseResp]( + resps, err := engine.DoTxnRequest[OpenDatabaseResp]( ctx, e, txnOperator.Read, @@ -95,7 +95,7 @@ func (e *Engine) Database(ctx context.Context, dbName string, txnOperator client func (e *Engine) Databases(ctx context.Context, txnOperator client.TxnOperator) ([]string, error) { - resps, err := doTxnRequest[GetDatabasesResp]( + resps, err := engine.DoTxnRequest[GetDatabasesResp]( ctx, e, txnOperator.Read, @@ -117,7 +117,7 @@ func (e *Engine) Databases(ctx context.Context, txnOperator client.TxnOperator) func (e *Engine) Delete(ctx context.Context, dbName string, txnOperator client.TxnOperator) error { - _, err := doTxnRequest[DeleteDatabaseResp]( + _, err := engine.DoTxnRequest[DeleteDatabaseResp]( ctx, e, txnOperator.Write, diff --git a/pkg/vm/engine/txn/error.go b/pkg/vm/engine/txn/error.go index 2c1274d31257de8b6efa65d6e8691e901bf7a615..18a47fffcab434cf05762d0d53aee3129419ad44 100644 --- a/pkg/vm/engine/txn/error.go +++ b/pkg/vm/engine/txn/error.go @@ -15,37 +15,9 @@ package txnengine import ( - "errors" "fmt" - "strings" - - "github.com/matrixorigin/matrixone/pkg/pb/txn" ) -type TxnError struct { - txnError *txn.TxnError -} - -var _ error = TxnError{} - -func (e TxnError) Error() string { - if e.txnError != nil { - return e.txnError.DebugString() - } - panic("impossible") -} - -func errorFromTxnResponses(resps []txn.TxnResponse) error { - for _, resp := range resps { - if resp.TxnError != nil { - return TxnError{ - txnError: resp.TxnError, - } - } - } - return nil -} - type ErrExisted bool var _ error = ErrExisted(true) @@ -106,27 +78,3 @@ var _ error = ErrColumnNotFound{} func (e ErrColumnNotFound) Error() string { return fmt.Sprintf("column not found: %s", e.Name) } - -type Errors []error - -var _ error = Errors{} - -func (e Errors) Error() string { - buf := new(strings.Builder) - for i, err := range e { - if i > 0 { - buf.WriteRune('\n') - } - buf.WriteString(err.Error()) - } - return buf.String() -} - -func (e Errors) As(target any) bool { - for _, err := range e { - if errors.As(err, target) { - return true - } - } - return false -} diff --git a/pkg/vm/engine/txn/operations.go b/pkg/vm/engine/txn/operations.go index b394095608a918442077dfc12e2d309a5894b1bf..4dc7fe5b3adb58ff77056c0eaf3f394102264865 100644 --- a/pkg/vm/engine/txn/operations.go +++ b/pkg/vm/engine/txn/operations.go @@ -15,7 +15,6 @@ package txnengine import ( - "bytes" "encoding/gob" "github.com/matrixorigin/matrixone/pkg/container/batch" @@ -47,14 +46,6 @@ const ( OpCloseTableIter ) -func mustEncodePayload(o any) []byte { - buf := new(bytes.Buffer) - if err := gob.NewEncoder(buf).Encode(o); err != nil { - panic(err) - } - return buf.Bytes() -} - func init() { // register TableDef types diff --git a/pkg/vm/engine/txn/table.go b/pkg/vm/engine/txn/table.go index 2c13e4b2a2b4c8f4b955512b389eff3890492797..187dae05ecbf8d9434bcc8bc8fee793ecc74db0e 100644 --- a/pkg/vm/engine/txn/table.go +++ b/pkg/vm/engine/txn/table.go @@ -41,7 +41,7 @@ func (*Table) Size(string) int64 { func (t *Table) AddTableDef(ctx context.Context, def engine.TableDef) error { - _, err := doTxnRequest[AddTableDefResp]( + _, err := engine.DoTxnRequest[AddTableDefResp]( ctx, t.engine, t.txnOperator.Write, @@ -61,7 +61,7 @@ func (t *Table) AddTableDef(ctx context.Context, def engine.TableDef) error { func (t *Table) DelTableDef(ctx context.Context, def engine.TableDef) error { - _, err := doTxnRequest[DelTableDefResp]( + _, err := engine.DoTxnRequest[DelTableDefResp]( ctx, t.engine, t.txnOperator.Write, @@ -94,7 +94,7 @@ func (t *Table) Delete(ctx context.Context, vec *vector.Vector, _ string) error } for _, shard := range shards { - _, err := doTxnRequest[DeleteResp]( + _, err := engine.DoTxnRequest[DeleteResp]( ctx, t.engine, t.txnOperator.Write, @@ -123,7 +123,7 @@ func (*Table) GetPriKeyOrHideKey() ([]engine.Attribute, bool) { func (t *Table) GetPrimaryKeys(ctx context.Context) ([]*engine.Attribute, error) { - resps, err := doTxnRequest[GetPrimaryKeysResp]( + resps, err := engine.DoTxnRequest[GetPrimaryKeysResp]( ctx, t.engine, t.txnOperator.Read, @@ -157,7 +157,7 @@ func (t *Table) Ranges(ctx context.Context) ([][]byte, error) { func (t *Table) TableDefs(ctx context.Context) ([]engine.TableDef, error) { - resps, err := doTxnRequest[GetTableDefsResp]( + resps, err := engine.DoTxnRequest[GetTableDefsResp]( ctx, t.engine, t.txnOperator.Read, @@ -178,7 +178,7 @@ func (t *Table) TableDefs(ctx context.Context) ([]engine.TableDef, error) { func (t *Table) Truncate(ctx context.Context) (uint64, error) { - resps, err := doTxnRequest[TruncateResp]( + resps, err := engine.DoTxnRequest[TruncateResp]( ctx, t.engine, t.txnOperator.Write, @@ -216,7 +216,7 @@ func (t *Table) Update(ctx context.Context, data *batch.Batch) error { } for _, shard := range shards { - _, err := doTxnRequest[UpdateResp]( + _, err := engine.DoTxnRequest[UpdateResp]( ctx, t.engine, t.txnOperator.Write, @@ -251,7 +251,7 @@ func (t *Table) Write(ctx context.Context, data *batch.Batch) error { } for _, shard := range shards { - _, err := doTxnRequest[WriteResp]( + _, err := engine.DoTxnRequest[WriteResp]( ctx, t.engine, t.txnOperator.Write, diff --git a/pkg/vm/engine/txn/table_reader.go b/pkg/vm/engine/txn/table_reader.go index 5b08b29607c92a54c389534e94cb4ec370b469ec..2ac72974125e4736a4d40eca1646b57aaf6ec3e6 100644 --- a/pkg/vm/engine/txn/table_reader.go +++ b/pkg/vm/engine/txn/table_reader.go @@ -72,7 +72,7 @@ func (t *Table) NewReader( return nil, err } - resps, err := doTxnRequest[NewTableIterResp]( + resps, err := engine.DoTxnRequest[NewTableIterResp]( ctx, t.engine, t.txnOperator.Read, @@ -136,7 +136,7 @@ func (t *TableReader) Read(colNames []string, plan *plan.Expr, mh *mheap.Mheap) return nil, nil } - resps, err := doTxnRequest[ReadResp]( + resps, err := engine.DoTxnRequest[ReadResp]( t.ctx, t.engine, t.txnOperator.Read, @@ -169,7 +169,7 @@ func (t *TableReader) Close() error { return nil } for _, info := range t.iterInfos { - _, err := doTxnRequest[CloseTableIterResp]( + _, err := engine.DoTxnRequest[CloseTableIterResp]( t.ctx, t.engine, t.txnOperator.Read,