• RabbitMQ:hello结构


     

    1.在Linux环境上面装入rabbitMQ

    doker-compose.yml

    1. version: "3.1"
    2. services:
    3. rabbitmq:
    4. image: daocloud.io/library/rabbitmq:management
    5. restart: always
    6. container_name: rabbitmq
    7. ports:
    8. - 6786:5672
    9. - 16786:15672
    10. volumes:
    11. - ./data:/var/lib/rabbitmq

    doker-compose up -d 运行

    2.进入rabbitMQ提供的客户端路径

    自己的路径,和客户端端口号

    RabbitMQ Management

    http://8.140.244.227:   16786

    3.在客户端注册虚拟主机

    4.创建角色

    5.给角色绑定虚拟主机 

     

    6.导入RabbitMQ依赖 

    1. <dependency>
    2. <groupId>com.rabbitmqgroupId>
    3. <artifactId>amqp-clientartifactId>
    4. <version>5.6.0version>
    5. dependency>

    7.写个工具类,获取连接

    1. package com.qf.springbootRbMQ.utils;
    2. import com.rabbitmq.client.Connection;
    3. import com.rabbitmq.client.ConnectionFactory;
    4. public class MQUtils {
    5. public static Connection getConnection() throws Exception {
    6. //创建连接工厂对象
    7. ConnectionFactory connectionFactory = new ConnectionFactory();
    8. //设置MQ服务器的相关信息
    9. connectionFactory.setHost("8.140.244.227");
    10. connectionFactory.setPort(6786);//注意:不要写成管理工具的端口号
    11. connectionFactory.setUsername("root");
    12. connectionFactory.setPassword("1234");
    13. connectionFactory.setVirtualHost("/email");//设置虚拟主机
    14. Connection connection = connectionFactory.newConnection();
    15. return connection;
    16. }
    17. }

    8.写提供者类Send

    1. package com.qf.springbootRbMQ.email;
    2. import com.qf.springbootRbMQ.entity.EmailMessage;
    3. import com.qf.springbootRbMQ.utils.MQUtils;
    4. import com.rabbitmq.client.Channel;
    5. import com.rabbitmq.client.Connection;
    6. import org.springframework.util.SerializationUtils;
    7. public class Send {
    8. //队列的名字
    9. public static final String QUEUE_NAME="QQEmail";
    10. public static void main(String[] args) throws Exception {
    11. //1.获取连接对象
    12. Connection conn = MQUtils.getConnection();
    13. //2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上
    14. Channel channel = conn.createChannel();
    15. //3.声明了一个队列
    16. /**
    17. * queue – the name of the queue
    18. * durable – true代表创建的队列是持久化的(当mq重启后,该对立依然存在)
    19. * exclusive – 该队列是不是排他的 (该对列是否只能由当前创建该队列的连接使用)
    20. * autoDelete – 该队列是否可以被mq服务器自动删除
    21. * arguments – 队列的其他参数,可以为null
    22. */
    23. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    24. EmailMessage emailMessage = new EmailMessage();
    25. emailMessage.setQq("1393087444@QQ.com");
    26. emailMessage.setSubject("你好啊,又见面了,发送邮箱给你啊!!!");
    27. emailMessage.setText("

      谢谢你看我的邮件啦啦啦~~~

      "
      );
    28. byte[] bytes = SerializationUtils.serialize(emailMessage);
    29. //生产者如何发送消息,使用下面的方法即可
    30. /**
    31. * exchange – 交换机的名字 ,如果是空串,说明是把消息发给了默认交换机
    32. * routingKey – 路由的key,当发送消息给默认交换机时,routingkey代表队列的名字
    33. * other properties - 消息的其他属性,可以为null
    34. * body – 消息的内容,注意,要是有 字节数组
    35. */
    36. channel.basicPublish("", QUEUE_NAME, null, bytes);
    37. System.out.println(" [x] Sent '" + emailMessage + "'");
    38. //关闭资源
    39. channel.close();
    40. conn.close();
    41. }
    42. }

    9.写消费者类Recv

    1. package com.qf.springbootRbMQ.email;
    2. import cn.hutool.core.collection.CollUtil;
    3. import cn.hutool.extra.mail.MailAccount;
    4. import cn.hutool.extra.mail.MailUtil;
    5. import com.qf.springbootRbMQ.entity.EmailMessage;
    6. import com.qf.springbootRbMQ.utils.MQUtils;
    7. import com.rabbitmq.client.Channel;
    8. import com.rabbitmq.client.Connection;
    9. import com.rabbitmq.client.DeliverCallback;
    10. import com.rabbitmq.client.Delivery;
    11. import org.springframework.beans.factory.annotation.Autowired;
    12. import org.springframework.stereotype.Component;
    13. import org.springframework.util.SerializationUtils;
    14. import javax.mail.MessagingException;
    15. import javax.mail.internet.MimeMessage;
    16. import java.io.File;
    17. import java.io.IOException;
    18. public class Recv {
    19. private final static String QUEUE_NAME="QQEmail";
    20. public static void custormer() throws Exception {
    21. //1.获取连接对象
    22. Connection conn = MQUtils.getConnection();
    23. //2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上
    24. Channel channel = conn.createChannel();
    25. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    26. //3.该消费者收到消息之后的处理逻辑,写在DeliverCallback对象中
    27. DeliverCallback deliverCallback =new DeliverCallback() {
    28. @Override
    29. public void handle(String consumerTag, Delivery message) throws IOException {
    30. //这个相当于标识,消费者的ID
    31. System.out.println(consumerTag);
    32. //从Delivery对象中可以获取到生产者,发送的消息的字节数组
    33. byte[] body = message.getBody();
    34. EmailMessage emailMessage = (EmailMessage) SerializationUtils.deserialize(body);
    35. System.out.println(emailMessage);
    36. //在这里写消费者的业务逻辑,例如,发送邮件
    37. MailAccount account = new MailAccount();
    38. account.setHost("smtp.qq.com"); // 设置SMTP服务器地址
    39. account.setPort(25); // 设置SMTP服务器端口
    40. account.setAuth(true); // 设置是否需要身份认证
    41. account.setFrom("1393087444@qq.com"); // 设置发件人邮箱地址
    42. account.setUser("1393087444@qq.com"); // 设置用户名
    43. account.setPass("gqrjqpilpadcjbdi"); // 设置密码
    44. // 发送邮件
    45. MailUtil.send(account, CollUtil.newArrayList("1393087444@qq.com"),emailMessage.getSubject(),emailMessage.getText(),false);
    46. }
    47. };
    48. //4.让当前消费者开始消费(QUEUE_NAME)队列中的消息
    49. /**
    50. * queue – the name of the queue
    51. * autoAck – true 代表当前消费者是不是自动确认模式。true代表自动确认。
    52. * deliverCallback – 当有消息发送给该消费者时,消费者如何处理消息的逻辑
    53. * cancelCallback – 当消费者被取消掉时,如果要执行代码,写到这里
    54. */
    55. channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag -> {});
    56. }
    57. public static void main(String[] args) throws Exception {
    58. custormer();
    59. }
    60. }

    10.发送提供者,建立连接消息队列,将信息放入消息队列中

    11.运行消费者接收消息,并处理消息 

     

  • 相关阅读:
    Huffman哈夫曼树思想即代码
    Java架构师分析和设计技术架构
    ubuntu下网卡插入网线后仍然不连接
    2.9 PE结构:重建导入表结构
    ECharts柱状图加滚动条
    nginx测试rewrite
    ElasticSearch--整合SpringBoot
    SSM处理过程
    08【MyBatis之动态SQL】
    使用Nginx来实现限流
  • 原文地址:https://blog.csdn.net/qq_53374893/article/details/132721297