diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index 0ae0497e479afa7473dd0000bc978b603a89d19a..e577c525c89969c9ac6d7d4d23e81418c5c79a86 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -208,8 +208,8 @@ func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*common
switch {
- case node.State != internalpb2.StateCode_HEALTHY:
- status.Reason = fmt.Sprintf("DataNode %d not healthy!", node.NodeID)
+ case node.State != internalpb2.StateCode_INITIALIZING:
+ status.Reason = fmt.Sprintf("DataNode %d not initializing!", node.NodeID)
return status, errors.New(status.GetReason())
case len(Params.InsertChannelNames) != 0:
diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go
index 397283a8999f12d741479871db61908845ded76a..535d38408d4d131958c55f871b91a80b82bd044c 100644
--- a/internal/dataservice/server.go
+++ b/internal/dataservice/server.go
@@ -50,6 +50,7 @@ type DataService interface {
GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error)
GetComponentStates() (*internalpb2.ComponentStates, error)
GetCount(req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error)
+ GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error)
}
type MasterClient interface {
@@ -728,6 +729,10 @@ func (s *Server) GetCount(req *datapb.CollectionCountRequest) (*datapb.Collectio
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
},
}
+ if !s.checkStateIsHealthy() {
+ resp.Status.Reason = "data service is not healthy"
+ return resp, nil
+ }
nums, err := s.meta.GetNumRowsOfCollection(req.CollectionID)
if err != nil {
resp.Status.Reason = err.Error()
@@ -737,3 +742,27 @@ func (s *Server) GetCount(req *datapb.CollectionCountRequest) (*datapb.Collectio
resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
return resp, nil
}
+
+func (s *Server) GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) {
+ resp := &datapb.SegmentInfoResponse{
+ Status: &commonpb.Status{
+ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
+ },
+ }
+ if !s.checkStateIsHealthy() {
+ resp.Status.Reason = "data service is not healthy"
+ return resp, nil
+ }
+ infos := make([]*datapb.SegmentInfo, len(req.SegmentIDs))
+ for i, id := range req.SegmentIDs {
+ segmentInfo, err := s.meta.GetSegment(id)
+ if err != nil {
+ resp.Status.Reason = err.Error()
+ return resp, nil
+ }
+ infos[i] = segmentInfo
+ }
+ resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
+ resp.Infos = infos
+ return resp, nil
+}
diff --git a/internal/distributed/dataservice/client.go b/internal/distributed/dataservice/client.go
index c2b886ecf92f0dc69aab7fbc5396f00bee6b348f..4507502ef070ae4871424879158f1646bbc55073 100644
--- a/internal/distributed/dataservice/client.go
+++ b/internal/distributed/dataservice/client.go
@@ -137,3 +137,7 @@ func (c *Client) GetSegmentInfoChannel() (string, error) {
func (c *Client) GetCount(req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) {
return c.grpcClient.GetCount(context.Background(), req)
}
+
+func (c *Client) GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) {
+ return c.grpcClient.GetSegmentInfo(context.Background(), req)
+}
diff --git a/internal/distributed/dataservice/grpc_service.go b/internal/distributed/dataservice/grpc_service.go
index 413de1c21ef6d4f69fce20c53fbe242638d3749e..ff39512820b320dc86104938b5b721757774716f 100644
--- a/internal/distributed/dataservice/grpc_service.go
+++ b/internal/distributed/dataservice/grpc_service.go
@@ -25,7 +25,7 @@ type Service struct {
}
func (s *Service) GetSegmentInfo(ctx context.Context, request *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) {
- panic("implement me")
+ return s.server.GetSegmentInfo(request)
}
func NewGrpcService(ctx context.Context) *Service {
diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go
index b3c3611245c6b3ac2caa20c19343eb35b33671e1..9c325d1dc08708ea0bb727e2cf7285aa469c30e3 100644
--- a/internal/distributed/proxynode/service.go
+++ b/internal/distributed/proxynode/service.go
@@ -322,5 +322,5 @@ func (s *Server) GetDdChannel(ctx context.Context, request *commonpb.Empty) (*mi
}
func (s *Server) GetPersistentSegmentInfo(ctx context.Context, request *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error) {
- panic("implement me")
+ return s.impl.GetPersistentSegmentInfo(request)
}
diff --git a/internal/proxynode/impl.go b/internal/proxynode/impl.go
index 593ac8b7e7afd9a9f97e62f45cb152f6b449a669..d0c898e5b743813825ea9af4b7cdd5e6028a9301 100644
--- a/internal/proxynode/impl.go
+++ b/internal/proxynode/impl.go
@@ -2,10 +2,13 @@ package proxynode
import (
"context"
+ "errors"
"log"
"strconv"
"time"
+ "github.com/zilliztech/milvus-distributed/internal/proto/datapb"
+
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@@ -590,3 +593,109 @@ func (node *NodeImpl) Flush(request *milvuspb.FlushRequest) (*commonpb.Status, e
func (node *NodeImpl) GetDdChannel(request *commonpb.Empty) (*milvuspb.StringResponse, error) {
panic("implement me")
}
+
+func (node *NodeImpl) GetPersistentSegmentInfo(req *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error) {
+ resp := &milvuspb.PersistentSegmentInfoResponse{
+ Status: &commonpb.Status{
+ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
+ },
+ }
+ segments, err := node.getSegmentsOfCollection(req.DbName, req.CollectionName)
+ if err != nil {
+ resp.Status.Reason = err.Error()
+ return resp, nil
+ }
+ infoResp, err := node.dataServiceClient.GetSegmentInfo(&datapb.SegmentInfoRequest{
+ Base: &commonpb.MsgBase{
+ MsgType: commonpb.MsgType_kSegmentInfo,
+ MsgID: 0,
+ Timestamp: 0,
+ SourceID: Params.ProxyID,
+ },
+ SegmentIDs: segments,
+ })
+ if err != nil {
+ resp.Status.Reason = err.Error()
+ return resp, nil
+ }
+ if infoResp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+ resp.Status.Reason = infoResp.Status.Reason
+ return resp, nil
+ }
+ persistentInfos := make([]*milvuspb.PersistentSegmentInfo, len(infoResp.Infos))
+ for i, info := range infoResp.Infos {
+ persistentInfos[i] = &milvuspb.PersistentSegmentInfo{
+ SegmentID: info.SegmentID,
+ CollectionID: info.CollectionID,
+ PartitionID: info.PartitionID,
+ OpenTime: info.OpenTime,
+ SealedTime: info.SealedTime,
+ FlushedTime: info.FlushedTime,
+ NumRows: info.NumRows,
+ MemSize: info.MemSize,
+ State: info.State,
+ }
+ }
+ resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
+ resp.Infos = persistentInfos
+ return resp, nil
+}
+
+func (node *NodeImpl) getSegmentsOfCollection(dbName string, collectionName string) ([]UniqueID, error) {
+ describeCollectionResponse, err := node.masterClient.DescribeCollection(&milvuspb.DescribeCollectionRequest{
+ Base: &commonpb.MsgBase{
+ MsgType: commonpb.MsgType_kDescribeCollection,
+ MsgID: 0,
+ Timestamp: 0,
+ SourceID: Params.ProxyID,
+ },
+ DbName: dbName,
+ CollectionName: collectionName,
+ })
+ if err != nil {
+ return nil, err
+ }
+ if describeCollectionResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+ return nil, errors.New(describeCollectionResponse.Status.Reason)
+ }
+ collectionID := describeCollectionResponse.CollectionID
+ showPartitionsResp, err := node.masterClient.ShowPartitions(&milvuspb.ShowPartitionRequest{
+ Base: &commonpb.MsgBase{
+ MsgType: commonpb.MsgType_kShowPartitions,
+ MsgID: 0,
+ Timestamp: 0,
+ SourceID: Params.ProxyID,
+ },
+ DbName: dbName,
+ CollectionName: collectionName,
+ CollectionID: collectionID,
+ })
+ if err != nil {
+ return nil, err
+ }
+ if showPartitionsResp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+ return nil, errors.New(showPartitionsResp.Status.Reason)
+ }
+
+ ret := make([]UniqueID, 0)
+ for _, partitionID := range showPartitionsResp.PartitionIDs {
+ showSegmentResponse, err := node.masterClient.ShowSegments(&milvuspb.ShowSegmentRequest{
+ Base: &commonpb.MsgBase{
+ MsgType: commonpb.MsgType_kShowSegment,
+ MsgID: 0,
+ Timestamp: 0,
+ SourceID: Params.ProxyID,
+ },
+ CollectionID: collectionID,
+ PartitionID: partitionID,
+ })
+ if err != nil {
+ return nil, err
+ }
+ if showSegmentResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+ return nil, errors.New(showSegmentResponse.Status.Reason)
+ }
+ ret = append(ret, showSegmentResponse.SegmentIDs...)
+ }
+ return ret, nil
+}
diff --git a/internal/proxynode/interface.go b/internal/proxynode/interface.go
index f262a633d3f8b0ef0aa9454a166d799110eebfe0..d7f8b048b544d8f6c326fccd99834eac77e946e5 100644
--- a/internal/proxynode/interface.go
+++ b/internal/proxynode/interface.go
@@ -22,6 +22,7 @@ type MasterClient interface {
ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error)
DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error)
+ ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error)
}
type IndexServiceClient interface {
@@ -51,6 +52,7 @@ type DataServiceClient interface {
GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error)
GetComponentStates() (*internalpb2.ComponentStates, error)
+ GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error)
}
type ProxyServiceClient interface {