Commit 9828a529 authored by neza2017, committed by yefu.chen

Add create index


Signed-off-by: neza2017 <yefu.chen@zilliz.com>
parent 7f044fff
Showing with 592 additions and 68 deletions
......@@ -21,7 +21,7 @@ msgChannel:
proxyTimeTick: "proxyTimeTick"
writeNodeTimeTick: "writeNodeTimeTick" # GOOSE TODO: remove this
dataNodeTimeTick: "dataNodeTimeTick"
queryNodeTimeTick: "queryNodeTimeTick"
queryTimeTick: "queryTimeTick"
dataNodeSegStatistics: "dataNodeSegStatistics"
# old name: statsChannels: "statistic"
queryNodeStats: "query-node-stats"
......
......@@ -52,8 +52,6 @@ proxyNode:
port: 19530
queryNode:
ip: localhost
port: 20010
gracefulTime: 5000 #ms
indexBuilder:
......
......@@ -78,9 +78,9 @@ In order to boost throughput, we model Milvus as a stream-driven system.
```go
type Component interface {
Init()
Start()
Stop()
Init() error
Start() error
Stop() error
GetComponentStates() (ComponentStates, error)
GetTimeTickChannel() (string, error)
GetStatisticsChannel() (string, error)
......
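This design-doc hunk is the root of most of the signature churn in this commit: every lifecycle method on `Component` now returns an `error`. A minimal sketch of a conforming no-op component (illustration only, not part of the commit; it assumes the `ComponentStates` type from the snippet above):

```go
// Sketch: a no-op component satisfying the revised lifecycle interface.
type noopComponent struct{}

func (n *noopComponent) Init() error  { return nil }
func (n *noopComponent) Start() error { return nil }
func (n *noopComponent) Stop() error  { return nil }

func (n *noopComponent) GetComponentStates() (ComponentStates, error) {
	return ComponentStates{}, nil
}
func (n *noopComponent) GetTimeTickChannel() (string, error)   { return "", nil }
func (n *noopComponent) GetStatisticsChannel() (string, error) { return "", nil }
```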
......@@ -11,15 +11,15 @@ type Client struct {
// GOOSE TODO: add DataNodeClient
}
func (c *Client) Init() {
func (c *Client) Init() error {
panic("implement me")
}
func (c *Client) Start() {
func (c *Client) Start() error {
panic("implement me")
}
func (c *Client) Stop() {
func (c *Client) Stop() error {
panic("implement me")
}
......
......@@ -34,7 +34,7 @@ func (g Client) GetIndexStates(req *indexpb.IndexStatesRequest) (*indexpb.IndexS
return g.grpcClient.GetIndexStates(ctx, req)
}
func (g Client) GetIndexFilePaths(req *indexpb.IndexFilePathRequest) (*indexpb.IndexFilePathsResponse, error) {
func (g Client) GetIndexFilePaths(req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) {
ctx := context.TODO()
......
......@@ -28,8 +28,9 @@ type Server struct {
loopWg sync.WaitGroup
}
func (s *Server) Init() {
func (s *Server) Init() error {
indexservice.Params.Init()
return nil
}
func (s *Server) Start() error {
......@@ -37,8 +38,9 @@ func (s *Server) Start() error {
return s.startIndexServer()
}
func (s *Server) Stop() {
func (s *Server) Stop() error {
s.loopWg.Wait()
return nil
}
func (s *Server) GetComponentStates() (*internalpb2.ComponentStates, error) {
......@@ -69,7 +71,7 @@ func (s *Server) GetIndexStates(ctx context.Context, req *indexpb.IndexStatesReq
return s.server.GetIndexStates(req)
}
func (s *Server) GetIndexFilePaths(ctx context.Context, req *indexpb.IndexFilePathRequest) (*indexpb.IndexFilePathsResponse, error) {
func (s *Server) GetIndexFilePaths(ctx context.Context, req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) {
return s.server.GetIndexFilePaths(req)
}
......
......@@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
cms "github.com/zilliztech/milvus-distributed/internal/masterservice"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
......@@ -37,6 +38,7 @@ func TestGrpcService(t *testing.T) {
cms.Params.MaxPartitionNum = 64
cms.Params.DefaultPartitionName = "_default"
cms.Params.DefaultIndexName = "_default"
t.Logf("master service port = %d", cms.Params.Port)
......@@ -84,6 +86,18 @@ func TestGrpcService(t *testing.T) {
return nil
}
core.DataServiceSegmentChan = make(chan *datapb.SegmentInfo, 1024)
core.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) {
return []string{"file1", "file2", "file3"}, nil
}
binlogPathArray := make([]string, 0, 16)
core.BuildIndexReq = func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair) (typeutil.UniqueID, error) {
binlogPathArray = append(binlogPathArray, binlog...)
return 2000, nil
}
err = svr.Start()
assert.Nil(t, err)
......@@ -275,6 +289,107 @@ func TestGrpcService(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS)
assert.Equal(t, len(rsp.PartitionNames), 2)
assert.Equal(t, len(rsp.PartitionIDs), 2)
})
t.Run("show segment", func(t *testing.T) {
coll, err := core.MetaTable.GetCollectionByName("testColl")
assert.Nil(t, err)
partID := coll.PartitionIDs[1]
part, err := core.MetaTable.GetPartitionByID(partID)
assert.Nil(t, err)
assert.Zero(t, len(part.SegmentIDs))
seg := &datapb.SegmentInfo{
SegmentID: 1000,
CollectionID: coll.ID,
PartitionID: part.PartitionID,
}
core.DataServiceSegmentChan <- seg
time.Sleep(time.Millisecond * 100)
part, err = core.MetaTable.GetPartitionByID(partID)
assert.Nil(t, err)
assert.Equal(t, len(part.SegmentIDs), 1)
req := &milvuspb.ShowSegmentRequest{
Base: &commonpb.MsgBase{
MsgType: 111, //TODO show segment request msg type
MsgID: 111,
Timestamp: 111,
SourceID: 111,
},
CollectionID: coll.ID,
PartitionID: partID,
}
rsp, err := cli.ShowSegments(req)
assert.Nil(t, err)
assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS)
assert.Equal(t, rsp.SegmentIDs[0], int64(1000))
assert.Equal(t, len(rsp.SegmentIDs), 1)
})
t.Run("create index", func(t *testing.T) {
req := &milvuspb.CreateIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kCreateIndex,
MsgID: 112,
Timestamp: 112,
SourceID: 112,
},
DbName: "",
CollectionName: "testColl",
FieldName: "vector",
ExtraParams: nil,
}
rsp, err := cli.CreateIndex(req)
assert.Nil(t, err)
assert.Equal(t, rsp.ErrorCode, commonpb.ErrorCode_SUCCESS)
assert.Equal(t, 3, len(binlogPathArray))
assert.ElementsMatch(t, binlogPathArray, []string{"file1", "file2", "file3"})
req.FieldName = "no field"
rsp, err = cli.CreateIndex(req)
assert.Nil(t, err)
assert.NotEqual(t, rsp.ErrorCode, commonpb.ErrorCode_SUCCESS)
})
t.Run("describe segment", func(t *testing.T) {
coll, err := core.MetaTable.GetCollectionByName("testColl")
assert.Nil(t, err)
req := &milvuspb.DescribeSegmentRequest{
Base: &commonpb.MsgBase{
MsgType: 113, //TODO, describe segment request msg type
MsgID: 113,
Timestamp: 113,
SourceID: 113,
},
CollectionID: coll.ID,
SegmentID: 1000,
}
rsp, err := cli.DescribeSegment(req)
assert.Nil(t, err)
assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS)
t.Logf("index id = %d", rsp.IndexID)
})
t.Run("describe index", func(t *testing.T) {
req := &milvuspb.DescribeIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDescribeIndex,
MsgID: 114,
Timestamp: 114,
SourceID: 114,
},
DbName: "",
CollectionName: "testColl",
FieldName: "vector",
IndexName: "",
}
rsp, err := cli.DescribeIndex(req)
assert.Nil(t, err)
assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS)
assert.Equal(t, len(rsp.IndexDescriptions), 1)
assert.Equal(t, rsp.IndexDescriptions[0].IndexName, cms.Params.DefaultIndexName)
})
t.Run("drop partition", func(t *testing.T) {
......
......@@ -13,15 +13,15 @@ type Client struct {
grpcClient querypb.QueryNodeClient
}
func (c *Client) Init() {
func (c *Client) Init() error {
panic("implement me")
}
func (c *Client) Start() {
func (c *Client) Start() error {
panic("implement me")
}
func (c *Client) Stop() {
func (c *Client) Stop() error {
panic("implement me")
}
......
......@@ -9,15 +9,15 @@ type Client struct {
grpcClient querypb.QueryServiceClient
}
func (c *Client) Init() {
func (c *Client) Init() error {
panic("implement me")
}
func (c *Client) Start() {
func (c *Client) Start() error {
panic("implement me")
}
func (c *Client) Stop() {
func (c *Client) Stop() error {
panic("implement me")
}
......
......@@ -13,15 +13,15 @@ type Server struct {
queryService queryServiceImpl.Interface
}
func (s *Server) Init() {
func (s *Server) Init() error {
panic("implement me")
}
func (s *Server) Start() {
func (s *Server) Start() error {
panic("implement me")
}
func (s *Server) Stop() {
func (s *Server) Stop() error {
panic("implement me")
}
......
......@@ -133,13 +133,13 @@ func (c *Client) GetIndexStates(indexIDs []UniqueID) (*indexpb.IndexStatesRespon
return response, err
}
func (c *Client) GetIndexFilePaths(indexID UniqueID) ([]string, error) {
func (c *Client) GetIndexFilePaths(indexIDs []UniqueID) ([][]string, error) {
if c.tryConnect() != nil {
panic("GetIndexFilePaths: failed to connect index builder")
}
ctx := context.TODO()
request := &indexpb.IndexFilePathRequest{
IndexID: indexID,
request := &indexpb.IndexFilePathsRequest{
IndexIDs: indexIDs,
}
response, err := c.client.GetIndexFilePaths(ctx, request)
......@@ -147,5 +147,15 @@ func (c *Client) GetIndexFilePaths(indexID UniqueID) ([]string, error) {
return nil, err
}
return response.IndexFilePaths, nil
var filePaths [][]string
for _, indexID := range indexIDs {
for _, filePathInfo := range response.FilePaths {
if indexID == filePathInfo.IndexID {
filePaths = append(filePaths, filePathInfo.IndexFilePaths)
break
}
}
}
return filePaths, nil
}
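The client method is now batched: one call takes a slice of index IDs and returns one `[]string` of file paths per ID, matched by `IndexID` against the response. A hedged usage sketch (IDs and variable names are illustrative; note that the matching loop above silently skips any ID the service did not answer for):

```go
// Sketch: fetch index file paths for several indexes in one round trip.
indexIDs := []UniqueID{2000, 2001}
allPaths, err := client.GetIndexFilePaths(indexIDs)
if err != nil {
	log.Fatal(err)
}
// Assuming the service answered for every requested ID, allPaths[i]
// corresponds to indexIDs[i].
for i, paths := range allPaths {
	log.Printf("index %d has %d index files", indexIDs[i], len(paths))
}
```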
......@@ -74,16 +74,25 @@ func (b *Builder) GetIndexStates(ctx context.Context, request *indexpb.IndexStat
return ret, nil
}
func (b *Builder) GetIndexFilePaths(ctx context.Context, request *indexpb.IndexFilePathRequest) (*indexpb.IndexFilePathsResponse, error) {
func (b *Builder) GetIndexFilePaths(ctx context.Context, request *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) {
ret := &indexpb.IndexFilePathsResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS},
IndexID: request.IndexID,
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS},
}
filePaths, err := b.metaTable.GetIndexFilePaths(request.IndexID)
if err != nil {
ret.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
ret.Status.Reason = err.Error()
var filePathInfos []*indexpb.IndexFilePathInfo
for _, indexID := range request.IndexIDs {
filePathInfo := &indexpb.IndexFilePathInfo{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS},
IndexID: indexID,
}
filePaths, err := b.metaTable.GetIndexFilePaths(indexID)
if err != nil {
filePathInfo.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
filePathInfo.Status.Reason = err.Error()
}
filePathInfo.IndexFilePaths = filePaths
filePathInfos = append(filePathInfos, filePathInfo)
}
ret.IndexFilePaths = filePaths
ret.FilePaths = filePathInfos
return ret, nil
}
......@@ -45,15 +45,15 @@ type IndexNode struct {
//serviceClient indexservice.Interface // method factory
}
func (i *IndexNode) Init() {
func (i *IndexNode) Init() error {
panic("implement me")
}
func (i *IndexNode) Start() {
func (i *IndexNode) Start() error {
panic("implement me")
}
func (i *IndexNode) Stop() {
func (i *IndexNode) Stop() error {
panic("implement me")
}
......
......@@ -124,8 +124,7 @@ func TestBuilder_GRPC(t *testing.T) {
assert.Equal(t, commonpb.IndexState_INPROGRESS, description.States[0].State)
assert.Equal(t, indexID, description.States[0].IndexID)
indexDataPaths, err := buildClient.GetIndexFilePaths(indexID)
indexDataPaths, err := buildClient.GetIndexFilePaths([]UniqueID{indexID})
assert.Nil(t, err)
assert.Nil(t, indexDataPaths)
assert.Nil(t, indexDataPaths[0])
}
......@@ -46,15 +46,15 @@ type IndexService struct {
type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp
func (i *IndexService) Init() {
func (i *IndexService) Init() error {
panic("implement me")
}
func (i *IndexService) Start() {
func (i *IndexService) Start() error {
panic("implement me")
}
func (i *IndexService) Stop() {
func (i *IndexService) Stop() error {
panic("implement me")
}
......@@ -149,7 +149,7 @@ func (i *IndexService) GetIndexStates(req *indexpb.IndexStatesRequest) (*indexpb
return ret, nil
}
func (i *IndexService) GetIndexFilePaths(req *indexpb.IndexFilePathRequest) (*indexpb.IndexFilePathsResponse, error) {
func (i *IndexService) GetIndexFilePaths(req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) {
panic("implement me")
}
......
......@@ -13,6 +13,6 @@ type Interface interface {
RegisterNode(req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error)
BuildIndex(req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error)
GetIndexStates(req *indexpb.IndexStatesRequest) (*indexpb.IndexStatesResponse, error)
GetIndexFilePaths(req *indexpb.IndexFilePathRequest) (*indexpb.IndexFilePathsResponse, error)
GetIndexFilePaths(req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error)
NotifyBuildIndex(nty *indexpb.BuildIndexNotification) (*commonpb.Status, error)
}
......@@ -65,7 +65,7 @@ func (m *MockWriteNodeClient) GetInsertBinlogPaths(segmentID UniqueID) (map[Uniq
type BuildIndexClient interface {
BuildIndex(columnDataPaths []string, typeParams map[string]string, indexParams map[string]string) (UniqueID, error)
GetIndexStates(indexIDs []UniqueID) (*indexpb.IndexStatesResponse, error)
GetIndexFilePaths(indexID UniqueID) ([]string, error)
GetIndexFilePaths(indexIDs []UniqueID) ([][]string, error)
}
type MockBuildIndexClient struct {
......@@ -107,8 +107,8 @@ func (m *MockBuildIndexClient) GetIndexStates(indexIDs []UniqueID) (*indexpb.Ind
return ret, nil
}
func (m *MockBuildIndexClient) GetIndexFilePaths(indexID UniqueID) ([]string, error) {
return []string{"/binlog/index/file_1", "/binlog/index/file_2", "/binlog/index/file_3"}, nil
func (m *MockBuildIndexClient) GetIndexFilePaths(indexIDs []UniqueID) ([][]string, error) {
return [][]string{{"/binlog/index/file_1", "/binlog/index/file_2", "/binlog/index/file_3"}}, nil
}
type LoadIndexClient interface {
......
......@@ -116,10 +116,11 @@ func (scheduler *IndexBuildScheduler) describe() error {
}
if description.States[0].State == commonpb.IndexState_FINISHED {
log.Printf("build index for segment %d field %d is finished", indexBuildInfo.segmentID, indexBuildInfo.fieldID)
filePaths, err := scheduler.client.GetIndexFilePaths(indexID)
filesPaths, err := scheduler.client.GetIndexFilePaths([]UniqueID{indexID})
if err != nil {
return err
}
filePaths := filesPaths[0]
//TODO: remove fileName
var fieldName string
......
......@@ -14,6 +14,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
......@@ -134,6 +135,18 @@ type Core struct {
//setMsgStreams, send drop partition into dd channel
DdDropPartitionReq func(req *internalpb2.DropPartitionRequest) error
// setMsgStreams segment channel: receives segment info from data service when it creates a segment
DataServiceSegmentChan chan *datapb.SegmentInfo
// TODO: get binlog file paths from data service
GetBinlogFilePathsFromDataServiceReq func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error)
// TODO: call index builder's client to build index, return the build id
BuildIndexReq func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair) (typeutil.UniqueID, error)
// put create index task into this chan
indexTaskQueue chan *CreateIndexTask
//dd request scheduler
ddReqQueue chan reqTask // dd requests will be pushed into this chan
lastDdTimeStamp typeutil.Timestamp
......@@ -202,6 +215,18 @@ func (c *Core) checkInit() error {
if c.DdDropPartitionReq == nil {
return errors.Errorf("DdDropPartitionReq is nil")
}
if c.DataServiceSegmentChan == nil {
return errors.Errorf("DataServiceSegmentChan is nil")
}
if c.GetBinlogFilePathsFromDataServiceReq == nil {
return errors.Errorf("GetBinlogFilePathsFromDataServiceReq is nil")
}
if c.BuildIndexReq == nil {
return errors.Errorf("BuildIndexReq is nil")
}
if c.indexTaskQueue == nil {
return errors.Errorf("indexTaskQueue is nil")
}
log.Printf("master node id = %d\n", Params.NodeID)
return nil
}
......@@ -255,6 +280,47 @@ func (c *Core) startTimeTickLoop() {
}
}
// data service sends segment info to master when it creates a segment
func (c *Core) startDataServiceSegmentLoop() {
for {
select {
case <-c.ctx.Done():
log.Printf("close data service segment loop")
return
case seg, ok := <-c.DataServiceSegmentChan:
if !ok {
log.Printf("data service segment is closed, exit loop")
return
}
if seg == nil {
log.Printf("segment from data service is nil")
} else if err := c.MetaTable.AddSegment(seg); err != nil {
// what if master fails to add the segment but data service has already succeeded?
log.Printf("add segment info to meta table failed, error = %s", err.Error())
}
}
}
}
// create index loop
func (c *Core) startCreateIndexLoop() {
for {
select {
case <-c.ctx.Done():
log.Printf("close create index loop")
return
case t, ok := <-c.indexTaskQueue:
if !ok {
log.Printf("index task chan is close, exit loop")
return
}
if err := t.BuildIndex(); err != nil {
log.Printf("create index failed, error = %s", err.Error())
}
}
}
}
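Both loops above are instances of the same Go consumer pattern: block in a `select`, exit on context cancellation or when the producer closes the channel, otherwise handle one item. Restated generically (sketch only; the generic syntax is for brevity and is not in the original):

```go
// consumeLoop captures the shape shared by startDataServiceSegmentLoop
// and startCreateIndexLoop.
func consumeLoop[T any](ctx context.Context, in <-chan T, handle func(T)) {
	for {
		select {
		case <-ctx.Done():
			return // server is shutting down
		case v, ok := <-in:
			if !ok {
				return // producer closed the channel
			}
			handle(v)
		}
	}
}
```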
func (c *Core) setMsgStreams() error {
//proxy time tick stream,
proxyTimeTickStream := pulsarms.NewPulsarMsgStream(c.ctx, 1024)
......@@ -405,6 +471,7 @@ func (c *Core) setMsgStreams() error {
dataServiceStream.SetPulsarClient(Params.PulsarAddress)
dataServiceStream.CreatePulsarConsumers([]string{Params.DataServiceSegmentChannel}, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), 1024)
dataServiceStream.Start()
c.DataServiceSegmentChan = make(chan *datapb.SegmentInfo, 1024)
// receive segment info from msg stream
go func() {
......@@ -420,9 +487,7 @@ func (c *Core) setMsgStreams() error {
for _, segm := range segMsg.Msgs {
segInfoMsg, ok := segm.(*ms.SegmentInfoMsg)
if ok {
if err := c.MetaTable.AddSegment(segInfoMsg.Segment); err != nil {
log.Printf("create segment failed, segmentid = %d,colleciont id = %d, error = %s", segInfoMsg.Segment.SegmentID, segInfoMsg.Segment.CollectionID, err.Error())
}
c.DataServiceSegmentChan <- segInfoMsg.Segment
}
//TODO, if data node flush
}
......@@ -457,6 +522,7 @@ func (c *Core) Init(params *InitParams) error {
return
}
c.ddReqQueue = make(chan reqTask, 1024)
c.indexTaskQueue = make(chan *CreateIndexTask, 1024)
initError = c.setMsgStreams()
c.isInit.Store(true)
})
......@@ -474,6 +540,8 @@ func (c *Core) Start() error {
c.startOnce.Do(func() {
go c.startDdScheduler()
go c.startTimeTickLoop()
go c.startDataServiceSegmentLoop()
go c.startCreateIndexLoop()
c.stateCode.Store(internalpb2.StateCode_HEALTHY)
})
return nil
......@@ -760,24 +828,116 @@ func (c *Core) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.Show
return t.Rsp, nil
}
//TODO
func (c *Core) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
return nil, nil
t := &CreateIndexReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
core: c,
},
Req: in,
}
c.ddReqQueue <- t
err := t.WaitToFinish()
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "CreateIndex failed, error = " + err.Error(),
}, nil
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
Reason: "",
}, nil
}
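`CreateIndex` reuses the existing dd-request scheduler: wrap the request in a task, push it onto `ddReqQueue`, and block on `WaitToFinish`. `baseReqTask` itself is not shown in this diff; from its usage here it presumably looks roughly like the following sketch (only the `cv` and `core` fields are confirmed by the struct literals above; the `Notify` name is a guess):

```go
type baseReqTask struct {
	cv   chan error // the dd scheduler reports completion here
	core *Core
}

// WaitToFinish blocks until the scheduler has executed the task.
func (t *baseReqTask) WaitToFinish() error { return <-t.cv }

// Notify is how the scheduler would signal completion (name assumed).
func (t *baseReqTask) Notify(err error) { t.cv <- err }
```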
//TODO
func (c *Core) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
return nil, nil
t := &DescribeIndexReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
core: c,
},
Req: in,
Rsp: &milvuspb.DescribeIndexResponse{
Status: nil,
IndexDescriptions: nil,
},
}
c.ddReqQueue <- t
err := t.WaitToFinish()
if err != nil {
return &milvuspb.DescribeIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "DescribeIndex failed, error = " + err.Error(),
},
IndexDescriptions: nil,
}, nil
}
t.Rsp.Status = &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
Reason: "",
}
return t.Rsp, nil
}
//TODO
func (c *Core) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
return nil, nil
t := &DescribeSegmentReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
core: c,
},
Req: in,
Rsp: &milvuspb.DescribeSegmentResponse{
Status: nil,
IndexID: 0,
},
}
c.ddReqQueue <- t
err := t.WaitToFinish()
if err != nil {
return &milvuspb.DescribeSegmentResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "DescribeSegment failed, error = " + err.Error(),
},
IndexID: 0,
}, nil
}
t.Rsp.Status = &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
Reason: "",
}
return t.Rsp, nil
}
//TODO
func (c *Core) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) {
return nil, nil
t := &ShowSegmentReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
core: c,
},
Req: in,
Rsp: &milvuspb.ShowSegmentResponse{
Status: nil,
SegmentIDs: nil,
},
}
c.ddReqQueue <- t
err := t.WaitToFinish()
if err != nil {
return &milvuspb.ShowSegmentResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "ShowSegments failed, error: " + err.Error(),
},
SegmentIDs: nil,
}, nil
}
t.Rsp.Status = &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
Reason: "",
}
return t.Rsp, nil
}
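`DescribeIndex`, `DescribeSegment`, and `ShowSegments` all follow the same enqueue-and-wait shape as `CreateIndex`. If the boilerplate keeps growing, it could be collapsed into a helper along these lines (hypothetical refactoring, not in this commit; assumes the `reqTask` interface exposes `WaitToFinish`):

```go
// runDdTask enqueues a task on the dd scheduler and blocks for its result.
func (c *Core) runDdTask(t reqTask) error {
	c.ddReqQueue <- t
	return t.WaitToFinish()
}
```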
func (c *Core) AllocTimestamp(in *masterpb.TsoRequest) (*masterpb.TsoResponse, error) {
......
......@@ -9,8 +9,10 @@ import (
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
......@@ -24,14 +26,16 @@ const (
)
type metaTable struct {
client kv.TxnBase // client of a reliable kv service, i.e. etcd client
tenantID2Meta map[typeutil.UniqueID]pb.TenantMeta // tenant id to tenant meta
proxyID2Meta map[typeutil.UniqueID]pb.ProxyMeta // proxy id to proxy meta
collID2Meta map[typeutil.UniqueID]pb.CollectionInfo // collection id to collection meta,
collName2ID map[string]typeutil.UniqueID // collection name to collection id
partitionID2Meta map[typeutil.UniqueID]pb.PartitionInfo //partition id -> partition meta
segID2IndexMeta map[typeutil.UniqueID]*map[typeutil.UniqueID]pb.SegmentIndexInfo // segment id -> index id -> segment index meta
indexID2Meta map[typeutil.UniqueID]pb.IndexInfo //index id ->index meta
client kv.TxnBase // client of a reliable kv service, i.e. etcd client
tenantID2Meta map[typeutil.UniqueID]pb.TenantMeta // tenant id to tenant meta
proxyID2Meta map[typeutil.UniqueID]pb.ProxyMeta // proxy id to proxy meta
collID2Meta map[typeutil.UniqueID]pb.CollectionInfo // collection id to collection meta,
collName2ID map[string]typeutil.UniqueID // collection name to collection id
partitionID2Meta map[typeutil.UniqueID]pb.PartitionInfo // partition id -> partition meta
segID2IndexMeta map[typeutil.UniqueID]*map[typeutil.UniqueID]pb.SegmentIndexInfo // segment id -> index id -> segment index meta
indexID2Meta map[typeutil.UniqueID]pb.IndexInfo // index id -> index meta
segID2CollID map[typeutil.UniqueID]typeutil.UniqueID // segment id -> collection id
partitionID2CollID map[typeutil.UniqueID]typeutil.UniqueID // partition id -> collection id
tenantLock sync.RWMutex
proxyLock sync.RWMutex
......@@ -61,6 +65,8 @@ func (mt *metaTable) reloadFromKV() error {
mt.partitionID2Meta = make(map[typeutil.UniqueID]pb.PartitionInfo)
mt.segID2IndexMeta = make(map[typeutil.UniqueID]*map[typeutil.UniqueID]pb.SegmentIndexInfo)
mt.indexID2Meta = make(map[typeutil.UniqueID]pb.IndexInfo)
mt.partitionID2CollID = make(map[typeutil.UniqueID]typeutil.UniqueID)
mt.segID2CollID = make(map[typeutil.UniqueID]typeutil.UniqueID)
_, values, err := mt.client.LoadWithPrefix(TenantMetaPrefix)
if err != nil {
......@@ -103,6 +109,9 @@ func (mt *metaTable) reloadFromKV() error {
}
mt.collID2Meta[collectionInfo.ID] = collectionInfo
mt.collName2ID[collectionInfo.Schema.Name] = collectionInfo.ID
for _, partID := range collectionInfo.PartitionIDs {
mt.partitionID2CollID[partID] = collectionInfo.ID
}
}
_, values, err = mt.client.LoadWithPrefix(PartitionMetaPrefix)
......@@ -115,7 +124,15 @@ func (mt *metaTable) reloadFromKV() error {
if err != nil {
return err
}
collID, ok := mt.partitionID2CollID[partitionInfo.PartitionID]
if !ok {
log.Printf("partition id %d not belong to any collection", partitionInfo.PartitionID)
continue
}
mt.partitionID2Meta[partitionInfo.PartitionID] = partitionInfo
for _, segID := range partitionInfo.SegmentIDs {
mt.segID2CollID[segID] = collID
}
}
_, values, err = mt.client.LoadWithPrefix(SegmentIndexMetaPrefix)
......@@ -169,6 +186,7 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionIn
mt.collID2Meta[coll.ID] = *coll
mt.collName2ID[coll.Schema.Name] = coll.ID
mt.partitionID2Meta[part.PartitionID] = *part
mt.partitionID2CollID[part.PartitionID] = coll.ID
k1 := path.Join(CollectionMetaPrefix, strconv.FormatInt(coll.ID, 10))
v1 := proto.MarshalTextString(coll)
......@@ -240,6 +258,17 @@ func (mt *metaTable) HasCollection(collID typeutil.UniqueID) bool {
return ok
}
func (mt *metaTable) GetCollectionByID(collectionID typeutil.UniqueID) (pb.CollectionInfo, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
col, ok := mt.collID2Meta[collectionID]
if !ok {
return pb.CollectionInfo{}, errors.Errorf("can't find collection id: %d", collectionID)
}
return col, nil
}
func (mt *metaTable) GetCollectionByName(collectionName string) (pb.CollectionInfo, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
......@@ -299,6 +328,7 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string
coll.PartitionIDs = append(coll.PartitionIDs, partitionID)
mt.partitionID2Meta[partitionID] = partMeta
mt.collID2Meta[collID] = coll
mt.partitionID2CollID[partitionID] = collID
k1 := path.Join(CollectionMetaPrefix, strconv.FormatInt(coll.ID, 10))
v1 := proto.MarshalTextString(&coll)
......@@ -442,6 +472,7 @@ func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) error {
}
partMeta.SegmentIDs = append(partMeta.SegmentIDs, seg.SegmentID)
mt.partitionID2Meta[seg.PartitionID] = partMeta
mt.segID2CollID[seg.SegmentID] = seg.CollectionID
err := mt.client.Save(path.Join(PartitionMetaPrefix, strconv.FormatInt(seg.PartitionID, 10)), proto.MarshalTextString(&partMeta))
if err != nil {
......@@ -450,3 +481,202 @@ func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) error {
}
return nil
}
func (mt *metaTable) AddIndex(seg *pb.SegmentIndexInfo, idx *pb.IndexInfo) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
if seg.IndexID != idx.IndexID {
return errors.Errorf("index id in segment is %d, in index info is %d, not equal", seg.IndexID, idx.IndexID)
}
segIdxMap, ok := mt.segID2IndexMeta[seg.SegmentID]
if !ok {
idxMap := map[typeutil.UniqueID]pb.SegmentIndexInfo{seg.IndexID: *seg}
mt.segID2IndexMeta[seg.SegmentID] = &idxMap
} else {
_, ok := (*segIdxMap)[seg.IndexID]
if ok {
return errors.Errorf("index id = %d exist", seg.IndexID)
}
}
_, ok = mt.indexID2Meta[idx.IndexID]
if ok {
return errors.Errorf("index id = %d exist", idx.IndexID)
}
(*(mt.segID2IndexMeta[seg.SegmentID]))[seg.IndexID] = *seg
mt.indexID2Meta[idx.IndexID] = *idx
k1 := path.Join(SegmentIndexMetaPrefix, strconv.FormatInt(seg.SegmentID, 10), strconv.FormatInt(seg.IndexID, 10))
v1 := proto.MarshalTextString(seg)
k2 := path.Join(IndexMetaPrefix, strconv.FormatInt(idx.IndexID, 10))
v2 := proto.MarshalTextString(idx)
meta := map[string]string{k1: v1, k2: v2}
err := mt.client.MultiSave(meta)
if err != nil {
_ = mt.reloadFromKV()
return err
}
return nil
}
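`AddIndex` writes the segment-level entry and the index metadata in one transactional `MultiSave`, and rejects mismatched or duplicate index IDs. A hypothetical call (IDs are illustrative; the field names appear elsewhere in this diff):

```go
// Sketch: register index 2000 on field 100 of segment 1000.
segIdx := &pb.SegmentIndexInfo{
	SegmentID: 1000,
	FieldID:   100,
	IndexID:   2000,
}
idx := &pb.IndexInfo{
	IndexID:   2000,
	IndexName: "_default",
}
if err := mt.AddIndex(segIdx, idx); err != nil {
	log.Printf("AddIndex failed: %s", err) // e.g. duplicate index id
}
```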
func (mt *metaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, fieldID int64, idxName string) (pb.SegmentIndexInfo, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
segIdxMap, ok := mt.segID2IndexMeta[segID]
if !ok {
return pb.SegmentIndexInfo{}, errors.Errorf("segment id %d not has any index", segID)
}
if len(*segIdxMap) == 0 {
return pb.SegmentIndexInfo{}, errors.Errorf("segment id %d not has any index", segID)
}
if fieldID == -1 && idxName == "" { // return any index
for _, seg := range *segIdxMap {
return seg, nil
}
} else {
for idxID, seg := range *segIdxMap {
idxMeta, ok := mt.indexID2Meta[idxID]
if ok {
if idxMeta.IndexName != idxName {
continue
}
if seg.FieldID != fieldID {
continue
}
return seg, nil
}
}
}
return pb.SegmentIndexInfo{}, errors.Errorf("can't find index name = %s on segment = %d, with filed id = %d", idxName, segID, filedID)
}
func (mt *metaTable) GetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
collID, ok := mt.collName2ID[collName]
if !ok {
return schemapb.FieldSchema{}, errors.Errorf("collection %s not found", collName)
}
collMeta, ok := mt.collID2Meta[collID]
if !ok {
return schemapb.FieldSchema{}, errors.Errorf("collection %s not found", collName)
}
for _, field := range collMeta.Schema.Fields {
if field.Name == fieldName {
return *field, nil
}
}
return schemapb.FieldSchema{}, errors.Errorf("collection %s doesn't have filed %s", collName, fieldName)
}
// IsSegmentIndexed returns whether the segment already has an index on the given field with the same index params
func (mt *metaTable) IsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
segIdx, ok := mt.segID2IndexMeta[segID]
if !ok {
return false
}
exist := false
for idxID, meta := range *segIdx {
if meta.FieldID != fieldSchema.FieldID {
continue
}
idxMeta, ok := mt.indexID2Meta[idxID]
if !ok {
continue
}
if EqualKeyPairArray(indexParams, idxMeta.IndexParams) {
exist = true
break
}
}
return exist
}
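`IsSegmentIndexed` compares parameter lists with `EqualKeyPairArray`, which is not included in this diff. A plausible order-insensitive implementation, assuming keys are unique (sketch, not the actual source):

```go
func EqualKeyPairArray(a, b []*commonpb.KeyValuePair) bool {
	if len(a) != len(b) {
		return false
	}
	m := make(map[string]string, len(a))
	for _, kv := range a {
		m[kv.Key] = kv.Value
	}
	for _, kv := range b {
		if v, ok := m[kv.Key]; !ok || v != kv.Value {
			return false
		}
	}
	return true
}
```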
// return segment ids, field schema, error
func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, indexParams []*commonpb.KeyValuePair) ([]typeutil.UniqueID, schemapb.FieldSchema, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
collID, ok := mt.collName2ID[collName]
if !ok {
return nil, schemapb.FieldSchema{}, errors.Errorf("collection %s not found", collName)
}
collMeta, ok := mt.collID2Meta[collID]
if !ok {
return nil, schemapb.FieldSchema{}, errors.Errorf("collection %s not found", collName)
}
fieldSchema, err := mt.GetFieldSchema(collName, fieldName)
if err != nil {
return nil, fieldSchema, err
}
rstID := make([]typeutil.UniqueID, 0, 16)
for _, partID := range collMeta.PartitionIDs {
partMeta, ok := mt.partitionID2Meta[partID]
if ok {
for _, segID := range partMeta.SegmentIDs {
if exist := mt.IsSegmentIndexed(segID, &fieldSchema, indexParams); !exist {
rstID = append(rstID, segID)
}
}
}
}
return rstID, fieldSchema, nil
}
func (mt *metaTable) GetIndexByName(collName string, fieldName string, indexName string) ([]pb.IndexInfo, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
collID, ok := mt.collName2ID[collName]
if !ok {
return nil, errors.Errorf("collection %s not found", collName)
}
collMeta, ok := mt.collID2Meta[collID]
if !ok {
return nil, errors.Errorf("collection %s not found", collName)
}
fieldSchema, err := mt.GetFieldSchema(collName, fieldName)
if err != nil {
return nil, err
}
rstIndex := make([]*pb.IndexInfo, 0, 16)
for _, partID := range collMeta.PartitionIDs {
partMeta, ok := mt.partitionID2Meta[partID]
if ok {
for _, segID := range partMeta.SegmentIDs {
idxMeta, ok := mt.segID2IndexMeta[segID]
if !ok {
continue
}
for idxID, segMeta := range *idxMeta {
if segMeta.FieldID != fieldSchema.FieldID {
continue
}
idxMeta, ok := mt.indexID2Meta[idxID]
if !ok {
continue
}
if indexName == "" {
rstIndex = append(rstIndex, &idxMeta)
} else if idxMeta.IndexName == indexName {
rstIndex = append(rstIndex, &idxMeta)
}
}
}
}
}
rst := make([]pb.IndexInfo, 0, len(rstIndex))
for i := range rstIndex {
rst = append(rst, *rstIndex[i])
}
return rst, nil
}