diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 9526d2b4c7d1070e71420316ff512234aba10869..7545d658e187df516ee90a7bd11f4274137d1a61 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -117,9 +117,6 @@ func (s *Server) Start() error { if err = s.initMasterClient(); err != nil { return err } - if err = s.getDDChannelFromMaster(); err != nil { - return err - } if err = s.initMeta(); err != nil { return err @@ -236,15 +233,6 @@ func (s *Server) initFlushMsgStream() error { return nil } -func (s *Server) getDDChannelFromMaster() error { - resp, err := s.masterClient.GetDdChannel(s.ctx) - if err = VerifyResponse(resp, err); err != nil { - return err - } - s.ddChannelName = resp.Value - return nil -} - func (s *Server) startServerLoop() { s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) s.serverLoopWg.Add(5) diff --git a/internal/distributed/masterservice/client/client.go b/internal/distributed/masterservice/client/client.go index 286164e88db4590a8de882d96b5d6b3b74108fd3..5927cface15bda188692a217e29571383dc53f7f 100644 --- a/internal/distributed/masterservice/client/client.go +++ b/internal/distributed/masterservice/client/client.go @@ -176,15 +176,7 @@ func (c *GrpcClient) GetStatisticsChannel(ctx context.Context) (*milvuspb.String return ret.(*milvuspb.StringResponse), err } -// GetDdChannel receive ddl from rpc and time tick from proxy service, and put them into this channel -func (c *GrpcClient) GetDdChannel(ctx context.Context) (*milvuspb.StringResponse, error) { - ret, err := c.recall(func() (interface{}, error) { - return c.grpcClient.GetDdChannel(ctx, &internalpb.GetDdChannelRequest{}) - }) - return ret.(*milvuspb.StringResponse), err -} - -// CreateCollection DDL request +//DDL request func (c *GrpcClient) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { ret, err := c.recall(func() (interface{}, error) { return c.grpcClient.CreateCollection(ctx, in) diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index 99b2fa20b97c7cf6d4e61371b69b64f2afe219fb..3c75929ae77477f845a46993e896e776c5bd1263 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -129,7 +129,6 @@ func TestGrpcService(t *testing.T) { cms.Params.KvRootPath = fmt.Sprintf("/%d/test/kv", randVal) cms.Params.MsgChannelSubName = fmt.Sprintf("msgChannel%d", randVal) cms.Params.TimeTickChannel = fmt.Sprintf("timeTick%d", randVal) - cms.Params.DdChannel = fmt.Sprintf("ddChannel%d", randVal) cms.Params.StatisticsChannel = fmt.Sprintf("stateChannel%d", randVal) cms.Params.DataServiceSegmentChannel = fmt.Sprintf("segmentChannel%d", randVal) @@ -286,13 +285,6 @@ func TestGrpcService(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode) }) - t.Run("get dd channel", func(t *testing.T) { - req := &internalpb.GetDdChannelRequest{} - rsp, err := svr.GetDdChannel(ctx, req) - assert.Nil(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode) - }) - t.Run("alloc time stamp", func(t *testing.T) { req := &masterpb.AllocTimestampRequest{ Count: 1, diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index 9d7a7485a7059715203b4ad7413555b4e523c1b4..0c3edc2b5285c07a823d6f413ac0e35a3e9b0c34 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -296,12 +296,7 @@ func (s *Server) GetStatisticsChannel(ctx context.Context, req *internalpb.GetSt return s.masterService.GetStatisticsChannel(ctx) } -// GetDdChannel receive ddl from rpc and time tick from proxy service, and put them into this channel -func (s *Server) GetDdChannel(ctx context.Context, req *internalpb.GetDdChannelRequest) (*milvuspb.StringResponse, error) { - return s.masterService.GetDdChannel(ctx) -} - -// CreateCollection DDL request +//DDL request func (s *Server) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { return s.masterService.CreateCollection(ctx, in) } diff --git a/internal/masterservice/dml_channels.go b/internal/masterservice/dml_channels.go new file mode 100644 index 0000000000000000000000000000000000000000..1584b52c0d464a97ef654570ac816f6eac42a88f --- /dev/null +++ b/internal/masterservice/dml_channels.go @@ -0,0 +1,129 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package masterservice + +import ( + "fmt" + "sync" + + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/msgstream" + "go.uber.org/zap" +) + +type dmlChannels struct { + core *Core + lock sync.RWMutex + dml map[string]msgstream.MsgStream +} + +func newDMLChannels(c *Core) *dmlChannels { + return &dmlChannels{ + core: c, + lock: sync.RWMutex{}, + dml: make(map[string]msgstream.MsgStream), + } +} + +func (d *dmlChannels) GetNumChannles() int { + d.lock.RLock() + defer d.lock.RUnlock() + return len(d.dml) +} + +func (d *dmlChannels) ProduceAll(pack *msgstream.MsgPack) { + d.lock.RLock() + defer d.lock.RUnlock() + + for n, ms := range d.dml { + if err := ms.Produce(pack); err != nil { + log.Debug("msgstream produce error", zap.String("name", n), zap.Error(err)) + } + } +} + +func (d *dmlChannels) BroadcastAll(pack *msgstream.MsgPack) { + d.lock.RLock() + defer d.lock.RUnlock() + + for n, ms := range d.dml { + if err := ms.Broadcast(pack); err != nil { + log.Debug("msgstream broadcast error", zap.String("name", n), zap.Error(err)) + } + } +} + +func (d *dmlChannels) Produce(name string, pack *msgstream.MsgPack) error { + d.lock.Lock() + defer d.lock.Unlock() + + var err error + ms, ok := d.dml[name] + if !ok { + ms, err = d.core.msFactory.NewMsgStream(d.core.ctx) + if err != nil { + return fmt.Errorf("create mstream failed, name = %s, error=%w", name, err) + } + ms.AsProducer([]string{name}) + d.dml[name] = ms + } + return ms.Produce(pack) + +} + +func (d *dmlChannels) Broadcast(name string, pack *msgstream.MsgPack) error { + d.lock.Lock() + defer d.lock.Unlock() + + var err error + ms, ok := d.dml[name] + if !ok { + ms, err = d.core.msFactory.NewMsgStream(d.core.ctx) + if err != nil { + return fmt.Errorf("create msgtream failed, name = %s, error=%w", name, err) + } + ms.AsProducer([]string{name}) + d.dml[name] = ms + } + return ms.Broadcast(pack) +} + +func (d *dmlChannels) AddProducerChannles(names ...string) { + d.lock.Lock() + defer d.lock.Unlock() + + var err error + for _, name := range names { + ms, ok := d.dml[name] + if !ok { + ms, err = d.core.msFactory.NewMsgStream(d.core.ctx) + if err != nil { + log.Debug("add msgstream failed", zap.String("name", name), zap.Error(err)) + continue + } + ms.AsProducer([]string{name}) + d.dml[name] = ms + } + } +} + +func (d *dmlChannels) RemoveProducerChannels(names ...string) { + d.lock.Lock() + defer d.lock.Unlock() + + for _, name := range names { + if ms, ok := d.dml[name]; ok { + ms.Close() + delete(d.dml, name) + } + } +} diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index eade11de8ded3574a749ef46b27ea935f6ee41b9..3a1409088be3a014c15df9079e3a849a57ca7d9a 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -143,6 +143,9 @@ type Core struct { //dd request scheduler ddReqQueue chan reqTask //dd request will be push into this chan + //dml channels + dmlChannels *dmlChannels + //ProxyNode manager proxyNodeManager *proxyNodeManager @@ -528,14 +531,6 @@ func (c *Core) setMsgStreams() error { timeTickStream.AsProducer([]string{Params.TimeTickChannel}) log.Debug("masterservice AsProducer: " + Params.TimeTickChannel) - // master dd channel - if Params.DdChannel == "" { - return fmt.Errorf("DdChannel is empty") - } - ddStream, _ := c.msFactory.NewMsgStream(c.ctx) - ddStream.AsProducer([]string{Params.DdChannel}) - log.Debug("masterservice AsProducer: " + Params.DdChannel) - c.SendTimeTick = func(t typeutil.Timestamp) error { msgPack := ms.MsgPack{} baseMsg := ms.BaseMsg{ @@ -559,11 +554,25 @@ func (c *Core) setMsgStreams() error { if err := timeTickStream.Broadcast(&msgPack); err != nil { return err } - if err := ddStream.Broadcast(&msgPack); err != nil { - return err - } metrics.MasterDDChannelTimeTick.Set(float64(tsoutil.Mod24H(t))) - return nil + + c.dmlChannels.BroadcastAll(&msgPack) + pc := c.MetaTable.ListCollectionPhysicalChannels() + pt := make([]uint64, len(pc)) + for i := 0; i < len(pt); i++ { + pt[i] = t + } + ttMsg := internalpb.ChannelTimeTickMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_TimeTick, + MsgID: 0, //TODO + Timestamp: t, + SourceID: c.session.ServerID, + }, + ChannelNames: pc, + Timestamps: pt, + } + return c.chanTimeTick.UpdateTimeTick(&ttMsg) } c.SendDdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest) error { @@ -574,14 +583,12 @@ func (c *Core) setMsgStreams() error { EndTimestamp: req.Base.Timestamp, HashValues: []uint32{0}, } - collMsg := &ms.CreateCollectionMsg{ + msg := &ms.CreateCollectionMsg{ BaseMsg: baseMsg, CreateCollectionRequest: *req, } - msgPack.Msgs = append(msgPack.Msgs, collMsg) - if err := ddStream.Broadcast(&msgPack); err != nil { - return err - } + msgPack.Msgs = append(msgPack.Msgs, msg) + c.dmlChannels.BroadcastAll(&msgPack) return nil } @@ -593,14 +600,12 @@ func (c *Core) setMsgStreams() error { EndTimestamp: req.Base.Timestamp, HashValues: []uint32{0}, } - collMsg := &ms.DropCollectionMsg{ + msg := &ms.DropCollectionMsg{ BaseMsg: baseMsg, DropCollectionRequest: *req, } - msgPack.Msgs = append(msgPack.Msgs, collMsg) - if err := ddStream.Broadcast(&msgPack); err != nil { - return err - } + msgPack.Msgs = append(msgPack.Msgs, msg) + c.dmlChannels.BroadcastAll(&msgPack) return nil } @@ -612,14 +617,12 @@ func (c *Core) setMsgStreams() error { EndTimestamp: req.Base.Timestamp, HashValues: []uint32{0}, } - collMsg := &ms.CreatePartitionMsg{ + msg := &ms.CreatePartitionMsg{ BaseMsg: baseMsg, CreatePartitionRequest: *req, } - msgPack.Msgs = append(msgPack.Msgs, collMsg) - if err := ddStream.Broadcast(&msgPack); err != nil { - return err - } + msgPack.Msgs = append(msgPack.Msgs, msg) + c.dmlChannels.BroadcastAll(&msgPack) return nil } @@ -631,14 +634,12 @@ func (c *Core) setMsgStreams() error { EndTimestamp: req.Base.Timestamp, HashValues: []uint32{0}, } - collMsg := &ms.DropPartitionMsg{ + msg := &ms.DropPartitionMsg{ BaseMsg: baseMsg, DropPartitionRequest: *req, } - msgPack.Msgs = append(msgPack.Msgs, collMsg) - if err := ddStream.Broadcast(&msgPack); err != nil { - return err - } + msgPack.Msgs = append(msgPack.Msgs, msg) + c.dmlChannels.BroadcastAll(&msgPack) return nil } @@ -968,7 +969,13 @@ func (c *Core) Init() error { if initError = c.msFactory.SetParams(m); initError != nil { return } + + c.dmlChannels = newDMLChannels(c) + pc := c.MetaTable.ListCollectionPhysicalChannels() + c.dmlChannels.AddProducerChannles(pc...) + c.chanTimeTick = newTimeTickSync(c) + c.chanTimeTick.AddProxyNode(c.session) c.proxyClientManager = newProxyClientManager(c) c.proxyNodeManager, initError = newProxyNodeManager( @@ -1096,7 +1103,6 @@ func (c *Core) Start() error { } log.Debug("MasterService", zap.Int64("node id", c.session.ServerID)) - log.Debug("MasterService", zap.String("dd channel name", Params.DdChannel)) log.Debug("MasterService", zap.String("time tick channel name", Params.TimeTickChannel)) c.startOnce.Do(func() { @@ -1163,16 +1169,6 @@ func (c *Core) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse }, nil } -func (c *Core) GetDdChannel(ctx context.Context) (*milvuspb.StringResponse, error) { - return &milvuspb.StringResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, - Value: Params.DdChannel, - }, nil -} - func (c *Core) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { return &milvuspb.StringResponse{ Status: &commonpb.Status{ diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index d84ee09f729d38a5a23e0a323a15a9bc41dc8d9d..e54aaa2105e14f0efbc111e91e4859eedc2bbea7 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -174,7 +174,7 @@ func (idx *indexMock) getFileArray() []string { return ret } -func consumeMsgChan(timeout time.Duration, targetChan <-chan *msgstream.MsgPack) { +func clearMsgChan(timeout time.Duration, targetChan <-chan *msgstream.MsgPack) { ch := time.After(timeout) for { select { @@ -232,6 +232,27 @@ func GenFlushedSegMsgPack(segID typeutil.UniqueID) *msgstream.MsgPack { return &msgPack } +func getNotTtMsg(ctx context.Context, n int, ch <-chan *msgstream.MsgPack) []msgstream.TsMsg { + ret := make([]msgstream.TsMsg, 0, n) + for { + select { + case <-ctx.Done(): + return nil + case msg, ok := <-ch: + if ok { + for _, v := range msg.Msgs { + if _, ok := v.(*msgstream.TimeTickMsg); !ok { + ret = append(ret, v) + } + } + if len(ret) >= n { + return ret + } + } + } + } +} + func TestMasterService(t *testing.T) { const ( dbName = "testDb" @@ -250,7 +271,6 @@ func TestMasterService(t *testing.T) { randVal := rand.Int() Params.TimeTickChannel = fmt.Sprintf("master-time-tick-%d", randVal) - Params.DdChannel = fmt.Sprintf("master-dd-%d", randVal) Params.StatisticsChannel = fmt.Sprintf("master-statistics-%d", randVal) Params.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.MetaRootPath) Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath) @@ -322,17 +342,15 @@ func TestMasterService(t *testing.T) { timeTickStream.AsConsumer([]string{Params.TimeTickChannel}, Params.MsgChannelSubName) timeTickStream.Start() - ddStream, _ := tmpFactory.NewMsgStream(ctx) - ddStream.AsConsumer([]string{Params.DdChannel}, Params.MsgChannelSubName) - ddStream.Start() + dmlStream, _ := tmpFactory.NewMsgStream(ctx) // test dataServiceSegmentStream seek dataNodeSubName := Params.MsgChannelSubName + "dn" flushedSegStream, _ := tmpFactory.NewMsgStream(ctx) flushedSegStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, dataNodeSubName) flushedSegStream.Start() - msgPack := GenFlushedSegMsgPack(9999) - err = dataServiceSegmentStream.Produce(msgPack) + msgPackTmp := GenFlushedSegMsgPack(9999) + err = dataServiceSegmentStream.Produce(msgPackTmp) assert.Nil(t, err) flushedSegMsgPack := flushedSegStream.Consume() @@ -360,27 +378,6 @@ func TestMasterService(t *testing.T) { time.Sleep(time.Second) - getNotTtMsg := func(n int, ch <-chan *msgstream.MsgPack) []msgstream.TsMsg { - ret := make([]msgstream.TsMsg, 0, n) - for { - select { - case <-ctx.Done(): - return nil - case msg, ok := <-ch: - if ok { - for _, v := range msg.Msgs { - if _, ok := v.(*msgstream.TimeTickMsg); !ok { - ret = append(ret, v) - } - } - if len(ret) >= n { - return ret - } - } - } - } - } - t.Run("time tick", func(t *testing.T) { ttmsg, ok := <-timeTickStream.Chan() assert.True(t, ok) @@ -397,14 +394,6 @@ func TestMasterService(t *testing.T) { assert.True(t, ok) assert.Greater(t, ttm2.Base.Timestamp, uint64(0)) assert.Equal(t, ttm2.Base.Timestamp, ttm.Base.Timestamp+1) - - ddmsg, ok := <-ddStream.Chan() - assert.True(t, ok) - assert.Equal(t, 1, len(ddmsg.Msgs)) - ddm, ok := (ddmsg.Msgs[0]).(*msgstream.TimeTickMsg) - assert.True(t, ok) - assert.Greater(t, ddm.Base.Timestamp, uint64(0)) - assert.Equal(t, ttm.Base.Timestamp, ddm.Base.Timestamp) }) t.Run("create collection", func(t *testing.T) { @@ -446,10 +435,17 @@ func TestMasterService(t *testing.T) { assert.Nil(t, err) assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) - msgs := getNotTtMsg(2, ddStream.Chan()) - assert.Equal(t, 2, len(msgs)) + assert.Equal(t, 2, len(core.MetaTable.vChan2Chan)) + assert.Equal(t, 2, len(core.dmlChannels.dml)) - createMsg, ok := (msgs[0]).(*msgstream.CreateCollectionMsg) + pChan := core.MetaTable.ListCollectionPhysicalChannels() + dmlStream.AsConsumer([]string{pChan[0]}, Params.MsgChannelSubName) + dmlStream.Start() + + // get CreateCollectionMsg + msgPack, ok := <-dmlStream.Chan() + assert.True(t, ok) + createMsg, ok := (msgPack.Msgs[0]).(*msgstream.CreateCollectionMsg) assert.True(t, ok) createMeta, err := core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) @@ -463,11 +459,23 @@ func TestMasterService(t *testing.T) { assert.Nil(t, err) assert.Equal(t, createMeta.PhysicalChannelNames[0], chanName) - createPart, ok := (msgs[1]).(*msgstream.CreatePartitionMsg) + // get CreatePartitionMsg + msgPack, ok = <-dmlStream.Chan() + assert.True(t, ok) + createPart, ok := (msgPack.Msgs[0]).(*msgstream.CreatePartitionMsg) assert.True(t, ok) assert.Equal(t, collName, createPart.CollectionName) assert.Equal(t, createMeta.PartitionIDs[0], createPart.PartitionID) + // get TimeTickMsg + msgPack, ok = <-dmlStream.Chan() + assert.True(t, ok) + assert.Equal(t, 1, len(msgPack.Msgs)) + ddm, ok := (msgPack.Msgs[0]).(*msgstream.TimeTickMsg) + assert.True(t, ok) + assert.Greater(t, ddm.Base.Timestamp, uint64(0)) + + // check invalid operation req.Base.MsgID = 101 req.Base.Timestamp = 101 req.Base.SourceID = 101 @@ -494,7 +502,7 @@ func TestMasterService(t *testing.T) { assert.Nil(t, err) assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) - msgs = getNotTtMsg(1, ddStream.Chan()) + msgs := getNotTtMsg(ctx, 1, dmlStream.Chan()) createMsg, ok = (msgs[0]).(*msgstream.CreateCollectionMsg) assert.True(t, ok) createMeta, err = core.MetaTable.GetCollectionByName("testColl-again", 0) @@ -623,7 +631,7 @@ func TestMasterService(t *testing.T) { CollectionName: collName, PartitionName: partName, } - consumeMsgChan(time.Second, ddStream.Chan()) + clearMsgChan(10*time.Millisecond, dmlStream.Chan()) status, err := core.CreatePartition(ctx, req) assert.Nil(t, err) t.Log(status.Reason) @@ -635,7 +643,7 @@ func TestMasterService(t *testing.T) { assert.Nil(t, err) assert.Equal(t, partName, partMeta.PartitionName) - msgs := getNotTtMsg(1, ddStream.Chan()) + msgs := getNotTtMsg(ctx, 1, dmlStream.Chan()) assert.Equal(t, 1, len(msgs)) partMsg, ok := (msgs[0]).(*msgstream.CreatePartitionMsg) assert.True(t, ok) @@ -989,7 +997,7 @@ func TestMasterService(t *testing.T) { assert.Nil(t, err) assert.Equal(t, Params.DefaultPartitionName, partMeta.PartitionName) - msgs := getNotTtMsg(1, ddStream.Chan()) + msgs := getNotTtMsg(ctx, 1, dmlStream.Chan()) assert.Equal(t, 1, len(msgs)) dmsg, ok := (msgs[0]).(*msgstream.DropPartitionMsg) assert.True(t, ok) @@ -1038,7 +1046,7 @@ func TestMasterService(t *testing.T) { _, err = core.MetaTable.GetChanNameByVirtualChan(vChanName) assert.NotNil(t, err) - msgs := getNotTtMsg(1, ddStream.Chan()) + msgs := getNotTtMsg(ctx, 1, dmlStream.Chan()) assert.Equal(t, 1, len(msgs)) dmsg, ok := (msgs[0]).(*msgstream.DropCollectionMsg) assert.True(t, ok) @@ -1067,7 +1075,6 @@ func TestMasterService(t *testing.T) { assert.Nil(t, err) assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode) time.Sleep(time.Second) - //assert.Zero(t, len(ddStream.Chan())) collArray = pnm.GetCollArray() assert.Equal(t, 3, len(collArray)) assert.Equal(t, collName, collArray[2]) @@ -1441,8 +1448,6 @@ func TestMasterService(t *testing.T) { t.Run("get_channels", func(t *testing.T) { _, err := core.GetTimeTickChannel(ctx) assert.Nil(t, err) - _, err = core.GetDdChannel(ctx) - assert.Nil(t, err) _, err = core.GetStatisticsChannel(ctx) assert.Nil(t, err) }) @@ -1490,7 +1495,7 @@ func TestMasterService(t *testing.T) { s, _ := core.UpdateChannelTimeTick(ctx, msg0) assert.Equal(t, commonpb.ErrorCode_Success, s.ErrorCode) time.Sleep(100 * time.Millisecond) - t.Log(core.chanTimeTick.proxyTimeTick) + //t.Log(core.chanTimeTick.proxyTimeTick) msg1 := &internalpb.ChannelTimeTickMsg{ Base: &commonpb.MsgBase{ @@ -1514,8 +1519,11 @@ func TestMasterService(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_UnexpectedError, s.ErrorCode) time.Sleep(1 * time.Second) - assert.Equal(t, 2, core.chanTimeTick.GetProxyNodeNum()) - assert.Equal(t, 3, core.chanTimeTick.GetChanNum()) + // 2 proxy nodes, 1 master + assert.Equal(t, 3, core.chanTimeTick.GetProxyNodeNum()) + + // 3 proxy node channels, 2 master channels + assert.Equal(t, 5, core.chanTimeTick.GetChanNum()) }) err = core.Stop() @@ -1734,7 +1742,6 @@ func TestMasterService2(t *testing.T) { randVal := rand.Int() Params.TimeTickChannel = fmt.Sprintf("master-time-tick-%d", randVal) - Params.DdChannel = fmt.Sprintf("master-dd-%d", randVal) Params.StatisticsChannel = fmt.Sprintf("master-statistics-%d", randVal) Params.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.MetaRootPath) Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath) @@ -1788,28 +1795,8 @@ func TestMasterService2(t *testing.T) { timeTickStream.AsConsumer([]string{Params.TimeTickChannel}, Params.MsgChannelSubName) timeTickStream.Start() - ddStream, _ := msFactory.NewMsgStream(ctx) - ddStream.AsConsumer([]string{Params.DdChannel}, Params.MsgChannelSubName) - ddStream.Start() - time.Sleep(time.Second) - getNotTTMsg := func(ch <-chan *msgstream.MsgPack, n int) []msgstream.TsMsg { - msg := make([]msgstream.TsMsg, 0, n) - for { - m, ok := <-ch - assert.True(t, ok) - for _, m := range m.Msgs { - if _, ok := (m).(*msgstream.TimeTickMsg); !ok { - msg = append(msg, m) - } - } - if len(msg) >= n { - return msg - } - } - } - t.Run("time tick", func(t *testing.T) { ttmsg, ok := <-timeTickStream.Chan() assert.True(t, ok) @@ -1817,13 +1804,6 @@ func TestMasterService2(t *testing.T) { ttm, ok := (ttmsg.Msgs[0]).(*msgstream.TimeTickMsg) assert.True(t, ok) assert.Greater(t, ttm.Base.Timestamp, typeutil.Timestamp(0)) - - ddmsg, ok := <-ddStream.Chan() - assert.True(t, ok) - assert.Equal(t, 1, len(ddmsg.Msgs)) - ddm, ok := (ddmsg.Msgs[0]).(*msgstream.TimeTickMsg) - assert.True(t, ok) - assert.Greater(t, ddm.Base.Timestamp, typeutil.Timestamp(0)) }) t.Run("create collection", func(t *testing.T) { @@ -1847,11 +1827,17 @@ func TestMasterService2(t *testing.T) { assert.Nil(t, err) assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) - msg := getNotTTMsg(ddStream.Chan(), 2) - assert.GreaterOrEqual(t, len(msg), 2) - m1, ok := (msg[0]).(*msgstream.CreateCollectionMsg) + pChan := core.MetaTable.ListCollectionPhysicalChannels() + dmlStream, _ := msFactory.NewMsgStream(ctx) + dmlStream.AsConsumer(pChan, Params.MsgChannelSubName) + dmlStream.Start() + + msgs := getNotTtMsg(ctx, 2, dmlStream.Chan()) + assert.Equal(t, 2, len(msgs)) + + m1, ok := (msgs[0]).(*msgstream.CreateCollectionMsg) assert.True(t, ok) - m2, ok := (msg[1]).(*msgstream.CreatePartitionMsg) + m2, ok := (msgs[1]).(*msgstream.CreatePartitionMsg) assert.True(t, ok) assert.Equal(t, m1.Base.Timestamp, m2.Base.Timestamp) t.Log("time tick", m1.Base.Timestamp) diff --git a/internal/masterservice/meta_table.go b/internal/masterservice/meta_table.go index 5001b10c23b9284092c101e25d8d457b604145d3..736d10582087442411d58e5e30bcd47acf1ba8cb 100644 --- a/internal/masterservice/meta_table.go +++ b/internal/masterservice/meta_table.go @@ -488,6 +488,30 @@ func (mt *metaTable) ListCollections(ts typeutil.Timestamp) (map[string]typeutil return colls, nil } +// ListCollectionVirtualChannels list virtual channel of all the collection +func (mt *metaTable) ListCollectionVirtualChannels() []string { + mt.ddLock.RLock() + defer mt.ddLock.RUnlock() + vlist := []string{} + + for _, c := range mt.collID2Meta { + vlist = append(vlist, c.VirtualChannelNames...) + } + return vlist +} + +// ListCollectionPhysicalChannels list physical channel of all the collection +func (mt *metaTable) ListCollectionPhysicalChannels() []string { + mt.ddLock.RLock() + defer mt.ddLock.RUnlock() + plist := []string{} + + for _, c := range mt.collID2Meta { + plist = append(plist, c.PhysicalChannelNames...) + } + return plist +} + func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) { mt.ddLock.Lock() defer mt.ddLock.Unlock() diff --git a/internal/masterservice/param_table.go b/internal/masterservice/param_table.go index fb293255df046818bdf5c47c7deea28b74352746..d0aa0e129765d1240e9b54a2c232408dff8ab978 100644 --- a/internal/masterservice/param_table.go +++ b/internal/masterservice/param_table.go @@ -35,7 +35,6 @@ type ParamTable struct { KvRootPath string MsgChannelSubName string TimeTickChannel string - DdChannel string StatisticsChannel string DataServiceSegmentChannel string // data service create segment, or data node flush segment @@ -68,7 +67,6 @@ func (p *ParamTable) Init() { p.initMsgChannelSubName() p.initTimeTickChannel() - p.initDdChannelName() p.initStatisticsChannelName() p.initSegmentInfoChannelName() @@ -141,14 +139,6 @@ func (p *ParamTable) initTimeTickChannel() { p.TimeTickChannel = channel } -func (p *ParamTable) initDdChannelName() { - channel, err := p.Load("msgChannel.chanNamePrefix.dataDefinition") - if err != nil { - panic(err) - } - p.DdChannel = channel -} - func (p *ParamTable) initStatisticsChannelName() { channel, err := p.Load("msgChannel.chanNamePrefix.masterStatistics") if err != nil { diff --git a/internal/masterservice/param_table_test.go b/internal/masterservice/param_table_test.go index 8c3e45f8d7703dd84aef76617348ef336ffa30c1..a3af013a48f8239d1b8b7b5d15bcb82ecd025a7b 100644 --- a/internal/masterservice/param_table_test.go +++ b/internal/masterservice/param_table_test.go @@ -38,9 +38,6 @@ func TestParamTable(t *testing.T) { assert.NotEqual(t, Params.TimeTickChannel, "") t.Logf("master time tick channel = %s", Params.TimeTickChannel) - assert.NotEqual(t, Params.DdChannel, "") - t.Logf("master dd channel = %s", Params.DdChannel) - assert.NotEqual(t, Params.StatisticsChannel, "") t.Logf("master statistics channel = %s", Params.StatisticsChannel) diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go index d80c2e0f14c64a4f527219859c37938e63637f84..02722fc8d5a6b4f5faf128b6d8a5632d50c59dfd 100644 --- a/internal/masterservice/task.go +++ b/internal/masterservice/task.go @@ -236,6 +236,9 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { return err } + // add dml channel before send dd msg + t.core.dmlChannels.AddProducerChannles(chanNames...) + err = t.core.SendDdCreateCollectionReq(ctx, &ddCollReq) if err != nil { return err @@ -301,6 +304,9 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { t.core.SendTimeTick(ts) + // remove dml channel after send dd msg + t.core.dmlChannels.RemoveProducerChannels(collMeta.PhysicalChannelNames...) + //notify query service to release collection go func() { if err = t.core.CallReleaseCollectionService(t.core.ctx, ts, 0, collMeta.ID); err != nil { diff --git a/internal/masterservice/timestamp_test.go b/internal/masterservice/timestamp_test.go index de65784a81bb7f30fbc8487dab3b595ef3b37f7e..379310f0efa60b4809bebcaf2c42857355d70d21 100644 --- a/internal/masterservice/timestamp_test.go +++ b/internal/masterservice/timestamp_test.go @@ -15,6 +15,7 @@ import ( "context" "fmt" "math/rand" + "sync" "testing" "github.com/milvus-io/milvus/internal/msgstream" @@ -26,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/stretchr/testify/assert" ) @@ -100,7 +102,6 @@ func BenchmarkAllocTimestamp(b *testing.B) { randVal := rand.Int() Params.TimeTickChannel = fmt.Sprintf("master-time-tick-%d", randVal) - Params.DdChannel = fmt.Sprintf("master-dd-%d", randVal) Params.StatisticsChannel = fmt.Sprintf("master-statistics-%d", randVal) Params.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.MetaRootPath) Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath) @@ -115,6 +116,17 @@ func BenchmarkAllocTimestamp(b *testing.B) { err = core.SetQueryService(&tbq{}) assert.Nil(b, err) + err = core.Register() + assert.Nil(b, err) + + pnm := &proxyNodeMock{ + collArray: make([]string, 0, 16), + mutex: sync.Mutex{}, + } + core.NewProxyClient = func(*sessionutil.Session) (types.ProxyNode, error) { + return pnm, nil + } + err = core.Init() assert.Nil(b, err) diff --git a/internal/masterservice/timeticksync.go b/internal/masterservice/timeticksync.go index 4571bd714e4ee73374513d2e538f2911e7646100..22d5d58c81bf381cedfae1a3c82098a34210ff7a 100644 --- a/internal/masterservice/timeticksync.go +++ b/internal/masterservice/timeticksync.go @@ -30,7 +30,6 @@ type timetickSync struct { core *Core lock sync.Mutex proxyTimeTick map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg - chanStream map[string]msgstream.MsgStream sendChan chan map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg } @@ -39,7 +38,6 @@ func newTimeTickSync(core *Core) *timetickSync { lock: sync.Mutex{}, core: core, proxyTimeTick: make(map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg), - chanStream: make(map[string]msgstream.MsgStream), sendChan: make(chan map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg, 16), } } @@ -157,23 +155,7 @@ func (t *timetickSync) SendChannelTimeTick(chanName string, ts typeutil.Timestam } msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) - t.lock.Lock() - defer t.lock.Unlock() - - // send timetick msg to msg stream - var err error - var stream msgstream.MsgStream - stream, ok := t.chanStream[chanName] - if !ok { - stream, err = t.core.msFactory.NewMsgStream(t.core.ctx) - if err != nil { - return err - } - stream.AsProducer([]string{chanName}) - t.chanStream[chanName] = stream - } - - err = stream.Broadcast(&msgPack) + err := t.core.dmlChannels.Broadcast(chanName, &msgPack) if err == nil { metrics.MasterInsertChannelTimeTick.WithLabelValues(chanName).Set(float64(tsoutil.Mod24H(ts))) } @@ -189,7 +171,5 @@ func (t *timetickSync) GetProxyNodeNum() int { // GetChanNum return the num of channel func (t *timetickSync) GetChanNum() int { - t.lock.Lock() - defer t.lock.Unlock() - return len(t.chanStream) + return t.core.dmlChannels.GetNumChannles() } diff --git a/internal/proto/master.proto b/internal/proto/master.proto index 69b442379293a01b02a48c0ffea0e8d2970fd1e3..fa6827d5f12037e3090ec873316dc22422018e56 100644 --- a/internal/proto/master.proto +++ b/internal/proto/master.proto @@ -94,8 +94,6 @@ service MasterService { rpc AllocTimestamp(AllocTimestampRequest) returns (AllocTimestampResponse) {} rpc AllocID(AllocIDRequest) returns (AllocIDResponse) {} rpc UpdateChannelTimeTick(internal.ChannelTimeTickMsg) returns (common.Status) {} - - rpc GetDdChannel(internal.GetDdChannelRequest) returns (milvus.StringResponse) {} } message AllocTimestampRequest { diff --git a/internal/proto/masterpb/master.pb.go b/internal/proto/masterpb/master.pb.go index 59f7d0504520098cf3ef2d90dfe40e03b6f93869..2d43633087ededa7c82e788c9d46fdc72e51e7eb 100644 --- a/internal/proto/masterpb/master.pb.go +++ b/internal/proto/masterpb/master.pb.go @@ -241,52 +241,51 @@ func init() { func init() { proto.RegisterFile("master.proto", fileDescriptor_f9c348dec43a6705) } var fileDescriptor_f9c348dec43a6705 = []byte{ - // 710 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x96, 0x5b, 0x4f, 0xdb, 0x3c, - 0x18, 0xc7, 0x69, 0xe1, 0xe5, 0x15, 0x0f, 0x6d, 0x41, 0x1e, 0x4c, 0xa8, 0xe3, 0x82, 0x95, 0x1d, - 0x5a, 0x60, 0x2d, 0x02, 0xed, 0x03, 0x8c, 0x56, 0x82, 0x5e, 0x20, 0x6d, 0x2d, 0x4c, 0x3b, 0x08, - 0x21, 0x37, 0x58, 0xad, 0x45, 0x62, 0x87, 0xd8, 0x85, 0xdd, 0xee, 0xd3, 0xee, 0x6b, 0x4c, 0x89, - 0x63, 0x37, 0x69, 0x0e, 0x4b, 0xb5, 0xdd, 0xc5, 0xf1, 0xcf, 0xff, 0xbf, 0x9f, 0x83, 0xf4, 0x18, + // 694 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x96, 0x5d, 0x4f, 0xdb, 0x3e, + 0x14, 0xc6, 0x69, 0xe1, 0xcf, 0x5f, 0x1c, 0xda, 0x82, 0x3c, 0x98, 0x50, 0xc7, 0x05, 0x2b, 0x7b, + 0x69, 0x81, 0xb5, 0x08, 0xb4, 0x0f, 0x30, 0x5a, 0x09, 0x7a, 0x81, 0xb4, 0xb5, 0x30, 0xed, 0x45, + 0x08, 0xb9, 0xc1, 0x6a, 0x2d, 0x12, 0x3b, 0xc4, 0x2e, 0xec, 0x76, 0xd2, 0x3e, 0xf8, 0x94, 0x38, + 0x76, 0x93, 0xe6, 0x65, 0xa9, 0xb6, 0xbb, 0xba, 0xf9, 0xf9, 0x79, 0x7c, 0xce, 0x73, 0x24, 0x1b, 0x2a, 0x0e, 0x16, 0x92, 0x78, 0x6d, 0xd7, 0xe3, 0x92, 0xa3, 0x67, 0x0e, 0xb5, 0x1f, 0xa7, 0x42, - 0xad, 0xda, 0x6a, 0xab, 0x5e, 0xb1, 0xb8, 0xe3, 0x70, 0xa6, 0x7e, 0xd6, 0x2b, 0x51, 0xa4, 0x5e, - 0xa3, 0x4c, 0x12, 0x8f, 0x61, 0x5b, 0xad, 0x1b, 0xb7, 0xb0, 0xfd, 0xc1, 0xb6, 0xb9, 0x75, 0x45, - 0x1d, 0x22, 0x24, 0x76, 0xdc, 0x01, 0x79, 0x98, 0x12, 0x21, 0xd1, 0x31, 0xac, 0x8c, 0xb0, 0x20, - 0x3b, 0xa5, 0xbd, 0x52, 0x73, 0xfd, 0x64, 0xb7, 0x1d, 0x33, 0x0a, 0x0d, 0x2e, 0xc5, 0xf8, 0x0c, - 0x0b, 0x32, 0x08, 0x48, 0xb4, 0x05, 0xff, 0x59, 0x7c, 0xca, 0xe4, 0xce, 0xf2, 0x5e, 0xa9, 0x59, - 0x1d, 0xa8, 0x45, 0xe3, 0x67, 0x09, 0x9e, 0xcf, 0x3b, 0x08, 0x97, 0x33, 0x41, 0xd0, 0x29, 0xac, - 0x0a, 0x89, 0xe5, 0x54, 0x84, 0x26, 0x2f, 0x52, 0x4d, 0x86, 0x01, 0x32, 0x08, 0x51, 0xb4, 0x0b, - 0x6b, 0x52, 0x2b, 0xed, 0x94, 0xf7, 0x4a, 0xcd, 0x95, 0xc1, 0xec, 0x47, 0xc6, 0x1d, 0xbe, 0x40, - 0x2d, 0xb8, 0x42, 0xbf, 0xf7, 0x0f, 0xa2, 0x2b, 0x47, 0x95, 0x6d, 0xd8, 0x30, 0xca, 0x7f, 0x13, - 0x55, 0x0d, 0xca, 0xfd, 0x5e, 0x20, 0xbd, 0x3c, 0x28, 0xf7, 0x7b, 0xe9, 0x71, 0x9c, 0xfc, 0xda, - 0x84, 0xea, 0x65, 0x50, 0xe3, 0x21, 0xf1, 0x1e, 0xa9, 0x45, 0x90, 0x0b, 0xe8, 0x9c, 0xc8, 0x2e, - 0x77, 0x5c, 0xce, 0x08, 0x93, 0xbe, 0x2a, 0x11, 0xe8, 0x38, 0x6e, 0x69, 0x4a, 0x9e, 0x44, 0xc3, - 0x7c, 0xd4, 0xdf, 0x64, 0x9c, 0x98, 0xc3, 0x1b, 0x4b, 0xc8, 0x09, 0x1c, 0xfd, 0x62, 0x5e, 0x51, - 0xeb, 0xbe, 0x3b, 0xc1, 0x8c, 0x11, 0x3b, 0xcf, 0x71, 0x0e, 0xd5, 0x8e, 0xfb, 0xf1, 0x13, 0xe1, - 0x62, 0x28, 0x3d, 0xca, 0xc6, 0x3a, 0x97, 0x8d, 0x25, 0xf4, 0x00, 0x5b, 0xe7, 0x24, 0x70, 0xa7, + 0xad, 0xda, 0xea, 0x53, 0xbd, 0x62, 0x71, 0xc7, 0xe1, 0x4c, 0xfd, 0x59, 0xaf, 0x44, 0x91, 0x7a, + 0x8d, 0x32, 0x49, 0x3c, 0x86, 0x6d, 0xb5, 0x6e, 0xdc, 0xc2, 0xf6, 0x07, 0xdb, 0xe6, 0xd6, 0x15, + 0x75, 0x88, 0x90, 0xd8, 0x71, 0x07, 0xe4, 0x61, 0x4a, 0x84, 0x44, 0xc7, 0xb0, 0x32, 0xc2, 0x82, + 0xec, 0x94, 0xf6, 0x4a, 0xcd, 0xf5, 0x93, 0xdd, 0x76, 0xcc, 0x28, 0x34, 0xb8, 0x14, 0xe3, 0x33, + 0x2c, 0xc8, 0x20, 0x20, 0xd1, 0x16, 0xfc, 0x67, 0xf1, 0x29, 0x93, 0x3b, 0xcb, 0x7b, 0xa5, 0x66, + 0x75, 0xa0, 0x16, 0x8d, 0x9f, 0x25, 0x78, 0x3e, 0xef, 0x20, 0x5c, 0xce, 0x04, 0x41, 0xa7, 0xb0, + 0x2a, 0x24, 0x96, 0x53, 0x11, 0x9a, 0xbc, 0x48, 0x35, 0x19, 0x06, 0xc8, 0x20, 0x44, 0xd1, 0x2e, + 0xac, 0x49, 0xad, 0xb4, 0x53, 0xde, 0x2b, 0x35, 0x57, 0x06, 0xb3, 0x3f, 0x32, 0xce, 0xf0, 0x05, + 0x6a, 0xc1, 0x11, 0xfa, 0xbd, 0x7f, 0x50, 0x5d, 0x39, 0xaa, 0x6c, 0xc3, 0x86, 0x51, 0xfe, 0x9b, + 0xaa, 0x6a, 0x50, 0xee, 0xf7, 0x02, 0xe9, 0xe5, 0x41, 0xb9, 0xdf, 0x4b, 0xaf, 0xe3, 0xe4, 0xd7, + 0x26, 0x54, 0x2f, 0x83, 0x8c, 0x87, 0xc4, 0x7b, 0xa4, 0x16, 0x41, 0x2e, 0xa0, 0x73, 0x22, 0xbb, + 0xdc, 0x71, 0x39, 0x23, 0x4c, 0xfa, 0xaa, 0x44, 0xa0, 0xe3, 0xb8, 0xa5, 0x89, 0x3c, 0x89, 0x86, + 0xfd, 0xa8, 0xbf, 0xc9, 0xd8, 0x31, 0x87, 0x37, 0x96, 0x90, 0x13, 0x38, 0xfa, 0x61, 0x5e, 0x51, + 0xeb, 0xbe, 0x3b, 0xc1, 0x8c, 0x11, 0x3b, 0xcf, 0x71, 0x0e, 0xd5, 0x8e, 0xfb, 0xf1, 0x1d, 0xe1, + 0x62, 0x28, 0x3d, 0xca, 0xc6, 0xba, 0x97, 0x8d, 0x25, 0xf4, 0x00, 0x5b, 0xe7, 0x24, 0x70, 0xa7, 0x42, 0x52, 0x4b, 0x68, 0xc3, 0x93, 0x6c, 0xc3, 0x04, 0xbc, 0xa0, 0xe5, 0x2d, 0x6c, 0x76, 0x3d, - 0x82, 0x25, 0xe9, 0x72, 0xdb, 0x26, 0x96, 0xa4, 0x9c, 0xa1, 0xa3, 0xd4, 0xa3, 0xf3, 0x98, 0x36, - 0xca, 0x2b, 0x79, 0x63, 0x09, 0x7d, 0x87, 0x5a, 0xcf, 0xe3, 0x6e, 0x44, 0xfe, 0x20, 0x55, 0x3e, - 0x0e, 0x15, 0x14, 0xbf, 0x85, 0xea, 0x05, 0x16, 0x11, 0xed, 0x56, 0xaa, 0x76, 0x8c, 0xd1, 0xd2, - 0x2f, 0x53, 0xd1, 0x33, 0xce, 0xed, 0x48, 0x7a, 0x9e, 0x00, 0xf5, 0x88, 0xb0, 0x3c, 0x3a, 0x8a, - 0x26, 0xa8, 0x9d, 0x1e, 0x41, 0x02, 0xd4, 0x56, 0x9d, 0xc2, 0xbc, 0x31, 0x66, 0xb0, 0x31, 0x9c, - 0xf0, 0xa7, 0xd9, 0x9e, 0x40, 0x87, 0xe9, 0x15, 0x8d, 0x53, 0xda, 0xf2, 0xa8, 0x18, 0x6c, 0xfc, - 0x6e, 0x60, 0x43, 0x15, 0xf8, 0x23, 0xf6, 0x24, 0x0d, 0xa2, 0x3c, 0xcc, 0x69, 0x03, 0x43, 0x15, - 0x2c, 0xd4, 0x57, 0xa8, 0xfa, 0x05, 0x9e, 0x89, 0xb7, 0x32, 0x9b, 0x60, 0x51, 0xe9, 0x1b, 0xa8, - 0x5c, 0x60, 0x31, 0x53, 0x6e, 0x66, 0xb5, 0x40, 0x42, 0xb8, 0x50, 0x07, 0xdc, 0x43, 0xcd, 0xcf, - 0x9a, 0x39, 0x2c, 0x32, 0xfa, 0x37, 0x0e, 0x69, 0x8b, 0xc3, 0x42, 0x6c, 0xb4, 0xea, 0xba, 0x2b, - 0x86, 0x64, 0xec, 0x10, 0x26, 0x33, 0xaa, 0x30, 0x47, 0xe5, 0x57, 0x3d, 0x01, 0x1b, 0x3f, 0x02, - 0x15, 0xff, 0x2e, 0xe1, 0x86, 0xc8, 0xc8, 0x5d, 0x14, 0xd1, 0x4e, 0xad, 0x02, 0xa4, 0xb1, 0xb9, - 0x86, 0x75, 0xd5, 0x36, 0x7d, 0x76, 0x47, 0x7e, 0xa0, 0xb7, 0x39, 0x8d, 0x15, 0x10, 0x05, 0x2b, - 0x3f, 0x81, 0xaa, 0x0e, 0x4d, 0x09, 0xb7, 0x72, 0xc3, 0x8f, 0x49, 0x1f, 0x14, 0x41, 0x4d, 0x00, - 0x9f, 0x60, 0xcd, 0x6f, 0x4d, 0xe5, 0xf2, 0x3a, 0xb3, 0x75, 0x17, 0xb9, 0xfc, 0x7d, 0x38, 0xa6, - 0xcd, 0x4b, 0x21, 0xd1, 0x57, 0xea, 0xe9, 0x93, 0xfa, 0x60, 0x49, 0xf4, 0x55, 0x3a, 0x6b, 0xee, - 0xff, 0x19, 0xfe, 0x0f, 0x27, 0x37, 0xda, 0xcf, 0x3e, 0x69, 0x5e, 0x0c, 0xf5, 0x57, 0xf9, 0x90, - 0xd1, 0xc5, 0xb0, 0x7d, 0xed, 0xde, 0xf9, 0x63, 0x41, 0x0d, 0x1f, 0x3d, 0xfe, 0xe6, 0x2b, 0x31, - 0x1b, 0xb1, 0x71, 0xee, 0x52, 0x8c, 0xff, 0x94, 0x27, 0x0c, 0x95, 0x73, 0x22, 0x7b, 0x77, 0x7a, - 0x16, 0x1e, 0x64, 0xcf, 0x42, 0x03, 0x2d, 0x36, 0x03, 0xcf, 0xde, 0x7f, 0x3b, 0x1d, 0x53, 0x39, - 0x99, 0x8e, 0x7c, 0xf3, 0x8e, 0xa2, 0xde, 0x51, 0x1e, 0x7e, 0x75, 0xb4, 0x45, 0x27, 0x50, 0xe9, - 0xa8, 0x64, 0xb8, 0xa3, 0xd1, 0x6a, 0xb0, 0x3e, 0xfd, 0x1d, 0x00, 0x00, 0xff, 0xff, 0xee, 0x13, - 0xfc, 0x7b, 0xa5, 0x0a, 0x00, 0x00, + 0x82, 0x25, 0xe9, 0x72, 0xdb, 0x26, 0x96, 0xa4, 0x9c, 0xa1, 0xa3, 0xd4, 0xad, 0xf3, 0x98, 0x36, + 0xca, 0x8b, 0xbc, 0xb1, 0x84, 0xbe, 0x43, 0xad, 0xe7, 0x71, 0x37, 0x22, 0x7f, 0x90, 0x2a, 0x1f, + 0x87, 0x0a, 0x8a, 0xdf, 0x42, 0xf5, 0x02, 0x8b, 0x88, 0x76, 0x2b, 0x55, 0x3b, 0xc6, 0x68, 0xe9, + 0x97, 0xa9, 0xe8, 0x19, 0xe7, 0x76, 0xa4, 0x3d, 0x4f, 0x80, 0x7a, 0x44, 0x58, 0x1e, 0x1d, 0x45, + 0x1b, 0xd4, 0x4e, 0xaf, 0x20, 0x01, 0x6a, 0xab, 0x4e, 0x61, 0xde, 0x18, 0x33, 0xd8, 0x18, 0x4e, + 0xf8, 0xd3, 0xec, 0x9b, 0x40, 0x87, 0xe9, 0x89, 0xc6, 0x29, 0x6d, 0x79, 0x54, 0x0c, 0x36, 0x7e, + 0x37, 0xb0, 0xa1, 0x02, 0xfe, 0x88, 0x3d, 0x49, 0x83, 0x2a, 0x0f, 0x73, 0xc6, 0xc0, 0x50, 0x05, + 0x83, 0xfa, 0x0a, 0x55, 0x3f, 0xe0, 0x99, 0x78, 0x2b, 0x73, 0x08, 0x16, 0x95, 0xbe, 0x81, 0xca, + 0x05, 0x16, 0x33, 0xe5, 0x66, 0xd6, 0x08, 0x24, 0x84, 0x0b, 0x4d, 0xc0, 0x3d, 0xd4, 0xfc, 0xae, + 0x99, 0xcd, 0x22, 0x63, 0x7e, 0xe3, 0x90, 0xb6, 0x38, 0x2c, 0xc4, 0x46, 0x53, 0xd7, 0x53, 0x31, + 0x24, 0x63, 0x87, 0x30, 0x99, 0x91, 0xc2, 0x1c, 0x95, 0x9f, 0x7a, 0x02, 0x36, 0x7e, 0x04, 0x2a, + 0xfe, 0x59, 0xc2, 0x0f, 0x22, 0xa3, 0x77, 0x51, 0x44, 0x3b, 0xb5, 0x0a, 0x90, 0xc6, 0xe6, 0x1a, + 0xd6, 0xd5, 0xd8, 0xf4, 0xd9, 0x1d, 0xf9, 0x81, 0xde, 0xe6, 0x0c, 0x56, 0x40, 0x14, 0x4c, 0x7e, + 0x02, 0x55, 0x5d, 0x9a, 0x12, 0x6e, 0xe5, 0x96, 0x1f, 0x93, 0x3e, 0x28, 0x82, 0x9a, 0x02, 0x3e, + 0xc1, 0x9a, 0x3f, 0x9a, 0xca, 0xe5, 0x75, 0xe6, 0xe8, 0x2e, 0x72, 0xf8, 0xfb, 0xf0, 0x9a, 0x36, + 0x2f, 0x85, 0xc4, 0x5c, 0xa9, 0xa7, 0x4f, 0xea, 0x83, 0x25, 0x31, 0x57, 0xe9, 0xac, 0x39, 0xff, + 0x67, 0xf8, 0x3f, 0xbc, 0xb9, 0xd1, 0x7e, 0xf6, 0x4e, 0xf3, 0x62, 0xa8, 0xbf, 0xca, 0x87, 0x8c, + 0x2e, 0x86, 0xed, 0x6b, 0xf7, 0xce, 0xbf, 0x16, 0xd4, 0xe5, 0xa3, 0xaf, 0xbf, 0xf9, 0x24, 0x66, + 0x57, 0x6c, 0x9c, 0xbb, 0x14, 0xe3, 0x3f, 0xf4, 0xe9, 0xec, 0xfd, 0xb7, 0xd3, 0x31, 0x95, 0x93, + 0xe9, 0xc8, 0xff, 0xd2, 0x51, 0xe8, 0x3b, 0xca, 0xc3, 0x5f, 0x1d, 0xad, 0xdc, 0x09, 0x76, 0x77, + 0xd4, 0x49, 0xdd, 0xd1, 0x68, 0x35, 0x58, 0x9f, 0xfe, 0x0e, 0x00, 0x00, 0xff, 0xff, 0x99, 0x60, + 0x10, 0x62, 0x42, 0x0a, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -367,7 +366,6 @@ type MasterServiceClient interface { AllocTimestamp(ctx context.Context, in *AllocTimestampRequest, opts ...grpc.CallOption) (*AllocTimestampResponse, error) AllocID(ctx context.Context, in *AllocIDRequest, opts ...grpc.CallOption) (*AllocIDResponse, error) UpdateChannelTimeTick(ctx context.Context, in *internalpb.ChannelTimeTickMsg, opts ...grpc.CallOption) (*commonpb.Status, error) - GetDdChannel(ctx context.Context, in *internalpb.GetDdChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) } type masterServiceClient struct { @@ -558,15 +556,6 @@ func (c *masterServiceClient) UpdateChannelTimeTick(ctx context.Context, in *int return out, nil } -func (c *masterServiceClient) GetDdChannel(ctx context.Context, in *internalpb.GetDdChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) { - out := new(milvuspb.StringResponse) - err := c.cc.Invoke(ctx, "/milvus.proto.master.MasterService/GetDdChannel", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - // MasterServiceServer is the server API for MasterService service. type MasterServiceServer interface { GetComponentStates(context.Context, *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) @@ -635,7 +624,6 @@ type MasterServiceServer interface { AllocTimestamp(context.Context, *AllocTimestampRequest) (*AllocTimestampResponse, error) AllocID(context.Context, *AllocIDRequest) (*AllocIDResponse, error) UpdateChannelTimeTick(context.Context, *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) - GetDdChannel(context.Context, *internalpb.GetDdChannelRequest) (*milvuspb.StringResponse, error) } // UnimplementedMasterServiceServer can be embedded to have forward compatible implementations. @@ -702,9 +690,6 @@ func (*UnimplementedMasterServiceServer) AllocID(ctx context.Context, req *Alloc func (*UnimplementedMasterServiceServer) UpdateChannelTimeTick(ctx context.Context, req *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) { return nil, status.Errorf(codes.Unimplemented, "method UpdateChannelTimeTick not implemented") } -func (*UnimplementedMasterServiceServer) GetDdChannel(ctx context.Context, req *internalpb.GetDdChannelRequest) (*milvuspb.StringResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetDdChannel not implemented") -} func RegisterMasterServiceServer(s *grpc.Server, srv MasterServiceServer) { s.RegisterService(&_MasterService_serviceDesc, srv) @@ -1070,24 +1055,6 @@ func _MasterService_UpdateChannelTimeTick_Handler(srv interface{}, ctx context.C return interceptor(ctx, in, info, handler) } -func _MasterService_GetDdChannel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(internalpb.GetDdChannelRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(MasterServiceServer).GetDdChannel(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/milvus.proto.master.MasterService/GetDdChannel", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(MasterServiceServer).GetDdChannel(ctx, req.(*internalpb.GetDdChannelRequest)) - } - return interceptor(ctx, in, info, handler) -} - var _MasterService_serviceDesc = grpc.ServiceDesc{ ServiceName: "milvus.proto.master.MasterService", HandlerType: (*MasterServiceServer)(nil), @@ -1172,10 +1139,6 @@ var _MasterService_serviceDesc = grpc.ServiceDesc{ MethodName: "UpdateChannelTimeTick", Handler: _MasterService_UpdateChannelTimeTick_Handler, }, - { - MethodName: "GetDdChannel", - Handler: _MasterService_GetDdChannel_Handler, - }, }, Streams: []grpc.StreamDesc{}, Metadata: "master.proto", diff --git a/internal/types/types.go b/internal/types/types.go index 8299b453220c52c69acbcd34cefb6743333605fa..5bf9e6643ab0c5dea9a838d5c927dc3147d99ed8 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -111,8 +111,6 @@ type MasterService interface { //segment DescribeSegment(ctx context.Context, req *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) ShowSegments(ctx context.Context, req *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) - - GetDdChannel(ctx context.Context) (*milvuspb.StringResponse, error) } // MasterComponent is used by grpc server of master service