diff --git a/cmd/distributed/components/master_service.go b/cmd/distributed/components/master_service.go index b13d154190ad7ce63059c499a9217477d2d35922..2bdb31f87e97418dc44b09ea8a47dd8742d99bd1 100644 --- a/cmd/distributed/components/master_service.go +++ b/cmd/distributed/components/master_service.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "time" ds "github.com/zilliztech/milvus-distributed/internal/dataservice" dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" @@ -11,10 +12,12 @@ import ( msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" ps "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice" psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client" + qsc "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client" is "github.com/zilliztech/milvus-distributed/internal/indexservice" ms "github.com/zilliztech/milvus-distributed/internal/masterservice" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + qs "github.com/zilliztech/milvus-distributed/internal/queryservice" ) type MasterService struct { @@ -24,6 +27,7 @@ type MasterService struct { proxyService *psc.Client dataService *dsc.Client indexService *isc.Client + queryService *qsc.Client } func NewMasterService(ctx context.Context) (*MasterService, error) { @@ -103,6 +107,18 @@ func NewMasterService(ctx context.Context) (*MasterService, error) { if err = svr.SetIndexService(indexService); err != nil { return nil, err } + + qs.Params.Init() + log.Printf("query service address = %s:%d", qs.Params.Address, qs.Params.Port) + queryService, err := qsc.NewClient(fmt.Sprintf("%s:%d", qs.Params.Address, qs.Params.Port), time.Duration(ms.Params.Timeout)*time.Second) + if err != nil { + return nil, err + } + + if err = svr.SetQueryService(queryService); err != nil { + return nil, err + } + return &MasterService{ ctx: ctx, svr: svr, @@ -110,6 +126,7 @@ func NewMasterService(ctx context.Context) (*MasterService, error) { proxyService: proxyService, dataService: dataService, indexService: indexService, + queryService: queryService, }, nil } @@ -135,6 +152,9 @@ func (m *MasterService) Stop() error { if m.dataService != nil { _ = m.dataService.Stop() } + if m.queryService != nil { + _ = m.queryService.Stop() + } if m.svr != nil { return m.svr.Stop() } diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index fca7ce4d0235c857ab6bf591583dad330d81445d..0a8e7549553b948739414fd81ed4754ca4613186 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -111,6 +111,10 @@ func TestGrpcService(t *testing.T) { return nil } + core.ReleaseCollection = func(ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error { + return nil + } + err = svr.Start() assert.Nil(t, err) diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index 1462194328dcb7a9fce15d98449f5973f1e116f4..ebd1b87d2a73b0026acd72e4b248e3047d8bbd1d 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -97,6 +97,14 @@ func (s *GrpcServer) SetIndexService(p cms.IndexServiceInterface) error { return c.SetIndexService(p) } +func (s *GrpcServer) SetQueryService(q cms.QueryServiceInterface) error { + c, ok := s.core.(*cms.Core) + if !ok { + return errors.Errorf("set query service failed") + } + return c.SetQueryService(q) +} + func (s *GrpcServer) GetComponentStatesRPC(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) { return s.core.GetComponentStates() } diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index be018471e79f3fdc91acc34e30a56f39e0ef5784..2862e77df53800a69faccd79ded80ff0727cb734 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -20,6 +20,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/proxypb" + "github.com/zilliztech/milvus-distributed/internal/proto/querypb" "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" "go.etcd.io/etcd/clientv3" @@ -47,6 +48,10 @@ type IndexServiceInterface interface { BuildIndex(req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) } +type QueryServiceInterface interface { + ReleaseCollection(req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) +} + type Interface interface { //service Init() error @@ -147,15 +152,18 @@ type Core struct { //setMsgStreams ,if segment flush completed, data node would put segment id into msg stream DataNodeSegmentFlushCompletedChan chan typeutil.UniqueID - //TODO,get binlog file path from data service, + //get binlog file path from data service, GetBinlogFilePathsFromDataServiceReq func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) - //TODO, call index builder's client to build index, return build id + //call index builder's client to build index, return build id BuildIndexReq func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error) - //TODO, proxy service interface, notify proxy service to drop collection + //proxy service interface, notify proxy service to drop collection InvalidateCollectionMetaCache func(ts typeutil.Timestamp, dbName string, collectionName string) error + //query service interface, notify query service to release collection + ReleaseCollection func(ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error + // put create index task into this chan indexTaskQueue chan *CreateIndexTask @@ -245,6 +253,10 @@ func (c *Core) checkInit() error { if c.DataNodeSegmentFlushCompletedChan == nil { return errors.Errorf("DataNodeSegmentFlushCompletedChan is nil") } + if c.ReleaseCollection == nil { + return errors.Errorf("ReleaseCollection is nil") + } + log.Printf("master node id = %d", Params.NodeID) log.Printf("master dd channel name = %s", Params.DdChannel) log.Printf("master time ticke channel name = %s", Params.TimeTickChannel) @@ -691,6 +703,30 @@ func (c *Core) SetIndexService(s IndexServiceInterface) error { return nil } +func (c *Core) SetQueryService(s QueryServiceInterface) error { + c.ReleaseCollection = func(ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error { + req := &querypb.ReleaseCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kReleaseCollection, + MsgID: 0, //TODO, msg ID + Timestamp: ts, + SourceID: int64(Params.NodeID), + }, + DbID: dbID, + CollectionID: collectionID, + } + rsp, err := s.ReleaseCollection(req) + if err != nil { + return err + } + if rsp.ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.Errorf("ReleaseCollection from query service failed, error = %s", rsp.Reason) + } + return nil + } + return nil +} + func (c *Core) Init() error { var initError error = nil c.initOnce.Do(func() { diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index 0b42e4020cfa04003d015d1041daa60966284d54..de1bebbae80df0c7a00dd6f27652032fed684fb8 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -18,6 +18,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/proxypb" + "github.com/zilliztech/milvus-distributed/internal/proto/querypb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) @@ -79,6 +80,21 @@ func (d *dataMock) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*d return rst, nil } +type queryMock struct { + collID []typeutil.UniqueID + mutex sync.Mutex +} + +func (q *queryMock) ReleaseCollection(req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) { + q.mutex.Lock() + defer q.mutex.Unlock() + q.collID = append(q.collID, req.CollectionID) + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + Reason: "", + }, nil +} + func (d *dataMock) GetSegmentInfoChannel() (*milvuspb.StringResponse, error) { return &milvuspb.StringResponse{ Status: &commonpb.Status{ @@ -163,6 +179,13 @@ func TestMasterService(t *testing.T) { err = core.SetIndexService(im) assert.Nil(t, err) + qm := &queryMock{ + collID: nil, + mutex: sync.Mutex{}, + } + err = core.SetQueryService(qm) + assert.Nil(t, err) + err = core.Init() assert.Nil(t, err) @@ -768,6 +791,12 @@ func TestMasterService(t *testing.T) { assert.Equal(t, len(collArray), 1) assert.Equal(t, collArray[0], "testColl") + time.Sleep(time.Millisecond * 100) + qm.mutex.Lock() + assert.Equal(t, len(qm.collID), 1) + assert.Equal(t, qm.collID[0], collMeta.ID) + qm.mutex.Unlock() + req = &milvuspb.DropCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kDropCollection, diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go index abfa425b3ffd8bfd3ea22a65574c2116e6cbc0b8..70a8a7b6462fa47af088b7ecdb7c4ea7f6df68ae 100644 --- a/internal/masterservice/task.go +++ b/internal/masterservice/task.go @@ -1,6 +1,8 @@ package masterservice import ( + "log" + "github.com/golang/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -195,6 +197,7 @@ func (t *DropCollectionReqTask) Execute() error { if err = t.core.InvalidateCollectionMetaCache(t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName); err != nil { return err } + err = t.core.MetaTable.DeleteCollection(collMeta.ID) if err != nil { return err @@ -214,6 +217,14 @@ func (t *DropCollectionReqTask) Execute() error { if err != nil { return err } + + //notify query service to release collection + go func() { + if err = t.core.ReleaseCollection(t.Req.Base.Timestamp, 0, collMeta.ID); err != nil { + log.Printf("%s", err.Error()) + } + }() + return nil } diff --git a/internal/proxyservice/impl.go b/internal/proxyservice/impl.go index 1543b0eeed6733a1ba09e9aeaea95d737383a688..379694f3af853fba59e739984caa8a34d448c11f 100644 --- a/internal/proxyservice/impl.go +++ b/internal/proxyservice/impl.go @@ -121,7 +121,7 @@ func (s *ServiceImpl) Init() error { "proxyservicesub") // TODO: add config log.Println("create node time tick consumer channel: ", Params.NodeTimeTickChannel) - ttBarrier := newSoftTimeTickBarrier(s.ctx, nodeTimeTickMsgStream, []UniqueID{0}, 10) + ttBarrier := newSoftTimeTickBarrier(s.ctx, nodeTimeTickMsgStream, []UniqueID{1}, 10) log.Println("create soft time tick barrier ...") s.tick = newTimeTick(s.ctx, ttBarrier, serviceTimeTickMsgStream, insertTickMsgStream) log.Println("create time tick ...")