• RabbitMQ工作模式——Topics模式


    1.Topics通配符模式

    *是一个单词,#是0到多个单词
    在这里插入图片描述
    Topics模式生产者代码

    public class Producer_Topic {
    	public static void main(String[] args) throws IOException, TimeoutException {
    		//1.创建连接工厂
    		ConnectionFactory factory = new ConnectionFactory();
    		//2.设置参数
    		factory.setHost("172.16.98.133"); ip 默认值 localhost
    		factory.setPort(5672);//端口 默认值5672
    		factory.setVirtualHost("/itcast");//虚拟机 默认值
    		factory.setUsername("heima");//用户名 默认guest
    		factory.setPassword("heima");//密码 默认值 guest
    		//3.创建连接 Connection
    		Connection connection = factory.newConnection();
    		//4.创建Channel
    		Channel channel = connection.creatChannel();
    		/*
    		exchange(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map arguments)
    		参数:
    		1.exchange:交换机名称
    		2.type:交换机类型
    			DIRECT("direct"),:定向
    			FANOUT("fanout"),:扇形(广播)发送消息到每一个与之绑定的队列
    			TOPIC("topic"),:通配符方式
    			HEADERS("headers");:参数匹配
    		3.durable:是否持久化
    		4.autoDelete:自动删除
    		5.internal:内部使用。一般为false
    		6.arguments:参数,一般设为null
    		*/
    		//5.创建交换机
    		String exchangeName = "test_topic";
    		channel.exchangeDeclare(exchangeName,BuiltinExchangeType.TOPIC,true,false,false,null);
    		//6.创建队列
    		String queue1Name = "test_topic_queue1";
    		String queue2Name = "test_topic_queue2";
    		channel.queueDeclare(queue1Name,true,false,false,null);
    		channel.queueDeclare(queue2Name,true,false,false,null);
    		//7.绑定队列和交换机
    		/*
    		queueBind(String queue,String exchange,String routingKey)
    		参数:
    			1.queue:队列名称
    			2.exchange:交换机名称
    			3.routingKey:路由键,绑定规则
    				如果交换机的类型为:fanout,routingKey设置为空字符串
    		*/
    		//routingKey 系统的名称.日志的级别。
    		//需求:所有error级别的日志存数据库,所有order系统的日志存入数据库
    		channel.queueBind(queue1Name,exchangeName,"#.error");
    		channel.queueBind(queue1Name,exchangeName,"order.*");
    		channel.queueBind(queue2Name,exchangeName,"*.*");
    		//8.发送消息
    		String body = "日志信息,张三调用了findAll方法...日志级别:info...";
    		channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
    		//9.释放资源
    		channel.close();
    		connection.close();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    Topic1模式消费者代码

    public class Consumer_Topic1 {
    	public static void main(String[] args) throws IOException, TimeoutException {
    		//1.创建连接工厂
    		ConnectionFactory factory = new ConnectionFactory();
    		//2.设置参数
    		factory.setHost("172.16.98.133"); ip 默认值 localhost
    		factory.setPort(5672);//端口 默认值5672
    		factory.setVirtualHost("/itcast");//虚拟机 默认值
    		factory.setUsername("heima");//用户名 默认guest
    		factory.setPassword("heima");//密码 默认值 guest
    		//3.创建连接 Connection
    		Connection connection = factory.newConnection();
    		//4.创建Channel
    		Channel channel = connection.creatChannel();
    		
    		String queue1Name = "test_topic_queue1";
    		String queue2Name = "test_topic_queue2";
    		
    		/*
    			basicConsume(String queue,boolean autoAck,Consumer callback)
    			参数:
    			1.queue:队列名称
    			2.autoAck:是否自动确认
    			3.callback:回调对象
    		*/
    		//接收消息
    		Consumer consumer = new DefaultConsumer(channel){
    			/*
    				回调方法,当收到消息后会自动执行该方法
    				1.consumerTag:标识
    				2.envelope:获取一些信息,交换机,路由key...
    				3.properties:配置信息
    				4.body:数据
    			*/
    			@Override
    			public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){
    				System.out.println("consumerTag" + consumerTag);
    				System.out.println("Exchange" + envelope.getExchange());
    				System.out.println("RoutingKey" + envelope.getRoutingKey());
    				System.out.println("properties" + properties);
    				System.out.println("body" + new String(body));
    				System.out.println("将日志信息存入数据库......");
    			}
    		};
    		channel.basicConsume("queue1Name",true,consumer);
    
    		//消费者不能关闭资源
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    Topic2模式消费者代码

    public class Consumer_Topic2 {
    	public static void main(String[] args) throws IOException, TimeoutException {
    		//1.创建连接工厂
    		ConnectionFactory factory = new ConnectionFactory();
    		//2.设置参数
    		factory.setHost("172.16.98.133"); ip 默认值 localhost
    		factory.setPort(5672);//端口 默认值5672
    		factory.setVirtualHost("/itcast");//虚拟机 默认值
    		factory.setUsername("heima");//用户名 默认guest
    		factory.setPassword("heima");//密码 默认值 guest
    		//3.创建连接 Connection
    		Connection connection = factory.newConnection();
    		//4.创建Channel
    		Channel channel = connection.creatChannel();
    		
    		String queue1Name = "test_topic_queue1";
    		String queue2Name = "test_topic_queue2";
    		
    		/*
    			basicConsume(String queue,boolean autoAck,Consumer callback)
    			参数:
    			1.queue:队列名称
    			2.autoAck:是否自动确认
    			3.callback:回调对象
    		*/
    		//接收消息
    		Consumer consumer = new DefaultConsumer(channel){
    			/*
    				回调方法,当收到消息后会自动执行该方法
    				1.consumerTag:标识
    				2.envelope:获取一些信息,交换机,路由key...
    				3.properties:配置信息
    				4.body:数据
    			*/
    			@Override
    			public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){
    				System.out.println("consumerTag" + consumerTag);
    				System.out.println("Exchange" + envelope.getExchange());
    				System.out.println("RoutingKey" + envelope.getRoutingKey());
    				System.out.println("properties" + properties);
    				System.out.println("body" + new String(body));
    				System.out.println("将日志信息打印到控制台......");
    			}
    		};
    		channel.basicConsume("queue2Name",true,consumer);
    
    		//消费者不能关闭资源
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
  • 相关阅读:
    java字符串专项训练(手机号屏蔽)
    Redis 中的原子操作(1)-Redis 中命令的原子性
    【OpenCV 例程200篇】214. 绘制椭圆的参数详解
    前端面试总结
    JavaScript入门⑥-WEB浏览器API
    分布式全局唯一ID生成方案(附源码)
    RabbitMQ系列【8】消息可靠性之ACK机制
    Elasticsearch压测工具esrally详解
    【数据结构算法笔记】----贪心算法(简单贪心:月饼问题、最优装箱问题、整数配对、最大组合整数。区间贪心:区间不相交问题、区间选点问题)
    vue3自定义指令的学习和常用的几个自定义指令
  • 原文地址:https://blog.csdn.net/weixin_44860226/article/details/133238805