RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)
使用 docker安装 RabbitMQ和延时插件,实现消息延时消费
目录
docker拉取镜像
docker pull rabbitmq:3.10-management
开启容器
docker run -it --name rabbitmq -p 5672:5672 -p 15672:15672 -d rabbitmq:3.10-management
开启容器后,浏览器访问宿主机器 ip+15672端口,访问RabbitMQ管理页面
笔者的宿主机器 ip 是 192.168.5.25
初始账号密码都是guest
下载插件地址:https://www.rabbitmq.com/community-plugins.html
找到 rabbitmq_delayed_message_exchange
点击进入下载页面
找到对应的版本进行下载
下载插件后,将插件上传到服务器
使用 docker 命令将插件复制到容器内部 plugins目录下
docker cp rabbitmq_delayed_message_exchange-3.10.0.ez rabbitmq:/plugins
进入容器内部进行查看
docker exec -it rabbitmq bash
进入 plugins 目录查看
cd plugins
执行命令启用插件(插件文件已在上一步复制到 plugins 目录,这里通过 enable 命令启用)
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
通过命令查看已安装的插件
rabbitmq-plugins list
退出容器后,重启 rabbitmq 容器
docker restart rabbitmq
重启完成后,进入管理页面,点击交换机 Exchange,点开 Add a new exchange,查看交换机类型 Type,发现里面新增了 x-delayed-message 类型,则延时插件安装成功
新建 SpringBoot 项目
pom.xml内容
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the RabbitMQ delayed-message demo (Spring Boot, Java 8). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.wsjzzcbq</groupId>
    <artifactId>rabbitmq-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!-- Single source of truth for the Spring Boot version used below. -->
        <spring-boot.version>2.3.7.RELEASE</spring-boot.version>
    </properties>

    <dependencies>
        <!-- Spring AMQP / RabbitMQ client support. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <!-- Imports the Spring Boot BOM so the dependencies above need no explicit versions. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <!-- Kept in sync with the BOM via the shared property (resolves to 2.3.7.RELEASE). -->
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.wsjzzcbq.RabbitmqDemoApplication</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
application.properties内容
# Application name
spring.application.name=rabbitmq-demo
# HTTP port the web application listens on
server.port=8080

# RabbitMQ broker connection (host of the Docker container started earlier)
spring.rabbitmq.addresses=192.168.5.25
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# Publisher confirms/returns and mandatory routing on the producer side
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
# Connection timeout in milliseconds
spring.rabbitmq.connection-timeout=5000

# Consumer side: messages are acknowledged manually in the listener code
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=5
启动类 RabbitmqDemoApplication 内容
- package com.wsjzzcbq;
-
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
-
- @SpringBootApplication
- public class RabbitmqDemoApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(RabbitmqDemoApplication.class, args);
- }
-
- }
RabbitConsumer 消费者内容
package com.wsjzzcbq.rabbit;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * Consumer for the delayed-message demo queue.
 *
 * @author wsjz
 * @date 2022/10/26
 */
@Component
public class RabbitConsumer {

    /** Timestamp pattern for the console output; DateTimeFormatter is immutable, so it is shared. */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Receives a message from {@code queue-poetry}, acknowledges it manually and prints it.
     *
     * <p>The annotation declares the exchange, the queue and their binding automatically.
     * The exchange uses the {@code x-delayed-message} type with {@code x-delayed-type=direct}.
     *
     * @param message the received message; the {@code name} header and the payload are printed
     * @param channel the channel the message arrived on, used for the manual acknowledgement
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-poetry", durable = "true"),
            exchange = @Exchange(
                    value = "exchange-delayed",
                    arguments = {@Argument(name = "x-delayed-type", value = "direct")},
                    type = "x-delayed-message",
                    ignoreDeclarationExceptions = "true"),
            key = "poetry"
    ))
    public void onMessage(Message<?> message, Channel channel) throws IOException {
        String name = (String) message.getHeaders().get("name");
        System.out.println(name);

        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        // Manual acknowledgement of this single delivery (multiple = false).
        channel.basicAck(deliveryTag, false);

        String now = LocalDateTime.now().format(TIMESTAMP_FORMAT);
        System.out.println("消息已接收" + now + ":" + message.getPayload());
    }
}
RabbitProducer 生产者内容
package com.wsjzzcbq.rabbit;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;

/**
 * Producer that publishes delayed messages to the {@code exchange-delayed} exchange.
 *
 * @author wsjz
 * @date 2022/10/26
 */
@Component
public class RabbitProducer {

    /** Timestamp pattern for the console output; DateTimeFormatter is immutable, so it is shared. */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a message with the routing key {@code poetry} and a delivery delay.
     *
     * @param msg         the message payload
     * @param properties  headers to attach to the message
     * @param millisecond delay in milliseconds, set as the {@code x-delay} header
     */
    public void sendMessage(Object msg, Map<String, Object> properties, int millisecond) {
        MessageHeaders messageHeaders = new MessageHeaders(properties);
        Message<Object> content = MessageBuilder.createMessage(msg, messageHeaders);

        rabbitTemplate.convertAndSend("exchange-delayed", "poetry", content, message -> {
            // The x-delay header carries the delay for the delayed-message exchange.
            message.getMessageProperties().setHeader("x-delay", millisecond);
            return message;
        });

        String now = LocalDateTime.now().format(TIMESTAMP_FORMAT);
        System.out.println("消息已发送" + now + ":" + msg);
    }

}
DemoController 内容
package com.wsjzzcbq.controller;

import com.wsjzzcbq.rabbit.RabbitProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;

/**
 * HTTP endpoint that triggers a delayed message for the demo.
 *
 * @author wsjz
 * @date 2022/10/26
 */
@RestController
public class DemoController {

    /** Delay applied to the demo message: 5 seconds, in milliseconds. */
    private static final int DELAY_MILLIS = 5 * 1000;

    @Autowired
    private RabbitProducer producer;

    /**
     * Publishes one demo message with a 5-second delay.
     *
     * @return the literal string {@code "ok"} once the message has been handed to the producer
     */
    @RequestMapping("/send")
    public String send() {
        Map<String, Object> map = new HashMap<>();
        map.put("name", "花月吟.唐伯虎");
        // Send the message with a 5-second delay.
        producer.sendMessage("如此好花如此月, 莫将花月作寻常", map, DELAY_MILLIS);
        return "ok";
    }
}
启动项目,会自动创建交换机和队列,进入管理页面查看交换机 exchange-delayed
运行测试
浏览器访问:http://localhost:8080/send
看控制台打印时间间隔
消费者端延时5秒钟收到消息
至此,使用 docker 安装 RabbitMQ 延时插件并实现消息延时消费的示例全部完成