diff --git a/internal/msgstream/rmqms/rmq_msgstream.go b/internal/msgstream/rmqms/rmq_msgstream.go index 0b5114330bd62b680caaa9fcb178f8598a10b3ed..9365790a535ee7875891dafe3f772da29f0dd881 100644 --- a/internal/msgstream/rmqms/rmq_msgstream.go +++ b/internal/msgstream/rmqms/rmq_msgstream.go @@ -597,6 +597,7 @@ func (rtms *RmqTtMsgStream) Seek(mp *msgstream.MsgPosition) error { rtms.addConsumer(consumer, seekChannel) if len(consumer.Chan()) == 0 { + log.Debug("len(consumer.Chan()) = 0") return nil } diff --git a/internal/util/rocksmq/client/rocksmq/client_impl.go b/internal/util/rocksmq/client/rocksmq/client_impl.go index aa0da6cbeacabb02e8e452adc022bd6cf39a5da8..5795001c72f1d49aed0c687067e9112612d8231a 100644 --- a/internal/util/rocksmq/client/rocksmq/client_impl.go +++ b/internal/util/rocksmq/client/rocksmq/client_impl.go @@ -1,8 +1,6 @@ package rocksmq import ( - "strconv" - "github.com/zilliztech/milvus-distributed/internal/log" server "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/server/rocksmq" ) @@ -44,6 +42,22 @@ func (c *client) CreateProducer(options ProducerOptions) (Producer, error) { func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) { // Create a consumer + //for _, con := range c.consumers { + // log.Debug(con.Topic() + "---------------" + con.Subscription()) + // if con.Topic() == options.Topic && con.Subscription() == options.SubscriptionName { + // log.Debug("consumer existed") + // return con, nil + // } + //} + if exist, con := c.server.ExistConsumerGroup(options.Topic, options.SubscriptionName); exist { + log.Debug("EXISTED") + consumer, err := newConsumer1(c, options, con.MsgMutex) + if err != nil { + return nil, err + } + go consume(consumer) + return consumer, nil + } consumer, err := newConsumer(c, options) if err != nil { return nil, err @@ -65,42 +79,58 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) { // Take messages from RocksDB and put it into consumer.Chan(), // trigger by consumer.MsgMutex which trigger by producer - go func() { - for { //nolint:gosimple - select { - case _, ok := <-consumer.MsgMutex(): - if !ok { - // consumer MsgMutex closed, goroutine exit - return + go consume(consumer) + c.consumerOptions = append(c.consumerOptions, options) + + return consumer, nil +} + +func consume(consumer *consumer) { + for { //nolint:gosimple + log.Debug(consumer.topic + "+" + consumer.consumerName) + //if consumer.msgMutex == nil { + // break + //} + select { //nolint:gosimple + case _, ok := <-consumer.MsgMutex(): + if !ok { + // consumer MsgMutex closed, goroutine exit + log.Debug("consumer MsgMutex closed") + return + } + log.Debug("Before consume") + + for { + msg, err := consumer.client.server.Consume(consumer.topic, consumer.consumerName, 1) + if err != nil { + log.Debug("Consumer's goroutine cannot consume from (" + consumer.topic + + "," + consumer.consumerName + "): " + err.Error()) + break + } + + if len(msg) != 1 { + //log.Debug("Consumer's goroutine cannot consume from (" + consumer.topic + + // "," + consumer.consumerName + "): message len(" + strconv.Itoa(len(msg)) + + // ") is not 1") + break } - for { - msg, err := consumer.client.server.Consume(consumer.topic, consumer.consumerName, 1) - if err != nil { - log.Debug("Consumer's goroutine cannot consume from (" + consumer.topic + - "," + consumer.consumerName + "): " + err.Error()) - break - } - - if len(msg) != 1 { - log.Debug("Consumer's goroutine cannot consume from (" + consumer.topic + - "," + consumer.consumerName + "): message len(" + strconv.Itoa(len(msg)) + - ") is not 1") - break - } - - consumer.messageCh <- ConsumerMessage{ - MsgID: msg[0].MsgID, - Payload: msg[0].Payload, - } + consumer.messageCh <- ConsumerMessage{ + MsgID: msg[0].MsgID, + Payload: msg[0].Payload, } } + //default: + // log.Debug("In default") } - }() - - return consumer, nil + } } func (c *client) Close() { // TODO: free resources + for _, opt := range c.consumerOptions { + log.Debug("Close" + opt.Topic + "+" + opt.SubscriptionName) + _ = c.server.DestroyConsumerGroup(opt.Topic, opt.SubscriptionName) + //_ = c.server.DestroyTopic(opt.Topic) + } } diff --git a/internal/util/rocksmq/client/rocksmq/consumer_impl.go b/internal/util/rocksmq/client/rocksmq/consumer_impl.go index ecd88b1dd911efcfb234e4774e0d722f46205c09..9314f168555b79f620ad097403ad03c402529e58 100644 --- a/internal/util/rocksmq/client/rocksmq/consumer_impl.go +++ b/internal/util/rocksmq/client/rocksmq/consumer_impl.go @@ -38,6 +38,34 @@ func newConsumer(c *client, options ConsumerOptions) (*consumer, error) { }, nil } +func newConsumer1(c *client, options ConsumerOptions, msgMutex chan struct{}) (*consumer, error) { + if c == nil { + return nil, newError(InvalidConfiguration, "client is nil") + } + + if options.Topic == "" { + return nil, newError(InvalidConfiguration, "Topic is empty") + } + + if options.SubscriptionName == "" { + return nil, newError(InvalidConfiguration, "SubscriptionName is empty") + } + + messageCh := options.MessageChannel + if options.MessageChannel == nil { + messageCh = make(chan ConsumerMessage, 1) + } + + return &consumer{ + topic: options.Topic, + client: c, + consumerName: options.SubscriptionName, + options: options, + msgMutex: msgMutex, + messageCh: messageCh, + }, nil +} + func (c *consumer) Subscription() string { return c.consumerName } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq.go b/internal/util/rocksmq/server/rocksmq/rocksmq.go index 974600bb0ec9343b365fac39fe4ccb713766d362..8f5edb4a047b1454b6e220604e9cf67df27163b5 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq.go @@ -26,4 +26,5 @@ type RocksMQ interface { Produce(topicName string, messages []ProducerMessage) error Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error) Seek(topicName string, groupName string, msgID UniqueID) error + ExistConsumerGroup(topicName string, groupName string) (bool, *Consumer) } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index 323f083141e9a53755517f725061502f58b00f5c..08af557214276a0cbfc4a091aecb68526e39e3fd 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -134,10 +134,23 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error { log.Debug("RocksMQ: remove " + endKey + " failed.") return err } + log.Debug("DestroyTopic: " + topicName) return nil } +func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer) { + key := groupName + "/" + topicName + "/current_id" + if rmq.checkKeyExist(key) { + for _, con := range rmq.consumers[topicName] { + if con.GroupName == groupName { + return true, con + } + } + } + return false, nil +} + func (rmq *rocksmq) CreateConsumerGroup(topicName, groupName string) error { key := groupName + "/" + topicName + "/current_id" if rmq.checkKeyExist(key) { @@ -170,6 +183,14 @@ func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error { log.Debug("RocksMQ: remove " + key + " failed.") return err } + for index, con := range rmq.consumers[topicName] { + if con.GroupName == groupName { + rmq.consumers[topicName] = append(rmq.consumers[topicName][:index], + rmq.consumers[topicName][index+1:]...) + } + } + + log.Debug("DestroyConsumerGroup: " + topicName + "+" + groupName) return nil }