Skip to content
Snippets Groups Projects
Unverified Commit c987dd7a authored by nnsgmsone's avatar nnsgmsone Committed by GitHub
Browse files

Move pipeline server from cn server to pipeline (#4633)

Approved by: @reusee
parent d0b7143e
No related branches found
No related tags found
No related merge requests found
......@@ -22,6 +22,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/frontend"
"github.com/matrixorigin/matrixone/pkg/logservice"
"github.com/matrixorigin/matrixone/pkg/sql/compile"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/txn/rpc"
ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor"
......@@ -52,7 +53,7 @@ func NewService(cfg *Config, ctx context.Context) (Service, error) {
if err != nil {
return nil, err
}
server.RegisterRequestHandler(srv.handleRequest)
server.RegisterRequestHandler(compile.NewServer().HandleRequest)
srv.server = server
pu := config.NewParameterUnit(&cfg.Frontend, nil, nil, nil, nil, nil)
......@@ -93,10 +94,6 @@ func (s *service) releaseMessage(msg *pipeline.Message) {
}
*/
func (s *service) handleRequest(ctx context.Context, req morpc.Message, _ uint64, cs morpc.ClientSession) error {
return nil
}
func (s *service) initMOServer(ctx context.Context, pu *config.ParameterUnit) error {
var err error
logutil.Infof("Shutdown The Server With Ctrl+C | Ctrl+\\.")
......
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package compile
import (
"context"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)
var srv *Server
func NewServer() *Server {
if srv != nil {
return srv
}
srv := &Server{}
return srv
}
func (srv *Server) RegistConnector(reg *process.WaitRegister) {
srv.Lock()
defer srv.Unlock()
srv.mp[srv.curr] = reg
srv.curr++
}
func (srv *Server) HandleRequest(ctx context.Context, req morpc.Message, _ uint64, cs morpc.ClientSession) error {
return nil
}
......@@ -16,6 +16,7 @@ package compile
import (
"context"
"sync"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
......@@ -110,6 +111,12 @@ type anaylze struct {
analInfos []*process.AnalyzeInfo
}
type Server struct {
sync.Mutex
curr uint64
mp map[uint64]*process.WaitRegister
}
// Compile contains all the information needed for compilation.
type Compile struct {
scope *Scope
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment