使用RabbitMQ提供的原生客户端API进行交互。
一、Maven依赖
com.rabbitmq
amqp-client
5.9.0
二、创建连接以及声明队列
1、首先创建连接,获取Channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST_NAME);
factory.setPort(HOST_PORT);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/mirror");
connection = factory.newConnection();
Channel channel = connection.createChannel();
2、声明queue队列
(1)classic队列
channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
(2)Quorum队列
需要在后面的arguments中传入一个参数,x-queue-type,参数值设定为quorum
Map params = new HashMap<>();
params.put("x-queue-type","quorum");
channel.queueDeclare(QUEUE_NAME, true, false, false, params); //durable参数必须是true,exclusive必须是false
(3)Stream队列
如果要声明一个Stream队列,则 x-queue-type参数要设置为stream
Map params = new HashMap<>();
params.put("x-queue-type","stream");
params.put("x-max-length-bytes", 20_000_000_000L);
params.put("x-stream-max-segment-size-bytes", 100_000_000);
channel.queueDeclare(QUEUE_NAME, true, false, false, params); //durable参数必须是true,exclusive必须是false
三、Producer根据应用场景发送消息到queue
channel.basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
其中exchange是一个Producer与queue的中间交互机制。可以让Producer把消息按一定的规则发送到不同的queue,不需要的话就传空字符串
四、Consumer消费消息
1、被动消费模式
Consumer等待rabbitMQ服务器将message推送过来再消费,一般是启动一个一直挂起的线程进行等待。
channel.basicConsume(String queue, boolean autoAck, Consumer callback)
//autoAck为true则表示消息发送到该Consumer后就被Consumer消费掉了,不需要再往其他Consumer转发,false则会继续往其他Consumer转发
//要注意如果每个Consumer一直为false,会导致消息不停的被转发,不停的吞噬系统资源,最终造成宕机
2、主动消费模式
Comsumer主动到rabbitMQ服务器上去获取指定的messge进行消费
GetResponse response = channel.basicGet(QUEUE_NAME, boolean autoAck)
3、Stream队列消费
消费Stream队列时,需要注意以下三点的设置:
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//1、这个属性必须设置。
channel.basicQos(1);
//2、声明Stream队列
Map params = new HashMap<>();
params.put("x-queue-type","stream");
params.put("x-max-length-bytes", 20_000_000_000L);
params.put("x-stream-max-segment-size-bytes", 100_000_000);
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
//Consumer接口还一个实现QueueConsuemr,但是代码注释过期了。
Consumer myconsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("========================");
String routingKey = envelope.getRoutingKey();
System.out.println("routingKey >" + routingKey);
String contentType = properties.getContentType();
System.out.println("contentType >" + contentType);
long deliveryTag = envelope.getDeliveryTag();
System.out.println("deliveryTag >" + deliveryTag);
System.out.println("content:" + new String(body, "UTF-8"));
// (process the message components here ...)
//消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。
//没有答复过的消息,服务器会一直不停转发。
channel.basicAck(deliveryTag, false);
}
};
//3、消费时,必须指定offset。 可选的值:
// first: 从日志队列中第一个可消费的消息开始消费
// last: 消费消息日志中最后一个消息
// next: 相当于不指定offset,消费不到消息。
// Offset: 一个数字型的偏移量
// Timestamp:一个代表时间的Data类型变量,表示从这个时间点开始消费。例如 一个小时前 Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)
Map consumeParam = new HashMap<>();
consumeParam.put("x-stream-offset","last");
channel.basicConsume(QUEUE_NAME, false,consumeParam, myconsumer);
//channel.close();
五、关闭连接,释放资源
channel.close();
connection.close();