• RabbitMQ初步到精通-第八章-Java-AMQP-Client源码分析


    目录

    第八章-Java-AMQP-Client源码分析

    1、背景

            1.1 客户端介绍

            1.2 看源码好处

            1.3 如何看源码

    2、生产者

    3、消费者监听

    4、创建连接

    5、消费者消费

    6. 总结:


    第八章-Java-AMQP-Client源码分析

    1、背景

    1.1 客户端介绍

    通过前面几章的学习,大家对rabbitmq 的基本使用应该ok了,但有的同学可能仍然不满足先去看看Rabbitmq如何实现的,由于rabbitmq 使用Erlang实现的,大家可以自行研究。

    看不了mq的实现,可以看下他的客户端的实现。客户端也有多种语言的实现,我们以java的amqp-client来看。

    1. <dependency>
    2. <groupId>com.rabbitmq</groupId>
    3. <artifactId>amqp-client</artifactId>
    4. <version>5.7.3</version>
    5. </dependency>

    1.2 看源码好处

    另外有些同学会有疑问,能用不就行了,看这源码有啥用呢。

    首先,看源码能满足我们的好奇心,做到知其然又知其所以然。

    其次,在实际运用的过程中,出现的一些问题,需要靠看源码来分析解决。例如,我们之前发现mq的消费很慢,但是消费者处理速度和生产者处理的速度都很快,所以想从mq的客户端看看,有没有什么瓶颈。

    再有,看源码能提升我们的编码能力,学到很多优秀的编码习惯,算法,架构等等。既然这些中间件能开源出来,被广泛使用,肯定有他优秀的地方,开阔自己视野,站在巨人的肩膀上看世界。

    等等...

    1.3 如何看源码

    有的同学可能认为,直接拔开就看呗,一个类一个类的,一个方法一个方法的看。从上往下。不可否认这是一种看法,但这不太适合初期刚看代码的时机,会搞的很懵,直接失去看源码的兴趣。

    总结几个小方法:

    1、可以把源码下载到本地,部署起来,一定要能跑起来。另外也可以省事些,在IDE里面点进Jar包,下载源码,直接从Jar包里看。

    2、按图索骥,看的时候一定不是按一个网来看的,而是专注的一个点,从这个点进去,一步一步跟随到源码中,串成一条线,最后很多的线就会组成一个网,是逐步按照 点、线、面的方式来。

    3、开启Debug,直接读有时候会绕进去,找不到绕出来的方向,因为源码中各个类的实现关联都很多,不如直接按Debug模式,跟随着读进去。这里面有一个很重要的点,-有的源码中是靠新启线程实现的,所以记得Debug断点的时候,要使用Thread模式哦。

    4、抓大放小,读源码我们有时候需要一些不求甚解,需要一些模棱两可,我们无法搞懂所有的东西,特别是刚读的时候,但我们一定要清楚哪些是主线,哪些是边角料。把主线搞清楚即可。

    5、Again and Again, 源码不可能一遍就让你读懂,都熟悉。这是需要不断的重复的一个过程,一遍不懂,就再来一遍,十遍不行就 二十遍,三十遍,每一遍都会有新的收获。

    6、坚持,不轻易放弃。

    后续我们看源码的这几个点,也是按照我们之前讲过的 RabbitMQ的 Simple模式,最简单的案例,涉及到的,追踪到源码中去分析。

    2、生产者

    生产者,代码很简单,追进去,也比较清晰。

    业务代码:

    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

    这就是生产者通过channel 发布了一条消息 给默认的 Exchange,并指定了 队列的名称。

    好,追进去,一直追到 ChannelIN 的 basicPublish方法:

    1. /** Public API - {@inheritDoc} */
    2. @Override
    3. public void basicPublish(String exchange, String routingKey,
    4. boolean mandatory, boolean immediate,
    5. BasicProperties props, byte[] body)
    6. throws IOException
    7. {
    8. if (nextPublishSeqNo > 0) {
    9. unconfirmedSet.add(getNextPublishSeqNo());
    10. nextPublishSeqNo++;
    11. }
    12. if (props == null) {
    13. props = MessageProperties.MINIMAL_BASIC;
    14. }
    15. //组装 AMQCommand对象,后续进行网络传输
    16. // 拼装了 交换机,路由键,消息等内容
    17. AMQCommand command = new AMQCommand(
    18. new Basic.Publish.Builder()
    19. .exchange(exchange)
    20. .routingKey(routingKey)
    21. .mandatory(mandatory)
    22. .immediate(immediate)
    23. .build(), props, body);
    24. try {
    25. // 核心发送方法
    26. transmit(command);
    27. } catch (IOException e) {
    28. metricsCollector.basicPublishFailure(this, e);
    29. throw e;
    30. }
    31. metricsCollector.basicPublish(this);
    32. }

    继续追transmit方法,追至 AMQCommand.transmit 方法即可,中间其他的方法可以略过

    这里面的内容也没啥太多关注的,就是拿到Connection去写信息,最后Flush过去。

    1. public void transmit(AMQChannel channel) throws IOException {
    2. int channelNumber = channel.getChannelNumber();
    3. AMQConnection connection = channel.getConnection();
    4. synchronized (assembler) {
    5. Method m = this.assembler.getMethod();
    6. if (m.hasContent()) {
    7. byte[] body = this.assembler.getContentBody();
    8. Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
    9. int frameMax = connection.getFrameMax();
    10. boolean cappedFrameMax = frameMax > 0;
    11. int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
    12. if (cappedFrameMax && headerFrame.size() > frameMax) {
    13. String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
    14. throw new IllegalArgumentException(msg);
    15. }
    16. connection.writeFrame(m.toFrame(channelNumber));
    17. connection.writeFrame(headerFrame);
    18. for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
    19. int remaining = body.length - offset;
    20. int fragmentLength = (remaining < bodyPayloadMax) ? remaining
    21. : bodyPayloadMax;
    22. Frame frame = Frame.fromBodyFragment(channelNumber, body,
    23. offset, fragmentLength);
    24. connection.writeFrame(frame);
    25. }
    26. } else {
    27. connection.writeFrame(m.toFrame(channelNumber));
    28. }
    29. }
    30. connection.flush();
    31. }

    至此就把,消息推送到了 MQ Broker。 

    大家用抓包工具抓下会看的更清晰:

    这一次的发送有3个AMQP协议的内容

    第一个,Method

    这些都是我们代码中的参数,完全匹配:

    第二个:头信息

    第三个:消息内容:

    3、消费者监听

    业务代码是要开启一个监听然后将此监听发送到MQ中

    1. //4.开启监听Queue
    2. DefaultConsumer consumer = new DefaultConsumer(channel) {
    3. @Override
    4. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    5. try {
    6. Thread.sleep(1000);
    7. } catch (InterruptedException e) {
    8. e.printStackTrace();
    9. }
    10. System.out.println("小明洗澡用水: " + new String(body, "UTF-8"));
    11. }
    12. };
    13. /**
    14. * 参数1:queue 指定消费哪个队列
    15. * 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)
    16. * 参数1:cancelCallback 指定消费回调
    17. */
    18. channel.basicConsume(QUEUE_NAME, true, consumer);

    我们从basicConsume跟进去:一直到ChannelN 的 basicConsume 中去:

    1. /** Public API - {@inheritDoc} */
    2. @Override
    3. public String basicConsume(String queue, final boolean autoAck, String consumerTag,
    4. boolean noLocal, boolean exclusive, Map arguments,
    5. final Consumer callback)
    6. throws IOException
    7. {
    8. // 拼装 Method 对象,Basic.Consume 后续传输使用
    9. final Method m = new Basic.Consume.Builder()
    10. .queue(queue)
    11. .consumerTag(consumerTag)
    12. .noLocal(noLocal)
    13. .noAck(autoAck)
    14. .exclusive(exclusive)
    15. .arguments(arguments)
    16. .build();
    17. // 声明监听对象 为后续 传输至MQ,MQ返回消息接收使用
    18. BlockingRpcContinuation k = new BlockingRpcContinuation(m) {
    19. @Override
    20. public String transformReply(AMQCommand replyCommand) {
    21. String actualConsumerTag = ((Basic.ConsumeOk) replyCommand.getMethod()).getConsumerTag();
    22. _consumers.put(actualConsumerTag, callback);
    23. // need to register consumer in stats before it actually starts consuming
    24. metricsCollector.basicConsume(ChannelN.this, actualConsumerTag, autoAck);
    25. dispatcher.handleConsumeOk(callback, actualConsumerTag);
    26. return actualConsumerTag;
    27. }
    28. };
    29. // 核心调用 传输信息
    30. rpc(m, k);
    31. try {
    32. if(_rpcTimeout == NO_RPC_TIMEOUT) {
    33. return k.getReply();
    34. } else {
    35. try {
    36. return k.getReply(_rpcTimeout);
    37. } catch (TimeoutException e) {
    38. throw wrapTimeoutException(m, e);
    39. }
    40. }
    41. } catch(ShutdownSignalException ex) {
    42. throw wrap(ex);
    43. }
    44. }

    然后经过后面的方法,还会上面的Method对象包装一层 成为 AMQCommand,最后又调用到了和生产者调用一致的部分:不再赘述了。

    1. AMQPCommand
    2. public void transmit(AMQChannel channel) throws IOException {

    到此为止,就会出现疑问了,我们把消费的信息推送给MQ了,啥时候消费,啥时候调用我们自定义的监听的消费方法呢?这里相当于一个异步了,断层了。

    这个就得往后看了,实际上是后续MQ得知有消费者注册到queue上之后,就会推送给消费者消息,消费者再去获取这个消息。先不急。

    看下网络数据包的情况:

    这时候我们先产生一个交互,先告诉MQ说,我是个消费者,想要消费SolarWaterHeater 这个队列的消息了。MQ如果告诉他,可以。后续MQ再推送消息过来。

     协议信息内容:

    4、创建连接

    我们看下创建连接的部分:因为无论生产者和消费者与MQ交互都得首先创建连接,而且创建连接里面还干了一件重要的事,来解决刚才上面提到的 如何消费MQ推送过来的消息的问题。

    业务创建连接代码:

    1. Connection connection = null;
    2. //创建Connection工厂
    3. ConnectionFactory factory = new ConnectionFactory();
    4. factory.setVirtualHost("my-test-virtual");
    5. factory.setPassword("test");
    6. factory.setUsername("test");
    7. factory.setHost("127.0.0.1");
    8. factory.setPort(5672);
    9. //拿到连接
    10. try {
    11. connection = factory.newConnection();

    跟进newConnection : 来到 ConnectionFactory .newConnection

    1. public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName)
    2. throws IOException, TimeoutException {
    3. if(this.metricsCollector == null) {
    4. this.metricsCollector = new NoOpMetricsCollector();
    5. }
    6. // make sure we respect the provided thread factory
    7. FrameHandlerFactory fhFactory = createFrameHandlerFactory();
    8. ConnectionParams params = params(executor);
    9. // set client-provided via a client property
    10. if (clientProvidedName != null) {
    11. Map properties = new HashMap(params.getClientProperties());
    12. properties.put("connection_name", clientProvidedName);
    13. params.setClientProperties(properties);
    14. }
    15. if (isAutomaticRecoveryEnabled()) {
    16. // see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
    17. AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);
    18. conn.init();
    19. return conn;
    20. } else {
    21. List
      addrs = addressResolver.getAddresses();
    22. Exception lastException = null;
    23. for (Address addr : addrs) {
    24. try {
    25. // 创建 FrameHandler
    26. FrameHandler handler = fhFactory.create(addr, clientProvidedName);
    27. // 组装AMQConnection 对象
    28. AMQConnection conn = createConnection(params, handler, metricsCollector);
    29. // 核心启动
    30. conn.start();
    31. this.metricsCollector.newConnection(conn);
    32. return conn;
    33. } catch (IOException e) {
    34. lastException = e;
    35. } catch (TimeoutException te) {
    36. lastException = te;
    37. }
    38. }
    39. if (lastException != null) {
    40. if (lastException instanceof IOException) {
    41. throw (IOException) lastException;
    42. } else if (lastException instanceof TimeoutException) {
    43. throw (TimeoutException) lastException;
    44. }
    45. }
    46. throw new IOException("failed to connect");
    47. }
    48. }

    所做的一切,拿配置,地址,拼接FrameHandler 都是为了 组装 AMQConnection 对象,组装对象完成后即需要,conn.start(); 启动连接。继续往下跟:

    1. /**
    2. * Start up the connection, including the MainLoop thread.
    3. * Sends the protocol
    4. * version negotiation header, and runs through
    5. * Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then
    6. * calls Connection.Open and waits for the OpenOk. Sets heart-beat
    7. * and frame max values after tuning has taken place.
    8. * @throws IOException if an error is encountered
    9. * either before, or during, protocol negotiation;
    10. * sub-classes {@link ProtocolVersionMismatchException} and
    11. * {@link PossibleAuthenticationFailureException} will be thrown in the
    12. * corresponding circumstances. {@link AuthenticationFailureException}
    13. * will be thrown if the broker closes the connection with ACCESS_REFUSED.
    14. * If an exception is thrown, connection resources allocated can all be
    15. * garbage collected when the connection object is no longer referenced.
    16. */
    17. public void start()
    18. throws IOException, TimeoutException {
    19. initializeConsumerWorkService();
    20. initializeHeartbeatSender();
    21. this._running = true;
    22. // Make sure that the first thing we do is to send the header,
    23. // which should cause any socket errors to show up for us, rather
    24. // than risking them pop out in the MainLoop
    25. AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =
    26. new AMQChannel.SimpleBlockingRpcContinuation();
    27. // We enqueue an RPC continuation here without sending an RPC
    28. // request, since the protocol specifies that after sending
    29. // the version negotiation header, the client (connection
    30. // initiator) is to wait for a connection.start method to
    31. // arrive.
    32. _channel0.enqueueRpc(connStartBlocker);
    33. try {
    34. // The following two lines are akin to AMQChannel's
    35. // transmit() method for this pseudo-RPC.
    36. _frameHandler.setTimeout(handshakeTimeout);
    37. _frameHandler.sendHeader();
    38. } catch (IOException ioe) {
    39. _frameHandler.close();
    40. throw ioe;
    41. }
    42. this._frameHandler.initialize(this);
    43. AMQP.Connection.Start connStart;
    44. AMQP.Connection.Tune connTune = null;
    45. try {
    46. connStart =
    47. (AMQP.Connection.Start) connStartBlocker.getReply(handshakeTimeout/2).getMethod();
    48. _serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());
    49. Version serverVersion =
    50. new Version(connStart.getVersionMajor(),
    51. connStart.getVersionMinor());
    52. if (!Version.checkVersion(clientVersion, serverVersion)) {
    53. throw new ProtocolVersionMismatchException(clientVersion,
    54. serverVersion);
    55. }
    56. String[] mechanisms = connStart.getMechanisms().toString().split(" ");
    57. SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);
    58. if (sm == null) {
    59. throw new IOException("No compatible authentication mechanism found - " +
    60. "server offered [" + connStart.getMechanisms() + "]");
    61. }
    62. String username = credentialsProvider.getUsername();
    63. String password = credentialsProvider.getPassword();
    64. LongString challenge = null;
    65. LongString response = sm.handleChallenge(null, username, password);
    66. do {
    67. Method method = (challenge == null)
    68. ? new AMQP.Connection.StartOk.Builder()
    69. .clientProperties(_clientProperties)
    70. .mechanism(sm.getName())
    71. .response(response)
    72. .build()
    73. : new AMQP.Connection.SecureOk.Builder().response(response).build();
    74. try {
    75. Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod();
    76. if (serverResponse instanceof AMQP.Connection.Tune) {
    77. connTune = (AMQP.Connection.Tune) serverResponse;
    78. } else {
    79. challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
    80. response = sm.handleChallenge(challenge, username, password);
    81. }
    82. } catch (ShutdownSignalException e) {
    83. Method shutdownMethod = e.getReason();
    84. if (shutdownMethod instanceof AMQP.Connection.Close) {
    85. AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod;
    86. if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {
    87. throw new AuthenticationFailureException(shutdownClose.getReplyText());
    88. }
    89. }
    90. throw new PossibleAuthenticationFailureException(e);
    91. }
    92. } while (connTune == null);
    93. } catch (TimeoutException te) {
    94. _frameHandler.close();
    95. throw te;
    96. } catch (ShutdownSignalException sse) {
    97. _frameHandler.close();
    98. throw AMQChannel.wrap(sse);
    99. } catch(IOException ioe) {
    100. _frameHandler.close();
    101. throw ioe;
    102. }
    103. try {
    104. int channelMax =
    105. negotiateChannelMax(this.requestedChannelMax,
    106. connTune.getChannelMax());
    107. _channelManager = instantiateChannelManager(channelMax, threadFactory);
    108. int frameMax =
    109. negotiatedMaxValue(this.requestedFrameMax,
    110. connTune.getFrameMax());
    111. this._frameMax = frameMax;
    112. int heartbeat =
    113. negotiatedMaxValue(this.requestedHeartbeat,
    114. connTune.getHeartbeat());
    115. setHeartbeat(heartbeat);
    116. _channel0.transmit(new AMQP.Connection.TuneOk.Builder()
    117. .channelMax(channelMax)
    118. .frameMax(frameMax)
    119. .heartbeat(heartbeat)
    120. .build());
    121. _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()
    122. .virtualHost(_virtualHost)
    123. .build());
    124. } catch (IOException ioe) {
    125. _heartbeatSender.shutdown();
    126. _frameHandler.close();
    127. throw ioe;
    128. } catch (ShutdownSignalException sse) {
    129. _heartbeatSender.shutdown();
    130. _frameHandler.close();
    131. throw AMQChannel.wrap(sse);
    132. }
    133. // We can now respond to errors having finished tailoring the connection
    134. this._inConnectionNegotiation = false;
    135. }

    上面这段代码比较长,也是最核心的启动连接代码了,其实他的注释已经说的很清楚了,我们来看下注释:

    * Start up the connection, including the MainLoop thread.
    启动连接,包括MainLoop thread  重点来了-后续我们的消费消息,就主要靠这哥们了。
    * Sends the protocol
    发送协议
    * version negotiation header, and runs through
    协议头
    * Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then
    这都是建立连接的相关协议内容了
    * calls Connection.Open and waits for the OpenOk. 
    有来有回
    * Sets heart-beat 
    心跳
    * and frame max values after tuning has taken place.

    所以,这个方法主要是发送创建连接的各种协议,双方经过沟通建立连接的过程。当然,最最最重要的一点,创建了MainLoop

    创建连接的内容,我们不在过多关注了,我们主要看下创建的MainLoop,

    找到:

    this._frameHandler.initialize(this);

    跟进去:一直跟到 AMQConnection.startMainLoop

    1. public void startMainLoop() {
    2. MainLoop loop = new MainLoop();
    3. final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();
    4. mainLoopThread = Environment.newThread(threadFactory, loop, name);
    5. mainLoopThread.start();
    6. }

    很明显MainLoop 是一个线程,通过 ThreadFactory new出来,并启动了。来看下这个线程是做什么的:

    1. private class MainLoop implements Runnable {
    2. /**
    3. * Channel reader thread main loop. Reads a frame, and if it is
    4. * not a heartbeat frame, dispatches it to the channel it refers to.
    5. * Continues running until the "running" flag is set false by
    6. * shutdown().
    7. */
    8. @Override
    9. public void run() {
    10. boolean shouldDoFinalShutdown = true;
    11. try {
    12. while (_running) {
    13. Frame frame = _frameHandler.readFrame();
    14. readFrame(frame);
    15. }
    16. } catch (Throwable ex) {
    17. if (ex instanceof InterruptedException) {
    18. // loop has been interrupted during shutdown,
    19. // no need to do it again
    20. shouldDoFinalShutdown = false;
    21. } else {
    22. handleFailure(ex);
    23. }
    24. } finally {
    25. if (shouldDoFinalShutdown) {
    26. doFinalShutdown();
    27. }
    28. }
    29. }
    30. }

    读注释即可:

    * Channel reader thread main loop. Reads a frame, 
    读数据帧
    
    * and if it is not a heartbeat frame, 
    如果不是心跳帧
    
    * dispatches it to the channel it refers to.
    分发到对应的处理channel中去
    
    * Continues running until the "running" flag is set false by shutdown().
    一直在持续运行,直至关闭

    总结:

    这就很清楚了,这是一个无限循环的线程,一直在读取Broker传递给我们的信息,读到对应的非心跳的内容,转交到对应的处理类进行处理。

    到这里我们是不是有些思路了,其实 消费者的消费就是在这里监听到,并处理的。

    截止到此,创建连接的内容完毕,并引出了MainLoop的内容。

    5、消费者消费

    好,我们继续看下这个死循环,是如何读消息的:

    Frame frame = _frameHandler.readFrame();
    

    这个是读到的消息,并包装成了Frame 对象,我们不再看这部分内容了

    继续:跟进

    readFrame(frame);
    1. private void readFrame(Frame frame) throws IOException {
    2. if (frame != null) {
    3. _missedHeartbeats = 0;
    4. if (frame.type == AMQP.FRAME_HEARTBEAT) {
    5. // Ignore it: we've already just reset the heartbeat counter.
    6. } else {
    7. if (frame.channel == 0) { // the special channel
    8. _channel0.handleFrame(frame);
    9. } else {
    10. if (isOpen()) {
    11. // If we're still _running, but not isOpen(), then we
    12. // must be quiescing, which means any inbound frames
    13. // for non-zero channels (and any inbound commands on
    14. // channel zero that aren't Connection.CloseOk) must
    15. // be discarded.
    16. ChannelManager cm = _channelManager;
    17. if (cm != null) {
    18. ChannelN channel;
    19. try {
    20. channel = cm.getChannel(frame.channel);
    21. } catch(UnknownChannelException e) {
    22. // this can happen if channel has been closed,
    23. // but there was e.g. an in-flight delivery.
    24. // just ignoring the frame to avoid closing the whole connection
    25. LOGGER.info("Received a frame on an unknown channel, ignoring it");
    26. return;
    27. }
    28. channel.handleFrame(frame);
    29. }
    30. }
    31. }
    32. }
    33. } else {
    34. // Socket timeout waiting for a frame.
    35. // Maybe missed heartbeat.
    36. handleSocketTimeout();
    37. }
    38. }

    上面的内容是真正的去解析处理 读到的Frame 的内容了,我们看 channel.handleFrame(frame);

    即可,继续追踪:

    1. public void handleFrame(Frame frame) throws IOException {
    2. AMQCommand command = _command;
    3. if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
    4. _command = new AMQCommand(); // prepare for the next one
    5. handleCompleteInboundCommand(command);
    6. }
    7. }

    继续追踪处理:

    command.handleFrame(frame)

    这个实际上是解析消息的具体内容,然后设置到对应的对象中的属性中去了。pass掉了

    继续:

    handleCompleteInboundCommand(command);

    追踪至 AMQChannel中的 

    handleCompleteInboundCommand- > 
    processAsync(command)

    一至到ChannelN 中的 processAsync 

    1. /**
    2. * Protected API - Filters the inbound command stream, processing
    3. * Basic.Deliver, Basic.Return and Channel.Close specially. If
    4. * we're in quiescing mode, all inbound commands are ignored,
    5. * except for Channel.Close and Channel.CloseOk.
    6. */
    7. @Override public boolean processAsync(Command command) throws IOException
    8. {
    9. // If we are isOpen(), then we process commands normally.
    10. //
    11. // If we are not, however, then we are in a quiescing, or
    12. // shutting-down state as the result of an application
    13. // decision to close this channel, and we are to discard all
    14. // incoming commands except for a close and close-ok.
    15. Method method = command.getMethod();
    16. // we deal with channel.close in the same way, regardless
    17. if (method instanceof Channel.Close) {
    18. asyncShutdown(command);
    19. return true;
    20. }
    21. if (isOpen()) {
    22. // We're in normal running mode.
    23. if (method instanceof Basic.Deliver) {
    24. processDelivery(command, (Basic.Deliver) method);
    25. return true;
    26. } else if (method instanceof Basic.Return) {
    27. callReturnListeners(command, (Basic.Return) method);
    28. return true;
    29. } else if (method instanceof Channel.Flow) {
    30. Channel.Flow channelFlow = (Channel.Flow) method;
    31. synchronized (_channelMutex) {
    32. _blockContent = !channelFlow.getActive();
    33. transmit(new Channel.FlowOk(!_blockContent));
    34. _channelMutex.notifyAll();
    35. }
    36. return true;
    37. } else if (method instanceof Basic.Ack) {
    38. Basic.Ack ack = (Basic.Ack) method;
    39. callConfirmListeners(command, ack);
    40. handleAckNack(ack.getDeliveryTag(), ack.getMultiple(), false);
    41. return true;
    42. } else if (method instanceof Basic.Nack) {
    43. Basic.Nack nack = (Basic.Nack) method;
    44. callConfirmListeners(command, nack);
    45. handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);
    46. return true;
    47. } else if (method instanceof Basic.RecoverOk) {
    48. for (Map.Entry entry : Utility.copy(_consumers).entrySet()) {
    49. this.dispatcher.handleRecoverOk(entry.getValue(), entry.getKey());
    50. }
    51. // Unlike all the other cases we still want this RecoverOk to
    52. // be handled by whichever RPC continuation invoked Recover,
    53. // so return false
    54. return false;
    55. } else if (method instanceof Basic.Cancel) {
    56. Basic.Cancel m = (Basic.Cancel)method;
    57. String consumerTag = m.getConsumerTag();
    58. Consumer callback = _consumers.remove(consumerTag);
    59. if (callback == null) {
    60. callback = defaultConsumer;
    61. }
    62. if (callback != null) {
    63. try {
    64. this.dispatcher.handleCancel(callback, consumerTag);
    65. } catch (WorkPoolFullException e) {
    66. // couldn't enqueue in work pool, propagating
    67. throw e;
    68. } catch (Throwable ex) {
    69. getConnection().getExceptionHandler().handleConsumerException(this,
    70. ex,
    71. callback,
    72. consumerTag,
    73. "handleCancel");
    74. }
    75. }
    76. return true;
    77. } else {
    78. return false;
    79. }
    80. } else {
    81. // We're in quiescing mode == !isOpen()
    82. if (method instanceof Channel.CloseOk) {
    83. // We're quiescing, and we see a channel.close-ok:
    84. // this is our signal to leave quiescing mode and
    85. // finally shut down for good. Let it be handled as an
    86. // RPC reply one final time by returning false.
    87. return false;
    88. } else {
    89. // We're quiescing, and this inbound command should be
    90. // discarded as per spec. "Consume" it by returning
    91. // true.
    92. return true;
    93. }
    94. }
    95. }

    开始逐步烧脑了,这一段debug的时候,记得开启Thread模式哦。

    首先看方法名吧:processAsync  明显的一个异步处理,究竟处理啥?看注释:

    * Protected API - Filters the inbound command stream, processing
    * Basic.Deliver, Basic.Return and Channel.Close specially.  If
    * we're in quiescing mode, all inbound commands are ignored,
    * except for Channel.Close and Channel.CloseOk.

    处理 Basic.Deliver, Basic.Return and Channel.Close  ,我们最关注的是 Deliver 投递对吧,

    这就是Broker把消息投递给我们呢方法。

    所以,我们找到了我们的关注点:

    1. if (method instanceof Basic.Deliver) {
    2. processDelivery(command, (Basic.Deliver) method);
    3. return true;

    继续哦:

    1. protected void processDelivery(Command command, Basic.Deliver method) {
    2. Basic.Deliver m = method;
    3. Consumer callback = _consumers.get(m.getConsumerTag());
    4. if (callback == null) {
    5. if (defaultConsumer == null) {
    6. // No handler set. We should blow up as this message
    7. // needs acking, just dropping it is not enough. See bug
    8. // 22587 for discussion.
    9. throw new IllegalStateException("Unsolicited delivery -" +
    10. " see Channel.setDefaultConsumer to handle this" +
    11. " case.");
    12. }
    13. else {
    14. callback = defaultConsumer;
    15. }
    16. }
    17. Envelope envelope = new Envelope(m.getDeliveryTag(),
    18. m.getRedelivered(),
    19. m.getExchange(),
    20. m.getRoutingKey());
    21. try {
    22. // call metricsCollector before the dispatching (which is async anyway)
    23. // this way, the message is inside the stats before it is handled
    24. // in case a manual ack in the callback, the stats will be able to record the ack
    25. metricsCollector.consumedMessage(this, m.getDeliveryTag(), m.getConsumerTag());
    26. this.dispatcher.handleDelivery(callback,
    27. m.getConsumerTag(),
    28. envelope,
    29. (BasicProperties) command.getContentHeader(),
    30. command.getContentBody());
    31. } catch (WorkPoolFullException e) {
    32. // couldn't enqueue in work pool, propagating
    33. throw e;
    34. } catch (Throwable ex) {
    35. getConnection().getExceptionHandler().handleConsumerException(this,
    36. ex,
    37. callback,
    38. m.getConsumerTag(),
    39. "handleDelivery");
    40. }
    41. }

    哦哦,我们看到了什么 Consumer callback ,消费者回调,That's whant we want.

    this.dispatcher.handleDelivery(callback,
                                   m.getConsumerTag(),
                                   envelope,
                                   (BasicProperties) command.getContentHeader(),
                                   command.getContentBody());

    继续哦 

    进入到了ConsumerDispatcher 

    handleDelivery
    1. public void handleDelivery(final Consumer delegate,
    2. final String consumerTag,
    3. final Envelope envelope,
    4. final AMQP.BasicProperties properties,
    5. final byte[] body) throws IOException {
    6. executeUnlessShuttingDown(
    7. new Runnable() {
    8. @Override
    9. public void run() {
    10. try {
    11. delegate.handleDelivery(consumerTag,
    12. envelope,
    13. properties,
    14. body);
    15. } catch (Throwable ex) {
    16. connection.getExceptionHandler().handleConsumerException(
    17. channel,
    18. ex,
    19. delegate,
    20. consumerTag,
    21. "handleDelivery");
    22. }
    23. }
    24. });
    25. }

    哇,handleDelivery 有没有很熟悉,我们的业务代码监听不就是实现的她吗? 真的是她吗?

    是她是她就是她。。。

    再来波回忆杀:

    1. DefaultConsumer consumer = new DefaultConsumer(channel) {
    2. @Override
    3. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    4. try {
    5. Thread.sleep(1000);
    6. } catch (InterruptedException e) {
    7. e.printStackTrace();
    8. }
    9. System.out.println("小明洗澡用水: " + new String(body, "UTF-8"));
    10. }
    11. };

     好了,到此我们终于找到了,是死循环读来的消息,调用回调Consumer,调用到了子类的实现的handleDelivery 方法,真正的去实现消息的消费。

    不过还没完,她到底是怎么触发的呢?

    1. private void executeUnlessShuttingDown(Runnable r) {
    2. if (!this.shuttingDown) execute(r);
    3. }
    1. private void execute(Runnable r) {
    2. checkShutdown();
    3. this.workService.addWork(this.channel, r);
    4. }

    继续来到

    ConsumerWorkService.addWork
    1. public void addWork(Channel channel, Runnable runnable) {
    2. if (this.workPool.addWorkItem(channel, runnable)) {
    3. this.executor.execute(new WorkPoolRunnable());
    4. }
    5. }

    继续:WorkPool 

    addWorkItem
    1. /**
    2. * Add (enqueue) an item for a specific client.
    3. * No change and returns false if client not registered.
    4. * If dormant, the client will be marked ready.
    5. * @param key the client to add to the work item to
    6. * @param item the work item to add to the client queue
    7. * @return true if and only if the client is marked ready
    8. * — as a result of this work item
    9. */
    10. public boolean addWorkItem(K key, W item) {
    11. VariableLinkedBlockingQueue queue;
    12. synchronized (this) {
    13. queue = this.pool.get(key);
    14. }
    15. // The put operation may block. We need to make sure we are not holding the lock while that happens.
    16. if (queue != null) {
    17. enqueueingCallback.accept(queue, item);
    18. synchronized (this) {
    19. if (isDormant(key)) {
    20. dormantToReady(key);
    21. return true;
    22. }
    23. }
    24. }
    25. return false;
    26. }

    这稍微有点绕了,首先我们要从 Map 缓存pool 取出 一个  VariableLinkedBlockingQueue

    根据啥取呢,根据的是Channel,所以每个Channel是相独立的,Blocking Queue后续的操作也是阻塞的。

    来到 了 

    enqueueingCallback.accept(queue, item);

    这是个啥鬼? 这又一个回调,使用了@FunctionalInterface

    真实的方法在初始WorkPool 的时候

    1. public WorkPool(final int queueingTimeout) {
    2. if (queueingTimeout > 0) {
    3. this.enqueueingCallback = (queue, item) -> {
    4. try {
    5. boolean offered = queue.offer(item, queueingTimeout, TimeUnit.MILLISECONDS);
    6. if (!offered) {
    7. throw new WorkPoolFullException("Could not enqueue in work pool after " + queueingTimeout + " ms.");
    8. }
    9. } catch (InterruptedException e) {
    10. Thread.currentThread();
    11. }
    12. };
    13. } else {
    14. this.enqueueingCallback = (queue, item) -> {
    15. try {
    16. queue.put(item);
    17. } catch (InterruptedException e) {
    18. Thread.currentThread().interrupt();
    19. }
    20. };
    21. }
    22. }

    我们看后面的else内容即可:

    queue.put(item);

    What ?竟然把内容放到了一个本地的BlockingQueue 中去了,放的啥内容呢?

    就是我们前面的那个线程对象 

    Runnable runnable

    倒腾一下嘛,就是那个

    delegate.handleDelivery(consumerTag,
            envelope,
            properties,
            body);

    有点意思了吧,把整个需要消费的内容扔进了队列里,这时候程序就可以返回给MainLoop了,他有可以继续抓包了。

    但我们还没结束,对吧,继续咯

    代码倒回来:

    1. if (this.workPool.addWorkItem(channel, runnable)) {
    2. this.executor.execute(new WorkPoolRunnable());
    3. }

    这次要看 

    executor.execute了

    首先我们看下这个  ExecutorService executor 这个没特殊指定的话,我们再初始Connection的时候就会初始化这个 ConsumerWorkService,就把 executor 初始化了,一个固定的线程池:

    1. public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int queueingTimeout, int shutdownTimeout) {
    2. this.privateExecutor = (executor == null);
    3. this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory)
    4. : executor;
    5. this.workPool = new WorkPool<>(queueingTimeout);
    6. this.shutdownTimeout = shutdownTimeout;
    7. }

    几个线程呢?

    private static final int DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2;
    

    当前计算机的核数 * 2 , 八核的就是初始化 16个线程。

    这16个线程是跟随Connection的,所以,每个Connection就只有这16个线程在处理呗。

    继续咯

    this.executor.execute(new WorkPoolRunnable());

    又要搞个线程,

    1. private final class WorkPoolRunnable implements Runnable {
    2. @Override
    3. public void run() {
    4. int size = MAX_RUNNABLE_BLOCK_SIZE;
    5. List block = new ArrayList(size);
    6. try {
    7. Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size);
    8. if (key == null) return; // nothing ready to run
    9. try {
    10. for (Runnable runnable : block) {
    11. runnable.run();
    12. }
    13. } finally {
    14. if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) {
    15. ConsumerWorkService.this.executor.execute(new WorkPoolRunnable());
    16. }
    17. }
    18. } catch (RuntimeException e) {
    19. Thread.currentThread().interrupt();
    20. }
    21. }
    22. }

    终于等到你,这就是我们核心中的核心了,触发消费也就靠这了。

    这个线程被线程池搞起后,做啥了呢?

    1.声明一个 16个大小的 ArrayList

    2. 取出我们的BlockingQueue,再接着呢,从Queue中取出16个Runnable对象【真正的消费逻辑】,放到ArrayList 中

    3. 循环16个 Runable对象,直接调用其run 方法, 这时候自然就调到了我们的handleDelivery- 业务方法愉快的去消费了。

    4. 最后呢,还要看我们这队列中还没有待处理的数据了,如果还要有的话,通过线程池再起线程继续执行  WorkPoolRunnable 的run 方法,也就是本方法,

    如果队列中一直有消息,而且还一直有消息进来,那线程池就会一直在启线程处理,直到16个线程都启动满负载运转,这时候就会存在本地BlockingQueue的堆积了。

    补充下消费的抓包情况:

    第一个AMQP

    第二个:

    第三个:

    第四个:消息内容:

      

    6. 总结:

    总的来说,amqp的代码相对简单的,最绕的就是消费者那块了。

    首先是靠 MainLoop驱动,

    其次,将消息内容的处理方法投递到了本地 BlockingQueue中,

    最后,靠启动线程取出Queue中的处理方法,进行本地消费。

    来个汇总小图,大家结合代码看:

  • 相关阅读:
    IO DAY2
    Qt使用qtwebapp编写http服务的步骤
    ARM按键中断控制事件
    PostgreSQL分区分表方法研究
    2.3IP详解及配置
    JDBC 版本和历史
    前端模块化
    日常开发小汇总(5)数组克隆、伪数组转换为真数组、随机排序
    博物馆网上展厅有哪些用途,如何搭建数字时代的文化宝库
    深度理解Synchronized及底层原理
  • 原文地址:https://blog.csdn.net/blucastle/article/details/127954324