• 消息队列(六):服务器设计


    紧接着上一章没说完的进行服务器的补充。

    推送给消费者消息的基本实现思路

    1. 让 brokerServer 把哪些消费者管理好
    2. 收到对应的消息,把消息推送给消费者

    消费者是以队列为维度来订阅消息的,一个队列可以有多个消费者(此处我们约定按照轮询的方式来进行消费)

    消费者消费消息的核心逻辑

    这里又一次提到了消费者,我们来把消费者相关的代码完善一下。

    消费者管理实现

    消费者

    先前我们提到了这个函数式接口,这个接口的作用就是用来消费消息(和消费者作用一样)。

    那么我们就给这个函数式接口起名消费者 consumer 

    上一章也提到了具体的代码,这里再演示一次:

    1. @FunctionalInterface
    2. public interface Consumer {
    3. // Deliver 的意思就是"投递",这个方法预期是在每次服务器收到消息,来调用
    4. // 通过这个方法把消息推送给对应的消费者
    5. void handleDeliver(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException;
    6. }

    具体是想干啥,谁调用由谁决定(具体代码)。

    消费者完整的执行环境

    消费者在消费的时候,需要知道是哪个消费者进行消费了,是哪个队列传过来的消息,知否自动应答。

    故此,我们大概有如下几个参数:

    1. private String consumerTag;
    2. private String queueName;
    3. private boolean autoAck;
    4. // 通过这个回调来处理收到的消息
    5. private Consumer consumer;

    以及对应的 getting、setting  方法 和 构造方法 。

    消费者管理类

    这个类我放在了核心类。

    啥时候执行这个消费者完整的执行环境啊,通过这个类,来实现消费消息的核心逻辑。

    订阅消息的核心 就是这个    consumer.addConsumer() 。

    根据这个图,我们也能看出来我们大致需要如下几个属性:

    需要一个队列,一个扫描线程,此外还需要记录一下是哪个虚拟主机持有的消费者;此外,还需要一个执回调(函数式接口)的线程池。

    1. // 持有上层的 VirtualHost 对象的引用. 用来操作数据.
    2. private VirtualHost parent;
    3. // 指定一个线程池, 负责去执行具体的回调任务.
    4. private ExecutorService workerPool = Executors.newFixedThreadPool(4);
    5. // 存放令牌的队列
    6. private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();
    7. // 扫描线程
    8. private Thread scannerThread = null;

    方法:

    • 构造方法
    • 添加消费者
    • 消费消息
    • 唤醒消费

    先来说说构造方法:

    构造器只持有虚拟机的名字;因为扫描线程是不断地进行扫描,啥时候启动这个扫描线程呢?这里就设置在了构造方法中。

    扫描线程的实现逻辑就如上图所言,一旦调用虚拟主机中的发送消息就会唤醒消费(具体的唤醒就是往存放令牌的队列中添加队列名),扫描线程扫描的就是这个队列,一旦队列有消息进来就调用消费消息这个方法。

    1. public ConsumerManager(VirtualHost parent) {
    2. this.parent = parent;
    3. scannerThread = new Thread(() -> {
    4. while (true) {
    5. try {
    6. // 1. 拿到令牌
    7. String queueName = tokenQueue.take();
    8. // 2. 根据令牌找到队列
    9. MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
    10. if (queue == null) {
    11. throw new MqException("[ConsumerManager] 取令牌后发现,该队列名不存在!queueName="+queueName);
    12. }
    13. // 3. 从这个队列中消费一个信息
    14. synchronized (queue) {
    15. consumeMessage(queue);
    16. }
    17. }catch (InterruptedException | MqException e) {
    18. e.printStackTrace();;
    19. }
    20. }
    21. });
    22. // 将线程设为后台线程
    23. scannerThread.setDaemon(true);
    24. scannerThread.start();
    25. }

    唤醒消费

    该方法只有一行代码,就是将队列名放入令牌队列。

    1. // 这个方法的调用时机就是发送消息的时候.
    2. public void notifyConsume(String queueName) throws InterruptedException {
    3. tokenQueue.put(queueName);
    4. }

    添加消费者

    在这个方法中,之前记录的虚拟机主机名就派上用场了。

    我们需要从虚拟主机中获取到队列(调用者需要传递下来队列名称),类型为 核心类: MSGQueue(此时可以补充两个方法):

    1. // 添加一个新的订阅者
    2. public void addConsumerEnv(ConsumerEnv consumerEnv) {
    3. consumerEnvList.add(consumerEnv);
    4. }
    5. // 订阅者删除暂时先不考虑
    6. // 先挑选一个订阅者,用来处理当前的消息(轮询的方式)
    7. public ConsumerEnv chooseConsumer() {
    8. if (consumerEnvList.size() == 0) {
    9. // 该队列没有人订阅
    10. return null;
    11. }
    12. // 计算一下当前要取的元素下标
    13. int index = consumerSeq.get() % consumerEnvList.size();
    14. consumerSeq.getAndIncrement();
    15. return consumerEnvList.get(index);
    16. }

    我们采取一个轮询的方式去处理消息,每个消费者都有机会进行消费消息。

    具体的消费就是通过下标取模的方式,这里涉及到了多线程同时调用,所以使用了原子类 AtomicInteger 修饰 consumerSeq 。

    没有就需要抛出异常,有的话需要创建出完整的消费者环境(将参数都传进去),随后将其添加到队列中去。

    在进行循环,如果队列有消息就先进行消费完。

    具体代码如下:

    1. public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
    2. MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
    3. if (queue == null) {
    4. throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);
    5. }
    6. ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);
    7. synchronized (queue) {
    8. queue.addConsumerEnv(consumerEnv);
    9. // 如果当前这个队列已经有消息了,就需要立即消费掉
    10. int n = parent.getMemoryDataCenter().getMessageCount(queueName);
    11. for (int i = 0; i < n; i++) {
    12. // 这个放啊调用一次就消费一条消息
    13. consumeMessage(queue);
    14. }
    15. }
    16. }

    消费消息

    大致逻辑

    1. 按照轮询的方法,找个消费者出来(没有消费者就暂时不消费,等有消费者出现才进行消费)
    2. 从队列中取出一个消息(当前队列中还没有消息,也不需要消费)
    3. 把消息带入到消费者的回调方法中, 丢给线程池执行
      1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前
      2. 真正执行回调操作
      3. 如果当前是 "自动应答" , 就可以直接把消息删除了. 如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.
        1. 删除硬盘上的消息
        2. 删除上面的待确认集合中的消息
        3. 删除内存中消息中心里的消息
    具体代码如下:
    1. private void consumeMessage(MSGQueue queue) {
    2. // 1. 按照轮询的方法,找个消费者出来
    3. ConsumerEnv luckyDog = queue.chooseConsumer();
    4. if (luckyDog == null) {
    5. // 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说.
    6. return;
    7. }
    8. // 2. 从队列中取出一个消息
    9. Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());
    10. if (message == null) {
    11. // 当前队列中还没有消息, 也不需要消费.
    12. return;
    13. }
    14. // 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.
    15. workerPool.submit(() -> {
    16. try {
    17. // 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.
    18. parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);
    19. // 2. 真正执行回调操作
    20. luckyDog.getConsumer().handleDeliver(luckyDog.getConsumerTag(), message.getBasicProperties(),
    21. message.getBody());
    22. // 3. 如果当前是 "自动应答" , 就可以直接把消息删除了.
    23. // 如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.
    24. if (luckyDog.isAutoAck()) {
    25. // 1) 删除硬盘上的消息
    26. if (message.getDeliverMode() == 2) {
    27. parent.getDiskDataCenter().deleteMessage(queue, message);
    28. }
    29. // 2) 删除上面的待确认集合中的消息
    30. parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());
    31. // 3) 删除内存中消息中心里的消息
    32. parent.getMemoryDataCenter().removeMessage(message.getMessageId());
    33. System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());
    34. }
    35. } catch (Exception e) {
    36. e.printStackTrace();
    37. }
    38. });
    39. }
    40. }
    关于确认消息
    能够确保消息是被正确的消费掉了,消费者的回调函数,顺利执⾏完了(中间没有抛出异常)
    这条消息就可以被删除了。
    消息确认也就是为了保证“消息不丢失”
    为了达成消息不丢失这样的效果,这样处理:
    • 在真正执⾏回调之前,把这个消息先放到 “待确认的集合”中
    • 真正回调
    • 当前消费者采取的是 autoAck=true,就认为回调执⾏完毕不抛异常,就算消费成功,然后就可以删除消息
    • 当前消费者采取的是 autoAck=false,⼿动应答,就需要消费者再回调函数内部,显式调⽤
      basicAck这个核⼼API
    basicAck实现原理,⽐较简单,当传⼊参数 autoAck=false, 就⼿动再回调函数的时候,调⽤
    basicAck 就⾏(具体的在 VirtualHost中)

    消息确认是为了保证消息不丢失,而需要的逻辑。

    • 1. 执⾏回调⽅法的过程中,抛异常了
      • 当回调函数异常,后续逻辑执⾏不到了。此时这个消费就会始终待在待确认集合中。
        RabbitMQ中会设置⼀个死信队列,每⼀个队列都会绑定⼀个死信队列。应⽤场景:当消息在 消费过程中出现异常,就会把消息投⼊到死信队列中;当消息设置了过期时间,如果在过期时 间内,没有被消费,就会投⼊到死信队列中;当队列达到最⼤⻓度时,新的消息将⽆法被发送 到队列中。此时,RabbitMQ可以选择将这些⽆法发送的消息发送到死信队列中,以便进⾏进 ⼀步处理
    • 2. 执⾏回调过程中, Broker Server崩溃了,内存数据都没了!但是硬盘数据还在,正在消费的这个 消息,在硬盘中仍然存在。BrokerServer重启后,这个消息就⼜被加载到内存了,就像从来没被消 费过⼀样。消费者就会有机会重新得到这个消息。

    BrokerServer

    Broker Server 本质是一个服务器,我在这个自定义服务器上添加了自定义应用层协议。

    自定义协议

    具体的协议设置:

    请求和响应

    •  type : 用于描述当前这个请求和响应是要干啥的
      • 在MQ中,客户端(⽣产者 + 消费者)和 服务器 (Broker Server)之间,要进⾏哪些操作?(就是VirtualHost中的那些核⼼API)
      • 希望客户端,能通过⽹络远程调⽤这些API
      • 此处的type就是描述当前这个请求/响应是在调⽤哪个API
      • TCP是有连接的,Channel 是 Connection 内部的逻辑连接。此时⼀个 Connection 中可能有多 个连接,为啥要这么设计?就是为了让 TCP 连接得到复用(不断地创建和删除 TCP 连接,成本还是比较高的)
    • length:⾥⾯存储的是 payload的⻓度。⽤4个字节来存储
    • payload:会根据当前是请求还是响应,以及当前的 type 有不同的值
      • ⽐如 type 是 0x3(创建交换机),同时当前是个请求,此时 payload 的内容,就相当于是
        exchangeDelcare 的参数的序列化结果
      • ⽐如 type 是 0x3(创建交换机),同时当前是个响应,此时 payload 的内容,就相当于是
        exchangeDelcare 的返回结果的序列化内容

    每一个请求对应的 响应不同(重点是 payload 不同),所以对应每一个请求都单独设计一个类,帮助构造响应。

    ExchangeDelcare

    request

    response

    通信流程:

    由于不同的 payload 我们需要对其进行设计:

    根据上述图示,我们需要如下几个参数:

    由于每次响应都会带有 rid 和 channelId,所以将其设为父类:

    其他的类也一样,继承这个类,并实现串行化,我就不一一举例了,我把大致的图放下来:

    ExchangeDelete

    QueueDelcare

    QueueDelete

    QueueBind

    QueueUnBind

    BasicPublish

    BasicConsumer

    BasicAck

    创建 BrokerServer类

    消息队列本体服务器

    实现读取请求和写回响应

    读取请求
    1. private Request readRequest(DataInputStream dataInputStream) throws IOException {
    2. Request request = new Request();
    3. request.setType(dataInputStream.readInt());
    4. request.setLength(dataInputStream.readInt());
    5. byte[] payload = new byte[request.getLength()];
    6. int n = dataInputStream.read(payload);
    7. if (n != request.getLength()) {
    8. throw new IOException("读取请求格式出错!");
    9. }
    10. request.setPayload(payload);
    11. return request;
    12. }
    写回响应
    1. private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {
    2. dataOutputStream.writeInt(response.getType());
    3. dataOutputStream.writeInt(response.getLength());
    4. dataOutputStream.write(response.getPayload());
    5. // 这个刷新缓冲区也是重要的操作!!
    6. dataOutputStream.flush();
    7. }

    清理过期会话

    1. private void clearClosedSession(Socket clientSocket) {
    2. // 这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该被关闭的 socket 对应的键值对, 统统删掉.
    3. List<String> toDeleteChannelId = new ArrayList<>();
    4. for (Map.Entry<String, Socket> entry : sessions.entrySet()) {
    5. if (entry.getValue() == clientSocket) {
    6. // 不能在这里直接删除!!!
    7. // 这属于使用集合类的一个大忌!!! 一边遍历, 一边删除!!!
    8. // sessions.remove(entry.getKey());
    9. toDeleteChannelId.add(entry.getKey());
    10. }
    11. }
    12. for (String channelId : toDeleteChannelId) {
    13. sessions.remove(channelId);
    14. }
    15. System.out.println("[BrokerServer] 清理 session 完成! 被清理的 channelId=" + toDeleteChannelId);
    16. }

    解析请求(Process)

    这一步属于重中之重。

    大致流程如下:

    1. 把 request 中的 payload 做一个初步解析
    2. 根据 type 的值,决定具体要干啥
    3. 构造响应
    1. private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
    2. // 1. 把 request 中的 payload 做一个初步的解析.
    3. BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());
    4. System.out.println("[Request] rid=" + basicArguments.getRid() + ", channelId=" + basicArguments.getChannelId()
    5. + ", type=" + request.getType() + ", length=" + request.getLength());
    6. // 2. 根据 type 的值, 来进一步区分接下来这次请求要干啥.
    7. boolean ok = true;
    8. if (request.getType() == 0x1) {
    9. // 创建 channel
    10. sessions.put(basicArguments.getChannelId(), clientSocket);
    11. System.out.println("[BrokerServer] 创建 channel 完成! channelId=" + basicArguments.getChannelId());
    12. } else if (request.getType() == 0x2) {
    13. // 销毁 channel
    14. sessions.remove(basicArguments.getChannelId());
    15. System.out.println("[BrokerServer] 销毁 channel 完成! channelId=" + basicArguments.getChannelId());
    16. } else if (request.getType() == 0x3) {
    17. // 创建交换机. 此时 payload 就是 ExchangeDeclareArguments 对象了.
    18. ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;
    19. ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),
    20. arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());
    21. } else if (request.getType() == 0x4) {
    22. ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;
    23. ok = virtualHost.exchangeDelete(arguments.getExchangeName());
    24. } else if (request.getType() == 0x5) {
    25. QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;
    26. ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),
    27. arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments());
    28. } else if (request.getType() == 0x6) {
    29. QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;
    30. ok = virtualHost.queueDelete((arguments.getQueueName()));
    31. } else if (request.getType() == 0x7) {
    32. QueueBindArguments arguments = (QueueBindArguments) basicArguments;
    33. ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());
    34. } else if (request.getType() == 0x8) {
    35. QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;
    36. ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());
    37. } else if (request.getType() == 0x9) {
    38. BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;
    39. ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),
    40. arguments.getBasicProperties(), arguments.getBody());
    41. } else if (request.getType() == 0xa) {
    42. BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;
    43. ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),
    44. new Consumer() {
    45. @Override
    46. public void handleDeliver(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
    47. // 先知道当前这个收到的消息, 要发给哪个客户端.
    48. // 此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询, 就可以得到对应的
    49. // socket 对象了, 从而可以往里面发送数据了
    50. // 1. 根据 channelId 找到 socket 对象
    51. Socket clientSocket = sessions.get(consumerTag);
    52. if (clientSocket == null || clientSocket.isClosed()) {
    53. throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");
    54. }
    55. // 2. 构造响应数据
    56. SubScribeReturns subScribeReturns = new SubScribeReturns();
    57. subScribeReturns.setChannelId(consumerTag);
    58. subScribeReturns.setRid(""); // 由于这里只有响应, 没有请求, 不需要去对应. rid 暂时不需要.
    59. subScribeReturns.setOk(true);
    60. subScribeReturns.setConsumerTag(consumerTag);
    61. subScribeReturns.setBasicProperties(basicProperties);
    62. subScribeReturns.setBody(body);
    63. byte[] payload = BinaryTool.toBytes(subScribeReturns);
    64. Response response = new Response();
    65. // 0xc 表示服务器给消费者客户端推送的消息数据.
    66. response.setType(0xc);
    67. // response 的 payload 就是一个 SubScribeReturns
    68. response.setLength(payload.length);
    69. response.setPayload(payload);
    70. // 3. 把数据写回给客户端.
    71. // 注意! 此处的 dataOutputStream 这个对象不能 close !!!
    72. // 如果 把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 outputStream 也关了.
    73. // 此时就无法继续往 socket 中写入后续数据了.
    74. DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());
    75. writeResponse(dataOutputStream, response);
    76. }
    77. });
    78. } else if (request.getType() == 0xb) {
    79. // 调用 basicAck 确认消息.
    80. BasicAckArguments arguments = (BasicAckArguments) basicArguments;
    81. ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());
    82. } else {
    83. // 当前的 type 是非法的.
    84. throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());
    85. }
    86. // 3. 构造响应
    87. BasicReturns basicReturns = new BasicReturns();
    88. basicReturns.setChannelId(basicArguments.getChannelId());
    89. basicReturns.setRid(basicArguments.getRid());
    90. basicReturns.setOk(ok);
    91. byte[] payload = BinaryTool.toBytes(basicReturns);
    92. Response response = new Response();
    93. response.setType(request.getType());
    94. response.setLength(payload.length);
    95. response.setPayload(payload);
    96. System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()
    97. + ", type=" + response.getType() + ", length=" + response.getLength());
    98. return response;
    99. }

    这一段逻辑看起来吓人,其实就是在处理请求中传递而来的 type ,根据不同 type 的类型来调用不同的方法

    当然还有启动和关闭,这个就不用一步步分析了,大概来看看代码把:

    这里还有关于连接没有讲到,等下一章继续完善最后的连接,

    自定义协议响应代码

    BrokerServer

  • 相关阅读:
    sublime 文件编辑器使用快捷键
    Java教程:如何使用Jib插件容器化SpringBoot应用?
    如何在uniapp中实现二维码生成功能
    YGGSEA:为什么 SubDAO 是不可忽视的力量
    用vuex对token/refresh_token 进行管理以及处理token过期问题
    为什么学不会俄语?
    java-php-python-ssm员工培训管理系统计算机毕业设计
    Nuxt服务端请求及获取Cookie
    机器学习:图文详解因子分解与独立图I-Map(附例题分析+Python实验)
    设计模式-相关内容
  • 原文地址:https://blog.csdn.net/weixin_67807492/article/details/132898308