Skip to content
Snippets Groups Projects
Unverified Commit b5c13430 authored by chenmingsong's avatar chenmingsong Committed by GitHub
Browse files

add serialization methods and cn-client codes for pipeline's remote-run (#4757)

1. add serialization methods for struct Compile.Scope for pipeline's remote run.
2. add cn-client and cn-service related methods.

Approved by: @daviszhen, @reusee, @zhangxu19830126, @nnsgmsone
parent 63a75c8a
No related branches found
Tags v4.0-rc6
No related merge requests found
Showing
with 4063 additions and 1190 deletions
......@@ -19,6 +19,8 @@ import (
"errors"
"flag"
"fmt"
"github.com/matrixorigin/matrixone/pkg/cnservice/cnclient"
"github.com/matrixorigin/matrixone/pkg/sql/compile"
"math/rand"
"os"
"os/signal"
......@@ -88,13 +90,17 @@ func startService(cfg *Config, stopper *stopper.Stopper) error {
func startCNService(cfg *Config, stopper *stopper.Stopper) error {
return stopper.RunNamedTask("cn-service", func(ctx context.Context) {
c := cfg.getCNServiceConfig()
s, err := cnservice.NewService(&c, ctx)
s, err := cnservice.NewService(&c, ctx, cnservice.WithMessageHandle(compile.CnServerMessageHandler))
if err != nil {
panic(err)
}
if err := s.Start(); err != nil {
panic(err)
}
err = cnclient.NewCNClient(&cnclient.ClientConfig{})
if err != nil {
panic(err)
}
<-ctx.Done()
if err := s.Close(); err != nil {
......
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cnclient
import (
"context"
"github.com/fagongzi/goetty/v2"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/pb/pipeline"
"sync"
)
var Client *CNClient
type CNClient struct {
config *ClientConfig
client morpc.RPCClient
// pool for send message
requestPool *sync.Pool
}
func (c *CNClient) Send(ctx context.Context, backend string, request morpc.Message) (*morpc.Future, error) {
return c.client.Send(ctx, backend, request)
}
func (c *CNClient) NewStream(backend string) (morpc.Stream, error) {
return c.client.NewStream(backend)
}
func (c *CNClient) Close() error {
return c.client.Close()
}
const (
dfMaxSenderNumber = 10
dfClientReadBufferSize = 1 << 10
dfClientWriteBufferSize = 1 << 10
dfPayLoadCopyBufferSize = 1 << 20
)
// ClientConfig a config to init a CNClient
type ClientConfig struct {
// MaxSenderNumber is the max number of backends per host for compute node service.
MaxSenderNumber int
// related buffer size.
PayLoadCopyBufferSize int
ReadBufferSize int
WriteBufferSize int
}
func NewCNClient(cfg *ClientConfig) error {
var err error
cfg.Fill()
Client = &CNClient{config: cfg}
Client.requestPool = &sync.Pool{New: func() any { return &pipeline.Message{} }}
codec := morpc.NewMessageCodec(Client.acquireMessage, cfg.PayLoadCopyBufferSize)
factory := morpc.NewGoettyBasedBackendFactory(codec,
morpc.WithBackendConnectWhenCreate(),
morpc.WithBackendGoettyOptions(goetty.WithSessionRWBUfferSize(
cfg.ReadBufferSize, cfg.WriteBufferSize)),
)
Client.client, err = morpc.NewClient(factory,
morpc.WithClientMaxBackendPerHost(cfg.MaxSenderNumber),
)
return err
}
func (c *CNClient) acquireMessage() morpc.Message {
return c.requestPool.Get().(*pipeline.Message)
}
// Fill set some default value for client config.
func (cfg *ClientConfig) Fill() {
if cfg.PayLoadCopyBufferSize < 0 {
cfg.PayLoadCopyBufferSize = dfPayLoadCopyBufferSize
}
if cfg.MaxSenderNumber <= 0 {
cfg.MaxSenderNumber = dfMaxSenderNumber
}
if cfg.ReadBufferSize < 0 {
cfg.ReadBufferSize = dfClientReadBufferSize
}
if cfg.WriteBufferSize < 0 {
cfg.WriteBufferSize = dfClientWriteBufferSize
}
}
......@@ -36,7 +36,9 @@ import (
"github.com/matrixorigin/matrixone/pkg/pb/pipeline"
)
func NewService(cfg *Config, ctx context.Context) (Service, error) {
type Options func(*service)
func NewService(cfg *Config, ctx context.Context, options ...Options) (Service, error) {
if err := cfg.Validate(); err != nil {
return nil, err
......@@ -44,26 +46,31 @@ func NewService(cfg *Config, ctx context.Context) (Service, error) {
srv := &service{cfg: cfg}
srv.logger = logutil.Adjust(srv.logger)
srv.pool = &sync.Pool{
srv.responsePool = &sync.Pool{
New: func() any {
return &pipeline.Message{}
},
}
pu := config.NewParameterUnit(&cfg.Frontend, nil, nil, nil, nil, nil)
cfg.Frontend.SetDefaultValues()
err := srv.initMOServer(ctx, pu)
if err != nil {
return nil, err
}
server, err := morpc.NewRPCServer("cn-server", cfg.ListenAddress,
morpc.NewMessageCodec(srv.acquireMessage, 16<<20),
morpc.WithServerGoettyOptions(goetty.WithSessionRWBUfferSize(1<<20, 1<<20)))
morpc.NewMessageCodec(srv.acquireMessage, cfg.PayLoadCopyBufferSize),
morpc.WithServerGoettyOptions(goetty.WithSessionRWBUfferSize(cfg.ReadBufferSize, cfg.WriteBufferSize)))
if err != nil {
return nil, err
}
server.RegisterRequestHandler(compile.NewServer().HandleRequest)
srv.server = server
pu := config.NewParameterUnit(&cfg.Frontend, nil, nil, nil, nil, nil)
cfg.Frontend.SetDefaultValues()
err = srv.initMOServer(ctx, pu)
if err != nil {
return nil, err
srv.requestHandler = defaultRequestHandler
for _, opt := range options {
opt(srv)
}
return srv, nil
......@@ -87,15 +94,16 @@ func (s *service) Close() error {
}
func (s *service) acquireMessage() morpc.Message {
return s.pool.Get().(*pipeline.Message)
return s.responsePool.Get().(*pipeline.Message)
}
/*
func (s *service) releaseMessage(msg *pipeline.Message) {
msg.Reset()
s.pool.Put(msg)
func defaultRequestHandler(ctx context.Context, message morpc.Message, cs morpc.ClientSession) error {
return nil
}
*/
//func (s *service) handleRequest(ctx context.Context, req morpc.Message, _ uint64, cs morpc.ClientSession) error {
// return s.requestHandler(ctx, req, cs)
//}
func (s *service) initMOServer(ctx context.Context, pu *config.ParameterUnit) error {
var err error
......@@ -234,3 +242,9 @@ func (s *service) getTxnClient() (c client.TxnClient, err error) {
c = s._txnClient
return
}
func WithMessageHandle(f func(ctx context.Context, message morpc.Message, cs morpc.ClientSession) error) Options {
return func(s *service) {
s.requestHandler = f
}
}
......@@ -61,6 +61,11 @@ type Config struct {
S3 fileservice.S3Config `toml:"s3"`
}
// parameters for cn-server related buffer.
PayLoadCopyBufferSize int
ReadBufferSize int
WriteBufferSize int
// Pipeline configuration
Pipeline struct {
// HostSize is the memory limit
......@@ -109,9 +114,10 @@ func (c *Config) Validate() error {
type service struct {
cfg *Config
pool *sync.Pool
responsePool *sync.Pool
logger *zap.Logger
server morpc.RPCServer
requestHandler func(ctx context.Context, message morpc.Message, cs morpc.ClientSession) error
cancelMoServerFunc context.CancelFunc
mo *frontend.MOServer
initHakeeperClientOnce sync.Once
......
......@@ -14,7 +14,9 @@
package pipeline
import fmt "fmt"
import "fmt"
const MessageEnd = 1
func (m *Message) Size() int {
return m.ProtoSize()
......
This diff is collapsed.
......@@ -22,6 +22,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function"
"github.com/matrixorigin/matrixone/pkg/testutil"
......@@ -51,7 +52,7 @@ var (
func init() {
tcs = []joinTestCase{
newTestCase(testutil.NewMheap(), []bool{false}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}},
newTestCase(testutil.NewMheap(), []bool{false}, []types.Type{{Oid: types.T_int8}}, []colexec.ResultPos{colexec.NewResultPos(0, 0)},
[][]*plan.Expr{
{
newExpr(0, types.Type{Oid: types.T_int8}),
......@@ -60,7 +61,7 @@ func init() {
newExpr(0, types.Type{Oid: types.T_int8}),
},
}),
newTestCase(testutil.NewMheap(), []bool{true}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}},
newTestCase(testutil.NewMheap(), []bool{true}, []types.Type{{Oid: types.T_int8}}, []colexec.ResultPos{colexec.NewResultPos(0, 0), colexec.NewResultPos(1, 0)},
[][]*plan.Expr{
{
newExpr(0, types.Type{Oid: types.T_int8}),
......@@ -104,7 +105,7 @@ func TestJoin(t *testing.T) {
func BenchmarkJoin(b *testing.B) {
for i := 0; i < b.N; i++ {
tcs = []joinTestCase{
newTestCase(testutil.NewMheap(), []bool{false}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}},
newTestCase(testutil.NewMheap(), []bool{false}, []types.Type{{Oid: types.T_int8}}, []colexec.ResultPos{colexec.NewResultPos(0, 0), colexec.NewResultPos(1, 0)},
[][]*plan.Expr{
{
newExpr(0, types.Type{Oid: types.T_int8}),
......@@ -113,7 +114,7 @@ func BenchmarkJoin(b *testing.B) {
newExpr(0, types.Type{Oid: types.T_int8}),
},
}),
newTestCase(testutil.NewMheap(), []bool{true}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}},
newTestCase(testutil.NewMheap(), []bool{true}, []types.Type{{Oid: types.T_int8}}, []colexec.ResultPos{colexec.NewResultPos(0, 0), colexec.NewResultPos(1, 0)},
[][]*plan.Expr{
{
newExpr(0, types.Type{Oid: types.T_int8}),
......@@ -161,7 +162,7 @@ func newExpr(pos int32, typ types.Type) *plan.Expr {
}
}
func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos, cs [][]*plan.Expr) joinTestCase {
func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []colexec.ResultPos, cs [][]*plan.Expr) joinTestCase {
proc := process.New(m)
proc.Reg.MergeReceivers = make([]*process.WaitRegister, 2)
ctx, cancel := context.WithCancel(context.Background())
......
......@@ -19,6 +19,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/plan"
)
......@@ -46,16 +47,11 @@ type container struct {
mp *hashmap.JoinMap
}
type ResultPos struct {
Rel int32
Pos int32
}
type Argument struct {
ctr *container
Ibucket uint64 // index in buckets
Nbucket uint64 // buckets count
Result []ResultPos
Result []colexec.ResultPos
Typs []types.Type
Cond *plan.Expr
Conditions [][]*plan.Expr
......
......@@ -22,6 +22,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function"
"github.com/matrixorigin/matrixone/pkg/testutil"
......@@ -51,7 +52,7 @@ var (
func init() {
tcs = []joinTestCase{
newTestCase(testutil.NewMheap(), []bool{false}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}},
newTestCase(testutil.NewMheap(), []bool{false}, []types.Type{{Oid: types.T_int8}}, []colexec.ResultPos{colexec.NewResultPos(0, 0)},
[][]*plan.Expr{
{
newExpr(0, types.Type{Oid: types.T_int8}),
......@@ -60,7 +61,7 @@ func init() {
newExpr(0, types.Type{Oid: types.T_int8}),
},
}),
newTestCase(testutil.NewMheap(), []bool{true}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}},
newTestCase(testutil.NewMheap(), []bool{true}, []types.Type{{Oid: types.T_int8}}, []colexec.ResultPos{colexec.NewResultPos(0, 0), colexec.NewResultPos(1, 0)},
[][]*plan.Expr{
{
newExpr(0, types.Type{Oid: types.T_int8}),
......@@ -123,7 +124,7 @@ func TestJoin(t *testing.T) {
func BenchmarkJoin(b *testing.B) {
for i := 0; i < b.N; i++ {
tcs = []joinTestCase{
newTestCase(testutil.NewMheap(), []bool{false}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}},
newTestCase(testutil.NewMheap(), []bool{false}, []types.Type{{Oid: types.T_int8}}, []colexec.ResultPos{colexec.NewResultPos(0, 0), colexec.NewResultPos(1, 0)},
[][]*plan.Expr{
{
newExpr(0, types.Type{Oid: types.T_int8}),
......@@ -132,7 +133,7 @@ func BenchmarkJoin(b *testing.B) {
newExpr(0, types.Type{Oid: types.T_int8}),
},
}),
newTestCase(testutil.NewMheap(), []bool{true}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}},
newTestCase(testutil.NewMheap(), []bool{true}, []types.Type{{Oid: types.T_int8}}, []colexec.ResultPos{colexec.NewResultPos(0, 0), colexec.NewResultPos(1, 0)},
[][]*plan.Expr{
{
newExpr(0, types.Type{Oid: types.T_int8}),
......@@ -180,7 +181,7 @@ func newExpr(pos int32, typ types.Type) *plan.Expr {
}
}
func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos, cs [][]*plan.Expr) joinTestCase {
func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []colexec.ResultPos, cs [][]*plan.Expr) joinTestCase {
proc := process.New(m)
proc.Reg.MergeReceivers = make([]*process.WaitRegister, 2)
ctx, cancel := context.WithCancel(context.Background())
......
......@@ -19,6 +19,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/plan"
)
......@@ -46,16 +47,11 @@ type container struct {
mp *hashmap.JoinMap
}
type ResultPos struct {
Rel int32
Pos int32
}
type Argument struct {
ctr *container
Ibucket uint64
Nbucket uint64
Result []ResultPos
Result []colexec.ResultPos
Typs []types.Type
Cond *plan.Expr
Conditions [][]*plan.Expr
......
......@@ -22,6 +22,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function"
"github.com/matrixorigin/matrixone/pkg/testutil"
......@@ -55,7 +56,7 @@ func init() {
hm := host.New(1 << 30)
gm := guest.New(1<<30, hm)
tcs = []joinTestCase{
newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}),
newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []colexec.ResultPos{colexec.NewResultPos(0, 0), colexec.NewResultPos(1, 0)}),
}
}
......@@ -100,8 +101,8 @@ func BenchmarkJoin(b *testing.B) {
hm := host.New(1 << 30)
gm := guest.New(1<<30, hm)
tcs = []joinTestCase{
newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}),
newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}),
newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []colexec.ResultPos{colexec.NewResultPos(0, 0), colexec.NewResultPos(1, 0)}),
newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_int8}}, []colexec.ResultPos{colexec.NewResultPos(0, 0), colexec.NewResultPos(1, 0)}),
}
t := new(testing.T)
for _, tc := range tcs {
......@@ -125,7 +126,7 @@ func BenchmarkJoin(b *testing.B) {
}
}
func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos) joinTestCase {
func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []colexec.ResultPos) joinTestCase {
proc := process.New(m)
proc.Reg.MergeReceivers = make([]*process.WaitRegister, 2)
ctx, cancel := context.WithCancel(context.Background())
......
......@@ -18,6 +18,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
)
const (
......@@ -31,14 +32,9 @@ type container struct {
bat *batch.Batch
}
type ResultPos struct {
Rel int32
Pos int32
}
type Argument struct {
ctr *container
Cond *plan.Expr
Result []ResultPos
Result []colexec.ResultPos
Typs []types.Type
}
......@@ -22,6 +22,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function"
"github.com/matrixorigin/matrixone/pkg/testutil"
......@@ -55,8 +56,8 @@ func init() {
hm := host.New(1 << 30)
gm := guest.New(1<<30, hm)
tcs = []joinTestCase{
newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}),
newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}),
newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []colexec.ResultPos{colexec.NewResultPos(0, 0), colexec.NewResultPos(1, 0)}),
newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_int8}}, []colexec.ResultPos{colexec.NewResultPos(0, 0), colexec.NewResultPos(1, 0)}),
}
}
......@@ -120,8 +121,8 @@ func BenchmarkJoin(b *testing.B) {
hm := host.New(1 << 30)
gm := guest.New(1<<30, hm)
tcs = []joinTestCase{
newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}),
newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}),
newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []colexec.ResultPos{colexec.NewResultPos(0, 0), colexec.NewResultPos(1, 0)}),
newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_int8}}, []colexec.ResultPos{colexec.NewResultPos(0, 0), colexec.NewResultPos(1, 0)}),
}
t := new(testing.T)
for _, tc := range tcs {
......@@ -145,7 +146,7 @@ func BenchmarkJoin(b *testing.B) {
}
}
func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos) joinTestCase {
func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []colexec.ResultPos) joinTestCase {
proc := process.New(m)
proc.Reg.MergeReceivers = make([]*process.WaitRegister, 2)
ctx, cancel := context.WithCancel(context.Background())
......
......@@ -18,6 +18,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
)
const (
......@@ -31,14 +32,9 @@ type container struct {
bat *batch.Batch
}
type ResultPos struct {
Rel int32
Pos int32
}
type Argument struct {
ctr *container
Typs []types.Type
Cond *plan.Expr
Result []ResultPos
Result []colexec.ResultPos
}
......@@ -22,6 +22,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function"
"github.com/matrixorigin/matrixone/pkg/testutil"
......@@ -55,8 +56,8 @@ func init() {
hm := host.New(1 << 30)
gm := guest.New(1<<30, hm)
tcs = []joinTestCase{
newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}),
newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}),
newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []colexec.ResultPos{colexec.NewResultPos(0, 0), colexec.NewResultPos(1, 0)}),
newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_int8}}, []colexec.ResultPos{colexec.NewResultPos(0, 0), colexec.NewResultPos(1, 0)}),
}
}
......@@ -120,8 +121,8 @@ func BenchmarkJoin(b *testing.B) {
hm := host.New(1 << 30)
gm := guest.New(1<<30, hm)
tcs = []joinTestCase{
newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}),
newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}),
newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []colexec.ResultPos{colexec.NewResultPos(0, 0), colexec.NewResultPos(1, 0)}),
newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_int8}}, []colexec.ResultPos{colexec.NewResultPos(0, 0), colexec.NewResultPos(1, 0)}),
}
t := new(testing.T)
for _, tc := range tcs {
......@@ -145,7 +146,7 @@ func BenchmarkJoin(b *testing.B) {
}
}
func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos) joinTestCase {
func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []colexec.ResultPos) joinTestCase {
proc := process.New(m)
proc.Reg.MergeReceivers = make([]*process.WaitRegister, 2)
ctx, cancel := context.WithCancel(context.Background())
......
......@@ -18,6 +18,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
)
const (
......@@ -31,14 +32,9 @@ type container struct {
bat *batch.Batch
}
type ResultPos struct {
Rel int32
Pos int32
}
type Argument struct {
ctr *container
Cond *plan.Expr
Result []ResultPos
Typs []types.Type
Result []colexec.ResultPos
}
......@@ -21,7 +21,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/order"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)
......@@ -128,7 +127,7 @@ func (ctr *container) build(ap *Argument, proc *process.Process, anal process.An
ctr.cmps = make([]compare.Compare, len(bat.Vecs))
for i := range ctr.cmps {
if pos, ok := mp[i]; ok {
ctr.cmps[i] = compare.New(bat.Vecs[i].Typ, ap.Fs[pos].Type == order.Descending)
ctr.cmps[i] = compare.New(bat.Vecs[i].Typ, ap.Fs[pos].Type == colexec.Descending)
} else {
ctr.cmps[i] = compare.New(bat.Vecs[i].Typ, true)
}
......
......@@ -22,7 +22,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/order"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/matrixorigin/matrixone/pkg/vm/mheap"
"github.com/matrixorigin/matrixone/pkg/vm/mmu/guest"
......@@ -53,11 +53,11 @@ func init() {
hm := host.New(1 << 30)
gm := guest.New(1<<30, hm)
tcs = []orderTestCase{
newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []order.Field{{E: newExpression(0), Type: 0}}),
newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_int8}}, []order.Field{{E: newExpression(0), Type: 2}}),
newTestCase(mheap.New(gm), []bool{false, false}, []types.Type{{Oid: types.T_int8}, {Oid: types.T_int64}}, []order.Field{{E: newExpression(1), Type: 0}}),
newTestCase(mheap.New(gm), []bool{true, false}, []types.Type{{Oid: types.T_int8}, {Oid: types.T_int64}}, []order.Field{{E: newExpression(0), Type: 2}}),
newTestCase(mheap.New(gm), []bool{true, false}, []types.Type{{Oid: types.T_int8}, {Oid: types.T_int64}}, []order.Field{{E: newExpression(0), Type: 2}, {E: newExpression(1), Type: 0}}),
newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []colexec.Field{{E: newExpression(0), Type: 0}}),
newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_int8}}, []colexec.Field{{E: newExpression(0), Type: 2}}),
newTestCase(mheap.New(gm), []bool{false, false}, []types.Type{{Oid: types.T_int8}, {Oid: types.T_int64}}, []colexec.Field{{E: newExpression(1), Type: 0}}),
newTestCase(mheap.New(gm), []bool{true, false}, []types.Type{{Oid: types.T_int8}, {Oid: types.T_int64}}, []colexec.Field{{E: newExpression(0), Type: 2}}),
newTestCase(mheap.New(gm), []bool{true, false}, []types.Type{{Oid: types.T_int8}, {Oid: types.T_int64}}, []colexec.Field{{E: newExpression(0), Type: 2}, {E: newExpression(1), Type: 0}}),
}
}
......@@ -110,8 +110,8 @@ func BenchmarkOrder(b *testing.B) {
hm := host.New(1 << 30)
gm := guest.New(1<<30, hm)
tcs = []orderTestCase{
newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []order.Field{{E: newExpression(0), Type: 0}}),
newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_int8}}, []order.Field{{E: newExpression(0), Type: 2}}),
newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []colexec.Field{{E: newExpression(0), Type: 0}}),
newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_int8}}, []colexec.Field{{E: newExpression(0), Type: 2}}),
}
t := new(testing.T)
for _, tc := range tcs {
......@@ -143,7 +143,7 @@ func BenchmarkOrder(b *testing.B) {
}
}
func newTestCase(m *mheap.Mheap, ds []bool, ts []types.Type, fs []order.Field) orderTestCase {
func newTestCase(m *mheap.Mheap, ds []bool, ts []types.Type, fs []colexec.Field) orderTestCase {
proc := process.New(m)
proc.Reg.MergeReceivers = make([]*process.WaitRegister, 2)
ctx, cancel := context.WithCancel(context.Background())
......
......@@ -17,7 +17,7 @@ package mergeorder
import (
"github.com/matrixorigin/matrixone/pkg/compare"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/order"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
)
const (
......@@ -36,6 +36,6 @@ type container struct {
}
type Argument struct {
Fs []order.Field // Fields store the order information
ctr *container // ctr stores the attributes needn't do Serialization work
ctr *container // ctr stores the attributes needn't do Serialization work
Fs []colexec.Field // Fields store the order information
}
......@@ -23,7 +23,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/top"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)
......@@ -130,7 +129,7 @@ func (ctr *container) build(ap *Argument, proc *process.Process, anal process.An
ctr.cmps = make([]compare.Compare, len(bat.Vecs))
for i := range ctr.cmps {
if pos, ok := mp[i]; ok {
ctr.cmps[i] = compare.New(bat.Vecs[i].Typ, ap.Fs[pos].Type == top.Descending)
ctr.cmps[i] = compare.New(bat.Vecs[i].Typ, ap.Fs[pos].Type == colexec.Descending)
} else {
ctr.cmps[i] = compare.New(bat.Vecs[i].Typ, true)
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment