• RocketMQ的长轮询(Long Polling)实现分析


    目录

    前言

    长轮询

    1.实现步骤

    1.1客户端轮询发送请求

    1.2服务端处理数据

    1.3客户端接收数据

    2.实现实例

    RocketMQ长轮询

    1.PullMessage服务

    2.PullMessageProcessor服务

    3.PullCallback回调

    总结


    前言

    消息队列一般在消费端都会提供push和pull两种模式,RocketMQ同样实现了这两种模式,分别提供了两个实现类:DefaultMQPushConsumer和DefaultMQPullConsumer;两种方式各有优势:

    push模式:推送模式,即服务端有数据之后立马推送消息给客户端,需要客户端和服务器建立长连接,实时性很高,对客户端来说也简单,接收处理消息即可;缺点就是服务端不知道客户端处理消息的能力,可能会导致数据积压,同时也增加了服务端的工作量,影响服务端的性能;

    pull模式:拉取模式,即客户端主动去服务端拉取数据,主动权在客户端,拉取数据,然后处理数据,再拉取数据,一直循环下去,具体拉取数据的时间间隔不好设定,太短可能会导致大量的连接拉取不到数据,太长导致数据接收不及时; RocketMQ使用了长轮询的方式,兼顾了push和pull两种模式的优点,下面首先对长轮询做简单介绍,进而分析RocketMQ内置的长轮询模式。

    长轮询

    长轮询通过客户端和服务端的配合,达到主动权在客户端,同时也能保证数据的实时性;长轮询本质上也是轮询,只不过对普通的轮询做了优化处理,服务端在没有数据的时候并不是马上返回数据,会hold住请求,等待服务端有数据,或者一直没有数据超时处理,然后一直循环下去;下面看一下如何简单实现一个长轮询;

    1.实现步骤

    1.1客户端轮询发送请求

    客户端应该存在一个一直循环的程序,不停的向服务端发送获取消息请求;

    1.2服务端处理数据

    服务器接收到客户端请求之后,首先查看是否有数据,如果有数据则直接返回,如果没有则保持连接,等待获取数据,服务端获取数据之后,会通知之前的请求连接来获取数据,然后返回给客户端;

    1.3客户端接收数据

    正常情况下,客户端会马上接收到服务端的数据,或者等待一段时间获取到数据;如果一直获取不到数据,会有超时处理;在获取数据或者超时处理之后会关闭连接,然后再次发起长轮询请求;

    2.实现实例

    以下使用netty模拟一个http服务器,使用HttpURLConnection模拟客户端发送请求,使用BlockingQueue存放数据;

    服务端代码

    1. public class Server {
    2. public static void start(final int port) throws Exception {
    3. EventLoopGroup boss = new NioEventLoopGroup();
    4. EventLoopGroup woker = new NioEventLoopGroup();
    5. ServerBootstrap serverBootstrap = new ServerBootstrap();
    6. try {
    7. serverBootstrap.channel(NioServerSocketChannel.class).group(boss, woker)
    8. .childOption(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.SO_BACKLOG, 1024)
    9. .childHandler(new ChannelInitializer() {
    10. @Override
    11. protected void initChannel(SocketChannel ch) throws Exception {
    12. ch.pipeline().addLast("http-decoder", new HttpServerCodec());
    13. ch.pipeline().addLast(new HttpServerHandler());
    14. }
    15. });
    16. ChannelFuture future = serverBootstrap.bind(port).sync();
    17. System.out.println("server start ok port is " + port);
    18. DataCenter.start();
    19. future.channel().closeFuture().sync();
    20. } finally {
    21. boss.shutdownGracefully();
    22. woker.shutdownGracefully();
    23. }
    24. }
    25. public static void main(String[] args) throws Exception {
    26. start(8080);
    27. }
    28. }

    netty默认支持http协议,直接使用即可,启动端口为8080;同时启动数据中心服务,相关代码如下:

    1. public class DataCenter {
    2. private static Random random = new Random();
    3. private static BlockingQueue queue = new LinkedBlockingQueue<>();
    4. private static AtomicInteger num = new AtomicInteger();
    5. public static void start() {
    6. while (true) {
    7. try {
    8. Thread.sleep(random.nextInt(5) * 1000);
    9. String data = "hello world" + num.incrementAndGet();
    10. queue.put(data);
    11. System.out.println("store data:" + data);
    12. } catch (InterruptedException e) {
    13. e.printStackTrace();
    14. }
    15. }
    16. }
    17. public static String getData() throws InterruptedException {
    18. return queue.take();
    19. }
    20. }

    为了模拟服务端没有数据,需要等待的情况,这里使用BlockingQueue来模拟,不定期的往队列里面插入数据,同时对外提供获取数据的方法,使用的是take方法,没有数据会阻塞知道有数据为止;getData在类HttpServerHandler中使用,此类也很简单,如下:

    1. public class HttpServerHandler extends ChannelInboundHandlerAdapter {
    2. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    3. if (msg instanceof HttpRequest) {
    4. FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
    5. httpResponse.content().writeBytes(DataCenter.getData().getBytes());
    6. httpResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
    7. httpResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, httpResponse.content().readableBytes());
    8. ctx.writeAndFlush(httpResponse);
    9. }
    10. }
    11. }

    获取到客户端的请求之后,从数据中心获取一条消息,如果没有数据,会进行等待,直到有数据为止;然后使用FullHttpResponse返回给客户端;客户端使用HttpURLConnection来和服务端建立连接,不停的拉取数据,代码如下:

    1. public class Client {
    2. public static void main(String[] args) {
    3. while (true) {
    4. HttpURLConnection connection = null;
    5. try {
    6. URL url = new URL("http://localhost:8080");
    7. connection = (HttpURLConnection) url.openConnection();
    8. connection.setReadTimeout(10000);
    9. connection.setConnectTimeout(3000);
    10. connection.setRequestMethod("GET");
    11. connection.connect();
    12. if (200 == connection.getResponseCode()) {
    13. BufferedReader reader = null;
    14. try {
    15. reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
    16. StringBuffer result = new StringBuffer();
    17. String line = null;
    18. while ((line = reader.readLine()) != null) {
    19. result.append(line);
    20. }
    21. System.out.println("时间:" + new Date().toString() + "result = " + result);
    22. } finally {
    23. if (reader != null) {
    24. reader.close();
    25. }
    26. }
    27. }
    28. } catch (IOException e) {
    29. e.printStackTrace();
    30. } finally {
    31. if (connection != null) {
    32. connection.disconnect();
    33. }
    34. }
    35. }
    36. }
    37. }

    以上只是简单的模拟了长轮询的方式,下面重点来看看RocketMQ是如何实现长轮询的;

    RocketMQ长轮询

    RocketMQ的消费端提供了两种消费模式分别是:DefaultMQPushConsumer和DefaultMQPullConsumer,其中DefaultMQPushConsumer就是使用的长轮询,所以下面重点分析此类;

    1.PullMessage服务

    从名字可以看出来就是客户端从服务端拉取数据的服务,看里面的一个核心方法:

    1. @Override
    2. public void run() {
    3. log.info(this.getServiceName() + " service started");
    4. while (!this.isStopped()) {
    5. try {
    6. PullRequest pullRequest = this.pullRequestQueue.take();
    7. this.pullMessage(pullRequest);
    8. } catch (InterruptedException ignored) {
    9. } catch (Exception e) {
    10. log.error("Pull Message Service Run Method exception", e);
    11. }
    12. }
    13. log.info(this.getServiceName() + " service end");
    14. }

    服务启动之后,会一直不停的循环调用拉取数据,PullRequest可以看作是拉取数据需要的参数,部分代码如下:

    1. public class PullRequest {
    2. private String consumerGroup;
    3. private MessageQueue messageQueue;
    4. private ProcessQueue processQueue;
    5. private long nextOffset;
    6. private boolean lockedFirst = false;
    7. ...省略...
    8. }

    每个MessageQueue 对应了封装成了一个PullRequest,因为拉取数据是以每个Broker下面的Queue为单位,同时里面还一个ProcessQueue,每个MessageQueue也同样对应一个ProcessQueue,保存了这个MessageQueue消息处理状态的快照;还有nextOffset用来标识读取的位置;继续看一段pullMessage中的内容,给服务端发送请求的头内容:

    1. PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
    2. requestHeader.setConsumerGroup(this.consumerGroup);
    3. requestHeader.setTopic(mq.getTopic());
    4. requestHeader.setQueueId(mq.getQueueId());
    5. requestHeader.setQueueOffset(offset);
    6. requestHeader.setMaxMsgNums(maxNums);
    7. requestHeader.setSysFlag(sysFlagInner);
    8. requestHeader.setCommitOffset(commitOffset);
    9. requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
    10. requestHeader.setSubscription(subExpression);
    11. requestHeader.setSubVersion(subVersion);
    12. requestHeader.setExpressionType(expressionType);
    13. String brokerAddr = findBrokerResult.getBrokerAddr();
    14. if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
    15. brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
    16. }
    17. PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
    18. brokerAddr,
    19. requestHeader,
    20. timeoutMillis,
    21. communicationMode,
    22. pullCallback);
    23. return pullResult;

    其中有一个参数是SuspendTimeoutMillis,作用是设置Broker的最长阻塞时间,默认为15秒,前提是没有消息的情况下,有消息会立刻返回;

    2.PullMessageProcessor服务

    从名字可以看出,服务端用来处理pullMessage的服务,下面重点看一下processRequest方法,其中包括对获取不同结果做的处理:

    1. switch (response.getCode()) {
    2. case ResponseCode.SUCCESS:
    3. ...省略...
    4. break;
    5. case ResponseCode.PULL_NOT_FOUND:
    6. if (brokerAllowSuspend && hasSuspendFlag) {
    7. long pollingTimeMills = suspendTimeoutMillisLong;
    8. if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
    9. pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
    10. }
    11. String topic = requestHeader.getTopic();
    12. long offset = requestHeader.getQueueOffset();
    13. int queueId = requestHeader.getQueueId();
    14. PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
    15. this.brokerController.getMessageStore().now(), offset, subscriptionData);
    16. this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
    17. response = null;
    18. break;
    19. }
    20. case ResponseCode.PULL_RETRY_IMMEDIATELY:
    21. break;
    22. case ResponseCode.PULL_OFFSET_MOVED:
    23. ...省略...
    24. break;
    25. default:
    26. assert false;

    一共处理了四个类型,我们关心的是在没有获取到数据的情况下是如何处理的,可以重点看一下ResponseCode.PULL_NOT_FOUND,表示没有拉取到数据,此时会调用PullRequestHoldService服务,从名字可以看出此服务用来hold住请求,不会立马返回,response被至为了null,不给客户端响应;下面重点看一下PullRequestHoldService:

    1. @Override
    2. public void run() {
    3. log.info("{} service started", this.getServiceName());
    4. while (!this.isStopped()) {
    5. try {
    6. if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
    7. this.waitForRunning(5 * 1000);
    8. } else {
    9. this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
    10. }
    11. long beginLockTimestamp = this.systemClock.now();
    12. this.checkHoldRequest();
    13. long costTime = this.systemClock.now() - beginLockTimestamp;
    14. if (costTime > 5 * 1000) {
    15. log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
    16. }
    17. } catch (Throwable e) {
    18. log.warn(this.getServiceName() + " service has exception. ", e);
    19. }
    20. }
    21. log.info("{} service end", this.getServiceName());
    22. }

    此方法主要就是通过不停的检查被hold住的请求,检查是否已经有数据了,具体检查哪些就是在ResponseCode.PULL_NOT_FOUND中调用的suspendPullRequest方法:

    1. private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
    2. new ConcurrentHashMap<String, ManyPullRequest>(1024);
    3. public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
    4. String key = this.buildKey(topic, queueId);
    5. ManyPullRequest mpr = this.pullRequestTable.get(key);
    6. if (null == mpr) {
    7. mpr = new ManyPullRequest();
    8. ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
    9. if (prev != null) {
    10. mpr = prev;
    11. }
    12. }
    13. mpr.addPullRequest(pullRequest);
    14. }

    将需要hold处理的PullRequest放入到一个ConcurrentHashMap中,等待被检查;具体的检查代码在checkHoldRequest中:

    1. private void checkHoldRequest() {
    2. for (String key : this.pullRequestTable.keySet()) {
    3. String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
    4. if (2 == kArray.length) {
    5. String topic = kArray[0];
    6. int queueId = Integer.parseInt(kArray[1]);
    7. final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
    8. try {
    9. this.notifyMessageArriving(topic, queueId, offset);
    10. } catch (Throwable e) {
    11. log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
    12. }
    13. }
    14. }
    15. }

    此方法用来获取指定messageQueue下最大的offset,然后用来和当前的offset来比较,来确定是否有新的消息到来;往下看notifyMessageArriving方法:

    1. public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) {
    2. String key = this.buildKey(topic, queueId);
    3. ManyPullRequest mpr = this.pullRequestTable.get(key);
    4. if (mpr != null) {
    5. List<PullRequest> requestList = mpr.cloneListAndClear();
    6. if (requestList != null) {
    7. List<PullRequest> replayList = new ArrayList<PullRequest>();
    8. for (PullRequest request : requestList) {
    9. long newestOffset = maxOffset;
    10. if (newestOffset <= request.getPullFromThisOffset()) {
    11. newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
    12. }
    13. if (newestOffset > request.getPullFromThisOffset()) {
    14. if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) {
    15. try {
    16. this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
    17. request.getRequestCommand());
    18. } catch (Throwable e) {
    19. log.error("execute request when wakeup failed.", e);
    20. }
    21. continue;
    22. }
    23. }
    24. if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
    25. try {
    26. this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
    27. request.getRequestCommand());
    28. } catch (Throwable e) {
    29. log.error("execute request when wakeup failed.", e);
    30. }
    31. continue;
    32. }
    33. replayList.add(request);
    34. }
    35. if (!replayList.isEmpty()) {
    36. mpr.addPullRequest(replayList);
    37. }
    38. }
    39. }
    40. }

    方法中两个重要的判定就是:比较当前的offset和maxoffset,看是否有新的消息到来,有新的消息返回客户端;另外一个就是比较当前的时间和阻塞的时间,看是否超过了最大的阻塞时间,超过也同样返回; 此方法不光在PullRequestHoldService服务类中循环调用检查,同时在DefaultMessageStore中消息被存储的时候调用;其实就是主动检查和被动通知两种方式。

    3.PullCallback回调

    服务端处理完之后,给客户端响应,回调其中的PullCallback,其中在处理完消息之后,重要的一步就是再次把pullRequest放到PullMessageService服务中,等待下一次的轮询;

    总结

    本文首先介绍了两种消费消息的模式,介绍了其中的优缺点,然后引出了长轮询,并且在本地简单模拟了长轮询,最后重点介绍了RocketMQ中是如何实现的长轮询

  • 相关阅读:
    Volcano:在离线作业混部管理平台,实现智能资源管理和作业调度
    23种设计模式(创建型、构造型、行为型)
    Flink容错机制
    springboot 部署到 weblogic 中 jar 包冲突
    【算法1-4】[NOIP2003普及组第三题]递归与递推——栈
    dirsearch网站目录暴力破解
    java io读取数据
    Linux登陆配置虚拟机
    第十四届蓝桥杯校内模拟赛第一期——Python
    【Canvas】js用canvas绘制一个钟表时钟动画效果
  • 原文地址:https://blog.csdn.net/FENGQIYUNRAN/article/details/133863747