- package com.dxw.producer;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * 生产者:发送消息
- */
- public class Producer_WorkQueues {
- public static void main(String[] args) throws
- IOException, TimeoutException {
- //1、创建连接工厂
- ConnectionFactory factory = new
- ConnectionFactory();
- //2、设置参数
- factory.setHost("localhost");//ip 默认localhost
- factory.setPort(5672);//端口 默认5672
- factory.setVirtualHost("/dxw");//虚拟机 默认/
- factory.setUsername("dxw");//用户名 默认guest
- factory.setPassword("1234");//密码 默认guest
- //3、创建连接
- Connection connection = factory.newConnection();
- //4、创建Channel
- Channel channel = connection.createChannel();
- //5、创建队列
- /*
- * 参数解释:
- * queueDeclare(String queue,
- * boolean durable,
- * boolean exclusive,
- * boolean autoDelete,
- * Map
arguments) - * 1. queue:队列名称
- * 如果没有一个名字叫hello_world的队列,则会创建该队
- 列,如果有则不会创建
- * 2. durable:是否持久化,当mq重启之后,队列中消息还在
- * 3. exclusive:
- * 是否独占。只能有一个消费者监听这队列
- * 当Connection关闭时,是否删除队列
- * 4. autoDelete:是否自动删除。当没有Consumer时,自动
- 删除掉
- * 5. arguments:参数。
- */
- channel.queueDeclare("work_queues",true,false,false,null)
- ;
- //6、发送消息
- /*
- * 参数解释:
- * basicPublish(String exchange,
- * String routingKey,
- * BasicProperties props,
- * byte[] body)
- * 1. exchange:交换机名称。简单模式下交换机会使用默认的
- ""
- * 2. routingKey:路由名称
- * 3. props:配置信息
- * 4. body:发送消息数据
- 启动生产者观察控制台
- 2、消费者
- 第一个消费者代码Consumer_WorkQueues1:
- */
- for(int i=1;i<=10;i++){
- String body = i+"hello rabbitmq~~~";
- channel.basicPublish("","work_queues",null,body.getBytes(
- ));
- }
- //7、释放资源
- //channel.close();
- //connection.close();
- }
- }
- package com.dxw.consumer;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * 消费者:接收消息
- */
- public class Consumer_WorkQueues1 {
- public static void main(String[] args) throws
- IOException, TimeoutException {
- //1、创建连接工厂
- ConnectionFactory factory = new
- ConnectionFactory();
- //2. 设置参数
- factory.setHost("localhost");//ip 默认值 localhost
- factory.setPort(5672); //端口 默认值 5672
- factory.setVirtualHost("/dxw");//虚拟机 默认/
- factory.setUsername("dxw");//用户名 默认guest
- factory.setPassword("1234");//密码 默认guest
- //3. 创建连接 Connection
- Connection connection = factory.newConnection();
- //4. 创建Channel
- Channel channel = connection.createChannel();
- //5、创建队列
- /*
- * 参数解释:
- * queueDeclare(String queue,
- * boolean durable,
- * boolean exclusive,
- * boolean autoDelete,
- * Map
arguments) - * 1. queue:队列名称
- * 如果没有一个名字叫hello_world的队列,则会创建该
- 队列,如果有则不会创建
- * 2. durable:是否持久化,当mq重启之后,队列中消息还在
- * 3. exclusive:
- * 是否独占。只能有一个消费者监听这队列
- * 当Connection关闭时,是否删除队列
- * 4. autoDelete:是否自动删除。当没有Consumer时,自
- 动删除掉
- * 5. arguments:参数。
- */
- channel.queueDeclare("work_queues",true,false,false,null)
- ;
- //6、接收消息
- Consumer consumer = new DefaultConsumer(channel){
- /*
- 回调方法,当收到消息后,会自动执行该方法
- 1. consumerTag:标识
- 2. envelope:获取一些信息,交换机,路由key...
- 3. properties:配置信息
- 4. body:数据
- */
- @Override
- public void handleDelivery(String consumerTag,
- Envelope envelope, AMQP.BasicProperties properties, byte[]
- body) throws IOException {
- /*System.out.println("consumerTag:"+consumerTag);
- System.out.println("Exchange:"+envelope.getExchange());
- System.out.println("RoutingKey:"+envelope.getRoutingKey(
- ));
- System.out.println("properties:"+properties);*/
- System.out.println("body:"+new
- String(body));
- }
- };
- /*
- * 参数解释:
- * basicConsume(String queue, boolean autoAck,
- Consumer callback)
- * 1. queue:队列名称
- * 2. autoAck:是否自动确认
- * 3. callback:回调对象
- */
- channel.basicConsume("work_queues",true,consumer);
- //关闭资源?不要
- }
- }
- package com.dxw.consumer;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * 消费者:接收消息
- */
- public class Consumer_WorkQueues2 {
- public static void main(String[] args) throws
- IOException, TimeoutException {
- //1、创建连接工厂
- ConnectionFactory factory = new
- ConnectionFactory();
- //2. 设置参数
- factory.setHost("localhost");//ip 默认值 localhost
- factory.setPort(5672); //端口 默认值 5672
- factory.setVirtualHost("/dxw");//虚拟机 默认/
- factory.setUsername("dxw");//用户名 默认guest
- factory.setPassword("1234");//密码 默认guest
- //3. 创建连接 Connection
- Connection connection = factory.newConnection();
- //4. 创建Channel
- Channel channel = connection.createChannel();
- //5、创建队列
- /*
- * 参数解释:
- * queueDeclare(String queue,
- * boolean durable,
- * boolean exclusive,
- * boolean autoDelete,
- * Map
arguments) - * 1. queue:队列名称
- * 如果没有一个名字叫hello_world的队列,则会创建该
- 队列,如果有则不会创建
- * 2. durable:是否持久化,当mq重启之后,队列中消息还在
- * 3. exclusive:
- * 是否独占。只能有一个消费者监听这队列
- * 当Connection关闭时,是否删除队列
- * 4. autoDelete:是否自动删除。当没有Consumer时,自
- 动删除掉
- * 5. arguments:参数。
- */
- channel.queueDeclare("work_queues",true,false,false,null)
- ;
- //6、接收消息
- Consumer consumer = new DefaultConsumer(channel){
- /*
- 回调方法,当收到消息后,会自动执行该方法
- 1. consumerTag:标识
- 2. envelope:获取一些信息,交换机,路由key...
- 3. properties:配置信息
- 4. body:数据
- */
- @Override
- public void handleDelivery(String consumerTag,
- Envelope envelope, AMQP.BasicProperties properties, byte[]
- body) throws IOException {
- /*System.out.println("consumerTag:"+consumerTag);
- System.out.println("Exchange:"+envelope.getExchange());
- System.out.println("RoutingKey:"+envelope.getRoutingKey(
- ));
- System.out.println("properties:"+properties);*/
- System.out.println("body:"+new
- String(body));
- }
- };
- /*
- * 参数解释:
- * basicConsume(String queue, boolean autoAck,
- Consumer callback)
- * 1. queue:队列名称
- * 2. autoAck:是否自动确认
- * 3. callback:回调对象
- 注意:先启动两个消费者,然后再启动生产者,然后观察两个生产者控制台输出
- 3、Pub/Sub订阅模式
- 1、模式介绍
- 在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
- ⚫ P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是
- 发给X(交换机)
- ⚫ C:消费者,消息的接收者,会一直等待消息到来
- ⚫ Queue:消息队列,接收消息、缓存消息
- ⚫ Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方
- 面,知道如何处理消息,例如递交给某个特别队 列、递交给所有队列、
- */
- channel.basicConsume("work_queues",true,consumer);
- //关闭资源?不要
- }
- }