• 2024.2.29 模拟实现 RabbitMQ —— 项目展示


    目录

    项目介绍

    核心功能

    核心技术

    演示直接交换机

    演示扇出交换机

    演示主题交换机


    项目介绍

    • 此处我们模拟 RabbitMQ 实现了一个消息队列服务器

    核心功能

    • 提供了 虚拟主机交换机队列绑定消息 概念的管理
    • 九大核心 API 创建队列销毁队列创建交换机销毁交换机创建绑定解除绑定发布消息订阅消息确认消息
    • 实现了三种典型的消息转换方式 直接交换机(Direct)扇出交换机(Fanout)主题交换机(Topic)
    • 交换机队列绑定 使用 SQLite 数据库持久化,消息 使用文件持久化
    • 基于 TCP + 自定义应用层协议 实现生产者/消费者和 Broker Server 之间的交互工作

    核心技术

    • Spring Boot / MyBatis / Lombok
    • SQLite
    • TCP

    • 关于该项目的需求分析,可点击下方链接跳转

    模拟实现 RabbitMQ —— 需求分析


    • 关于该项目的核心类,可点击下方链接跳转

    模拟实现 RabbitMQ —— 实现核心类


    • 关于该项目的数据库操作,可点击下方链接跳转

    模拟实现 RabbitMQ —— 数据库操作


    • 关于该项目的消息持久化,可点击下方链接跳转

    模拟实现 RabbitMQ —— 消息持久化


    • 关于该项目的内存数据管理,可点击下方链接跳转

    模拟实现 RabbitMQ —— 内存数据管理


    • 关于该项目的虚拟机设计,可点击下方链接跳转

    模拟实现 RabbitMQ —— 虚拟主机设计


    • 关于该项目的交换机转发规则,可点击下方链接跳转

    模拟实现 RabbitMQ —— 实现转发规则


    • 关于该项目的消费逻辑,可点击下方链接跳转

    模拟实现 RabbitMQ —— 实现消费消息逻辑


    • 关于该项目网络通信设计,可点击下方链接跳转

    模拟实现 RabbitMQ —— 网络通信设计(服务器)

    模拟实现 RabbitMQ —— 网络通信设计(客户端)

    演示直接交换机

    • 简单写一个 demo 模拟 跨主机的生产者消费者模型
    • 此处为了方便,就在本机演示

    • 此处我们创建的交换机类型为 直接交换机

    1、在 Spring Boot 项目的启动类中创建 Broker Server,绑定端口并启动!

    1. @SpringBootApplication
    2. public class DemoApplication {
    3. public static ConfigurableApplicationContext context;
    4. public static void main(String[] args) throws IOException {
    5. context = SpringApplication.run(DemoApplication.class, args);
    6. BrokerServer brokerServer = new BrokerServer(9090);
    7. brokerServer.start();
    8. }
    9. }

    2、编写生产者代码

    1. /*
    2. * 这个类用来表示一个生产着
    3. * 通常这是一个单独的服务器程序
    4. * */
    5. public class DemoProducer {
    6. public static void main(String[] args) throws IOException, InterruptedException {
    7. System.out.println("启动生产者!");
    8. ConnectionFactory connectionFactory = new ConnectionFactory();
    9. connectionFactory.setHost("127.0.0.1");
    10. connectionFactory.setPort(9090);
    11. Connection connection = connectionFactory.newConnection();
    12. Channel channel = connection.createChannel();
    13. // 创建交换机和队列
    14. channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);
    15. channel.queueDeclare("testQueue",true,false,false,null);
    16. // 创建一个消息并发送
    17. byte[] body = "hello".getBytes();
    18. boolean ok = channel.basicPublish("testExchange","testQueue",null,body);
    19. System.out.println("消息投递完成! ok = " + ok);
    20. Thread.sleep(500);
    21. channel.close();
    22. connection.close();
    23. }
    24. }

    3、编写消费者代码

    1. /*
    2. * 这个类表示一个消费者
    3. * 通常这个类也应该是在一个独立的服务器中被执行
    4. * */
    5. public class DemoConsumer {
    6. public static void main(String[] args) throws IOException, InterruptedException, MqException {
    7. System.out.println("启动消费者!");
    8. ConnectionFactory connectionFactory = new ConnectionFactory();
    9. connectionFactory.setHost("127.0.0.1");
    10. connectionFactory.setPort(9090);
    11. Connection connection = connectionFactory.newConnection();
    12. Channel channel = connection.createChannel();
    13. channel.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);
    14. channel.queueDeclare("testQueue",true,false,false,null);
    15. channel.basicConsume("testQueue", true, new Consumer() {
    16. @Override
    17. public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
    18. System.out.println("[消费数据] 开始!");
    19. System.out.println("consumerTag = " + consumerTag);
    20. System.out.println("basicProperties = " + basicProperties);
    21. String bodyString = new String(body,0,body.length);
    22. System.out.println("body = " + bodyString);
    23. System.out.println("[消费数据] 结束!");
    24. }
    25. });
    26. // 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费
    27. while (true) {
    28. Thread.sleep(500);
    29. }
    30. }
    31. }

    4、启动 Spring Boot 项目(启动 Broker Server)


    5、运行消费者代码


    6、运行生产者代码


    7、继续观察消费者的控制台

    演示扇出交换机

    • 此处我们创建的交换机类型为 扇出交换机

     1、编写生产者代码

    1. /*
    2. * 这个类用来表示一个生产着
    3. * 通常这是一个单独的服务器程序
    4. * */
    5. public class DemoProducer {
    6. public static void main(String[] args) throws IOException, InterruptedException {
    7. System.out.println("启动生产者!");
    8. ConnectionFactory connectionFactory = new ConnectionFactory();
    9. connectionFactory.setHost("127.0.0.1");
    10. connectionFactory.setPort(9090);
    11. Connection connection = connectionFactory.newConnection();
    12. Channel channel = connection.createChannel();
    13. // 创建交换机
    14. channel.exchangeDeclare("testExchange", ExchangeType.FANOUT,true,false,null);
    15. // 创建一个消息并发送
    16. byte[] body = "hello".getBytes();
    17. boolean ok = channel.basicPublish("testExchange","",null,body);
    18. System.out.println("消息投递完成! ok = " + ok);
    19. Thread.sleep(500);
    20. channel.close();
    21. connection.close();
    22. }
    23. }

    3、编写消费者A 代码

    1. /*
    2. * 这个类表示一个消费者A
    3. * 通常这个类也应该是在一个独立的服务器中被执行
    4. * */
    5. public class DemoConsumerA {
    6. public static void main(String[] args) throws IOException, InterruptedException, MqException {
    7. System.out.println("启动消费者!");
    8. ConnectionFactory connectionFactory = new ConnectionFactory();
    9. connectionFactory.setHost("127.0.0.1");
    10. connectionFactory.setPort(9090);
    11. Connection connection = connectionFactory.newConnection();
    12. Channel channel = connection.createChannel();
    13. // 创建交换机
    14. channel.exchangeDeclare("testExchange",ExchangeType.FANOUT,true,false,null);
    15. // 创建队列
    16. channel.queueDeclare("testQueue1",true,false,false,null);
    17. // 设置绑定
    18. channel.queueBind("testQueue1","testExchange","");
    19. // 订阅消息
    20. channel.basicConsume("testQueue1", true, new Consumer() {
    21. @Override
    22. public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
    23. System.out.println("[testQueue1 消费数据] 开始!");
    24. System.out.println("consumerTag = " + consumerTag);
    25. System.out.println("basicProperties = " + basicProperties);
    26. String bodyString = new String(body,0,body.length);
    27. System.out.println("body = " + bodyString);
    28. System.out.println("[testQueue1 消费数据] 结束!");
    29. }
    30. });
    31. // 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费
    32. while (true) {
    33. Thread.sleep(500);
    34. }
    35. }
    36. }

    4、编写消费者B 代码

    1. /*
    2. * 这个类表示一个消费者B
    3. * 通常这个类也应该是在一个独立的服务器中被执行
    4. * */
    5. public class DemoConsumerB {
    6. public static void main(String[] args) throws IOException, InterruptedException, MqException {
    7. System.out.println("启动消费者!");
    8. ConnectionFactory connectionFactory = new ConnectionFactory();
    9. connectionFactory.setHost("127.0.0.1");
    10. connectionFactory.setPort(9090);
    11. Connection connection = connectionFactory.newConnection();
    12. Channel channel = connection.createChannel();
    13. // 创建交换机
    14. channel.exchangeDeclare("testExchange", ExchangeType.FANOUT,true,false,null);
    15. // 创建队列
    16. channel.queueDeclare("testQueue2",true,false,false,null);
    17. // 设置绑定
    18. channel.queueBind("testQueue2","testExchange","");
    19. // 订阅消息
    20. channel.basicConsume("testQueue2", true, new Consumer() {
    21. @Override
    22. public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
    23. System.out.println("[testQueue1 消费数据] 开始!");
    24. System.out.println("consumerTag = " + consumerTag);
    25. System.out.println("basicProperties = " + basicProperties);
    26. String bodyString = new String(body,0,body.length);
    27. System.out.println("body = " + bodyString);
    28. System.out.println("[testQueue1 消费数据] 结束!");
    29. }
    30. });
    31. // 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费
    32. while (true) {
    33. Thread.sleep(500);
    34. }
    35. }
    36. }

    5、启动 Spring Boot 项目(启动 Broker Server)


    6、运行消费者A 代码


    7、运行消费者B 代码


    8、运行生产者代码


    9、继续观察消费者A 的控制台


    10、继续观察消费者B 的控制台

    演示主题交换机

    • 此处我们创建的交换机为 主题交换机

     1、编写生产者代码

    1. /*
    2. * 这个类用来表示一个生产着
    3. * 通常这是一个单独的服务器程序
    4. * */
    5. public class DemoProducer {
    6. public static void main(String[] args) throws IOException, InterruptedException {
    7. System.out.println("启动生产者!");
    8. ConnectionFactory connectionFactory = new ConnectionFactory();
    9. connectionFactory.setHost("127.0.0.1");
    10. connectionFactory.setPort(9090);
    11. Connection connection = connectionFactory.newConnection();
    12. Channel channel = connection.createChannel();
    13. // 创建交换机和队列
    14. channel.exchangeDeclare("testExchange", ExchangeType.TOPIC,true,false,null);
    15. channel.queueDeclare("testQueue",true,false,false,null);
    16. // 创建消息A 并发送
    17. byte[] body = "hello".getBytes();
    18. boolean ok = channel.basicPublish("testExchange","ccc.aaa.bbb",null,body);
    19. System.out.println("消息投递完成! ok = " + ok);
    20. // 创建消息B 并发送
    21. body = "hi".getBytes();
    22. ok = channel.basicPublish("testExchange","aaa.bbb",null,body);
    23. System.out.println("消息投递完成! ok = " + ok);
    24. Thread.sleep(500);
    25. channel.close();
    26. connection.close();
    27. }
    28. }

    3、编写消费者代码

    1. /*
    2. * 这个类表示一个消费者
    3. * 通常这个类也应该是在一个独立的服务器中被执行
    4. * */
    5. public class DemoConsumer {
    6. public static void main(String[] args) throws IOException, InterruptedException, MqException {
    7. System.out.println("启动消费者!");
    8. ConnectionFactory connectionFactory = new ConnectionFactory();
    9. connectionFactory.setHost("127.0.0.1");
    10. connectionFactory.setPort(9090);
    11. Connection connection = connectionFactory.newConnection();
    12. Channel channel = connection.createChannel();
    13. // 创建交换机
    14. channel.exchangeDeclare("testExchange", ExchangeType.TOPIC,true,false,null);
    15. // 创建队列
    16. channel.queueDeclare("testQueue",true,false,false,null);
    17. // 设置绑定
    18. channel.queueBind("testQueue","testExchange","*.aaa.bbb");
    19. // 订阅消息
    20. channel.basicConsume("testQueue", true, new Consumer() {
    21. @Override
    22. public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
    23. System.out.println("[消费数据] 开始!");
    24. System.out.println("consumerTag = " + consumerTag);
    25. System.out.println("basicProperties = " + basicProperties);
    26. String bodyString = new String(body,0,body.length);
    27. System.out.println("body = " + bodyString);
    28. System.out.println("[消费数据] 结束!");
    29. }
    30. });
    31. // 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费
    32. while (true) {
    33. Thread.sleep(500);
    34. }
    35. }
    36. }

    4、启动 Spring Boot 项目(启动 Broker Server)


    5、运行消费者代码


    6、运行生产者代码


    7、继续观察消费者的控制台

  • 相关阅读:
    宅在家里也能干的副业,每天挣60—300元,人人可做
    dubbo源码解析
    java给图片添加自定义文字信息
    Vue3使用Vite创建项目
    跨线程访问控件的操作
    uni-app使用iconfont字体图标
    嵌入式分享合集44
    一文带你了解RabbitMQ
    部署Node.js环境
    常见排序算法之冒泡排序
  • 原文地址:https://blog.csdn.net/weixin_63888301/article/details/136275127