RocketMQ 简介
RocketMQ是一个开源的分布式消息系统。它基于高可用分布式集群技术,提供低延迟、高稳定性的消息发布和订阅服务。RocketMQ广泛应用于各种行业,如异步通信解耦、企业服务、金融结算、电信、电子商务、物流、营销、社交媒体、即时通讯、移动应用、手机游戏、视频、物联网、车联网。
它具有以下特点: 面试可能会问啊
消息发送和消费的严格顺序
丰富的消息拉取模式
消费者的横向可扩展性
实时消息订阅
亿级消息积累能力
首先就是JAVA运行环境吧
- $ java -version
- java version "1.8.0_121"
然后将这个下载下来并解压缩
- # Download release from the Apache mirror
- $ wget https://archive.apache.org/dist/rocketmq/4.9.3/rocketmq-all-4.9.3-bin-release.zip
-
- # Unpack the release
- $ unzip rocketmq-all-4.9.3-bin-release.zip
启动一个namesrv吧
nohup sh mqnamesrv &
看到这个就启动了吧 端口号是9876啦
The Name Server boot success...
然后启动一个broker吧
nohup sh ./bin/mqbroker -n xx.xx.xxx.xx:9876 -c ./conf/broker.conf autoCreateTopicEnable=true &
卡一下子就起来了
The broker[broker-a, xxx.xx.x.xxx:10911] boot success...
这个配置文件别忘记配置哈 第一个是暴漏的IP 第二个是namesrv的IP哈
- brokerIP1=xxx.xx.x.xxx
- namesrvAddr=xxx.xx.x.xxx:9876
防火墙别忘记关闭哈
如果不关闭要开放端口啊 后边还有一个集群地呀 如果你集群的话
9876 10911 10912 10909
也可以使用命令查看运行情况
- ps -ef|grep broker
- ps -er|grep namesvr
最后看心情关闭吧 使用kill 也行 使用 mqshutdown 也行
- kill -9 xxxx
- ./bin/mqshutdown namesrv
- ./bin/mqshutdown broker
然后就是JAVA开始上场啦
显示POM里的依赖
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>4.9.3</version>
- </dependency>
这个消费者啊 接受消息 并处理
- package com.xlcdb.webapp;
-
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
- import org.apache.rocketmq.common.message.MessageExt;
-
- import java.util.List;
-
- public class Consumer {
- public static void main(String[] args) throws MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(Config.producerGroup);
- //指定NameServer地址,多个地址以 ; 隔开
- consumer.setNamesrvAddr(Config.namesrvAddr); //修改为自己的
- /**
- * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
- * 如果非第一次启动,那么按照上次消费的位置继续消费
- */
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- consumer.subscribe("TopicTest", "*");
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List
msgs, - ConsumeConcurrentlyContext context) {
- try {
- for(MessageExt msg:msgs){
- String msgbody = new String(msg.getBody(), "utf-8");
- System.out.println(" MessageBody: "+ msgbody);//输出消息内容
- }
- } catch (Exception e) {
- e.printStackTrace();
- return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
- }
- });
- consumer.start();
- System.out.printf("Consumer Started.%n");
- }
- }
最后来个发射器 发射给接收者
- package com.xlcdb.webapp;
-
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.remoting.common.RemotingHelper;
-
- public class Producer {
-
-
-
- public static void main(String[] args) throws InterruptedException, MQClientException {
- DefaultMQProducer producer = new DefaultMQProducer(Config.producerGroup);
- //指定NameServer地址
- producer.setNamesrvAddr(Config.namesrvAddr); //修改为自己的
- /**
- * Producer对象在使用之前必须要调用start初始化,初始化一次即可
- * 注意:切记不可以在每次发送消息时,都调用start方法
- */
- producer.start();
-
- for (int i = 0; i < 997892; i++) {
- try {
- //构建消息
- Message msg = new Message("TopicTest" /* Topic */,
- "TagA" /* Tag */,
- ("测试RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
- );
- //发送同步消息
- SendResult sendResult = producer.send(msg);
- System.out.printf("%s%n", sendResult);
- } catch (Exception e) {
- e.printStackTrace();
- Thread.sleep(1000);
- }
- }
- producer.shutdown();
- }
-
- }
最后看看一边是发射 一边是接受呢
给出官方网站看看吧 强大又简单的好东西呢