RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定

交换机的类型:
默认交换机

第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的,如果key存在的话
将接收到的所有消息广播到它知道的所有队列中

public static Channel getChannerl() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
return connection.createChannel();
}
ReceiveLog01
package cn.edu.xd.fanout;
import cn.edu.xd.util.RabbitMQUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLog01 {
private static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel= RabbitMQUtils.getChannerl();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
/**
* 生成一个临时的队列 队列的名称是随机的
* 当消费者断开和该队列的连接时 队列自动删除
*/
String queueName=channel.queueDeclare().getQueue();
//把该临时队列绑定exchange 其中 routingkey(也称之为 binding key)为空字符串
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("ReceiveLog01等待接收消息...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("ReceiveLog01接受到的消息:"+message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = s -> {
System.out.println("消息被中断");
};
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
}
}
ReceiveLog02
package cn.edu.xd.fanout;
import cn.edu.xd.util.RabbitMQUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLog02 {
private static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel= RabbitMQUtils.getChannerl();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
/**
* 生成一个临时的队列 队列的名称是随机的
* 当消费者断开和该队列的连接时 队列自动删除
*/
String queueName=channel.queueDeclare().getQueue();
//把该临时队列绑定exchange 其中 routingkey(也称之为 binding key)为空字符串
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("ReceiveLog02等待接收消息...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("ReceiveLog02接受到的消息:"+message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = s -> {
System.out.println("消息被中断");
};
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
}
}
EmitLog
package cn.edu.xd.fanout;
import cn.edu.xd.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class EmitLog {
private static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel= RabbitMQUtils.getChannerl();
/**
* 声明一个 exchange
* 1.exchange 的名称
* 2.exchange 的类型
*/
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
Scanner sc=new Scanner(System.in);
while (sc.hasNext()){
String msg=sc.nextLine();
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
System.out.println("发生消息:"+msg);
}
}
}



可以看出,EmiLog发送的消息被发生到了两个队列中,两个消费者分别从两个队列中获取到了消息
消息只去到它绑定的routingKey 队列中去


当发送者指定key="orange"时,消息只会被交换机转发到Q1队列中,当发送者指定key="black"或"green"时,消息只会被交换机转发到Q2队列中
package cn.edu.xd.fanout;
import cn.edu.xd.util.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLog01 {
private static final String EXCHANGE_NAME="direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel= RabbitMQUtils.getChannerl();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
/**
* 生成一个临时的队列 队列的名称是随机的
* 当消费者断开和该队列的连接时 队列自动删除
*/
String queueName="Q1";
channel.queueDeclare(queueName,false,false,false,null);
//把该临时队列绑定exchange 其中 routingkey(也称之为 binding key)
channel.queueBind(queueName,EXCHANGE_NAME,"orange");
System.out.println("ReceiveLog01等待接收消息...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("ReceiveLog01从Q1接受到的消息:"+message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = s -> {
System.out.println("消息被中断");
};
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
}
}
package cn.edu.xd.fanout;
import cn.edu.xd.util.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLog02 {
private static final String EXCHANGE_NAME="direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel= RabbitMQUtils.getChannerl();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
/**
* 生成一个临时的队列 队列的名称是随机的
* 当消费者断开和该队列的连接时 队列自动删除
*/
String queueName="Q2";
channel.queueDeclare(queueName,false,false,false,null);
//把该临时队列绑定exchange 其中 routingkey(也称之为 binding key)
//绑定两个key
channel.queueBind(queueName,EXCHANGE_NAME,"black");
channel.queueBind(queueName,EXCHANGE_NAME,"green");
System.out.println("ReceiveLog02等待接收消息...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("ReceiveLog02从Q2接受到的消息:"+message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = s -> {
System.out.println("消息被中断");
};
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
}
}
package cn.edu.xd.fanout;
import cn.edu.xd.util.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class EmitLog {
private static final String EXCHANGE_NAME="direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel= RabbitMQUtils.getChannerl();
/**
* 声明一个 exchange
* 1.exchange 的名称
* 2.exchange 的类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("orange","orange...Q1");
bindingKeyMap.put("green","green...Q2");
bindingKeyMap.put("black","black...Q2");
for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息:" + message);
}
}
}



发送的消息通过对应绑定的key被发送到了指定的队列中,orange消息被发布到了Q1队列中,green和black消息被发布到了Q1队列中,
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”,“quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节
*(星号)可以代替一个单词,#(井号)可以替代零个或多个单词
eg:

Q1绑定的是中间带 orange 带 3 个单词的字符串(*.orange.*)
Q2绑定的是最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)和第一个单词是 lazy 的多个单词(lazy.#)
quick.orange.rabbit:被队列 Q1Q2 接收到
lazy.orange.elephant: 被队列 Q1Q2 接收到
quick.orange.fox: 被队列 Q1 接收到
lazy.brown.fox: 被队列 Q2 接收到
lazy.pink.rabbit: 虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox: 不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit: 是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit: 是四个单词但匹配 Q2
当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了
package cn.edu.xd.fanout;
import cn.edu.xd.util.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLog01 {
private static final String EXCHANGE_NAME="topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel= RabbitMQUtils.getChannerl();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
/**
* 生成一个临时的队列 队列的名称是随机的
* 当消费者断开和该队列的连接时 队列自动删除
*/
String queueName="Q1";
channel.queueDeclare(queueName,false,false,false,null);
//把该临时队列绑定exchange 其中 routingkey(也称之为 binding key)
channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
System.out.println("ReceiveLog01等待接收消息...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("ReceiveLog01从Q1接受到的消息:"+message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = s -> {
System.out.println("消息被中断");
};
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
}
}
package cn.edu.xd.fanout;
import cn.edu.xd.util.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLog02 {
private static final String EXCHANGE_NAME="topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel= RabbitMQUtils.getChannerl();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
/**
* 生成一个临时的队列 队列的名称是随机的
* 当消费者断开和该队列的连接时 队列自动删除
*/
String queueName="Q2";
channel.queueDeclare(queueName,false,false,false,null);
//把该临时队列绑定exchange 其中 routingkey(也称之为 binding key)
//绑定两个key
channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
System.out.println("ReceiveLog02等待接收消息...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("ReceiveLog02从Q2接受到的消息:"+message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = s -> {
System.out.println("消息被中断");
};
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
}
}
package cn.edu.xd.fanout;
import cn.edu.xd.util.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class EmitLog {
private static final String EXCHANGE_NAME="topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel= RabbitMQUtils.getChannerl();
/**
* 声明一个 exchange
* 1.exchange 的名称
* 2.exchange 的类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息:" + message);
}
}
}


