2.查看防火墙状态,如果是开启状态则需要关闭防火墙
3.通过浏览器访问rabbitmq控制台,ip+15672端口号 ,例如http://192.168.174.129:15672
登录时输入自己的此前设置的登录名和密码
4.打开idea,创建rabbitmq-product-java模块
导入依赖
- <dependencies>
- <dependency>
- <groupId>com.rabbitmqgroupId>
- <artifactId>amqp-clientartifactId>
- <version>5.1.1version>
- dependency>
- dependencies>
编写主程序代码
- package com.it.rabbitmq;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Product {
-
- public static void main(String[] args) {
- //创建连接工厂
- ConnectionFactory factory=new ConnectionFactory();
- //配置rabbitMQ的连接信息
- factory.setHost("192.168.174.129");
- factory.setPort(5672);
- factory.setUsername("root");
- factory.setPassword("123456");
- //定义连接
- Connection connection=null;
- //定义通道
- Channel channel=null;
-
- try {
- connection=factory.newConnection();//获取连接
- channel=connection.createChannel();//获取通道
- /**
- * 声明一个队列
- * 参数1:为队列名取任意值
- * 参数2:是否为持久化队列
- * 参数3:是否排外,如果排外则这个队列只允许一个消费者监听
- * 参数4:是否自动删除,如果为true则表示当前队列中没有消息,也没有消费者连接时就会自动删除这个队列。
- * 参数5:为队列的一些属性设置通常为null即可
- */
- channel.queueDeclare("myQueue",true,false,false,null);
- //定义消息
- String message="我的RabbitMQ的测试消息";
- /**
- * 发送消息
- * 参数1:为交换机名称,这里为空字符串表示不使用交换机
- * 参数2:为队列名或RoutingKey,当指定了交换机名称以后这个值就是RoutingKey
- * 参数3:为消息的属性信息,通常为空即可
- * 参数4:为具体的消息数据的字节数组
- */
- channel.basicPublish("","myQueue",null,message.getBytes("utf-8"));
- System.out.println("消息发送成功!");
-
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }finally {
- if (channel!=null){
- try {
- channel.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- if (connection!=null){
- try {
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
-
- }
-
- }
运行代码
通过rabbitmq控制台查看添加的消息
点击该条消息进入消息详情页
点击消息出队
再次返回消息队列,此时消息队列已经没有了消息,消息已经模拟出队了
4.模拟连续向队列中放两次消息,这两条消息的队列名称相同,内容不同。
修改Idea消息的内容再向队列里发送一次消息
运行一次
进入rabbitmq控制台
修改Idea消息的内容再向队列里发送一次消息
运行一次
进入rabbitmq控制台
点击进入消息详细页
模拟从队列中取出消息,查看取出的消息内容
由此可知每次只能模拟取一条数据,取消息时符合队列先进先出的理念
1.在idea中创建rabbitmq-consumer-java的模块
2.导入Maven依赖
- <dependencies>
- <dependency>
- <groupId>com.rabbitmqgroupId>
- <artifactId>amqp-clientartifactId>
- <version>5.1.1version>
- dependency>
- dependencies>
3.创建主函数
- package com.it.rabbitmq;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Consumer {
-
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setUsername("root");
- factory.setPassword("root");
- factory.setHost("192.168.222.128");
- //建立到代理服务器到连接
- Connection conn = factory.newConnection();
- //获得信道
- final Channel channel = conn.createChannel();
- //声明队列
- channel.queueDeclare("myQueue", true, false, false, null);
- //消费消息
- boolean autoAck = true;
- String consumerTag = "";
- //接收消息
- //参数1 队列名称
- //参数2 是否自动确认消息 true表示自动确认 false表示手动确认
- //参数3 为消息标签 用来区分不同的消费者这里暂时为""
- // 参数4 消费者回调方法用于编写处理消息的具体代码(例如打印或将消息写入数据库)
- //注意:使用了basicConsume方法以后,会启动一个线程持续的监听队列,如果队列中有信息的数据进入则会自动接收消息
- //因此不能关闭连接和通道对象
- channel.basicConsume("myQueue", autoAck, consumerTag, new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag,
- Envelope envelope,
- AMQP.BasicProperties properties,
- byte[] body) throws IOException {
- //获取消息数据
- String bodyStr = new String(body, "UTF-8");
- System.out.println("消费者-- "+bodyStr);
- }
- });
- // channel.close();
- // conn.close();
- }
-
- }
4.运行主函数
5.运行发送消息主函数
6.再次运行接收消息主函数
7.此时进入控制台查看队列,发现队列中的消息已经全部取出
注意:
1、Queue的消息只能被同一个消费者消费,如果没有消费监听队列那么消息会存放到队列中持久化保存,直到有消费者来消费这个消息,如果有消费者监听队列则立即消费发送到队列中的消息
2、Queue的消息可以保证每个消息都一定能被消费
3、消息接收编码时,不能关闭连接,一旦关闭就无法持续监听,照成功能失效。