目录
异步处理应用解耦流量削锋
第一步:安装 jdk (略)第二步:把 activemq 的压缩包( apache-activemq-5.14.5-bin.tar.gz )上传到 linux 系统第三步:解压缩压缩包 tar -zxvf apache-activemq-5.14.5-bin.tar.gz第四步:进入 apache-activemq-5.14.5 的 bin 目录 cd apache-activemq-5.14.5/bin第五步:启动 activemq ./activemq start (执行 2 次:第一次:生成配置信息;第二次:启动 )第六步:停止 activemq : ./activemq stop
Number Of Pending Messages :等待消费的消息 这个是当前未出队列的数量。Number Of Consumers :消费者 这个是消费者端的消费者数量Messages Enqueued :进入队列的消息 进入队列的总数量 , 包括出队列的。Messages Dequeued :出了队列的消息 可以理解为是消费这消费掉的数量。
- <dependencies>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-all</artifactId>
- <version>5.11.2</version>
- </dependency>
- </dependencies>
1. 创建连接工厂2. 创建连接3. 打开连接4. 创建 session5. 创建目标地址( Queue: 点对点消息, Topic :发布订阅消息)6. 创建消息生产者7. 创建消息8. 发送消息9. 释放资源
- package com.itheima.producer;
- import org.apache.activemq.ActiveMQConnectionFactory;
- import javax.jms.*;
- /**
- * 演示点对点模式 -- 消息生产者
- */
- public class PTP_Producer {
- public static void main(String[] args) throws JMSException {
- //1.创建连接工厂
- ConnectionFactoryfactory=newActiveMQConnectionFactory("tcp://192.168.66.133:61616");
- //2.创建连接
- Connection connection = factory.createConnection();
- //3.打开连接
- connection.start();
- //4.创建session
- /**
- * 参数一:是否开启事务操作
- * 参数二:消息确认机制
- */
- Session session = connection.createSession(false,
- Session.AUTO_ACKNOWLEDGE);
- //5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
- Queue queue = session.createQueue("queue01");
- //6.创建消息生产者
- MessageProducer producer = session.createProducer(queue);
- //7.创建消息
- //createTextMessage: 文本类型
- TextMessage textMessage = session.createTextMessage("test message");
- //8.发送消息
- producer.send(textMessage);
- System.out.println("消息发送完成");
- //9.释放资源
- session.close();
- connection.close();
- }
- }
1. 创建连接工厂2. 创建连接3. 打开连接4. 创建 session5. 指定目标地址6. 创建消息的消费者7. 配置消息监听器
- package com.itheima.consumer;
- import org.apache.activemq.ActiveMQConnectionFactory;
- import javax.jms.*;
- /**
- * 演示点对点模式- 消息消费者(第二种方案) -- 更加推荐
- */
- public class PTP_Consumer2 {
- public static void main(String[] args) throws JMSException {
- //1.创建连接工厂
- ConnectionFactory factory
- = new ActiveMQConnectionFactory("tcp://192.168.66.133:61616");
- //2.创建连接
- Connection connection = factory.createConnection();
- //3.打开连接
- connection.start();
- //4.创建session
- Session session = connection.createSession(false,
- Session.AUTO_ACKNOWLEDGE);
- //5.指定目标地址
- Queue queue = session.createQueue("queue01");
- //6.创建消息的消费者
- MessageConsumer consumer = session.createConsumer(queue);
- //7.设置消息监听器来接收消息
- consumer.setMessageListener(new MessageListener() {
- //处理消息
- @Override
- public void onMessage(Message message) {
- if(message instanceof TextMessage){
- TextMessage textMessage = (TextMessage)message;
- try {
- System.out.println("接收的消息(2):"+textMessage.getText());
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- }
- });
- //注意:在监听器的模式下千万不要关闭连接,一旦关闭,消息无法接收
- }
- }
1. 创建连接工厂2. 创建连接3. 打开连接4. 创建 session5. 创建目标地址( Queue: 点对点消息, Topic :发布订阅消息)6. 创建消息生产者7. 创建消息8. 发送消息9. 释放资源
- package cn.itcast.activemq;
- import org.apache.activemq.ActiveMQConnectionFactory;
- import javax.jms.*;
- /**
- * 主题消息,消息的发送方
- */
- public class TopicProducer {
- public static void main(String[] args) throws Exception {
- //1.创建连接工厂
- ConnectionFactory factory = new
- ActiveMQConnectionFactory("tcp://192.168.12.132:61616");
- //2.创建连接
- Connection connection = factory.createConnection();
- //3.打开连接
- connection.start();
- //4.创建session
- Session session = connection.createSession(false,
- Session.AUTO_ACKNOWLEDGE);
- //5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
- Topic topic = session.createTopic("sms");
- //6.创建消息生产者
- MessageProducer producer = session.createProducer(topic);
- //7.创建消息
- TextMessage message = session.createTextMessage("发短信...");
- //8.发送消息
- producer.send(message);
- System.out.println("发送消息:发短信...");
- session.close();;
- connection.close();
- }
- }
1. 创建连接工厂2. 创建连接3. 打开连接4. 创建 session5 指定目标地址6. 创建消息的消费者7. 配置消息监听器
- package cn.itcast.activemq;
- import org.apache.activemq.ActiveMQConnectionFactory;
- import javax.jms.*;
- /**
- * 主题消息,消息的消费方
- */
- public class TopicConsumer {
- public static void main(String[] args) throws Exception {
- //1.创建连接工厂
- ConnectionFactory factory = new
- ActiveMQConnectionFactory("tcp://192.168.12.132:61616");
- //2.创建连接
- Connection connection = factory.createConnection();
- //3.打开连接
- connection.start();
- //4.创建session
- Session session = connection.createSession(false,
- Session.AUTO_ACKNOWLEDGE);
- //5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
- Topic topic = session.createTopic("sms");
- //6.创建消息的消费者
- MessageConsumer consumer = session.createConsumer(topic);
- //7.配置消息监听器
- consumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- TextMessage textMessage = (TextMessage) message;
- try {
- System.out.println("消费消息:" + textMessage.getText());
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- });
- }
- }
生产者
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-activemq</artifactId>
- </dependency>
- server:
- port: 9001 #端口
- spring:
- application:
- name: activemq-producer # 服务名称
- # springboot与activemq整合配置
- activemq:
- broker-url: tcp://192.168.66.133:61616 # 连接地址
- user: admin # activemq用户名
- password: admin # activemq密码
- # 指定发送模式 (点对点 false , 发布订阅 true)
- jms:
- pub-sub-domain: false
- public class SpringBootProducer {
- //JmsMessagingTemplate: 用于工具类发送消息
- @Autowired
- private JmsMessagingTemplate jmsMessagingTemplate;
- @Test
- public void ptpSender(){
- /**
- * 参数一:队列的名称或主题名称
- * 参数二:消息内容
- */
- jmsMessagingTemplate.convertAndSend("springboot_queue","spring boot
- message");
- }
- }
消费者
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-activemq</artifactId>
- </dependency>
- server:
- port: 9002 #端口
- spring:
- application:
- name: activemq-consumer # 服务名称
- # springboot与activemq整合配置
- activemq:
- broker-url: tcp://192.168.66.133:61616 # 连接地址
- user: admin # activemq用户名
- password: admin # activemq密码
- # 指定发送模式 (点对点 false , 发布订阅 true)
- jms:
- pub-sub-domain: false
- activemq:
- name: springboot_queue
- /**
- * 用于监听消息类(既可以用于队列的监听,也可以用于主题监听)
- */
- @Component // 放入IOC容器
- public class MsgListener {
- /**
- * 用于接收消息的方法
- * destination: 队列的名称或主题的名称
- */
- @JmsListener(destination = "${activemq.name}")
- public void receiveMessage(Message message){
- if(message instanceof TextMessage){
- TextMessage textMessage = (TextMessage)message;
- try {
- System.out.println("接收消息:"+textMessage.getText());
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- }
- }
1 )消息头2 )消息体3 )消息属性
|
名称
|
描述
|
|
JMSDestination
|
消息发送的
Destination
,在发送过程中由提供者设置
|
|
JMSMessageID
|
唯一标识提供者发送的每一条消息。这个字段是在发送过程中由提供者设置的,客户机只能在消息发送后才能确定消息的 JMSMessageID
|
|
JMSDeliveryMode
|
消息持久化。包含值
DeliveryMode.PERSISTENT
或者
DeliveryMode.NON_PERSISTENT
。
|
|
JMSTimestamp
|
提供者发送消息的时间,由提供者在发送过程中设置
|
|
JMSExpiration
|
消息失效的时间,毫秒,值
0
表明消息不会过期,默认值为
0
|
|
JMSPriority
| 消息的优先级,由提供者在发送过程中设置。优先级 0 的优先级最低,优先级 9 的优先级最高。0-4为普通消息,5-9为加急消息。ActiveMQ不保证优先级高就一定先发送,只保证了加急消息必须先于普通消息发送。默认值为4 |
|
JMSCorrelationID
|
通常用来链接响应消息与请求消息,由发送消息的
JMS
程序设置。
|
|
JMSReplyTo
|
请求程序用它来指出回复消息应发送的地方,由发送消息的
JMS
程序设置
|
|
JMSType
|
JMS
程序用它来指出消息的类型。
|
|
JMSRedelivered
|
消息的重发标志,
false
,代表该消息是第一次发生,
true
,代表该消息为重发消息
|
· TextMessage-- 一个字符串对象 *· MapMessage-- 一套名称 - 值对· ObjectMessage-- 一个序列化的 Java 对象 *· BytesMessage-- 一个字节的数据流 *· StreamMessage -- Java 原始值的数据流
- /**
- * 发送TextMessage消息
- */
- @Test
- public void testMessage(){
- jmsTemplate.send(name, new MessageCreator() {
- @Override
- public Message createMessage(Session session) throws JMSException {
- TextMessage textMessage = session.createTextMessage("文本消息");
- return textMessage;
- }
- });
- }
- /**
- * 接收TextMessage的方法
- */
- @JmsListener(destination = "${activemq.name}")
- public void receiveMessage(Message message){
- if(message instanceof TextMessage){
- TextMessage textMessage = (TextMessage)message;
- try {
- System.out.println("接收消息:"+textMessage.getText());
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 发送MapMessage消息
- */
- @Test
- public void mapMessage(){
- jmsTemplate.send(name, new MessageCreator() {
- @Override
- public Message createMessage(Session session) throws JMSException {
- MapMessage mapMessage = session.createMapMessage();
- mapMessage.setString("name","张三");
- mapMessage.setInt("age",20);
- return mapMessage;
- }
- });
- }
- @JmsListener(destination = "${activemq.name}")
- public void receiveMessage(Message message){
- if(message instanceof MapMessage){
- MapMessage mapMessage = (MapMessage)message;
- try {
- System.out.println("名称:"+mapMessage.getString("name"));
- System.out.println("年龄:"+mapMessage.getString("age"));
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- }
- //发送ObjectMessage消息
- @Test
- public void test2(){
- jmsTemplate.send(name, new MessageCreator() {
- @Override
- public Message createMessage(Session session) throws JMSException {
- User user = new User();
- user.setName("小苍");
- user.setAge(18);
- ObjectMessage objectMessage = session.createObjectMessage(user);
- return objectMessage;
- }
- });
- }
- @JmsListener(destination = "${activemq.name}")
- public void receiveMessage(Message message){
- if(message instanceof ObjectMessage){
- ObjectMessage objectMessage = (ObjectMessage)message;
- try {
- User user = (User)objectMessage.getObject();
- System.out.println(user.getUsername());
- System.out.println(user.getPassword());
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- }
- spring:
- activemq:
- broker-url: tcp://192.168.66.133:61616
- user: admin
- password: admin
- packages:
- trust-all: true # 添加所有包到信任列表
- //发送BytesMessage消息
- @Test
- public void test3(){
- jmsTemplate.send(name, new MessageCreator() {
- @Override
- public Message createMessage(Session session) throws JMSException {
- BytesMessage bytesMessage = session.createBytesMessage();
- try {
- File file = new File("d:/spring.jpg");
- FileInputStream in = new FileInputStream(file);
- byte[] bytes = new byte[(int)file.length()];
- in.read(bytes);
- bytesMessage.writeBytes(bytes);
- } catch (Exception e) {
- e.printStackTrace();
- }
- return bytesMessage;
- }
- });
- }
- @JmsListener(destination="${activemq.name}")
- public void receiveMessage(Message message) throws Exception {
- BytesMessage bytesMessage = (BytesMessage)message;
- FileOutputStream out = new FileOutputStream("d:/abc.jpg");
- byte[] buf = new byte[(int)bytesMessage.getBodyLength()];
- bytesMessage.readBytes(buf);
- out.write(buf);
- out.close();
- }
- //发送StreamMessage消息
- @Test
- public void test4(){
- jmsTemplate.send(name, new MessageCreator() {
- @Override
- public Message createMessage(Session session) throws JMSException {
- StreamMessage streamMessage = session.createStreamMessage();
- streamMessage.writeString("你好,ActiveMQ");
- streamMessage.writeInt(20);
- return streamMessage;
- }
- });
- }
- @JmsListener(destination="${activemq.name}")
- public void receiveMessage(Message message) throws Exception {
- StreamMessage streamMessage = (StreamMessage)message;
- String str = streamMessage.readString();
- int i = streamMessage.readInt();
- System.out.println(str);
- System.out.println(i);
- }
message . setStringProperty ( "Property" , Property ); // 自定义属性