From f1ccbb8f9aea7c2ad0b696706f85445b63760718 Mon Sep 17 00:00:00 2001
From: sunby <bingyi.sun@zilliz.com>
Date: Wed, 2 Jun 2021 15:11:17 +0800
Subject: [PATCH] Fix init session in dataservice (#5522)

Signed-off-by: sunby <bingyi.sun@zilliz.com>
---
 internal/dataservice/grpc_services.go     | 26 ++++++++++++++++++++---
 internal/dataservice/server.go            |  7 +++---
 internal/dataservice/server_test.go       |  2 ++
 internal/util/sessionutil/session_util.go |  1 -
 4 files changed, 29 insertions(+), 7 deletions(-)

diff --git a/internal/dataservice/grpc_services.go b/internal/dataservice/grpc_services.go
index 440e1600e..9b1f2b34d 100644
--- a/internal/dataservice/grpc_services.go
+++ b/internal/dataservice/grpc_services.go
@@ -17,7 +17,7 @@ import (
 )
 
 func (s *Server) isClosed() bool {
-	return atomic.LoadInt64(&s.isServing) == 0
+	return atomic.LoadInt64(&s.isServing) != 2
 }
 
 func (s *Server) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
@@ -332,9 +332,29 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
 	return resp, nil
 }
 
-// todo remove these rpc
+// todo deprecated rpc
 func (s *Server) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
-	return nil, nil
+	resp := &internalpb.ComponentStates{
+		State: &internalpb.ComponentInfo{
+			NodeID:    Params.NodeID,
+			Role:      "dataservice",
+			StateCode: 0,
+		},
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_Success,
+			Reason:    "",
+		},
+	}
+	state := atomic.LoadInt64(&s.isServing)
+	switch state {
+	case 1:
+		resp.State.StateCode = internalpb.StateCode_Initializing
+	case 2:
+		resp.State.StateCode = internalpb.StateCode_Healthy
+	default:
+		resp.State.StateCode = internalpb.StateCode_Abnormal
+	}
+	return resp, nil
 }
 
 func (s *Server) RegisterNode(ctx context.Context, req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go
index cd0eb5908..5bea66e52 100644
--- a/internal/dataservice/server.go
+++ b/internal/dataservice/server.go
@@ -94,13 +94,14 @@ func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, erro
 
 // Register register data service at etcd
 func (s *Server) Register() error {
+	s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, []string{Params.EtcdAddress})
 	s.activeCh = s.session.Init(typeutil.DataServiceRole, Params.IP, true)
 	Params.NodeID = s.session.ServerID
 	return nil
 }
 
 func (s *Server) Init() error {
-	s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, []string{Params.EtcdAddress})
+	atomic.StoreInt64(&s.isServing, 1)
 	return nil
 }
 
@@ -146,7 +147,7 @@ func (s *Server) Start() error {
 
 	s.startServerLoop()
 
-	atomic.StoreInt64(&s.isServing, 1)
+	atomic.StoreInt64(&s.isServing, 2)
 	log.Debug("start success")
 	return nil
 }
@@ -478,7 +479,7 @@ func (s *Server) initMasterClient() error {
 }
 
 func (s *Server) Stop() error {
-	if !atomic.CompareAndSwapInt64(&s.isServing, 1, 0) {
+	if !atomic.CompareAndSwapInt64(&s.isServing, 2, 0) {
 		return nil
 	}
 	log.Debug("dataservice server shutdown")
diff --git a/internal/dataservice/server_test.go b/internal/dataservice/server_test.go
index 1cf5c74c2..8525a8374 100644
--- a/internal/dataservice/server_test.go
+++ b/internal/dataservice/server_test.go
@@ -908,6 +908,8 @@ func newTestServer(t *testing.T, receiveCh chan interface{}) *Server {
 		return newMockMasterService(), nil
 	}
 	assert.Nil(t, err)
+	err = svr.Register()
+	assert.Nil(t, err)
 	err = svr.Init()
 	assert.Nil(t, err)
 	err = svr.Start()
diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go
index d825af27b..ba4150354 100644
--- a/internal/util/sessionutil/session_util.go
+++ b/internal/util/sessionutil/session_util.go
@@ -282,7 +282,6 @@ func (s *Session) WatchServices(prefix string, revision int64) (eventChannel <-c
 					case mvccpb.PUT:
 						log.Debug("watch services",
 							zap.Any("add kv", ev.Kv))
-						session := &Session{}
 						err := json.Unmarshal([]byte(ev.Kv.Value), session)
 						if err != nil {
 							log.Error("watch services", zap.Error(err))
-- 
GitLab