我们学习使用activemq的时候有时候不知道怎么入手这个时候不妨看看下面的demo帮助你学习交流使用。
首先 :
安装启动activemq登录到http://127.0.0.1:8161/index.html
之后建立maven项目,添加maven依赖
- <dependency>
- <groupId>org.apache.activemqgroupId>
- <artifactId>activemq-clientartifactId>
- <version>5.13.4version>
- dependency>
然后写入测试代码
Queues点对点方式,这种方式中生产者和消费者是一对一的,一个生产者产生的消息置灰被一个消费者消费掉,如果生产者在消费者在队列中注册之前发送了某条消息,在消费者注册到对列中依旧是可以获取到这条消息的。
创建queue的生产者
- package com.chunbaosheng.learn;
-
- import javax.jms.Connection;
- import javax.jms.ConnectionFactory;
- import javax.jms.MessageProducer;
- import javax.jms.Queue;
- import javax.jms.Session;
- import javax.jms.TextMessage;
-
-
- import org.apache.activemq.ActiveMQConnectionFactory;
-
- public class ActiveMQQueue {
- public static void main(String[] args) throws Exception {
- //创建一个连接工厂
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
- //创建连接
- Connection connection = connectionFactory.createConnection();
- //启动连接
- connection.start();
- /**
- * 连接建立后,建立一个回话。
- * 其中第一个参数为是否使用事物:
- * 为true时:paramB的值忽略, acknowledgment mode被jms服务器设置为SESSION_TRANSACTED
- * 为false时:paramB的值可为Session.AUTO_ACKNOWLEDGE、
- * Session.CLIENT_ACKNOWLEDGE、DUPS_OK_ACKNOWLEDGE其中一个。
- * paramB 取值有:
- * 1、Session.AUTO_ACKNOWLEDGE:为自动确认,客户端发送和接收消息不需要做额外的工作.
- * 2、Session.CLIENT_ACKNOWLEDGE:为客户端确认。客户端接收到消息后,
- * 必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。
- * 3、DUPS_OK_ACKNOWLEDGE:允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,
- * 会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效
- * 4、SESSION_TRANSACTED
- * 以上参见:https://www.cnblogs.com/MIC2016/p/6086321.html
- */
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- //创建一个消息队列
- Queue queue = session.createQueue("FirstQueue_song");
- //创建一个消息生产者
- MessageProducer producer = session.createProducer(queue);
- //消息
- TextMessage textMessage = session.createTextMessage("ok now");
- //发送消息
- producer.send(textMessage);
- /**
- * 按照次序关闭资源
- */
- producer.close();
-
- session.close();
-
- connection.close();
- }
-
- }
创建queue的消费者
- package com.chunbaosheng.learn;
-
- import javax.jms.Connection;
- import javax.jms.ConnectionFactory;
- import javax.jms.Message;
- import javax.jms.MessageConsumer;
- import javax.jms.MessageListener;
- import javax.jms.Queue;
- import javax.jms.Session;
- import javax.jms.TextMessage;
-
- import org.apache.activemq.ActiveMQConnectionFactory;
-
- public class ActiveMQConsumer {
- public static void main(String[] args) throws Exception {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
-
- Connection connection = connectionFactory.createConnection();
-
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Queue queue = session.createQueue("FirstQueue_song");
- //创建消费者
- MessageConsumer consumer = session.createConsumer(queue);
- //设置监听
- consumer.setMessageListener(new MessageListener() {
-
- @Override
- public void onMessage(Message message) {
- TextMessage text = (TextMessage)message;
- try {
- System.out.println(text.getText());
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
- });
-
- System.in.read();
-
- consumer.close();
-
- session.close();
-
- connection.close();
- }
-
- }
topic:发布订阅模式,这种方式不只一个消费者可以消费。在消息发布之前注册到topic的消费者都可以获取到这条消息,但是如果消费者注册后于消息发布,那么这个消费者是无法收到这条消息的。
创建topic的生产者
- package com.chunbaosheng.learn;
-
- import javax.jms.Connection;
- import javax.jms.ConnectionFactory;
- import javax.jms.MessageProducer;
- import javax.jms.Session;
- import javax.jms.TextMessage;
- import javax.jms.Topic;
-
- import org.apache.activemq.ActiveMQConnectionFactory;
-
- public class ActiveMQTopic {
-
- public static void main(String[] args) throws Exception {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
-
- Connection connection = connectionFactory.createConnection();
-
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Topic topic = session.createTopic("message queue");
-
- MessageProducer producer = session.createProducer(topic);
-
- TextMessage textMessage = session.createTextMessage("广播第三条条消息。");
-
- producer.send(textMessage);
-
- producer.close();
-
- session.close();
-
- connection.close();
-
- }
-
- }
创建topic的消费者
- package com.chunbaosheng.learn;
-
- import javax.jms.Connection;
- import javax.jms.ConnectionFactory;
- import javax.jms.Message;
- import javax.jms.MessageConsumer;
- import javax.jms.MessageListener;
- import javax.jms.Session;
- import javax.jms.TextMessage;
- import javax.jms.Topic;
-
- import org.apache.activemq.ActiveMQConnectionFactory;
-
- public class ActiveMQDescribe {
-
- public static void main(String[] args)throws Exception {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
-
- Connection connection = connectionFactory.createConnection();
-
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Topic topic = session.createTopic("message queue");
-
- MessageConsumer consumer = session.createConsumer(topic);
-
- consumer.setMessageListener(new MessageListener() {
-
- @Override
- public void onMessage(Message message) {
- TextMessage text = (TextMessage)message;
- try {
- System.out.println(text.getText());
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
- });
- System.in.read();
-
- consumer.close();
-
- session.close();
-
- connection.close();
- }
-
- }
--- 交流学习如有雷同纯属巧合 天天向上~