Kafka | RocketMQ | RabbitMQ | |
---|---|---|---|
定位 | 日志消息、监控数据 | 非日志的可靠消息传输 | 非日志的可靠消息传输 |
可用性 | 非常高、分布式、主从 | 非常高、分布式、主从 | 高、主从、采用镜像模式实现,数据量大时可能有性能问题 |
消息可靠性 | 异步刷盘,容易丢数据 | 同步刷盘、异步刷盘 | 同步刷盘 |
单机吞吐量 | 百万级 | 十万级 | 万级 |
堆积能力 | 非常好 | 非常好 | 一般 |
顺序消费 | 支持,一台broker宕机后,消息会乱序 | 支持,顺序消费场景下,消费失败时消费队列将会暂停 | 支持,如果一个消费失败,此消息的顺序会被打乱 |
定时消息 | 不支持 | 支持 | 支持 |
事务消息 | 不支持 | 支持 | 不支持 |
消息重试 | 不支持 | 支持 | 支持 |
死信队列 | 不支持 | 支持 | 支持 |
访问权限 | 无 | 无 | 类似数据库,配置用户名和密码 |
引入jar包
org.apache.rocketmq
rocketmq-client
4.9.5
生产者
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class SimpleProducer {
public static void main(String[] args) throws Exception{
//定义消费的生产者对象
DefaultMQProducer producer = new DefaultMQProducer("helloProducerGroup");
//定义nameServer地址
producer.setNamesrvAddr("127.0.0.1:9876");
//自动连接(从nameServer拉取broker地址,并且建立连接)
producer.start();
//定义消息发送的目的地Topic
String topic = "helloTopic";
for(int i=0;i<10;i++){
//定义消息
Message message = new Message(topic,("helloTopic的消息=="+i).getBytes());
//发送消息
SendResult result = producer.send(message);
//输出消息储存的结果
System.out.println("消息存储的状态:"+result.getSendStatus());
System.out.println("消息存储的消息ID:"+result.getMsgId());
}
//关闭连接
producer.shutdown();
}
}
消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
//定义消息的消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");
//定义nameServer地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//定义消费的主题
String topic = "helloTopic";
//监听该主题消息
consumer.subscribe(topic,"*");
//设置消息监听器,服务器把消息推送给我们,消费消息
consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
for(MessageExt messageExt:list){
System.out.println("处理的线程:"+Thread.currentThread()+",消息内容:"+new String(messageExt.getBody()));
}
//告诉消息中间件,消息处理的情况
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}