# RabbitMQ connection settings, nested under the spring.rabbitmq prefix.
spring:
  rabbitmq:
    host: # broker host name; left blank in the original — TODO confirm the intended value
    port: 5672 # broker port
    virtual-host: / # virtual host
    username: zhangsan # user name
    password: 1234 # password
=============================================
/**
 * Mutable name/age value object used as a message payload.
 *
 * <p>Implements {@link Serializable}; the version marker is declared under the exact
 * name {@code serialVersionUID} so that Java serialization actually honors it.
 */
public class User implements Serializable {

    /** Serialization version; the JVM only recognizes this exact field name. */
    private static final long serialVersionUID = 1L;

    private String name;
    private int age;

    /**
     * Creates an empty user ({@code name == null}, {@code age == 0}).
     *
     * <p>Needed by bean-style instantiation (for example JSON mappers that create the
     * object first and then call the setters).
     */
    public User() {
    }

    /**
     * Creates a fully initialized user.
     *
     * @param name the user's name
     * @param age  the user's age
     */
    public User(String name, int age) {
        this.name = name;
        this.age = age;
    }

    /**
     * Returns the serialization version.
     *
     * <p>The method name keeps its historical spelling so existing callers still compile.
     *
     * @return the value of {@code serialVersionUID}
     */
    public static long getSerliaVersionUid() {
        return serialVersionUID;
    }

    /** @return the user's name, possibly {@code null} */
    public String getName() {
        return name;
    }

    /** @param name the new name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the user's age */
    public int getAge() {
        return age;
    }

    /** @param age the new age */
    public void setAge(int age) {
        this.age = age;
    }

    /** @return a string of the form {@code User{name='...', age=N}} */
    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}
===========================================================
生产者
/**
 * Spring Boot entry point for the publisher side.
 */
@SpringBootApplication
public class PublisherApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments; forwarded to Spring so that they are not
     *             silently discarded
     */
    public static void main(String[] args) {
        SpringApplication.run(PublisherApplication.class, args);
    }

    /**
     * Exposes a Jackson-based JSON {@link MessageConverter} as a bean.
     *
     * @return a new {@link Jackson2JsonMessageConverter}
     */
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
=========================================================
@SpringBootTest(classes = PublisherApplication.class)
@RunWith(SpringRunner.class)
public class PublisherTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
 * Simple message model: sends one text message using the queue name as routing key.
 *
 * <p>Note from the original author: the queue has to be declared beforehand.
 */
@Test
public void simple() {
    final String payload = "hello, spring amqp!";
    final String targetQueue = "simple.queue";
    rabbitTemplate.convertAndSend(targetQueue, payload);
}
/**
 * Work-queue model: sends 50 numbered text messages using the queue name as routing key.
 *
 * <p>Note from the original author: the queue has to be declared beforehand.
 */
@Test
public void worker() {
    final String payload = "hello, spring amqp!";
    final String targetQueue = "simple.queue";
    int seq = 0;
    while (seq < 50) {
        // Each message carries its zero-based sequence number.
        final String numbered = "发送的第" + seq + "条消息:" + payload;
        rabbitTemplate.convertAndSend(targetQueue, numbered);
        seq++;
    }
}
/**
 * Simple message model using the raw RabbitMQ client: declares the queue and
 * publishes one message to it through the default ("") exchange.
 *
 * @throws IOException      if connecting, declaring, publishing or closing fails
 * @throws TimeoutException if connecting or closing the channel times out
 */
@Test
public void testSendMessage() throws IOException, TimeoutException {
    // 1. Connection parameters: host, port, vhost, user name, password.
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.238.128");
    factory.setPort(5672);
    factory.setVirtualHost("/itcast");
    factory.setUsername("zhangsan");
    factory.setPassword("1234");

    String queueName = "simple.queue";
    String message = "hello, rabbitmq!";

    // 2. try-with-resources closes the channel and then the connection even when
    //    declaring or publishing throws, so a failed test does not leak them.
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // 3. Declare the queue (non-durable, non-exclusive, no auto-delete, no arguments).
        channel.queueDeclare(queueName, false, false, false, null);
        // 4. Publish; encode explicitly as UTF-8 instead of the platform default charset.
        channel.basicPublish("", queueName, null,
                message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        System.out.println("发送消息成功:【" + message + "】");
    }
}
/**
 * Work-queue model using the raw RabbitMQ client: declares the queue and publishes
 * the same message 50 times through the default ("") exchange.
 *
 * @throws IOException      if connecting, declaring, publishing or closing fails
 * @throws TimeoutException if connecting or closing the channel times out
 */
@Test
public void testSendWorkerMessage() throws IOException, TimeoutException {
    // 1. Connection parameters: host, port, vhost, user name, password.
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.238.128");
    factory.setPort(5672);
    factory.setVirtualHost("/itcast");
    factory.setUsername("zhangsan");
    factory.setPassword("1234");

    String queueName = "simple.queue";
    String message = "hello, rabbitmq!";
    // Encode once, explicitly as UTF-8, instead of per iteration with the platform charset.
    byte[] body = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);

    // 2. try-with-resources closes the channel and then the connection even when
    //    declaring or publishing throws, so a failed test does not leak them.
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // 3. Declare the queue (non-durable, non-exclusive, no auto-delete, no arguments).
        channel.queueDeclare(queueName, false, false, false, null);
        // 4. Publish the message 50 times.
        for (int i = 0; i < 50; i++) {
            channel.basicPublish("", queueName, null, body);
        }
        System.out.println("发送消息成功:【" + message + "】");
    }
}
/* Publish/subscribe */

/**
 * Fanout broadcast: sends the same text message 10 times to the fanout exchange
 * with an empty routing key.
 */
@Test
public void fanout() {
    final String exchange = "fanoutExchange";
    final String payload = "hello,Fanout广播!";
    int sent = 0;
    while (sent < 10) {
        rabbitTemplate.convertAndSend(exchange, "", payload);
        sent++;
    }
}
/**
 * Direct routing: sends the same {@code User} payload to the direct exchange once
 * for each of the routing keys "red", "blue" and "yellow".
 */
@Test
public void direct() {
    // Exchange to publish to.
    String directName = "directExchange";
    // The payload is an object rather than a text message.
    User user = new User("张三", 22);
    rabbitTemplate.convertAndSend(directName, "red", user);
    rabbitTemplate.convertAndSend(directName, "blue", user);
    rabbitTemplate.convertAndSend(directName, "yellow", user);
}
/**
 * Topic routing: sends the same text message to the topic exchange under three
 * routing keys, in order: "china.watch", "japan.watch", "china.niubi".
 */
@Test
public void topic() {
    final String exchange = "topicExchange";
    final String payload = "hello,Topic话题!";
    final String[] routingKeys = {"china.watch", "japan.watch", "china.niubi"};
    for (String routingKey : routingKeys) {
        rabbitTemplate.convertAndSend(exchange, routingKey, payload);
    }
}
/**
 * Topic routing against a second exchange: sends the same text message to
 * "topicExchange2" under the routing keys "china.watch", "japan.watch" and
 * "china.niubi".
 */
@Test
public void topic2() {
    // Message body.
    String message = "hello,Topic话题!";
    // Exchange to publish to.
    String topicName = "topicExchange2";
    // The exchange is passed explicitly on every send. The shared RabbitTemplate's
    // default exchange is deliberately NOT changed here: doing so would redirect the
    // two-argument convertAndSend(routingKey, message) calls made by other tests
    // that use the same template instance.
    rabbitTemplate.convertAndSend(topicName, "china.watch", message);
    rabbitTemplate.convertAndSend(topicName, "japan.watch", message);
    rabbitTemplate.convertAndSend(topicName, "china.niubi", message);
}
============================================================
消费者
/**
 * Spring Boot entry point for the consumer side.
 */
@SpringBootApplication
public class ConsumerApplication {

    /**
     * Exposes a Jackson-based JSON {@link MessageConverter} as a bean.
     *
     * @return a new {@link Jackson2JsonMessageConverter}
     */
    @Bean
    public MessageConverter jsonMessageConverter() {
        final MessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        return jsonConverter;
    }

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments, forwarded to Spring
     */
    public static void main(String[] args) {
        final Class<?> bootClass = ConsumerApplication.class;
        SpringApplication.run(bootClass, args);
    }
}
==========================================================
/**
 * Declares one fanout exchange, two queues, and the bindings that attach each
 * queue to the exchange.
 */
@Component
public class FanoutConfig {

    private static final String EXCHANGE_NAME = "fanoutExchange";
    private static final String FIRST_QUEUE_NAME = "fanoutQueue1";
    private static final String SECOND_QUEUE_NAME = "fanoutQueue2";

    // 1. The exchange.

    /** @return the fanout exchange named "fanoutExchange" */
    @Bean
    public FanoutExchange fanoutExchange() {
        final FanoutExchange exchange = new FanoutExchange(EXCHANGE_NAME);
        return exchange;
    }

    // 2. The queues.

    /** @return the queue named "fanoutQueue1" */
    @Bean
    public Queue queue1() {
        final Queue first = new Queue(FIRST_QUEUE_NAME);
        return first;
    }

    /** @return the queue named "fanoutQueue2" */
    @Bean
    public Queue queue2() {
        final Queue second = new Queue(SECOND_QUEUE_NAME);
        return second;
    }

    // 3. The bindings.

    /**
     * @param fanoutExchange the exchange to bind to
     * @param queue1         the queue being bound
     * @return a binding of {@code queue1} to {@code fanoutExchange}
     */
    @Bean
    public Binding builder1(FanoutExchange fanoutExchange, Queue queue1) {
        final Binding binding = BindingBuilder.bind(queue1).to(fanoutExchange);
        return binding;
    }

    /**
     * @param fanoutExchange the exchange to bind to
     * @param queue2         the queue being bound
     * @return a binding of {@code queue2} to {@code fanoutExchange}
     */
    @Bean
    public Binding builder2(FanoutExchange fanoutExchange, Queue queue2) {
        final Binding binding = BindingBuilder.bind(queue2).to(fanoutExchange);
        return binding;
    }
}
============================================================
/**
 * Message listeners for the simple/work queue, the two fanout queues, the two
 * direct queues and the three topic queues. Every listener just prints what it
 * receives.
 */
@Component
public class ConsumerListener {

    private static final String SIMPLE_QUEUE = "simple.queue";
    private static final String DIRECT_EXCHANGE = "directExchange";
    private static final String TOPIC_EXCHANGE = "topicExchange";

    /** Running number printed by {@link #simplequeue(Object)}; starts at 1. */
    int count1 = 1;
    /** Running number printed by {@link #simplequeue2(Object)}; starts at 1. */
    int count2 = 1;

    // ---------------------------------------------------------------- simple / work queue

    /**
     * First listener on "simple.queue"; prints to standard error with a running number.
     *
     * @param msg the received message
     */
    @RabbitListener(queues = SIMPLE_QUEUE)
    public void simplequeue(Object msg) {
        final String line = "Q1-接收到消息:【" + msg + "】" + count1++;
        System.err.println(line);
    }

    /**
     * Second listener on "simple.queue"; prints with a running number, then sleeps 20 ms.
     *
     * @param msg the received message
     * @throws InterruptedException if the sleep is interrupted
     */
    @RabbitListener(queues = SIMPLE_QUEUE)
    public void simplequeue2(Object msg) throws InterruptedException {
        final String line = "Q2-接收到消息:【" + msg + "】" + count2++;
        System.out.println(line);
        Thread.sleep(20);
    }

    // ---------------------------------------------------------------- fanout

    /**
     * Listener on "fanoutQueue1"; prints the message, then sleeps 20 ms.
     *
     * @param msg the received message
     * @throws InterruptedException if the sleep is interrupted
     */
    @RabbitListener(queues = "fanoutQueue1")
    public void fanoutQueue1(Object msg) throws InterruptedException {
        final String line = " fanout1-接收到消息:【" + msg + "】";
        System.out.println(line);
        Thread.sleep(20);
    }

    /**
     * Listener on "fanoutQueue2"; prints the message, then sleeps 20 ms.
     *
     * @param msg the received message
     * @throws InterruptedException if the sleep is interrupted
     */
    @RabbitListener(queues = "fanoutQueue2")
    public void fanoutQueue2(Object msg) throws InterruptedException {
        final String line = " fanout2-接收到消息:【" + msg + "】";
        System.out.println(line);
        Thread.sleep(20);
    }

    // ---------------------------------------------------------------- direct

    /**
     * Listener on "direct.queue1", bound to the direct exchange with keys "red" and "blue".
     *
     * @param msg the received message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = DIRECT_EXCHANGE, type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void listenDirectQueue1(Object msg) {
        final String line = "消费者接收到direct.queue1的消息:【" + msg + "】";
        System.out.println(line);
    }

    /**
     * Listener on "direct.queue2", bound to the direct exchange with keys "red" and "yellow".
     *
     * @param msg the received message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = DIRECT_EXCHANGE, type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(Object msg) {
        final String line = "消费者接收到direct.queue2的消息:【" + msg + "】";
        System.out.println(line);
    }

    // ---------------------------------------------------------------- topic

    /**
     * Listener on "topic.queue2", bound to the topic exchange with pattern "china.#".
     *
     * @param msg the received message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = TOPIC_EXCHANGE, type = ExchangeTypes.TOPIC),
            key = {"china.#"}
    ))
    public void listenTopicQueue2(Object msg) {
        final String line = "消费者接收到topic.queue2的消息:【" + msg + "】";
        System.out.println(line);
    }

    /**
     * Listener on "topic.queue1", bound to the topic exchange with pattern "japan.#".
     *
     * @param msg the received message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = TOPIC_EXCHANGE, type = ExchangeTypes.TOPIC),
            key = {"japan.#"}
    ))
    public void listenTopicQueue1(Object msg) {
        final String line = "消费者接收到topic.queue1的消息:【" + msg + "】";
        System.out.println(line);
    }

    /**
     * Listener on "topic.queue3", bound to the topic exchange with pattern "#.watch".
     *
     * @param msg the received message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue3"),
            exchange = @Exchange(name = TOPIC_EXCHANGE, type = ExchangeTypes.TOPIC),
            key = {"#.watch"}
    ))
    public void listenTopicQueue3(Object msg) {
        final String line = "消费者接收到topic.queue3的消息:【" + msg + "】";
        System.out.println(line);
    }
}