Skip to content
Snippets Groups Projects
Commit c6c99ef3 authored by Yihao Dai's avatar Yihao Dai Committed by yefu.chen
Browse files

Implement getSegmengInfo function


Signed-off-by: default avatarbigsheeper <yihao.dai@zilliz.com>
parent 1aafe86f
No related branches found
No related tags found
No related merge requests found
Showing with 505 additions and 87 deletions
......@@ -26,6 +26,7 @@ type QueryService interface {
ReleasePartitions(req ReleasePartitionRequest) error
CreateQueryChannel() (CreateQueryChannelResponse, error)
GetSegmentInfo(req SegmentInfoRequest) (SegmentInfoResponse, error)
}
```
......@@ -166,7 +167,28 @@ type CreateQueryChannelResponse struct {
}
```
* *GetSegmentInfo* *
```go
type SegmentInfo struct {
segmentID UniqueID
collectionID UniqueID
partitionID UniqueID
mem_size int64
num_rows int64
index_name string
indexID UniqueID
}
type SegmentInfoRequest struct {
MsgBase
SegmentIDs []UniqueID
}
type SegmentInfoResponse struct {
infos []SegmentInfo
}
```
#### 8.2 Query Channel
......
......@@ -96,3 +96,7 @@ func (c *Client) LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb.Status,
func (c *Client) ReleaseSegments(in *querypb.ReleaseSegmentRequest) (*commonpb.Status, error) {
return c.grpcClient.ReleaseSegments(context.TODO(), in)
}
func (c *Client) GetSegmentInfo(in *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error) {
return c.grpcClient.GetSegmentInfo(context.TODO(), in)
}
......@@ -156,3 +156,7 @@ func (s *Server) ReleaseSegments(ctx context.Context, in *querypb.ReleaseSegment
// ignore ctx
return s.node.ReleaseSegments(in)
}
func (s *Server) GetSegmentInfo(ctx context.Context, in *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error) {
return s.node.GetSegmentInfo(in)
}
......@@ -122,3 +122,7 @@ func (c *Client) CreateQueryChannel() (*querypb.CreateQueryChannelResponse, erro
func (c *Client) GetPartitionStates(req *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error) {
return c.grpcClient.GetPartitionStates(context.TODO(), req)
}
func (c *Client) GetSegmentInfo(req *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error) {
return c.grpcClient.GetSegmentInfo(context.TODO(), req)
}
......@@ -166,6 +166,10 @@ func (s *Server) CreateQueryChannel(ctx context.Context, req *commonpb.Empty) (*
return s.queryService.CreateQueryChannel()
}
func (s *Server) GetSegmentInfo(ctx context.Context, req *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error) {
return s.queryService.GetSegmentInfo(req)
}
func NewServer(ctx context.Context) *Server {
ctx1, cancel := context.WithCancel(ctx)
service, err := queryservice.NewQueryService(ctx1)
......
......@@ -141,6 +141,26 @@ message ComponentStatesResponse {
internal.ComponentStates states = 2;
}
message SegmentInfo {
int64 segmentID = 1;
int64 collectionID = 2;
int64 partitionID = 3;
int64 mem_size = 4;
int64 num_rows = 5;
string index_name = 6;
int64 indexID = 7;
}
message SegmentInfoRequest {
common.MsgBase base = 1;
repeated int64 segmentIDs = 2;
}
message SegmentInfoResponse {
common.Status status = 1;
repeated SegmentInfo infos = 2;
}
service QueryService {
rpc RegisterNode(RegisterNodeRequest) returns (RegisterNodeResponse) {}
......@@ -157,6 +177,7 @@ service QueryService {
rpc GetStatisticsChannel(common.Empty) returns (milvus.StringResponse) {}
rpc GetPartitionStates(PartitionStatesRequest) returns (PartitionStatesResponse) {}
rpc GetComponentStates(common.Empty) returns (internal.ComponentStates) {}
rpc GetSegmentInfo(SegmentInfoRequest) returns (SegmentInfoResponse) {}
}
service QueryNode {
......@@ -169,4 +190,5 @@ service QueryNode {
rpc WatchDmChannels(WatchDmChannelsRequest) returns (common.Status) {}
rpc LoadSegments(LoadSegmentRequest) returns (common.Status) {}
rpc ReleaseSegments(ReleaseSegmentRequest) returns (common.Status) {}
rpc GetSegmentInfo(SegmentInfoRequest) returns (SegmentInfoResponse) {}
}
This diff is collapsed.
......@@ -93,8 +93,10 @@ func (s *loadService) execute(l *loadIndex) error {
var err error
var indexBuffer [][]byte
var indexParams indexParam
var indexName string
var indexID UniqueID
fn := func() error {
indexBuffer, indexParams, err = s.loadIndex(l.indexPaths)
indexBuffer, indexParams, indexName, indexID, err = s.loadIndex(l.indexPaths)
if err != nil {
return err
}
......@@ -117,8 +119,8 @@ func (s *loadService) execute(l *loadIndex) error {
if err != nil {
return err
}
//3. update segment index stats
err = s.updateSegmentIndexStats(indexParams, l)
// 3. update segment index stats
err = s.updateSegmentIndexStats(indexParams, indexName, indexID, l)
if err != nil {
return err
}
......@@ -173,7 +175,7 @@ func (s *loadService) fieldsStatsKey2IDs(key string) (UniqueID, UniqueID, error)
return collectionID, fieldID, nil
}
func (s *loadService) updateSegmentIndexStats(indexParams indexParam, l *loadIndex) error {
func (s *loadService) updateSegmentIndexStats(indexParams indexParam, indexName string, indexID UniqueID, l *loadIndex) error {
targetSegment, err := s.replica.getSegmentByID(l.segmentID)
if err != nil {
return err
......@@ -214,30 +216,39 @@ func (s *loadService) updateSegmentIndexStats(indexParams indexParam, l *loadInd
})
}
}
return targetSegment.setIndexParam(l.fieldID, newIndexParams)
err = targetSegment.setIndexParam(l.fieldID, newIndexParams)
if err != nil {
return err
}
targetSegment.setIndexName(indexName)
targetSegment.setIndexID(indexID)
return nil
}
func (s *loadService) loadIndex(indexPath []string) ([][]byte, indexParam, error) {
func (s *loadService) loadIndex(indexPath []string) ([][]byte, indexParam, string, UniqueID, error) {
index := make([][]byte, 0)
var indexParams indexParam
var indexName string
var indexID UniqueID
for _, p := range indexPath {
fmt.Println("load path = ", indexPath)
indexPiece, err := s.kv.Load(p)
if err != nil {
return nil, nil, err
return nil, nil, "", -1, err
}
// get index params when detecting indexParamPrefix
if path.Base(p) == storage.IndexParamsFile {
indexCodec := storage.NewIndexCodec()
_, indexParams, _, _, err = indexCodec.Deserialize([]*storage.Blob{
_, indexParams, indexName, indexID, err = indexCodec.Deserialize([]*storage.Blob{
{
Key: storage.IndexParamsFile,
Value: []byte(indexPiece),
},
})
if err != nil {
return nil, nil, err
return nil, nil, "", -1, err
}
} else {
index = append(index, []byte(indexPiece))
......@@ -245,9 +256,9 @@ func (s *loadService) loadIndex(indexPath []string) ([][]byte, indexParam, error
}
if len(indexParams) <= 0 {
return nil, nil, errors.New("cannot find index param")
return nil, nil, "", -1, errors.New("cannot find index param")
}
return index, indexParams, nil
return index, indexParams, indexName, indexID, nil
}
func (s *loadService) updateSegmentIndex(indexParams indexParam, bytesIndex [][]byte, l *loadIndex) error {
......
......@@ -38,6 +38,7 @@ type Node interface {
WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error)
LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.Status, error)
ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error)
GetSegmentInfo(in *queryPb.SegmentInfoRequest) (*queryPb.SegmentInfoResponse, error)
}
type QueryService = typeutil.QueryServiceInterface
......@@ -498,3 +499,29 @@ func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*comm
ErrorCode: commonpb.ErrorCode_SUCCESS,
}, nil
}
func (node *QueryNode) GetSegmentInfo(in *queryPb.SegmentInfoRequest) (*queryPb.SegmentInfoResponse, error) {
infos := make([]*queryPb.SegmentInfo, 0)
for _, id := range in.SegmentIDs {
segment, err := node.replica.getSegmentByID(id)
if err != nil {
continue
}
info := &queryPb.SegmentInfo{
SegmentID: segment.ID(),
CollectionID: segment.collectionID,
PartitionID: segment.partitionID,
MemSize: segment.getMemSize(),
NumRows: segment.getRowCount(),
IndexName: segment.getIndexName(),
IndexID: segment.getIndexID(),
}
infos = append(infos, info)
}
return &queryPb.SegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Infos: infos,
}, nil
}
......@@ -47,8 +47,10 @@ type Segment struct {
typeMu sync.Mutex // guards builtIndex
segmentType C.SegmentType
paramMutex sync.RWMutex // guards indexParam
paramMutex sync.RWMutex // guards index
indexParam map[int64]indexParam
indexName string
indexID UniqueID
}
//-------------------------------------------------------------------------------------- common interfaces
......@@ -68,6 +70,30 @@ func (s *Segment) getRecentlyModified() bool {
return s.recentlyModified
}
func (s *Segment) setIndexName(name string) {
s.rmMutex.Lock()
defer s.rmMutex.Unlock()
s.indexName = name
}
func (s *Segment) getIndexName() string {
s.rmMutex.Lock()
defer s.rmMutex.Unlock()
return s.indexName
}
func (s *Segment) setIndexID(id UniqueID) {
s.rmMutex.Lock()
defer s.rmMutex.Unlock()
s.indexID = id
}
func (s *Segment) getIndexID() UniqueID {
s.rmMutex.Lock()
defer s.rmMutex.Unlock()
return s.indexID
}
func (s *Segment) setType(segType segmentType) {
s.typeMu.Lock()
defer s.typeMu.Unlock()
......
......@@ -21,3 +21,7 @@ func (qn *queryNodeInfo) GetComponentStates() (*internalpb2.ComponentStates, err
func (qn *queryNodeInfo) LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb.Status, error) {
return qn.client.LoadSegments(in)
}
func (qn *queryNodeInfo) GetSegmentInfo(in *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error) {
return qn.client.GetSegmentInfo(in)
}
......@@ -35,6 +35,7 @@ type QueryNodeInterface interface {
WatchDmChannels(in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error)
LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb.Status, error)
ReleaseSegments(in *querypb.ReleaseSegmentRequest) (*commonpb.Status, error)
GetSegmentInfo(req *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error)
}
type QueryService struct {
......@@ -462,6 +463,28 @@ func (qs *QueryService) GetPartitionStates(req *querypb.PartitionStatesRequest)
}, nil
}
func (qs *QueryService) GetSegmentInfo(req *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error) {
segmentInfos := make([]*querypb.SegmentInfo, 0)
for _, node := range qs.queryNodes {
segmentInfo, err := node.client.GetSegmentInfo(req)
if err != nil {
return &querypb.SegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
},
}, err
}
segmentInfos = append(segmentInfos, segmentInfo.Infos...)
}
return &querypb.SegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Infos: segmentInfos,
}, nil
}
func NewQueryService(ctx context.Context) (*QueryService, error) {
nodes := make([]*queryNodeInfo, 0)
ctx1, cancel := context.WithCancel(ctx)
......
......@@ -48,4 +48,5 @@ type QueryServiceInterface interface {
ReleasePartitions(req *querypb.ReleasePartitionRequest) (*commonpb.Status, error)
CreateQueryChannel() (*querypb.CreateQueryChannelResponse, error)
GetPartitionStates(req *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error)
GetSegmentInfo(req *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error)
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment