diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index beeb2884990b8b552dbd42393c343bc840e81a16..3dfab913e18051074c305cc49e247fba0c455375 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -48,6 +48,7 @@ type IndexNode struct { } func NewIndexNode(ctx context.Context) (*IndexNode, error) { + log.Debug("new index node ...") rand.Seed(time.Now().UnixNano()) ctx1, cancel := context.WithCancel(ctx) b := &IndexNode{ @@ -142,6 +143,14 @@ func (i *IndexNode) SetIndexServiceClient(serviceClient types.IndexService) { } func (i *IndexNode) BuildIndex(ctx context.Context, request *indexpb.BuildIndexRequest) (*commonpb.Status, error) { + log.Debug("indexnode building index ...", + zap.Int64("IndexBuildID", request.IndexBuildID), + zap.String("Indexname", request.IndexName), + zap.Int64("IndexID", request.IndexID), + zap.Strings("DataPaths", request.DataPaths), + zap.Any("TypeParams", request.TypeParams), + zap.Any("IndexParams", request.IndexParams)) + t := &IndexBuildTask{ BaseTask: BaseTask{ ctx: ctx, @@ -168,6 +177,7 @@ func (i *IndexNode) BuildIndex(ctx context.Context, request *indexpb.BuildIndexR } func (i *IndexNode) DropIndex(ctx context.Context, request *indexpb.DropIndexRequest) (*commonpb.Status, error) { + log.Debug("indexnode drop index ...", zap.Int64("index id", request.IndexID)) i.sched.IndexBuildQueue.tryToRemoveUselessIndexBuildTask(request.IndexID) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -186,7 +196,7 @@ func (i *IndexNode) AddCloseCallback(callbacks ...func()) { } func (i *IndexNode) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { - + log.Debug("get indexnode components states ...") stateInfo := &internalpb.ComponentInfo{ NodeID: Params.NodeID, Role: "NodeImpl", @@ -200,10 +210,17 @@ func (i *IndexNode) GetComponentStates(ctx context.Context) (*internalpb.Compone ErrorCode: commonpb.ErrorCode_Success, }, } + + log.Debug("indexnode compoents states", + zap.Any("State", ret.State), + zap.Any("Status", ret.Status), + zap.Any("SubcomponentStates", ret.SubcomponentStates)) return ret, nil } func (i *IndexNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + log.Debug("get indexnode time tick channel ...") + return &milvuspb.StringResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -212,6 +229,7 @@ func (i *IndexNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringRes } func (i *IndexNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + log.Debug("get indexnode statistics channel ...") return &milvuspb.StringResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, diff --git a/internal/indexservice/indexservice.go b/internal/indexservice/indexservice.go index e3fc36811aeeb9bf8bc02a7702b68197638a3197..b9388e069b1cf90b364c3bc49cab6d096c5d3a5d 100644 --- a/internal/indexservice/indexservice.go +++ b/internal/indexservice/indexservice.go @@ -153,6 +153,7 @@ func (i *IndexService) UpdateStateCode(code internalpb.StateCode) { } func (i *IndexService) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { + log.Debug("get indexservice component states ...") stateInfo := &internalpb.ComponentInfo{ NodeID: i.ID, Role: "IndexService", @@ -170,6 +171,7 @@ func (i *IndexService) GetComponentStates(ctx context.Context) (*internalpb.Comp } func (i *IndexService) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + log.Debug("get indexservice time tick channel ...") return &milvuspb.StringResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -180,6 +182,7 @@ func (i *IndexService) GetTimeTickChannel(ctx context.Context) (*milvuspb.String } func (i *IndexService) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + log.Debug("get indexservice statistics channel ...") return &milvuspb.StringResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -190,7 +193,13 @@ func (i *IndexService) GetStatisticsChannel(ctx context.Context) (*milvuspb.Stri } func (i *IndexService) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) { - log.Debug("builder building index", zap.String("indexName = ", req.IndexName), zap.Int64("indexID = ", req.IndexID), zap.Strings("dataPath = ", req.DataPaths)) + log.Debug("indexservice building index ...", + zap.Int64("IndexBuildID", req.IndexBuildID), + zap.String("IndexName = ", req.IndexName), + zap.Int64("IndexID = ", req.IndexID), + zap.Strings("DataPath = ", req.DataPaths), + zap.Any("TypeParams", req.TypeParams), + zap.Any("IndexParams", req.IndexParams)) ret := &indexpb.BuildIndexResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -245,6 +254,7 @@ func (i *IndexService) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRe } func (i *IndexService) GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error) { + log.Debug("get index states ...", zap.Int64s("IndexBuildIDs", req.IndexBuildIDs)) var indexStates []*indexpb.IndexInfo for _, indexID := range req.IndexBuildIDs { indexState, err := i.metaTable.GetIndexState(indexID) @@ -259,10 +269,16 @@ func (i *IndexService) GetIndexStates(ctx context.Context, req *indexpb.GetIndex }, States: indexStates, } + log.Debug("get index states success") + log.Debug("get index states", + zap.Any("index status", ret.Status), + zap.Any("index states", ret.States)) + return ret, nil } func (i *IndexService) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) { + log.Debug("indexservice dropping index ...", zap.Int64("indexID", req.IndexID)) i.sched.IndexAddQueue.tryToRemoveUselessIndexAddTask(req.IndexID) err := i.metaTable.MarkIndexAsDeleted(req.IndexID) @@ -286,12 +302,14 @@ func (i *IndexService) DropIndex(ctx context.Context, req *indexpb.DropIndexRequ }() }() + log.Debug("indexservice drop index success") return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, }, nil } func (i *IndexService) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) { + log.Debug("indexservice", zap.Int64s("get index file paths", req.IndexBuildIDs)) var indexPaths []*indexpb.IndexFilePathInfo = nil for _, indexID := range req.IndexBuildIDs { @@ -301,6 +319,7 @@ func (i *IndexService) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIn } indexPaths = append(indexPaths, indexPathInfo) } + log.Debug("indexservice, get index file paths success") ret := &indexpb.GetIndexFilePathsResponse{ Status: &commonpb.Status{ @@ -308,10 +327,16 @@ func (i *IndexService) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIn }, FilePaths: indexPaths, } + log.Debug("indexservice", zap.Any("index file paths", ret.FilePaths)) + return ret, nil } func (i *IndexService) NotifyBuildIndex(ctx context.Context, nty *indexpb.NotifyBuildIndexRequest) (*commonpb.Status, error) { + log.Debug("indexservice", + zap.Int64("notify build index", nty.IndexBuildID), + zap.Strings("index file paths", nty.IndexFilePaths), + zap.Int64("node ID", nty.NodeID)) ret := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, } diff --git a/internal/indexservice/meta_table.go b/internal/indexservice/meta_table.go index 8476d3f449e4517f6c1e3a942a2b8af50f96e155..db182ea48ca23f84c74511517c50d56334e26704 100644 --- a/internal/indexservice/meta_table.go +++ b/internal/indexservice/meta_table.go @@ -76,6 +76,7 @@ func (mt *metaTable) saveIndexMeta(meta *indexpb.IndexMeta) error { func (mt *metaTable) AddIndex(indexBuildID UniqueID, req *indexpb.BuildIndexRequest) error { mt.lock.Lock() defer mt.lock.Unlock() + log.Debug("indexservice add index ...") _, ok := mt.indexBuildID2Meta[indexBuildID] if ok { return fmt.Errorf("index already exists with ID = %d", indexBuildID) @@ -92,6 +93,8 @@ func (mt *metaTable) MarkIndexAsDeleted(indexID UniqueID) error { mt.lock.Lock() defer mt.lock.Unlock() + log.Debug("indexservice", zap.Int64("mark index is deleted", indexID)) + for indexBuildID, meta := range mt.indexBuildID2Meta { if meta.Req.IndexID == indexID { meta.State = commonpb.IndexState_Deleted @@ -105,6 +108,8 @@ func (mt *metaTable) MarkIndexAsDeleted(indexID UniqueID) error { func (mt *metaTable) NotifyBuildIndex(nty *indexpb.NotifyBuildIndexRequest) error { mt.lock.Lock() defer mt.lock.Unlock() + + log.Debug("indexservice", zap.Int64("notify build index", nty.IndexBuildID)) indexBuildID := nty.IndexBuildID meta, ok := mt.indexBuildID2Meta[indexBuildID] if !ok { diff --git a/internal/indexservice/node_mgr.go b/internal/indexservice/node_mgr.go index 884d7bdd7ec436c6dd540f5c3f576834c52fd03b..2e5970a82c9055709e64131aef2262c05730ab4c 100644 --- a/internal/indexservice/node_mgr.go +++ b/internal/indexservice/node_mgr.go @@ -5,7 +5,10 @@ import ( "errors" "strconv" + "go.uber.org/zap" + grpcindexnodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexnode/client" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" @@ -56,6 +59,7 @@ func (i *IndexService) prepareNodeInitParams() []*commonpb.KeyValuePair { } func (i *IndexService) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) { + log.Debug("indexservice", zap.Any("register index node, node address = ", req.Address)) ret := &indexpb.RegisterNodeResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError,