• ActiveMQ


    ActiveMQ简介

    消息中间件应用场景
    异步处理
    应用解耦
    流量削锋
    异步处理
    场景说明:用户注册,需要执行三个业务逻辑,分别为写入用户表,发注册邮件以及注册短信。
    串行方式
    将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。
    并行方式
    将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
    异步处理
    引入消息中间件,将部分的业务逻辑,进行异步处理。改造后的架构如下:
    按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是 50 毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50 毫秒。因此架构改变后,系统的吞吐量提高啦,比串行提高了 3 倍,比并行提高了两倍。
    应用解耦
    场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图
    传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合。如何解决以上问题呢?引入应用消息队列后的方案,如下图:
    订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功库存系统:订阅下单的消息,采用拉/ 推的方式,获取下单信息,库存系统根据下单信息,进行库存操作假 如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。
    流量消峰
    流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。通过加入消息队列完成如下功能:
    a 、可以控制活动的人数
    b 、可以缓解短时间内高流量压垮应用
    用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理
    常见的消息中间件产品对比
    ActiveMQ 简介及 JMS
    什么是 ActiveMQ
    官网: http://activemq.apache.org/
    ActiveMQ Apache 出品,最流行的,能力强劲的开源消息总线。 ActiveMQ 是一个完全支持 JMS1.1 和J2EE 1.4规范的 JMS Provider 实现。我们在本次课程中介绍 ActiveMQ的使用。
    什么是 JMS
    JMS 消息模型
    消息中间件一般有两种传递模式:点对点模式 (P2P) 和发布 - 订阅模式 (Pub/Sub)
    (1) P2P (Point to Point) 点对点模型( Queue 队列模型)
    (2) Publish/Subscribe(Pub/Sub) 发布 / 订阅模型 (Topic 主题模型 )
    点对点模型
    点对点模型( Pointer-to-Pointer ):即生产者和消费者之间的消息往来。
    每个消息都被发送到特定的消息队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
    点对点模型的特点:
    • 每个消息只有一个消费者(Consumer(即一旦被消费,消息就不再在消息队列中)
    • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列;
    • 接收者在成功接收消息之后需向队列应答成功
    发布 / 订阅模型
    发布 / 订阅( Publish-Subscribe
    包含三个角色:主题( Topic ),发布者( Publisher ),订阅者( Subscriber ),多个发布者将消息发送到topic ,系统将这些消息投递到订阅此 topic 的订阅者
    发布者发送到 topic 的消息,只有订阅了 topic 的订阅者才会收到消息。 topic 实现了发布和订阅,当你发布一个消息,所有订阅这个topic 的服务都能得到这个消息,所以从 1 N 个订阅者都能得到这个消息的拷贝。
    发布 / 订阅模型的特点:
    每个消息可以有多个消费者;
    发布者和订阅者之间有时间上的依赖性(先订阅主题,再来发送消息)。
    订阅者必须保持运行的状态,才能接受发布者发布的消息;
    JMS 编程 API
    1 ConnectionFactory
    创建 Connection 对象的工厂,针对两种不同的 jms 消息模型,分别有 QueueConnectionFactory 和TopicConnectionFactory两种。
    2 Destination
    Destination 的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination 是某个队列( Queue )或某个主题( Topic ; 对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。所以, Destination 实际上就是两种类型的对象:Queue、 Topic
    3 Connection
    Connection 表示在客户端和 JMS 系统之间建立的链接(对 TCP/IP socket 的包装)。 Connection 可以产生一个或多个Session
    4 Session
    Session 是我们对消息进行操作的接口,可以通过 session 创建生产者、消费者、消息等。 Session 提供了事务的功能,如果需要使用session 发送 / 接收多个消息时,可以将这些发送 / 接收动作放到一个事务 中。
    5 Producter
    Producter (消息生产者):消息生产者由 Session 创建,并用于将消息发送到 Destination 。同样,消息生产者分两种类型:QueueSender TopicPublisher 。可以调用消息生产者的方法( send publish方法)发送消息。
    6 Consumer
    Consumer (消息消费者):消息消费者由 Session 创建,用于接收被发送到 Destination 的消息。两种 类型:QueueReceiver TopicSubscriber 。可分别通过 session createReceiver(Queue) 或 createSubscriber(Topic)来创建。当然,也可以 session creatDurableSubscriber 方法来创建持久化的订阅者。
    7 MessageListener
    消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的 onMessage 方法。 EJB 中的MDB( Message-Driven Bean )就是一种 MessageListener

    ActiveMQ安装

    第一步:安装 jdk (略)
    第二步:把 activemq 的压缩包( apache-activemq-5.14.5-bin.tar.gz )上传到 linux 系统
    第三步:解压缩压缩包 tar -zxvf apache-activemq-5.14.5-bin.tar.gz
    第四步:进入 apache-activemq-5.14.5 bin 目录  cd apache-activemq-5.14.5/bin
    第五步:启动 activemq       ./activemq start (执行 2 次:第一次:生成配置信息;第二次:启动 )

    第六步:停止 activemq :     ./activemq stop
    访问
    http://192.168.12.132:8161
    页面控制台: http : //ip:8161 ( 监控 )
    请求地址: tcp : //ip:61616 java 代码访问消息中间件)
    账号: admin
    密码: admin
    1 :登陆:
    2 :点击 Queues 队列或者 Topics 主题消息
    列表各列信息含义如下:
    Number Of Pending Messages :等待消费的消息 这个是当前未出队列的数量。
    Number Of Consumers :消费者 这个是消费者端的消费者数量
    Messages Enqueued :进入队列的消息 进入队列的总数量 , 包括出队列的。
    Messages Dequeued :出了队列的消息 可以理解为是消费这消费掉的数量。

    原生JMS API操作ActiveMQ

    PTP 模式 ( 生产者 )
    1 )引入坐标
    1. <dependencies>
    2. <dependency>
    3. <groupId>org.apache.activemq</groupId>
    4. <artifactId>activemq-all</artifactId>
    5. <version>5.11.2</version>
    6. </dependency>
    7. </dependencies>
    2 )编写生产消息的测试类 QueueProducer
    步骤:
    1. 创建连接工厂
    2. 创建连接
    3. 打开连接
    4. 创建 session
    5. 创建目标地址( Queue: 点对点消息, Topic :发布订阅消息)
    6. 创建消息生产者
    7. 创建消息
    8. 发送消息
    9. 释放资源
    1. package com.itheima.producer;
    2. import org.apache.activemq.ActiveMQConnectionFactory;
    3. import javax.jms.*;
    4. /**
    5. * 演示点对点模式 -- 消息生产者
    6. */
    7. public class PTP_Producer {
    8. public static void main(String[] args) throws JMSException {
    9. //1.创建连接工厂
    10. ConnectionFactoryfactory=newActiveMQConnectionFactory("tcp://192.168.66.133:61616");
    11. //2.创建连接
    12. Connection connection = factory.createConnection();
    13. //3.打开连接
    14. connection.start();
    15. //4.创建session
    16. /**
    17. * 参数一:是否开启事务操作
    18. * 参数二:消息确认机制
    19. */
    20. Session session = connection.createSession(false,
    21. Session.AUTO_ACKNOWLEDGE);
    22. //5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
    23. Queue queue = session.createQueue("queue01");
    24. //6.创建消息生产者
    25. MessageProducer producer = session.createProducer(queue);
    26. //7.创建消息
    27. //createTextMessage: 文本类型
    28. TextMessage textMessage = session.createTextMessage("test message");
    29. //8.发送消息
    30. producer.send(textMessage);
    31. System.out.println("消息发送完成");
    32. //9.释放资源
    33. session.close();
    34. connection.close();
    35. }
    36. }
    观察发送消息的结果:
    PTP 模式 ( 消费者 )
    步骤:
    1. 创建连接工厂
    2. 创建连接
    3. 打开连接
    4. 创建 session
    5. 指定目标地址
    6. 创建消息的消费者
    7. 配置消息监听器
    1. package com.itheima.consumer;
    2. import org.apache.activemq.ActiveMQConnectionFactory;
    3. import javax.jms.*;
    4. /**
    5. * 演示点对点模式- 消息消费者(第二种方案) -- 更加推荐
    6. */
    7. public class PTP_Consumer2 {
    8. public static void main(String[] args) throws JMSException {
    9. //1.创建连接工厂
    10. ConnectionFactory factory
    11. = new ActiveMQConnectionFactory("tcp://192.168.66.133:61616");
    12. //2.创建连接
    13. Connection connection = factory.createConnection();
    14. //3.打开连接
    15. connection.start();
    16. //4.创建session
    17. Session session = connection.createSession(false,
    18. Session.AUTO_ACKNOWLEDGE);
    19. //5.指定目标地址
    20. Queue queue = session.createQueue("queue01");
    21. //6.创建消息的消费者
    22. MessageConsumer consumer = session.createConsumer(queue);
    23. //7.设置消息监听器来接收消息
    24. consumer.setMessageListener(new MessageListener() {
    25. //处理消息
    26. @Override
    27. public void onMessage(Message message) {
    28. if(message instanceof TextMessage){
    29. TextMessage textMessage = (TextMessage)message;
    30. try {
    31. System.out.println("接收的消息(2):"+textMessage.getText());
    32. } catch (JMSException e) {
    33. e.printStackTrace();
    34. }
    35. }
    36. }
    37. });
    38. //注意:在监听器的模式下千万不要关闭连接,一旦关闭,消息无法接收
    39. }
    40. }
    观察消费消息的结果:
    Pub/Sub 模式 ( 生成者 )
    1. 创建连接工厂
    2. 创建连接
    3. 打开连接
    4. 创建 session
    5. 创建目标地址( Queue: 点对点消息, Topic :发布订阅消息)
    6. 创建消息生产者
    7. 创建消息
    8. 发送消息
    9. 释放资源
    1. package cn.itcast.activemq;
    2. import org.apache.activemq.ActiveMQConnectionFactory;
    3. import javax.jms.*;
    4. /**
    5. * 主题消息,消息的发送方
    6. */
    7. public class TopicProducer {
    8. public static void main(String[] args) throws Exception {
    9. //1.创建连接工厂
    10. ConnectionFactory factory = new
    11. ActiveMQConnectionFactory("tcp://192.168.12.132:61616");
    12. //2.创建连接
    13. Connection connection = factory.createConnection();
    14. //3.打开连接
    15. connection.start();
    16. //4.创建session
    17. Session session = connection.createSession(false,
    18. Session.AUTO_ACKNOWLEDGE);
    19. //5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
    20. Topic topic = session.createTopic("sms");
    21. //6.创建消息生产者
    22. MessageProducer producer = session.createProducer(topic);
    23. //7.创建消息
    24. TextMessage message = session.createTextMessage("发短信...");
    25. //8.发送消息
    26. producer.send(message);
    27. System.out.println("发送消息:发短信...");
    28. session.close();;
    29. connection.close();
    30. }
    31. }
    查看主题消息:
    Pub/Sub 模式 ( 消费者 )
    1. 创建连接工厂
    2. 创建连接
    3. 打开连接
    4. 创建 session
    5 指定目标地址
    6. 创建消息的消费者
    7. 配置消息监听器
    1. package cn.itcast.activemq;
    2. import org.apache.activemq.ActiveMQConnectionFactory;
    3. import javax.jms.*;
    4. /**
    5. * 主题消息,消息的消费方
    6. */
    7. public class TopicConsumer {
    8. public static void main(String[] args) throws Exception {
    9. //1.创建连接工厂
    10. ConnectionFactory factory = new
    11. ActiveMQConnectionFactory("tcp://192.168.12.132:61616");
    12. //2.创建连接
    13. Connection connection = factory.createConnection();
    14. //3.打开连接
    15. connection.start();
    16. //4.创建session
    17. Session session = connection.createSession(false,
    18. Session.AUTO_ACKNOWLEDGE);
    19. //5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
    20. Topic topic = session.createTopic("sms");
    21. //6.创建消息的消费者
    22. MessageConsumer consumer = session.createConsumer(topic);
    23. //7.配置消息监听器
    24. consumer.setMessageListener(new MessageListener() {
    25. @Override
    26. public void onMessage(Message message) {
    27. TextMessage textMessage = (TextMessage) message;
    28. try {
    29. System.out.println("消费消息:" + textMessage.getText());
    30. } catch (JMSException e) {
    31. e.printStackTrace();
    32. }
    33. }
    34. });
    35. }
    36. }

    SpringBootActiveMQ整合

    生产者

    1. <dependency>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-activemq</artifactId>
    4. </dependency>
    配置:
    1. server:
    2. port: 9001 #端口
    3. spring:
    4. application:
    5. name: activemq-producer # 服务名称
    6. # springboot与activemq整合配置
    7. activemq:
    8. broker-url: tcp://192.168.66.133:61616 # 连接地址
    9. user: admin # activemq用户名
    10. password: admin # activemq密码
    11. # 指定发送模式 (点对点 false , 发布订阅 true)
    12. jms:
    13. pub-sub-domain: false
    1. public class SpringBootProducer {
    2. //JmsMessagingTemplate: 用于工具类发送消息
    3. @Autowired
    4. private JmsMessagingTemplate jmsMessagingTemplate;
    5. @Test
    6. public void ptpSender(){
    7. /**
    8. * 参数一:队列的名称或主题名称
    9. * 参数二:消息内容
    10. */
    11. jmsMessagingTemplate.convertAndSend("springboot_queue","spring boot
    12. message");
    13. }
    14. }

    消费者

    1. <dependency>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-activemq</artifactId>
    4. </dependency>
    配置:
    1. server:
    2. port: 9002 #端口
    3. spring:
    4. application:
    5. name: activemq-consumer # 服务名称
    6. # springboot与activemq整合配置
    7. activemq:
    8. broker-url: tcp://192.168.66.133:61616 # 连接地址
    9. user: admin # activemq用户名
    10. password: admin # activemq密码
    11. # 指定发送模式 (点对点 false , 发布订阅 true)
    12. jms:
    13. pub-sub-domain: false
    14. activemq:
    15. name: springboot_queue
    1. /**
    2. * 用于监听消息类(既可以用于队列的监听,也可以用于主题监听)
    3. */
    4. @Component // 放入IOC容器
    5. public class MsgListener {
    6. /**
    7. * 用于接收消息的方法
    8. * destination: 队列的名称或主题的名称
    9. */
    10. @JmsListener(destination = "${activemq.name}")
    11. public void receiveMessage(Message message){
    12. if(message instanceof TextMessage){
    13. TextMessage textMessage = (TextMessage)message;
    14. try {
    15. System.out.println("接收消息:"+textMessage.getText());
    16. } catch (JMSException e) {
    17. e.printStackTrace();
    18. }
    19. }
    20. }
    21. }

    ActiveMQ消息组成与高级特性

    JMS消息组成详解

    JMS 消息组成格式
    整个 JMS 协议组成结构如下:
    JMS Message 消息由三部分组成:
    1 )消息头
    2 )消息体
    3 )消息属性
    JMS 消息头
    JMS 消息头预定义了若干字段用于客户端与 JMS 提供者之间识别和发送消息,预编译头如下:
    红色 为重要的消息头
    名称
    描述
    JMSDestination
    消息发送的 Destination ,在发送过程中由提供者设置
    JMSMessageID
    唯一标识提供者发送的每一条消息。这个字段是在发送过程中由提供者设置的,客户机只能在消息发送后才能确定消息的 JMSMessageID
    JMSDeliveryMode
    消息持久化。包含值 DeliveryMode.PERSISTENT 或者
    DeliveryMode.NON_PERSISTENT
    JMSTimestamp
    提供者发送消息的时间,由提供者在发送过程中设置
    JMSExpiration
    消息失效的时间,毫秒,值 0 表明消息不会过期,默认值为 0
    JMSPriority
    消息的优先级,由提供者在发送过程中设置。优先级 0 的优先级最低,优先级 9 的优先级最高。0-4为普通消息,5-9为加急消息。ActiveMQ不保证优先级高就一定先发送,只保证了加急消息必须先于普通消息发送。默认值为4
    JMSCorrelationID
    通常用来链接响应消息与请求消息,由发送消息的 JMS 程序设置。
    JMSReplyTo
    请求程序用它来指出回复消息应发送的地方,由发送消息的 JMS 程序设置
    JMSType
    JMS 程序用它来指出消息的类型。
    JMSRedelivered
    消息的重发标志, false ,代表该消息是第一次发生, true ,代表该消息为重发消息
    不过需要注意的是,在传送消息时,消息头的值由 JMS 提供者来设置, 因此开发者使用以上
    setJMSXXX() 方法分配的值就被忽略了 ,只有以下几个值是可以由开发者设置的:
    JMSCorrelationID JMSReplyTo JMSType
    JMS 消息体
    在消息体中, JMS API 定义了五种类型的消息格式,让我们可以以不同的形式发送和接受消息,并提供了对已有消息格式的兼容。不同的消息类型如下:
    JMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
    · TextMessage-- 一个字符串对象 *
    · MapMessage-- 一套名称 - 值对
    · ObjectMessage-- 一个序列化的 Java 对象 *
    · BytesMessage-- 一个字节的数据流 *
    · StreamMessage -- Java 原始值的数据流
    TextMessage:
    写出:
    1. /**
    2. * 发送TextMessage消息
    3. */
    4. @Test
    5. public void testMessage(){
    6. jmsTemplate.send(name, new MessageCreator() {
    7. @Override
    8. public Message createMessage(Session session) throws JMSException {
    9. TextMessage textMessage = session.createTextMessage("文本消息");
    10. return textMessage;
    11. }
    12. });
    13. }
    读取:
    1. /**
    2. * 接收TextMessage的方法
    3. */
    4. @JmsListener(destination = "${activemq.name}")
    5. public void receiveMessage(Message message){
    6. if(message instanceof TextMessage){
    7. TextMessage textMessage = (TextMessage)message;
    8. try {
    9. System.out.println("接收消息:"+textMessage.getText());
    10. } catch (JMSException e) {
    11. e.printStackTrace();
    12. }
    13. }
    14. }
    MapMessage:
    发送:
    1. /**
    2. * 发送MapMessage消息
    3. */
    4. @Test
    5. public void mapMessage(){
    6. jmsTemplate.send(name, new MessageCreator() {
    7. @Override
    8. public Message createMessage(Session session) throws JMSException {
    9. MapMessage mapMessage = session.createMapMessage();
    10. mapMessage.setString("name","张三");
    11. mapMessage.setInt("age",20);
    12. return mapMessage;
    13. }
    14. });
    15. }
    接收:
    1. @JmsListener(destination = "${activemq.name}")
    2. public void receiveMessage(Message message){
    3. if(message instanceof MapMessage){
    4. MapMessage mapMessage = (MapMessage)message;
    5. try {
    6. System.out.println("名称:"+mapMessage.getString("name"));
    7. System.out.println("年龄:"+mapMessage.getString("age"));
    8. } catch (JMSException e) {
    9. e.printStackTrace();
    10. }
    11. }
    12. }
    ObjectMessage:
    发送:
    1. //发送ObjectMessage消息
    2. @Test
    3. public void test2(){
    4. jmsTemplate.send(name, new MessageCreator() {
    5. @Override
    6. public Message createMessage(Session session) throws JMSException {
    7. User user = new User();
    8. user.setName("小苍");
    9. user.setAge(18);
    10. ObjectMessage objectMessage = session.createObjectMessage(user);
    11. return objectMessage;
    12. }
    13. });
    14. }
    接收:
    1. @JmsListener(destination = "${activemq.name}")
    2. public void receiveMessage(Message message){
    3. if(message instanceof ObjectMessage){
    4. ObjectMessage objectMessage = (ObjectMessage)message;
    5. try {
    6. User user = (User)objectMessage.getObject();
    7. System.out.println(user.getUsername());
    8. System.out.println(user.getPassword());
    9. } catch (JMSException e) {
    10. e.printStackTrace();
    11. }
    12. }
    13. }
    注意: ActiveMQ5.12 后 ,为了安全考虑, ActiveMQ 默认不接受自定义的序列化对象,需要将自定义的加入到受信任的列表。
    1. spring:
    2. activemq:
    3. broker-url: tcp://192.168.66.133:61616
    4. user: admin
    5. password: admin
    6. packages:
    7. trust-all: true # 添加所有包到信任列表
    BytesMessage
    写出:
    1. //发送BytesMessage消息
    2. @Test
    3. public void test3(){
    4. jmsTemplate.send(name, new MessageCreator() {
    5. @Override
    6. public Message createMessage(Session session) throws JMSException {
    7. BytesMessage bytesMessage = session.createBytesMessage();
    8. try {
    9. File file = new File("d:/spring.jpg");
    10. FileInputStream in = new FileInputStream(file);
    11. byte[] bytes = new byte[(int)file.length()];
    12. in.read(bytes);
    13. bytesMessage.writeBytes(bytes);
    14. } catch (Exception e) {
    15. e.printStackTrace();
    16. }
    17. return bytesMessage;
    18. }
    19. });
    20. }
    读取:
    1. @JmsListener(destination="${activemq.name}")
    2. public void receiveMessage(Message message) throws Exception {
    3. BytesMessage bytesMessage = (BytesMessage)message;
    4. FileOutputStream out = new FileOutputStream("d:/abc.jpg");
    5. byte[] buf = new byte[(int)bytesMessage.getBodyLength()];
    6. bytesMessage.readBytes(buf);
    7. out.write(buf);
    8. out.close();
    9. }
    StreamMessage:
    写出:
    1. //发送StreamMessage消息
    2. @Test
    3. public void test4(){
    4. jmsTemplate.send(name, new MessageCreator() {
    5. @Override
    6. public Message createMessage(Session session) throws JMSException {
    7. StreamMessage streamMessage = session.createStreamMessage();
    8. streamMessage.writeString("你好,ActiveMQ");
    9. streamMessage.writeInt(20);
    10. return streamMessage;
    11. }
    12. });
    13. }
    读取:
    1. @JmsListener(destination="${activemq.name}")
    2. public void receiveMessage(Message message) throws Exception {
    3. StreamMessage streamMessage = (StreamMessage)message;
    4. String str = streamMessage.readString();
    5. int i = streamMessage.readInt();
    6. System.out.println(str);
    7. System.out.println(i);
    8. }
    JMS 消息属性
    我们可以给消息设置自定义属性,这些属性主要是提供给应用程序的。对于实现消息过滤功能,消息属性非常有用JMS API定义了一些标准属性,这些属性在所有JMS实现中都是可用的。这些标准属性包括:
    • JMSMessageID:消息的唯一标识符
    • JMSDestination:消息的目标目的地
    • JMSDeliveryMode:消息的传递模式(持久化或非持久化)
    • JMSPriority:消息的优先级
    • JMSExpiration:消息的过期时间
    • JMSTimestamp:消息的创建时间戳
    • JMSCorrelationID:用于关联消息的唯一标识符
    • JMSReplyTo:回复消息应该发送到的目的地
    • JMSRedelivered:表示消息是否被重新传递的布尔值
    • JMSType:消息的类型
    message . setStringProperty ( "Property" , Property ); // 自定义属性
    消息持久化
    消息持久化是保证消息不丢失的重要方式!!! ActiveMQ提供了以下三种的消息存储方式:
    1. Memory 消息存储-基于内存的消息存储。
    2. 基于日志消息存储方式,KahaDBActiveMQ的默认日志存储方式,它提供了容量的提升和恢复能力。
    3. 基于JDBC的消息存储方式-数据存储于数据库(例如:MySQL)中。
    ActiveMQ 持久化机制流程图:
  • 相关阅读:
    《梦醒蝶飞:释放Excel函数与公式的力量》1.1.8认识 excel后台视图
    k8s nginx ingress 开启缓存(cache)的方法
    TCP通讯CS模式之C#设计笔记(十八)
    LeetCode【474. 一和零】
    2022年全球及中国280Ah磷酸铁锂铝壳电芯行业头部企业市场占有率及排名调研报告
    持续性能优化:确保应用保持高性能
    C/C++模板类模板与函数模板区别,以及用法详解
    Excel如何快速将表格纵向数据变成横向
    文献速递:深度学习乳腺癌诊断---基于深度学习的图像分析预测乳腺癌中H&E染色组织病理学图像的PD-L1状态
    BigDecimal精度丢失问题
  • 原文地址:https://blog.csdn.net/weixin_44680802/article/details/134428333