Performance optimization: a RocketMQ client consumption pitfall

1. Problem description

The business flow behind our open service is essentially data forwarding: it listens to domain events and pushes them to the relevant third parties using httpClient.

Some interfaces performed poorly, so after a round of interface optimization we re-released the service. As soon as it went online, it blew up. We deployed twice in total, once at 9:00 and once at 14:00, and each time the service ran out of memory (OOM) right after the update. After the first OOM we checked the business code and assumed the volume of forwarded data was too large, so we increased the JVM memory, which turned out to be useless. That was very strange.

2. Problem analysis

2.1 OOM problem analysis

Looking at the heap dump in JProfiler, we could see a large number of MessageClientExt objects. At that point we were still confused: why were so many objects not being reclaimed? Our preliminary judgment was that a large number of MQ messages had been pulled, causing messages to pile up locally.

We used Arthas to attach a probe to the consumption method and saw a huge number of calls during MQ consumption. That was odd: why were so many messages being consumed? Looking at the timestamps in the message bodies, most of them were from several days earlier. So why would the client pull messages that had already been consumed? How does the MQ consumer control message consumption?

2.2 How the RocketMQ consumer works

First, let's review how the RocketMQ client consumes messages, focusing on the consumption modes and the consumption strategies.

Consumption mode:

  • The default is CLUSTERING mode: the consumers in the same consumer group each consume a share of the messages, so each of them receives different messages. In this mode the broker stores and controls the offset, and the client uses the RemoteBrokerOffsetStore structure.

  • In BROADCASTING mode, each consumer receives all the messages of the topic, and the consumers do not interfere with one another. RocketMQ uses LocalFileOffsetStore to store the offset locally.

Consumption strategy:

  • CONSUME_FROM_LAST_OFFSET: the default strategy. Consumption starts from the end of the queue, that is, historical messages are skipped.
  • CONSUME_FROM_FIRST_OFFSET: consumption starts from the very beginning of the queue, that is, all historical messages still stored in the broker are consumed once.
  • CONSUME_FROM_TIMESTAMP: consumption starts from a given point in time. It is used together with setConsumeTimestamp(), and the default is half an hour ago.

When we use DefaultMQPushConsumer we do not need to care about the OffsetStore, whereas with a PullConsumer we have to handle the OffsetStore on the client side. DefaultMQPushConsumer has a method for setting where consumption starts: setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET) makes it read from the smallest offset. The consumer offset used when pulling messages is exchanged with the broker by sending a command:

   - RemoteBrokerOffsetStore#fetchConsumeOffsetFromBroker (sends a command to the broker to fetch the consumer offset)
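To make the three strategies concrete, here is a simplified model of how the first pull offset for one queue could be chosen. It is only a sketch and not RocketMQ's code: the class, the enum and the parameters are hypothetical, and the rule that an offset already saved for the consumer group takes precedence over the strategy is our understanding of the client, not something verified in this article.

```java
/** Simplified model of how a push consumer picks its first pull offset for one queue. */
final class StartOffsetSketch {

    /** Local stand-ins for the three strategies described in the text. */
    enum Strategy { LAST_OFFSET, FIRST_OFFSET, TIMESTAMP }

    /**
     * @param storedOffset      offset saved for the consumer group, or -1 if none exists
     * @param minOffset         oldest offset still kept in the queue
     * @param maxOffset         offset just past the newest message
     * @param offsetAtTimestamp offset of the first message at or after the configured time
     */
    static long resolve(Strategy strategy, long storedOffset,
                        long minOffset, long maxOffset, long offsetAtTimestamp) {
        if (storedOffset >= 0) {
            return storedOffset; // assumption: saved progress wins over the strategy
        }
        switch (strategy) {
            case FIRST_OFFSET:
                return minOffset;          // replay everything the broker still has
            case TIMESTAMP:
                return offsetAtTimestamp;  // replay from the configured point in time
            default:
                return maxOffset;          // skip history, only new messages
        }
    }

    public static void main(String[] args) {
        long min = 0, max = 1_000_000, atTimestamp = 990_000;
        for (Strategy s : Strategy.values()) {
            long start = resolve(s, -1, min, max, atTimestamp);
            System.out.println(s + " -> start=" + start + ", backlog=" + (max - start));
        }
    }
}
```

In this model, a consumer group with no saved offset and FIRST_OFFSET has the whole queue as its backlog, which is the situation that produces a flood of old messages.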

The overall flow of message consumption is as follows:

A natural question here: why doesn't the MQ client apply flow control? With so many messages piled up locally, does it just keep pulling new ones blindly? In fact, there is explicit flow control logic in org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage. The client throttles based on the memory size of the local cache queue and the number of messages stored in it: when too many cached messages are still unprocessed, it chooses not to pull.

Then why did it keep pulling messages? As it happens, this flow control logic does not take effect in the multi-threaded consumption scenario. Each time a consume request for messages in the ProcessQueue is submitted to the thread pool, it is put into the pool's queue to wait even if the pool cannot keep up. As a result, the ProcessQueue never reaches the thresholds for message count and memory size, so flow control is bypassed. Because the listener method is slow, processing time keeps climbing under heavy traffic while requests keep being submitted to the thread pool's blocking queue, and that unbounded queue grows without limit. After reproducing the problem locally, we could see tens of thousands of consume requests in the thread pool's waiting queue, each one holding MessageExt objects, and by then the memory was of course exhausted.
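The idea behind this check can be sketched as follows. This is a minimal model, not the code in pullMessage: the class and method names are hypothetical, and the two limits (1000 messages and 100 MiB per queue) are what we understand the defaults of pullThresholdForQueue and pullThresholdSizeForQueue to be.

```java
/** Minimal model of a per-queue flow control check performed before each pull. */
final class FlowControlSketch {

    // Assumed limits, modeled on our understanding of the client defaults.
    static final long MAX_CACHED_MESSAGES = 1000;
    static final long MAX_CACHED_MIB = 100;

    /** Returns true when the next pull should be postponed instead of sent to the broker. */
    static boolean shouldDelayPull(long cachedMessages, long cachedBytes) {
        long cachedMiB = cachedBytes / (1024L * 1024L);
        return cachedMessages > MAX_CACHED_MESSAGES || cachedMiB > MAX_CACHED_MIB;
    }

    public static void main(String[] args) {
        long tenMiB = 10L * 1024 * 1024;
        System.out.println("500 msgs, 10 MiB  -> delay? " + shouldDelayPull(500, tenMiB));
        System.out.println("1500 msgs, 10 MiB -> delay? " + shouldDelayPull(1500, tenMiB));
    }
}
```

The check only helps if the counters it reads reflect the real backlog. If the work waiting to be processed is counted somewhere else, the check keeps returning false and pulling continues.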
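The queue growth is easy to reproduce with the JDK alone. The demo below is a sketch that does not use RocketMQ at all: a small ThreadPoolExecutor backed by a LinkedBlockingQueue without a capacity accepts every request, while a latch stands in for a slow listener. The class name, the payload size and the request count are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Shows how an unbounded work queue absorbs requests that the workers cannot keep up with. */
final class UnboundedQueueDemo {

    /** Submits slow tasks to a small pool and returns the queue length right afterwards. */
    static int backlogAfterSubmitting(int requests, int threads) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>()); // no capacity: never rejects, never blocks
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < requests; i++) {
                byte[] payload = new byte[1024]; // stands in for the messages a request holds
                pool.execute(() -> {
                    try {
                        release.await(); // simulate a listener that is stuck on slow work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    touch(payload); // keeps the payload referenced by the queued task
                });
            }
            return pool.getQueue().size();
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    private static void touch(byte[] payload) {
        // no-op: the task only needs to hold on to the payload
    }

    public static void main(String[] args) {
        System.out.println("queued requests: " + backlogAfterSubmitting(10_000, 4));
    }
}
```

Every queued task keeps its payload reachable, so heap usage grows with the queue length rather than with the number of worker threads.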

3. Problem solving

After going around in a big circle, the root cause is this: in a multi-threaded consumption scenario, the wrong consumption offset was set when the message client was initialized, so a large number of historical messages were pulled from the broker and submitted to the thread pool. The processing logic is slow, so the requests ended up in the waiting queue, and the growth of that queue finally exhausted the memory and caused the OOM. As a first step, we set the consumption start position to the end of the queue, so that the client no longer pulls historical messages. Of course, this offset setting still has to be adjusted to the usage scenario. For example, when a large backlog has built up on the broker and some of the data has to be discarded, you can start consumption from a given point in time.

To better understand how the consumer side is running, we wrapped ConsumeMessageConcurrentlyService ourselves, replaced the default thread pool with a dynamic thread pool, and reported the pool's runtime parameters to ES for unified monitoring.
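What such a pool might look like is sketched below with the JDK only. It is not our actual wrapper around ConsumeMessageConcurrentlyService: the class name, the bounded queue, the CallerRunsPolicy and the snapshot format are all assumptions chosen for illustration, and the reporting to ES is left out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** A resizable pool with a bounded queue that can describe its own state. */
final class MonitoredConsumePool {
    private final ThreadPoolExecutor executor;
    private final int queueCapacity;

    MonitoredConsumePool(int threads, int queueCapacity) {
        this.queueCapacity = queueCapacity;
        this.executor = new ThreadPoolExecutor(
                threads, threads, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),     // bounded: the backlog cannot grow forever
                new ThreadPoolExecutor.CallerRunsPolicy());  // a full queue slows the submitter down
    }

    void submit(Runnable task) {
        executor.execute(task);
    }

    /** Changes the thread count at runtime; the order keeps core <= max at every step. */
    void resize(int threads) {
        if (threads > executor.getMaximumPoolSize()) {
            executor.setMaximumPoolSize(threads);
            executor.setCorePoolSize(threads);
        } else {
            executor.setCorePoolSize(threads);
            executor.setMaximumPoolSize(threads);
        }
    }

    int corePoolSize() {
        return executor.getCorePoolSize();
    }

    /** One line of metrics that a scheduled job could ship to a monitoring system. */
    String snapshot() {
        return String.format("core=%d active=%d queued=%d capacity=%d completed=%d",
                executor.getCorePoolSize(), executor.getActiveCount(),
                executor.getQueue().size(), queueCapacity,
                executor.getCompletedTaskCount());
    }

    void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        MonitoredConsumePool pool = new MonitoredConsumePool(2, 100);
        for (int i = 0; i < 50; i++) {
            pool.submit(() -> { /* listener work would go here */ });
        }
        pool.resize(8);
        System.out.println(pool.snapshot());
        pool.shutdown();
    }
}
```

The bounded queue is the important design choice here: once it is full, the submitting thread runs the task itself, so a slow listener slows down submission instead of growing the heap.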

4. Summary

This article walks through an online case in which a RocketMQ configuration problem led to a service OOM, and uses the RocketMQ client to analyze the root cause and the principles of the message consumption process.

It has been more than 20 days since the last update, not out of laziness but because there have been too many online problems, haha. I will write them up one by one. You cannot avoid stepping into pits; just don't step into the same one twice!