• Installing RabbitMQ and the delayed message plugin with Docker


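    RabbitMQ is an open-source message broker (also called message-oriented middleware) that implements the Advanced Message Queuing Protocol (AMQP).

    This post installs RabbitMQ and the delayed message plugin with Docker, then uses them to consume messages after a delay.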

    Contents

    1. Install RabbitMQ with Docker

    2. Install the delayed message plugin

    3. Test delayed messages


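    1. Install RabbitMQ with Docker

    Pull the image:

    docker pull rabbitmq:3.10-management

    Start the container (the -it flags are not needed when running detached with -d):

    docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management

    Once the container is running, open the host's IP on port 15672 in a browser to reach the RabbitMQ management UI.

    The author's host IP is 192.168.5.25.

    The default username and password are both guest.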

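    2. Install the delayed message plugin

    Plugin download page: https://www.rabbitmq.com/community-plugins.html

    Find rabbitmq_delayed_message_exchange.

    Follow the link to its download page.

    Download the release that matches your RabbitMQ version (3.10.0 here).

    Upload the downloaded .ez file to the server.

    Copy it into the container's plugins directory with docker cp:

    docker cp rabbitmq_delayed_message_exchange-3.10.0.ez  rabbitmq:/plugins

    Open a shell inside the container:

    docker exec -it rabbitmq bash

    Change to the plugins directory and check that the file was copied:

    cd plugins

    Enable the plugin:

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    List the plugins to confirm it is enabled:

    rabbitmq-plugins list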
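
The listing can also be checked by a script instead of by eye. The sketch below assumes the usual layout of the rabbitmq-plugins list output, where each plugin line starts with a marker such as [E*] (explicitly enabled and running), [e*] (implicitly enabled and running) or [  ] (not enabled). The class name PluginListCheck and the sample lines are hypothetical, and only the JDK is used.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch: checks whether a plugin shows as enabled in `rabbitmq-plugins list` output. */
final class PluginListCheck {

    private PluginListCheck() {}

    /**
     * Returns true if a line of the form "[E*] <plugin> <version>" (or "[e*] ...")
     * is present for the given plugin name, i.e. it is enabled and running.
     */
    static boolean isEnabledAndRunning(List<String> lines, String plugin) {
        for (String raw : lines) {
            String line = raw.trim();
            // Plugin lines start with a 4-character marker such as "[E*]".
            if (line.length() < 5 || line.charAt(0) != '[' || line.charAt(3) != ']') {
                continue; // legend or other non-plugin line
            }
            char configured = line.charAt(1); // 'E', 'e' or ' '
            char status = line.charAt(2);     // '*' when running
            String[] rest = line.substring(4).trim().split("\\s+");
            if (rest.length > 0 && rest[0].equals(plugin)) {
                return (configured == 'E' || configured == 'e') && status == '*';
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Hypothetical sample lines, not captured from a real broker.
        List<String> sample = Arrays.asList(
                "[E*] rabbitmq_delayed_message_exchange 3.10.0",
                "[  ] rabbitmq_mqtt                     3.10.0");
        System.out.println(isEnabledAndRunning(sample, "rabbitmq_delayed_message_exchange"));
    }
}
```

In practice the lines would come from running docker exec rabbitmq rabbitmq-plugins list and reading its standard output.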

     

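    Exit the container, then restart it:

    docker restart rabbitmq

     

    When the restart finishes, open the management UI, go to Exchanges, expand "Add a new exchange" and look at the Type dropdown. If it now offers x-delayed-message, the plugin is installed correctly.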

    3. Test delayed messages

    Create a new Spring Boot project.

    pom.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.wsjzzcbq</groupId>
        <artifactId>rabbitmq-demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>rabbitmq-demo</name>
        <description>Demo project for Spring Boot</description>
        <properties>
            <java.version>1.8</java.version>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <spring-boot.version>2.3.7.RELEASE</spring-boot.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-dependencies</artifactId>
                    <version>${spring-boot.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>2.3.7.RELEASE</version>
                    <configuration>
                        <mainClass>com.wsjzzcbq.RabbitmqDemoApplication</mainClass>
                    </configuration>
                    <executions>
                        <execution>
                            <id>repackage</id>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>

    application.properties:

    # Application name
    spring.application.name=rabbitmq-demo
    # HTTP port of the web application
    server.port=8080
    # RabbitMQ connection (the Docker host from section 1)
    spring.rabbitmq.addresses=192.168.5.25
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    # Publisher confirms and returns
    spring.rabbitmq.publisher-confirm-type=correlated
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true
    spring.rabbitmq.connection-timeout=5000
    # Listener: manual acknowledgement, 1 to 5 concurrent consumers
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    spring.rabbitmq.listener.simple.concurrency=1
    spring.rabbitmq.listener.simple.max-concurrency=5

    Main class RabbitmqDemoApplication:

    package com.wsjzzcbq;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class RabbitmqDemoApplication {

        public static void main(String[] args) {
            SpringApplication.run(RabbitmqDemoApplication.class, args);
        }
    }

    Consumer RabbitConsumer:

    package com.wsjzzcbq.rabbit;

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;

    import java.io.IOException;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;

    /**
     * RabbitConsumer
     *
     * @author wsjz
     * @date 2022/10/26
     */
    @Component
    public class RabbitConsumer {

        /**
         * The annotation declares the exchange, the queue and their binding automatically.
         *
         * @param message the received message
         * @param channel the channel used for the manual acknowledgement
         * @throws IOException if the acknowledgement fails
         */
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "queue-poetry", durable = "true"),
                exchange = @Exchange(
                        value = "exchange-delayed",
                        // Routing behaviour applied once the delay has elapsed
                        arguments = {@Argument(name = "x-delayed-type", value = "direct")},
                        type = "x-delayed-message",
                        ignoreDeclarationExceptions = "true"),
                key = "poetry"
        ))
        public void onMessage(Message<?> message, Channel channel) throws IOException {
            String name = (String) message.getHeaders().get("name");
            System.out.println(name);
            Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
            // Manual acknowledgement (acknowledge-mode=manual in application.properties)
            channel.basicAck(deliveryTag, false);
            String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            System.out.println("Message received at " + now + ": " + message.getPayload());
        }
    }
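
The listener above declares exchange-delayed with type x-delayed-message and x-delayed-type=direct: the exchange holds each message for the number of milliseconds given in its x-delay header and only then routes it like a direct exchange. As a mental model only, and not a description of how the plugin is implemented inside the broker, the same hold-then-release behaviour can be sketched with the JDK's DelayQueue. DelayedExchangeModel and its methods are hypothetical names, and a manually advanced clock keeps the demo deterministic.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Toy in-memory model of "hold for x-delay milliseconds, then deliver". */
final class DelayedExchangeModel {

    /** Fake clock in milliseconds, advanced by hand instead of reading real time. */
    private final AtomicLong nowMillis = new AtomicLong();
    private final DelayQueue<Held> held = new DelayQueue<>();

    /** A message waiting for its release time. */
    private final class Held implements Delayed {
        final String body;
        final long releaseAtMillis;

        Held(String body, long delayMillis) {
            this.body = body;
            this.releaseAtMillis = nowMillis.get() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(releaseAtMillis - nowMillis.get(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    /** Accepts a message together with its delay, like publishing with an x-delay header. */
    void publish(String body, long xDelayMillis) {
        held.add(new Held(body, xDelayMillis));
    }

    /** Moves the fake clock forward. */
    void advance(long millis) {
        nowMillis.addAndGet(millis);
    }

    /** Returns the next message whose delay has elapsed, or null if none is due yet. */
    String deliverNext() {
        Held due = held.poll(); // poll() only returns elements whose delay has expired
        return due == null ? null : due.body;
    }

    public static void main(String[] args) {
        DelayedExchangeModel exchange = new DelayedExchangeModel();
        exchange.publish("hello", 5000);
        System.out.println(exchange.deliverNext()); // nothing is due before the delay elapses
        exchange.advance(5000);
        System.out.println(exchange.deliverNext());
    }
}
```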

    Producer RabbitProducer:

    package com.wsjzzcbq.rabbit;

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;

    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.Map;

    /**
     * RabbitProducer
     *
     * @author wsjz
     * @date 2022/10/26
     */
    @Component
    public class RabbitProducer {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        /**
         * Sends a delayed message.
         *
         * @param msg         the message payload
         * @param properties  headers to attach to the message
         * @param millisecond the delay in milliseconds
         */
        public void sendMessage(Object msg, Map<String, Object> properties, int millisecond) {
            MessageHeaders messageHeaders = new MessageHeaders(properties);
            Message<Object> content = MessageBuilder.createMessage(msg, messageHeaders);
            rabbitTemplate.convertAndSend("exchange-delayed", "poetry", content, (message) -> {
                // The delayed message exchange reads the delay from the x-delay header
                message.getMessageProperties().setHeader("x-delay", millisecond);
                return message;
            });
            String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            System.out.println("Message sent at " + now + ": " + msg);
        }
    }
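
The producer passes the delay as a plain int of milliseconds in the x-delay header. Going by the plugin's README, the delay has to fit within (2^32)-1 milliseconds (about 49.7 days). A minimal sketch of a helper that converts a java.time.Duration into that header value and rejects out-of-range input is shown below. DelayHeader and toDelayMillis are hypothetical names, not part of Spring AMQP or the plugin, and the limit is an assumption to verify against the README of the plugin version you install.

```java
import java.time.Duration;

/** Sketch: converts a Duration into a value suitable for the x-delay header. */
final class DelayHeader {

    /** Assumed upper bound for x-delay, (2^32)-1 milliseconds, per the plugin README. */
    static final long MAX_DELAY_MILLIS = (1L << 32) - 1;

    private DelayHeader() {}

    /**
     * Returns the delay in milliseconds, or throws if it is negative or above the
     * assumed maximum. Zero is passed through unchanged; this sketch does not
     * decide how the broker treats it.
     */
    static long toDelayMillis(Duration delay) {
        long millis = delay.toMillis();
        if (millis < 0 || millis > MAX_DELAY_MILLIS) {
            throw new IllegalArgumentException(
                    "delay must be between 0 and " + MAX_DELAY_MILLIS + " ms, got " + millis);
        }
        return millis;
    }

    public static void main(String[] args) {
        System.out.println(toDelayMillis(Duration.ofSeconds(5)));
    }
}
```

Because sendMessage takes an int, delays above Integer.MAX_VALUE milliseconds (about 24.8 days) would also require widening that parameter to long.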

    DemoController:

    package com.wsjzzcbq.controller;

    import com.wsjzzcbq.rabbit.RabbitProducer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    import java.util.HashMap;
    import java.util.Map;

    /**
     * DemoController
     *
     * @author wsjz
     * @date 2022/10/26
     */
    @RestController
    public class DemoController {

        @Autowired
        private RabbitProducer producer;

        @RequestMapping("/send")
        public String send() {
            Map<String, Object> map = new HashMap<>();
            // Header value: poem title and author
            map.put("name", "花月吟.唐伯虎");
            // Send the message (a line from the poem) with a 5-second delay
            producer.sendMessage("如此好花如此月, 莫将花月作寻常", map, 5 * 1000);
            return "ok";
        }
    }

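    Start the project. The exchange and queue are created automatically; open the management UI and check that the exchange exchange-delayed is listed.

    Run the test

    Open http://localhost:8080/send in a browser.

    Compare the timestamps printed on the console.

    The consumer receives the message 5 seconds after it was sent.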
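
To compare the two console timestamps without doing the subtraction by hand, they can be parsed with the same yyyy-MM-dd HH:mm:ss pattern that the producer and consumer use for printing. This is a minimal sketch; DelayGap is a hypothetical helper and the timestamps in main are made-up samples, not real console output.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Sketch: computes the gap between the "sent" and "received" console timestamps. */
final class DelayGap {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private DelayGap() {}

    /** Returns the whole seconds elapsed between the two timestamps. */
    static long secondsBetween(String sentAt, String receivedAt) {
        LocalDateTime sent = LocalDateTime.parse(sentAt, FORMAT);
        LocalDateTime received = LocalDateTime.parse(receivedAt, FORMAT);
        return Duration.between(sent, received).getSeconds();
    }

    public static void main(String[] args) {
        // Made-up sample timestamps in the format printed by the demo.
        System.out.println(secondsBetween("2022-10-26 10:00:00", "2022-10-26 10:00:05"));
    }
}
```

Since the printed timestamps only have one-second resolution, a 5000 ms delay can show up as a gap of either 5 or 6 seconds.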

    That completes the walkthrough.

  • Original article: https://blog.csdn.net/wsjzzcbq/article/details/127558183