diff --git a/cmd/masterservice/main.go b/cmd/masterservice/main.go deleted file mode 100644 index 9b050f956098fd4ec7616ae7f509c9307f1017ce..0000000000000000000000000000000000000000 --- a/cmd/masterservice/main.go +++ /dev/null @@ -1,93 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "os" - "os/signal" - "syscall" - - ds "github.com/zilliztech/milvus-distributed/internal/dataservice" - dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" - isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client" - msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" - psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice" - is "github.com/zilliztech/milvus-distributed/internal/indexservice" - ms "github.com/zilliztech/milvus-distributed/internal/masterservice" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" -) - -const reTryCnt = 3 - -func main() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - log.Printf("master service address : %s:%d", ms.Params.Address, ms.Params.Port) - - svr, err := msc.NewGrpcServer(ctx) - if err != nil { - panic(err) - } - - log.Printf("proxy service address : %s", psc.Params.NetworkAddress()) - //proxyService := psc.NewClient(ctx, psc.Params.NetworkAddress()) - - //TODO, test proxy service GetComponentStates, before set - - //if err = svr.SetProxyService(proxyService); err != nil { - // panic(err) - //} - - log.Printf("data service address : %s:%d", ds.Params.Address, ds.Params.Port) - dataService := dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port)) - if err = dataService.Init(); err != nil { - panic(err) - } - if err = dataService.Start(); err != nil { - panic(err) - } - cnt := 0 - for cnt = 0; cnt < reTryCnt; cnt++ { - dsStates, err := dataService.GetComponentStates() - if err != nil { - continue - } - if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - continue - } - if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY { - continue - } - break - } - if cnt >= reTryCnt { - panic("connect to data service failed") - } - - //if err = svr.SetDataService(dataService); err != nil { - // panic(err) - //} - - log.Printf("index service address : %s", is.Params.Address) - indexService := isc.NewClient(is.Params.Address) - - if err = svr.SetIndexService(indexService); err != nil { - panic(err) - } - - if err = svr.Start(); err != nil { - panic(err) - } - - sc := make(chan os.Signal, 1) - signal.Notify(sc, - syscall.SIGHUP, - syscall.SIGINT, - syscall.SIGTERM, - syscall.SIGQUIT) - sig := <-sc - log.Printf("Got %s signal to exit", sig.String()) - _ = svr.Stop() -} diff --git a/configs/advanced/master.yaml b/configs/advanced/master.yaml index 3e1c4e7816b0ed21682547262e140e86557c910a..69474322ae7cca630a3823294effab00227af124 100644 --- a/configs/advanced/master.yaml +++ b/configs/advanced/master.yaml @@ -23,5 +23,4 @@ master: IDAssignExpiration: 2000 # ms maxPartitionNum: 4096 - nodeID: 100 - timeout: 5 # time out, 5 seconds \ No newline at end of file + nodeID: 100 \ No newline at end of file diff --git a/internal/dataservice/meta.go b/internal/dataservice/meta.go index 8499b578b8b5c5a76f5eb380f8d4bacdbf84f27f..24bce18a77d3f3af1e9acd54d99ba9b51db621ea 100644 --- a/internal/dataservice/meta.go +++ b/internal/dataservice/meta.go @@ -29,7 +29,6 @@ type ( client kv.TxnBase // client of a reliable kv service, i.e. etcd client collID2Info map[UniqueID]*collectionInfo // collection id to collection info segID2Info map[UniqueID]*datapb.SegmentInfo // segment id to segment info - allocator allocator ddLock sync.RWMutex } ) @@ -50,12 +49,11 @@ func (err errCollectionNotFound) Error() string { return fmt.Sprintf("collection %d not found", err.collectionID) } -func newMeta(kv kv.TxnBase, allocator allocator) (*meta, error) { +func newMeta(kv kv.TxnBase) (*meta, error) { mt := &meta{ client: kv, collID2Info: make(map[UniqueID]*collectionInfo), segID2Info: make(map[UniqueID]*datapb.SegmentInfo), - allocator: allocator, } err := mt.reloadFromKV() if err != nil { @@ -120,29 +118,6 @@ func (meta *meta) GetCollection(collectionID UniqueID) (*collectionInfo, error) return collectionInfo, nil } -func (meta *meta) BuildSegment(collectionID UniqueID, partitionID UniqueID, channelRange []string) (*datapb.SegmentInfo, error) { - id, err := meta.allocator.allocID() - if err != nil { - return nil, err - } - ts, err := meta.allocator.allocTimestamp() - if err != nil { - return nil, err - } - - return &datapb.SegmentInfo{ - SegmentID: id, - CollectionID: collectionID, - PartitionID: partitionID, - InsertChannels: channelRange, - OpenTime: ts, - SealedTime: 0, - NumRows: 0, - MemSize: 0, - State: datapb.SegmentState_SegmentGrowing, - }, nil -} - func (meta *meta) AddSegment(segmentInfo *datapb.SegmentInfo) error { meta.ddLock.Lock() defer meta.ddLock.Unlock() @@ -191,23 +166,37 @@ func (meta *meta) GetSegment(segID UniqueID) (*datapb.SegmentInfo, error) { return segmentInfo, nil } -func (meta *meta) SealSegment(segID UniqueID) error { +func (meta *meta) OpenSegment(segmentID UniqueID, timetick Timestamp) error { meta.ddLock.Lock() defer meta.ddLock.Unlock() - segInfo, ok := meta.segID2Info[segID] + segInfo, ok := meta.segID2Info[segmentID] if !ok { - return newErrSegmentNotFound(segID) + return newErrSegmentNotFound(segmentID) } - ts, err := meta.allocator.allocTimestamp() + segInfo.OpenTime = timetick + + err := meta.saveSegmentInfo(segInfo) if err != nil { + _ = meta.reloadFromKV() return err } - segInfo.SealedTime = ts - segInfo.State = datapb.SegmentState_SegmentSealed + return nil +} + +func (meta *meta) SealSegment(segID UniqueID, timetick Timestamp) error { + meta.ddLock.Lock() + defer meta.ddLock.Unlock() + + segInfo, ok := meta.segID2Info[segID] + if !ok { + return newErrSegmentNotFound(segID) + } + + segInfo.SealedTime = timetick - err = meta.saveSegmentInfo(segInfo) + err := meta.saveSegmentInfo(segInfo) if err != nil { _ = meta.reloadFromKV() return err @@ -215,7 +204,7 @@ func (meta *meta) SealSegment(segID UniqueID) error { return nil } -func (meta *meta) FlushSegment(segID UniqueID) error { +func (meta *meta) FlushSegment(segID UniqueID, timetick Timestamp) error { meta.ddLock.Lock() defer meta.ddLock.Unlock() @@ -224,14 +213,26 @@ func (meta *meta) FlushSegment(segID UniqueID) error { return newErrSegmentNotFound(segID) } - ts, err := meta.allocator.allocTimestamp() + segInfo.FlushedTime = timetick + + err := meta.saveSegmentInfo(segInfo) if err != nil { + _ = meta.reloadFromKV() return err } - segInfo.FlushedTime = ts - segInfo.State = datapb.SegmentState_SegmentFlushed + return nil +} + +func (meta *meta) SetSegmentState(segmentID UniqueID, state datapb.SegmentState) error { + meta.ddLock.Lock() + defer meta.ddLock.Unlock() - err = meta.saveSegmentInfo(segInfo) + segInfo, ok := meta.segID2Info[segmentID] + if !ok { + return newErrSegmentNotFound(segmentID) + } + segInfo.State = state + err := meta.saveSegmentInfo(segInfo) if err != nil { _ = meta.reloadFromKV() return err @@ -316,3 +317,16 @@ func (meta *meta) saveSegmentInfo(segmentInfo *datapb.SegmentInfo) error { func (meta *meta) removeSegmentInfo(segID UniqueID) error { return meta.client.Remove("/segment/" + strconv.FormatInt(segID, 10)) } +func BuildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, channelRange []string) (*datapb.SegmentInfo, error) { + return &datapb.SegmentInfo{ + SegmentID: segmentID, + CollectionID: collectionID, + PartitionID: partitionID, + InsertChannels: channelRange, + OpenTime: 0, + SealedTime: 0, + NumRows: 0, + MemSize: 0, + State: datapb.SegmentState_SegmentGrowing, + }, nil +} diff --git a/internal/dataservice/meta_test.go b/internal/dataservice/meta_test.go index 2474763ae0c0155fac229241cb5068cfc5369e13..26ec1296ea98f00678bb7b38837bc58c905b9a00 100644 --- a/internal/dataservice/meta_test.go +++ b/internal/dataservice/meta_test.go @@ -46,7 +46,9 @@ func TestSegment(t *testing.T) { assert.Nil(t, err) id, err := mockAllocator.allocID() assert.Nil(t, err) - segmentInfo, err := meta.BuildSegment(id, 100, []string{"c1", "c2"}) + segID, err := mockAllocator.allocID() + assert.Nil(t, err) + segmentInfo, err := BuildSegment(id, 100, segID, []string{"c1", "c2"}) assert.Nil(t, err) err = meta.AddSegment(segmentInfo) assert.Nil(t, err) @@ -59,9 +61,9 @@ func TestSegment(t *testing.T) { ids = meta.GetSegmentsByCollectionAndPartitionID(id, 100) assert.EqualValues(t, 1, len(ids)) assert.EqualValues(t, segmentInfo.SegmentID, ids[0]) - err = meta.SealSegment(segmentInfo.SegmentID) + err = meta.SealSegment(segmentInfo.SegmentID, 100) assert.Nil(t, err) - err = meta.FlushSegment(segmentInfo.SegmentID) + err = meta.FlushSegment(segmentInfo.SegmentID, 200) assert.Nil(t, err) info, err = meta.GetSegment(segmentInfo.SegmentID) assert.Nil(t, err) diff --git a/internal/dataservice/mock.go b/internal/dataservice/mock.go index e289c383facb7585cd0752e1aa8841303fbc5745..2922d33abf075730665e8b95b4990fe50d1ff71b 100644 --- a/internal/dataservice/mock.go +++ b/internal/dataservice/mock.go @@ -12,7 +12,7 @@ import ( func newMemoryMeta(allocator allocator) (*meta, error) { memoryKV := memkv.NewMemoryKV() - return newMeta(memoryKV, allocator) + return newMeta(memoryKV) } type MockAllocator struct { diff --git a/internal/dataservice/param.go b/internal/dataservice/param.go index 5d96614c4410a1fd8724299762b445227925cf73..2e3f00e37656b43eb5410e8b38bc8b0f51ec06ae 100644 --- a/internal/dataservice/param.go +++ b/internal/dataservice/param.go @@ -29,7 +29,8 @@ type ParamTable struct { StatisticsChannelName string TimeTickChannelName string DataNodeNum int - SegmentChannelName string // todo init + SegmentInfoChannelName string + DataServiceSubscriptionName string } var Params ParamTable @@ -62,6 +63,8 @@ func (p *ParamTable) Init() { p.initStatisticsChannelName() p.initTimeTickChannelName() p.initDataNodeNum() + p.initSegmentInfoChannelName() + p.initDataServiceSubscriptionName() } func (p *ParamTable) initAddress() { @@ -151,3 +154,11 @@ func (p *ParamTable) initTimeTickChannelName() { func (p *ParamTable) initDataNodeNum() { p.DataNodeNum = 2 } + +func (p *ParamTable) initSegmentInfoChannelName() { + p.SegmentInfoChannelName = "segment-info-channel" +} + +func (p *ParamTable) initDataServiceSubscriptionName() { + p.DataServiceSubscriptionName = "dataserive-sub" +} diff --git a/internal/dataservice/segment_allocator.go b/internal/dataservice/segment_allocator.go index b92e88e8413ed448e03ea178d00adf6cc1f49e03..53e81b25d98cbd5a09a6624acf3c684457c3679d 100644 --- a/internal/dataservice/segment_allocator.go +++ b/internal/dataservice/segment_allocator.go @@ -2,7 +2,6 @@ package dataservice import ( "fmt" - "log" "strconv" "sync" "time" @@ -41,7 +40,7 @@ type segmentAllocator interface { // ExpireAllocations check all allocations' expire time and remove the expired allocation. ExpireAllocations(timeTick Timestamp) error // SealAllSegments get all opened segment ids of collection. return success and failed segment ids - SealAllSegments(collectionID UniqueID) (bool, []UniqueID) + SealAllSegments(collectionID UniqueID) // IsAllocationsExpired check all allocations of segment expired. IsAllocationsExpired(segmentID UniqueID, ts Timestamp) (bool, error) } @@ -208,9 +207,6 @@ func (allocator *segmentAllocatorImpl) SealSegment(segmentID UniqueID) error { if !ok { return nil } - if err := allocator.mt.SealSegment(segmentID); err != nil { - return err - } status.sealed = true return nil } @@ -246,23 +242,15 @@ func (allocator *segmentAllocatorImpl) IsAllocationsExpired(segmentID UniqueID, return status.lastExpireTime <= ts, nil } -func (allocator *segmentAllocatorImpl) SealAllSegments(collectionID UniqueID) (bool, []UniqueID) { +func (allocator *segmentAllocatorImpl) SealAllSegments(collectionID UniqueID) { allocator.mu.Lock() defer allocator.mu.Unlock() - failed := make([]UniqueID, 0) - success := true for _, status := range allocator.segments { if status.collectionID == collectionID { if status.sealed { continue } - if err := allocator.mt.SealSegment(status.id); err != nil { - log.Printf("seal segment error: %s", err.Error()) - failed = append(failed, status.id) - success = false - } status.sealed = true } } - return success, failed } diff --git a/internal/dataservice/segment_allocator_test.go b/internal/dataservice/segment_allocator_test.go index fd9ba3533c3a9e286dcea2cce56c5b975ad9f143..03697593ae60e17576082f455961ed6d001e58f0 100644 --- a/internal/dataservice/segment_allocator_test.go +++ b/internal/dataservice/segment_allocator_test.go @@ -25,7 +25,9 @@ func TestAllocSegment(t *testing.T) { Schema: schema, }) assert.Nil(t, err) - segmentInfo, err := meta.BuildSegment(collID, 100, []string{"c1", "c2"}) + id, err := mockAllocator.allocID() + assert.Nil(t, err) + segmentInfo, err := BuildSegment(collID, 100, id, []string{"c1", "c2"}) assert.Nil(t, err) err = meta.AddSegment(segmentInfo) assert.Nil(t, err) @@ -76,7 +78,9 @@ func TestSealSegment(t *testing.T) { assert.Nil(t, err) var lastSegID UniqueID for i := 0; i < 10; i++ { - segmentInfo, err := meta.BuildSegment(collID, 100, []string{"c" + strconv.Itoa(i)}) + id, err := mockAllocator.allocID() + assert.Nil(t, err) + segmentInfo, err := BuildSegment(collID, 100, id, []string{"c" + strconv.Itoa(i)}) assert.Nil(t, err) err = meta.AddSegment(segmentInfo) assert.Nil(t, err) @@ -87,9 +91,7 @@ func TestSealSegment(t *testing.T) { err = segAllocator.SealSegment(lastSegID) assert.Nil(t, err) - success, ids := segAllocator.SealAllSegments(collID) - assert.True(t, success) - assert.EqualValues(t, 0, len(ids)) + segAllocator.SealAllSegments(collID) sealedSegments, err := segAllocator.GetSealedSegments() assert.Nil(t, err) assert.EqualValues(t, 10, len(sealedSegments)) @@ -111,7 +113,9 @@ func TestExpireSegment(t *testing.T) { Schema: schema, }) assert.Nil(t, err) - segmentInfo, err := meta.BuildSegment(collID, 100, []string{"c1", "c2"}) + id, err := mockAllocator.allocID() + assert.Nil(t, err) + segmentInfo, err := BuildSegment(collID, 100, id, []string{"c1", "c2"}) assert.Nil(t, err) err = meta.AddSegment(segmentInfo) assert.Nil(t, err) diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index a0c0cc8e51d23031ea69fe18118c0a12883c3c6e..e3782b22aeea93a9dc9c4e534496d37335a63be7 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -6,6 +6,8 @@ import ( "log" "sync" + "github.com/zilliztech/milvus-distributed/internal/msgstream/util" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" @@ -28,6 +30,7 @@ const role = "dataservice" type DataService interface { typeutil.Service + typeutil.Component RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) Flush(req *datapb.FlushRequest) (*commonpb.Status, error) @@ -35,36 +38,35 @@ type DataService interface { ShowSegments(req *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) - - GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) + GetSegmentInfoChannel() (string, error) + GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error) GetComponentStates() (*internalpb2.ComponentStates, error) - GetTimeTickChannel() (*milvuspb.StringResponse, error) - GetStatisticsChannel() (*milvuspb.StringResponse, error) } type ( UniqueID = typeutil.UniqueID Timestamp = typeutil.Timestamp Server struct { - ctx context.Context - serverLoopCtx context.Context - serverLoopCancel context.CancelFunc - serverLoopWg sync.WaitGroup - state internalpb2.StateCode - client *etcdkv.EtcdKV - meta *meta - segAllocator segmentAllocator - statsHandler *statsHandler - insertChannelMgr *insertChannelManager - allocator allocator - cluster *dataNodeCluster - msgProducer *timesync.MsgProducer - registerFinishCh chan struct{} - masterClient *masterservice.GrpcClient - ttMsgStream msgstream.MsgStream - ddChannelName string + ctx context.Context + serverLoopCtx context.Context + serverLoopCancel context.CancelFunc + serverLoopWg sync.WaitGroup + state internalpb2.StateCode + client *etcdkv.EtcdKV + meta *meta + segAllocator segmentAllocator + statsHandler *statsHandler + insertChannelMgr *insertChannelManager + allocator allocator + cluster *dataNodeCluster + msgProducer *timesync.MsgProducer + registerFinishCh chan struct{} + masterClient *masterservice.GrpcClient + ttMsgStream msgstream.MsgStream + ddChannelName string + segmentInfoStream msgstream.MsgStream } ) @@ -105,6 +107,7 @@ func (s *Server) Start() error { return err } + s.initSegmentInfoChannel() s.startServerLoop() s.state = internalpb2.StateCode_HEALTHY log.Println("start success") @@ -118,7 +121,7 @@ func (s *Server) initMeta() error { } etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath) s.client = etcdKV - s.meta, err = newMeta(etcdKV, s.allocator) + s.meta, err = newMeta(etcdKV) if err != nil { return err } @@ -132,7 +135,10 @@ func (s *Server) waitDataNodeRegister() { } func (s *Server) initMsgProducer() error { - s.ttMsgStream = pulsarms.NewPulsarTtMsgStream(s.ctx, 1024) + ttMsgStream := pulsarms.NewPulsarTtMsgStream(s.ctx, 1024) + ttMsgStream.SetPulsarClient(Params.PulsarAddress) + ttMsgStream.CreatePulsarConsumers([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024) + s.ttMsgStream = ttMsgStream s.ttMsgStream.Start() timeTickBarrier := timesync.NewHardTimeTickBarrier(s.ttMsgStream, s.cluster.GetNodeIDs()) dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster) @@ -154,6 +160,8 @@ func (s *Server) startServerLoop() { func (s *Server) startStatsChannel(ctx context.Context) { defer s.serverLoopWg.Done() statsStream := pulsarms.NewPulsarMsgStream(ctx, 1024) + statsStream.SetPulsarClient(Params.PulsarAddress) + statsStream.CreatePulsarConsumers([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024) statsStream.Start() defer statsStream.Close() for { @@ -175,6 +183,14 @@ func (s *Server) startStatsChannel(ctx context.Context) { } } +func (s *Server) initSegmentInfoChannel() { + segmentInfoStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024) + segmentInfoStream.SetPulsarClient(Params.PulsarAddress) + segmentInfoStream.CreatePulsarProducers([]string{Params.SegmentInfoChannelName}) + s.segmentInfoStream = segmentInfoStream + s.segmentInfoStream.Start() +} + func (s *Server) loadMetaFromMaster() error { log.Println("loading collection meta from master") collections, err := s.masterClient.ShowCollections(&milvuspb.ShowCollectionRequest{ @@ -236,6 +252,7 @@ func (s *Server) loadMetaFromMaster() error { func (s *Server) Stop() error { s.ttMsgStream.Close() s.msgProducer.Close() + s.segmentInfoStream.Close() s.stopServerLoop() return nil } @@ -266,22 +283,12 @@ func (s *Server) GetComponentStates() (*internalpb2.ComponentStates, error) { return resp, nil } -func (s *Server) GetTimeTickChannel() (*milvuspb.StringResponse, error) { - return &milvuspb.StringResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, - Value: Params.TimeTickChannelName, - }, nil +func (s *Server) GetTimeTickChannel() (string, error) { + return Params.TimeTickChannelName, nil } -func (s *Server) GetStatisticsChannel() (*milvuspb.StringResponse, error) { - return &milvuspb.StringResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, - Value: Params.StatisticsChannelName, - }, nil +func (s *Server) GetStatisticsChannel() (string, error) { + return Params.StatisticsChannelName, nil } func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) { @@ -291,7 +298,7 @@ func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.Register }, } s.cluster.Register(req.Address.Ip, req.Address.Port, req.Base.SourceID) - if len(s.ddChannelName) == 0 { + if s.ddChannelName == "" { resp, err := s.masterClient.GetDdChannel() if err != nil { ret.Status.Reason = err.Error() @@ -306,21 +313,14 @@ func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.Register {Key: "DDChannelName", Value: s.ddChannelName}, {Key: "SegmentStatisticsChannelName", Value: Params.StatisticsChannelName}, {Key: "TimeTickChannelName", Value: Params.TimeTickChannelName}, - {Key: "CompleteFlushChannelName", Value: Params.SegmentChannelName}, + {Key: "CompleteFlushChannelName", Value: Params.SegmentInfoChannelName}, }, } return ret, nil } func (s *Server) Flush(req *datapb.FlushRequest) (*commonpb.Status, error) { - success, fails := s.segAllocator.SealAllSegments(req.CollectionID) - log.Printf("sealing failed segments: %v", fails) - if !success { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("flush failed, %d segment can not be sealed", len(fails)), - }, nil - } + s.segAllocator.SealAllSegments(req.CollectionID) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, }, nil @@ -383,7 +383,12 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha if err != nil { return err } - segmentInfo, err := s.meta.BuildSegment(collectionID, partitionID, group) + + id, err := s.allocator.allocID() + if err != nil { + return err + } + segmentInfo, err := BuildSegment(collectionID, partitionID, id, group) if err != nil { return err } @@ -425,22 +430,14 @@ func (s *Server) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*dat panic("implement me") } -func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) { - resp := &internalpb2.StringList{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, - } +func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) { contains, ret := s.insertChannelMgr.ContainsCollection(req.CollectionID) if contains { - resp.Values = ret - return resp, nil + return ret, nil } channelGroups, err := s.insertChannelMgr.AllocChannels(req.CollectionID, s.cluster.GetNumOfNodes()) if err != nil { - resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR - resp.Status.Reason = err.Error() - return resp, nil + return nil, err } channels := make([]string, Params.InsertChannelNumPerCollection) @@ -449,8 +446,7 @@ func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalp } s.cluster.WatchInsertChannels(channelGroups) - resp.Values = channels - return resp, nil + return channels, nil } func (s *Server) GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) { @@ -462,3 +458,7 @@ func (s *Server) GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*dat // todo implement return nil, nil } + +func (s *Server) GetSegmentInfoChannel() (string, error) { + return Params.SegmentInfoChannelName, nil +} diff --git a/internal/dataservice/stats_handler.go b/internal/dataservice/stats_handler.go index 387422f0722883c28ab9d3c048560cfa471fc24d..2c1edc95f0ecfd81f5fee903b08854e884ca6346 100644 --- a/internal/dataservice/stats_handler.go +++ b/internal/dataservice/stats_handler.go @@ -20,6 +20,10 @@ func (handler *statsHandler) HandleSegmentStat(segStats *internalpb2.SegmentStat return err } + //if segStats.IsNewSegment { + // segMeta.OpenTime = segStats.CreateTime + // segMeta.segStats.StartPositions + //} segMeta.NumRows = segStats.NumRows segMeta.MemSize = segStats.MemorySize diff --git a/internal/dataservice/watcher.go b/internal/dataservice/watcher.go index 581e1485574084f54c7e6dcc02226ed12ca9e226..9246ba702b84e232e2ec43727283209d5ce15b14 100644 --- a/internal/dataservice/watcher.go +++ b/internal/dataservice/watcher.go @@ -66,7 +66,7 @@ func (watcher *dataNodeTimeTickWatcher) StartBackgroundLoop(ctx context.Context) for { select { case <-ctx.Done(): - log.Println("data node time tick watcher clsoed") + log.Println("data node time tick watcher closed") return case msg := <-watcher.msgQueue: segments, err := watcher.allocator.GetSealedSegments() @@ -86,6 +86,10 @@ func (watcher *dataNodeTimeTickWatcher) StartBackgroundLoop(ctx context.Context) log.Println(err.Error()) continue } + if err = watcher.meta.SetSegmentState(id, datapb.SegmentState_SegmentSealed); err != nil { + log.Println(err.Error()) + continue + } watcher.cluster.FlushSegment(&datapb.FlushSegRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kShowCollections, diff --git a/internal/distributed/dataservice/client.go b/internal/distributed/dataservice/client.go index 354a187f0852c9e186763e1128180a46c9d6b245..7acdc45f6d28c80d3cbb4bad5a9fd079dbe13c37 100644 --- a/internal/distributed/dataservice/client.go +++ b/internal/distributed/dataservice/client.go @@ -2,14 +2,13 @@ package dataservice import ( "context" + "errors" "time" "google.golang.org/grpc" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" - "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" ) @@ -59,12 +58,26 @@ func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) { return c.grpcClient.GetComponentStates(context.Background(), nil) } -func (c *Client) GetTimeTickChannel() (*milvuspb.StringResponse, error) { - return c.grpcClient.GetTimeTickChannel(context.Background(), nil) +func (c *Client) GetTimeTickChannel() (string, error) { + resp, err := c.grpcClient.GetTimeTickChannel(context.Background(), nil) + if err != nil { + return "", err + } + if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + return "", errors.New(resp.Status.Reason) + } + return resp.Value, nil } -func (c *Client) GetStatisticsChannel() (*milvuspb.StringResponse, error) { - return c.grpcClient.GetStatisticsChannel(context.Background(), nil) +func (c *Client) GetStatisticsChannel() (string, error) { + resp, err := c.grpcClient.GetStatisticsChannel(context.Background(), nil) + if err != nil { + return "", err + } + if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + return "", errors.New(resp.Status.Reason) + } + return resp.Value, nil } func (c *Client) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) { @@ -91,8 +104,15 @@ func (c *Client) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*dat return c.grpcClient.GetInsertBinlogPaths(context.Background(), req) } -func (c *Client) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) { - return c.grpcClient.GetInsertChannels(context.Background(), req) +func (c *Client) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) { + resp, err := c.grpcClient.GetInsertChannels(context.Background(), req) + if err != nil { + return nil, err + } + if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + return nil, errors.New(resp.Status.Reason) + } + return resp.Values, nil } func (c *Client) GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) { @@ -102,3 +122,14 @@ func (c *Client) GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*d func (c *Client) GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error) { return c.grpcClient.GetPartitionStatistics(context.Background(), req) } + +func (c *Client) GetSegmentInfoChannel() (string, error) { + resp, err := c.grpcClient.GetSegmentInfoChannel(context.Background(), nil) + if err != nil { + return "", err + } + if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + return "", errors.New(resp.Status.Reason) + } + return resp.Value, nil +} diff --git a/internal/distributed/dataservice/grpc_service.go b/internal/distributed/dataservice/grpc_service.go index 724ef09d2fa2ffcec201d043bdc66f86f6ef37aa..37a1edf2d831fb05bb7b2830a9f7663c14842859 100644 --- a/internal/distributed/dataservice/grpc_service.go +++ b/internal/distributed/dataservice/grpc_service.go @@ -108,7 +108,20 @@ func (s *Service) GetInsertBinlogPaths(ctx context.Context, request *datapb.Inse } func (s *Service) GetInsertChannels(ctx context.Context, request *datapb.InsertChannelRequest) (*internalpb2.StringList, error) { - return s.server.GetInsertChannels(request) + resp := &internalpb2.StringList{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + } + channels, err := s.server.GetInsertChannels(request) + if err != nil { + resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR + resp.Status.Reason = err.Error() + return resp, nil + } + + resp.Values = channels + return resp, nil } func (s *Service) GetCollectionStatistics(ctx context.Context, request *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) { @@ -124,9 +137,51 @@ func (s *Service) GetComponentStates(ctx context.Context, empty *commonpb.Empty) } func (s *Service) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { - return s.server.GetTimeTickChannel() + resp := &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + } + channel, err := s.server.GetTimeTickChannel() + if err != nil { + resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR + resp.Status.Reason = err.Error() + return resp, nil + } + + resp.Value = channel + return resp, nil } func (s *Service) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { - return s.server.GetStatisticsChannel() + resp := &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + } + channel, err := s.server.GetStatisticsChannel() + if err != nil { + resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR + resp.Status.Reason = err.Error() + return resp, nil + } + + resp.Value = channel + return resp, nil +} + +func (s *Service) GetSegmentInfoChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { + resp := &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + } + channel, err := s.server.GetSegmentInfoChannel() + if err != nil { + resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR + resp.Status.Reason = err.Error() + return resp, nil + } + resp.Value = channel + return resp, nil } diff --git a/internal/distributed/masterservice/client.go b/internal/distributed/masterservice/client.go index 18af4df96bdc664045287e0ebc526c63a16fdd9e..ff81bee94ff9bd88435ba40eea11262ded94463c 100644 --- a/internal/distributed/masterservice/client.go +++ b/internal/distributed/masterservice/client.go @@ -5,7 +5,6 @@ import ( "time" "github.com/zilliztech/milvus-distributed/internal/errors" - cms "github.com/zilliztech/milvus-distributed/internal/masterservice" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" @@ -57,93 +56,63 @@ func (c *GrpcClient) Stop() error { } func (c *GrpcClient) GetComponentStates() (*internalpb2.ComponentStates, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.GetComponentStatesRPC(ctx, &commonpb.Empty{}) + return c.grpcClient.GetComponentStatesRPC(context.Background(), &commonpb.Empty{}) } //DDL request func (c *GrpcClient) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.CreateCollection(ctx, in) + return c.grpcClient.CreateCollection(context.Background(), in) } func (c *GrpcClient) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.DropCollection(ctx, in) + return c.grpcClient.DropCollection(context.Background(), in) } func (c *GrpcClient) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.HasCollection(ctx, in) + return c.grpcClient.HasCollection(context.Background(), in) } func (c *GrpcClient) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.DescribeCollection(ctx, in) + return c.grpcClient.DescribeCollection(context.Background(), in) } func (c *GrpcClient) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.ShowCollections(ctx, in) + return c.grpcClient.ShowCollections(context.Background(), in) } func (c *GrpcClient) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.CreatePartition(ctx, in) + return c.grpcClient.CreatePartition(context.Background(), in) } func (c *GrpcClient) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.DropPartition(ctx, in) + return c.grpcClient.DropPartition(context.Background(), in) } func (c *GrpcClient) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.HasPartition(ctx, in) + return c.grpcClient.HasPartition(context.Background(), in) } func (c *GrpcClient) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.ShowPartitions(ctx, in) + return c.grpcClient.ShowPartitions(context.Background(), in) } //index builder service func (c *GrpcClient) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.CreateIndex(ctx, in) + return c.grpcClient.CreateIndex(context.Background(), in) } func (c *GrpcClient) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.DescribeIndex(ctx, in) + return c.grpcClient.DescribeIndex(context.Background(), in) } //global timestamp allocator func (c *GrpcClient) AllocTimestamp(in *masterpb.TsoRequest) (*masterpb.TsoResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.AllocTimestamp(ctx, in) + return c.grpcClient.AllocTimestamp(context.Background(), in) } func (c *GrpcClient) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.AllocID(ctx, in) + return c.grpcClient.AllocID(context.Background(), in) } //receiver time tick from proxy service, and put it into this channel func (c *GrpcClient) GetTimeTickChannel() (string, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - rsp, err := c.grpcClient.GetTimeTickChannelRPC(ctx, &commonpb.Empty{}) + rsp, err := c.grpcClient.GetTimeTickChannelRPC(context.Background(), &commonpb.Empty{}) if err != nil { return "", err } @@ -155,9 +124,7 @@ func (c *GrpcClient) GetTimeTickChannel() (string, error) { //receive ddl from rpc and time tick from proxy service, and put them into this channel func (c *GrpcClient) GetDdChannel() (string, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - rsp, err := c.grpcClient.GetDdChannelRPC(ctx, &commonpb.Empty{}) + rsp, err := c.grpcClient.GetDdChannelRPC(context.Background(), &commonpb.Empty{}) if err != nil { return "", err } @@ -169,9 +136,7 @@ func (c *GrpcClient) GetDdChannel() (string, error) { //just define a channel, not used currently func (c *GrpcClient) GetStatisticsChannel() (string, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - rsp, err := c.grpcClient.GetStatisticsChannelRPC(ctx, &commonpb.Empty{}) + rsp, err := c.grpcClient.GetStatisticsChannelRPC(context.Background(), &commonpb.Empty{}) if err != nil { return "", err } @@ -182,13 +147,9 @@ func (c *GrpcClient) GetStatisticsChannel() (string, error) { } func (c *GrpcClient) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.DescribeSegment(ctx, in) + return c.grpcClient.DescribeSegment(context.Background(), in) } func (c *GrpcClient) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.ShowSegments(ctx, in) + return c.grpcClient.ShowSegments(context.Background(), in) } diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index f2e9514c698696e0248e459e5db311cfcf9718d7..17ecc0acd6108427b41dbc7abc1d49c68ca06023 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -1,7 +1,6 @@ package masterservice import ( - "context" "fmt" "math/rand" "regexp" @@ -27,7 +26,7 @@ func TestGrpcService(t *testing.T) { //cms.Params.Address = "127.0.0.1" cms.Params.Port = (randVal % 100) + 10000 - svr, err := NewGrpcServer(context.Background()) + svr, err := NewGrpcServer() assert.Nil(t, err) // cms.Params.NodeID = 0 diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index e1731e30255f62891d4e593bdd18a89384c09ca0..a531f860e39464689843a8d5bb2d7165e556d9e5 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -6,7 +6,6 @@ import ( "net" "sync" - "github.com/zilliztech/milvus-distributed/internal/errors" cms "github.com/zilliztech/milvus-distributed/internal/masterservice" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" @@ -27,10 +26,10 @@ type GrpcServer struct { cancel context.CancelFunc } -func NewGrpcServer(ctx context.Context) (*GrpcServer, error) { +func NewGrpcServer() (*GrpcServer, error) { s := &GrpcServer{} var err error - s.ctx, s.cancel = context.WithCancel(ctx) + s.ctx, s.cancel = context.WithCancel(context.Background()) if s.core, err = cms.NewCore(s.ctx); err != nil { return nil, err } @@ -74,30 +73,6 @@ func (s *GrpcServer) Stop() error { return err } -func (s *GrpcServer) SetProxyService(p cms.ProxyServiceInterface) error { - c, ok := s.core.(*cms.Core) - if !ok { - return errors.Errorf("set proxy service failed") - } - return c.SetProxyService(p) -} - -func (s *GrpcServer) SetDataService(p cms.DataServiceInterface) error { - c, ok := s.core.(*cms.Core) - if !ok { - return errors.Errorf("set data service failed") - } - return c.SetDataService(p) -} - -func (s *GrpcServer) SetIndexService(p cms.IndexServiceInterface) error { - c, ok := s.core.(*cms.Core) - if !ok { - return errors.Errorf("set index service failed") - } - return c.SetIndexService(p) -} - func (s *GrpcServer) GetComponentStatesRPC(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) { return s.core.GetComponentStates() } diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index cc4338d4ab5d0ed3457f15fc96ae881bf39b436a..5cf7eb90dffe5f83664924cc2bff04064cdbe170 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -2,7 +2,6 @@ package masterservice import ( "context" - "fmt" "log" "math/rand" "strconv" @@ -736,13 +735,6 @@ func (c *Core) GetStatisticsChannel() (string, error) { } func (c *Core) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, nil - } t := &CreateCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -766,13 +758,6 @@ func (c *Core) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb } func (c *Core) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, nil - } t := &DropCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -795,16 +780,6 @@ func (c *Core) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Sta } func (c *Core) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &milvuspb.BoolResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, - Value: false, - }, nil - } t := &HasCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -834,17 +809,6 @@ func (c *Core) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolR } func (c *Core) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &milvuspb.DescribeCollectionResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, - Schema: nil, - CollectionID: 0, - }, nil - } t := &DescribeCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -872,16 +836,6 @@ func (c *Core) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milv } func (c *Core) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &milvuspb.ShowCollectionResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, - CollectionNames: nil, - }, nil - } t := &ShowCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -911,13 +865,6 @@ func (c *Core) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.Sh } func (c *Core) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, nil - } t := &CreatePartitionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -940,13 +887,6 @@ func (c *Core) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.S } func (c *Core) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, nil - } t := &DropPartitionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -969,16 +909,6 @@ func (c *Core) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Statu } func (c *Core) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &milvuspb.BoolResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, - Value: false, - }, nil - } t := &HasPartitionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1008,17 +938,6 @@ func (c *Core) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolRes } func (c *Core) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &milvuspb.ShowPartitionResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, - PartitionNames: nil, - PartitionIDs: nil, - }, nil - } t := &ShowPartitionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1049,13 +968,6 @@ func (c *Core) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.Show } func (c *Core) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, nil - } t := &CreateIndexReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1078,16 +990,6 @@ func (c *Core) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, e } func (c *Core) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &milvuspb.DescribeIndexResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, - IndexDescriptions: nil, - }, nil - } t := &DescribeIndexReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1118,16 +1020,6 @@ func (c *Core) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.Descr } func (c *Core) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &milvuspb.DescribeSegmentResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, - IndexID: 0, - }, nil - } t := &DescribeSegmentReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1158,16 +1050,6 @@ func (c *Core) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.D } func (c *Core) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &milvuspb.ShowSegmentResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, - SegmentIDs: nil, - }, nil - } t := &ShowSegmentReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), diff --git a/internal/masterservice/param_table.go b/internal/masterservice/param_table.go index b4f8c45f4b0d69cfef4cf0065eabdf5a8b302e39..61dde4ba0958e846a48c56d0ed6df0c0d9a193e2 100644 --- a/internal/masterservice/param_table.go +++ b/internal/masterservice/param_table.go @@ -27,8 +27,6 @@ type ParamTable struct { MaxPartitionNum int64 DefaultPartitionName string DefaultIndexName string - - Timeout int } func (p *ParamTable) Init() { @@ -56,8 +54,6 @@ func (p *ParamTable) Init() { p.initMaxPartitionNum() p.initDefaultPartitionName() p.initDefaultIndexName() - - p.initTimeout() } func (p *ParamTable) initAddress() { @@ -167,7 +163,3 @@ func (p *ParamTable) initDefaultIndexName() { } p.DefaultIndexName = name } - -func (p *ParamTable) initTimeout() { - p.Timeout = p.ParseInt("master.timeout") -} diff --git a/internal/masterservice/param_table_test.go b/internal/masterservice/param_table_test.go index af09ab497770a5fe4fc99adbd01f6661a440dc1f..2c2b071e7c35202240bedb45202a66fdf36b3590 100644 --- a/internal/masterservice/param_table_test.go +++ b/internal/masterservice/param_table_test.go @@ -50,7 +50,4 @@ func TestParamTable(t *testing.T) { assert.NotEqual(t, Params.DefaultIndexName, "") t.Logf("default index name = %s", Params.DefaultIndexName) - - assert.NotZero(t, Params.Timeout) - t.Logf("master timeout = %d", Params.Timeout) } diff --git a/internal/proto/data_service.proto b/internal/proto/data_service.proto index 3df32d9ea1e731ad331efad376b16598907bcb2e..49242cd30ca3b0569a6c33e18a11a7de8fd65326 100644 --- a/internal/proto/data_service.proto +++ b/internal/proto/data_service.proto @@ -137,6 +137,8 @@ message SegmentInfo { int64 num_rows=8; int64 mem_size=9; SegmentState state = 10; + repeated internal.MsgPosition start_position = 11; + repeated internal.MsgPosition end_position = 12; } message SegmentMsg{ @@ -201,6 +203,8 @@ service DataService { rpc GetComponentStates(common.Empty) returns (internal.ComponentStates) {} rpc GetTimeTickChannel(common.Empty) returns(milvus.StringResponse) {} rpc GetStatisticsChannel(common.Empty) returns(milvus.StringResponse){} + + rpc GetSegmentInfoChannel(common.Empty) returns (milvus.StringResponse){} } service DataNode { diff --git a/internal/proto/datapb/data_service.pb.go b/internal/proto/datapb/data_service.pb.go index 5669f13f4aa66a83c8fe5b06c512055c98e968a4..e8b6ab1a025aed1354799aab54de68117ffeb0e0 100644 --- a/internal/proto/datapb/data_service.pb.go +++ b/internal/proto/datapb/data_service.pb.go @@ -998,19 +998,21 @@ func (m *FlushSegRequest) GetSegmentIDs() []int64 { } type SegmentInfo struct { - SegmentID int64 `protobuf:"varint,1,opt,name=segmentID,proto3" json:"segmentID,omitempty"` - CollectionID int64 `protobuf:"varint,2,opt,name=collectionID,proto3" json:"collectionID,omitempty"` - PartitionID int64 `protobuf:"varint,3,opt,name=partitionID,proto3" json:"partitionID,omitempty"` - InsertChannels []string `protobuf:"bytes,4,rep,name=insert_channels,json=insertChannels,proto3" json:"insert_channels,omitempty"` - OpenTime uint64 `protobuf:"varint,5,opt,name=open_time,json=openTime,proto3" json:"open_time,omitempty"` - SealedTime uint64 `protobuf:"varint,6,opt,name=sealed_time,json=sealedTime,proto3" json:"sealed_time,omitempty"` - FlushedTime uint64 `protobuf:"varint,7,opt,name=flushed_time,json=flushedTime,proto3" json:"flushed_time,omitempty"` - NumRows int64 `protobuf:"varint,8,opt,name=num_rows,json=numRows,proto3" json:"num_rows,omitempty"` - MemSize int64 `protobuf:"varint,9,opt,name=mem_size,json=memSize,proto3" json:"mem_size,omitempty"` - State SegmentState `protobuf:"varint,10,opt,name=state,proto3,enum=milvus.proto.data.SegmentState" json:"state,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + SegmentID int64 `protobuf:"varint,1,opt,name=segmentID,proto3" json:"segmentID,omitempty"` + CollectionID int64 `protobuf:"varint,2,opt,name=collectionID,proto3" json:"collectionID,omitempty"` + PartitionID int64 `protobuf:"varint,3,opt,name=partitionID,proto3" json:"partitionID,omitempty"` + InsertChannels []string `protobuf:"bytes,4,rep,name=insert_channels,json=insertChannels,proto3" json:"insert_channels,omitempty"` + OpenTime uint64 `protobuf:"varint,5,opt,name=open_time,json=openTime,proto3" json:"open_time,omitempty"` + SealedTime uint64 `protobuf:"varint,6,opt,name=sealed_time,json=sealedTime,proto3" json:"sealed_time,omitempty"` + FlushedTime uint64 `protobuf:"varint,7,opt,name=flushed_time,json=flushedTime,proto3" json:"flushed_time,omitempty"` + NumRows int64 `protobuf:"varint,8,opt,name=num_rows,json=numRows,proto3" json:"num_rows,omitempty"` + MemSize int64 `protobuf:"varint,9,opt,name=mem_size,json=memSize,proto3" json:"mem_size,omitempty"` + State SegmentState `protobuf:"varint,10,opt,name=state,proto3,enum=milvus.proto.data.SegmentState" json:"state,omitempty"` + StartPosition []*internalpb2.MsgPosition `protobuf:"bytes,11,rep,name=start_position,json=startPosition,proto3" json:"start_position,omitempty"` + EndPosition []*internalpb2.MsgPosition `protobuf:"bytes,12,rep,name=end_position,json=endPosition,proto3" json:"end_position,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *SegmentInfo) Reset() { *m = SegmentInfo{} } @@ -1108,6 +1110,20 @@ func (m *SegmentInfo) GetState() SegmentState { return SegmentState_SegmentNone } +func (m *SegmentInfo) GetStartPosition() []*internalpb2.MsgPosition { + if m != nil { + return m.StartPosition + } + return nil +} + +func (m *SegmentInfo) GetEndPosition() []*internalpb2.MsgPosition { + if m != nil { + return m.EndPosition + } + return nil +} + type SegmentMsg struct { Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` Segment *SegmentInfo `protobuf:"bytes,2,opt,name=segment,proto3" json:"segment,omitempty"` @@ -1548,104 +1564,106 @@ func init() { func init() { proto.RegisterFile("data_service.proto", fileDescriptor_3385cd32ad6cfe64) } var fileDescriptor_3385cd32ad6cfe64 = []byte{ - // 1541 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x58, 0xdf, 0x6e, 0x1b, 0x45, - 0x17, 0xcf, 0x7a, 0xfd, 0xf7, 0xd8, 0xb1, 0xdd, 0x49, 0x9a, 0xa4, 0x6e, 0xbf, 0x36, 0xd9, 0x4f, - 0x6d, 0xd2, 0xea, 0xfb, 0x12, 0x94, 0x0a, 0x0a, 0x37, 0x88, 0xa6, 0x6e, 0x23, 0xab, 0x4d, 0x14, - 0x8d, 0x0b, 0x15, 0xbd, 0xb1, 0xd6, 0xf6, 0xc4, 0x19, 0xf0, 0xee, 0x9a, 0x9d, 0x71, 0x93, 0xe6, - 0x06, 0xae, 0x40, 0x42, 0x48, 0x70, 0xc5, 0x05, 0x5c, 0xf3, 0x02, 0xf0, 0x00, 0xbc, 0x02, 0x8f, - 0xc1, 0x53, 0x20, 0xb4, 0x33, 0xb3, 0xff, 0xec, 0x75, 0x6c, 0xdc, 0x96, 0xde, 0x79, 0xce, 0xfe, - 0xe6, 0x9c, 0x33, 0xe7, 0x9c, 0xf9, 0x9d, 0x33, 0x06, 0xd4, 0x35, 0xb9, 0xd9, 0x62, 0xc4, 0x7d, - 0x41, 0x3b, 0x64, 0x7b, 0xe0, 0x3a, 0xdc, 0x41, 0x97, 0x2c, 0xda, 0x7f, 0x31, 0x64, 0x72, 0xb5, - 0xed, 0x01, 0x6a, 0xa5, 0x8e, 0x63, 0x59, 0x8e, 0x2d, 0x45, 0xb5, 0x32, 0xb5, 0x39, 0x71, 0x6d, - 0xb3, 0xaf, 0xd6, 0xa5, 0xe8, 0x06, 0xe3, 0x4b, 0x58, 0xc2, 0xa4, 0x47, 0x19, 0x27, 0xee, 0xa1, - 0xd3, 0x25, 0x98, 0x7c, 0x31, 0x24, 0x8c, 0xa3, 0x77, 0x20, 0xdd, 0x36, 0x19, 0x59, 0xd3, 0xd6, - 0xb5, 0xad, 0xe2, 0xee, 0xb5, 0xed, 0x98, 0x11, 0xa5, 0xfe, 0x80, 0xf5, 0xf6, 0x4c, 0x46, 0xb0, - 0x40, 0xa2, 0xf7, 0x20, 0x67, 0x76, 0xbb, 0x2e, 0x61, 0x6c, 0x2d, 0x75, 0xc1, 0xa6, 0xfb, 0x12, - 0x83, 0x7d, 0xb0, 0xf1, 0xbd, 0x06, 0xcb, 0x71, 0x0f, 0xd8, 0xc0, 0xb1, 0x19, 0x41, 0x7b, 0x50, - 0xa4, 0x36, 0xe5, 0xad, 0x81, 0xe9, 0x9a, 0x16, 0x53, 0x9e, 0x6c, 0xc4, 0x95, 0x06, 0x47, 0x6b, - 0xd8, 0x94, 0x1f, 0x09, 0x20, 0x06, 0x1a, 0xfc, 0x46, 0x77, 0x21, 0xcb, 0xb8, 0xc9, 0x87, 0xbe, - 0x4f, 0x57, 0x13, 0x7d, 0x6a, 0x0a, 0x08, 0x56, 0x50, 0xe3, 0x0f, 0x0d, 0x4a, 0x4d, 0xd2, 0x6b, - 0xd4, 0xfd, 0x60, 0x2c, 0x43, 0xa6, 0xe3, 0x0c, 0x6d, 0x2e, 0x7c, 0x58, 0xc4, 0x72, 0x81, 0xd6, - 0xa1, 0xd8, 0x39, 0x31, 0x6d, 0x9b, 0xf4, 0x0f, 0x4d, 0x8b, 0x08, 0x03, 0x05, 0x1c, 0x15, 0x21, - 0x03, 0x4a, 0x1d, 0xa7, 0xdf, 0x27, 0x1d, 0x4e, 0x1d, 0xbb, 0x51, 0x5f, 0xd3, 0xd7, 0xb5, 0x2d, - 0x1d, 0xc7, 0x64, 0x9e, 0x96, 0x81, 0xe9, 0x72, 0xaa, 0x20, 0x69, 0x01, 0x89, 0x8a, 0xd0, 0x55, - 0x28, 0x78, 0x3b, 0x5a, 0xb6, 0x67, 0x25, 0x23, 0xac, 0xe4, 0x3d, 0x81, 0x30, 0x71, 0x13, 0xca, - 0x01, 0x56, 0x22, 0xb2, 0x02, 0xb1, 0x18, 0x48, 0x3d, 0x98, 0xf1, 0x83, 0x06, 0xe8, 0x3e, 0x63, - 0xb4, 0x67, 0xc7, 0x0e, 0xb6, 0x02, 0x59, 0xdb, 0xe9, 0x92, 0x46, 0x5d, 0x9c, 0x4c, 0xc7, 0x6a, - 0xe5, 0x99, 0x1c, 0x10, 0xe2, 0xb6, 0x5c, 0xa7, 0xef, 0x1f, 0x2c, 0xef, 0x09, 0xb0, 0xd3, 0x27, - 0xe8, 0x21, 0x2c, 0xb2, 0x88, 0x12, 0xb6, 0xa6, 0xaf, 0xeb, 0x5b, 0xc5, 0xdd, 0x1b, 0xdb, 0x63, - 0x85, 0xb8, 0x1d, 0x35, 0x86, 0xe3, 0xbb, 0x8c, 0xdf, 0x53, 0x50, 0x11, 0xdf, 0xa5, 0x5f, 0x16, - 0xb1, 0x45, 0xa0, 0x05, 0x48, 0xb9, 0x23, 0x17, 0x33, 0x04, 0x3a, 0x48, 0x90, 0x1e, 0x4d, 0xd0, - 0x68, 0xf8, 0xd3, 0xd3, 0xc3, 0x9f, 0x19, 0x0f, 0xff, 0x0d, 0x28, 0x92, 0xb3, 0x01, 0x75, 0x49, - 0x8b, 0x53, 0x15, 0xde, 0x34, 0x06, 0x29, 0x7a, 0x4a, 0x2d, 0x12, 0xa9, 0xb1, 0xdc, 0xcc, 0x35, - 0x16, 0x4f, 0x6a, 0x7e, 0x6a, 0x52, 0x0b, 0x49, 0x49, 0xfd, 0x49, 0x83, 0xa5, 0x58, 0x52, 0xd5, - 0xc5, 0x39, 0x84, 0x2a, 0x8b, 0x07, 0xd6, 0xbb, 0x3d, 0x5e, 0x8e, 0x8c, 0x49, 0x39, 0x0a, 0xa1, - 0x78, 0x6c, 0xef, 0x7c, 0x97, 0xe8, 0x0c, 0x4a, 0x8f, 0xfa, 0x43, 0x76, 0x32, 0x3f, 0xa1, 0x20, - 0x48, 0x77, 0xdb, 0x8d, 0xba, 0x30, 0xaa, 0x63, 0xf1, 0x7b, 0x96, 0x94, 0x1a, 0xdf, 0x69, 0x80, - 0x9a, 0x27, 0xce, 0x69, 0x93, 0xf4, 0xc4, 0x81, 0xe6, 0x76, 0x60, 0xd4, 0x58, 0x6a, 0x7a, 0xfd, - 0xe8, 0x63, 0xf5, 0x63, 0x7c, 0x06, 0x4b, 0x31, 0x6f, 0x54, 0x92, 0xae, 0x03, 0x30, 0x29, 0x6a, - 0xd4, 0x65, 0x7a, 0x74, 0x1c, 0x91, 0xcc, 0x17, 0xf4, 0x63, 0x58, 0x56, 0x76, 0xbc, 0x0f, 0x84, - 0xcd, 0x7f, 0xf6, 0x6b, 0x50, 0x08, 0x9c, 0x51, 0x07, 0x0f, 0x05, 0xc6, 0x5f, 0x29, 0xb8, 0x3c, - 0x62, 0x48, 0x1d, 0xeb, 0x5d, 0xc8, 0x78, 0xbe, 0x48, 0x53, 0xe5, 0x49, 0xa4, 0x10, 0x6c, 0xc4, - 0x12, 0xed, 0x5d, 0xb2, 0x8e, 0x4b, 0x4c, 0xae, 0x2e, 0x59, 0x4a, 0x5e, 0x32, 0x29, 0x12, 0x97, - 0xec, 0x06, 0x14, 0x19, 0x31, 0xfb, 0xa4, 0x2b, 0x01, 0xba, 0x04, 0x48, 0x91, 0x00, 0x6c, 0x40, - 0xe9, 0xd8, 0xab, 0x37, 0x1f, 0x91, 0x16, 0x88, 0xa2, 0x92, 0x09, 0xc8, 0x63, 0xa8, 0x30, 0x6e, - 0xba, 0xbc, 0x35, 0x70, 0x98, 0xc8, 0x0e, 0x5b, 0xcb, 0x24, 0x5d, 0x8b, 0xa0, 0xa9, 0x1c, 0xb0, - 0xde, 0x91, 0x82, 0xe2, 0xb2, 0xd8, 0xea, 0x2f, 0x19, 0xda, 0x87, 0x45, 0x62, 0x77, 0x23, 0xaa, - 0xb2, 0x33, 0xab, 0x2a, 0x11, 0xbb, 0x1b, 0x2a, 0x9a, 0x87, 0x3e, 0x0c, 0x0a, 0xab, 0x0d, 0x9b, - 0x11, 0x97, 0xef, 0x51, 0xbb, 0xef, 0xf4, 0x8e, 0x4c, 0x7e, 0xf2, 0xa6, 0x72, 0xfd, 0x8b, 0x06, - 0x57, 0x46, 0x6d, 0x85, 0xf9, 0xae, 0x41, 0xfe, 0x98, 0x92, 0x7e, 0x37, 0x2c, 0xe2, 0x60, 0x8d, - 0xee, 0x41, 0x66, 0xe0, 0x81, 0xd7, 0x52, 0x22, 0x34, 0x93, 0x5a, 0x77, 0x93, 0xbb, 0xd4, 0xee, - 0x3d, 0xa1, 0x8c, 0x63, 0x89, 0x8f, 0x84, 0x44, 0x9f, 0x3d, 0x24, 0x5f, 0x69, 0xb0, 0x2c, 0xfd, - 0x7c, 0x20, 0x3b, 0xc3, 0x9b, 0x65, 0x9e, 0x84, 0x5e, 0x6e, 0x58, 0x70, 0xf9, 0x99, 0xc9, 0x3b, - 0x27, 0x75, 0xeb, 0x95, 0x5d, 0xf0, 0xcc, 0x85, 0x0d, 0x4e, 0x86, 0xb0, 0x80, 0x63, 0x32, 0xe3, - 0x67, 0x0d, 0x2a, 0x82, 0x63, 0x9b, 0xa4, 0xf7, 0xaf, 0x1f, 0x76, 0x84, 0xc0, 0xd2, 0xa3, 0x04, - 0x66, 0xfc, 0x99, 0x82, 0xa2, 0xba, 0xea, 0x0d, 0xfb, 0xd8, 0x89, 0x57, 0x99, 0x36, 0x52, 0x65, - 0xaf, 0x87, 0x6b, 0xd1, 0x26, 0x54, 0xa8, 0x28, 0x81, 0x96, 0x0a, 0x94, 0x74, 0xac, 0x80, 0xcb, - 0x34, 0x5a, 0x19, 0xa2, 0xfd, 0x3a, 0x03, 0x62, 0x4b, 0xaa, 0xc8, 0x08, 0xaa, 0xc8, 0x7b, 0x82, - 0x24, 0xae, 0xc9, 0x4e, 0xe5, 0x9a, 0xdc, 0x38, 0xd7, 0x5c, 0x81, 0xbc, 0x3d, 0xb4, 0x5a, 0xae, - 0x73, 0xca, 0x44, 0x7b, 0xd7, 0x71, 0xce, 0x1e, 0x5a, 0xd8, 0x39, 0x65, 0xde, 0x27, 0x8b, 0x58, - 0x2d, 0x46, 0xcf, 0x65, 0x5f, 0xd7, 0x71, 0xce, 0x22, 0x56, 0x93, 0x9e, 0x47, 0xd8, 0x13, 0xfe, - 0x09, 0x7b, 0x1a, 0x67, 0x00, 0x4a, 0x7c, 0xc0, 0x7a, 0x73, 0x94, 0xc0, 0xfb, 0x90, 0x53, 0x99, - 0x50, 0xcd, 0xe6, 0xfa, 0x64, 0xc3, 0x5e, 0x2e, 0xb1, 0x0f, 0xf7, 0x7a, 0xed, 0xca, 0x83, 0x20, - 0x47, 0x9e, 0x53, 0xaf, 0xd0, 0x73, 0x56, 0x21, 0xd7, 0x6d, 0xcb, 0x79, 0x47, 0xce, 0x78, 0xd9, - 0x6e, 0x5b, 0xcc, 0x43, 0x9b, 0x50, 0x09, 0x0b, 0x41, 0x02, 0x74, 0x01, 0x28, 0x87, 0x62, 0x31, - 0x11, 0x7d, 0xa3, 0xc1, 0xea, 0x98, 0x3b, 0x8a, 0xa9, 0xee, 0xc9, 0xd8, 0xfa, 0xa3, 0xd0, 0x46, - 0xa2, 0x43, 0x8f, 0xc9, 0xcb, 0x4f, 0xcc, 0xfe, 0x90, 0x1c, 0x99, 0xd4, 0x95, 0xd1, 0x9d, 0xb3, - 0x13, 0xff, 0xaa, 0xc1, 0xe5, 0x23, 0xbf, 0x32, 0xdf, 0x76, 0x5c, 0x12, 0x06, 0xca, 0x74, 0xd2, - 0x40, 0xf9, 0xb5, 0x06, 0x2b, 0xa3, 0x4e, 0xbf, 0x95, 0xe8, 0x1d, 0x40, 0xf9, 0x91, 0xd7, 0x45, - 0x04, 0xbb, 0x1d, 0x10, 0x6e, 0xa2, 0x35, 0xc8, 0xa9, 0xbe, 0xa2, 0xb8, 0xc3, 0x5f, 0x7a, 0x97, - 0xb1, 0x2d, 0x1a, 0x53, 0x2b, 0x6c, 0x36, 0x05, 0x5c, 0x6c, 0x87, 0xcd, 0xca, 0xf8, 0x56, 0x83, - 0xaa, 0x2a, 0xdf, 0x50, 0xe3, 0xc5, 0x7c, 0xf4, 0x1f, 0x00, 0xca, 0x5a, 0xea, 0x46, 0x0b, 0xd7, - 0xf3, 0xb8, 0x40, 0xd9, 0x23, 0x29, 0x40, 0x1f, 0x40, 0x56, 0xd8, 0xf7, 0x27, 0x88, 0x8d, 0x84, - 0x0b, 0x13, 0x3f, 0x01, 0x56, 0x1b, 0x8c, 0x8f, 0xa1, 0x54, 0xaf, 0x3f, 0x09, 0xfd, 0x18, 0x65, - 0x3e, 0x2d, 0x81, 0xf9, 0xa6, 0x9f, 0xf1, 0x8e, 0x23, 0xde, 0xac, 0x01, 0x35, 0xa0, 0x4a, 0xc0, - 0xbe, 0x87, 0x8e, 0x4d, 0xaa, 0x0b, 0x68, 0x49, 0x3c, 0xb7, 0xa4, 0x80, 0x3f, 0x3c, 0xa3, 0x8c, - 0x57, 0x35, 0x84, 0xa0, 0xac, 0x84, 0xfb, 0xae, 0x73, 0x4a, 0xed, 0x5e, 0x35, 0x85, 0x2e, 0xc1, - 0xa2, 0xaf, 0x49, 0x50, 0x5e, 0x55, 0x8f, 0xc0, 0x54, 0x00, 0xaa, 0xe9, 0xdd, 0xdf, 0x0a, 0x50, - 0xac, 0x9b, 0xdc, 0x6c, 0xca, 0x7f, 0x23, 0x90, 0x09, 0xa5, 0xe8, 0x33, 0x1e, 0xdd, 0x4a, 0x08, - 0x49, 0xc2, 0x3f, 0x0d, 0xb5, 0xcd, 0xa9, 0x38, 0x59, 0x82, 0xc6, 0x02, 0xda, 0x87, 0x8c, 0xb0, - 0x8f, 0x92, 0x88, 0x31, 0xfa, 0xda, 0xa8, 0x5d, 0x54, 0x65, 0xc6, 0x02, 0x6a, 0x43, 0x25, 0x78, - 0x38, 0xa9, 0x84, 0xdf, 0x4c, 0x50, 0x39, 0xfe, 0x62, 0xae, 0xdd, 0x9a, 0x06, 0x0b, 0x9c, 0x6d, - 0x41, 0x29, 0x32, 0xf7, 0xb3, 0x44, 0x03, 0xe3, 0xcf, 0x94, 0x44, 0x03, 0x09, 0xef, 0x07, 0x63, - 0x01, 0xf5, 0xa0, 0xba, 0x4f, 0x78, 0x6c, 0x0c, 0x47, 0x9b, 0x53, 0x3a, 0x86, 0xcf, 0x42, 0xb5, - 0xad, 0xe9, 0xc0, 0xc0, 0x90, 0x0b, 0xcb, 0xfb, 0x84, 0x8f, 0xcd, 0x80, 0xe8, 0x4e, 0x82, 0x8e, - 0x09, 0x53, 0x69, 0xed, 0x7f, 0x33, 0x60, 0xa3, 0x36, 0x4d, 0xb8, 0x14, 0xd8, 0x0c, 0xba, 0xf6, - 0xe6, 0x44, 0x25, 0xf1, 0x79, 0xab, 0x36, 0x7d, 0xd4, 0x14, 0xc7, 0x5a, 0xdd, 0x27, 0x3c, 0xde, - 0x2e, 0x28, 0xe3, 0xb4, 0xc3, 0xd0, 0xed, 0x04, 0x43, 0xc9, 0x6d, 0xae, 0x76, 0x67, 0x16, 0x68, - 0x70, 0x2c, 0x07, 0x56, 0xf6, 0x09, 0x8f, 0x71, 0xac, 0x32, 0x99, 0x94, 0x90, 0xc4, 0x06, 0x52, - 0xbb, 0x3d, 0x03, 0x32, 0x30, 0xf8, 0x1c, 0x90, 0x38, 0xa4, 0x35, 0x70, 0xec, 0xb0, 0x4c, 0x6a, - 0x89, 0xd7, 0xe3, 0xa1, 0x35, 0xe0, 0x2f, 0x47, 0x0b, 0x30, 0x88, 0xdd, 0x88, 0x0e, 0x63, 0x01, - 0x3d, 0x13, 0xba, 0xbd, 0x71, 0xe7, 0x29, 0xed, 0x7c, 0xae, 0x52, 0x70, 0xa1, 0xee, 0xff, 0xc6, - 0xbf, 0xa9, 0x85, 0xcc, 0x4a, 0xc4, 0xe9, 0x4f, 0x45, 0xc1, 0x85, 0xc1, 0x79, 0x7d, 0xaa, 0x77, - 0x7f, 0x4c, 0x41, 0xde, 0x63, 0x2d, 0x41, 0x51, 0x6f, 0x32, 0x38, 0xcf, 0xa1, 0x12, 0x7f, 0x0b, - 0x24, 0xa7, 0x38, 0xf1, 0xbd, 0x30, 0x8d, 0xbe, 0x30, 0x2c, 0xfa, 0x73, 0xbf, 0xe4, 0x16, 0x63, - 0x12, 0x1f, 0x86, 0x2f, 0x83, 0x29, 0x3a, 0xf7, 0x3e, 0x7a, 0xfe, 0x61, 0x8f, 0xf2, 0x93, 0x61, - 0xdb, 0xfb, 0xb2, 0x73, 0x4e, 0xfb, 0x7d, 0x7a, 0xce, 0x49, 0xe7, 0x64, 0x47, 0xee, 0xfa, 0x7f, - 0x97, 0x32, 0xee, 0xd2, 0xf6, 0x90, 0x93, 0xee, 0x8e, 0x7f, 0xec, 0x1d, 0xa1, 0x6a, 0xc7, 0x33, - 0x37, 0x68, 0xb7, 0xb3, 0x62, 0x75, 0xf7, 0xef, 0x00, 0x00, 0x00, 0xff, 0xff, 0x78, 0xd6, 0xcd, - 0x18, 0xa5, 0x16, 0x00, 0x00, + // 1574 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x58, 0x5f, 0x6f, 0x1b, 0x45, + 0x10, 0xcf, 0xf9, 0xfc, 0x77, 0xec, 0xd8, 0xee, 0xe6, 0x9f, 0xeb, 0x96, 0x36, 0x39, 0xd4, 0x26, + 0xad, 0x20, 0x41, 0xa9, 0xa0, 0xf0, 0x82, 0x68, 0xea, 0x34, 0xb2, 0xda, 0x44, 0xd1, 0xba, 0x50, + 0x91, 0x17, 0xeb, 0x6c, 0x6f, 0x9c, 0x05, 0xdf, 0x9d, 0xb9, 0x5d, 0x37, 0x69, 0x5e, 0xe0, 0x09, + 0x24, 0x84, 0x54, 0x9e, 0x78, 0x80, 0x67, 0xbe, 0x00, 0x5f, 0x80, 0xaf, 0xc0, 0xe7, 0xe1, 0x01, + 0xa1, 0xdb, 0x5d, 0xdf, 0x1f, 0xfb, 0x12, 0x1b, 0xb7, 0xa5, 0x6f, 0xde, 0xb9, 0xdf, 0xce, 0xcc, + 0xce, 0xcc, 0xfe, 0x66, 0xd6, 0x80, 0x3a, 0x26, 0x37, 0x9b, 0x8c, 0xb8, 0xcf, 0x69, 0x9b, 0x6c, + 0xf6, 0x5d, 0x87, 0x3b, 0xe8, 0x8a, 0x45, 0x7b, 0xcf, 0x07, 0x4c, 0xae, 0x36, 0x3d, 0x40, 0xb5, + 0xd0, 0x76, 0x2c, 0xcb, 0xb1, 0xa5, 0xa8, 0x5a, 0xa4, 0x36, 0x27, 0xae, 0x6d, 0xf6, 0xd4, 0xba, + 0x10, 0xde, 0x60, 0x7c, 0x0b, 0x0b, 0x98, 0x74, 0x29, 0xe3, 0xc4, 0x3d, 0x70, 0x3a, 0x04, 0x93, + 0x6f, 0x06, 0x84, 0x71, 0xf4, 0x01, 0x24, 0x5b, 0x26, 0x23, 0x15, 0x6d, 0x55, 0xdb, 0xc8, 0x6f, + 0x5f, 0xdf, 0x8c, 0x18, 0x51, 0xea, 0xf7, 0x59, 0x77, 0xc7, 0x64, 0x04, 0x0b, 0x24, 0xfa, 0x08, + 0x32, 0x66, 0xa7, 0xe3, 0x12, 0xc6, 0x2a, 0x89, 0x4b, 0x36, 0x3d, 0x90, 0x18, 0x3c, 0x04, 0x1b, + 0x2f, 0x35, 0x58, 0x8c, 0x7a, 0xc0, 0xfa, 0x8e, 0xcd, 0x08, 0xda, 0x81, 0x3c, 0xb5, 0x29, 0x6f, + 0xf6, 0x4d, 0xd7, 0xb4, 0x98, 0xf2, 0x64, 0x2d, 0xaa, 0xd4, 0x3f, 0x5a, 0xdd, 0xa6, 0xfc, 0x50, + 0x00, 0x31, 0x50, 0xff, 0x37, 0xba, 0x07, 0x69, 0xc6, 0x4d, 0x3e, 0x18, 0xfa, 0x74, 0x2d, 0xd6, + 0xa7, 0x86, 0x80, 0x60, 0x05, 0x35, 0xfe, 0xd2, 0xa0, 0xd0, 0x20, 0xdd, 0x7a, 0x6d, 0x18, 0x8c, + 0x45, 0x48, 0xb5, 0x9d, 0x81, 0xcd, 0x85, 0x0f, 0xf3, 0x58, 0x2e, 0xd0, 0x2a, 0xe4, 0xdb, 0x27, + 0xa6, 0x6d, 0x93, 0xde, 0x81, 0x69, 0x11, 0x61, 0x20, 0x87, 0xc3, 0x22, 0x64, 0x40, 0xa1, 0xed, + 0xf4, 0x7a, 0xa4, 0xcd, 0xa9, 0x63, 0xd7, 0x6b, 0x15, 0x7d, 0x55, 0xdb, 0xd0, 0x71, 0x44, 0xe6, + 0x69, 0xe9, 0x9b, 0x2e, 0xa7, 0x0a, 0x92, 0x14, 0x90, 0xb0, 0x08, 0x5d, 0x83, 0x9c, 0xb7, 0xa3, + 0x69, 0x7b, 0x56, 0x52, 0xc2, 0x4a, 0xd6, 0x13, 0x08, 0x13, 0xb7, 0xa0, 0xe8, 0x63, 0x25, 0x22, + 0x2d, 0x10, 0xf3, 0xbe, 0xd4, 0x83, 0x19, 0x3f, 0x6b, 0x80, 0x1e, 0x30, 0x46, 0xbb, 0x76, 0xe4, + 0x60, 0xcb, 0x90, 0xb6, 0x9d, 0x0e, 0xa9, 0xd7, 0xc4, 0xc9, 0x74, 0xac, 0x56, 0x9e, 0xc9, 0x3e, + 0x21, 0x6e, 0xd3, 0x75, 0x7a, 0xc3, 0x83, 0x65, 0x3d, 0x01, 0x76, 0x7a, 0x04, 0xed, 0xc2, 0x3c, + 0x0b, 0x29, 0x61, 0x15, 0x7d, 0x55, 0xdf, 0xc8, 0x6f, 0xdf, 0xdc, 0x1c, 0x2b, 0xc4, 0xcd, 0xb0, + 0x31, 0x1c, 0xdd, 0x65, 0xfc, 0x99, 0x80, 0x92, 0xf8, 0x2e, 0xfd, 0xb2, 0x88, 0x2d, 0x02, 0x2d, + 0x40, 0xca, 0x1d, 0xb9, 0x98, 0x22, 0xd0, 0x7e, 0x82, 0xf4, 0x70, 0x82, 0x46, 0xc3, 0x9f, 0x9c, + 0x1c, 0xfe, 0xd4, 0x78, 0xf8, 0x6f, 0x42, 0x9e, 0x9c, 0xf5, 0xa9, 0x4b, 0x9a, 0x9c, 0xaa, 0xf0, + 0x26, 0x31, 0x48, 0xd1, 0x53, 0x6a, 0x91, 0x50, 0x8d, 0x65, 0xa6, 0xae, 0xb1, 0x68, 0x52, 0xb3, + 0x13, 0x93, 0x9a, 0x8b, 0x4b, 0xea, 0xaf, 0x1a, 0x2c, 0x44, 0x92, 0xaa, 0x2e, 0xce, 0x01, 0x94, + 0x59, 0x34, 0xb0, 0xde, 0xed, 0xf1, 0x72, 0x64, 0x5c, 0x94, 0xa3, 0x00, 0x8a, 0xc7, 0xf6, 0xce, + 0x76, 0x89, 0xce, 0xa0, 0xf0, 0xa8, 0x37, 0x60, 0x27, 0xb3, 0x13, 0x0a, 0x82, 0x64, 0xa7, 0x55, + 0xaf, 0x09, 0xa3, 0x3a, 0x16, 0xbf, 0xa7, 0x49, 0xa9, 0xf1, 0x93, 0x06, 0xa8, 0x71, 0xe2, 0x9c, + 0x36, 0x48, 0x57, 0x1c, 0x68, 0x66, 0x07, 0x46, 0x8d, 0x25, 0x26, 0xd7, 0x8f, 0x3e, 0x56, 0x3f, + 0xc6, 0x57, 0xb0, 0x10, 0xf1, 0x46, 0x25, 0xe9, 0x06, 0x00, 0x93, 0xa2, 0x7a, 0x4d, 0xa6, 0x47, + 0xc7, 0x21, 0xc9, 0x6c, 0x41, 0x3f, 0x86, 0x45, 0x65, 0xc7, 0xfb, 0x40, 0xd8, 0xec, 0x67, 0xbf, + 0x0e, 0x39, 0xdf, 0x19, 0x75, 0xf0, 0x40, 0x60, 0xfc, 0x93, 0x80, 0xa5, 0x11, 0x43, 0xea, 0x58, + 0x1f, 0x42, 0xca, 0xf3, 0x45, 0x9a, 0x2a, 0x5e, 0x44, 0x0a, 0xfe, 0x46, 0x2c, 0xd1, 0xde, 0x25, + 0x6b, 0xbb, 0xc4, 0xe4, 0xea, 0x92, 0x25, 0xe4, 0x25, 0x93, 0x22, 0x71, 0xc9, 0x6e, 0x42, 0x9e, + 0x11, 0xb3, 0x47, 0x3a, 0x12, 0xa0, 0x4b, 0x80, 0x14, 0x09, 0xc0, 0x1a, 0x14, 0x8e, 0xbd, 0x7a, + 0x1b, 0x22, 0x92, 0x02, 0x91, 0x57, 0x32, 0x01, 0x79, 0x0c, 0x25, 0xc6, 0x4d, 0x97, 0x37, 0xfb, + 0x0e, 0x13, 0xd9, 0x61, 0x95, 0x54, 0xdc, 0xb5, 0xf0, 0x9b, 0xca, 0x3e, 0xeb, 0x1e, 0x2a, 0x28, + 0x2e, 0x8a, 0xad, 0xc3, 0x25, 0x43, 0x7b, 0x30, 0x4f, 0xec, 0x4e, 0x48, 0x55, 0x7a, 0x6a, 0x55, + 0x05, 0x62, 0x77, 0x02, 0x45, 0xb3, 0xd0, 0x87, 0x41, 0x61, 0xa5, 0x6e, 0x33, 0xe2, 0xf2, 0x1d, + 0x6a, 0xf7, 0x9c, 0xee, 0xa1, 0xc9, 0x4f, 0xde, 0x54, 0xae, 0x7f, 0xd7, 0xe0, 0xea, 0xa8, 0xad, + 0x20, 0xdf, 0x55, 0xc8, 0x1e, 0x53, 0xd2, 0xeb, 0x04, 0x45, 0xec, 0xaf, 0xd1, 0x7d, 0x48, 0xf5, + 0x3d, 0x70, 0x25, 0x21, 0x42, 0x73, 0x51, 0xeb, 0x6e, 0x70, 0x97, 0xda, 0xdd, 0x27, 0x94, 0x71, + 0x2c, 0xf1, 0xa1, 0x90, 0xe8, 0xd3, 0x87, 0xe4, 0x3b, 0x0d, 0x16, 0xa5, 0x9f, 0x0f, 0x65, 0x67, + 0x78, 0xb3, 0xcc, 0x13, 0xd3, 0xcb, 0x0d, 0x0b, 0x96, 0x9e, 0x99, 0xbc, 0x7d, 0x52, 0xb3, 0x5e, + 0xd9, 0x05, 0xcf, 0x5c, 0xd0, 0xe0, 0x64, 0x08, 0x73, 0x38, 0x22, 0x33, 0x7e, 0xd3, 0xa0, 0x24, + 0x38, 0xb6, 0x41, 0xba, 0xff, 0xfb, 0x61, 0x47, 0x08, 0x2c, 0x39, 0x4a, 0x60, 0xc6, 0xdf, 0x3a, + 0xe4, 0xd5, 0x55, 0xaf, 0xdb, 0xc7, 0x4e, 0xb4, 0xca, 0xb4, 0x91, 0x2a, 0x7b, 0x3d, 0x5c, 0x8b, + 0xd6, 0xa1, 0x44, 0x45, 0x09, 0x34, 0x55, 0xa0, 0xa4, 0x63, 0x39, 0x5c, 0xa4, 0xe1, 0xca, 0x10, + 0xed, 0xd7, 0xe9, 0x13, 0x5b, 0x52, 0x45, 0x4a, 0x50, 0x45, 0xd6, 0x13, 0xc4, 0x71, 0x4d, 0x7a, + 0x22, 0xd7, 0x64, 0xc6, 0xb9, 0xe6, 0x2a, 0x64, 0xed, 0x81, 0xd5, 0x74, 0x9d, 0x53, 0x26, 0xda, + 0xbb, 0x8e, 0x33, 0xf6, 0xc0, 0xc2, 0xce, 0x29, 0xf3, 0x3e, 0x59, 0xc4, 0x6a, 0x32, 0x7a, 0x2e, + 0xfb, 0xba, 0x8e, 0x33, 0x16, 0xb1, 0x1a, 0xf4, 0x3c, 0xc4, 0x9e, 0xf0, 0x9f, 0xd8, 0xb3, 0x0e, + 0xc5, 0x28, 0xb1, 0x55, 0xf2, 0x53, 0x93, 0xd1, 0x7c, 0x84, 0xd7, 0xd0, 0x2e, 0x14, 0xc2, 0xb4, + 0x56, 0x29, 0x4c, 0xad, 0x28, 0x1f, 0x62, 0x35, 0xe3, 0x0c, 0x40, 0x39, 0xba, 0xcf, 0xba, 0x33, + 0x14, 0xe5, 0xc7, 0x90, 0x51, 0xb5, 0xa1, 0xda, 0xdf, 0x8d, 0x8b, 0x43, 0xe1, 0x55, 0x17, 0x1e, + 0xc2, 0xbd, 0xee, 0xbf, 0xfc, 0xd0, 0xaf, 0x1a, 0x2f, 0x4c, 0xaf, 0xd0, 0x05, 0x57, 0x20, 0xd3, + 0x69, 0xc9, 0x09, 0x4c, 0x4e, 0x9d, 0xe9, 0x4e, 0x4b, 0x4c, 0x68, 0xeb, 0x50, 0x0a, 0x4a, 0x53, + 0x02, 0x74, 0x01, 0x28, 0x06, 0x62, 0x31, 0xa3, 0xfd, 0xa0, 0xc1, 0xca, 0x98, 0x3b, 0x8a, 0x3b, + 0xef, 0xcb, 0x6c, 0x0f, 0x87, 0xb3, 0xb5, 0x58, 0x87, 0x1e, 0x93, 0x17, 0x5f, 0x98, 0xbd, 0x01, + 0x39, 0x34, 0xa9, 0x2b, 0xf3, 0x3d, 0xe3, 0x6c, 0xf0, 0x87, 0x06, 0x4b, 0x87, 0xc3, 0xbb, 0xf2, + 0xb6, 0xe3, 0x12, 0x33, 0xe2, 0x26, 0xe3, 0x46, 0xdc, 0xef, 0x35, 0x58, 0x1e, 0x75, 0xfa, 0xad, + 0x44, 0x6f, 0x1f, 0x8a, 0x8f, 0xbc, 0xbe, 0x26, 0xf8, 0x76, 0x9f, 0x70, 0x13, 0x55, 0x20, 0xa3, + 0x3a, 0x9d, 0x62, 0xb3, 0xe1, 0xd2, 0xa3, 0x87, 0x96, 0x68, 0x95, 0xcd, 0xa0, 0xfd, 0xe5, 0x70, + 0xbe, 0x15, 0xb4, 0x4f, 0xe3, 0x47, 0x0d, 0xca, 0xaa, 0x7c, 0x03, 0x8d, 0x97, 0x33, 0xe4, 0x3b, + 0x00, 0x94, 0x35, 0x15, 0xc7, 0x08, 0xd7, 0xb3, 0x38, 0x47, 0xd9, 0x23, 0x29, 0x40, 0x9f, 0x40, + 0x5a, 0xd8, 0x1f, 0xce, 0x34, 0x6b, 0x31, 0x17, 0x26, 0x7a, 0x02, 0xac, 0x36, 0x18, 0x9f, 0x43, + 0xa1, 0x56, 0x7b, 0x12, 0xf8, 0x31, 0xca, 0xc5, 0x5a, 0x0c, 0x17, 0x4f, 0x3e, 0xe3, 0x5d, 0x47, + 0xbc, 0xa2, 0x7d, 0xb2, 0x42, 0x25, 0xbf, 0x1f, 0x1c, 0x38, 0x36, 0x29, 0xcf, 0xa1, 0x05, 0xf1, + 0x00, 0x94, 0x02, 0xbe, 0x7b, 0x46, 0x19, 0x2f, 0x6b, 0x08, 0x41, 0x51, 0x09, 0xf7, 0x5c, 0xe7, + 0x94, 0xda, 0xdd, 0x72, 0x02, 0x5d, 0x81, 0xf9, 0xa1, 0x26, 0x41, 0xc2, 0x65, 0x3d, 0x04, 0x53, + 0x01, 0x28, 0x27, 0xb7, 0x5f, 0x02, 0xe4, 0x6b, 0x26, 0x37, 0x1b, 0xf2, 0xff, 0x11, 0x64, 0x42, + 0x21, 0xfc, 0xc7, 0x02, 0xba, 0x1d, 0x13, 0x92, 0x98, 0xff, 0x3e, 0xaa, 0xeb, 0x13, 0x71, 0xb2, + 0x04, 0x8d, 0x39, 0xb4, 0x07, 0x29, 0x61, 0x1f, 0xc5, 0x51, 0x75, 0xf8, 0xfd, 0x53, 0xbd, 0xac, + 0xca, 0x8c, 0x39, 0xd4, 0x82, 0x92, 0xff, 0x94, 0x53, 0x09, 0xbf, 0x15, 0xa3, 0x72, 0xfc, 0x0d, + 0x5f, 0xbd, 0x3d, 0x09, 0xe6, 0x3b, 0xdb, 0x84, 0x42, 0xe8, 0x25, 0xc2, 0x62, 0x0d, 0x8c, 0x3f, + 0x9c, 0x62, 0x0d, 0xc4, 0xbc, 0x68, 0x8c, 0x39, 0xd4, 0x85, 0xf2, 0x1e, 0xe1, 0x91, 0x87, 0x01, + 0x5a, 0x9f, 0xd0, 0xc3, 0x86, 0x2c, 0x54, 0xdd, 0x98, 0x0c, 0xf4, 0x0d, 0xb9, 0xb0, 0xb8, 0x47, + 0xf8, 0xd8, 0x54, 0x8a, 0xee, 0xc6, 0xe8, 0xb8, 0x60, 0x4e, 0xae, 0xbe, 0x37, 0x05, 0x36, 0x6c, + 0xd3, 0x84, 0x2b, 0xbe, 0x4d, 0x7f, 0x8e, 0x58, 0xbf, 0x50, 0x49, 0x74, 0x02, 0xac, 0x4e, 0x1e, + 0x7e, 0xc5, 0xb1, 0x56, 0xf6, 0x08, 0x8f, 0xb6, 0x0b, 0xca, 0x38, 0x6d, 0x33, 0x74, 0x27, 0xc6, + 0x50, 0x7c, 0x9b, 0xab, 0xde, 0x9d, 0x06, 0xea, 0x1f, 0xcb, 0x81, 0xe5, 0x3d, 0xc2, 0x23, 0x1c, + 0xab, 0x4c, 0xc6, 0x25, 0x24, 0xb6, 0x81, 0x54, 0xef, 0x4c, 0x81, 0xf4, 0x0d, 0x1e, 0x01, 0x12, + 0x87, 0xb4, 0xfa, 0x8e, 0x1d, 0x94, 0x49, 0x35, 0xf6, 0x7a, 0xec, 0x5a, 0x7d, 0xfe, 0x62, 0xb4, + 0x00, 0xfd, 0xd8, 0x8d, 0xe8, 0x30, 0xe6, 0xd0, 0x33, 0xa1, 0xdb, 0x1b, 0xc0, 0x9e, 0xd2, 0xf6, + 0xd7, 0x2a, 0x05, 0x97, 0xea, 0x7e, 0x37, 0xfa, 0x4d, 0x2d, 0x64, 0x56, 0x42, 0x4e, 0x7f, 0x29, + 0x0a, 0x2e, 0x08, 0xce, 0x6b, 0x54, 0x7d, 0x04, 0x4b, 0xc1, 0xa5, 0xf1, 0x66, 0x99, 0xd7, 0xa7, + 0x7b, 0xfb, 0x97, 0x04, 0x64, 0x3d, 0x46, 0x14, 0xf4, 0xf7, 0x26, 0x03, 0x7f, 0x04, 0xa5, 0xe8, + 0xcb, 0x27, 0xbe, 0x7c, 0x62, 0x5f, 0x47, 0x93, 0xa8, 0x11, 0xc3, 0xfc, 0xf0, 0x95, 0x23, 0x79, + 0xcb, 0xb8, 0x88, 0x6b, 0x83, 0x77, 0xd0, 0x04, 0x9d, 0x3b, 0x9f, 0x1d, 0x7d, 0xda, 0xa5, 0xfc, + 0x64, 0xd0, 0xf2, 0xbe, 0x6c, 0x9d, 0xd3, 0x5e, 0x8f, 0x9e, 0x73, 0xd2, 0x3e, 0xd9, 0x92, 0xbb, + 0xde, 0xef, 0x50, 0xc6, 0x5d, 0xda, 0x1a, 0x70, 0xd2, 0xd9, 0x1a, 0x1e, 0x7b, 0x4b, 0xa8, 0xda, + 0xf2, 0xcc, 0xf5, 0x5b, 0xad, 0xb4, 0x58, 0xdd, 0xfb, 0x37, 0x00, 0x00, 0xff, 0xff, 0x2f, 0x5b, + 0x6e, 0xda, 0x93, 0x17, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -1672,6 +1690,7 @@ type DataServiceClient interface { GetComponentStates(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*internalpb2.ComponentStates, error) GetTimeTickChannel(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) GetStatisticsChannel(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) + GetSegmentInfoChannel(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) } type dataServiceClient struct { @@ -1790,6 +1809,15 @@ func (c *dataServiceClient) GetStatisticsChannel(ctx context.Context, in *common return out, nil } +func (c *dataServiceClient) GetSegmentInfoChannel(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) { + out := new(milvuspb.StringResponse) + err := c.cc.Invoke(ctx, "/milvus.proto.data.DataService/GetSegmentInfoChannel", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // DataServiceServer is the server API for DataService service. type DataServiceServer interface { RegisterNode(context.Context, *RegisterNodeRequest) (*RegisterNodeResponse, error) @@ -1804,6 +1832,7 @@ type DataServiceServer interface { GetComponentStates(context.Context, *commonpb.Empty) (*internalpb2.ComponentStates, error) GetTimeTickChannel(context.Context, *commonpb.Empty) (*milvuspb.StringResponse, error) GetStatisticsChannel(context.Context, *commonpb.Empty) (*milvuspb.StringResponse, error) + GetSegmentInfoChannel(context.Context, *commonpb.Empty) (*milvuspb.StringResponse, error) } // UnimplementedDataServiceServer can be embedded to have forward compatible implementations. @@ -1846,6 +1875,9 @@ func (*UnimplementedDataServiceServer) GetTimeTickChannel(ctx context.Context, r func (*UnimplementedDataServiceServer) GetStatisticsChannel(ctx context.Context, req *commonpb.Empty) (*milvuspb.StringResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetStatisticsChannel not implemented") } +func (*UnimplementedDataServiceServer) GetSegmentInfoChannel(ctx context.Context, req *commonpb.Empty) (*milvuspb.StringResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetSegmentInfoChannel not implemented") +} func RegisterDataServiceServer(s *grpc.Server, srv DataServiceServer) { s.RegisterService(&_DataService_serviceDesc, srv) @@ -2067,6 +2099,24 @@ func _DataService_GetStatisticsChannel_Handler(srv interface{}, ctx context.Cont return interceptor(ctx, in, info, handler) } +func _DataService_GetSegmentInfoChannel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(commonpb.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DataServiceServer).GetSegmentInfoChannel(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/milvus.proto.data.DataService/GetSegmentInfoChannel", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DataServiceServer).GetSegmentInfoChannel(ctx, req.(*commonpb.Empty)) + } + return interceptor(ctx, in, info, handler) +} + var _DataService_serviceDesc = grpc.ServiceDesc{ ServiceName: "milvus.proto.data.DataService", HandlerType: (*DataServiceServer)(nil), @@ -2119,6 +2169,10 @@ var _DataService_serviceDesc = grpc.ServiceDesc{ MethodName: "GetStatisticsChannel", Handler: _DataService_GetStatisticsChannel_Handler, }, + { + MethodName: "GetSegmentInfoChannel", + Handler: _DataService_GetSegmentInfoChannel_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "data_service.proto",