• 关于ActiveMQ的demo


    我们学习使用activemq的时候有时候不知道怎么入手这个时候不妨看看下面的demo帮助你学习交流使用。

    首先 :

            安装启动activemq登录到http://127.0.0.1:8161/index.html

    之后建立maven项目,添加maven依赖

    1. <dependency>
    2. <groupId>org.apache.activemqgroupId>
    3. <artifactId>activemq-clientartifactId>
    4. <version>5.13.4version>
    5. dependency>

    然后写入测试代码

    Queues点对点方式,这种方式中生产者和消费者是一对一的,一个生产者产生的消息置灰被一个消费者消费掉,如果生产者在消费者在队列中注册之前发送了某条消息,在消费者注册到对列中依旧是可以获取到这条消息的。

    创建queue的生产者

    1. package com.chunbaosheng.learn;
    2. import javax.jms.Connection;
    3. import javax.jms.ConnectionFactory;
    4. import javax.jms.MessageProducer;
    5. import javax.jms.Queue;
    6. import javax.jms.Session;
    7. import javax.jms.TextMessage;
    8. import org.apache.activemq.ActiveMQConnectionFactory;
    9. public class ActiveMQQueue {
    10. public static void main(String[] args) throws Exception {
    11. //创建一个连接工厂
    12. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    13. //创建连接
    14. Connection connection = connectionFactory.createConnection();
    15. //启动连接
    16. connection.start();
    17. /**
    18. * 连接建立后,建立一个回话。
    19. * 其中第一个参数为是否使用事物:
    20. * 为true时:paramB的值忽略, acknowledgment mode被jms服务器设置为SESSION_TRANSACTED
    21. * 为false时:paramB的值可为Session.AUTO_ACKNOWLEDGE、
    22. * Session.CLIENT_ACKNOWLEDGE、DUPS_OK_ACKNOWLEDGE其中一个。
    23. * paramB 取值有:
    24. * 1、Session.AUTO_ACKNOWLEDGE:为自动确认,客户端发送和接收消息不需要做额外的工作.
    25. * 2、Session.CLIENT_ACKNOWLEDGE:为客户端确认。客户端接收到消息后,
    26. * 必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。
    27. * 3、DUPS_OK_ACKNOWLEDGE:允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,
    28. * 会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效
    29. * 4、SESSION_TRANSACTED
    30. * 以上参见:https://www.cnblogs.com/MIC2016/p/6086321.html
    31. */
    32. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    33. //创建一个消息队列
    34. Queue queue = session.createQueue("FirstQueue_song");
    35. //创建一个消息生产者
    36. MessageProducer producer = session.createProducer(queue);
    37. //消息
    38. TextMessage textMessage = session.createTextMessage("ok now");
    39. //发送消息
    40. producer.send(textMessage);
    41. /**
    42. * 按照次序关闭资源
    43. */
    44. producer.close();
    45. session.close();
    46. connection.close();
    47. }
    48. }

    创建queue的消费者

    1. package com.chunbaosheng.learn;
    2. import javax.jms.Connection;
    3. import javax.jms.ConnectionFactory;
    4. import javax.jms.Message;
    5. import javax.jms.MessageConsumer;
    6. import javax.jms.MessageListener;
    7. import javax.jms.Queue;
    8. import javax.jms.Session;
    9. import javax.jms.TextMessage;
    10. import org.apache.activemq.ActiveMQConnectionFactory;
    11. public class ActiveMQConsumer {
    12. public static void main(String[] args) throws Exception {
    13. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    14. Connection connection = connectionFactory.createConnection();
    15. connection.start();
    16. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    17. Queue queue = session.createQueue("FirstQueue_song");
    18. //创建消费者
    19. MessageConsumer consumer = session.createConsumer(queue);
    20. //设置监听
    21. consumer.setMessageListener(new MessageListener() {
    22. @Override
    23. public void onMessage(Message message) {
    24. TextMessage text = (TextMessage)message;
    25. try {
    26. System.out.println(text.getText());
    27. } catch (Exception e) {
    28. e.printStackTrace();
    29. }
    30. }
    31. });
    32. System.in.read();
    33. consumer.close();
    34. session.close();
    35. connection.close();
    36. }
    37. }

    topic:发布订阅模式,这种方式不只一个消费者可以消费。在消息发布之前注册到topic的消费者都可以获取到这条消息,但是如果消费者注册后于消息发布,那么这个消费者是无法收到这条消息的。

    创建topic的生产者

    1. package com.chunbaosheng.learn;
    2. import javax.jms.Connection;
    3. import javax.jms.ConnectionFactory;
    4. import javax.jms.MessageProducer;
    5. import javax.jms.Session;
    6. import javax.jms.TextMessage;
    7. import javax.jms.Topic;
    8. import org.apache.activemq.ActiveMQConnectionFactory;
    9. public class ActiveMQTopic {
    10. public static void main(String[] args) throws Exception {
    11. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    12. Connection connection = connectionFactory.createConnection();
    13. connection.start();
    14. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    15. Topic topic = session.createTopic("message queue");
    16. MessageProducer producer = session.createProducer(topic);
    17. TextMessage textMessage = session.createTextMessage("广播第三条条消息。");
    18. producer.send(textMessage);
    19. producer.close();
    20. session.close();
    21. connection.close();
    22. }
    23. }

    创建topic的消费者

    1. package com.chunbaosheng.learn;
    2. import javax.jms.Connection;
    3. import javax.jms.ConnectionFactory;
    4. import javax.jms.Message;
    5. import javax.jms.MessageConsumer;
    6. import javax.jms.MessageListener;
    7. import javax.jms.Session;
    8. import javax.jms.TextMessage;
    9. import javax.jms.Topic;
    10. import org.apache.activemq.ActiveMQConnectionFactory;
    11. public class ActiveMQDescribe {
    12. public static void main(String[] args)throws Exception {
    13. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    14. Connection connection = connectionFactory.createConnection();
    15. connection.start();
    16. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    17. Topic topic = session.createTopic("message queue");
    18. MessageConsumer consumer = session.createConsumer(topic);
    19. consumer.setMessageListener(new MessageListener() {
    20. @Override
    21. public void onMessage(Message message) {
    22. TextMessage text = (TextMessage)message;
    23. try {
    24. System.out.println(text.getText());
    25. } catch (Exception e) {
    26. e.printStackTrace();
    27. }
    28. }
    29. });
    30. System.in.read();
    31. consumer.close();
    32. session.close();
    33. connection.close();
    34. }
    35. }

    --- 交流学习如有雷同纯属巧合 天天向上~

  • 相关阅读:
    什么是高可用架构
    2023-10-09 LeetCode每日一题(最小和分割)
    [激光原理与应用-68]:如何消除50Hz工频干扰和差分信号应对工频干扰
    重要的代理模式到底是什么
    关系型数据库的问题和NoSQL数据库的应用
    KMP模式匹配算法
    在JS中使用精灵图的原理
    基于Django的博客系统之登录增加忘记密码(八)
    力扣每日一题45:跳跃游戏
    数字赋能机械制造业,供应链协同管理系统解决方案助力企业供应链再升级
  • 原文地址:https://blog.csdn.net/s_sos0/article/details/126551686