diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 535d38408d4d131958c55f871b91a80b82bd044c..397283a8999f12d741479871db61908845ded76a 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -50,7 +50,6 @@ type DataService interface { GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error) GetComponentStates() (*internalpb2.ComponentStates, error) GetCount(req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) - GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) } type MasterClient interface { @@ -729,10 +728,6 @@ func (s *Server) GetCount(req *datapb.CollectionCountRequest) (*datapb.Collectio ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, }, } - if !s.checkStateIsHealthy() { - resp.Status.Reason = "data service is not healthy" - return resp, nil - } nums, err := s.meta.GetNumRowsOfCollection(req.CollectionID) if err != nil { resp.Status.Reason = err.Error() @@ -742,27 +737,3 @@ func (s *Server) GetCount(req *datapb.CollectionCountRequest) (*datapb.Collectio resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS return resp, nil } - -func (s *Server) GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) { - resp := &datapb.SegmentInfoResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - }, - } - if !s.checkStateIsHealthy() { - resp.Status.Reason = "data service is not healthy" - return resp, nil - } - infos := make([]*datapb.SegmentInfo, len(req.SegmentIDs)) - for i, id := range req.SegmentIDs { - segmentInfo, err := s.meta.GetSegment(id) - if err != nil { - resp.Status.Reason = err.Error() - return resp, nil - } - infos[i] = segmentInfo - } - resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS - resp.Infos = infos - return resp, nil -} diff --git a/internal/distributed/dataservice/client.go b/internal/distributed/dataservice/client.go index 4507502ef070ae4871424879158f1646bbc55073..c2b886ecf92f0dc69aab7fbc5396f00bee6b348f 100644 --- a/internal/distributed/dataservice/client.go +++ b/internal/distributed/dataservice/client.go @@ -137,7 +137,3 @@ func (c *Client) GetSegmentInfoChannel() (string, error) { func (c *Client) GetCount(req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) { return c.grpcClient.GetCount(context.Background(), req) } - -func (c *Client) GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) { - return c.grpcClient.GetSegmentInfo(context.Background(), req) -} diff --git a/internal/distributed/dataservice/grpc_service.go b/internal/distributed/dataservice/grpc_service.go index ff39512820b320dc86104938b5b721757774716f..413de1c21ef6d4f69fce20c53fbe242638d3749e 100644 --- a/internal/distributed/dataservice/grpc_service.go +++ b/internal/distributed/dataservice/grpc_service.go @@ -25,7 +25,7 @@ type Service struct { } func (s *Service) GetSegmentInfo(ctx context.Context, request *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) { - return s.server.GetSegmentInfo(request) + panic("implement me") } func NewGrpcService(ctx context.Context) *Service { diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go index 9c325d1dc08708ea0bb727e2cf7285aa469c30e3..b3c3611245c6b3ac2caa20c19343eb35b33671e1 100644 --- a/internal/distributed/proxynode/service.go +++ b/internal/distributed/proxynode/service.go @@ -322,5 +322,5 @@ func (s *Server) GetDdChannel(ctx context.Context, request *commonpb.Empty) (*mi } func (s *Server) GetPersistentSegmentInfo(ctx context.Context, request *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error) { - return s.impl.GetPersistentSegmentInfo(request) + panic("implement me") } diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index 9c0a9410a434419d4d3576851100ca74e32b6418..a5584e64389f3372fef9dbed8bcd1ecedd9138b5 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -249,12 +249,28 @@ func TestMasterService(t *testing.T) { msg, ok := <-ddStream.Chan() assert.True(t, ok) - assert.Equal(t, len(msg.Msgs), 1) + assert.True(t, len(msg.Msgs) == 2 || len(msg.Msgs) == 1) + createMsg, ok := (msg.Msgs[0]).(*ms.CreateCollectionMsg) assert.True(t, ok) createMeta, err := core.MetaTable.GetCollectionByName("testColl") assert.Nil(t, err) assert.Equal(t, createMsg.CollectionID, createMeta.ID) + assert.Equal(t, len(createMeta.PartitionIDs), 1) + + if len(msg.Msgs) == 2 { + createPart, ok := (msg.Msgs[1]).(*ms.CreatePartitionMsg) + assert.True(t, ok) + assert.Equal(t, createPart.CollectionName, "testColl") + assert.Equal(t, createPart.PartitionID, createMeta.PartitionIDs[0]) + } else { + msg, ok = <-ddStream.Chan() + assert.True(t, ok) + createPart, ok := (msg.Msgs[0]).(*ms.CreatePartitionMsg) + assert.True(t, ok) + assert.Equal(t, createPart.CollectionName, "testColl") + assert.Equal(t, createPart.PartitionID, createMeta.PartitionIDs[0]) + } req.Base.MsgID = 101 req.Base.Timestamp = 101 diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go index 2d6648d4f1be95c79437bf95e08597c77c572c43..abfa425b3ffd8bfd3ea22a65574c2116e6cbc0b8 100644 --- a/internal/masterservice/task.go +++ b/internal/masterservice/task.go @@ -147,6 +147,26 @@ func (t *CreateCollectionReqTask) Execute() error { return err } + ddPart := internalpb2.CreatePartitionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kCreatePartition, + MsgID: t.Req.Base.MsgID, //TODO, msg id + Timestamp: t.Req.Base.Timestamp + 1, + SourceID: t.Req.Base.SourceID, + }, + DbName: t.Req.DbName, + CollectionName: t.Req.CollectionName, + PartitionName: Params.DefaultPartitionName, + DbID: 0, //TODO, not used + CollectionID: collMeta.ID, + PartitionID: partMeta.PartitionID, + } + + err = t.core.DdCreatePartitionReq(&ddPart) + if err != nil { + return err + } + return nil } diff --git a/internal/msgstream/unmarshal.go b/internal/msgstream/unmarshal.go index 9e5cee15d74be5e26c542e520af72e5f9ae0614e..ff86fcc6f2257a9dac4663364757066fa19bc845 100644 --- a/internal/msgstream/unmarshal.go +++ b/internal/msgstream/unmarshal.go @@ -51,6 +51,7 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher { segmentInfoMsg := SegmentInfoMsg{} flushCompletedMsg := FlushCompletedMsg{} queryNodeSegStatsMsg := QueryNodeStatsMsg{} + segmentStatisticsMsg := SegmentStatisticsMsg{} p := &ProtoUnmarshalDispatcher{} p.TempMap = make(map[commonpb.MsgType]UnmarshalFunc) @@ -68,6 +69,7 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher { p.TempMap[commonpb.MsgType_kFlush] = flushMsg.Unmarshal p.TempMap[commonpb.MsgType_kSegmentInfo] = segmentInfoMsg.Unmarshal p.TempMap[commonpb.MsgType_kSegmentFlushDone] = flushCompletedMsg.Unmarshal + p.TempMap[commonpb.MsgType_kSegmentStatistics] = segmentStatisticsMsg.Unmarshal return p } diff --git a/internal/proxynode/impl.go b/internal/proxynode/impl.go index d0c898e5b743813825ea9af4b7cdd5e6028a9301..593ac8b7e7afd9a9f97e62f45cb152f6b449a669 100644 --- a/internal/proxynode/impl.go +++ b/internal/proxynode/impl.go @@ -2,13 +2,10 @@ package proxynode import ( "context" - "errors" "log" "strconv" "time" - "github.com/zilliztech/milvus-distributed/internal/proto/datapb" - "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -593,109 +590,3 @@ func (node *NodeImpl) Flush(request *milvuspb.FlushRequest) (*commonpb.Status, e func (node *NodeImpl) GetDdChannel(request *commonpb.Empty) (*milvuspb.StringResponse, error) { panic("implement me") } - -func (node *NodeImpl) GetPersistentSegmentInfo(req *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error) { - resp := &milvuspb.PersistentSegmentInfoResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - }, - } - segments, err := node.getSegmentsOfCollection(req.DbName, req.CollectionName) - if err != nil { - resp.Status.Reason = err.Error() - return resp, nil - } - infoResp, err := node.dataServiceClient.GetSegmentInfo(&datapb.SegmentInfoRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kSegmentInfo, - MsgID: 0, - Timestamp: 0, - SourceID: Params.ProxyID, - }, - SegmentIDs: segments, - }) - if err != nil { - resp.Status.Reason = err.Error() - return resp, nil - } - if infoResp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - resp.Status.Reason = infoResp.Status.Reason - return resp, nil - } - persistentInfos := make([]*milvuspb.PersistentSegmentInfo, len(infoResp.Infos)) - for i, info := range infoResp.Infos { - persistentInfos[i] = &milvuspb.PersistentSegmentInfo{ - SegmentID: info.SegmentID, - CollectionID: info.CollectionID, - PartitionID: info.PartitionID, - OpenTime: info.OpenTime, - SealedTime: info.SealedTime, - FlushedTime: info.FlushedTime, - NumRows: info.NumRows, - MemSize: info.MemSize, - State: info.State, - } - } - resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS - resp.Infos = persistentInfos - return resp, nil -} - -func (node *NodeImpl) getSegmentsOfCollection(dbName string, collectionName string) ([]UniqueID, error) { - describeCollectionResponse, err := node.masterClient.DescribeCollection(&milvuspb.DescribeCollectionRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kDescribeCollection, - MsgID: 0, - Timestamp: 0, - SourceID: Params.ProxyID, - }, - DbName: dbName, - CollectionName: collectionName, - }) - if err != nil { - return nil, err - } - if describeCollectionResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - return nil, errors.New(describeCollectionResponse.Status.Reason) - } - collectionID := describeCollectionResponse.CollectionID - showPartitionsResp, err := node.masterClient.ShowPartitions(&milvuspb.ShowPartitionRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kShowPartitions, - MsgID: 0, - Timestamp: 0, - SourceID: Params.ProxyID, - }, - DbName: dbName, - CollectionName: collectionName, - CollectionID: collectionID, - }) - if err != nil { - return nil, err - } - if showPartitionsResp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - return nil, errors.New(showPartitionsResp.Status.Reason) - } - - ret := make([]UniqueID, 0) - for _, partitionID := range showPartitionsResp.PartitionIDs { - showSegmentResponse, err := node.masterClient.ShowSegments(&milvuspb.ShowSegmentRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kShowSegment, - MsgID: 0, - Timestamp: 0, - SourceID: Params.ProxyID, - }, - CollectionID: collectionID, - PartitionID: partitionID, - }) - if err != nil { - return nil, err - } - if showSegmentResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - return nil, errors.New(showSegmentResponse.Status.Reason) - } - ret = append(ret, showSegmentResponse.SegmentIDs...) - } - return ret, nil -} diff --git a/internal/proxynode/interface.go b/internal/proxynode/interface.go index d7f8b048b544d8f6c326fccd99834eac77e946e5..f262a633d3f8b0ef0aa9454a166d799110eebfe0 100644 --- a/internal/proxynode/interface.go +++ b/internal/proxynode/interface.go @@ -22,7 +22,6 @@ type MasterClient interface { ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) - ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) } type IndexServiceClient interface { @@ -52,7 +51,6 @@ type DataServiceClient interface { GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) GetComponentStates() (*internalpb2.ComponentStates, error) - GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) } type ProxyServiceClient interface { diff --git a/internal/proxyservice/impl.go b/internal/proxyservice/impl.go index 01d64c7afe83c2618464451346e80f43d9fbce55..d4951c28e3810560bb5dcad5eef16fdd31aa717f 100644 --- a/internal/proxyservice/impl.go +++ b/internal/proxyservice/impl.go @@ -7,6 +7,7 @@ import ( "os" "path" "runtime" + "strconv" "time" "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -108,6 +109,16 @@ func (s *ServiceImpl) Init() error { serviceTimeTickMsgStream.CreatePulsarProducers([]string{Params.ServiceTimeTickChannel}) log.Println("create service time tick producer channel: ", []string{Params.ServiceTimeTickChannel}) + channels := make([]string, Params.InsertChannelNum) + var i int64 = 0 + for ; i < Params.InsertChannelNum; i++ { + channels[i] = Params.InsertChannelPrefixName + strconv.FormatInt(i, 10) + } + insertTickMsgStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024, 1024, dispatcherFactory.NewUnmarshalDispatcher()) + insertTickMsgStream.SetPulsarClient(Params.PulsarAddress) + insertTickMsgStream.CreatePulsarProducers(channels) + log.Println("create service time tick producer channel: ", channels) + nodeTimeTickMsgStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024, 1024, dispatcherFactory.NewUnmarshalDispatcher()) nodeTimeTickMsgStream.SetPulsarClient(Params.PulsarAddress) nodeTimeTickMsgStream.CreatePulsarConsumers(Params.NodeTimeTickChannel, @@ -116,7 +127,7 @@ func (s *ServiceImpl) Init() error { ttBarrier := newSoftTimeTickBarrier(s.ctx, nodeTimeTickMsgStream, []UniqueID{0}, 10) log.Println("create soft time tick barrier ...") - s.tick = newTimeTick(s.ctx, ttBarrier, serviceTimeTickMsgStream) + s.tick = newTimeTick(s.ctx, ttBarrier, serviceTimeTickMsgStream, insertTickMsgStream) log.Println("create time tick ...") s.stateCode = internalpb2.StateCode_HEALTHY diff --git a/internal/proxyservice/paramtable.go b/internal/proxyservice/paramtable.go index 27d6b51098dd560ac407d66a74f568487c93a2d5..5a6a59fbb8e90980bbaa4afeeea5dd58e1998e39 100644 --- a/internal/proxyservice/paramtable.go +++ b/internal/proxyservice/paramtable.go @@ -9,11 +9,13 @@ import ( type ParamTable struct { paramtable.BaseTable - PulsarAddress string - MasterAddress string - NodeTimeTickChannel []string - ServiceTimeTickChannel string - DataServiceAddress string + PulsarAddress string + MasterAddress string + NodeTimeTickChannel []string + ServiceTimeTickChannel string + DataServiceAddress string + InsertChannelPrefixName string + InsertChannelNum int64 } var Params ParamTable @@ -21,11 +23,17 @@ var Params ParamTable func (pt *ParamTable) Init() { pt.BaseTable.Init() + if err := pt.LoadYaml("advanced/data_service.yaml"); err != nil { + panic(err) + } + pt.initPulsarAddress() pt.initMasterAddress() pt.initNodeTimeTickChannel() pt.initServiceTimeTickChannel() pt.initDataServiceAddress() + pt.initInsertChannelPrefixName() + pt.initInsertChannelNum() } func (pt *ParamTable) initPulsarAddress() { @@ -65,3 +73,15 @@ func (pt *ParamTable) initDataServiceAddress() { // NOT USED NOW pt.DataServiceAddress = "TODO: read from config" } + +func (pt *ParamTable) initInsertChannelNum() { + pt.InsertChannelNum = pt.ParseInt64("dataservice.insertChannelNum") +} + +func (pt *ParamTable) initInsertChannelPrefixName() { + var err error + pt.InsertChannelPrefixName, err = pt.Load("msgChannel.chanNamePrefix.dataServiceInsertChannel") + if err != nil { + panic(err) + } +} diff --git a/internal/proxyservice/timetick.go b/internal/proxyservice/timetick.go index 162aab64bd584c8569ec52dc83766502c328e842..f93ce864bfbfd54f429c982728f3e1ffdcfd6159 100644 --- a/internal/proxyservice/timetick.go +++ b/internal/proxyservice/timetick.go @@ -20,7 +20,7 @@ type ( TimeTickImpl struct { ttBarrier TimeTickBarrier - channel msgstream.MsgStream + channels []msgstream.MsgStream wg sync.WaitGroup ctx context.Context cancel context.CancelFunc @@ -58,17 +58,19 @@ func (tt *TimeTickImpl) Start() error { }, } msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) - err = tt.channel.Produce(&msgPack) - if err != nil { - log.Println("send time tick error: ", err) - } else { + for _, channel := range tt.channels { + err = channel.Broadcast(&msgPack) + if err != nil { + log.Println("send time tick error: ", err) + } } - log.Println("send to master: ", current) } } }() - tt.channel.Start() + for _, channel := range tt.channels { + channel.Start() + } err := tt.ttBarrier.Start() if err != nil { @@ -79,13 +81,15 @@ func (tt *TimeTickImpl) Start() error { } func (tt *TimeTickImpl) Close() { - tt.channel.Close() + for _, channel := range tt.channels { + channel.Close() + } tt.ttBarrier.Close() tt.cancel() tt.wg.Wait() } -func newTimeTick(ctx context.Context, ttBarrier TimeTickBarrier, channel msgstream.MsgStream) TimeTick { +func newTimeTick(ctx context.Context, ttBarrier TimeTickBarrier, channels ...msgstream.MsgStream) TimeTick { ctx1, cancel := context.WithCancel(ctx) - return &TimeTickImpl{ctx: ctx1, cancel: cancel, ttBarrier: ttBarrier, channel: channel} + return &TimeTickImpl{ctx: ctx1, cancel: cancel, ttBarrier: ttBarrier, channels: channels} }