• SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka)


    消息

    消息的发送方:生产者
    消息的接收方:消费者
    同步消息:发送方发送消息到接收方,接收方有所回应后才能够进行下一次的消息发送
    异步消息:不需要接收方回应就可以进行下一步的发送

    消息队列

    什么是消息队列?
    消息队列
    当此时有很多个用户同时访问服务器,需要服务器进行操作,但此时由于操作太多服务器运转不过来,这时将非常多的操作转换成消息的格式储存器来,所有的子服务器从中获取到消息进行操作分担主服务器的压力,而这个中间存储消息的容器我们一般称为消息队列

    • 企业级应用中广泛使用的三种异步消息传递技术(实现高并发的有效处理):
    1. JMS
    2. AMQP
    3. MQTT

    JMS

    (java Message Service):一个规范,等同于JDBC规范,提供了与消息服务相关的API接口

    • JMS消息模型
    1. peer-2-peer: 点对点模型,消息发送到一个队列中,队列保存信息,队列的消息只能被一个消费者消费,或超时
    2. publish-subscribe:发布订阅模式,消息可以被多个消费者消费,生产者和消费者完全独立,不需要感知对方存在
    • JMS消息种类

    TextMessage,MapMessage, BytesMessage,StreamMessage,ObjectMessage,Message(Message只有消息头和属性)

    • 实现JMS规范的MQ

    ActiveMQ,Redis,HornetMQ,RabbitMQ,RocketMQ(RocketMQ并未完全遵守JMS规范)

    AMQP

    AMQP(advanced message queuing protocol):一种协议(高级队列协议,消息代理规范),规范了网络交换的数据格式,兼容JMS

    JMS存在一定的问题,JMS规范对对应的语言进行了规范,但若是我使用不是规范语言进行操作的时候就会出现问题,这时我们推出AMQP,这更像是一种协议,规范消息的格式,就是无论用什么语言什么环境都无所谓,它只人消息的格式

    优点:跨平台性,服务器供应商,生产者,消费者可以使用不同的语言来实现

    • AMQP的消息模型

    direct exchange,fanout exchange,topic exchange,headers exchange,system exchange

    AMQP的消息种类:byte[]

    • 实现AMQP的MQ:

    RabbitMQ,StormMQ,RocketMQ

    MQTT

    (Message Queueing Telemetry Transport)消息队列遥测传输,专为小设备设计,是物联网(IOT)生态系统中主要成分之一

    Kafka

    kafka,一种高吞吐量的分布式订阅消息系统,提供实时消息功能

    Spring整合消息队列

    模拟消息队列的工作流程

    模拟消息队列的处理过程

    import java.util.ArrayList;
    
    @Service
    public class Messageservice implements MessageService {
        private ArrayList<String> megList=new ArrayList<String>();
    
        @Override
        public void sendMessage(String id) {
            System.out.println("将待发送的消息订单纳入到处理队列.id:"+id);
            megList.add(id);
        }
    
        @Override
        public String doMessage() {
            String remove = megList.remove(0);
            System.out.println("已完成短信业务的发送,id:"+remove);
            return remove;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    模拟将消息导入到消息队列

    @Service
    public class orderserviceimpl implements orderService {
        @Autowired
        private MessageService messageService;
        @Override
        public void order(String id) {
            //发送消息队列
            messageService.sendMessage(id);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    Spring整合ActiveMQ

    首先下载activeMQ
    下载地址:https://activemq.apache.org/components/classic/download/
    下载之后进行解压缩

    • 启动服务

    打开x64的bin目录下执行activemq.bat命令启动服务
    在这里插入图片描述
    在这里插入图片描述
    启动成功 ,其中给出其web控制台的访问地址:
    在这里插入图片描述
    进入其管理界面:
    在这里插入图片描述
    默认用户名&密码:admin

    • SpringBoot进行整合activemq

    添加依赖:

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
            </dependency>
    
    • 1
    • 2
    • 3
    • 4

    添加配置:
    配置Spirng连接的地址,以及后边消息存入的位置

    server:
      port: 80
    spring:
      activemq:
      # 说明spring连接的active的端口地址
        broker-url: tcp://localhost:61616
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    进行消息队列的操作:

    
    @Service
    public class Messageservice implements MessageService {
        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;
        @Override
        public void sendMessage(String id) {
            System.out.println("将待发送的消息订单纳入到处理队列.id:"+id);
            jmsMessagingTemplate.convertAndSend(id);
        }
        @Override
        public String doMessage() {
            //将消息队列中的类型转移出来,并在参数中规定转移出来的消息类型
            String s = jmsMessagingTemplate.receiveAndConvert(String.class);
            System.out.println("已完成短信业务的发送,id:"+s);
            return s;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    在发送和获取期间也可以规定名称

     jmsMessagingTemplate.convertAndSend("order.shishi.id",id);
     String s = jmsMessagingTemplate.receiveAndConvert("order.shishi.id",String.class);
    
    • 1
    • 2

    上述之中也有一个小问题,就是在并不是每次消费都需要进行访问,而是当消息队列中有消息就开始消费我们可以创建一个Listener

    
    @Component
    public class MessageListener {
        @JmsListener(destination = "order.shishi.id")
        public void receive(String id){
            System.out.println("已完成的短信业务:id:"+id);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    这样就自动监听指定位置下的消息,一有消息就自动开始消费,从服务开始就一直存在
    还有一个消息转发的操作:

    @Component
    public class MessageListener {
        @JmsListener(destination = "order.shishi.id")
        @SendTo("order.bushi.id")
        public void receive(String id){
            System.out.println("已完成的短信业务:id:"+id);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    注解 @SendTo的作用是将监听到的消息消费之后将返回值返回到对应的消息中去
    上述使用的都是点对点的模型,如果要使用发布订阅的模型,可以在配置文件中进行配置:

    spring:
      activemq:
        broker-url: tcp://localhost:61616
      jms:
        template:
          default-destination: shishi
        pub-sub-domain: true
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    Spring整合RabbitMQ

    rabbitMQ基于Erlang语言编写,需要安装Erlang
    首先需要下载Erlang:
    下载地址:https://www.erlang.org/downloads
    下载完成之需要重启一下操作系统(重启电脑)
    配置环境变量
    在这里插入图片描述
    添加path:
    在这里插入图片描述
    安装完成后下载RabbitMQ
    下载地址:https://rabbitmq.com/install-windows.html

    • 启动rabbitMQ

    在这里插入图片描述
    注意:要启动rabbitMQ服务需要命令行进入到管理员身份运行
    rabbitMQ的控制台界面(需要手动配置插件):
    在sbin目录下找到:rabbitmq:plugins.bat命令
    在这里插入图片描述
    执行命令展示其插件列表,通过命令开启插件
    在这里插入图片描述
    这样就可以访问它的控制台界面,端口号是15672,地址:http://localhost:15672
    在这里插入图片描述
    输入默认密码:guest

    Spring进行整合rabbitMQ首先添加依赖:

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    
    • 1
    • 2
    • 3
    • 4

    在配置文件中进行rabbits的配置:

    spring:
     activemq:
       broker-url: tcp://localhost:61616
     jms:
       template:
         default-destination: shishi
       pub-sub-domain: true
     rabbitmq:
       host: localhost
       port: 5672
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    直连交换机模式

    使用直连模式的交换机进行消息队列的开发:
    首先需要在配置类中进行直连交换机与消息队列的绑定

    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    @Configuration
    public class ConfigQM {
       @Bean
       public Queue directQueue(){
           //第一个是消息队列的名称,第一个true表示消息持久化,第二个表示当前的消息队列是否是连接专用(连接一关消息队列就关闭),第三个参数是是否删除(当消费者生产者都不使用就删除)
           return new Queue("direct_queue",true,true,true);
       }
       //我们需要一个交换机去绑定消息队列,此处设置一个交换机
       @Bean
       public DirectExchange directExchange(){
           return new DirectExchange("directexchange");
       }
       @Bean
       public Binding binding(){
           //将消息队列与交换机进行绑定
           return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct");
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    绑定之后通过直连交换机进行消息的存储

    @Service
    public class amqpservice implements MessageService{
    
       @Autowired
       private AmqpTemplate amqpTemplate;
       @Override
       public void sendMessage(String id) {
           //使用直连交换机
           amqpTemplate.convertAndSend("directExchange","direct",id);
       }
       @Override
       public String doMessage() {
           return null;
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    然后从消息队列中读取消息写在rabbitMQ监听器下面:

    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    @Component
    public class RabbitMQListener {
       @RabbitListener(queues = "direct_queue")
       public void reveive(String id){
           System.out.println("已完成短信发送业务 id:"+id);
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    主题交换机模式

    主题交换机可以模糊设置交换机绑定的名称来达到分发的目的
    例如:

        @Bean
       public Binding binding(){
           //将消息队列与交换机进行绑定
           return BindingBuilder.bind(directQueue()).to(directExchange()).with("topic_*_id");
       }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在消息进入消息队列的时候:

            amqpTemplate.convertAndSend("directExchange","topic_ni_id",id);
            amqpTemplate.convertAndSend("directExchange","topic_bu_id",id);
    
    • 1
    • 2

    这个两种消息都可以进入到消息队列中去,而且通过这种方式也可以使消息进入到不同的消息队列中去

    • 绑定案件的规则:

    *(星号):用来表示一个单词,且该单词必须出现
    #(井号):用来表示任意数量
    在这里插入图片描述

    Spring整合RocketMQ

    下载地址:https://rocketmq.apache.org/
    默认服务端口:9876
    配置环境变量:ROCKETMQ_HOME,PATH,NAMESER_ADDR(建议):127.0.0.1:9876

    • 命名服务器与broker

    在这里插入图片描述
    当后期的业务服务器增多时,就需要不停的进行服务器之间的连接,会变得非常繁琐,但是如果我们有一台服务器将所有的业务服务器注册进行,消费者与生产者只需要连接命名服务器即可

    • 首先启动命名服务器
      在这里插入图片描述
      双击文件启动命名服务器
      在这里插入图片描述
      然后双击mqbroker文件启动服务器:
      在这里插入图片描述
      如何测试服务器是否正常启动:
      在bin目录下启动cmd:
      在这里插入图片描述

    首先使用第一个命名生成对应的消息:
    在这里插入图片描述
    再使用第二个命令对生成的消息进行消费:
    在这里插入图片描述
    进行整合:
    首先导入依赖坐标:

            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot-starter</artifactId>
                <version>2.2.1</version>
            </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在配置文件中配置其命名服务器:
    rocketmq是与spring在同一层次下

    rocketmq:
       name-server: localhost:9876
       producer:
          group: group_rocketmq
    
    • 1
    • 2
    • 3
    • 4

    进行消息队列的相关操作:

    @Service
    public class MessageRocketmqimpl implements MessageService {
       @Autowired
       private RocketMQTemplate rocketMQTemplate;
       @Override
       public void sendMessage(String id) {
           rocketMQTemplate.convertAndSend("sdasda",id);
       }
       @Override
       public String doMessage() {
           return null;
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    消费者监听器:

     
    @Component
    @RocketMQMessageListener(topic = "sdasda",consumerGroup = "group_rocketmq")
    public class MessageRocketmqListener implements RocketMQListener<String> {
    
        @Override
        public void onMessage(String s) {
            System.out.println("id:"+s);
            
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    使用异步方式进行发送:

    
    @Service
    public class MessageRocketmqimpl implements MessageService {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
        @Override
        public void sendMessage(String id) {
            SendCallback callback=new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("消息发送成功");
                }
    
                @Override
                public void onException(Throwable throwable) {
                    System.out.println("消息发送失败");
                }
            }
    //        rocketMQTemplate.convertAndSend("sdasda",id);
            rocketMQTemplate.asyncSend("sdasda",id,callback);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 同步发送与异步发送的区别:

    同步发送和异步发送是两种不同的消息发送方式。在同步发送中,发送线程会等待消息发送完成并收到发送结果后继续执行,而在异步发送中,发送线程不会阻塞,可以立即执行后续逻辑。选择哪种方式取决于业务需求和对消息发送结果的要求。

    Spring整合kafka

    下载地址:https://kafka.apache.org/downloads
    下载之后进行解压缩文件
    解压之后首先需要运行:zookeeper-server-start.bat文件
    这个文件相当于一个注册中心,需要先进行注册才能够启动kafka服务器,作用相当于RocketMQ中的命名服务器,需要在对应目录下cmd命令携带参数进行启动:
    在这里插入图片描述
    启动注册服务器后,然后开启kafka服务器:
    在这里插入图片描述
    在这里插入图片描述
    spring进行整合kafka:
    导入依赖坐标:

            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
    
    • 1
    • 2
    • 3
    • 4

    在配置文件中进行配置,配置注册服务器的地址:

      kafka:
        bootstrap-servers: localhost:9092
        consumer:
          group-id: order
    
    • 1
    • 2
    • 3
    • 4
    @Service
    public class kafka implements MessageService {
        @Autowired
        private KafkaTemplate<String,String> kafkaTemplate;
        @Override
        public void sendMessage(String id) {
            kafkaTemplate.send("adad",id);
        }
    
        @Override
        public String doMessage() {
            return null;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    创建消费者监听器:

    
    @Component
    public class kafkaListener {
        @KafkaListener(topics = "adad")
        public void onMessage(ConsumerRecord<String,String> consumerRecord){
            System.out.println("id:"+consumerRecord.value());
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
  • 相关阅读:
    AI推介-大语言模型LLMs论文速览(arXiv方向):2024.03.01-2024.03.05
    Linux环境配置jdk
    [数据结构]实现双向链表
    详解127.0.0.1和localhost的关系和区别
    这些阻碍程序员升职加薪的行为,你中招了几个?
    Java基础- StringBuilder & StringBuffer
    学习WiFi,怎么入手?
    【AI 测试】分词器
    华清远见11.2
    Django之图谱查询与标注平台
  • 原文地址:https://blog.csdn.net/weixin_62513677/article/details/137719014