📃个人主页:不断前进的皮卡丘
🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
🔥个人专栏:消息中间件
先来看看RabbitMQ架构图
简单模式
我们先创建一个maven工程,然后引入相关依赖
<dependencies>
<dependency>
<groupId>com.rabbitmqgroupId>
<artifactId>amqp-clientartifactId>
<version>5.16.0version>
<scope>compilescope>
dependency>
dependencies>
public class Producer {
//队列名称
static final String QUEUE_NAME = "helo-queue";
public static void main(String[] args) {
try {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置连接参数
//服务器IP地址
factory.setHost("192.168.88.133");
//连接端口
factory.setPort(5672);
//设置连接的虚拟机名称
factory.setVirtualHost("/myhost");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("123456");
//2.创建Connection对象
Connection connection = factory.newConnection();
//3.创建信道对象
Channel channel = connection.createChannel();
//4.声明队列(队列名称,是否持久化,是否独占连接,是否在不适用队列的时候自动删除,队列其他参数)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//5.准备发送信息
String msg="hello rabbitmq!!!!!";
/**
* 参数1:交换机名称,不填写交换机名称的话则使用默认的交换机
* 参数2:队列名称(路由key)
* 参数3:其他参数
* 参数4:消息内容
*/
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("已经发送消息到队列");
// 关闭资源
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
然后我们运行程序,在浏览器输入服务器地址以及对应端口15672,我们可以看到队列以及发送的消息
public class Consumer {
//队列名称
static final String QUEUE_NAME = "helo-queue";
public static void main(String[] args) {
try {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置连接参数
//服务器IP地址
factory.setHost("192.168.88.133");
//连接端口
factory.setPort(5672);
//设置连接的虚拟机名称
factory.setVirtualHost("/myhost");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("a87684009.");
//2.创建Connection对象
Connection connection = factory.newConnection();
//3.创建信道对象
Channel channel = connection.createChannel();
//4.声明队列(队列名称,是否持久化,是否独占连接,是否在不使用队列的时候自动删除,队列其他参数)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//5.接收消息
DefaultConsumer consumer=new DefaultConsumer(channel){
/**
* 消费回调函数,当收到消息以后,会自动执行这个方法
* @param consumerTag 消费者标识
* @param envelope 消息包的内容(比如交换机,路由key,消息id等)
* @param properties 属性信息
* @param body 消息数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息内容body:"+new String(body,"UTF-8"));
}
};
//监听消息(队列名称,是否自动确认消息,消费对象)
channel.basicConsume(QUEUE_NAME,true,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
消费者不能关闭连接,生产者可以关闭连接