Skip to content
Snippets Groups Projects
Commit ae0e8b1a authored by BossZou's avatar BossZou Committed by yefu.chen
Browse files

Init data service param before new DataServive


Signed-off-by: default avatarBossZou <yinghao.zou@zilliz.com>
parent 5c8747cf
No related branches found
No related tags found
No related merge requests found
......@@ -2,13 +2,15 @@ package roles
import (
"context"
"log"
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"github.com/zilliztech/milvus-distributed/cmd/distributed/components"
ds "github.com/zilliztech/milvus-distributed/internal/dataservice"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/msgstream/rmqms"
......@@ -55,7 +57,6 @@ func (mr *MilvusRoles) EnvValue(env string) bool {
func (mr *MilvusRoles) Run(localMsg bool) {
if !mr.HasAnyRole() {
log.Printf("set the roles please ...")
return
}
......@@ -64,7 +65,6 @@ func (mr *MilvusRoles) Run(localMsg bool) {
var masterService *components.MasterService
if mr.EnableMaster {
log.Print("start as master service")
go func() {
factory := newMsgFactory(localMsg)
var err error
......@@ -78,7 +78,6 @@ func (mr *MilvusRoles) Run(localMsg bool) {
var proxyService *components.ProxyService
if mr.EnableProxyService {
log.Print("start as proxy service")
go func() {
factory := newMsgFactory(localMsg)
var err error
......@@ -92,7 +91,6 @@ func (mr *MilvusRoles) Run(localMsg bool) {
var proxyNode *components.ProxyNode
if mr.EnableProxyNode {
log.Print("start as proxy node")
go func() {
factory := newMsgFactory(localMsg)
var err error
......@@ -106,7 +104,6 @@ func (mr *MilvusRoles) Run(localMsg bool) {
var queryService *components.QueryService
if mr.EnableQueryService {
log.Print("start as query service")
go func() {
factory := newMsgFactory(localMsg)
var err error
......@@ -120,7 +117,6 @@ func (mr *MilvusRoles) Run(localMsg bool) {
var queryNode *components.QueryNode
if mr.EnableQueryNode {
log.Print("start as query node")
go func() {
factory := newMsgFactory(localMsg)
var err error
......@@ -134,10 +130,12 @@ func (mr *MilvusRoles) Run(localMsg bool) {
var dataService *components.DataService
if mr.EnableDataService {
log.Print("start as data service")
go func() {
factory := newMsgFactory(localMsg)
var err error
// Init data service params
ds.Params.Init()
log.SetupLogger(&ds.Params.Log)
dataService, err = components.NewDataService(ctx, factory)
if err != nil {
panic(err)
......@@ -148,7 +146,6 @@ func (mr *MilvusRoles) Run(localMsg bool) {
var dataNode *components.DataNode
if mr.EnableDataNode {
log.Print("start as data node")
go func() {
factory := newMsgFactory(localMsg)
var err error
......@@ -162,7 +159,6 @@ func (mr *MilvusRoles) Run(localMsg bool) {
var indexService *components.IndexService
if mr.EnableIndexService {
log.Print("start as index service")
go func() {
var err error
indexService, err = components.NewIndexService(ctx)
......@@ -175,7 +171,6 @@ func (mr *MilvusRoles) Run(localMsg bool) {
var indexNode *components.IndexNode
if mr.EnableIndexNode {
log.Print("start as index node")
go func() {
var err error
indexNode, err = components.NewIndexNode(ctx)
......@@ -188,7 +183,6 @@ func (mr *MilvusRoles) Run(localMsg bool) {
var msgStream *components.MsgStream
if mr.EnableMsgStreamService {
log.Print("start as msg stream service")
go func() {
var err error
msgStream, err = components.NewMsgStreamService(ctx)
......@@ -206,76 +200,66 @@ func (mr *MilvusRoles) Run(localMsg bool) {
syscall.SIGTERM,
syscall.SIGQUIT)
sig := <-sc
log.Printf("Get %s signal to exit", sig.String())
fmt.Printf("Get %s signal to exit", sig.String())
if mr.EnableMaster {
if masterService != nil {
_ = masterService.Stop()
}
log.Printf("exit master service")
}
if mr.EnableProxyService {
if proxyService != nil {
_ = proxyService.Stop()
}
log.Printf("exit proxy service")
}
if mr.EnableProxyNode {
if proxyNode != nil {
_ = proxyNode.Stop()
}
log.Printf("exit proxy node")
}
if mr.EnableQueryService {
if queryService != nil {
_ = queryService.Stop()
}
log.Printf("exit query service")
}
if mr.EnableQueryNode {
if queryNode != nil {
_ = queryNode.Stop()
}
log.Printf("exit query node")
}
if mr.EnableDataService {
if dataService != nil {
_ = dataService.Stop()
}
log.Printf("exit data service")
}
if mr.EnableDataNode {
if dataNode != nil {
_ = dataNode.Stop()
}
log.Printf("exit data node")
}
if mr.EnableIndexService {
if indexService != nil {
_ = indexService.Stop()
}
log.Printf("exit index service")
}
if mr.EnableIndexNode {
if indexNode != nil {
_ = indexNode.Stop()
}
log.Printf("exit index node")
}
if mr.EnableMsgStreamService {
if msgStream != nil {
_ = msgStream.Stop()
}
log.Printf("exit msg stream service")
}
defer rocksmq.CloseRocksMQ()
......
......@@ -93,7 +93,7 @@ func (node *NodeImpl) waitForServiceReady(service Component, serviceName string)
return nil
}
// wait for 10 seconds
err := retry.Retry(10, time.Millisecond*200, checkFunc)
err := retry.Retry(10, time.Second, checkFunc)
if err != nil {
errMsg := fmt.Sprintf("ProxyNode wait for %s ready failed", serviceName)
return errors.New(errMsg)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment