diff --git a/cmd/distributed/components/master_service.go b/cmd/distributed/components/master_service.go index 1530be066be9311cb12b57efcaee4e8b76591238..b13d154190ad7ce63059c499a9217477d2d35922 100644 --- a/cmd/distributed/components/master_service.go +++ b/cmd/distributed/components/master_service.go @@ -125,8 +125,19 @@ func (m *MasterService) Run() error { } func (m *MasterService) Stop() error { - _ = m.proxyService.Stop() - _ = m.indexService.Stop() - _ = m.dataService.Stop() - return m.svr.Stop() + if m != nil { + if m.proxyService != nil { + _ = m.proxyService.Stop() + } + if m.indexService != nil { + _ = m.indexService.Stop() + } + if m.dataService != nil { + _ = m.dataService.Stop() + } + if m.svr != nil { + return m.svr.Stop() + } + } + return nil } diff --git a/cmd/distributed/components/msg_stream_service.go b/cmd/distributed/components/msg_stream_service.go new file mode 100644 index 0000000000000000000000000000000000000000..847d41183e35a857c1f269bfc2bdbbb04692b596 --- /dev/null +++ b/cmd/distributed/components/msg_stream_service.go @@ -0,0 +1,20 @@ +package components + +import ( + "context" +) + +func NewMsgStreamService(ctx context.Context) (*MsgStream, error) { + return nil, nil +} + +type MsgStream struct { +} + +func (ps *MsgStream) Run() error { + return nil +} + +func (ps *MsgStream) Stop() error { + return nil +} diff --git a/cmd/distributed/components/query_node.go b/cmd/distributed/components/query_node.go new file mode 100644 index 0000000000000000000000000000000000000000..ffbb154fdb158dc190746d9307b01ee316b8450a --- /dev/null +++ b/cmd/distributed/components/query_node.go @@ -0,0 +1,20 @@ +package components + +import ( + "context" +) + +func NewQueryNode(ctx context.Context) (*QueryNode, error) { + return nil, nil +} + +type QueryNode struct { +} + +func (ps *QueryNode) Run() error { + return nil +} + +func (ps *QueryNode) Stop() error { + return nil +} diff --git a/cmd/distributed/components/query_service.go b/cmd/distributed/components/query_service.go new file mode 100644 index 0000000000000000000000000000000000000000..fbcd28beded37e8506a3dca2a3c759c23f9ddf88 --- /dev/null +++ b/cmd/distributed/components/query_service.go @@ -0,0 +1,20 @@ +package components + +import ( + "context" +) + +func NewQueryService(ctx context.Context) (*QueryService, error) { + return nil, nil +} + +type QueryService struct { +} + +func (ps *QueryService) Run() error { + return nil +} + +func (ps *QueryService) Stop() error { + return nil +} diff --git a/cmd/distributed/main.go b/cmd/distributed/main.go index 2d98593b7b68e5c5e3bd3ae0715c2c43f69e556f..591940ab9f5f8365ec09cbf3ba9a65b560ae1d88 100644 --- a/cmd/distributed/main.go +++ b/cmd/distributed/main.go @@ -2,19 +2,14 @@ package main import ( "context" - "fmt" + "flag" + "log" "os" "os/signal" - "path/filepath" + "strings" "syscall" - "github.com/go-kit/kit/log/level" - "github.com/ilyakaznacheev/cleanenv" - "github.com/oklog/run" - "github.com/pkg/errors" - "github.com/prometheus/common/promlog" - grpcproxyservice "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice" - "gopkg.in/alecthomas/kingpin.v2" + "github.com/zilliztech/milvus-distributed/cmd/distributed/components" ) type MilvusRoles struct { @@ -38,137 +33,275 @@ func (mr *MilvusRoles) hasAnyRole() bool { mr.EnableIndexService || mr.EnableIndexNode } -var roles MilvusRoles +func (mr *MilvusRoles) envValue(env string) bool { + env = strings.ToLower(env) + env = strings.Trim(env, " ") + if env == "1" || env == "true" { + return true + } + return false +} func main() { - a := kingpin.New(filepath.Base(os.Args[0]), "Milvus") + var roles MilvusRoles - a.HelpFlag.Short('h') + flag.BoolVar(&roles.EnableMaster, "master-service", false, "start as master service") + flag.BoolVar(&roles.EnableProxyService, "proxy-service", false, "start as proxy service") + flag.BoolVar(&roles.EnableProxyNode, "proxy-node", false, "start as proxy node") + flag.BoolVar(&roles.EnableQueryService, "query-service", false, "start as query service") + flag.BoolVar(&roles.EnableQueryNode, "query-node", false, "start as query node") + flag.BoolVar(&roles.EnableDataService, "data-service", false, "start as data service") + flag.BoolVar(&roles.EnableDataNode, "data-node", false, "start as data node") + flag.BoolVar(&roles.EnableIndexService, "index-service", false, "start as index service") + flag.BoolVar(&roles.EnableIndexNode, "index-node", false, "start as index node") + flag.BoolVar(&roles.EnableMsgStreamService, "msg-stream", false, "start as msg stream service") + flag.Parse() - a.Flag("master", "Run master service").Short('m').Default("false").BoolVar(&roles.EnableMaster) + if !roles.hasAnyRole() { + for _, e := range os.Environ() { + pairs := strings.SplitN(e, "=", 2) + if len(pairs) == 2 { + switch pairs[0] { + case "ENABLE_MASTER": + roles.EnableMaster = roles.envValue(pairs[1]) + case "ENABLE_PROXY_SERVICE": + roles.EnableProxyService = roles.envValue(pairs[1]) + case "ENABLE_PROXY_NODE": + roles.EnableProxyNode = roles.envValue(pairs[1]) + case "ENABLE_QUERY_SERVICE": + roles.EnableQueryService = roles.envValue(pairs[1]) + case "ENABLE_QUERY_NODE": + roles.EnableQueryNode = roles.envValue(pairs[1]) + case "ENABLE_DATA_SERVICE": + roles.EnableDataService = roles.envValue(pairs[1]) + case "ENABLE_DATA_NODE": + roles.EnableDataNode = roles.envValue(pairs[1]) + case "ENABLE_INDEX_SERVICE": + roles.EnableIndexService = roles.envValue(pairs[1]) + case "ENABLE_INDEX_NODE": + roles.EnableIndexNode = roles.envValue(pairs[1]) + case "ENABLE_MSGSTREAM_SERVICE": + roles.EnableMsgStreamService = roles.envValue(pairs[1]) + } + } + } + } - a.Flag("msgstream-service", "Run msgstream service").Short('M').Default("false").BoolVar(&roles.EnableMsgStreamService) + if !roles.hasAnyRole() { + log.Printf("set the roles please ...") + return + } - a.Flag("proxy-service", "Run proxy service").Short('p').Default("false").BoolVar(&roles.EnableProxyService) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - a.Flag("proxy-node", "Run proxy node").Short('P').Default("false").BoolVar(&roles.EnableProxyNode) + var masterService *components.MasterService + if roles.EnableMaster { + log.Print("start as master service") + go func() { + var err error + masterService, err = components.NewMasterService(ctx) + if err != nil { + panic(err) + } + _ = masterService.Run() + }() + } - a.Flag("query-service", "Run query service").Short('q').Default("false").BoolVar(&roles.EnableQueryService) + var proxyService *components.ProxyService + if roles.EnableProxyService { + log.Print("start as proxy service") + go func() { + var err error + proxyService, err = components.NewProxyService(ctx) + if err != nil { + panic(err) + } + _ = proxyService.Run() + }() + } - a.Flag("query-node", "Run query node").Short('Q').Default("false").BoolVar(&roles.EnableQueryNode) + var proxyNode *components.ProxyNode + if roles.EnableProxyNode { + log.Print("start as proxy node") + go func() { + var err error + proxyNode, err = components.NewProxyNode(ctx) + if err != nil { + panic(err) + } + _ = proxyNode.Run() + }() + } - a.Flag("data-service", "Run data service").Short('d').Default("false").BoolVar(&roles.EnableDataService) + var queryService *components.QueryService + if roles.EnableQueryService { + log.Print("start as query service") + go func() { + var err error + queryService, err = components.NewQueryService(ctx) + if err != nil { + panic(err) + } + _ = queryService.Run() + }() + } - a.Flag("data-node", "Run data node").Short('D').Default("false").BoolVar(&roles.EnableDataNode) + var queryNode *components.QueryNode + if roles.EnableQueryNode { + log.Print("start as query node") + go func() { + var err error + queryNode, err = components.NewQueryNode(ctx) + if err != nil { + panic(err) + } + _ = queryNode.Run() + }() + } - a.Flag("index-service", "Run index service").Short('i').Default("false").BoolVar(&roles.EnableIndexService) + var dataService *components.DataService + if roles.EnableDataService { + log.Print("start as data service") + go func() { + var err error + dataService, err = components.NewDataService(ctx) + if err != nil { + panic(err) + } + _ = dataService.Run() + }() + } - a.Flag("index-node", "Run index node").Short('I').Default("false").BoolVar(&roles.EnableIndexNode) + var dataNode *components.DataNode + if roles.EnableDataNode { + log.Print("start as data node") + go func() { + var err error + dataNode, err = components.NewDataNode(ctx) + if err != nil { + panic(err) + } + _ = dataNode.Run() + }() + } - _, err := a.Parse(os.Args[1:]) - if err != nil { - fmt.Fprintln(os.Stderr, errors.Wrapf(err, "Error parsing commandline arguments")) - a.Usage(os.Args[1:]) - os.Exit(2) + var indexService *components.IndexService + if roles.EnableIndexService { + log.Print("start as index service") + go func() { + var err error + indexService, err = components.NewIndexService(ctx) + if err != nil { + panic(err) + } + _ = indexService.Run() + }() } - if !roles.hasAnyRole() { - err := cleanenv.ReadEnv(&roles) - if err != nil { - fmt.Println(err) - os.Exit(-1) - } + var indexNode *components.IndexNode + if roles.EnableIndexNode { + log.Print("start as index node") + go func() { + var err error + indexNode, err = components.NewIndexNode(ctx) + if err != nil { + panic(err) + } + _ = indexNode.Run() + }() } - if !roles.hasAnyRole() { - fmt.Println("Please select at least one service to start") - os.Exit(-1) - } - - logger := promlog.New(NewLogConfig()) - - var ( - ctxProxyService, cancelProxyService = context.WithCancel(context.Background()) - proxyService = NewProxyService(ctxProxyService) - ) - - var g run.Group - { - // Termination handler. - term := make(chan os.Signal, 1) - signal.Notify(term, os.Interrupt, syscall.SIGTERM) - cancel := make(chan struct{}) - g.Add( - func() error { - select { - case <-term: - level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...") - case <-cancel: - } - return nil - }, - func(err error) { - close(cancel) - }, - ) + var msgStream *components.MsgStream + if roles.EnableMsgStreamService { + log.Print("start as msg stream service") + go func() { + var err error + msgStream, err = components.NewMsgStreamService(ctx) + if err != nil { + panic(err) + } + _ = msgStream.Run() + }() + } + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + sig := <-sc + log.Printf("Get %s signal to exit", sig.String()) + + if roles.EnableMaster { + if masterService != nil { + _ = masterService.Stop() + } + log.Printf("exit master service") } + if roles.EnableProxyService { - // ProxyService - g.Add( - func() error { - err := proxyService.Run() - level.Info(logger).Log("msg", "Proxy service stopped") - return err - }, - func(err error) { - level.Info(logger).Log("msg", "Stopping proxy service...") - cancelProxyService() - }, - ) + if proxyService != nil { + _ = proxyService.Stop() + } + log.Printf("exit proxy service") } + if roles.EnableProxyNode { - // ProxyNode + if proxyNode != nil { + _ = proxyNode.Stop() + } + log.Printf("exit proxy node") } - if err := g.Run(); err != nil { - level.Error(logger).Log("err", err) - os.Exit(1) + if roles.EnableQueryService { + if queryService != nil { + _ = queryService.Stop() + } + log.Printf("exit query service") } - level.Info(logger).Log("msg", "See you next time!") -} -func NewLogConfig() *promlog.Config { - logConfig := promlog.Config{ - Level: &promlog.AllowedLevel{}, - Format: &promlog.AllowedFormat{}, + if roles.EnableQueryNode { + if queryNode != nil { + _ = queryNode.Stop() + } + log.Printf("exit query node") } - err := logConfig.Level.Set("debug") - if err != nil { - fmt.Println(err) - os.Exit(-1) + + if roles.EnableDataService { + if dataService != nil { + _ = dataService.Stop() + } + log.Printf("exit data service") } - err = logConfig.Format.Set("logfmt") - if err != nil { - fmt.Println(err) - os.Exit(-1) + if roles.EnableDataNode { + if dataNode != nil { + _ = dataNode.Stop() + } + log.Printf("exit data node") } - return &logConfig -} + if roles.EnableIndexService { + if indexService != nil { + _ = indexService.Stop() + } + log.Printf("exit index service") + } -// Move to proxyservice package -func NewProxyService(ctx context.Context) *ProxyService { - srv, _ := grpcproxyservice.NewServer(ctx) - ps := &ProxyService{ctx: ctx, server: srv} - return ps -} + if roles.EnableIndexNode { + if indexNode != nil { + _ = indexNode.Stop() + } + log.Printf("exit index node") + } -type ProxyService struct { - ctx context.Context - server *grpcproxyservice.Server -} + if roles.EnableMsgStreamService { + if msgStream != nil { + _ = msgStream.Stop() + } + log.Printf("exit msg stream service") + } -func (ps *ProxyService) Run() error { - return ps.server.Run() } diff --git a/cmd/distributed/main_test.go b/cmd/distributed/main_test.go new file mode 100644 index 0000000000000000000000000000000000000000..928e7d11887e24c5eb24c108b0b0b2f40b6b526b --- /dev/null +++ b/cmd/distributed/main_test.go @@ -0,0 +1,28 @@ +package main + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRoles(t *testing.T) { + r := MilvusRoles{} + + assert.True(t, r.envValue("1")) + assert.True(t, r.envValue(" 1 ")) + assert.True(t, r.envValue("True")) + assert.True(t, r.envValue(" True ")) + assert.True(t, r.envValue(" TRue ")) + assert.False(t, r.envValue("0")) + assert.False(t, r.envValue(" 0 ")) + assert.False(t, r.envValue(" false ")) + assert.False(t, r.envValue(" False ")) + assert.False(t, r.envValue(" abc ")) + + ss := strings.SplitN("abcdef", "=", 2) + assert.Equal(t, len(ss), 1) + ss = strings.SplitN("adb=def", "=", 2) + assert.Equal(t, len(ss), 2) +}