diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index 56d4329cf1c0c2176bd9b8069fd2cc04cb8e1bf2..0bab777679d5addaa0e1b3e62df25962cd30af26 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -108,9 +108,6 @@ func (s *Server) setClient() { if err := dsClient.Start(); err != nil { panic(err) } - if err := funcutil.WaitForComponentInitOrHealthy(ctx, dsClient, "DataService", 1000000, 200*time.Millisecond); err != nil { - panic(err) - } return dsClient } s.newIndexServiceClient = func(s, metaRootPath, etcdAddress string, timeout time.Duration) types.IndexService { diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 11eec3f0ffa48f4f9ecbe5fb3f5f45728f80611b..c604176f0c29ec71954582d0f40d8ca6bb8c2e1d 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -709,10 +709,6 @@ func (c *Core) setMsgStreams() error { } }() - if Params.DataServiceSegmentChannel == "" { - return fmt.Errorf("DataServiceSegmentChannel is empty") - } - // data service will put msg into this channel when create segment dsChanName := Params.DataServiceSegmentChannel dsSubName := Params.MsgChannelSubName + "ds" @@ -755,13 +751,6 @@ func (c *Core) SetNewProxyClient(f func(sess *sessionutil.Session) (types.ProxyN } func (c *Core) SetDataService(ctx context.Context, s types.DataService) error { - rsp, err := s.GetSegmentInfoChannel(ctx) - if err != nil { - return err - } - Params.DataServiceSegmentChannel = rsp.Value - log.Debug("data service segment", zap.String("channel name", Params.DataServiceSegmentChannel)) - c.CallGetBinlogFilePathsService = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) (retFiles []string, retErr error) { defer func() { if err := recover(); err != nil { diff --git a/internal/masterservice/param_table.go b/internal/masterservice/param_table.go index 3c0636693cca8c65afd1825e8c9d77ca05a0bbb5..7231fd02afeceb2c66aa30b46ed3704c92f2a06e 100644 --- a/internal/masterservice/param_table.go +++ b/internal/masterservice/param_table.go @@ -38,7 +38,7 @@ type ParamTable struct { TimeTickChannel string DdChannel string StatisticsChannel string - DataServiceSegmentChannel string // get from data service, data service create segment, or data node flush segment + DataServiceSegmentChannel string // data service create segment, or data node flush segment MaxPartitionNum int64 DefaultPartitionName string @@ -71,6 +71,7 @@ func (p *ParamTable) Init() { p.initTimeTickChannel() p.initDdChannelName() p.initStatisticsChannelName() + p.initSegmentInfoChannelName() p.initMaxPartitionNum() p.initMinSegmentSizeToEnableIndex() @@ -157,6 +158,14 @@ func (p *ParamTable) initStatisticsChannelName() { p.StatisticsChannel = channel } +func (p *ParamTable) initSegmentInfoChannelName() { + channel, err := p.Load("msgChannel.chanNamePrefix.dataServiceSegmentInfo") + if err != nil { + panic(err) + } + p.DataServiceSegmentChannel = channel +} + func (p *ParamTable) initMaxPartitionNum() { p.MaxPartitionNum = p.ParseInt64("master.maxPartitionNum") } diff --git a/internal/msgstream/mq_msgstream_test.go b/internal/msgstream/mq_msgstream_test.go index d395d2e26b7de917b522ecc95ab811c61d3062cf..19c7daefb7e4097e83ad9f539b217f633bd58598 100644 --- a/internal/msgstream/mq_msgstream_test.go +++ b/internal/msgstream/mq_msgstream_test.go @@ -191,7 +191,7 @@ func getTimeTickMsg(reqID UniqueID) TsMsg { } // Generate MsgPack contains 'num' msgs, with timestamp in (start, end) -func getInsertMsgPack(num int, start int, end int) *MsgPack { +func getRandInsertMsgPack(num int, start int, end int) *MsgPack { Rand := rand.New(rand.NewSource(time.Now().UnixNano())) set := make(map[int]bool) msgPack := MsgPack{} @@ -206,6 +206,14 @@ func getInsertMsgPack(num int, start int, end int) *MsgPack { return &msgPack } +func getInsertMsgPack(ts []int) *MsgPack { + msgPack := MsgPack{} + for i := 0; i < len(ts); i++ { + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, int64(ts[i]))) + } + return &msgPack +} + func getTimeTickMsgPack(reqID UniqueID) *MsgPack { msgPack := MsgPack{} msgPack.Msgs = append(msgPack.Msgs, getTimeTickMsg(reqID)) @@ -712,13 +720,29 @@ func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) { outputStream.Close() } -func createMsgPacks(msgsInPack int, numOfMsgPack int, deltaTs int) []*MsgPack { +func createRandMsgPacks(msgsInPack int, numOfMsgPack int, deltaTs int) []*MsgPack { + msgPacks := make([]*MsgPack, numOfMsgPack) + + // generate MsgPack + for i := 0; i < numOfMsgPack; i++ { + if i%2 == 0 { + msgPacks[i] = getRandInsertMsgPack(msgsInPack, i/2*deltaTs, (i/2+2)*deltaTs+2) + } else { + msgPacks[i] = getTimeTickMsgPack(int64((i + 1) / 2 * deltaTs)) + } + } + msgPacks = append(msgPacks, nil) + msgPacks = append(msgPacks, getTimeTickMsgPack(int64(numOfMsgPack*deltaTs))) + return msgPacks +} + +func createMsgPacks(ts [][]int, numOfMsgPack int, deltaTs int) []*MsgPack { msgPacks := make([]*MsgPack, numOfMsgPack) // generate MsgPack for i := 0; i < numOfMsgPack; i++ { if i%2 == 0 { - msgPacks[i] = getInsertMsgPack(msgsInPack, i/2*deltaTs, (i/2+2)*deltaTs+2) + msgPacks[i] = getInsertMsgPack(ts[i/2]) } else { msgPacks[i] = getTimeTickMsgPack(int64((i + 1) / 2 * deltaTs)) } @@ -774,11 +798,11 @@ func TestStream_PulsarTtMsgStream_1(t *testing.T) { consumerSubName := funcutil.RandomString(8) inputStream1 := getPulsarInputStream(pulsarAddr, p1Channels) - msgPacks1 := createMsgPacks(3, 10, 10) + msgPacks1 := createRandMsgPacks(3, 10, 10) assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1)) inputStream2 := getPulsarInputStream(pulsarAddr, p2Channels) - msgPacks2 := createMsgPacks(5, 10, 10) + msgPacks2 := createRandMsgPacks(5, 10, 10) assert.Nil(t, sendMsgPacks(inputStream2, msgPacks2)) // consume msg @@ -836,11 +860,11 @@ func TestStream_PulsarTtMsgStream_2(t *testing.T) { consumerSubName := funcutil.RandomString(8) inputStream1 := getPulsarInputStream(pulsarAddr, p1Channels) - msgPacks1 := createMsgPacks(3, 10, 10) + msgPacks1 := createRandMsgPacks(3, 10, 10) assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1)) inputStream2 := getPulsarInputStream(pulsarAddr, p2Channels) - msgPacks2 := createMsgPacks(5, 10, 10) + msgPacks2 := createRandMsgPacks(5, 10, 10) assert.Nil(t, sendMsgPacks(inputStream2, msgPacks2)) // consume msg