• RocketMq快速入门(详解)


    目录

    Docker-compose安装RocketMQ

    一、docker目录下建立以下目录文件

    二、docker-compose.yml配置

    三、修改broker.conf

    四.开启端口

     五. 安装启动

     六. 查看是否启动

    七. 浏览器访问检验

    简单消息示例

    1.添加依赖 

    2.异步发送消息

    3.消费消息

    4.启动测试

     

    RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。

    RocketMQ主要有四大核心组成部分:NameServerBrokerProducer以及Consumer四部分。

    RocketMQ 优势 

    支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)
    支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)
    支持 18 个级别的延迟消息(Kafka 不支持)
    支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)
    支持 Consumer 端 Tag 过滤,减少不必要的网络传输(即过滤由MQ完成,而不是由消费者完成。RabbitMQ 和 Kafka 不支持)
    支持重复消费(RabbitMQ 不支持,Kafka 支持)
     

    Docker-compose安装RocketMQ

    一、docker目录下建立以下目录文件

    1. mkdir rocketmq
    2. mkdir conf
    3. mkdir logs
    4. mkdir store

     

    二、docker-compose.yml配置

    1. version: "3"
    2. services:
    3. mqnamesrv:
    4. image: foxiswho/rocketmq:4.7.0 #安装什么版本就写什么版本
    5. container_name: mqnamesrv
    6. ports:
    7. - 9876:9876
    8. environment:
    9. JAVA_OPT: -server -Xms256m -Xmx256m
    10. command: sh mqnamesrv
    11. mqbroker:
    12. image: foxiswho/rocketmq:4.7.0
    13. container_name: mqbroker
    14. ports:
    15. - 10911:10911
    16. - 10909:10909
    17. volumes:
    18. - ./conf/broker.conf:/usr/local/dockerCompose/rocketmq/conf/broker.conf
    19. environment:
    20. JAVA_OPT_EXT: -server -Xms256m -Xmx256m -Xmn128m
    21. NAMESRV_ADDR: mqnamesrv:9876
    22. command: sh mqbroker -n mqnamesrv:9876 -c /usr/local/dockerCompose/rocketmq/conf/broker.conf
    23. mqconsole:
    24. image: styletang/rocketmq-console-ng
    25. container_name: mqconsole
    26. ports:
    27. - 19876:8080
    28. environment:
    29. JAVA_OPTS: -Drocketmq.namesrv.addr=mqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=falses

    三、修改broker.conf

    vi broker.conf
    1. brokerClusterName = DefaultCluster
    2. brokerName = broker-a
    3. brokerId = 0
    4. deleteWhen = 04
    5. fileReservedTime = 48
    6. brokerRole = ASYNC_MASTER
    7. flushDiskType = ASYNC_FLUSH
    8. # 主机IP
    9. brokerIP1 = 自己的IP

     

    四.开启端口

     五. 安装启动

    docker-compose up -d

     六. 查看是否启动

    docker ps

     

    七. 浏览器访问检验

     

    简单消息示例

    1.添加依赖 

    1. <dependency>
    2. <groupId>org.apache.rocketmq</groupId>
    3. <artifactId>rocketmq-client</artifactId>
    4. <version>4.7.0</version>
    5. </dependency>

    2.异步发送消息

    1. public class AsyncProducer {
    2. public static void main(String[] args) throws Exception {
    3. // 创建指定分组名的生产者
    4. DefaultMQProducer producer = new DefaultMQProducer("qiu");
    5. //自己的服务器地址
    6. producer.setNamesrvAddr("106.52.242.189:9876");
    7. // 启动生产者
    8. producer.start();
    9. for (int i = 0; i < 128; i++)
    10. try {
    11. // 构建消息
    12. Message msg = new Message("TopicTest",
    13. "TagA",
    14. "OrderID188",
    15. "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    16. // 同步发送
    17. SendResult sendResult = producer.send(msg);
    18. // 打印发送结果
    19. System.out.printf("%s%n", sendResult);
    20. } catch (Exception e) {
    21. e.printStackTrace();
    22. }
    23. producer.shutdown();
    24. }
    25. }

     

    3.消费消息

    1. public class Consumer {
    2. public static void main(String[] args) throws InterruptedException, MQClientException {
    3. // Instantiate with specified consumer group name.
    4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("qiu");
    5. // Specify name server addresses.
    6. consumer.setNamesrvAddr("106.52.242.189:9876");
    7. // Subscribe one more more topics to consume.
    8. consumer.subscribe("TopicTest", "*");
    9. // Register callback to execute on arrival of messages fetched from brokers.
    10. consumer.registerMessageListener(new MessageListenerConcurrently() {
    11. @Override
    12. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
    13. ConsumeConcurrentlyContext context) {
    14. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    15. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    16. }
    17. });
    18. //Launch the consumer instance.
    19. consumer.start();
    20. System.out.printf("Consumer Started.%n");
    21. }
    22. }

     

    4.启动测试

    发送消息成功

     接收消息成功

     

    后台监控成功

     

  • 相关阅读:
    跨平台Android和IOS百度语音在线识别原生插件
    java计算机毕业设计深州市特色蜜桃产业电子商务系统源程序+mysql+系统+lw文档+远程调试
    Go语言语法分析之我想打同事的脸--编译
    Atlas (二) --------- Atlas 安装
    k8s异常Too many requests: Too many requests, please try again later.
    来文心中国行厦门站,感受大模型落地生花的进展!
    PROSS程序设计
    【微服务】Feign 整合 Sentinel,深入探索 Sentinel 的隔离和熔断降级规则,以及授权规则和自定义异常返回结果
    算法竞赛入门【码蹄集新手村600题】(MT1401-1450)
    汉诺塔问题
  • 原文地址:https://blog.csdn.net/lu__lala/article/details/125457565