• ActiveMQ(二)


    目录

    八:Spring整合Activemq

    生产者的实现:

    消费者的实现:

    设置监听器

    九:Springboot整合ActiveMQ

    1.queue中实现生产者和消费者

    2.topic中实现生产者和消费者

    十:ActiveMQ的传输协议

    1.各协议介绍

    2.配置协议

    这里我们的要求是进行配置一个NIO协议

    3.使用auto+协议

    十一:ActiveMQ的持久化机制

    十二:ActiveMQ集群搭建

    十三:ActiveMQ的高级特性

    1.同步投递和异步投递

    区别:

    2.异步投递的实现

    3.延时投递

    4.消息的重试机制

    重试机制流程分析:

    5.死信队列

    6.幂等性消费


    八:Spring整合Activemq

    生产者与消费者的依赖:

    
        
        
            org.apache.activemq
            activemq-all
            5.17.0
        
        
            com.fasterxml.jackson.core
            jackson-databind
            2.13.3
        
        
        
            org.apache.activemq
            activemq-pool
            5.17.0
        
        
        
            org.springframework
            spring-jms
            5.3.22
        
        
        
            org.apache.xbean
            xbean-spring
            3.16
        
        
            org.springframework
            spring-aop
            5.3.6
        
        
        
            org.springframework
            spring-core
            5.3.22
        
        
            org.springframework
            spring-context
            5.3.20
        
        
            org.springframework
            spring-aop
            5.3.6
        
        
            org.springframework
            spring-orm
            5.3.22
        
        
            org.springframework
            spring-test
            5.3.6
        
        
        
            junit
            junit
            4.12
            test
        
    

    生产者与消费者共同的Spring配置文件:

    
    
    
        
    
        
            
    
                
                    
                
            
            
        
    
        
            
        
    
        
            
        
    
        
    
            
    
            
    
            
                
            
        
    

    生产者的实现:

    public class Producer   {
        @Autowired
        private JmsTemplate jmsTemplate ;
        /**
         * 发送消息
         */
        public void send(String message) {
            jmsTemplate.send(new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    TextMessage textMessage = session.createTextMessage(message) ;
                    return textMessage ;
                }
            });
        }
    }

    消费者的实现:

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration("classpath:spring-activemq.xml")
    public class TestConsumer {
    ​
        @Autowired
        private JmsTemplate jmsTemplate ;
    ​
        @Test
        public void testReceive() {
            while (true) {
                //receiveAndConvert()表示接收到消息并且获取到内容
                String context = (String) jmsTemplate.receiveAndConvert();
                System.out.println("接收到的消息为:" + context);
            }
        }
    }

    设置监听器

    public class Consumer implements MessageListener {
        /**
         * 监听到消息后,对消息的处理
         * @param message
         */
        @Override
        public void onMessage(Message message) {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message ;
                try {
                    System.out.println(textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    九:Springboot整合ActiveMQ

    1.queue中实现生产者和消费者

    1.引入相同的依赖

    2.生产者的编写

    配置类:

    制造类:

    application.yml :

    3.消费者的编写

    2.topic中实现生产者和消费者

    生产者:

    1.

    2.

    消费者:

    十:ActiveMQ的传输协议

    1.各协议介绍

    2.配置协议

    这里我们的要求是进行配置一个NIO协议

    1.

    2.所以我们要进行配置一个NIO协议

    3.

    3.使用auto+协议

    代码演示:

    1.在activemq.xml配置文件中只进行配置这一个auto+nio

    2.

    十一:ActiveMQ的持久化机制

    十二:ActiveMQ集群搭建

    当master主机宕机之后,某一台slave会变为master主机

    总结:

    (1)集群中的三台broker共享一个kahadb文件系统。也就是说虽然broker有三台,但是持久化的日志文件只有一份。

    (2)集群中只有master身份的broker负责对消息进行处理,也就是说对于整个集群,只有master能够提供对外服务,接收消息和发送消息

    的服务

    (3)当master出现宕机,此时会在slave从机中自动选出新的master主机,对外进行提供服务,实现对activemq的高可用

    (4)针对于jdbc的高可用,除了将持久化方案改为jdbc外,还可以进行配置mysql的主从集群,实现mysql的高可用

    十三:ActiveMQ的高级特性

    1.同步投递和异步投递

    同步投递:

    (1)生产者投递一个Message给到broker,在生产者接收到broker返回的ack确认之前,都会一直阻塞而不会执行下面的一系列逻辑操作

    (2)当生产者接收到broker的ack确认机制之后,会停止阻塞而执行之后的逻辑

    异步投递:

    (1) 当生产者进行投递一个Message给到broker,投递完成之后,不需要broker返回ack确认,即可被视为成功 然后执行之后的逻辑

    区别:

    (1)ActiveMQ支持同步和异步两种方式进行将消息发送到broker。

    (2)同步和异步的主要区别为:发送Message的效率高低以及是否成功可靠投递

    对于同步发送来说:

    1.不会丢失消息,消息的可靠性高。因为对于同步向broker发送Message来说,我们需要broker进行返回一个ack表示确认。

    2.但是相对异步发送Message效率较低,需要阻塞进行broker确认。

    对于异步发送来说:

    1.无需broker返回ack确认,因此效率较高。

    2.但是有可能丢失Message数据。分析:生产者发送一个Message给broker,对于生产者而言,只要消息发送出去那么就算是

    发送成功。但是有可能broker发生网络动荡,那么第一个Message就接收不成功。但是对于生产者而言 第一个Message已经发送成功,

    再向broker发送时,是进行发送第二个Message。此时就会导致丢失了第一个Message数据

    (3)ActiveMQ默认使用的是异步发送

    2.异步投递的实现

    (1)对于ActiveMQ中的异步投递,我们使用的是回调方法机制

    (2) 生产者进行投递Message给broker,投递出去之后,不阻塞,继续执行之后的逻辑

    (3) 当broker成功接收到了Message时,那么就回调onSuccess方法。当broker没有接收到Message,就会回调onException方法

    代码实现:

    public class ProducerDemo01 {
    ​
        //指明activemq的地址
        public static final String URL = "tcp://192.168.204.134:61616" ;
        //指明destination目的地
        public static final String QUEUE_NAME = "my_queue_1" ;
    ​
        public static void main(String[] args) throws Exception{
            //1.获得连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL) ;
            //设置异步投递的方式
            connectionFactory.setUseAsyncSend(true);
            //2.获得连接对象
            Connection connection = connectionFactory.createConnection();
            //3.开启连接
            connection.start();
            //4.从连接对象中获得一个Session会话,该Session是MQ与消息生产者之间开启的会话
            Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE) ;
            //5.创建队列对象
            Queue queue = session.createQueue(QUEUE_NAME) ;
            // 6.创建ActiveMQMessageProducer对象
            ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue) ;
            for (int i = 0;i<3;i++) {
                TextMessage message = session.createTextMessage("hello" + i) ;
                producer.send(message, new AsyncCallback() {
                    //生产者发送Message,broker确认成功之后回调的方法
                    @Override
                    public void onSuccess() {
                        System.out.println("发送成功");
                    }
                    //生产者发送Message,broker确认失败之后回调的方法
                    @Override
                    public void onException(JMSException e) {
                        System.out.println("发送失败");
                    }
                });
            }
            //9.关闭连接
            connection.close();
        }
    }

    3.延时投递

    演示:

    1.找到conf文件夹下的activemq.xml文件 并且进入修改

    2.修改保存之后,进行重启activemq服务器

    3.

    4.消息的重试机制

    重试机制流程分析:

    (1) 生产者投递一个Message到broker所在的队列queue或topic中

    (2)消费者进行消费Message的时候,不进行开启事务,但是一直不进行手动commit提交。所以一条Message会一直存留在broker对应的队列当中从而重复被进行消费读取。

    (3) 但是消息也具有重试机制,重试机制默认的阙值为6,也就是最多被重试消费6次,也就是一条Message最多被消费7次。

    (4) 当被消费满7次,也就是达到阈值6时,但是还没有被打包为事务进行手动提交时,消费者就会发送一个"poison ack"给broker,则

    Message就会变成一个毒消息。该Message会被从broker对应的队列拿出放到死信队列中去。

    (5) 等到消费者还想继续消费这个Message第8次时,发现broker的队列中找不到该Message。

    代码演示:

    1.如果什么也不设置,默认重试的阙值为6

    2.重新进行设置阙值

    5.死信队列

    死信队列不是说访问不了的队列,反而可以结合这个队列进行开展一些业务。

    eg:订单超过支付时间应该被取消

    (1) 上游创建订单时,将消息发送到等待支付的队列

    (2) 消息的超时时间为30分钟,如果三十分钟还没有支付,则该消息进入到死信队列

    (3) 对于死信队列也配有专门的消费者来处理死信队列的消息,处理方案即是把死信队列中的订单状态改为"已取消"

    演示开启死信队列的步骤:

    1.编辑activemq.xml配置文件

    2.生产者:要确定broker对应死信队列的名字:DLQ.*

    3.

    6.幂等性消费

    (1) 当在一些业务场景出现非幂等性情况

    (2) 我们进行用户的注册,由于出现了网络动荡的问题,因此用户点击了多次,所以向broker发送了多条关于同一个的用户id的Message

    (3)消费者进行消费接收broker中的Message,然后存储到MySql数据库中。但是同一个用户 相同的id怎么可以存储多份呢?

    (4) 为了解决这个问题,保证幂等性,有两种方法:1.mysql中进行插入业务id作为主键,主键是唯一的,一次只可以插入一条。2.使用

    redis或zookeeper的分布式锁。

    (5) 我们通常使用的是redis的分布式锁,因为为了尽可能避免影响Mysql数据库的性能。

    基于redis的分布式锁的实现原理:每当一个id进入之后,我们都会进行上一把以id号为唯一标识的id锁。当相同id的记录再一次过来时,

    由于已经上了该id对应的锁,所以就不可以再允许通过该id对应的记录了。所以就不会再把该id对应的记录存储到数据库中了。

  • 相关阅读:
    分布式下的 ID 实现
    STM32MP157按键中断实验
    校园二手交易小程序,微信小程序二手交易系统毕设作品
    Hadoop(三)通过C#/python实现Hadoop MapReduce
    前端——Vue响应式适配
    MySQL数据库查询排序
    vulhub struts2 s2-001漏洞复现
    郑泽康:一名热爱技术的“保安”
    alist网盘自动同步
    vue 基于vue-cli3 发布npm 插件
  • 原文地址:https://blog.csdn.net/m0_61784000/article/details/127131768