目录
我们在使用RabbitMQ需要先在Docker中拉取RabbitMQ使用下面命令拉取
- systemctl start docker #启动docker
- docker pull rabbitmq:3-management
- #安装这个版本的rabbitmq
这里我之前安装了最新的RabbitMQ发现打开不了网站,我用了这个版本的就打开了
在拉取镜像成功后,使用创建容器的命令创建RabbitMQ容器
- docker run \
- -e RABBITMQ_DEFAULT_USER=demo \
- -e RABBITMQ_DEFAULT_PASS=123456 \
- --name rq \
- -p 15672:15672 \
- -p 5672:5672 \
- -d \
- rabbitmq:3-management
这样就算安装完成了RabbitMQ,并且我们登陆RabbitMQ
我们现在开始使用Java来发起最基本的publisher->equeue->consume,发布给队列,队列取值这种效果。
首先先导入依赖和配置yml文件
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-amqpartifactId>
- dependency>
- spring:
- rabbitmq:
- host: 192.168.3.26
- port: 5672
- virtual-host: /
- username: demo
- password: 123456
写一个测试发布的,在这之前我们需要,先创建一个队列(注意:这里已经使用了SpringBootTest,所以如果在同一个包下,如果有SpringbootApplicaiton的注解需要先注释掉,不然是会报错的)
-
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class Publisher {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void test1(){
- String queuename = "simple.queue";
- String message = "hello,spring";
- rabbitTemplate.convertAndSend(queuename,message);
- }
- }
这样之后,我们在写一个消费者,用于接收队列中的数据(这个我们需要springboot启动后,一直做监听)消费者中配置一样的yml文件和依赖
- @Component
- public class Consum {
- @RabbitListener(queues="simple.queue")
- public void listenT(String msg){
- System.out.println("接收到消息:"+msg);
- }
- }
我首先没有启动消费者的springboot,所以会在队列中看到有一条数据
然后我们启动消费者
成功的接收到了队列中的消息,队列的消息也没有了,已经传送过来了
下面我们将讲解workQueue
workQueue是一个发布者,和多个消费者,但是当中会有预取机制,会将每一个传送来的值均匀分布在每一个消费者上。
我将发布的利用循环,让其发送5次信息
- @RunWith(SpringRunner.class)
- @SpringBootTest
- @EnableRabbit
- public class Publisher {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void test1(){
- String queuename = "simple.queue";
- String message = "hello,springmq";
- for (int i=0;i<5;i++)
- {
- rabbitTemplate.convertAndSend(queuename,message);
- }
- }
- }
消费者中使用两个监听的来获取同一个队列
- @Component
- public class Consum {
- @RabbitListener(queues="simple.queue")
- public void listenT1(String msg){
- System.out.println("demo1接收到消息:"+msg);
- }
- @RabbitListener(queues="simple.queue")
- public void listenT2(String msg){
- System.out.println("demo2接收到消息:"+msg);
- }
- }
结果如下
这就是workQueue的作用,下面我们将讲解FanoutExchange
交换机在这里作用就是改变我们使用的传输数据的方式,不是通过发布到队列,然后到队列当中去取,而是通过发布者发布给交换机,又交换机交给不同的队列,再由消费者来调用。
我们更改代码,现在使用注解的方式来进行交换机和队列的创建,并拉取他们之间连接,这里我们首先先创建交换机和队列以及彼此的连接,这里我新建了一个类,和监听的分开,更符合微服务的思想
- @Configuration
- public class Exchange {
- @Bean
- public FanoutExchange FanoutExchange1(){
- return new FanoutExchange("demo.list");
- }
- @Bean
- public Queue fanoutqueue1(){
- return new Queue("fanout.queue1");
- }
- @Bean
- public Queue fanoutqueue2(){
- return new Queue("fanout.queue2");
- }
- @Bean
- public Binding fanoutbinding(Queue fanoutqueue1, FanoutExchange FanoutExchange1){
- return BindingBuilder.bind(fanoutqueue1).to(FanoutExchange1);
- }
- @Bean
- public Binding fanoutbinding2(Queue fanoutqueue2, FanoutExchange FanoutExchange1){
- return BindingBuilder.bind(fanoutqueue2).to(FanoutExchange1);
- }
- }
这里我创建两个队列和一个交换机。下面是监听类中添加对两种队列的监听。
- @Component
- public class Consum {
-
- @RabbitListener(queues="simple.queue")
- public void listenT1(String msg){
- System.out.println("demo1接收到消息:"+msg);
- }
- @RabbitListener(queues="simple.queue")
- public void listenT2(String msg){
- System.out.println("demo2接收到消息:"+msg);
- }
- @RabbitListener(queues="fanout.queue1")
- public void listenT3(String msg){
- System.out.println("demofanout1接收到消息:"+msg);
- }
- @RabbitListener(queues="fanout.queue2")
- public void listenT4(String msg){
- System.out.println("demofanou2接收到消息:"+msg);
- }
- }
发送就改变不是向队列发送,而是向交换机发送
- @RunWith(SpringRunner.class)
- @SpringBootTest
- @EnableRabbit
- public class Publisher {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void test1(){
- String exchangename = "demo.list";
- String message = "hello,springmq";
- rabbitTemplate.convertAndSend(exchangename,"",message);
- }
- }
在交换机和发送的消息中有一个“”,这个后面给大家说清楚,下面看结果
成功得到信息,然后讲解DirectExchange
DirectExchange会将接受到消息根据规则路由到指定的Queue,当中重要的是使用bindingkey来控制,可以使用@RabbitListener声明Exange,Queue,Routingkey。
这里我们讲解了新的注解,就是我们可以直接在@RabbitListener中添加一些参数这样就可以不用使用@Bean的方法去一个个的声明了。从这里大家就可以看出来DirectExchange的不同点就在于使用的bindingkey,而这个值正是""。下面直接上代码
这边是消费者我们使用注解的形式进行开发,也是最简单的一种,注意因为上面我们使用的是finout,所以我们这里需要先删除一次交换机,再执行代码,让其自动创建新的交换机
- @Component
- public class Consum {
-
- @RabbitListener(queues="simple.queue")
- public void listenT1(String msg){
- System.out.println("demo1接收到消息:"+msg);
- }
- @RabbitListener(queues="simple.queue")
- public void listenT2(String msg){
- System.out.println("demo2接收到消息:"+msg);
- }
- @RabbitListener(queues="fanout.queue1")
- public void listenT3(String msg){
- System.out.println("demofanout1接收到消息:"+msg);
- }
- @RabbitListener(queues="fanout.queue2")
- public void listenT4(String msg){
- System.out.println("demofanou2接收到消息:"+msg);
- }
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue1"),
- exchange = @Exchange(name = "demo.list", type = ExchangeTypes.DIRECT),
- key = {"red", "blue"}
- ))
- public void listen5(String msg) {
- System.out.println("demodirect1接受到消息:" + msg);
- }
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue2"),
- exchange = @Exchange(name = "demo.list", type = ExchangeTypes.DIRECT),
- key = {"red", "yellow"}
- ))
- public void listen6(String msg) {
- System.out.println("demodirect2接受到消息:" + msg);
- }
-
- }
发布者添加我们需要识别的bindkey就可以了
-
- @RunWith(SpringRunner.class)
- @SpringBootTest
- @EnableRabbit
- public class Publisher {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void test1(){
- String exchangename = "demo.list";
- String message = "hello,springmq";
- rabbitTemplate.convertAndSend(exchangename,"red",message);
- }
- }
这里我们先传red的,也就是两个都能收到
我们把当中bindingkey更改为blue也就是只有1能收到
-
- @RunWith(SpringRunner.class)
- @SpringBootTest
- @EnableRabbit
- public class Publisher {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void test1(){
- String exchangename = "demo.list";
- String message = "hello,springmq";
- rabbitTemplate.convertAndSend(exchangename,"blue",message);
- }
- }
这就是DirectExchange,后面还有一种是TopicExchange,这种的用法没有什么不同,就是增加了可以使用通配符来代替交换机的名称,只是偷偷懒,最后再讲一种:消息转换器