diff --git a/internal/msgstream/rmqms/rmq_msgstream.go b/internal/msgstream/rmqms/rmq_msgstream.go index 3c463f588c6bb626265f119d23f7d60bc45e25ea..cac7056f6f11f688d394073938c87e1e7e93399f 100644 --- a/internal/msgstream/rmqms/rmq_msgstream.go +++ b/internal/msgstream/rmqms/rmq_msgstream.go @@ -65,17 +65,13 @@ func newRmqMsgStream(ctx context.Context, receiveBufSize int64, rmqBufSize int64 consumerChannels: consumerChannels, consumerReflects: consumerReflects, consumerLock: &sync.Mutex{}, + wait: &sync.WaitGroup{}, } return stream, nil } func (ms *RmqMsgStream) Start() { - ms.wait = &sync.WaitGroup{} - if ms.consumers != nil { - ms.wait.Add(1) - go ms.bufMsgPackToChannel() - } } func (ms *RmqMsgStream) Close() { @@ -123,6 +119,8 @@ func (ms *RmqMsgStream) AsConsumer(channels []string, groupName string) { Dir: reflect.SelectRecv, Chan: reflect.ValueOf(consumer.MsgNum), }) + ms.wait.Add(1) + go ms.receiveMsg(*consumer) } } } @@ -235,56 +233,41 @@ func (ms *RmqMsgStream) Consume() (*msgstream.MsgPack, context.Context) { } } -func (ms *RmqMsgStream) bufMsgPackToChannel() { +/** +receiveMsg func is used to solve search timeout problem +which is caused by selectcase +*/ +func (ms *RmqMsgStream) receiveMsg(consumer rocksmq.Consumer) { defer ms.wait.Done() for { select { case <-ms.ctx.Done(): - log.Println("done") return - default: + case msgNum, ok := <-consumer.MsgNum: + if !ok { + return + } + rmqMsg, err := rocksmq.Rmq.Consume(consumer.GroupName, consumer.ChannelName, msgNum) + if err != nil { + log.Printf("Failed to consume message in rocksmq, error = %v", err) + continue + } tsMsgList := make([]msgstream.TsMsg, 0) - - for { - chosen, value, ok := reflect.Select(ms.consumerReflects) - if !ok { - log.Printf("channel closed") - return - } - - msgNum := value.Interface().(int) - rmqMsg, err := rocksmq.Rmq.Consume(ms.consumers[chosen].GroupName, ms.consumers[chosen].ChannelName, msgNum) + for j := 0; j < len(rmqMsg); j++ { + headerMsg := commonpb.MsgHeader{} + err := proto.Unmarshal(rmqMsg[j].Payload, &headerMsg) if err != nil { - log.Printf("Failed to consume message in rocksmq, error = %v", err) + log.Printf("Failed to unmarshal message header, error = %v", err) continue } - for j := 0; j < len(rmqMsg); j++ { - headerMsg := commonpb.MsgHeader{} - err := proto.Unmarshal(rmqMsg[j].Payload, &headerMsg) - if err != nil { - log.Printf("Failed to unmarshal message header, error = %v", err) - continue - } - tsMsg, err := ms.unmarshal.Unmarshal(rmqMsg[j].Payload, headerMsg.Base.MsgType) - if err != nil { - log.Printf("Failed to unmarshal tsMsg, error = %v", err) - continue - } - tsMsgList = append(tsMsgList, tsMsg) - } - noMoreMessage := true - for k := 0; k < len(ms.consumers); k++ { - if len(ms.consumers[k].MsgNum) > 0 { - noMoreMessage = false - } - } - - if noMoreMessage { - break + tsMsg, err := ms.unmarshal.Unmarshal(rmqMsg[j].Payload, headerMsg.Base.MsgType) + if err != nil { + log.Printf("Failed to unmarshal tsMsg, error = %v", err) + continue } + tsMsgList = append(tsMsgList, tsMsg) } - if len(tsMsgList) > 0 { msgPack := util.MsgPack{Msgs: tsMsgList} ms.receiveBuf <- &msgPack