diff --git a/internal/msgstream/client/rocksmq/rmq_client.go b/internal/msgstream/client/rocksmq/rmq_client.go index 2f16d403ce18fd76d067fd2c8b0ee499d65cbd7c..780de35d8771a52fb835c928795129d7b2c7004c 100644 --- a/internal/msgstream/client/rocksmq/rmq_client.go +++ b/internal/msgstream/client/rocksmq/rmq_client.go @@ -57,6 +57,7 @@ func (rc *rmqClient) Subscribe(options client.ConsumerOptions) (client.Consumer, close(msgChannel) return } + msg.Topic = options.Topic msgChannel <- &rmqMessage{msg: msg} } } diff --git a/internal/msgstream/ms/msgstream_impl.go b/internal/msgstream/ms/msgstream_impl.go index d663df247e6e2448555ac75a58ad610c873cb63f..7fc30961c520659575012ba7b54494499051f6f1 100644 --- a/internal/msgstream/ms/msgstream_impl.go +++ b/internal/msgstream/ms/msgstream_impl.go @@ -606,6 +606,7 @@ func (ms *TtMsgStream) findTimeTick(consumer Consumer, } // set msg info to tsMsg + log.Debug(msg.Topic() + "--------" + string(msg.ID().Serialize())) tsMsg.SetPosition(&msgstream.MsgPosition{ ChannelName: filepath.Base(msg.Topic()), MsgID: msg.ID().Serialize(), @@ -701,7 +702,7 @@ func (ms *TtMsgStream) Seek(mp *internalpb.MsgPosition) error { if err != nil { return err } - consumer.Seek(seekMsgID) + _ = consumer.Seek(seekMsgID) ms.addConsumer(consumer, seekChannel) //TODO: May cause problem diff --git a/internal/util/rocksmq/client/rocksmq/client_impl.go b/internal/util/rocksmq/client/rocksmq/client_impl.go index 5795001c72f1d49aed0c687067e9112612d8231a..d723646587700891c9dd0583d9fb86f4103ca527 100644 --- a/internal/util/rocksmq/client/rocksmq/client_impl.go +++ b/internal/util/rocksmq/client/rocksmq/client_impl.go @@ -1,6 +1,8 @@ package rocksmq import ( + "context" + "github.com/zilliztech/milvus-distributed/internal/log" server "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/server/rocksmq" ) @@ -9,6 +11,8 @@ type client struct { server RocksMQ producerOptions []ProducerOptions consumerOptions []ConsumerOptions + context context.Context + cancel context.CancelFunc } func newClient(options ClientOptions) (*client, error) { @@ -16,9 +20,12 @@ func newClient(options ClientOptions) (*client, error) { return nil, newError(InvalidConfiguration, "Server is nil") } + ctx, cancel := context.WithCancel(context.Background()) c := &client{ server: options.Server, producerOptions: []ProducerOptions{}, + context: ctx, + cancel: cancel, } return c, nil } @@ -55,7 +62,7 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) { if err != nil { return nil, err } - go consume(consumer) + go consume(c.context, consumer) return consumer, nil } consumer, err := newConsumer(c, options) @@ -79,26 +86,25 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) { // Take messages from RocksDB and put it into consumer.Chan(), // trigger by consumer.MsgMutex which trigger by producer - go consume(consumer) + go consume(c.context, consumer) c.consumerOptions = append(c.consumerOptions, options) return consumer, nil } -func consume(consumer *consumer) { +func consume(ctx context.Context, consumer *consumer) { for { //nolint:gosimple - log.Debug(consumer.topic + "+" + consumer.consumerName) - //if consumer.msgMutex == nil { - // break - //} select { //nolint:gosimple + case <-ctx.Done(): + log.Debug("client finished") + return case _, ok := <-consumer.MsgMutex(): + log.Debug("Before consume") if !ok { // consumer MsgMutex closed, goroutine exit log.Debug("consumer MsgMutex closed") return } - log.Debug("Before consume") for { msg, err := consumer.client.server.Consume(consumer.topic, consumer.consumerName, 1) @@ -120,8 +126,7 @@ func consume(consumer *consumer) { Payload: msg[0].Payload, } } - //default: - // log.Debug("In default") + default: } } } @@ -131,6 +136,8 @@ func (c *client) Close() { for _, opt := range c.consumerOptions { log.Debug("Close" + opt.Topic + "+" + opt.SubscriptionName) _ = c.server.DestroyConsumerGroup(opt.Topic, opt.SubscriptionName) + //TODO(yukun): Should topic be closed? //_ = c.server.DestroyTopic(opt.Topic) } + c.cancel() } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index 08af557214276a0cbfc4a091aecb68526e39e3fd..7edfbaf00a66f4e216e725132970086aae05253a 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -134,6 +134,8 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error { log.Debug("RocksMQ: remove " + endKey + " failed.") return err } + + delete(rmq.consumers, topicName) log.Debug("DestroyTopic: " + topicName) return nil @@ -185,6 +187,7 @@ func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error { } for index, con := range rmq.consumers[topicName] { if con.GroupName == groupName { + close(con.MsgMutex) rmq.consumers[topicName] = append(rmq.consumers[topicName][:index], rmq.consumers[topicName][index+1:]...) }