From 7342e0758ccbae7222b5970d07db1e1fde28161f Mon Sep 17 00:00:00 2001
From: bigsheeper <yihao.dai@zilliz.com>
Date: Sun, 7 Feb 2021 17:02:13 +0800
Subject: [PATCH] Fix index and flush errors, and fix master crash error

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
---
 cmd/distributed/components/master_service.go       | 3 +++
 internal/datanode/flow_graph_insert_buffer_node.go | 5 +++++
 internal/indexservice/indexservice.go              | 9 +++++++--
 internal/indexservice/node_mgr.go                  | 4 ++++
 internal/masterservice/task.go                     | 2 ++
 internal/querynode/index_loader.go                 | 1 -
 internal/querynode/load_service.go                 | 3 ++-
 7 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/cmd/distributed/components/master_service.go b/cmd/distributed/components/master_service.go
index b059a8463..9d90c7e13 100644
--- a/cmd/distributed/components/master_service.go
+++ b/cmd/distributed/components/master_service.go
@@ -103,6 +103,9 @@ func NewMasterService(ctx context.Context) (*MasterService, error) {
 	is.Params.Init()
 	log.Printf("index service address : %s", is.Params.Address)
 	indexService := isc.NewClient(is.Params.Address)
+	if err = indexService.Init(); err != nil {
+		return nil, err
+	}
 
 	if err = svr.SetIndexService(indexService); err != nil {
 		return nil, err
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index a9ff30b1e..eac2177df 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -456,6 +456,11 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 					continue
 				}
 			}
+			err := ibNode.completeFlush(currentSegID)
+			if err != nil {
+				log.Println(err)
+			}
+			log.Println("Flush completed")
 		}
 	}
 
diff --git a/internal/indexservice/indexservice.go b/internal/indexservice/indexservice.go
index 27730955f..085908bf3 100644
--- a/internal/indexservice/indexservice.go
+++ b/internal/indexservice/indexservice.go
@@ -2,6 +2,7 @@ package indexservice
 
 import (
 	"context"
+	"fmt"
 	"log"
 	"sync"
 	"time"
@@ -177,6 +178,7 @@ func (i *ServiceImpl) GetStatisticsChannel() (string, error) {
 }
 
 func (i *ServiceImpl) BuildIndex(req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
+	fmt.Println("builder building index ..., indexName = ", req.IndexName, "indexID = ", req.IndexID, "dataPath = ", req.DataPaths)
 	ret := &indexpb.BuildIndexResponse{
 		Status: &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
@@ -244,10 +246,13 @@ func (i *ServiceImpl) GetIndexStates(req *indexpb.IndexStatesRequest) (*indexpb.
 }
 
 func (i *ServiceImpl) GetIndexFilePaths(req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) {
-	var indexPaths []*indexpb.IndexFilePathInfo
+	var indexPaths []*indexpb.IndexFilePathInfo = nil
 
 	for _, indexID := range req.IndexBuildIDs {
-		indexPathInfo, _ := i.metaTable.GetIndexFilePathInfo(indexID)
+		indexPathInfo, err := i.metaTable.GetIndexFilePathInfo(indexID)
+		if err != nil {
+			return nil, err
+		}
 		indexPaths = append(indexPaths, indexPathInfo)
 	}
 
diff --git a/internal/indexservice/node_mgr.go b/internal/indexservice/node_mgr.go
index c29b2437c..76f8d7077 100644
--- a/internal/indexservice/node_mgr.go
+++ b/internal/indexservice/node_mgr.go
@@ -30,6 +30,10 @@ func (i *ServiceImpl) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest)
 	if err != nil {
 		return err
 	}
+	err = nodeClient.Init()
+	if err != nil {
+		return err
+	}
 	item := &PQItem{
 		value:    nodeClient,
 		key:      nodeID,
diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go
index f144fefe0..e7200ca7d 100644
--- a/internal/masterservice/task.go
+++ b/internal/masterservice/task.go
@@ -1,6 +1,7 @@
 package masterservice
 
 import (
+	"fmt"
 	"log"
 
 	"github.com/golang/protobuf/proto"
@@ -597,6 +598,7 @@ func (t *CreateIndexReqTask) Execute() error {
 			indexParams: t.Req.ExtraParams,
 		}
 		t.core.indexTaskQueue <- &task
+		fmt.Println("create index task enqueue, segID = ", seg)
 	}
 	return nil
 }
diff --git a/internal/querynode/index_loader.go b/internal/querynode/index_loader.go
index 08d867ec7..37e24abd9 100644
--- a/internal/querynode/index_loader.go
+++ b/internal/querynode/index_loader.go
@@ -323,7 +323,6 @@ func (loader *indexLoader) getIndexPaths(indexBuildID UniqueID) ([]string, error
 	}
 
 	indexFilePathRequest := &indexpb.IndexFilePathsRequest{
-		// TODO: rename indexIDs to buildIDs
 		IndexBuildIDs: []UniqueID{indexBuildID},
 	}
 	pathResponse, err := loader.indexClient.GetIndexFilePaths(indexFilePathRequest)
diff --git a/internal/querynode/load_service.go b/internal/querynode/load_service.go
index 654001137..b06f2c503 100644
--- a/internal/querynode/load_service.go
+++ b/internal/querynode/load_service.go
@@ -10,7 +10,7 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 )
 
-const indexCheckInterval = 1
+const indexCheckInterval = 3
 
 type loadService struct {
 	ctx    context.Context
@@ -115,6 +115,7 @@ func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID Uni
 		return err
 	}
 	if errIndex == nil {
+		fmt.Println("loading index...")
 		indexPaths, err := s.segLoader.indexLoader.getIndexPaths(buildID)
 		if err != nil {
 			return err
-- 
GitLab