From 133b4b3c609bbc163769bbc05ba74cc72f534279 Mon Sep 17 00:00:00 2001
From: reusee <reusee@gmail.com>
Date: Sat, 20 Aug 2022 19:46:12 +0800
Subject: [PATCH] cnservice: add memory engine to service.initEngine (#4603)

cnservice: add memory engine to service.initEngine

txnengine: add ShardToSingleStatic

Approved by: @nnsgmsone
---
 pkg/cnservice/server.go         | 15 ++++++--
 pkg/vm/engine/txn/shard.go      | 64 +++++++++++++++++++++++++++++++++
 pkg/vm/engine/txn/test/env.go   |  2 +-
 pkg/vm/engine/txn/test/node.go  |  6 ++++
 pkg/vm/engine/txn/test/shard.go | 52 ---------------------------
 5 files changed, 83 insertions(+), 56 deletions(-)
 delete mode 100644 pkg/vm/engine/txn/test/shard.go

diff --git a/pkg/cnservice/server.go b/pkg/cnservice/server.go
index f4a712008..82992bee6 100644
--- a/pkg/cnservice/server.go
+++ b/pkg/cnservice/server.go
@@ -27,6 +27,7 @@ import (
 	ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor"
 	"github.com/matrixorigin/matrixone/pkg/util/metric"
 	"github.com/matrixorigin/matrixone/pkg/util/trace"
+	txnengine "github.com/matrixorigin/matrixone/pkg/vm/engine/txn"
 	"github.com/matrixorigin/matrixone/pkg/vm/mmu/host"
 
 	"github.com/fagongzi/goetty/v2"
@@ -142,7 +143,15 @@ func (s *service) initEngine(
 		//TODO
 
 	case EngineMemory:
-		//TODO
+		pu.TxnClient = s.txnClient
+		pu.StorageEngine = txnengine.New(
+			ctx,
+			new(txnengine.ShardToSingleStatic), //TODO use hashing shard policy
+			txnengine.GetClusterDetailsFromHAKeeper(
+				ctx,
+				s.hakeeperClient,
+			),
+		)
 
 	default:
 		return fmt.Errorf("unknown engine type: %s", s.cfg.Engine.Type)
@@ -208,7 +217,7 @@ func (s *service) initHAKeeperClient() error {
 }
 
 func (s *service) initTxnSender() error {
-	sender, err := rpc.NewSender(s.logger) //TODO set proper options
+	sender, err := rpc.NewSender(s.logger) //TODO options
 	if err != nil {
 		return err
 	}
@@ -217,7 +226,7 @@ func (s *service) initTxnSender() error {
 }
 
 func (s *service) initTxnClient() error {
-	txnClient := client.NewTxnClient(s.txnSender) //TODO other options
+	txnClient := client.NewTxnClient(s.txnSender) //TODO options
 	s.txnClient = txnClient
 	return nil
 }
diff --git a/pkg/vm/engine/txn/shard.go b/pkg/vm/engine/txn/shard.go
index f41f76bf6..83304a5eb 100644
--- a/pkg/vm/engine/txn/shard.go
+++ b/pkg/vm/engine/txn/shard.go
@@ -15,6 +15,8 @@
 package txnengine
 
 import (
+	"sync"
+
 	"github.com/matrixorigin/matrixone/pkg/container/batch"
 	"github.com/matrixorigin/matrixone/pkg/container/vector"
 	logservicepb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
@@ -67,3 +69,65 @@ func theseShards(shards []Shard) func() ([]Shard, error) {
 		return shards, nil
 	}
 }
+
+type ShardToSingleStatic struct {
+	setOnce sync.Once
+	shard   Shard
+}
+
+func (s *ShardToSingleStatic) setShard(nodes []logservicepb.DNStore) {
+	s.setOnce.Do(func() {
+		info := nodes[0].Shards[0]
+		s.shard = Shard{
+			DNShardRecord: metadata.DNShardRecord{
+				ShardID: info.ShardID,
+			},
+			ReplicaID: info.ReplicaID,
+		}
+	})
+}
+
+var _ ShardPolicy = new(ShardToSingleStatic)
+
+func (s *ShardToSingleStatic) Vector(
+	vec *vector.Vector,
+	nodes []logservicepb.DNStore,
+) (
+	sharded []*ShardedVector,
+	err error,
+) {
+	s.setShard(nodes)
+	sharded = append(sharded, &ShardedVector{
+		Shard:  s.shard,
+		Vector: vec,
+	})
+	return
+}
+
+func (s *ShardToSingleStatic) Batch(
+	bat *batch.Batch,
+	nodes []logservicepb.DNStore,
+) (
+	sharded []*ShardedBatch,
+	err error,
+) {
+	s.setShard(nodes)
+	sharded = append(sharded, &ShardedBatch{
+		Shard: s.shard,
+		Batch: bat,
+	})
+	return
+}
+
+func (s *ShardToSingleStatic) Stores(stores []logservicepb.DNStore) (shards []Shard, err error) {
+	for _, store := range stores {
+		info := store.Shards[0]
+		shards = append(shards, Shard{
+			DNShardRecord: metadata.DNShardRecord{
+				ShardID: info.ShardID,
+			},
+			ReplicaID: info.ReplicaID,
+		})
+	}
+	return
+}
diff --git a/pkg/vm/engine/txn/test/env.go b/pkg/vm/engine/txn/test/env.go
index 633cca543..ceecd5eac 100644
--- a/pkg/vm/engine/txn/test/env.go
+++ b/pkg/vm/engine/txn/test/env.go
@@ -69,7 +69,7 @@ func newEnv(ctx context.Context) (*testEnv, error) {
 
 	env.engine = txnengine.New(
 		context.Background(),
-		env,
+		new(txnengine.ShardToSingleStatic),
 		func() (details logservicepb.ClusterDetails, err error) {
 			for _, node := range env.nodes {
 				details.DNStores = append(details.DNStores, node.info)
diff --git a/pkg/vm/engine/txn/test/node.go b/pkg/vm/engine/txn/test/node.go
index fd383e7bb..198493f9a 100644
--- a/pkg/vm/engine/txn/test/node.go
+++ b/pkg/vm/engine/txn/test/node.go
@@ -56,6 +56,12 @@ func (t *testEnv) NewNode(id uint64) *Node {
 		UUID:           uuid.NewString(),
 		ServiceAddress: shard.Address,
 		State:          logservicepb.NormalState,
+		Shards: []logservicepb.DNShardInfo{
+			{
+				ShardID:   id,
+				ReplicaID: id,
+			},
+		},
 	}
 
 	loggerConfig := zap.Config{
diff --git a/pkg/vm/engine/txn/test/shard.go b/pkg/vm/engine/txn/test/shard.go
deleted file mode 100644
index c764cdef1..000000000
--- a/pkg/vm/engine/txn/test/shard.go
+++ /dev/null
@@ -1,52 +0,0 @@
-// Copyright 2022 Matrix Origin
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//      http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package testtxnengine
-
-import (
-	"github.com/matrixorigin/matrixone/pkg/container/batch"
-	"github.com/matrixorigin/matrixone/pkg/container/vector"
-	logservicepb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
-	txnengine "github.com/matrixorigin/matrixone/pkg/vm/engine/txn"
-)
-
-var _ txnengine.ShardPolicy = new(testEnv)
-
-func (t *testEnv) Batch(batch *batch.Batch, nodes []logservicepb.DNStore) (shards []*txnengine.ShardedBatch, err error) {
-	shards = append(shards, &txnengine.ShardedBatch{
-		Shard: t.nodes[0].shard,
-		Batch: batch,
-	})
-	return
-}
-
-func (t *testEnv) Vector(vec *vector.Vector, nodes []logservicepb.DNStore) (shards []*txnengine.ShardedVector, err error) {
-	shards = append(shards, &txnengine.ShardedVector{
-		Shard:  t.nodes[0].shard,
-		Vector: vec,
-	})
-	return
-}
-
-func (t *testEnv) Stores(nodes []logservicepb.DNStore) (shards []txnengine.Shard, err error) {
-	for _, node := range nodes {
-		for _, n := range t.nodes {
-			if n.info.UUID == node.UUID {
-				shards = append(shards, n.shard)
-				break
-			}
-		}
-	}
-	return
-}
-- 
GitLab