• linux安装rocketmq


    rocketmq文档:https://rocketmq.apache.org/docs/quick-start/.

    一、安装rocketMQ

    下载链接https://rocketmq.apache.org/dowloading/releases/.
    在这里插入图片描述

    //下载
    [root@liar local]# wget https://dist.apache.org/repos/dist/dev/rocketmq/5.0.0-ALPHA-rc2/rocketmq-all-5.0.0-ALPHA-bin-release.zip
    解压
    [root@liar local]# unzip rocketmq-all-5.0.0-ALPHA-bin-release.zip
    
    • 1
    • 2
    • 3
    • 4
    配置rocketmq环境变量
    [root@liar local]# vim /etc/profile
    
    export rocketmq=/usr/local/rocketmq-all-5.0.0-ALPHA-bin-release/
    export PATH=$PATH:$rocketmq/bin
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    二、启动RocketMQ

    修改一下RocketMQ启动的内存大小

    [root@liar bin]# vim runbroker.sh
    
    • 1

    在这里插入图片描述

    [root@liar bin]# [root@liar bin]# vim runserver.sh
    
    • 1

    在这里插入图片描述

    启动mqnameserver(&表示后台启动,不能少)

    进入rocket的bin目录执行命令命令:nohup sh mqnamesrv &

    [root@liar bin]# nohup sh mqnamesrv &
    [1] 146587
    [root@liar bin]# nohup: ignoring input and appending output to 'nohup.out'
    
    • 1
    • 2
    • 3

    查看是否成功

    [root@liar bin]# tail -1000f nohup.out
    
    • 1

    在这里插入图片描述
    或者jps查看

    [root@liar bin]# jps
    146587 NamesrvStartup
    146604 Jps
    
    • 1
    • 2
    • 3

    启动broker

    进入rocket的bin目录执行命令命令:nohup sh mqbroker -n localhost:9876 &

    nohup sh mqbroker -n localhost:9876 &
    
    • 1

    发送消息报错指定自动创建topic
    自动创建topic:启动broker时加上自动创建topic的参数,如下,其中autoCreateTopicEnable=true表示自动创建topic

    nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true > ../broker.log &
    
    • 1

    命令行发送消息

    export NAMESRV_ADDR=localhost:9876
    
    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    命令行接收消息

    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
    
    • 1

    在这里插入图片描述

    三、集群搭建

    配置相关属性

    序号主机名/IPIP功能BROKER角色
    1rocketmqOS192.168.182.147NameServer + BrokerMaster1 + Slave2
    2rocketmqOS292.168.182.148NameServer + BrokerMaster2 + Slave1

    修改rocketmqOS1配置文件

    在这里插入图片描述
    相关属性解析

    # broker所属集群的名称
    brokerClusterName = DefaultCluster
     
    # broker的名称
    brokerName = broker-a
     
    # broker的ID, 0表示Master,非0表示Slave
    brokerId = 0
     
    # 删除文件时间点,默认是凌晨4点
    deleteWhen = 04
     
    # 文件保留时间,默认保留48小时
    fileReservedTime = 48
     
    # broker的角色
    # ASYNC_MASTER: 异步复制Master
    # SYNC_MASTER: 同步双写Master
    # SLAVE: slave
    brokerRole = ASYNC_MASTER
     
    # 刷盘方式
    # ASYNC_FLUSH: 异步刷盘
    # SYNC_FLUSH: 同步刷盘
    flushDiskType = ASYNC_FLUSH
     
    # NameServer的地址,如果有多个的话,使用分号分隔开
    namesrvAddr=10.0.90.59:9876
     
    # 当前broker监听的IP地址
    brokerIP1=10.0.90.59
     
    # 在发送消息时,自动创建服务器不存在的topic,默认创建4个队列 
    defaultTopicQueueNums=4
     
    # 是否允许broker自动创建Topic
    autoCreateTopicEnable=true
     
    #是否允许broker自动创建订阅组
    autoCreateSubscriptionGroup=true
     
    # broker对外服务的监听端口 
    listenPort=10911
     
    # 每个commitLog文件的大小默认是1G
    mapedFileSizeCommitLog=1073741824
     
    # ConsumeQueue每个文件默认存30W条
    mapedFileSizeConsumeQueue=300000
     
    # store的存储路径
    storePathRootDir=/rocketmq/rocketmq-4.9.2/store
     
    # commitLog的存储路径 
    storePathCommitLog=/rocketmq/rocketmq-4.9.2/store/commitlog
     
    # 消费队列的存储路径
    storePathConsumeQueue=/rocketmq/rocketmq-4.9.2/store/consumequeue
     
    # 消息索引的存储路径
    storePathIndex=/rocketmq/rocketmq-4.9.2/store/index
     
    # checkpoint文件的存储路径
    storeCheckpoint=/rocketmq/rocketmq-4.9.2/store/checkpoint
     
    # abort文件的存储路径
    abortFile=/rocketmq/rocketmq-4.9.2/store/abort
     
    # 限制的消息大小,默认为4M 
    maxMessageSize=65536
     
    # 检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=75
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    修改broker-a.properties

    namesrvAddr=192.168.182.147:9876;192.168.182.148:9876
    
    • 1

    在这里插入图片描述
    修改broker-b-s.properties

    namesrvAddr=192.168.182.147:9876;192.168.182.148:9876
    listenPort=11911
    storePathRootDir=~/store-s 
    storePathCommitLog=~/store-s/commitlog 
    storePathConsumeQueue=~/store-s/consumequeue 
    storePathIndex=~/store-s/index 
    storeCheckpoint=~/store-s/checkpoint 
    abortFile=~/store-s/abort
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在这里插入图片描述

    修改rocketmqOS2配置文件

    在这里插入图片描述
    在这里插入图片描述

    namesrvAddr=192.168.182.147:9876;192.168.182.148:9876
    listenPort=11911
    storePathRootDir=~/store-s 
    storePathCommitLog=~/store-s/commitlog 
    storePathConsumeQueue=~/store-s/consumequeue 
    storePathIndex=~/store-s/index 
    storeCheckpoint=~/store-s/checkpoint 
    abortFile=~/store-s/abort
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在这里插入图片描述
    broker-b.properties

    namesrvAddr=192.168.182.147:9876;192.168.182.148:9876
    
    • 1

    启动服务器

    启动NameServer集群
    分别启动rocketmqOS1与rocketmqOS2两个主机中的NameServer。启动命令完全相同。

    nohup sh bin/mqnamesrv & 
    tail -f ~/logs/rocketmqlogs/namesrv.log
    
    • 1
    • 2

    启动两个Master
    分别启动rocketmqOS1与rocketmqOS2两个主机中的broker master。注意,它们指定所要加载的配置
    文件是不同的。

    nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties & 
    tail -f ~/logs/rocketmqlogs/broker.log
    
    • 1
    • 2
    nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties & 
    tail -f ~/logs/rocketmqlogs/broker.log
    
    • 1
    • 2

    启动两个Slave
    分别启动rocketmqOS1与rocketmqOS2两个主机中的broker slave。注意,它们指定所要加载的配置文
    件是不同的。

    nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties & 
    tail -f ~/logs/rocketmqlogs/broker.log
    
    • 1
    • 2
    nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties & 
    tail -f ~/logs/rocketmqlogs/broker.log
    
    • 1
    • 2

    确定启动正常
    在这里插入图片描述

    四、控制台安装

    下载地址:https://github.com/apache/rocketmq-externals/releases
    在这里插入图片描述
    在这里插入图片描述
    pom.xml添加依赖

            <dependency>
                <groupId>javax.xml.bindgroupId>
                <artifactId>jaxb-apiartifactId>
                <version>2.3.0version>
            dependency>
            <dependency>
                <groupId>com.sun.xml.bindgroupId>
                <artifactId>jaxb-implartifactId>
                <version>2.3.0version>
            dependency>
            <dependency>
                <groupId>com.sun.xml.bindgroupId>
                <artifactId>jaxb-coreartifactId>
                <version>2.3.0version>
            dependency>
            <dependency>
                <groupId>javax.activationgroupId>
                <artifactId>activationartifactId>
                <version>1.1.1version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    在console下打包

    在这里插入图片描述

    mvn clean package -Dmaven.test.skip=ture
    
    • 1

    启动在这里插入图片描述
    访问:http://localhost:7000.
    在这里插入图片描述

    同步

    public class SyncProducer {
        public static void main(String[] args) throws Exception {
            // 创建一个producer,参数为Producer Group名称
            DefaultMQProducer producer = new DefaultMQProducer("pg");
            // 指定nameServer地址
            producer.setNamesrvAddr("rocketmqOS:9876");
            // 设置同步发送失败时重试发送的次数,默认为2次
            producer.setRetryTimesWhenSendFailed(3);
            // 设置发送超时时限为5s,默认3s
            producer.setSendMsgTimeout(5000);
    
            // 开启生产者
            producer.start();
    
            // 生产并发送100条消息
            for (int i = 0; i < 100; i++) {
                byte[] body = ("Hi," + i).getBytes();
                Message msg = new Message("someTopic", "someTag", body);
                // 为消息指定key
                msg.setKeys("key-" + i);
                // 同步发送消息
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
            // 关闭producer
            producer.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    在linux种查看
    在这里插入图片描述
    控制台查看
    在这里插入图片描述

    定义异步消息发送生产者

    public class AsyncProducer {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("pg");
            producer.setNamesrvAddr("rocketmqOS:9876");
            // 指定异步发送失败后不进行重试发送
            producer.setRetryTimesWhenSendAsyncFailed(0);
            // 指定新创建的Topic的Queue数量为2,默认为4
            producer.setDefaultTopicQueueNums(2);
    
            producer.start();
    
            for (int i = 0; i < 100; i++) {
                byte[] body = ("Hi," + i).getBytes();
                try {
                    Message msg = new Message("myTopicA", "myTag", body);
                    // 异步发送。指定回调
                    producer.send(msg, new SendCallback() {
                        // 当producer接收到MQ发送来的ACK后就会触发该回调方法的执行
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            System.out.println(sendResult);
                        }
    
                        @Override
                        public void onException(Throwable e) {
                            e.printStackTrace();
                        }
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } // end-for
    
            // sleep一会儿
            // 由于采用的是异步发送,所以若这里不sleep,
            // 则消息还未发送就会将producer给关闭,报错
            TimeUnit.SECONDS.sleep(3);
            producer.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    public class SomeConsumer {
    
        public static void main(String[] args) throws MQClientException {
            // 定义一个pull消费者
            // DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("cg");
            // 定义一个push消费者
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
            // 指定nameServer
            consumer.setNamesrvAddr("rocketmqOS:9876");
            // 指定从第一条消息开始消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // 指定消费topic与tag
            consumer.subscribe("someTopic", "*");
            // 指定采用“广播模式”进行消费,默认为“集群模式”
            // consumer.setMessageModel(MessageModel.BROADCASTING);
    
            // 顺序消息消费失败的消费重试时间间隔,默认为1000毫秒,其取值范围为[10, 30000]毫秒
            consumer.setSuspendCurrentQueueTimeMillis(100);
    
            // 修改消费重试次数
            consumer.setMaxReconsumeTimes(20);
    
            // 注册消息监听器
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                // 一旦broker中有了其订阅的消息就会触发该方法的执行,
                // 其返回值为当前consumer消费的状态
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    // 逐条消费消息
                    for (MessageExt msg : msgs) {
                        System.out.println(msg);
                    }
                    // 返回消费状态:消费成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            // 开启消费者消费
            consumer.start();
            System.out.println("Consumer Started");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    定义单向消息发送生产者

    public class OnewayProducer {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("pg");
            producer.setNamesrvAddr("rocketmqOS:9876");
            producer.start();
            for (int i = 0; i < 10; i++) {
                byte[] body = ("Hi," + i).getBytes();
                Message msg = new Message("single", "someTag", body);
                // 单向发送 producer.sendOneway(msg); }
                producer.shutdown();
                System.out.println("producer shutdown");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    消费

    public class RetryConsumer {
    
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
            consumer.setNamesrvAddr("mqOS:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe("someTopic", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                   try {
                       // ....
                   } catch (Throwable e) {
                       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                   }
    
                    // 返回消费状态:消费成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            // 开启消费者消费
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
  • 相关阅读:
    【324. 摆动排序 II】
    最全常用高数公式
    【Docker】二、docker镜像的制作运行发布
    百度一面:谈谈 @Transactional 的原理和坑
    笔试强训48天——day15
    WebScoket
    配置web&数据库开发环境
    程序员刚毕业,先去大厂镀金还是先去小厂攒经验?
    # 用acme.sh申请证书(含泛域名)
    图像超分算法总结
  • 原文地址:https://blog.csdn.net/weixin_56219549/article/details/126143231