目录
RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
RocketMQ主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分。
RocketMQ 优势
支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)
支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)
支持 18 个级别的延迟消息(Kafka 不支持)
支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)
支持 Consumer 端 Tag 过滤,减少不必要的网络传输(即过滤由MQ完成,而不是由消费者完成。RabbitMQ 和 Kafka 不支持)
支持重复消费(RabbitMQ 不支持,Kafka 支持)
- mkdir rocketmq
- mkdir conf
- mkdir logs
- mkdir store
- version: "3"
- services:
- mqnamesrv:
- image: foxiswho/rocketmq:4.7.0 #安装什么版本就写什么版本
- container_name: mqnamesrv
- ports:
- - 9876:9876
- environment:
- JAVA_OPT: -server -Xms256m -Xmx256m
- command: sh mqnamesrv
- mqbroker:
- image: foxiswho/rocketmq:4.7.0
- container_name: mqbroker
- ports:
- - 10911:10911
- - 10909:10909
- volumes:
- - ./conf/broker.conf:/usr/local/dockerCompose/rocketmq/conf/broker.conf
- environment:
- JAVA_OPT_EXT: -server -Xms256m -Xmx256m -Xmn128m
- NAMESRV_ADDR: mqnamesrv:9876
- command: sh mqbroker -n mqnamesrv:9876 -c /usr/local/dockerCompose/rocketmq/conf/broker.conf
- mqconsole:
- image: styletang/rocketmq-console-ng
- container_name: mqconsole
- ports:
- - 19876:8080
- environment:
- JAVA_OPTS: -Drocketmq.namesrv.addr=mqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=falses
vi broker.conf
- brokerClusterName = DefaultCluster
- brokerName = broker-a
- brokerId = 0
- deleteWhen = 04
- fileReservedTime = 48
- brokerRole = ASYNC_MASTER
- flushDiskType = ASYNC_FLUSH
-
-
- # 主机IP
- brokerIP1 = 自己的IP
docker-compose up -d
docker ps
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>4.7.0</version>
- </dependency>
- public class AsyncProducer {
- public static void main(String[] args) throws Exception {
- // 创建指定分组名的生产者
- DefaultMQProducer producer = new DefaultMQProducer("qiu");
- //自己的服务器地址
- producer.setNamesrvAddr("106.52.242.189:9876");
-
- // 启动生产者
- producer.start();
-
- for (int i = 0; i < 128; i++)
- try {
- // 构建消息
- Message msg = new Message("TopicTest",
- "TagA",
- "OrderID188",
- "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
-
- // 同步发送
- SendResult sendResult = producer.send(msg);
-
- // 打印发送结果
- System.out.printf("%s%n", sendResult);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- producer.shutdown();
- }
- }
-
- public class Consumer {
- public static void main(String[] args) throws InterruptedException, MQClientException {
-
- // Instantiate with specified consumer group name.
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("qiu");
-
- // Specify name server addresses.
- consumer.setNamesrvAddr("106.52.242.189:9876");
-
- // Subscribe one more more topics to consume.
- consumer.subscribe("TopicTest", "*");
- // Register callback to execute on arrival of messages fetched from brokers.
- consumer.registerMessageListener(new MessageListenerConcurrently() {
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
-
- //Launch the consumer instance.
- consumer.start();
-
- System.out.printf("Consumer Started.%n");
- }
- }
发送消息成功
接收消息成功
后台监控成功