diff --git a/pkg/cnservice/server.go b/pkg/cnservice/server.go index f4a7120088eaef25a99e27d1417fddb61fe503eb..82992bee6f2fd9a822c6adb3a5d6860be31a4566 100644 --- a/pkg/cnservice/server.go +++ b/pkg/cnservice/server.go @@ -27,6 +27,7 @@ import ( ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor" "github.com/matrixorigin/matrixone/pkg/util/metric" "github.com/matrixorigin/matrixone/pkg/util/trace" + txnengine "github.com/matrixorigin/matrixone/pkg/vm/engine/txn" "github.com/matrixorigin/matrixone/pkg/vm/mmu/host" "github.com/fagongzi/goetty/v2" @@ -142,7 +143,15 @@ func (s *service) initEngine( //TODO case EngineMemory: - //TODO + pu.TxnClient = s.txnClient + pu.StorageEngine = txnengine.New( + ctx, + new(txnengine.ShardToSingleStatic), //TODO use hashing shard policy + txnengine.GetClusterDetailsFromHAKeeper( + ctx, + s.hakeeperClient, + ), + ) default: return fmt.Errorf("unknown engine type: %s", s.cfg.Engine.Type) @@ -208,7 +217,7 @@ func (s *service) initHAKeeperClient() error { } func (s *service) initTxnSender() error { - sender, err := rpc.NewSender(s.logger) //TODO set proper options + sender, err := rpc.NewSender(s.logger) //TODO options if err != nil { return err } @@ -217,7 +226,7 @@ func (s *service) initTxnSender() error { } func (s *service) initTxnClient() error { - txnClient := client.NewTxnClient(s.txnSender) //TODO other options + txnClient := client.NewTxnClient(s.txnSender) //TODO options s.txnClient = txnClient return nil } diff --git a/pkg/vm/engine/txn/shard.go b/pkg/vm/engine/txn/shard.go index f41f76bf67b5208ed9f0e30eb2322d2e2c605467..83304a5eb5a41538c9759033abbfc6a9076ee54e 100644 --- a/pkg/vm/engine/txn/shard.go +++ b/pkg/vm/engine/txn/shard.go @@ -15,6 +15,8 @@ package txnengine import ( + "sync" + "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/vector" logservicepb "github.com/matrixorigin/matrixone/pkg/pb/logservice" @@ -67,3 +69,65 @@ func theseShards(shards []Shard) func() ([]Shard, error) { return shards, nil } } + +type ShardToSingleStatic struct { + setOnce sync.Once + shard Shard +} + +func (s *ShardToSingleStatic) setShard(nodes []logservicepb.DNStore) { + s.setOnce.Do(func() { + info := nodes[0].Shards[0] + s.shard = Shard{ + DNShardRecord: metadata.DNShardRecord{ + ShardID: info.ShardID, + }, + ReplicaID: info.ReplicaID, + } + }) +} + +var _ ShardPolicy = new(ShardToSingleStatic) + +func (s *ShardToSingleStatic) Vector( + vec *vector.Vector, + nodes []logservicepb.DNStore, +) ( + sharded []*ShardedVector, + err error, +) { + s.setShard(nodes) + sharded = append(sharded, &ShardedVector{ + Shard: s.shard, + Vector: vec, + }) + return +} + +func (s *ShardToSingleStatic) Batch( + bat *batch.Batch, + nodes []logservicepb.DNStore, +) ( + sharded []*ShardedBatch, + err error, +) { + s.setShard(nodes) + sharded = append(sharded, &ShardedBatch{ + Shard: s.shard, + Batch: bat, + }) + return +} + +func (s *ShardToSingleStatic) Stores(stores []logservicepb.DNStore) (shards []Shard, err error) { + for _, store := range stores { + info := store.Shards[0] + shards = append(shards, Shard{ + DNShardRecord: metadata.DNShardRecord{ + ShardID: info.ShardID, + }, + ReplicaID: info.ReplicaID, + }) + } + return +} diff --git a/pkg/vm/engine/txn/test/env.go b/pkg/vm/engine/txn/test/env.go index 633cca543dbdb689ffce0f1e5651197f4c0b36c5..ceecd5eacd7006de47bdf03650b03ebfe4f861f7 100644 --- a/pkg/vm/engine/txn/test/env.go +++ b/pkg/vm/engine/txn/test/env.go @@ -69,7 +69,7 @@ func newEnv(ctx context.Context) (*testEnv, error) { env.engine = txnengine.New( context.Background(), - env, + new(txnengine.ShardToSingleStatic), func() (details logservicepb.ClusterDetails, err error) { for _, node := range env.nodes { details.DNStores = append(details.DNStores, node.info) diff --git a/pkg/vm/engine/txn/test/node.go b/pkg/vm/engine/txn/test/node.go index fd383e7bb903f00ca6290e2dcf2f6635612fc322..198493f9aeeb10a73577793c058b07703596db44 100644 --- a/pkg/vm/engine/txn/test/node.go +++ b/pkg/vm/engine/txn/test/node.go @@ -56,6 +56,12 @@ func (t *testEnv) NewNode(id uint64) *Node { UUID: uuid.NewString(), ServiceAddress: shard.Address, State: logservicepb.NormalState, + Shards: []logservicepb.DNShardInfo{ + { + ShardID: id, + ReplicaID: id, + }, + }, } loggerConfig := zap.Config{ diff --git a/pkg/vm/engine/txn/test/shard.go b/pkg/vm/engine/txn/test/shard.go deleted file mode 100644 index c764cdef19d85ec5fbcc379266dd0d251d369fba..0000000000000000000000000000000000000000 --- a/pkg/vm/engine/txn/test/shard.go +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2022 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package testtxnengine - -import ( - "github.com/matrixorigin/matrixone/pkg/container/batch" - "github.com/matrixorigin/matrixone/pkg/container/vector" - logservicepb "github.com/matrixorigin/matrixone/pkg/pb/logservice" - txnengine "github.com/matrixorigin/matrixone/pkg/vm/engine/txn" -) - -var _ txnengine.ShardPolicy = new(testEnv) - -func (t *testEnv) Batch(batch *batch.Batch, nodes []logservicepb.DNStore) (shards []*txnengine.ShardedBatch, err error) { - shards = append(shards, &txnengine.ShardedBatch{ - Shard: t.nodes[0].shard, - Batch: batch, - }) - return -} - -func (t *testEnv) Vector(vec *vector.Vector, nodes []logservicepb.DNStore) (shards []*txnengine.ShardedVector, err error) { - shards = append(shards, &txnengine.ShardedVector{ - Shard: t.nodes[0].shard, - Vector: vec, - }) - return -} - -func (t *testEnv) Stores(nodes []logservicepb.DNStore) (shards []txnengine.Shard, err error) { - for _, node := range nodes { - for _, n := range t.nodes { - if n.info.UUID == node.UUID { - shards = append(shards, n.shard) - break - } - } - } - return -}