• rabbitmq代码


    # Connection settings for the RabbitMQ broker used by the Spring AMQP examples.
    spring:
      rabbitmq:
        host:  # broker host name; NOTE(review): left blank here, fill in before running (the raw-client tests in this file use 192.168.238.128)
        port: 5672 # broker port
        virtual-host: / # virtual host; NOTE(review): the raw-client tests in this file use "/itcast" - confirm which one is intended
        username: zhangsan # user name
        password: 1234 # password
    =============================================
    /**
     * Message payload holding a person's name and age.
     *
     * <p>The class is {@link Serializable} and also exposes bean-style getters and setters.
     */
    public class User implements Serializable {
        /**
         * Serialization version marker. Java serialization only honours a field named exactly
         * {@code serialVersionUID}; under any other name the field is ignored and a UID is
         * computed from the class shape instead.
         */
        private static final long serialVersionUID = 1L;

        private String name;
        private int age;

        /**
         * Creates an empty user ({@code name == null}, {@code age == 0}).
         *
         * <p>Provided so that libraries which instantiate the class reflectively and then call the
         * setters (for example JSON mappers) can rebuild a {@code User}.
         */
        public User() {
        }

        /**
         * Creates a user with the given values.
         *
         * @param name the user's name
         * @param age  the user's age
         */
        public User(String name, int age) {
            this.name = name;
            this.age = age;
        }

        /**
         * Returns the serialization version of this class.
         *
         * <p>The method name keeps its original (misspelled) form so that existing callers still
         * compile.
         *
         * @return the value of {@code serialVersionUID}
         */
        public static long getSerliaVersionUid() {
            return serialVersionUID;
        }

        /** @return the user's name */
        public String getName() {
            return name;
        }

        /** @param name the new name */
        public void setName(String name) {
            this.name = name;
        }

        /** @return the user's age */
        public int getAge() {
            return age;
        }

        /** @param age the new age */
        public void setAge(int age) {
            this.age = age;
        }

        /** @return a string of the form {@code User{name='...', age=N}} */
        @Override
        public String toString() {
            return "User{" +
                    "name='" + name + '\'' +
                    ", age=" + age +
                    '}';
        }
    }
    ===========================================================
    生产者
    @SpringBootApplication
    public class PublisherApplication {
        public static void main(String[] args) {
            SpringApplication.run(PublisherApplication.class);
        }

        @Bean
        public MessageConverter jsonMessageConverter(){

            return new Jackson2JsonMessageConverter();
        }
    }

    =========================================================
    @SpringBootTest(classes = PublisherApplication.class)
    @RunWith(SpringRunner.class)
    public class PublisherTest {
        @Autowired
        private RabbitTemplate rabbitTemplate;

        @Test/*简单消息模型*/
        public void simple() {
            String queueName = "simple.queue";
            // 消息
            String message = "hello, spring amqp!";
            // 发送消息
            //需要先申明一个queue
            rabbitTemplate.convertAndSend(queueName, message);
        }

        @Test/*工作消息模型*/
        public void worker() {
            String queueName = "simple.queue";
            // 消息
            String message = "hello, spring amqp!";
            // 发送消息
            //需要先申明一个queue
            for (int i = 0; i < 50; i++) {
                rabbitTemplate.convertAndSend(queueName, "发送的第" + i + "条消息:" + message);
            }
        }

        @Test/*简单消息模型*/
        public void testSendMessage() throws IOException, TimeoutException {
            // 1.建立连接
            ConnectionFactory factory = new ConnectionFactory();
            // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
            factory.setHost("192.168.238.128");
            factory.setPort(5672);
            factory.setVirtualHost("/itcast");
            factory.setUsername("zhangsan");
            factory.setPassword("1234");
            // 1.2.建立连接
            Connection connection = factory.newConnection();

            // 2.创建通道Channel
            Channel channel = connection.createChannel();

            // 3.创建队列
            String queueName = "simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);

            // 4.发送消息
            String message = "hello, rabbitmq!";
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println("发送消息成功:【" + message + "】");

            // 5.关闭通道和连接
            channel.close();
            connection.close();

        }

        @Test/*工作消息模型*/
        public void testSendWorkerMessage() throws IOException, TimeoutException {
            // 1.建立连接
            ConnectionFactory factory = new ConnectionFactory();
            // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
            factory.setHost("192.168.238.128");
            factory.setPort(5672);
            factory.setVirtualHost("/itcast");
            factory.setUsername("zhangsan");
            factory.setPassword("1234");
            // 1.2.建立连接
            Connection connection = factory.newConnection();

            // 2.创建通道Channel
            Channel channel = connection.createChannel();

            // 3.创建队列
            String queueName = "simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);

            // 4.发送消息
            String message = "hello, rabbitmq!";
            for (int i = 0; i < 50; i++) {
                channel.basicPublish("", queueName, null, message.getBytes());
            }
            System.out.println("发送消息成功:【" + message + "】");

            // 5.关闭通道和连接
            channel.close();
            connection.close();

        }


        /*发布/订阅*/
        @Test/*Fanout广播*/
        public void fanout() {
            // 消息
            String message = "hello,Fanout广播!";
            //交换机
            String fanoutName = "fanoutExchange";
            // 发送消息
            //需要先申明一个queue
            for (int i = 0; i < 10; i++) {
                rabbitTemplate.convertAndSend(fanoutName, "", message);
            }
        }


        @Test/*Direct定向路由*/
        public void direct() {
            // 消息
            String message = "hello,Direct定向路由!";
            //交换机
            String directName = "directExchange";
            // 发送消息
            //需要先申明一个queue
            User user = new User("张三", 22);
            rabbitTemplate.convertAndSend(directName, "red", user);
            rabbitTemplate.convertAndSend(directName, "blue", user);
            rabbitTemplate.convertAndSend(directName, "yellow", user);
        }

        @Test/*Topic话题*/
        public void topic() {
            // 消息
            String message = "hello,Topic话题!";
            //交换机
            String topicName = "topicExchange";
            // 发送消息
            //需要先申明一个queue
            rabbitTemplate.convertAndSend(topicName, "china.watch", message);
            rabbitTemplate.convertAndSend(topicName, "japan.watch", message);
            rabbitTemplate.convertAndSend(topicName, "china.niubi", message);
        }


        @Test/*Topic话题*/
        public void topic2() {
            // 消息
            String message = "hello,Topic话题!";
            //交换机
            String topicName = "topicExchange2";
            // 发送消息
            //需要先申明一个queue
            rabbitTemplate.setExchange(topicName);
            rabbitTemplate.convertAndSend(topicName, "china.watch", message);
            rabbitTemplate.convertAndSend(topicName, "japan.watch", message);
            rabbitTemplate.convertAndSend(topicName, "china.niubi", message);
        }
        
        
    ============================================================
    消费者
    @SpringBootApplication
    public class ConsumerApplication {
        public static void main(String[] args) {
            SpringApplication.run(ConsumerApplication.class, args);
        }
          @Bean
        public MessageConverter jsonMessageConverter(){

            return new Jackson2JsonMessageConverter();
        }
    }
    ==========================================================
    @Component
    public class FanoutConfig {
        /*定义声明交换机对垒,交换机绑定*/
        //1定义一个交换机
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }

        //2.定义声明一个队列
        @Bean
        public Queue queue1() {
            return new Queue("fanoutQueue1");
        }

        @Bean
        public Queue queue2() {
            return new Queue("fanoutQueue2");
        }

        //3.绑定
        @Bean
        public Binding builder1(FanoutExchange fanoutExchange, Queue queue1) {
            return BindingBuilder.bind(queue1).to(fanoutExchange);
        }

        @Bean
        public Binding builder2(FanoutExchange fanoutExchange, Queue queue2) {
            return BindingBuilder.bind(queue2).to(fanoutExchange);
        }
    }
    ============================================================
    @Component
    public class ConsumerListener {
        int count1 = 1;
        int count2 = 1;

        @RabbitListener(queues = "simple.queue")/*简单队列监听;工作队列1*/
        public void simplequeue(Object msg) {

            System.err.println("Q1-接收到消息:【" + msg + "】" + count1++);

        }

        @RabbitListener(queues = "simple.queue") /*简单队列监听;工作队列2*/
        public void simplequeue2(Object msg) throws InterruptedException {
            System.out.println("Q2-接收到消息:【" + msg + "】" + count2++);
            Thread.sleep(20);
        }


        //===========================================================================

        @RabbitListener(queues = "fanoutQueue1") /* fanout广播1*/
        public void fanoutQueue1(Object msg) throws InterruptedException {
            System.out.println(" fanout1-接收到消息:【" + msg + "】");
            Thread.sleep(20);
        }

        @RabbitListener(queues = "fanoutQueue2") /* fanout广播2*/
        public void fanoutQueue2(Object msg) throws InterruptedException {
            System.out.println(" fanout2-接收到消息:【" + msg + "】");
            Thread.sleep(20);
        }

        //===========================================================================

        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "direct.queue1"),
                exchange = @Exchange(name = "directExchange", type = ExchangeTypes.DIRECT),
                key = {"red", "blue"}
        ))/*Direct定向路由1*/
        public void listenDirectQueue1(Object msg) {
            System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
        }

        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "direct.queue2"),
                exchange = @Exchange(name = "directExchange", type = ExchangeTypes.DIRECT),
                key = {"red", "yellow"}
        ))/*Direct定向路由2*/
        public void listenDirectQueue2(Object msg) {
            System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
        }


        //===========================================================================


        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "topic.queue2"),
                exchange = @Exchange(name = "topicExchange", type = ExchangeTypes.TOPIC),
                key = {"china.#"}
        ))/*Topic话题*/
        public void listenTopicQueue2(Object msg) {
            System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
        }

        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "topic.queue1"),
                exchange = @Exchange(name = "topicExchange", type = ExchangeTypes.TOPIC),
                key = {"japan.#"}
        ))/*Topic话题*/
        public void listenTopicQueue1(Object msg) {
            System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
        }

        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "topic.queue3"),
                exchange = @Exchange(name = "topicExchange", type = ExchangeTypes.TOPIC),
                key = {"#.watch"}
        ))/*Topic话题*/
        public void listenTopicQueue3(Object msg) {
            System.out.println("消费者接收到topic.queue3的消息:【" + msg + "】");
        }


    }

  • 相关阅读:
    MIT课程分布式系统学习07——Fault Tolerance raft2
    JuiceFS 在多云存储架构中的应用 | 深势科技分享
    【Java设计模式 经典设计原则】 八 经典设计原则小结
    Spring MVC共享域对象操作
    【车载AI音视频电脑】车载管理软件,轨迹回放,远程录像回放,定位
    社保和五险一金那些事
    【C语言】详解栈和队列(定义、销毁、数据的操作)
    无代码开发和低代码开发的本质区别
    uniapp实现简单的九宫格抽奖(附源码)
    论文阅读 Predicting Dynamic Embedding Trajectory in Temporal Interaction Networks
  • 原文地址:https://blog.csdn.net/weixin_73510682/article/details/128074412