RabbitMQ是一个消息中间件:他可以接受并转发消息。例如你可以把它当做一个快递站点,当你要发送一个包 裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是 一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据
生产者:产生数据发送消息的程序
交换机:交换机是RabbitMQ中的一个重要的部件,一方面它接受来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切的知道如何处理它接受到的消息,是将这些消息推送到特定队列还是推送到多个队列,或者是把消息丢弃,这个得有交换机类型决定
队列:队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但是它们只能存储在的队列中。队列仅受主机内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
消费者:消费者大多数是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在一个机器上。同一个程序既可以是生产者也可以是消费者


Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似 于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出 多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销是非常大的,效率是很低效的。Channel是在connect内部建立的逻辑连接,如果应用成型鼓支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客 户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP Connection 的开销
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发 消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保 存到 exchange 中的查询表中,用于 message 的分发依据
官网地址:rabbitmq.com/download.html
百度网盘大家自取:
链接:https://pan.baidu.com/s/1U7rdXf2yk9PRGxOJxhcY8A?pwd=d0jd
提取码:d0jd

以CentOS系统举例:将以上的软件安装使用命令scp
scp D:\Java学习课件\MQ\软件\rabbitmq-server-3.8.8-1.el7.noarch.rpm root@118.31.6.100:/root
- rpm -ivh erlang-21.3-1.el7.x86_64.rpm
- yum install socat -y
- rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
添加开机启动 RabbitMQ 服务
chkconfig rabbitmq-server on
启动服务
/sbin/service rabbitmq-server start
查看服务状态
/sbin/service rabbitmq-server status

停止服务(选择执行)
/sbin/service rabbitmq-server stop
开启 web 管理插件
rabbitmq-plugins enable rabbitmq_management
用默认账号密码(guest)访问地址 http://真实IP:15672/出现权限问题使用云服务器的一定要把云服务器的防火墙的打开

创建账号
rabbitmqctl add_user admin 123
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限
- set permissions-p <vhostpath><user><conf> <write><read>
-
- rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限
当前用户和角色
rabbitmqctl list_users

重置命令
关闭应用的命令
rabbitmqctl stop_app
清除的命令
rabbitmqctl reset
重新启动命令
rabbitmqctl start_app
引入依赖
- "1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0modelVersion>
-
- <groupId>com.atguigu.rabbitmqgroupId>
- <artifactId>rabbitmq-helloartifactId>
- <version>1.0-SNAPSHOTversion>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.pluginsgroupId>
- <artifactId>maven-compiler-pluginartifactId>
- <configuration>
- <source>8source>
- <target>8target>
- configuration>
- plugin>
- plugins>
- build>
- <dependencies>
-
- <dependency>
- <groupId>com.rabbitmqgroupId>
- <artifactId>amqp-clientartifactId>
- <version>5.8.0version>
- dependency>
-
- <dependency>
- <groupId>commons-iogroupId>
- <artifactId>commons-ioartifactId>
- <version>2.6version>
- dependency>
- <dependency>
- <groupId>ch.qos.logbackgroupId>
- <artifactId>logback-classicartifactId>
- <version>1.2.6version>
- dependency>
-
- dependencies>
-
- project>
生产者代码:
- package com.rabbitmq.one;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.nio.charset.StandardCharsets;
- import java.util.concurrent.TimeoutException;
-
- /**
- * 生产者:发消息
- */
-
- public class Produce {
- public static final String QUEUE_NAME="hello";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建工厂
- ConnectionFactory factory = new ConnectionFactory();
- //工厂IP连接rabbitmq
- factory.setHost("118.31.6.132");
- //用户名
- factory.setUsername("admin");
- //密码
- factory.setPassword("123");
- //创建连接
- Connection connection = factory.newConnection();
- //获取信道
- Channel channel = connection.createChannel();
- /*
- *生成一个队列
- * 1.队列名称
- * 2.队列里面的信息是否持久化(磁盘)默认情况时在内存
- * 3.该队列是否只供一个消费者进行消费 是否消费共享 true是允许
- * 4.是否自动删除 最后一个消费者断开连接之后 该队列是否自动删除 true自动删除 false不自动删除
- * 5.其他参数 延迟消息等
- */
- channel.queueDeclare(QUEUE_NAME,false,false,false,null);
- //发消息
- String message = "hello world";
- /**
- * 发送一个消息
- * 1.发送到那个交换机
- * 2.路由的KEY值是哪个 本次是队列的名称
- * 3.其他参数信息
- * 4.发送消息的消息体
- */
- channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
- System.out.println("消息发送完毕!");
- }
- }
消费者代码:
- package com.rabbitmq.one;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.Collections;
- import java.util.concurrent.TimeoutException;
-
- /**
- * 消费者
- */
- public class Consume {
- //队列名称
- public static final String QUEUE_NAME = "hello";
- //接受消息
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("118.31.6.132");
- factory.setUsername("admin");
- factory.setPassword("123");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //声明 接受消息
- DeliverCallback deliverCallback = (consumerTag,message)->{
- System.out.println(new String(message.getBody()));
- };
- //取消消息的回调
- CancelCallback cancelCallback = consumerTag->{
- System.out.println("消息消费被中断");
- };
- /**
- * 消费者信息
- * 1.消费哪个队列
- * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
- * 3.消费者微车才能更改消费的回调
- * 4.消费者取消消费回调
- */
- channel.basicConsume(QUEUE_NAME,true, deliverCallback,cancelCallback);
- }
- }
运行结果:
