1.6.1 使用Xshell连接Ubuntu
使用命令ifconfig查看虚拟机IP地址,如图1-xx所示。
图1-xx
使用如下命令安装ssh,以让Xshell软件远程连接到Ubuntu系统中:
sudo apt-get install openssh-server
如果出现如下错误:
E: 无法获得锁 /var/lib/dpkg/lock-frontend - open (11: 资源暂时不可用)
E: 无法获取 dpkg 前端锁 (/var/lib/dpkg/lock-frontend),是否有其他进程正占用它?
解决办法如下:
ghy@ghy-VirtualBox:~$ sudo apt-get install openssh-server
[sudo] ghy 的密码:
E: 无法获得锁 /var/lib/dpkg/lock-frontend - open (11: 资源暂时不可用)
E: 无法获取 dpkg 前端锁 (/var/lib/dpkg/lock-frontend),是否有其他进程正占用它?
ghy@ghy-VirtualBox:~$ sudo rm /var/lib/dpkg/lock-frontend
ghy@ghy-VirtualBox:~$ sudo apt-get install openssh-server
E: 无法获得锁 /var/lib/dpkg/lock - open (11: 资源暂时不可用)
E: 无法锁定管理目录(/var/lib/dpkg/),是否有其他进程正占用它?
ghy@ghy-VirtualBox:~$ sudo rm /var/lib/dpkg/lock
ghy@ghy-VirtualBox:~$
错误解决后再执行如下命令:
sudo apt-get install openssh-server
重新安装ssh。
安装成功效果如图1-xx所示。
图1-xx
在Xshell中配置主机IP地址,如图1-xx所示。
图1-xx
配置登陆账号及密码,如图1-xx所示。
图1-xx
连接成功效果如图1-xx所示。
图1-xx
使用ls命令查看文件夹,效果如图1-xx所示。
图1-xx
文件夹名称是中文的,在虚拟机中执行如下2条命令将文件夹名称改成英文版本:
export LANG=en_US
xdg-user-dirs-gtk-update
弹出界面如图1-xx所示。
图1-xx
点击“Update Names”按钮后再使用ls命令可以发现文件夹名称变成英文,效果如图1-xx所示。
图1-xx
至此使用Xshell工具成功连接到Ubuntu操作系统。
1.6.11 使用Java代码生产与消费消息
使用RocketMQ有三种方式发送消息:
(1)可靠同步
(2)可靠异步
(3)单向传输。
本章节使用“(1)可靠同步”的方式发送消息,也是最传统的方式。
1.6.11.1 创建消息生产者
(1)在IntelliJ IDEA中创建消息生产者项目Producer1。
(2)添加Maven依赖配置:
<dependency>
<groupId>org.apache.rocketmqgroupId>
<artifactId>rocketmq-clientartifactId>
<version>4.3.0version>
dependency>
(3)生产者代码如下:
package com.ghy.www;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
public static void main(String[] args) throws Exception {
//指定ProducerGroupName实例化DefaultMQProducer对象。
DefaultMQProducer producer = new
DefaultMQProducer("my_group_name");
//指定NameServer地址。
producer.setNamesrvAddr("192.168.1.114:9876");
//启动DefaultMQProducer实例.
producer.start();
//for循环开始发送100条消息。
for (int i = 1; i <= 100; i++) {
//创建1个Message对象,指定主题topic,标记tag和消息主体message body。
Message msg = new Message("TopicTest",
"TagA",
("你好 RocketMQ! 消息ID为:" +
i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//使用producer向其中一个Broker发送消息。
SendResult sendResult = producer.send(msg);
//打印发送结果
System.out.println((i) + " sendResult=" + sendResult);
}
//一旦DefaultMQProducer实例不再使用,请关闭。
producer.shutdown();
}
}
(4)程序运行后控制台输出部分结果如下:
99 sendResult=SendResult [sendStatus=SEND_OK, msgId=C0A801683A7418B4AAC2559EF2AF0062, offsetMsgId=C0A8017200002A9F0000000000030A13, messageQueue=MessageQueue [topic=TopicTest, brokerName=ghy-VirtualBox, queueId=3], queueOffset=274]
100 sendResult=SendResult [sendStatus=SEND_OK, msgId=C0A801683A7418B4AAC2559EF2AF0063, offsetMsgId=C0A8017200002A9F0000000000030AD8, messageQueue=MessageQueue [topic=TopicTest, brokerName=ghy-VirtualBox, queueId=0], queueOffset=274]
1.6.11.2 创建消息消费者
(1)在IntelliJ IDEA中创建消息消费者项目Consumer1。
(2)添加Maven依赖配置:
<dependency>
<groupId>org.apache.rocketmqgroupId>
<artifactId>rocketmq-clientartifactId>
<version>4.3.0version>
dependency>
(3)消费者代码如下:
package com.ghy.www;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
//指定ProducerGroupName实例化DefaultMQPushConsumer对象。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_group_name");
//指定NameServer地址。
consumer.setNamesrvAddr("192.168.1.114:9876");
//订阅主题。
consumer.subscribe("TopicTest", "*");
//注册回调,从Broker获取消息时执行。
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println("当前线程:" + Thread.currentThread().getName() + " 接收消息为:" + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
System.out.printf("消息消费者启动了!");
}
}
(4)程序运行后控制台输出部分结果如下:
99 sendResult=SendResult [sendStatus=SEND_OK, msgId=C0A801683A7418B4AAC2559EF2AF0062, offsetMsgId=C0A8017200002A9F0000000000030A13, messageQueue=MessageQueue [topic=TopicTest, brokerName=ghy-VirtualBox, queueId=3], queueOffset=274]
100 sendResult=SendResult [sendStatus=SEND_OK, msgId=C0A801683A7418B4AAC2559EF2AF0063, offsetMsgId=C0A8017200002A9F0000000000030AD8, messageQueue=MessageQueue [topic=TopicTest, brokerName=ghy-VirtualBox, queueId=0], queueOffset=274]
1.6.12 配置并运行rocketmq-console
rocketmq-console是RocketMQ基于Web的监控平台,网址如下:
https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
rocketmq-console是rocketmq-externals的子项目,下载rocketmq-console需要到如下网址:
https://github.com/apache/rocketmq-externals
下载zip源代码压缩包并解压,编辑文件:
rocketmq-console\src\main\resources\application.properties
指定NameServer服务器地址:
rocketmq.config.namesrvAddr=192.168.1.114:9876
进入rocketmq-console项目根文件夹执行如下命令:
mvn clean package -Dmaven.test.skip=true
使用Maven进行编译,生成jar文件,如图1-xx所示。
图1-xx
使用如下命令启动Web监控服务:
java -jar rocketmq-console-ng-1.0.1.jar
运行成功的效果如图1-xx所示。
图1-xx
执行网址:
显示界面如图1-xx所示。
图1-xx
成功显示Web监控界面。