乐于分享
好东西不私藏

RocketMQ长轮询源码拆解:为什么它能做到低延迟高吞吐?

RocketMQ长轮询源码拆解:为什么它能做到低延迟高吞吐?

做消息中间件的同学都知道,RocketMQ的长轮询(Long Polling)是它低延迟高吞吐的核心武器——既避免了传统轮询的高频无效请求,又能保证消息到达时快速推送。但你真的懂它的源码逻辑吗?今天我们从Pull请求的生命周期入手,拆解长轮询的“挂起-唤醒”底层逻辑,看完你会明白:为什么RocketMQ能在低延迟和高吞吐之间找到完美平衡。

一、长轮询的核心矛盾:为什么传统轮询不行?

在聊RocketMQ的长轮询前,得先搞懂传统短轮询的痛点:客户端每隔固定时间发请求问Broker“有没有消息”,如果没有就返回空。这种方式有两个致命问题:

1.高延迟:如果轮询间隔是1秒,消息可能在0.1秒到达,但客户端要等0.9秒才会拿到;2.高无效请求:大部分请求都是“空响应”,浪费Broker和网络资源。
RocketMQ的长轮询就是为解决这两个矛盾而生——没有消息时不直接返回,而是把请求“挂”在Broker上,等有消息了再回复

二、长轮询的源码骨架:从Pull请求到挂起

RocketMQ的长轮询核心逻辑集中在PullMessageProcessor(处理Pull请求)和PullRequestHoldService(管理挂起请求)两个类。我们从Pull请求的处理流程开始拆解:

1. Pull请求的第一次处理:检查消息

当Consumer发送Pull请求(包含topic、queueId、当前offset)到Broker后,请求会被转发到PullMessageProcessorprocessRequest方法:

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

// PullMessageProcessor.java@Overridepublicvoid processRequest(ChannelHandlerContext ctx,RemotingCommand request)throwsRemotingCommandException{// 解析请求头:拿到topic、queueId、offset等核心参数PullMessageRequestHeader requestHeader = parseRequestHeader(request);// 关键步骤1:从MessageStore检查是否有可用消息GetMessageResult getResult = messageStore.getMessage( requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getOffset(),1// 只查1条,判断是否有消息);// 如果没有消息(NO_MESSAGE_IN_QUEUE),进入挂起逻辑if(getResult.getStatus()==GetMessageStatus.NO_MESSAGE_IN_QUEUE){ holdRequest(ctx, request, requestHeader);return;}// 有消息则直接构建响应返回 sendResponse(ctx, request, getResult);}

这段代码的核心拐点是:如果MessageStore返回“无消息”,不直接返回空响应,而是调用holdRequest方法挂起请求

2. 挂起请求:把请求交给PullRequestHoldService

holdRequest方法的作用是将当前请求包装成PullRequest对象,然后丢到PullRequestHoldService的阻塞队列中:

  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

// PullMessageProcessor.javaprivatevoid holdRequest(ChannelHandlerContext ctx,RemotingCommand request,PullMessageRequestHeader requestHeader){PullRequest pullRequest =newPullRequest(); pullRequest.setChannel(ctx.channel());// 保存客户端连接 pullRequest.setRequestCommand(request);// 保存原始请求 pullRequest.setTopic(requestHeader.getTopic()); pullRequest.setQueueId(requestHeader.getQueueId()); pullRequest.setOffset(requestHeader.getOffset());// 关键操作:将请求放入PullRequestHoldService的队列 pullRequestHoldService.putRequest(pullRequest);}

PullRequestHoldService是一个定时任务线程(继承自ServiceThread),它维护了一个ConcurrentHashMap,key是“topic_queueId”,value是该队列的挂起请求队列:

  • 36
  • 37
  • 38

// PullRequestHoldService.javaprivatefinalConcurrentHashMap<String,LinkedBlockingQueue<PullRequest>> pullRequestTable =newConcurrentHashMap<>();

public void putRequest(PullRequest pullRequest) {
   String key = pullRequest.getTopic() + “_” + pullRequest.getQueueId();
   // 按topic+queueId分组,创建或获取队列
   LinkedBlockingQueue queue = pullRequestTable.computeIfAbsent(key, k -> new LinkedBlockingQueue<>());
   // 将请求放入队列(阻塞队列,满了会等待)
   queue.offer(pullRequest);
}

  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
到这一步,ConsumerPull请求已经被“挂”在Broker的队列里了,不会立即收到响应。### 三、唤醒请求:新消息到达时的触发逻辑挂起的请求什么时候会被唤醒?答案是**当对应topic-queue有新消息到达时**。#### 1. 新消息的通知机制Producer发送消息到Broker后,`DefaultMessageStore`会调用`dispatchPostWriteRequest`方法,通知`PullRequestHoldService`有新消息到达:```java// DefaultMessageStore.javapublicvoid dispatchPostWriteRequest(MessageQueue messageQueue,long offset){// 关键操作:通知PullRequestHoldService,某个队列有新消息了 pullRequestHoldService.notifyMessageArriving( messageQueue.getTopic(), messageQueue.getQueueId(), offset // 新消息的offset);}

2. 唤醒挂起的请求

PullRequestHoldServicenotifyMessageArriving方法会遍历该topic-queue下的所有挂起请求,检查它们的offset是否小于新消息的offset(即有新消息可用):

  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
// PullRequestHoldService.javapublicvoid notifyMessageArriving(String topic,int queueId,long maxOffset){String key = topic +"_"+ queueId;LinkedBlockingQueue<PullRequest> queue = pullRequestTable.get(key);    if (queue == null) return;    // 遍历所有挂起的请求    Iterator<PullRequest> iterator = queue.iterator();    while (iterator.hasNext()) {        PullRequest pullRequest = iterator.next();        // 检查请求的offset是否小于新消息的maxOffset(即有新消息可用)        if (pullRequest.getOffset() < maxOffset) {            // 关键操作:唤醒请求,重新处理            processPullRequest(pullRequest);            iterator.remove(); // 从队列中移除,避免重复处理        }    }}

processPullRequest方法会重新调用MessageStore检查消息,确认有消息后,构建响应并发送给Consumer。

四、长轮询的完整流程:用UML看全局

为了更清晰展示整个“挂起-唤醒”流程,我们画了一张UML时序图

这张图的核心是两次检查消息

1.第一次检查:Pull请求刚到达时,无消息则挂起;2.第二次检查:新消息到达时,重新检查,有消息则唤醒。

五、长轮询的优势:为什么能低延迟高吞吐?

从源码能看出,RocketMQ的长轮询本质是“用Broker的资源换客户端的效率”

低延迟:新消息到达时,Broker会立即唤醒挂起的请求,Consumer几乎能“实时”收到消息(延迟取决于Broker的处理速度);高吞吐:减少了90%以上的无效请求,Broker能处理更多有效请求;客户端友好:Consumer不需要设置短轮询间隔,降低了客户端的CPU和网络消耗。

说到底,长轮询的逻辑就是“能等但不瞎等”——Broker帮Consumer“守着”消息,有了再通知,既省了客户端的事儿,又提高了整体效率。

结尾:聊聊长轮询的那些坑

其实RocketMQ的长轮询也不是完美的,比如:如果Broker挂了,挂起的请求会丢失,Consumer需要重新发起请求;如果某个topic-queue的消息很久没到达,挂起的请求会一直占着Broker的内存(不过RocketMQ有超时机制,默认15秒没消息会自动返回空响应)。
但总体来说,长轮询还是RocketMQ最核心的“黑科技”之一。你在使用RocketMQ时遇到过哪些长轮询相关的问题?评论区聊聊,咱们一起避坑~

本站文章均为手工撰写未经允许谢绝转载:夜雨聆风 » RocketMQ长轮询源码拆解:为什么它能做到低延迟高吞吐?

猜你喜欢

  • 暂无文章