diff --git a/internal/msgstream/rmqms/rmq_msgstream.go b/internal/msgstream/rmqms/rmq_msgstream.go index db7ff5749177010e8e1381b287c3e9a6410e5f82..6bcaa945855bb792e81f5d1704d982272dd724be 100644 --- a/internal/msgstream/rmqms/rmq_msgstream.go +++ b/internal/msgstream/rmqms/rmq_msgstream.go @@ -4,6 +4,7 @@ import ( "context" "errors" "log" + "path/filepath" "reflect" "strconv" "sync" @@ -28,15 +29,13 @@ type QueryNodeStatsMsg = msgstream.QueryNodeStatsMsg type RepackFunc = msgstream.RepackFunc type RmqMsgStream struct { - isServing int64 - ctx context.Context - serverLoopWg sync.WaitGroup - serverLoopCtx context.Context - serverLoopCancel func() + isServing int64 + ctx context.Context - repackFunc msgstream.RepackFunc - consumers []rocksmq.Consumer - producers []string + repackFunc msgstream.RepackFunc + consumers []rocksmq.Consumer + consumerChannels []string + producers []string unmarshal msgstream.UnmarshalDispatcher receiveBuf chan *msgstream.MsgPack @@ -44,6 +43,7 @@ type RmqMsgStream struct { // tso ticker streamCancel func() rmqBufSize int64 + consumerLock *sync.Mutex consumerReflects []reflect.SelectCase } @@ -52,6 +52,7 @@ func newRmqMsgStream(ctx context.Context, receiveBufSize int64, rmqBufSize int64 streamCtx, streamCancel := context.WithCancel(ctx) receiveBuf := make(chan *msgstream.MsgPack, receiveBufSize) + consumerChannels := make([]string, 0) consumerReflects := make([]reflect.SelectCase, 0) stream := &RmqMsgStream{ ctx: streamCtx, @@ -59,6 +60,7 @@ func newRmqMsgStream(ctx context.Context, receiveBufSize int64, rmqBufSize int64 unmarshal: unmarshal, streamCancel: streamCancel, rmqBufSize: rmqBufSize, + consumerChannels: consumerChannels, consumerReflects: consumerReflects, } @@ -114,6 +116,7 @@ func (ms *RmqMsgStream) AsConsumer(channels []string, groupName string) { } consumer.MsgNum = make(chan int, ms.rmqBufSize) ms.consumers = append(ms.consumers, *consumer) + ms.consumerChannels = append(ms.consumerChannels, channelName) ms.consumerReflects = append(ms.consumerReflects, reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(consumer.MsgNum), @@ -339,3 +342,273 @@ func (ms *RmqMsgStream) Seek(offset *msgstream.MsgPosition) error { return errors.New("msgStream seek fail") } + +type RmqTtMsgStream struct { + RmqMsgStream + unsolvedBuf map[rocksmq.Consumer][]TsMsg + unsolvedMutex *sync.Mutex + lastTimeStamp Timestamp +} + +func NewRmqTtMsgStream(ctx context.Context, receiveBufSize int64, rmqBufSize int64, + unmarshal msgstream.UnmarshalDispatcher) (*RmqTtMsgStream, error) { + rmqMsgStream, err := newRmqMsgStream(ctx, receiveBufSize, rmqBufSize, unmarshal) + if err != nil { + return nil, err + } + unsolvedBuf := make(map[rocksmq.Consumer][]TsMsg) + return &RmqTtMsgStream{ + RmqMsgStream: *rmqMsgStream, + unsolvedBuf: unsolvedBuf, + unsolvedMutex: &sync.Mutex{}, + }, nil +} + +func (ms *RmqTtMsgStream) AsConsumer(channels []string, + groupName string) { + for _, channelName := range channels { + consumer, err := rocksmq.Rmq.CreateConsumerGroup(groupName, channelName) + if err != nil { + panic(err.Error()) + } + consumer.MsgNum = make(chan int, ms.rmqBufSize) + ms.consumers = append(ms.consumers, *consumer) + ms.consumerReflects = append(ms.consumerReflects, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(consumer.MsgNum), + }) + } +} + +func (ms *RmqTtMsgStream) Start() { + ms.wait = &sync.WaitGroup{} + if ms.consumers != nil { + ms.wait.Add(1) + go ms.bufMsgPackToChannel() + } +} + +func (ms *RmqTtMsgStream) bufMsgPackToChannel() { + defer ms.wait.Done() + ms.unsolvedBuf = make(map[rocksmq.Consumer][]TsMsg) + isChannelReady := make(map[rocksmq.Consumer]bool) + eofMsgTimeStamp := make(map[rocksmq.Consumer]Timestamp) + + for { + select { + case <-ms.ctx.Done(): + return + default: + wg := sync.WaitGroup{} + findMapMutex := sync.RWMutex{} + ms.consumerLock.Lock() + for _, consumer := range ms.consumers { + if isChannelReady[consumer] { + continue + } + wg.Add(1) + go ms.findTimeTick(consumer, eofMsgTimeStamp, &wg, &findMapMutex) + } + wg.Wait() + timeStamp, ok := checkTimeTickMsg(eofMsgTimeStamp, isChannelReady, &findMapMutex) + ms.consumerLock.Unlock() + if !ok || timeStamp <= ms.lastTimeStamp { + //log.Printf("All timeTick's timestamps are inconsistent") + continue + } + timeTickBuf := make([]TsMsg, 0) + msgPositions := make([]*msgstream.MsgPosition, 0) + ms.unsolvedMutex.Lock() + for consumer, msgs := range ms.unsolvedBuf { + tempBuffer := make([]TsMsg, 0) + var timeTickMsg TsMsg + for _, v := range msgs { + if v.Type() == commonpb.MsgType_kTimeTick { + timeTickMsg = v + continue + } + if v.EndTs() <= timeStamp { + timeTickBuf = append(timeTickBuf, v) + } else { + tempBuffer = append(tempBuffer, v) + } + } + ms.unsolvedBuf[consumer] = tempBuffer + + if len(tempBuffer) > 0 { + msgPositions = append(msgPositions, &msgstream.MsgPosition{ + ChannelName: tempBuffer[0].Position().ChannelName, + MsgID: tempBuffer[0].Position().MsgID, + Timestamp: timeStamp, + }) + } else { + msgPositions = append(msgPositions, &msgstream.MsgPosition{ + ChannelName: timeTickMsg.Position().ChannelName, + MsgID: timeTickMsg.Position().MsgID, + Timestamp: timeStamp, + }) + } + } + ms.unsolvedMutex.Unlock() + + msgPack := MsgPack{ + BeginTs: ms.lastTimeStamp, + EndTs: timeStamp, + Msgs: timeTickBuf, + StartPositions: msgPositions, + } + + ms.receiveBuf <- &msgPack + ms.lastTimeStamp = timeStamp + } + } +} + +func (ms *RmqTtMsgStream) findTimeTick(consumer rocksmq.Consumer, + eofMsgMap map[rocksmq.Consumer]Timestamp, + wg *sync.WaitGroup, + findMapMutex *sync.RWMutex) { + defer wg.Done() + for { + select { + case <-ms.ctx.Done(): + return + case num, ok := <-consumer.MsgNum: + if !ok { + log.Printf("consumer closed!") + return + } + rmqMsg, err := rocksmq.Rmq.Consume(consumer.GroupName, consumer.ChannelName, num) + if err != nil { + log.Printf("Failed to consume message in rocksmq, error = %v", err) + continue + } + + for j := 0; j < len(rmqMsg); j++ { + headerMsg := commonpb.MsgHeader{} + err := proto.Unmarshal(rmqMsg[j].Payload, &headerMsg) + if err != nil { + log.Printf("Failed to unmarshal message header, error = %v", err) + continue + } + tsMsg, err := ms.unmarshal.Unmarshal(rmqMsg[j].Payload, headerMsg.Base.MsgType) + if err != nil { + log.Printf("Failed to unmarshal tsMsg, error = %v", err) + continue + } + + tsMsg.SetPosition(&msgstream.MsgPosition{ + ChannelName: filepath.Base(consumer.ChannelName), + MsgID: strconv.Itoa(int(rmqMsg[j].MsgID)), + }) + + ms.unsolvedMutex.Lock() + ms.unsolvedBuf[consumer] = append(ms.unsolvedBuf[consumer], tsMsg) + ms.unsolvedMutex.Unlock() + + if headerMsg.Base.MsgType == commonpb.MsgType_kTimeTick { + findMapMutex.Lock() + eofMsgMap[consumer] = tsMsg.(*TimeTickMsg).Base.Timestamp + findMapMutex.Unlock() + return + } + } + } + } +} + +func (ms *RmqTtMsgStream) Seek(mp *msgstream.MsgPosition) error { + var consumer rocksmq.Consumer + var msgID UniqueID + for index, channel := range ms.consumerChannels { + if filepath.Base(channel) == filepath.Base(mp.ChannelName) { + consumer = ms.consumers[index] + seekMsgID, err := strconv.ParseInt(mp.MsgID, 10, 64) + if err != nil { + return err + } + msgID = UniqueID(seekMsgID) + break + } + } + err := rocksmq.Rmq.Seek(consumer.GroupName, consumer.ChannelName, msgID) + if err != nil { + return err + } + ms.unsolvedMutex.Lock() + ms.unsolvedBuf[consumer] = make([]TsMsg, 0) + + for { + select { + case <-ms.ctx.Done(): + return nil + case num, ok := <-consumer.MsgNum: + if !ok { + return errors.New("consumer closed") + } + rmqMsg, err := rocksmq.Rmq.Consume(consumer.GroupName, consumer.ChannelName, num) + if err != nil { + log.Printf("Failed to consume message in rocksmq, error = %v", err) + continue + } + + for j := 0; j < len(rmqMsg); j++ { + headerMsg := commonpb.MsgHeader{} + err := proto.Unmarshal(rmqMsg[j].Payload, &headerMsg) + if err != nil { + log.Printf("Failed to unmarshal message header, error = %v", err) + } + tsMsg, err := ms.unmarshal.Unmarshal(rmqMsg[j].Payload, headerMsg.Base.MsgType) + if err != nil { + log.Printf("Failed to unmarshal tsMsg, error = %v", err) + } + + if headerMsg.Base.MsgType == commonpb.MsgType_kTimeTick { + if tsMsg.BeginTs() >= mp.Timestamp { + ms.unsolvedMutex.Unlock() + return nil + } + continue + } + if tsMsg.BeginTs() > mp.Timestamp { + tsMsg.SetPosition(&msgstream.MsgPosition{ + ChannelName: filepath.Base(consumer.ChannelName), + MsgID: strconv.Itoa(int(rmqMsg[j].MsgID)), + }) + ms.unsolvedBuf[consumer] = append(ms.unsolvedBuf[consumer], tsMsg) + } + } + } + } +} + +func checkTimeTickMsg(msg map[rocksmq.Consumer]Timestamp, + isChannelReady map[rocksmq.Consumer]bool, + mu *sync.RWMutex) (Timestamp, bool) { + checkMap := make(map[Timestamp]int) + var maxTime Timestamp = 0 + for _, v := range msg { + checkMap[v]++ + if v > maxTime { + maxTime = v + } + } + if len(checkMap) <= 1 { + for consumer := range msg { + isChannelReady[consumer] = false + } + return maxTime, true + } + for consumer := range msg { + mu.RLock() + v := msg[consumer] + mu.RUnlock() + if v != maxTime { + isChannelReady[consumer] = false + } else { + isChannelReady[consumer] = true + } + } + + return 0, false +} diff --git a/internal/util/rocksmq/rocksmq.go b/internal/util/rocksmq/rocksmq.go index 322b5a0e19e31f13906da1a542f9817d771f66fb..16d3e8493f2004eeaa7b91b515ce8d69eca78672 100644 --- a/internal/util/rocksmq/rocksmq.go +++ b/internal/util/rocksmq/rocksmq.go @@ -54,7 +54,7 @@ type ProducerMessage struct { } type ConsumerMessage struct { - msgID UniqueID + MsgID UniqueID Payload []byte } @@ -299,7 +299,7 @@ func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]Cons return nil, err } msg := ConsumerMessage{ - msgID: msgID, + MsgID: msgID, Payload: val.Data(), } consumerMessage = append(consumerMessage, msg) @@ -314,7 +314,7 @@ func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]Cons return consumerMessage, nil } - newID := consumerMessage[len(consumerMessage)-1].msgID + newID := consumerMessage[len(consumerMessage)-1].MsgID err = rmq.Seek(groupName, channelName, newID) if err != nil { return nil, err