Skip to content
Snippets Groups Projects
Unverified Commit 133b4b3c authored by reusee's avatar reusee Committed by GitHub
Browse files

cnservice: add memory engine to service.initEngine (#4603)

cnservice: add memory engine to service.initEngine

txnengine: add ShardToSingleStatic

Approved by: @nnsgmsone
parent 0d1ec504
No related branches found
No related tags found
No related merge requests found
...@@ -27,6 +27,7 @@ import ( ...@@ -27,6 +27,7 @@ import (
ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor" ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor"
"github.com/matrixorigin/matrixone/pkg/util/metric" "github.com/matrixorigin/matrixone/pkg/util/metric"
"github.com/matrixorigin/matrixone/pkg/util/trace" "github.com/matrixorigin/matrixone/pkg/util/trace"
txnengine "github.com/matrixorigin/matrixone/pkg/vm/engine/txn"
"github.com/matrixorigin/matrixone/pkg/vm/mmu/host" "github.com/matrixorigin/matrixone/pkg/vm/mmu/host"
"github.com/fagongzi/goetty/v2" "github.com/fagongzi/goetty/v2"
...@@ -142,7 +143,15 @@ func (s *service) initEngine( ...@@ -142,7 +143,15 @@ func (s *service) initEngine(
//TODO //TODO
case EngineMemory: case EngineMemory:
//TODO pu.TxnClient = s.txnClient
pu.StorageEngine = txnengine.New(
ctx,
new(txnengine.ShardToSingleStatic), //TODO use hashing shard policy
txnengine.GetClusterDetailsFromHAKeeper(
ctx,
s.hakeeperClient,
),
)
default: default:
return fmt.Errorf("unknown engine type: %s", s.cfg.Engine.Type) return fmt.Errorf("unknown engine type: %s", s.cfg.Engine.Type)
...@@ -208,7 +217,7 @@ func (s *service) initHAKeeperClient() error { ...@@ -208,7 +217,7 @@ func (s *service) initHAKeeperClient() error {
} }
func (s *service) initTxnSender() error { func (s *service) initTxnSender() error {
sender, err := rpc.NewSender(s.logger) //TODO set proper options sender, err := rpc.NewSender(s.logger) //TODO options
if err != nil { if err != nil {
return err return err
} }
...@@ -217,7 +226,7 @@ func (s *service) initTxnSender() error { ...@@ -217,7 +226,7 @@ func (s *service) initTxnSender() error {
} }
func (s *service) initTxnClient() error { func (s *service) initTxnClient() error {
txnClient := client.NewTxnClient(s.txnSender) //TODO other options txnClient := client.NewTxnClient(s.txnSender) //TODO options
s.txnClient = txnClient s.txnClient = txnClient
return nil return nil
} }
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
package txnengine package txnengine
import ( import (
"sync"
"github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/container/vector"
logservicepb "github.com/matrixorigin/matrixone/pkg/pb/logservice" logservicepb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
...@@ -67,3 +69,65 @@ func theseShards(shards []Shard) func() ([]Shard, error) { ...@@ -67,3 +69,65 @@ func theseShards(shards []Shard) func() ([]Shard, error) {
return shards, nil return shards, nil
} }
} }
type ShardToSingleStatic struct {
setOnce sync.Once
shard Shard
}
func (s *ShardToSingleStatic) setShard(nodes []logservicepb.DNStore) {
s.setOnce.Do(func() {
info := nodes[0].Shards[0]
s.shard = Shard{
DNShardRecord: metadata.DNShardRecord{
ShardID: info.ShardID,
},
ReplicaID: info.ReplicaID,
}
})
}
var _ ShardPolicy = new(ShardToSingleStatic)
func (s *ShardToSingleStatic) Vector(
vec *vector.Vector,
nodes []logservicepb.DNStore,
) (
sharded []*ShardedVector,
err error,
) {
s.setShard(nodes)
sharded = append(sharded, &ShardedVector{
Shard: s.shard,
Vector: vec,
})
return
}
func (s *ShardToSingleStatic) Batch(
bat *batch.Batch,
nodes []logservicepb.DNStore,
) (
sharded []*ShardedBatch,
err error,
) {
s.setShard(nodes)
sharded = append(sharded, &ShardedBatch{
Shard: s.shard,
Batch: bat,
})
return
}
func (s *ShardToSingleStatic) Stores(stores []logservicepb.DNStore) (shards []Shard, err error) {
for _, store := range stores {
info := store.Shards[0]
shards = append(shards, Shard{
DNShardRecord: metadata.DNShardRecord{
ShardID: info.ShardID,
},
ReplicaID: info.ReplicaID,
})
}
return
}
...@@ -69,7 +69,7 @@ func newEnv(ctx context.Context) (*testEnv, error) { ...@@ -69,7 +69,7 @@ func newEnv(ctx context.Context) (*testEnv, error) {
env.engine = txnengine.New( env.engine = txnengine.New(
context.Background(), context.Background(),
env, new(txnengine.ShardToSingleStatic),
func() (details logservicepb.ClusterDetails, err error) { func() (details logservicepb.ClusterDetails, err error) {
for _, node := range env.nodes { for _, node := range env.nodes {
details.DNStores = append(details.DNStores, node.info) details.DNStores = append(details.DNStores, node.info)
......
...@@ -56,6 +56,12 @@ func (t *testEnv) NewNode(id uint64) *Node { ...@@ -56,6 +56,12 @@ func (t *testEnv) NewNode(id uint64) *Node {
UUID: uuid.NewString(), UUID: uuid.NewString(),
ServiceAddress: shard.Address, ServiceAddress: shard.Address,
State: logservicepb.NormalState, State: logservicepb.NormalState,
Shards: []logservicepb.DNShardInfo{
{
ShardID: id,
ReplicaID: id,
},
},
} }
loggerConfig := zap.Config{ loggerConfig := zap.Config{
......
// Copyright 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package testtxnengine
import (
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/vector"
logservicepb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
txnengine "github.com/matrixorigin/matrixone/pkg/vm/engine/txn"
)
var _ txnengine.ShardPolicy = new(testEnv)
func (t *testEnv) Batch(batch *batch.Batch, nodes []logservicepb.DNStore) (shards []*txnengine.ShardedBatch, err error) {
shards = append(shards, &txnengine.ShardedBatch{
Shard: t.nodes[0].shard,
Batch: batch,
})
return
}
func (t *testEnv) Vector(vec *vector.Vector, nodes []logservicepb.DNStore) (shards []*txnengine.ShardedVector, err error) {
shards = append(shards, &txnengine.ShardedVector{
Shard: t.nodes[0].shard,
Vector: vec,
})
return
}
func (t *testEnv) Stores(nodes []logservicepb.DNStore) (shards []txnengine.Shard, err error) {
for _, node := range nodes {
for _, n := range t.nodes {
if n.info.UUID == node.UUID {
shards = append(shards, n.shard)
break
}
}
}
return
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment