• 2024.2.25 模拟实现 RabbitMQ —— 网络通信设计(服务器)


    目录

    引言

    约定应用层的通信协议

    自定义应用层协议

    Type

    Length

    PayLod

     实现 Broker Server 类

    属性 与 构造

    启动 Broker Server

    停止 Broker Server

    处理客户端连接

    读取请求 与 写回响应

    根据请求计算响应

    清除 channel 


    引言

    • 生产者 和 消费者 都是客户端,均通过 网络 和 Broker Server 进行通信

    注意点一:

    • 此处我们将使用 TCP 协议来作为通信的底层协议

    注意点二:

    • TCP 是有连接的(Connection)
    • 由于 创建/断开 TCP 连接的成本还挺高,需要三次握手啥的
    • 所以为了能够让 TCP 连接得到复用
    • 我们还将创建一个 Channel 类作为 Connection 内部的 逻辑上 的连接
    • 即一个 Connection 中可能有多个 Channel(一个管道,多个网线传输的效果)

    约定应用层的通信协议

    • 此处要交互的 Message 为 二进制数据
    • HTTP 为文本协议,JSON 为文本格式,不适用此处场景
    • 所以我们自定义一个应用层协议,使用二进制的方式来传输数据!

    自定义应用层协议


    Type

    • type 描述当前这个请求或响应是干啥的

    具体理解:

    • 在我们的 MQ 中,客户端(生产者 + 消费者)和 服务器(Broker Server)之间要进行的操作就是 VirtualHost 中的那些核心 API
    • 我们希望客户端通过网络能够远程调用 VirtualHost 中的核心 API
    • 此处 type 就是在描述当前这个请求/响应是在调用哪个 API
    • 取值如下:
    • 0x1 创建 channel
    • 0x2 关闭 channel
    • 0x3 创建 exchange
    • 0x4 销毁 exchange
    • 0x5 创建 queue
    • 0x6 销毁 queue
    • 0x7 创建 binding
    • 0x8 销毁 binding
    • 0x9 发送 message
    • 0xa 订阅 message
    • 0xb 返回 ack
    • 0xc 服务器给客户端推送消息(被订阅的消息)响应独有的

    Length

    • length 用来描述 payload 长度(防止粘包问题)

    PayLod

    • payload 会根据当前是请求还是响应,以及当前的 type 有不同的取值

    实例理解


    实例一:

    • 比如 type 是 0x3(创建交换机),同时当前是一个请求
    • 此时 payload 里的内容,就相当于是 exchangeDeclare 的参数序列化的结果

    具体代码实现:

    • 按照上述自定义应用层协议 创建 Request 类
    1. import lombok.Data;
    2. /*
    3. * 表示一个网络通信中的请求对象,按照自定义协议的格式来展开的
    4. * */
    5. @Data
    6. public class Request {
    7. private int type;
    8. private int length;
    9. private byte[] payload;
    10. }
    • 按照上述自定义应用层协议 创建 BasicArguments 类用于表示各方法的公共参数
    1. import lombok.Data;
    2. import java.io.Serializable;
    3. /*
    4. * 使用这个类表示方法的公共参数/辅助的字段
    5. * 后续使用每个方法又会有一些不同的参数,不同的参数再分别使用不同的子类来表示
    6. * */
    7. @Data
    8. public class BasicArguments implements Serializable {
    9. // 表示一次请求/响应 的身份标识,可以把请求和响应对上
    10. protected String rid;
    11. // 这个通信使用的 channel 的身份标识
    12. protected String channelId;
    13. }
    • 每个方法有不同的参数,此处实例 type = 0x3 ,即 创建交换机(exchangeDeclare
    • 所以我们根据 VirtualHost 中的 exchangeDeclare 方法中的参数,单独创建一个类出来
    • 该类还需 继承用于表示公共参数的 BasicArguments 类
    1. import com.example.demo.mqserver.core.ExchangeType;
    2. import lombok.Getter;
    3. import lombok.Setter;
    4. import java.io.Serializable;
    5. import java.util.Map;
    6. @Getter
    7. @Setter
    8. public class ExchangeDeclareArguments extends BasicArguments implements Serializable {
    9. private String exchangeName;
    10. private ExchangeType exchangeType;
    11. private boolean durable;
    12. private boolean autoDelete;
    13. private Map arguments;
    14. }

    注意:

    • 其他 type 类型(除 0x1、0x2 、0xa 外)也均根据 其在 VirtualHost 中对应的参数,单独创建一个类即可
    • 0x1 和 0x2 分别为 创建 channel 和 关闭 channel,二者 API 所需参数就是公共参数,使用 BasicArguments 类即可,无需单独创建类
    • type = 0xa,即 订阅消息(basicConsume),后文详细讲解

    实例二:

    • 比如 type = 0x3(创建交换机),同时当前是一个响应
    • 此时 payload 里的内容,就是 exchangeDeclare 的返回结果的序列化内容

    具体代码实现:

    • 按照上述自定义应用层协议 创建 Response 类
    1. import lombok.Data;
    2. /*
    3. * 这个对象表示一个响应,也是根据自定义应用层协议来的
    4. * */
    5. @Data
    6. public class Response {
    7. private int type;
    8. private int length;
    9. private byte[] payload;
    10. }
    • 按照上述自定义应用层协议 创建 BasicReturns 类用于表示远程调用方法的返回值
    1. import lombok.Data;
    2. import java.io.Serializable;
    3. /*
    4. * 这个类表示各个远程调用的方法的返回值和公共信息
    5. * */
    6. @Data
    7. public class BasicReturns implements Serializable {
    8. // 用来标识唯一的请求和响应
    9. protected String rid;
    10. // 用来标识一个 channelId
    11. protected String channelId;
    12. // 表示当前这个远程调用方法的返回值
    13. protected boolean ok;
    14. }

    注意:

    • 其他 type 类型(除 0xc 外)均使用 BasicReturns 类中的成员变量 作为返回参数
    • type = 0xc,该 type 类型为响应独占,表示 服务器给客户端推送消息(被订阅的消息),后文详解讲解

    特例一:

    • 比如 type = 0xa(订阅消息),同时当前是一个请求
    • 这个核心 API 比较特殊,其参数中包含有 回调函数

    具体代码编写:

    • 我们根据 VirtualHost 中的 BasicConsume 方法中的参数,单独创建一个类出来
    • 并且该类也要 继承用于表示公共参数的 BasicArguments 类
    • 唯一不同的是,其中用于表示 回调函数的参数 consumer 我们不写入该类中
    • 也就代表着在客户端发送请求时,不再携带 consumer 参数
    • 因为在 broker server 这边,我们规定 BasicConsume 的回调方法统一为 将收到的消息返回给消费者
    • 消费者仅需收到消息后,再在客户端自己这边执行一个用户自定义的回调就行了!
    1. import lombok.Getter;
    2. import lombok.Setter;
    3. import java.io.Serializable;
    4. @Getter
    5. @Setter
    6. public class BasicConsumeArguments extends BasicArguments implements Serializable {
    7. private String consumerTag;
    8. private String queueName;
    9. private boolean autoAck;
    10. // 这个类对应的 basicConsume 方法中,还有一个参数,是回调函数(如何来有效处理消息)
    11. // 这个回调函数,是不能通过网络传输的
    12. // 站在 broker server 这边,针对消息的处理问题,其实是统一的(把消息返回给客户端)
    13. // 客户端这边收到消息之后,再在客户端自己这边执行一个用户自定义的回调就行了
    14. // 此时客户端就不需要把自身的回调告诉服务器了!
    15. // 这个类就不需要 consumer 成员了
    16. }

    特列二:

    • type = 0xc,即 服务器给客户端推送消息(被订阅的消息),该类型一定是一个响应!

    • 如上图所示的蓝色部分
    • 此处我们定义一个 SubScribeReturns 类用于表示在消费者订阅队列之后,服务器给消费推送消息的响应参数
    • 此处仍需继承一下 代表响应公共参数的 BasicReturns 类
    1. import com.example.demo.mqserver.core.BasicProperties;
    2. import lombok.Getter;
    3. import lombok.Setter;
    4. import java.io.Serializable;
    5. @Getter
    6. @Setter
    7. public class SubScribeReturns extends BasicReturns implements Serializable {
    8. private String consumerTag;
    9. private BasicProperties basicProperties;
    10. private byte[] body;
    11. }

    注意:

    • SubScribeReturns 类虽然继承了 BasicReturns 类
    • 但是在返回时,无需填写 BasicReturns 类中的成员变量 rid
    • 因为该响应无相对应的请求,故该响应无 rid,即将 rid 设为空字符串即可

    小结:

    • 上述内容属于服务器程序的关键环节,自定义应用层协议

     实现 Broker Server 类

    属性 与 构造

    1. /*
    2. * 这个 BrokerServer 就是咱们 消息队列 本体服务器
    3. * 本质上就是一个 TCP 的服务器
    4. * */
    5. public class BrokerServer {
    6. private ServerSocket serverSocket = null;
    7. // 当前考虑一个 BrokerServer 上只有一个 虚拟主机
    8. private VirtualHost virtualHost = new VirtualHost("default");
    9. // 使用这个 哈希表 表示当前的所有会话(也就是说有哪些客户端正在和咱们的服务器进行通信)
    10. // 此处的 key 是 channelId,value 为对应的 Socket 对象
    11. private ConcurrentHashMap sessions = new ConcurrentHashMap();
    12. // 引入线程池,来处理多个客户端的请求
    13. private ExecutorService executorService = null;
    14. // 引入一个 Boolean 变量控制服务器是否继续运行
    15. private volatile boolean runnable = true;
    16. public BrokerServer(int port) throws IOException {
    17. serverSocket = new ServerSocket(port);
    18. }
    19. }

    启动 Broker Server

    1. public void start() throws IOException {
    2. System.out.println("[BrokerServer] 启动!");
    3. executorService = Executors.newCachedThreadPool();
    4. try {
    5. while (runnable) {
    6. Socket clientSocket = serverSocket.accept();
    7. // 把处理连接的逻辑丢给这个线程池
    8. executorService.submit(() ->{
    9. processConnection(clientSocket);
    10. });
    11. }
    12. }catch (SocketException e){
    13. System.out.println("[BrokerServer] 服务器停止运行!");
    14. }
    15. }

    停止 Broker Server

    1. // 一般来说停止服务器,就是直接 kill 掉对应进程就行了
    2. // 此处还是搞一个单独的停止方法,主要是用于后续的单元测试
    3. public void stop() throws IOException {
    4. runnable = false;
    5. // 把线程池中的任务都放弃了,让线程都销毁
    6. executorService.shutdownNow();
    7. serverSocket.close();
    8. }

    处理客户端连接

    1. // 通过这个方法来处理一个客户端的连接
    2. // 在这一个连接中,可能会涉及到多个请求和响应
    3. private void processConnection(Socket clientSocket){
    4. try (InputStream inputStream = clientSocket.getInputStream();
    5. OutputStream outputStream = clientSocket.getOutputStream()){
    6. // 这里需要按照特定格式来读取并解析,此时就需要用到 DataInputStream 和 DataOutputStream
    7. try (DataInputStream dataInputStream = new DataInputStream(inputStream);
    8. DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
    9. while (true) {
    10. // 1、读取请求并解析
    11. Request request = readRequest(dataInputStream);
    12. // 2、根据请求计算响应
    13. Response response = process(request,clientSocket);
    14. // 3、把响应写回给客户端
    15. writeResponse(dataOutputStream,response);
    16. }
    17. }
    18. } catch (EOFException | SocketException e) {
    19. // 对于这个代码,DataInputStream 如果读到 EOF,就会抛出一个 EOFException 异常
    20. // 需要借助这个异常来结束循环
    21. System.out.println("[BrokerServer] connection 关闭!客户端的地址:" + clientSocket.getInetAddress().toString()
    22. + ":" + clientSocket.getPort());
    23. } catch (IOException | ClassNotFoundException | MqException e) {
    24. System.out.println("[BrokerServer] connection 出现异常!");
    25. e.printStackTrace();
    26. }finally {
    27. try {
    28. // 当连接处理完了,就需要记得关闭 socket
    29. clientSocket.close();
    30. // 一个 TCP 连接中,可能包含多个 channel 需要把当前这个 socket 对应的所有 channel 也顺便清理掉
    31. clearClosedSession(clientSocket);
    32. }catch (IOException e) {
    33. e.printStackTrace();
    34. }
    35. }
    36. }

    读取请求 与 写回响应

    1. private Request readRequest(DataInputStream dataInputStream) throws IOException {
    2. Request request = new Request();
    3. request.setType(dataInputStream.readInt());
    4. request.setLength(dataInputStream.readInt());
    5. byte[] payload = new byte[request.getLength()];
    6. int n = dataInputStream.read(payload);
    7. if(n != request.getLength()) {
    8. throw new IOException("读取请求格式出错!");
    9. }
    10. request.setPayload(payload);
    11. return request;
    12. }
    13. private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {
    14. dataOutputStream.writeInt(response.getType());
    15. dataOutputStream.writeInt(response.getLength());
    16. dataOutputStream.write(response.getPayload());
    17. // 这个刷新缓冲区也是重要的操作,保证当前写的这些数据能够快速进入到网卡里,而不至于在内存中呆着
    18. dataOutputStream.flush();
    19. }

    根据请求计算响应

    • 根据不同的 type 类型,来远程调用 VirtualHost 中不同的核心 API

    具体代码编写:

    1. private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
    2. // 1、把 request 中的 payload 做一个初步的解析
    3. BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());
    4. System.out.println("[Request] rid = " + basicArguments.getRid() + ", channelId = " + basicArguments.getChannelId()
    5. + ", type = " + request.getType() + ", length = " + request.getLength());
    6. // 2、根据 type 的值,来进一步区分接下来这次请求要干啥
    7. boolean ok = true;
    8. if(request.getType() == 0x1) {
    9. // 创建 channel
    10. sessions.put(basicArguments.getChannelId(), clientSocket);
    11. System.out.println("[BrokerServer] 创建 channel 完成! channelId = " + basicArguments.getChannelId());
    12. }else if(request.getType() == 0x2) {
    13. // 销毁 channel
    14. sessions.remove(basicArguments.getChannelId());
    15. System.out.println("[BrokerServer] 销毁 channel 完成! channelId = " + basicArguments.getChannelId());
    16. } else if(request.getType() == 0x3) {
    17. // 创建交换机,此时 payload 就是 ExchangeDeclareArguments 对象了
    18. ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;
    19. ok = virtualHost.exchangeDeclare(arguments.getExchangeName(),arguments.getExchangeType(),
    20. arguments.isDurable(),arguments.isAutoDelete(),arguments.getArguments());
    21. } else if(request.getType() == 0x4) {
    22. // 删除交换机,此时 payload 就是 ExchangeDeleteArguments 对象了
    23. ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;
    24. ok = virtualHost.exchangeDelete(arguments.getExchangeName());
    25. } else if(request.getType() == 0x5) {
    26. // 创建队列,此时 payload 就是 QueueDeclareArguments 对象了
    27. QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;
    28. ok = virtualHost.queueDeclare(arguments.getQueueName(),arguments.isDurable(),
    29. arguments.isExclusive(),arguments.isAutoDelete(),arguments.getArguments());
    30. } else if(request.getType() == 0x6){
    31. // 销毁队列,此时 payload 就是 QueueDeleteArguments 对象了
    32. QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;
    33. ok = virtualHost.queueDelete(arguments.getQueueName());
    34. } else if(request.getType() == 0x7){
    35. // 创建绑定,此时 payload 就是 QueueBindArguments 对象了
    36. QueueBindArguments arguments = (QueueBindArguments) basicArguments;
    37. ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());
    38. } else if(request.getType() == 0x8){
    39. // 删除绑定,此时 payload 就是 QueueUnbindArguments 对象了
    40. QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;
    41. ok = virtualHost.queueUnbind(arguments.getQueueName(),arguments.getExchangeName());
    42. } else if(request.getType() == 0x9){
    43. // 发送消息,此时 payload 就是 BasicPublishArguments 对象了
    44. BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;
    45. ok = virtualHost.basicPublish(arguments.getExchangeName(),arguments.getRoutingKey(),
    46. arguments.getBasicProperties(),arguments.getBody());
    47. } else if(request.getType() == 0xa){
    48. // 订阅消息,此时 payload 就是 BasicConsumeArguments 对象了
    49. BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;
    50. ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(), new Consumer() {
    51. // 这个回调函数要做的工作,就是把服务器收到的消息可以直接推送回对应的消费者客户端
    52. @Override
    53. public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
    54. // 先知道当前这个收到的消息,要发给哪个客户端,此处 consumerTag 其实是 channelId
    55. // 根据 channelId 去 sessions 中查询,就可以得到对应的 socket 对象了,从而可以往里面发送数据了
    56. // 1、根据 channelId 找到 socket 对象
    57. Socket clientSocket = sessions.get(consumerTag);
    58. if(clientSocket == null || clientSocket.isClosed()) {
    59. throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");
    60. }
    61. // 2、构造响应数据
    62. SubScribeReturns subScribeReturns = new SubScribeReturns();
    63. subScribeReturns.setChannelId(consumerTag);
    64. subScribeReturns.setRid(""); // 由于这里只有响应,没有请求,不需要去对应 rid 暂时不需要
    65. subScribeReturns.setOk(true);
    66. subScribeReturns.setConsumerTag(consumerTag);
    67. subScribeReturns.setBody(body);
    68. subScribeReturns.setBasicProperties(basicProperties);
    69. byte[] paylaod = BinaryTool.toBytes(subScribeReturns);
    70. Response response = new Response();
    71. // 0xc 表示服务器给消费者客户端推送的消息数据
    72. response.setType(0xc);
    73. // response 的 payload 就是一个 SubScribeReturns
    74. response.setLength(paylaod.length);
    75. response.setPayload(paylaod);
    76. // 3、把数据写回给客户端
    77. // 注意!此处的 dataOutputStream 这个对象不能 close!
    78. // 如果把 dataOutputStream 关闭,就会直接把 clientSocket 里的 outputStream 也给关了
    79. // 此时就无法继续往 socket 中写入后续数据了!
    80. DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());
    81. writeResponse(dataOutputStream,response);
    82. }
    83. });
    84. } else if(request.getType() == 0xb){
    85. // 确认消息,此时 payload 就是 BasicAckArguments 对象了
    86. BasicAckArguments arguments = (BasicAckArguments) basicArguments;
    87. ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());
    88. }else {
    89. // 当前的 type 是非法的
    90. throw new MqException("[BrokerServer] 未知的 type!type = " + request.getType());
    91. }
    92. // 3、构造响应
    93. BasicReturns basicReturns = new BasicReturns();
    94. basicReturns.setChannelId(basicArguments.getChannelId());
    95. basicReturns.setRid(basicArguments.getRid());
    96. basicReturns.setOk(ok);
    97. byte[] payload = BinaryTool.toBytes(basicReturns);
    98. Response response = new Response();
    99. response.setType(request.getType());
    100. response.setLength(payload.length);
    101. response.setPayload(payload);
    102. System.out.println("[Response] rid = " + basicReturns.getRid() + ",channelId = " + basicReturns.getChannelId()
    103. + ", type = " + response.getType() + ",length = " + response.getLength());
    104. return response;
    105. }

    注意点一:

    • 当前请求中的 payload 里面放的内容 是根据 type 的类型来走的
    • 比如 type 是 0x3,payload 就是 ExchangeDeclareArguments
    • 比如 type 是 0x4,payload 就是 ExchangeDeleteArguments
    • ...

    注意点二:

    • 此处设定的不同的方法的参数,虽然都有不同的类
    • 但是它们均继承自同一个 BasicArguments 类
    • 因此先将 payload 转成 BasicArguments

    清除 channel 

    • 清理 sessions 这个 哈希表 中的 session 信息

    具体代码编写:

    1. private void clearClosedSession(Socket clientSocket) {
    2. // 这里要做的事情,主要就是遍历上述 session hash 表,把该关闭的 socket 对应的键值对,统统删掉
    3. List toDeleteChannelId = new ArrayList<>();
    4. for(Map.Entry entry : sessions.entrySet()) {
    5. if(entry.getValue() == clientSocket) {
    6. // 不能在这里直接删除
    7. // 这属于集合类的一个大忌!!一边遍历,一边删除!
    8. // session.remove(entry.getKey());
    9. toDeleteChannelId.add(entry.getKey());
    10. }
    11. }
    12. for (String channelId : toDeleteChannelId) {
    13. sessions.remove(channelId);
    14. }
    15. System.out.println("[BrokerServer] 清理 session 完成!被清理的 channelId = " + toDeleteChannelId);
    16. }
  • 相关阅读:
    [go学习笔记.第十章.面向对象编程] 8.面向对象的三大特性-封装
    【博客456】OVN (Open Virtual Network)实现三层网络平面连通性控制
    g++中的常用编译优化参数
    CANopen协议 学习笔记
    iOS 开发中上传 IPA 文件的方法(无需 Mac 电脑)
    图像处理: ImageKit.NET 3.0.10704 Crack
    用Python如何进行Web开发
    docker save与docker export的区别
    Windows 上下载并提取 Wikipedia
    C++基础算法⑦——信奥一本通递归算法(放苹果、求最大公约数问题、2的幂次方表示、分数求和、因子分解、判断元素是否存在)
  • 原文地址:https://blog.csdn.net/weixin_63888301/article/details/136300473