RabbitMQ安装部署详情可见:RabbitMQ简介及在Linux中安装部署(yum)

- <dependency>
- <groupId>com.rabbitmqgroupId>
- <artifactId>amqp-clientartifactId>
- <version>5.8.0version>
- dependency>
- <dependency>
- <groupId>org.slf4jgroupId>
- <artifactId>slf4j-nopartifactId>
- <version>1.7.29version>
- dependency>
发送消息类:Send.java
- public class Send {
-
- private final static String QUEUE_NAME = "hello";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //设置RabbitMQ地址
- factory.setHost("192.168.119.129");
- factory.setUsername("admin");
- factory.setPassword("password");
- //建立连接
- Connection connection = factory.newConnection();
- //获得信道
- Channel channel = connection.createChannel();
- //声明队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- //发布消息
- String message = "你好,老6";
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
- System.out.println("发送了消息:" + message);
- //关闭连接
- channel.close();
- connection.close();
- }
- }
接收消息类:Recv.java
- public class Recv {
- private final static String QUEUE_NAME = "hello";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //设置RabbitMQ地址
- factory.setHost("192.168.119.129");
- factory.setUsername("admin");
- factory.setPassword("password");
- //建立连接
- Connection connection = factory.newConnection();
- //获得信道
- Channel channel = connection.createChannel();
- //声明队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- //接收消息并消费
- channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println("收到消息:" + message);
- }
- });
- }
- }
运行结果: 
发送多任务消息类NewTask.java
- public class NewTask {
-
- private final static String TASK_QUEUE_NAME = "task_queue";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //设置RabbitMQ地址
- factory.setHost("192.168.119.129");
- factory.setUsername("admin");
- factory.setPassword("password");
- //建立连接
- Connection connection = factory.newConnection();
- //获得信道
- Channel channel = connection.createChannel();
- //声明队列
- channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
- for (int i = 0; i < 10; i++) {
- String message;
- if (i % 2 == 0) {
- message = i + "...";
- } else {
- message = String.valueOf(i);
- }
- channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
- System.out.println("发送了消息:" + message);
- }
-
- channel.close();
- connection.close();
- }
- }
接收多任务消息类Worker.java
- public class Worker {
-
- private final static String TASK_QUEUE_NAME = "task_queue";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //设置RabbitMQ地址
- factory.setHost("192.168.119.129");
- factory.setUsername("admin");
- factory.setPassword("password");
- //建立连接
- Connection connection = factory.newConnection();
- //获得信道
- final Channel channel = connection.createChannel();
- //声明队列
- channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
- System.out.println("开始接收消息");
- channel.basicQos(1);
-
- channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println("收到了消息:" + message);
- try {
- doWork(message);
- } finally {
- System.out.println("消息处理完成");
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- }
- });
- }
-
- private static void doWork(String task) {
- char[] chars = task.toCharArray();
- for (char ch : chars) {
- if (ch == '.') {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
运行结果:

发送消息类:EmitLogDirect.java
- public class EmitLogDirect {
-
- private static final String EXCHANGE_NAME = "direct_logs";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.119.129");
- factory.setUsername("admin");
- factory.setPassword("password");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
-
- String message = "info:Hello World!";
- channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("UTF-8"));
- System.out.println("发送了消息," + "等级为info,消息内容:" + message);
-
- message = "warning:Hello World!";
- channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes("UTF-8"));
- System.out.println("发送了消息," + "等级为warning,消息内容:" + message);
-
- message = "error:Hello World!";
- channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes("UTF-8"));
- System.out.println("发送了消息," + "等级为error,消息内容:" + message);
- channel.close();
- connection.close();
- }
- }
接收所有消息类:ReceiveLogsDirect1.java
- public class ReceiveLogsDirect1 {
- private static final String EXCHANGE_NAME = "direct_logs";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.119.129");
- factory.setUsername("admin");
- factory.setPassword("password");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
- //生成一个随机的临时的queue
- String queueName = channel.queueDeclare().getQueue();
- //一个交换机同时绑定3个queue
- channel.queueBind(queueName, EXCHANGE_NAME, "info");
- channel.queueBind(queueName, EXCHANGE_NAME, "warning");
- channel.queueBind(queueName, EXCHANGE_NAME, "error");
-
- System.out.println("开始接收消息");
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println("收到消息:" + message);
- }
- };
- channel.basicConsume(queueName, true, consumer);
- }
- }
仅仅接收error消息类:ReceiveLogsDirect2.java
- public class ReceiveLogsDirect2 {
- private static final String EXCHANGE_NAME = "direct_logs";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.119.129");
- factory.setUsername("admin");
- factory.setPassword("password");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
- //生成一个随机的临时的queue
- String queueName = channel.queueDeclare().getQueue();
- //一个交换机绑定1个queue
- channel.queueBind(queueName, EXCHANGE_NAME, "error");
-
- System.out.println("开始接收消息");
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println("收到消息:" + message);
- }
- };
- channel.basicConsume(queueName, true, consumer);
- }
- }
运行结果:
