RocketMQ是一个分布式队列模型的消息中间件,现支持事务消息、顺序消息、批量消息。定时消息
RocketMQ由mqnamesrv和mqbroker组成,内部由group、topic、queue共同工作。
系统架构:
全局顺序:一个Topic内所有的消息都发布到到同一个Queue按先进先出的顺序进行消费
基于liunx centos7,docker-compose,apache/rocketmq:4.9.3
brokerClusterName=DefaultCluster
#容器化部署 这里需要填写 宿主机IP
brokerIP1=192.168.0.221
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
version: '3.7'
services:
rocket-namesrv:
image: apache/rocketmq:4.9.3
container_name: rocket-namesrv
ports:
- "9876:9876"
volumes:
- /root/local/rocketmq/logs:/root/logs
networks:
- my-net
command: sh mqnamesrv
rocket-broker:
image: apache/rocketmq:4.9.3
container_name: rocket-broker
ports:
- 10909:10909
- 10911:10911
- 10912:10912
volumes:
- /root/local/rocketmq/broker.conf:/home/rocketmq/rocketmq-4.9.3/conf/broker.conf
networks:
- my-net
command: sh mqbroker -n rocket-namesrv:9876 -c /home/rocketmq/rocketmq-4.9.3/conf/broker.conf
depends_on:
- rocket-namesrv
#web控制台
rocket-console:
image: styletang/rocketmq-console-ng
container_name: rocket-console
ports:
- "8100:8080"
environment:
JAVA_OPTS: -Drocketmq.config.namesrvAddr=rocket-namesrv:9876 -Drocketmq.config.isVIPChannel=false
networks:
- my-net
depends_on:
- rocket-namesrv
networks:
my-net:
external: true
[root@m ~]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
4e076d69b867 styletang/rocketmq-console-ng "sh -c 'java $JAVA_O…" 2 hours ago Up 2 hours 0.0.0.0:8100->8080/tcp rocket-console
2bf702304170 apache/rocketmq:4.9.3 "sh mqbroker -n rock…" 2 hours ago Up 2 hours 0.0.0.0:10909->10909/tcp, 9876/tcp, 0.0.0.0:10911-10912->10911-10912/tcp rocket-broker
2e9fa82ef3b2 apache/rocketmq:4.9.3 "sh mqnamesrv" 2 hours ago Up 2 hours 10909/tcp, 0.0.0.0:9876->9876/tcp, 10911-10912/tcp rocket-namesrv
都为up状态即表示启动成功,访问一下web控制台:192.168.0.221:8100
已经测试发送过消息了,页面图可能不同

基于springboot 2.6.8
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
rocketmq:
name-server: 192.168.0.221:9876
producer:
group: test_group
#默认3000 过短会报timeout错误
send-message-timeout: 10000
@Autowired
RocketMQTemplate rocketMQTemplate;
@GetMapping("/send")
public String send() {
JSONObject object = new JSONObject();
object.put("hello", "123456789");
try {
//send
SendResult sendResult = rocketMQTemplate.syncSend("test_topic", object.toJSONString());
log.info("消息推送成功,{},{}", sendResult, new Date().getTime());
} catch (Exception e) {
log.info("消息发送异常" + e);
}
return "success";
}
@Slf4j
@Component
@RocketMQMessageListener(topic = "test_topic", consumerGroup = "test_group")
public class RocketmqConsume implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("收到消息:{},{}", s,new Date().getTime());
//默认自动确认 如报异常则会丢回队列 重新消费
// throw new RuntimeException("1234");
}
}
c.e.r.controller.RocketmqController: 消息推送成功,SendResult [sendStatus=SEND_OK, msgId=7F0000010D5018B4AAC278D1ECC40000, offsetMsgId=C0A800DD00002A9F0000000000001AF4, messageQueue=MessageQueue [topic=test_topic, brokerName=broker-a, queueId=1], queueOffset=3],1656039823574
c.e.rocketmq.consume.RocketmqConsume: 收到消息:{"hello":"123456789"},1656039823587
记录一下报错的点:
1.启动broker时需要指向namesrv服务的地址加端口
2.配置中一定要填的宿主机ip地址,不是容器名称
3.出现broker连接不上一般都是namesrv地址不对
以上就是本章的全部内容了。
上一篇:RabbitMQ第三话 – RabbitMQ高可用集群搭建
下一篇:RocketMQ第二话 – RocketMQ事务消息、延时消息实现
旧书不厌百回读,熟读精思子自知