• RocketMQ部署


    RocketMQ部署手册

    单MasterRocketMQ集群

    系统要求与准备条件

    1. 64位操作系统,推荐 Linux/Unix/macOS

    2. 64位 JDK 1.8+

    3. Maven

      tips

      检验java环境与maven环境

      java -version

      截屏2022-09-15 15.20.46

      mvn -v截屏2022-09-15 15.20.09

    下载安装Apache RocketMQ

    RocketMQ 的安装包分为两种,二进制包和源码包,二进制包是已经编译完成后可以直接运行的,源码包是需要编译后运行的。

    截屏2022-09-15 15.31.08

    启动NameServer

    ### 启动namesrv
    $ nohup sh bin/mqnamesrv &
     
    ### 验证namesrv是否启动成功
    $ tail -f ~/logs/rocketmqlogs/namesrv.log
    The Name Server boot success...
    

    截屏2022-09-15 16.28.15

    我们可以在namesrv.log 中看到 'The Name Server boot success..', 表示NameServer 已成功启动。

    截屏2022-09-15 16.28.50

    启动Broker

    ### 先启动broker
    $ nohup sh bin/mqbroker -n localhost:9876 &
    
    ### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a
    $ tail -f ~/logs/rocketmqlogs/Broker.log 
    The broker[broker-a,192.169.1.2:10911] boot success...
    

    截屏2022-09-15 16.30.48

    我们可以在 Broker.log 中看到“The broker[brokerName,ip:port] boot success..”,这表明 broker 已成功启动。

    截屏2022-09-15 16.37.38

    工具测试消息收发

    在进行工具测试消息收发之前,我们需要告诉客户端NameServer的地址,RocketMQ有多种方式在客户端中设置NameServer地址,这里我们利用环境变量NAMESRV_ADDR

    截屏2022-09-15 16.42.18

    $ export NAMESRV_ADDR=localhost:9876
    $ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
     SendResult [sendStatus=SEND_OK, msgId= ...
    
    $ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
     ConsumeMessageThread_%d Receive New Messages: [MessageExt...
    

    截屏2022-09-15 16.44.46

    截屏2022-09-15 16.44.15

    安装可视化控制台

    1.下载项目

    在 GitHub 中搜索 rocketmq-externals,其中 rocketmq-console 就是 RocketMQ 可视化控制台,我们可以将源码克隆下来,然后自己 mvn package,然后运行 jar 包。

    或者直接下载官方提供的 1.0.0 版本的 rocketmq-console

    https://github.com/apache/rocketmq-externals/releases/tag/rocketmq-console-1.0.0

    下载 zip 包或者 tar 包

    截屏2022-09-15 17.29.41

    • 修改配置文件application.properties

      配置rocketmq.config.namesrvAddr属性的值,即nameserver的服务地址

      rocketmq.config.namesrvAddr=localhost:9876
      

    截屏2022-09-15 17.32.25

    • 保存修改后的配置文件,返回rocketmq-console目录

    • 使用maven打包命令打包

      mvn clean package -Dmaven.test.skip=true
      
    • 打包完成后进入target目录

    截屏2022-09-15 17.33.53

    rocketmq-console-ng-2.0.0.jar即为打包后得到的jar包

    • 启动程序

      nohup java -jar rocketmq-console-ng-2.0.0.jar &
      

      截屏2022-09-15 17.38.01

    截屏2022-09-15 17.38.48

    SDK测试消息收发

    准备工作

    启动 NameServer 和 broker

    nohup sh bin/mqnamesrv >mqnamesrv-log.txt &
    
    nohup sh bin/mqbroker -n 127.0.0.1:9876 >mqbroker-log.txt &
    

    启动控制台

    mvn spring-boot:run
    

    创建一个 topic名为 test_quick_topic

    截屏2022-09-15 17.41.37

    工具测试完成后,我们可以尝试使用 SDK 收发消息。这里以 Java SDK 为例介绍一下消息收发过程。

    • 在IDEA中创建一个Java工程。

    • pom.xml 文件中添加以下依赖引入Java依赖库。

        <dependencies>
            <dependency>
                <groupId>org.apache.rocketmqgroupId>
                <artifactId>rocketmq-clientartifactId>
                <version>4.9.4version>
            dependency>
            <dependency>
                <groupId>com.alibabagroupId>
                <artifactId>fastjsonartifactId>
                <version>2.0.13.graalversion>
            dependency>
        dependencies>
    

    生产者

    • 同步投递10条消息
    package TestProducer;
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    public class TestProducer01 {
        public static final String NAMESRV_ADDR = "127.0.0.1:9876";
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
            DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
            producer.setNamesrvAddr(NAMESRV_ADDR);
            producer.start();
            for (int i = 0; i <10; i++) {
                Message message = new Message("test_quick_topic",//主题
                        "tagA", //标签
                        "key" + i, //自定义key,唯一标识
                        ("第" + i+"次消息").getBytes()); //消息内容实体 (byte[])
                SendResult result = producer.send(message);
                System.out.println("第" + i + "条消息发出,结果:" + result);
            }
            producer.shutdown();
        }
    }
    

    截屏2022-09-15 17.42.14

    消费者

    • 消费上面生产者生产的10条消息
    package TestConsumer;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.util.List;
    
    public class TestConsumer01 {
        public static final String NAMESRV_ADDR = "127.0.0.1:9876";
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");
            consumer.setNamesrvAddr(NAMESRV_ADDR);
            //从最后开始消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    //        consumer.subscribe("test_quick_topic","tagA"); //过滤:消费tag为tagA的消息
            consumer.subscribe("test_quick_topic", "*"); //消费所有的
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    MessageExt messageExt = list.get(0);
                    try {
                        String topic = messageExt.getTopic();
                        String tags = messageExt.getTags();
                        String keys = messageExt.getKeys();
                        String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                        System.out.println("topic: " + topic + ",tags: " + tags + ", keys: " + keys + ", body: " + msgBody);
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("comsumer start");
        }
    }
    

    截屏2022-09-15 17.43.25

    关闭服务器

    $ sh bin/mqshutdown broker
    The mqbroker(36695) is running...
    Send shutdown request to mqbroker(36695) OK
    
    $ sh bin/mqshutdown namesrv
    The mqnamesrv(36664) is running...
    Send shutdown request to mqnamesrv(36664) OK
    
  • 相关阅读:
    Latex 爬过的坑(5)——解决 IEEE 模板论文作者对齐问题
    java哈希表(含校园系统散列表的代码)
    小车PWM调速-模式选择
    MATLAB | 你是猫猫教还是狗狗教还是ikun
    周赛总结--LeetCode单周赛321场 && AcWing79场
    std::minus
    【实践篇】Redis最强Java客户端(三)之Redisson 7种分布式锁使用指南
    ESP32_esp-idf_lvgl_V8环境搭建移植
    Linux本地部署1Panel现代化运维管理面板并实现公网访问
    《设计模式Java版》读书笔记
  • 原文地址:https://www.cnblogs.com/POCOPOCOPOCO/p/16697680.html