RocketMQ长轮询源码拆解:为什么它能做到低延迟高吞吐?

做消息中间件的同学都知道,RocketMQ的长轮询(Long Polling)是它低延迟高吞吐的核心武器——既避免了传统轮询的高频无效请求,又能保证消息到达时快速推送。但你真的懂它的源码逻辑吗?今天我们从Pull请求的生命周期入手,拆解长轮询的“挂起-唤醒”底层逻辑,看完你会明白:为什么RocketMQ能在低延迟和高吞吐之间找到完美平衡。
一、长轮询的核心矛盾:为什么传统轮询不行?
在聊RocketMQ的长轮询前,得先搞懂传统短轮询的痛点:客户端每隔固定时间发请求问Broker“有没有消息”,如果没有就返回空。这种方式有两个致命问题:
RocketMQ的长轮询就是为解决这两个矛盾而生——没有消息时不直接返回,而是把请求“挂”在Broker上,等有消息了再回复。
二、长轮询的源码骨架:从Pull请求到挂起
RocketMQ的长轮询核心逻辑集中在PullMessageProcessor(处理Pull请求)和PullRequestHoldService(管理挂起请求)两个类。我们从Pull请求的处理流程开始拆解:
1. Pull请求的第一次处理:检查消息
当Consumer发送Pull请求(包含topic、queueId、当前offset)到Broker后,请求会被转发到PullMessageProcessor的processRequest方法:
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
// PullMessageProcessor.java@Overridepublicvoid processRequest(ChannelHandlerContext ctx,RemotingCommand request)throwsRemotingCommandException{// 解析请求头:拿到topic、queueId、offset等核心参数PullMessageRequestHeader requestHeader = parseRequestHeader(request);// 关键步骤1:从MessageStore检查是否有可用消息GetMessageResult getResult = messageStore.getMessage( requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getOffset(),1// 只查1条,判断是否有消息);// 如果没有消息(NO_MESSAGE_IN_QUEUE),进入挂起逻辑if(getResult.getStatus()==GetMessageStatus.NO_MESSAGE_IN_QUEUE){ holdRequest(ctx, request, requestHeader);return;}// 有消息则直接构建响应返回 sendResponse(ctx, request, getResult);}
这段代码的核心拐点是:如果MessageStore返回“无消息”,不直接返回空响应,而是调用holdRequest方法挂起请求。
2. 挂起请求:把请求交给PullRequestHoldService
holdRequest方法的作用是将当前请求包装成PullRequest对象,然后丢到PullRequestHoldService的阻塞队列中:
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
// PullMessageProcessor.javaprivatevoid holdRequest(ChannelHandlerContext ctx,RemotingCommand request,PullMessageRequestHeader requestHeader){PullRequest pullRequest =newPullRequest(); pullRequest.setChannel(ctx.channel());// 保存客户端连接 pullRequest.setRequestCommand(request);// 保存原始请求 pullRequest.setTopic(requestHeader.getTopic()); pullRequest.setQueueId(requestHeader.getQueueId()); pullRequest.setOffset(requestHeader.getOffset());// 关键操作:将请求放入PullRequestHoldService的队列 pullRequestHoldService.putRequest(pullRequest);}
PullRequestHoldService是一个定时任务线程(继承自ServiceThread),它维护了一个ConcurrentHashMap,key是“topic_queueId”,value是该队列的挂起请求队列:
- 36
- 37
- 38
// PullRequestHoldService.javaprivatefinalConcurrentHashMap<String,LinkedBlockingQueue<PullRequest>> pullRequestTable =newConcurrentHashMap<>();
public void putRequest(PullRequest pullRequest) {
String key = pullRequest.getTopic() + “_” + pullRequest.getQueueId();
// 按topic+queueId分组,创建或获取队列
LinkedBlockingQueue
// 将请求放入队列(阻塞队列,满了会等待)
queue.offer(pullRequest);
}
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
到这一步,Consumer的Pull请求已经被“挂”在Broker的队列里了,不会立即收到响应。### 三、唤醒请求:新消息到达时的触发逻辑挂起的请求什么时候会被唤醒?答案是**当对应topic-queue有新消息到达时**。#### 1. 新消息的通知机制当Producer发送消息到Broker后,`DefaultMessageStore`会调用`dispatchPostWriteRequest`方法,通知`PullRequestHoldService`有新消息到达:```java// DefaultMessageStore.javapublicvoid dispatchPostWriteRequest(MessageQueue messageQueue,long offset){// 关键操作:通知PullRequestHoldService,某个队列有新消息了pullRequestHoldService.notifyMessageArriving(messageQueue.getTopic(),messageQueue.getQueueId(),offset // 新消息的offset);}
2. 唤醒挂起的请求
PullRequestHoldService的notifyMessageArriving方法会遍历该topic-queue下的所有挂起请求,检查它们的offset是否小于新消息的offset(即有新消息可用):
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
// PullRequestHoldService.javapublicvoid notifyMessageArriving(String topic,int queueId,long maxOffset){String key = topic +"_"+ queueId;LinkedBlockingQueue<PullRequest> queue = pullRequestTable.get(key);if (queue == null) return;// 遍历所有挂起的请求Iterator<PullRequest> iterator = queue.iterator();while (iterator.hasNext()) {PullRequest pullRequest = iterator.next();// 检查请求的offset是否小于新消息的maxOffset(即有新消息可用)if (pullRequest.getOffset() < maxOffset) {// 关键操作:唤醒请求,重新处理processPullRequest(pullRequest);iterator.remove(); // 从队列中移除,避免重复处理}}}
processPullRequest方法会重新调用MessageStore检查消息,确认有消息后,构建响应并发送给Consumer。
四、长轮询的完整流程:用UML看全局
为了更清晰展示整个“挂起-唤醒”流程,我们画了一张UML时序图:

这张图的核心是两次检查消息:
五、长轮询的优势:为什么能低延迟高吞吐?
从源码能看出,RocketMQ的长轮询本质是“用Broker的资源换客户端的效率”:
说到底,长轮询的逻辑就是“能等但不瞎等”——Broker帮Consumer“守着”消息,有了再通知,既省了客户端的事儿,又提高了整体效率。
结尾:聊聊长轮询的那些坑
其实RocketMQ的长轮询也不是完美的,比如:
•如果Broker挂了,挂起的请求会丢失,Consumer需要重新发起请求;•如果某个topic-queue的消息很久没到达,挂起的请求会一直占着Broker的内存(不过RocketMQ有超时机制,默认15秒没消息会自动返回空响应)。但总体来说,长轮询还是RocketMQ最核心的“黑科技”之一。你在使用RocketMQ时遇到过哪些长轮询相关的问题?评论区聊聊,咱们一起避坑~
夜雨聆风