Skip to content
Snippets Groups Projects
Commit 05918f55 authored by Cai Zhang 【张财】's avatar Cai Zhang 【张财】 Committed by yefu.chen
Browse files

Impl CreateIndex interface for proxy server


Signed-off-by: default avatarcai.zhang <cai.zhang@zilliz.com>
parent 9d5c9374
No related branches found
No related tags found
No related merge requests found
...@@ -23,6 +23,11 @@ enum MsgType { ...@@ -23,6 +23,11 @@ enum MsgType {
kDescribePartition = 203; kDescribePartition = 203;
kShowPartitions = 204; kShowPartitions = 204;
/* Definition Requests: Index */
kCreateIndex = 300;
kDescribeIndex = 301;
kDescribeIndexProgress = 302;
/* Manipulation Requests */ /* Manipulation Requests */
kInsert = 400; kInsert = 400;
kDelete = 401; kDelete = 401;
......
This diff is collapsed.
...@@ -595,7 +595,48 @@ func (p *Proxy) ShowPartitions(ctx context.Context, req *servicepb.CollectionNam ...@@ -595,7 +595,48 @@ func (p *Proxy) ShowPartitions(ctx context.Context, req *servicepb.CollectionNam
} }
func (p *Proxy) CreateIndex(ctx context.Context, indexParam *servicepb.IndexParam) (*commonpb.Status, error) { func (p *Proxy) CreateIndex(ctx context.Context, indexParam *servicepb.IndexParam) (*commonpb.Status, error) {
return nil, nil log.Println("create index: ", indexParam.IndexName)
cit := &CreateIndexTask{
Condition: NewTaskCondition(ctx),
CreateIndexRequest: internalpb.CreateIndexRequest{
MsgType: internalpb.MsgType_kCreateIndex,
CollectionName: indexParam.CollectionName,
FieldName: indexParam.FieldName,
IndexName: indexParam.IndexName,
ExtraParams: indexParam.ExtraParams,
},
masterClient: p.masterClient,
}
var cancel func()
cit.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
fn := func() error {
select {
case <-ctx.Done():
return errors.New("create index timeout")
default:
return p.sched.DdQueue.Enqueue(cit)
}
}
err := fn()
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}, nil
}
err = cit.WaitToFinish()
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}, nil
}
return cit.result, nil
} }
func (p *Proxy) DescribeIndex(context.Context, *servicepb.IndexParam) (*servicepb.DescribeIndexResponse, error) { func (p *Proxy) DescribeIndex(context.Context, *servicepb.IndexParam) (*servicepb.DescribeIndexResponse, error) {
......
...@@ -962,3 +962,168 @@ func (spt *ShowPartitionsTask) Execute() (err error) { ...@@ -962,3 +962,168 @@ func (spt *ShowPartitionsTask) Execute() (err error) {
func (spt *ShowPartitionsTask) PostExecute() error { func (spt *ShowPartitionsTask) PostExecute() error {
return nil return nil
} }
type CreateIndexTask struct {
Condition
internalpb.CreateIndexRequest
masterClient masterpb.MasterClient
result *commonpb.Status
ctx context.Context
}
func (cit *CreateIndexTask) ID() UniqueID {
return cit.ReqID
}
func (cit *CreateIndexTask) SetID(uid UniqueID) {
cit.ReqID = uid
}
func (cit *CreateIndexTask) Type() internalpb.MsgType {
return cit.MsgType
}
func (cit *CreateIndexTask) BeginTs() Timestamp {
return cit.Timestamp
}
func (cit *CreateIndexTask) EndTs() Timestamp {
return cit.Timestamp
}
func (cit *CreateIndexTask) SetTs(ts Timestamp) {
cit.Timestamp = ts
}
func (cit *CreateIndexTask) PreExecute() error {
collName, fieldName := cit.CollectionName, cit.FieldName
if err := ValidateCollectionName(collName); err != nil {
return err
}
if err := ValidateFieldName(fieldName); err != nil {
return err
}
return nil
}
func (cit *CreateIndexTask) Execute() (err error) {
cit.result, err = cit.masterClient.CreateIndex(cit.ctx, &cit.CreateIndexRequest)
return err
}
func (cit *CreateIndexTask) PostExecute() error {
return nil
}
type DescribeIndexTask struct {
Condition
internalpb.DescribeIndexRequest
masterClient masterpb.MasterClient
result *servicepb.DescribeIndexResponse
ctx context.Context
}
func (dit *DescribeIndexTask) ID() UniqueID {
return dit.ReqID
}
func (dit *DescribeIndexTask) SetID(uid UniqueID) {
dit.ReqID = uid
}
func (dit *DescribeIndexTask) Type() internalpb.MsgType {
return dit.MsgType
}
func (dit *DescribeIndexTask) BeginTs() Timestamp {
return dit.Timestamp
}
func (dit *DescribeIndexTask) EndTs() Timestamp {
return dit.Timestamp
}
func (dit *DescribeIndexTask) SetTs(ts Timestamp) {
dit.Timestamp = ts
}
func (dit *DescribeIndexTask) PreExecute() error {
collName, fieldName := dit.CollectionName, dit.FieldName
if err := ValidateCollectionName(collName); err != nil {
return err
}
if err := ValidateFieldName(fieldName); err != nil {
return err
}
return nil
}
func (dit *DescribeIndexTask) Execute() (err error) {
dit.result, err = dit.masterClient.DescribeIndex(dit.ctx, &dit.DescribeIndexRequest)
return err
}
func (dit *DescribeIndexTask) PostExecute() error {
return nil
}
type DescribeIndexProgressTask struct {
Condition
internalpb.DescribeIndexProgressRequest
masterClient masterpb.MasterClient
result *servicepb.BoolResponse
ctx context.Context
}
func (dipt *DescribeIndexProgressTask) ID() UniqueID {
return dipt.ReqID
}
func (dipt *DescribeIndexProgressTask) SetID(uid UniqueID) {
dipt.ReqID = uid
}
func (dipt *DescribeIndexProgressTask) Type() internalpb.MsgType {
return dipt.MsgType
}
func (dipt *DescribeIndexProgressTask) BeginTs() Timestamp {
return dipt.Timestamp
}
func (dipt *DescribeIndexProgressTask) EndTs() Timestamp {
return dipt.Timestamp
}
func (dipt *DescribeIndexProgressTask) SetTs(ts Timestamp) {
dipt.Timestamp = ts
}
func (dipt *DescribeIndexProgressTask) PreExecute() error {
collName, fieldName := dipt.CollectionName, dipt.FieldName
if err := ValidateCollectionName(collName); err != nil {
return err
}
if err := ValidateFieldName(fieldName); err != nil {
return err
}
return nil
}
func (dipt *DescribeIndexProgressTask) Execute() (err error) {
dipt.result, err = dipt.masterClient.DescribeIndexProgress(dipt.ctx, &dipt.DescribeIndexProgressRequest)
return err
}
func (dipt *DescribeIndexProgressTask) PostExecute() error {
return nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment