• 2024.2.27 模拟实现 RabbitMQ —— 网络通信设计(客户端)


    目录

    需求分析

    RabbitMQ 客户端设定

    ConnectionFactory(连接工厂)

    Connection(连接)

    Channel(通道)

    针对 客户端 和 服务器 单元测试


    需求分析

    RabbitMQ 客户端设定

    • 一个客户端可以有多个模块
    • 每个模块均可以和 Broker Server 之间建立 "逻辑上的连接"(channel)
    • 这几个模块的 channel 彼此之间是相互不影响的
    • 同时这几个 channel 复用了同一个 TCP 连接
    • 此处我们将仿照 RabbitMQ 客户端设定

    ConnectionFactory(连接工厂)

    • 这个类持有服务器的地址
    • 该类用于创建 Connection 对象

    具体代码编写:

    1. import lombok.Getter;
    2. import lombok.Setter;
    3. import java.io.IOException;
    4. @Getter
    5. @Setter
    6. public class ConnectionFactory {
    7. // broker server 的 ip 地址
    8. private String host;
    9. // broker server 的端口号
    10. private int port;
    11. public Connection newConnection() throws IOException {
    12. Connection connection = new Connection(host,port);
    13. return connection;
    14. }
    15. // 访问 broker server 的哪个虚拟主机
    16. // 下列几个属性暂时先不搞了
    17. // private String virtualHostName;
    18. // private String username;
    19. // private String password;
    20. }

    Connection(连接)

    • 这个类表示一个 TCP 连接,持有 Socket 对象
    • 该类用于写入请求/读取响应,管理多个 Channel 对象

    具体代码编写:

    • 编写成员变量
    1. private Socket socket = null;
    2. // 需要管理多个 channel 使用一个 hash 表把若干个 channel 组织起来
    3. private ConcurrentHashMap channelMap = new ConcurrentHashMap<>();
    4. private InputStream inputStream;
    5. private OutputStream outputStream;
    6. private DataOutputStream dataOutputStream;
    7. private DataInputStream dataInputStream;
    8. //用来处理 0xc 的回调,这里开销可能会很大,不希望把 扫描线程 阻塞住,因此使用 线程池 来处理
    9. private ExecutorService callbackPool = null;
    • 编写构造方法
    • 此处不仅需要初始化成员变量,还需创建一个扫描线程,不停的从 socket 中读取响应数据,并将读取到的响应交给 dispatchResponse 方法执行
    1. public Connection(String host, int port) throws IOException {
    2. socket = new Socket(host,port);
    3. inputStream = socket.getInputStream();
    4. outputStream = socket.getOutputStream();
    5. dataInputStream = new DataInputStream(inputStream);
    6. dataOutputStream = new DataOutputStream(outputStream);
    7. callbackPool = Executors.newFixedThreadPool(4);
    8. // 创建一个 扫描线程,由这个线程负责不停的从 socket 中取响应数据 把这个响应数据再交给对应的 channel 负责处理
    9. Thread t = new Thread(() -> {
    10. try {
    11. while (!socket.isClosed()) {
    12. Response response = readResponse();
    13. dispatchResponse(response);
    14. }
    15. }catch (SocketException e) {
    16. // 连接正常断开,此时这个异常直接忽略
    17. System.out.println("[Connection] 连接正常断开!");
    18. }catch (IOException | ClassNotFoundException | MqException e) {
    19. System.out.println("[Connection] 连接异常断开!");
    20. e.printStackTrace();
    21. }
    22. });
    23. t.start();
    24. }
    • 编写 dispatchResponse 方法
    • 使用该方法来区分,当前响应是一个针对控制请求的响应,还是服务器推送过来的消息
    • 如果是服务器推送过来的消息,type = 0xc,也就需要执行回调,通过线程池来执行
    • 如果只是一个普通的响应,就将该结果放到 channel 的 哈希表中
    • 随后 channel 的 putReturns 方法会唤醒所有阻塞等待的线程,让这些线程从 哈希表中拿与自己 rid 相等的返回结果
    1. // 使用这个方法来分别处理,当前的响应是一个针对控制请求的响应,还是服务器推送的消息
    2. private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {
    3. if(response.getType() == 0xc) {
    4. // 服务器推送来的消息数据
    5. SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());
    6. // 根据 channelId 找到对应的 channel 对象
    7. Channel channel = channelMap.get(subScribeReturns.getChannelId());
    8. if(channel == null) {
    9. throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在!channelId = " + channel.getChannelId());
    10. }
    11. // 执行该 channel 对象内部的回调
    12. // 此处我们直接将回调方法交给线程池来执行,而不是用扫描线程来执行
    13. callbackPool.submit(() -> {
    14. try {
    15. channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(),subScribeReturns.getBasicProperties(),
    16. subScribeReturns.getBody());
    17. } catch (MqException | IOException e) {
    18. e.printStackTrace();
    19. }
    20. });
    21. }else {
    22. // 当前响应是针对刚才控制请求的响应
    23. BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());
    24. // 把这个结果放到对应的 channel 的 hash 表中
    25. Channel channel = channelMap.get(basicReturns.getChannelId());
    26. if(channel == null) {
    27. throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在!channelId = " + channel.getChannelId());
    28. }
    29. channel.putReturns(basicReturns);
    30. }
    31. }
    • 编写 发送请求 与 读取响应 的方法
    1. // 发送请求
    2. public void writeRequest(Request request) throws IOException {
    3. dataOutputStream.writeInt(request.getType());
    4. dataOutputStream.writeInt(request.getLength());
    5. dataOutputStream.write(request.getPayload());
    6. dataOutputStream.flush();
    7. System.out.println("[Connection] 发送请求! type = " + request.getType() + ",length = " + request.getLength());
    8. }
    9. // 读取响应
    10. public Response readResponse() throws IOException {
    11. Response response = new Response();
    12. response.setType(dataInputStream.readInt());
    13. response.setLength(dataInputStream.readInt());
    14. byte[] payload = new byte[response.getLength()];
    15. int n = dataInputStream.read(payload);
    16. if(n != payload.length) {
    17. throw new IOException("读取的响应数据不完整!");
    18. }
    19. response.setPayload(payload);
    20. System.out.println("[Connection] 收到响应! type = " + response.getType() + ",length = " + response.getLength());
    21. return response;
    22. }
    • 编写创建 channel 的方法

    注意:

    • 我们的代码中使用了多次 UUID 
    • message 的 id,就是用 UUID 当时加了个 M- 前缀
    • 现在 channel 的 id 也是使用 UUID 此时加个 C- 前缀
    • rid 也使用 UUID 来生成,加个前缀  R-
    1. // 通过这个方法,在 Connection 中能够创建出一个 Channel
    2. public Channel createChannel() throws IOException, InterruptedException {
    3. String channelId = "C-" + UUID.randomUUID().toString();
    4. Channel channel = new Channel(channelId,this);
    5. // 把这个 channel 对象放到 Connection 管理 channel 的 哈希表 中
    6. channelMap.put(channelId,channel);
    7. // 同时也需要把 创建 channel 的这个消息也告诉服务器
    8. boolean ok = channel.createChannel();
    9. if(!ok) {
    10. // 服务器这里创建失败了!!整个这次创建 channel 操作不顺利!
    11. // 把刚才已经加入 hash 表的键值对,再删了
    12. channelMap.remove(channelId);
    13. return null;
    14. }
    15. return channel;
    16. }
    • 编写释放 Connection 相关资源的方法
    1. public void close() {
    2. // 关闭 Connection 释放上述资源
    3. try {
    4. callbackPool.shutdownNow();
    5. channelMap.clear();
    6. inputStream.close();
    7. outputStream.close();
    8. socket.close();
    9. } catch (IOException e) {
    10. e.printStackTrace();
    11. }
    12. }

    Channel(通道)

    • 这个类表示一个逻辑上的连接

    • 该类用于提供一系列的方法,去和服务器提供的核心 API 相对应

    • 客户端提供的这些方法,其方法内部就是发送了一个特定的请求

    具体代码编写:

    • 编写成员变量  与 构造方法
    1. private String channelId;
    2. // 当前这个 channel 属于哪个连接的
    3. private Connection connection;
    4. // 用来存储后续客户端收到的服务器的响应
    5. private ConcurrentHashMap basicReturnsMap = new ConcurrentHashMap<>();
    6. // 如果当前 Channel 订阅了某个队列,就需要在此处记录下对应回调是啥,当该队列的消息返回回来的时候,调用回调
    7. // 此处约定一个 Channel 中只能有一个回调
    8. private Consumer consumer = null;
    9. public Channel(String channelId,Connection connection) {
    10. this.channelId = channelId;
    11. this.connection = connection;
    12. }
    • 实现 type = 0x1,即创建 channel
    • 构造请求发给服务器,随后阻塞等待,唤醒后从 basicReturnsMap 中尝试获取响应结果
    • 其余 type (0xc 除外,因为 0xc 只有响应没有请求)类型的请求与 0x1 大差不差,对着所需参数,构造即可
    1. // 在这个方法中,和服务器进行交互,告知服务器,此处客户端创建了新的 channel 了
    2. public Boolean createChannel() throws IOException, InterruptedException {
    3. // 对于创建 Channel 来说,
    4. BasicArguments basicArguments = new BasicArguments();
    5. basicArguments.setChannelId(channelId);
    6. basicArguments.setRid(generateRid());
    7. byte[] payload = BinaryTool.toBytes(basicArguments);
    8. Request request = new Request();
    9. request.setType(0x1);
    10. request.setLength(payload.length);
    11. request.setPayload(payload);
    12. // 构造出完整请求之后,就可以发送这个请求了
    13. connection.writeRequest(request);
    14. // 等待服务器的响应
    15. BasicReturns basicReturns = waitResult(basicArguments.getRid());
    16. return basicReturns.isOk();
    17. }
    18. private String generateRid() {
    19. return "R-" + UUID.randomUUID().toString();
    20. }
    21. // 期望使用这个方法来阻塞等待服务器的响应
    22. private BasicReturns waitResult(String rid) throws InterruptedException {
    23. BasicReturns basicReturns = null;
    24. while ((basicReturns = basicReturnsMap.get(rid)) == null) {
    25. // 如果查询结果为 null,说明响应还没回来
    26. // 此时就需要阻塞等待
    27. synchronized (this) {
    28. wait();
    29. }
    30. }
    31. // 读取成功之后,还需要把这个消息从 哈希表中给删除掉
    32. basicReturnsMap.remove(rid);
    33. System.out.println("[Channel] 获取到服务器响应!rid = " + rid);
    34. return basicReturns;
    35. }
    36. public void putReturns(BasicReturns basicReturns) {
    37. basicReturnsMap.put(basicReturns.getRid(),basicReturns);
    38. synchronized (this) {
    39. // 当前也不知道有多少个线程在等待上述的这个响应
    40. // 把所有的等待的线程都唤醒
    41. notifyAll();
    42. }
    43. }
    • 特别注意 type = 0xa ,即 订阅消息

    • 其次值得注意的是 consumerTag 使用 channelId 来表示
    1. // 订阅消息
    2. public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException, InterruptedException {
    3. // 先设置回调
    4. if(this.consumer != null) {
    5. throw new MqException("该 channel 已经设置过消费消息的回调了,不能重复设置!");
    6. }
    7. this.consumer = consumer;
    8. BasicConsumeArguments basicConsumeArguments = new BasicConsumeArguments();
    9. basicConsumeArguments.setRid(generateRid());
    10. basicConsumeArguments.setChannelId(channelId);
    11. // 此处 ConsumerTag 也使用 channelId 来表示
    12. basicConsumeArguments.setConsumerTag(channelId);
    13. basicConsumeArguments.setQueueName(queueName);
    14. basicConsumeArguments.setAutoAck(autoAck);
    15. byte[] payload = BinaryTool.toBytes(basicConsumeArguments);
    16. Request request = new Request();
    17. request.setType(0xa);
    18. request.setLength(payload.length);
    19. request.setPayload(payload);
    20. connection.writeRequest(request);
    21. BasicReturns basicReturns = waitResult(basicConsumeArguments.getRid());
    22. return basicReturns.isOk();
    23. }

    针对 客户端 和 服务器 单元测试

    • 编写测试用例代码是十分重要的!
    1. package com.example.demo;
    2. import com.example.demo.common.Consumer;
    3. import com.example.demo.common.MqException;
    4. import com.example.demo.mqclient.Channel;
    5. import com.example.demo.mqclient.Connection;
    6. import com.example.demo.mqclient.ConnectionFactory;
    7. import com.example.demo.mqserver.BrokerServer;
    8. import com.example.demo.mqserver.core.BasicProperties;
    9. import com.example.demo.mqserver.core.ExchangeType;
    10. import org.apache.tomcat.util.http.fileupload.FileUtils;
    11. import org.junit.jupiter.api.AfterEach;
    12. import org.junit.jupiter.api.Assertions;
    13. import org.junit.jupiter.api.BeforeEach;
    14. import org.junit.jupiter.api.Test;
    15. import org.springframework.boot.SpringApplication;
    16. import java.io.File;
    17. import java.io.IOException;
    18. public class MqClientTests {
    19. private BrokerServer brokerServer = null;
    20. private ConnectionFactory connectionFactory = null;
    21. private Thread t = null;
    22. @BeforeEach
    23. public void setUp() throws IOException {
    24. // 1、先启动服务器
    25. DemoApplication.context = SpringApplication.run(DemoApplication.class);
    26. brokerServer = new BrokerServer(9090);
    27. t = new Thread(() -> {
    28. // 这个 start 方法会进入一个死循环,使用一个新的线程来运行 start 即可!
    29. try {
    30. brokerServer.start();
    31. } catch (IOException e) {
    32. e.printStackTrace();
    33. }
    34. });
    35. t.start();
    36. // 2、配置 ConnectionFactory
    37. connectionFactory = new ConnectionFactory();
    38. connectionFactory.setHost("127.0.0.1");
    39. connectionFactory.setPort(9090);
    40. }
    41. @AfterEach
    42. public void tearDown() throws IOException {
    43. // 停止服务器
    44. brokerServer.stop();
    45. // t.join();
    46. DemoApplication.context.close();
    47. // 删除必要的文件
    48. File file = new File("./data");
    49. FileUtils.deleteDirectory(file);
    50. connectionFactory = null;
    51. }
    52. @Test
    53. public void testConnection() throws IOException {
    54. Connection connection = connectionFactory.newConnection();
    55. Assertions.assertNotNull(connection);
    56. }
    57. @Test
    58. public void testChannel() throws IOException, InterruptedException {
    59. Connection connection = connectionFactory.newConnection();
    60. Assertions.assertNotNull(connection);
    61. Channel channel = connection.createChannel();
    62. Assertions.assertNotNull(channel);
    63. }
    64. @Test
    65. public void testExchange() throws IOException, InterruptedException {
    66. Connection connection = connectionFactory.newConnection();
    67. Assertions.assertNotNull(connection);
    68. Channel channel = connection.createChannel();
    69. Assertions.assertNotNull(channel);
    70. boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);
    71. Assertions.assertTrue(ok);
    72. ok = channel.exchangeDelete("testExchange");
    73. Assertions.assertTrue(ok);
    74. // 此处稳妥起见,把该关闭的要进行关闭
    75. channel.close();
    76. connection.close();
    77. }
    78. @Test
    79. public void testQueue() throws IOException, InterruptedException{
    80. Connection connection = connectionFactory.newConnection();
    81. Assertions.assertNotNull(connection);
    82. Channel channel = connection.createChannel();
    83. Assertions.assertNotNull(channel);
    84. boolean ok = channel.queueDeclare("testQueue",true,false,false,null);
    85. Assertions.assertTrue(ok);
    86. ok = channel.queueDelete("testQueue");
    87. Assertions.assertTrue(ok);
    88. channel.close();
    89. connection.close();
    90. }
    91. @Test
    92. public void testBinding() throws IOException, InterruptedException{
    93. Connection connection = connectionFactory.newConnection();
    94. Assertions.assertNotNull(connection);
    95. Channel channel = connection.createChannel();
    96. Assertions.assertNotNull(channel);
    97. boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);
    98. Assertions.assertTrue(ok);
    99. ok = channel.queueDeclare("testQueue",true,false,false,null);
    100. Assertions.assertTrue(ok);
    101. ok = channel.queueBind("testQueue","testExchange","testBindingKey");
    102. Assertions.assertTrue(ok);
    103. ok = channel.queueUnbind("testQueue","testExchange");
    104. Assertions.assertTrue(ok);
    105. channel.close();
    106. connection.close();
    107. }
    108. @Test
    109. public void testMessage() throws IOException, InterruptedException, MqException {
    110. Connection connection = connectionFactory.newConnection();
    111. Assertions.assertNotNull(connection);
    112. Channel channel = connection.createChannel();
    113. Assertions.assertNotNull(channel);
    114. boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);
    115. Assertions.assertTrue(ok);
    116. ok = channel.queueDeclare("testQueue",true,false,false,null);
    117. Assertions.assertTrue(ok);
    118. byte[] requestBody = "hello".getBytes();
    119. ok = channel.basicPublish("testExchange","testQueue",null,requestBody);
    120. Assertions.assertTrue(ok);
    121. channel.basicConsume("testQueue", true, new Consumer() {
    122. @Override
    123. public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
    124. System.out.println("[消费数据] 开始!");
    125. System.out.println("consumeTag = " + consumerTag);
    126. System.out.println("basicProperties = " + basicProperties);
    127. Assertions.assertArrayEquals(requestBody,body);
    128. System.out.println("[消费数据] 结束!");
    129. }
    130. });
    131. Assertions.assertTrue(ok);
    132. Thread.sleep(500);
    133. channel.close();
    134. connection.close();
    135. }
    136. }
  • 相关阅读:
    【STM32】MCU HardFault异常处理分析流程及总结(一)
    独立双样本T-Test 前 为什么要先进行列文检验(Levene‘s Test)
    关于白盒测试,这些技巧你得游刃有余~
    视觉SLAM十四讲(高翔版本),ch1-2章节部分笔记
    nginx基础概念
    【React源码】(十七)React 算法之深度优先遍历
    智能捡乒乓球机器人
    idea创建spark项目
    Apache POI使用
    向善的力量:顺丰,如何在不确定性中寻求确定性
  • 原文地址:https://blog.csdn.net/weixin_63888301/article/details/136242153