• 在CentOS装个RocketMQ 然后用JAVA操作吧


    RocketMQ 简介 
    RocketMQ是一个开源的分布式消息系统。它基于高可用分布式集群技术,提供低延迟、高稳定性的消息发布和订阅服务。RocketMQ广泛应用于各种行业,如异步通信解耦、企业服务、金融结算、电信、电子商务、物流、营销、社交媒体、即时通讯、移动应用、手机游戏、视频、物联网、车联网。

    它具有以下特点: 面试可能会问啊

    消息发送和消费的严格顺序
    丰富的消息拉取模式
    消费者的横向可扩展性
    实时消息订阅
    亿级消息积累能力
     

    首先就是JAVA运行环境吧

    1. $ java -version
    2. java version "1.8.0_121"

    然后将这个下载下来并解压缩

    1. # Download release from the Apache mirror
    2. $ wget https://archive.apache.org/dist/rocketmq/4.9.3/rocketmq-all-4.9.3-bin-release.zip
    3. # Unpack the release
    4. $ unzip rocketmq-all-4.9.3-bin-release.zip

    启动一个namesrv吧

    nohup sh mqnamesrv &

    看到这个就启动了吧 端口号是9876啦

    The Name Server boot success...

    然后启动一个broker吧

    nohup sh ./bin/mqbroker -n xx.xx.xxx.xx:9876 -c ./conf/broker.conf autoCreateTopicEnable=true &

    卡一下子就起来了

    The broker[broker-a, xxx.xx.x.xxx:10911] boot success...

    这个配置文件别忘记配置哈 第一个是暴漏的IP 第二个是namesrv的IP哈

    1. brokerIP1=xxx.xx.x.xxx
    2. namesrvAddr=xxx.xx.x.xxx:9876

    防火墙别忘记关闭哈

    如果不关闭要开放端口啊 后边还有一个集群地呀 如果你集群的话

    9876 10911 10912 10909

    也可以使用命令查看运行情况

    1. ps -ef|grep broker
    2. ps -er|grep namesvr

    最后看心情关闭吧 使用kill 也行 使用 mqshutdown 也行

    1. kill -9 xxxx
    2. ./bin/mqshutdown namesrv
    3. ./bin/mqshutdown broker

    然后就是JAVA开始上场啦
    显示POM里的依赖

    1. <dependency>
    2. <groupId>org.apache.rocketmq</groupId>
    3. <artifactId>rocketmq-client</artifactId>
    4. <version>4.9.3</version>
    5. </dependency>

    这个消费者啊 接受消息 并处理

    1. package com.xlcdb.webapp;
    2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    5. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    6. import org.apache.rocketmq.client.exception.MQClientException;
    7. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    8. import org.apache.rocketmq.common.message.MessageExt;
    9. import java.util.List;
    10. public class Consumer {
    11. public static void main(String[] args) throws MQClientException {
    12. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(Config.producerGroup);
    13. //指定NameServer地址,多个地址以 ; 隔开
    14. consumer.setNamesrvAddr(Config.namesrvAddr); //修改为自己的
    15. /**
    16. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
    17. * 如果非第一次启动,那么按照上次消费的位置继续消费
    18. */
    19. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    20. consumer.subscribe("TopicTest", "*");
    21. consumer.registerMessageListener(new MessageListenerConcurrently() {
    22. @Override
    23. public ConsumeConcurrentlyStatus consumeMessage(List msgs,
    24. ConsumeConcurrentlyContext context) {
    25. try {
    26. for(MessageExt msg:msgs){
    27. String msgbody = new String(msg.getBody(), "utf-8");
    28. System.out.println(" MessageBody: "+ msgbody);//输出消息内容
    29. }
    30. } catch (Exception e) {
    31. e.printStackTrace();
    32. return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
    33. }
    34. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
    35. }
    36. });
    37. consumer.start();
    38. System.out.printf("Consumer Started.%n");
    39. }
    40. }

    最后来个发射器 发射给接收者

    1. package com.xlcdb.webapp;
    2. import org.apache.rocketmq.client.exception.MQClientException;
    3. import org.apache.rocketmq.client.producer.DefaultMQProducer;
    4. import org.apache.rocketmq.client.producer.SendResult;
    5. import org.apache.rocketmq.common.message.Message;
    6. import org.apache.rocketmq.remoting.common.RemotingHelper;
    7. public class Producer {
    8. public static void main(String[] args) throws InterruptedException, MQClientException {
    9. DefaultMQProducer producer = new DefaultMQProducer(Config.producerGroup);
    10. //指定NameServer地址
    11. producer.setNamesrvAddr(Config.namesrvAddr); //修改为自己的
    12. /**
    13. * Producer对象在使用之前必须要调用start初始化,初始化一次即可
    14. * 注意:切记不可以在每次发送消息时,都调用start方法
    15. */
    16. producer.start();
    17. for (int i = 0; i < 997892; i++) {
    18. try {
    19. //构建消息
    20. Message msg = new Message("TopicTest" /* Topic */,
    21. "TagA" /* Tag */,
    22. ("测试RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
    23. );
    24. //发送同步消息
    25. SendResult sendResult = producer.send(msg);
    26. System.out.printf("%s%n", sendResult);
    27. } catch (Exception e) {
    28. e.printStackTrace();
    29. Thread.sleep(1000);
    30. }
    31. }
    32. producer.shutdown();
    33. }
    34. }

    最后看看一边是发射 一边是接受呢

     

    给出官方网站看看吧 强大又简单的好东西呢

    https://github.com/apache/rocketmq/

  • 相关阅读:
    zemax慧差与消慧差
    2.7 Python-运算符
    算法学习笔记Day8——回溯算法
    Task01|GriModel统计分析(下)|方法论与一元数值检验|假设检验1
    [JAVAee]Spring MVC
    Packet Tracer - 研究 DUAL FSM
    软件测试需求分析
    ardupilot开发 --- 外设适配器、外设拓展、AP_Periph 篇
    论文日记五:QueryInst
    Linux发散小知识
  • 原文地址:https://blog.csdn.net/winnershili/article/details/126401428