本文主要记录mqtt消息件emqx的安装及使用
基于liunx centos7,docker-compose,emqx:4.4.4。
version: '3.7'
services:
emqx01:
image: emqx:4.4.4
container_name: emqx01
ports:
- "1893:1883" #tcp连接
- "18083:18083" #控制台
- "8083:8083" #控制台工具websocket ws用
- "8084:8084" #控制台工具websocket wss用
environment:
- TZ=Asia/Shanghai
networks:
- my-net
networks:
#新增的网络 内部服务名调用
my-net:
external: true
[root@m emqx]# docker-compose ps -a
Name Command State Ports
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
emqx01 /usr/bin/docker-entrypoint ... Up 11883/tcp, 0.0.0.0:18093->18083/tcp, 0.0.0.0:1893->1883/tcp, 4369/tcp, 4370/tcp, 5369/tcp, 6369/tcp, 6370/tcp, 8081/tcp, 0.0.0.0:8093->8083/tcp,
0.0.0.0:8094->8084/tcp, 8883/tcp
[root@m emqx]# docker-compose logs -f
Attaching to emqx01
emqx01 | Starting emqx on node e15d8dee531f@172.19.0.2
emqx01 | Start mqtt:tcp:internal listener on 127.0.0.1:11883 successfully.
emqx01 | Start mqtt:tcp:external listener on 0.0.0.0:1883 successfully.
emqx01 | Start mqtt:ws:external listener on 0.0.0.0:8083 successfully.
emqx01 | Start mqtt:ssl:external listener on 0.0.0.0:8883 successfully.
emqx01 | Start mqtt:wss:external listener on 0.0.0.0:8084 successfully.
emqx01 | Start http:management listener on 8081 successfully.
emqx01 | 2022-06-27T18:40:03.425652+08:00 [warning] [Dashboard] Using default password for dashboard 'admin' user. Please use './bin/emqx_ctl admins' command to change it. NOTE: the default password in config file is only used to initialise the database record, changing the config file after database is initialised has no effect.
emqx01 | Start http:dashboard listener on 18083 successfully.
从日志可以看出,dashboard listener on 18083,admin/public
基于springboot2.5.6
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.13</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<version>5.5.13</version>
</dependency>
mqtt:
test:
url: tcp://192.168.0.221:1883
clientId: testmqtt #连接的一个标识
topic: mqtt/test
userName: admin
passWord: public
timeout: 5000
keepAlive: 10
@Slf4j
@ConfigurationProperties("mqtt.test")
@Component
@Data
public class MqttConfig {
String url;
String clientId;
String topic;
String userName;
String passWord;
Integer timeout;
Integer keepAlive;
@Bean
public MqttClient initClient() {
try {
MqttClient client = new MqttClient(url, clientId);
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
options.setCleanSession(true);
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepAlive);
client.setCallback(new MqttConsume());
IMqttToken iMqttToken = client.connectWithResult(options);
boolean complete = iMqttToken.isComplete();
log.info("mqtt建立连接:{}", complete);
//我这里就直接订阅了 消息消费者MqttConsume
client.subscribe(topic, 0);
log.info("已订阅topic:{}", topic);
return client;
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("mqtt 连接异常");
}
}
}
@Slf4j
public class MqttConsume implements MqttCallback {
//连接丢失
@Override
public void connectionLost(Throwable throwable) {
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
log.info("收到消息:topic:{},Qos:{},msg:{}",
topic, mqttMessage.getQos(), new String(mqttMessage.getPayload()));
}
//传输完成
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}
@Autowired
MqttClient client;
@Autowired
MqttConfig mqttConfig;
@GetMapping("/send")
public String send() {
try {
MqttMessage message = new MqttMessage();
message.setQos(0);
message.setRetained(false);
for (int i = 0; i < 10; i++) {
message.setPayload(("1241234&" + i).getBytes());
client.publish(mqttConfig.getTopic(), message);
log.info("发送成功:{}", i);
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
return "SUCCESS";
}
com.example.mqtt.config.MqttConfig : mqtt连接:true
com.example.mqtt.config.MqttConfig : 已订阅topic:mqtt/test
c.e.mqtt.controller.MqttController : 发送成功:0
com.example.mqtt.consume.MqttConsume : 收到消息:topic:mqtt/test,Qos:0,msg:1241234&0
c.e.mqtt.controller.MqttController : 发送成功:1
com.example.mqtt.consume.MqttConsume : 收到消息:topic:mqtt/test,Qos:0,msg:1241234&1
QoS0,发送就不管了,最多一次;
QoS1,发送之后依赖MQTT规范,是否启动重传消息,所以至少一次;
QoS2,发送之后依赖MQTT消息机制,确保只有一次。
以上就是本章的全部内容了。
上一篇:RocketMQ第三话 – RocketMQ高可用集群搭建
下一篇:MQTT第二话 – emqx高可用集群实现
人生天地之间,若白驹过隙,忽然而已