目录
- 此处我们模拟 RabbitMQ 实现了一个消息队列服务器
核心功能
- 提供了 虚拟主机、交换机、队列、绑定、消息 概念的管理
- 九大核心 API 创建队列、销毁队列、创建交换机、销毁交换机、创建绑定、解除绑定、发布消息、订阅消息、确认消息
- 实现了三种典型的消息转换方式 直接交换机(Direct)、扇出交换机(Fanout)、主题交换机(Topic)
- 交换机、队列、绑定 使用 SQLite 数据库持久化,消息 使用文件持久化
- 基于 TCP + 自定义应用层协议 实现生产者/消费者和 Broker Server 之间的交互工作
核心技术
- Spring Boot / MyBatis / Lombok
- SQLite
- TCP
- 关于该项目的需求分析,可点击下方链接跳转
- 关于该项目的核心类,可点击下方链接跳转
- 关于该项目的数据库操作,可点击下方链接跳转
- 关于该项目的消息持久化,可点击下方链接跳转
- 关于该项目的内存数据管理,可点击下方链接跳转
- 关于该项目的虚拟机设计,可点击下方链接跳转
- 关于该项目的交换机转发规则,可点击下方链接跳转
- 关于该项目的消费逻辑,可点击下方链接跳转
- 关于该项目网络通信设计,可点击下方链接跳转
- 简单写一个 demo 模拟 跨主机的生产者消费者模型
- 此处为了方便,就在本机演示
- 此处我们创建的交换机类型为 直接交换机
1、在 Spring Boot 项目的启动类中创建 Broker Server,绑定端口并启动!
@SpringBootApplication public class DemoApplication { public static ConfigurableApplicationContext context; public static void main(String[] args) throws IOException { context = SpringApplication.run(DemoApplication.class, args); BrokerServer brokerServer = new BrokerServer(9090); brokerServer.start(); } }
2、编写生产者代码
/* * 这个类用来表示一个生产着 * 通常这是一个单独的服务器程序 * */ public class DemoProducer { public static void main(String[] args) throws IOException, InterruptedException { System.out.println("启动生产者!"); ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(9090); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 创建交换机和队列 channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null); channel.queueDeclare("testQueue",true,false,false,null); // 创建一个消息并发送 byte[] body = "hello".getBytes(); boolean ok = channel.basicPublish("testExchange","testQueue",null,body); System.out.println("消息投递完成! ok = " + ok); Thread.sleep(500); channel.close(); connection.close(); } }
3、编写消费者代码
/* * 这个类表示一个消费者 * 通常这个类也应该是在一个独立的服务器中被执行 * */ public class DemoConsumer { public static void main(String[] args) throws IOException, InterruptedException, MqException { System.out.println("启动消费者!"); ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(9090); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null); channel.queueDeclare("testQueue",true,false,false,null); channel.basicConsume("testQueue", true, new Consumer() { @Override public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException { System.out.println("[消费数据] 开始!"); System.out.println("consumerTag = " + consumerTag); System.out.println("basicProperties = " + basicProperties); String bodyString = new String(body,0,body.length); System.out.println("body = " + bodyString); System.out.println("[消费数据] 结束!"); } }); // 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费 while (true) { Thread.sleep(500); } } }
4、启动 Spring Boot 项目(启动 Broker Server)
5、运行消费者代码
6、运行生产者代码
7、继续观察消费者的控制台
- 此处我们创建的交换机类型为 扇出交换机
1、编写生产者代码
/* * 这个类用来表示一个生产着 * 通常这是一个单独的服务器程序 * */ public class DemoProducer { public static void main(String[] args) throws IOException, InterruptedException { System.out.println("启动生产者!"); ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(9090); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 创建交换机 channel.exchangeDeclare("testExchange", ExchangeType.FANOUT,true,false,null); // 创建一个消息并发送 byte[] body = "hello".getBytes(); boolean ok = channel.basicPublish("testExchange","",null,body); System.out.println("消息投递完成! ok = " + ok); Thread.sleep(500); channel.close(); connection.close(); } }
3、编写消费者A 代码
/* * 这个类表示一个消费者A * 通常这个类也应该是在一个独立的服务器中被执行 * */ public class DemoConsumerA { public static void main(String[] args) throws IOException, InterruptedException, MqException { System.out.println("启动消费者!"); ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(9090); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 创建交换机 channel.exchangeDeclare("testExchange",ExchangeType.FANOUT,true,false,null); // 创建队列 channel.queueDeclare("testQueue1",true,false,false,null); // 设置绑定 channel.queueBind("testQueue1","testExchange",""); // 订阅消息 channel.basicConsume("testQueue1", true, new Consumer() { @Override public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException { System.out.println("[testQueue1 消费数据] 开始!"); System.out.println("consumerTag = " + consumerTag); System.out.println("basicProperties = " + basicProperties); String bodyString = new String(body,0,body.length); System.out.println("body = " + bodyString); System.out.println("[testQueue1 消费数据] 结束!"); } }); // 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费 while (true) { Thread.sleep(500); } } }
4、编写消费者B 代码
/* * 这个类表示一个消费者B * 通常这个类也应该是在一个独立的服务器中被执行 * */ public class DemoConsumerB { public static void main(String[] args) throws IOException, InterruptedException, MqException { System.out.println("启动消费者!"); ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(9090); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 创建交换机 channel.exchangeDeclare("testExchange", ExchangeType.FANOUT,true,false,null); // 创建队列 channel.queueDeclare("testQueue2",true,false,false,null); // 设置绑定 channel.queueBind("testQueue2","testExchange",""); // 订阅消息 channel.basicConsume("testQueue2", true, new Consumer() { @Override public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException { System.out.println("[testQueue1 消费数据] 开始!"); System.out.println("consumerTag = " + consumerTag); System.out.println("basicProperties = " + basicProperties); String bodyString = new String(body,0,body.length); System.out.println("body = " + bodyString); System.out.println("[testQueue1 消费数据] 结束!"); } }); // 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费 while (true) { Thread.sleep(500); } } }
5、启动 Spring Boot 项目(启动 Broker Server)
6、运行消费者A 代码
7、运行消费者B 代码
8、运行生产者代码
9、继续观察消费者A 的控制台
10、继续观察消费者B 的控制台
- 此处我们创建的交换机为 主题交换机
1、编写生产者代码
/* * 这个类用来表示一个生产着 * 通常这是一个单独的服务器程序 * */ public class DemoProducer { public static void main(String[] args) throws IOException, InterruptedException { System.out.println("启动生产者!"); ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(9090); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 创建交换机和队列 channel.exchangeDeclare("testExchange", ExchangeType.TOPIC,true,false,null); channel.queueDeclare("testQueue",true,false,false,null); // 创建消息A 并发送 byte[] body = "hello".getBytes(); boolean ok = channel.basicPublish("testExchange","ccc.aaa.bbb",null,body); System.out.println("消息投递完成! ok = " + ok); // 创建消息B 并发送 body = "hi".getBytes(); ok = channel.basicPublish("testExchange","aaa.bbb",null,body); System.out.println("消息投递完成! ok = " + ok); Thread.sleep(500); channel.close(); connection.close(); } }
3、编写消费者代码
/* * 这个类表示一个消费者 * 通常这个类也应该是在一个独立的服务器中被执行 * */ public class DemoConsumer { public static void main(String[] args) throws IOException, InterruptedException, MqException { System.out.println("启动消费者!"); ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(9090); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 创建交换机 channel.exchangeDeclare("testExchange", ExchangeType.TOPIC,true,false,null); // 创建队列 channel.queueDeclare("testQueue",true,false,false,null); // 设置绑定 channel.queueBind("testQueue","testExchange","*.aaa.bbb"); // 订阅消息 channel.basicConsume("testQueue", true, new Consumer() { @Override public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException { System.out.println("[消费数据] 开始!"); System.out.println("consumerTag = " + consumerTag); System.out.println("basicProperties = " + basicProperties); String bodyString = new String(body,0,body.length); System.out.println("body = " + bodyString); System.out.println("[消费数据] 结束!"); } }); // 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费 while (true) { Thread.sleep(500); } } }
4、启动 Spring Boot 项目(启动 Broker Server)
5、运行消费者代码
6、运行生产者代码
7、继续观察消费者的控制台