MQTT是机器对机器(M2M)/物联网(IoT)连接协议。它被设计为一个极其轻量级的发布/订阅消息传输协议。
限制:
MQTT协议传输内容格式目前只支持2中,一个为UTF-8和askii,传输的内容最大为256M
依赖
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.1'
代码
@Route(path = "/main/MqttActivity")
public class MqttActivity extends AppCompatActivity {
/**
* 服务端地址
*/
private String serverUri;
/**
* 连接时使用的clientId, 必须唯一, 一般加时间戳 (作为客户端的标识)
*/
private String clientId;
/**
* 消息持久化
* <p>
* 可以 内存存储或者文件存储,这里用的是文件存储
*/
private MqttClientPersistence mqttClientPersistence;
/**
* 声明一个MQTT客户端对象
*/
private MqttAndroidClient mqttAndroidClient;
private MqttConnectOptions mqttConnectOptions;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_mqtt);
newClient();
setClientConfig();
connectMqtt();
}
@Override
protected void onDestroy() {
super.onDestroy();
disconnectMqtt();
}
private void disconnectMqtt() {
try {
mqttAndroidClient.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
private String topicFilter;
private int qos;
/**
* 连接成功后,可以订阅了
*/
private void subscribeTopic() {
try {
mqttAndroidClient.subscribe(topicFilter, qos);
startKeepAliveAlarm();
} catch (MqttException e) {
e.printStackTrace();
//订阅失败可以重连
}
}
private static final String MQTT_ACTION_KEEP_ALIVE = "MQTT 保活广播";
private static final int MQTT_KEEP_ALIVE = 15 * 1000; //心跳包时间,毫秒
/**
* 开启保活的 Alarm
*/
private void startKeepAliveAlarm() {
//使用 AlarmManager 进行保活,通过 PendingIntent 调用到 广播或者服务,实现真正的保活
AlarmManager mAlarmManager = (AlarmManager) getSystemService(ALARM_SERVICE);
Intent intent = new Intent(MQTT_ACTION_KEEP_ALIVE);
PendingIntent pi = PendingIntent.getBroadcast(this, 0, intent, PendingIntent.FLAG_CANCEL_CURRENT);
mAlarmManager.cancel(pi);
mAlarmManager.setWindow(AlarmManager.RTC_WAKEUP, System.currentTimeMillis() + MQTT_KEEP_ALIVE, MQTT_KEEP_ALIVE, pi);
}
public static final byte[] MQTT_KEEP_ALIVE_MESSAGE = {0}; // 心跳包发送内容
private String topicPublish;
/**
* 用于保活调用,被 {@link MqttActivity#startKeepAliveAlarm()} 触发
*/
private void keepAlive() {
MqttMessage message = new MqttMessage(MQTT_KEEP_ALIVE_MESSAGE);
message.setQos(0);
topicPublish = "这是保活的topic";
try {
mqttAndroidClient.publish(topicPublish, message);
} catch (MqttException e) {
e.printStackTrace();
}
}
private void connectMqtt() {
try {
mqttAndroidClient.connect(mqttConnectOptions, new IMqttActionListener() {
/**
* 连接成功
* @param asyncActionToken -
*/
@Override
public void onSuccess(IMqttToken asyncActionToken) {
subscribeTopic();
}
/**
* 连接失败
* @param asyncActionToken -
* @param exception -
*/
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 验证HTTPS
*/
private SSLSocketFactory sslSocketFactory;
private void setClientConfig() {
mqttConnectOptions = new MqttConnectOptions();
//设置自动重连
mqttConnectOptions.setAutomaticReconnect(true);
// 缓存,
mqttConnectOptions.setCleanSession(false);
// 设置超时时间,单位:秒
mqttConnectOptions.setConnectionTimeout(15);
// 心跳包发送间隔,单位:秒
mqttConnectOptions.setKeepAliveInterval(15);
// 用户名
mqttConnectOptions.setUserName("username");
// 密码
mqttConnectOptions.setPassword("password".toCharArray());
//版本
mqttConnectOptions.setMqttVersion(4);
//HTTPS
mqttConnectOptions.setSocketFactory(sslSocketFactory);
}
private void newClient() {
mqttClientPersistence = new MqttDefaultFilePersistence(getCacheDir().getAbsolutePath());
mqttAndroidClient = new MqttAndroidClient(getApplicationContext(), serverUri, clientId, mqttClientPersistence);
mqttAndroidClient.setCallback(new MqttCallback() {
/**
*当与服务器的连接丢失时 ,将调用此方法。
*@param cause 导致连接丢失的原因。
*/
@Override
public void connectionLost(Throwable cause) {
}
/**
* 当来自服务器的消息到达时,调用此方法。
*
* MQTT客户端同步调用此方法。
* 直到此方法干净地返回,才会将确认发送回服务器。
*
* 如果此方法的实现引发异常,则客户端将被关闭。下次重新连接客户端时,服务器将重新传递任何QoS 1或2消息。
*
* 在运行此方法的实现时到达的任何其他消息将在内存中建立,然后在网络上备份。
*
* 如果应用程序需要持久化数据,那么它应该确保在从该方法返回之前持久化数据,因为从该方法返回之后,消息被视为已传递,并且不可复制。
*
* 可以在该回调的实现中发送新消息(例如,对此消息的响应),但该实现不能断开客户端的连接,因为无法发送对正在处理的消息的确认,并且会发生死锁。
*
* @param topic 邮件主题的名称已发布到
* @param message 实际消息
* @throws Exception 如果发生终端错误,则应关闭客户端。
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
}
/**
* 在完成邮件传递并收到所有确认时调用。对于QoS 0消息,一旦消息被传递到网络,就会调用它。对于QoS 1,当接收到PUBACK时调用它;对于QoS 2,当接收到PUBCOMP时调用它。该令牌将与发布消息时返回的令牌相同。
* @param token 与邮件关联的传递令牌。
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
}
}
需要参数:
服务端地址 - serverUri
连接时使用的clientId, 必须唯一, 一般加时间戳 (作为客户端的标识) - clientId
消息持久化(文件存储) - mqttClientPersistence
MQTT客户端 - mqttAndroidClient
mqttAndroidClient 是Android 专用的,除此之外还可以用 MqttAsyncClient = = 基本一致
public interface MqttCallback {
public void connectionLost(Throwable cause);
public void messageArrived(String topic, MqttMessage message) throws Exception;
public void deliveryComplete(IMqttDeliveryToken token);
}
MqttCallback 是消息回调接口,订阅的消息动态由此返回
需要参数:
如需验证HTTPS - SSLSocketFactory
设置自动重连 -
缓存 是否会话持久化 - cleanSession
超时时间,单位:秒 - connectionTimeout
心跳包发送间隔,单位:秒 -
用户名
密码
版本 - MqttVersion
public interface IMqttActionListener {
public void onSuccess(IMqttToken asyncActionToken );
public void onFailure(IMqttToken asyncActionToken, Throwable exception);
}
返回成功 、 失败
参数
主题 - topic
服务质量 - qos
“至多一次 Qos level=0”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
“至少一次Qos level=1”,确保消息到达,但消息重复可能会发生。
“只有一次Qos level=2”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
订阅的同时也要做心跳保活
基于MQTT协议的 org.eclipse.paho.client.mqttv3 源码学习(一)
https://blog.csdn.net/chenbifeng/article/details/25050693
基于MQTT协议的 org.eclipse.paho.client.mqttv3 源码学习(二)
https://blog.csdn.net/chenbifeng/article/details/25067761
Android实现MQTT客户端:https://www.jianshu.com/p/f2a043aeaa6d
Android Mqtt 客户端paho使用心得:https://www.jianshu.com/p/0d2086c4f6f4
基于paho在android平台上实现MQTT Client间的简单通信 - CSDN博客
http://itindex.net/detail/58745-paho-android-%E5%B9%B3%E5%8F%B0
MQTT基本应用(Mosquitto+Eclipse Paho):https://blog.csdn.net/AhaQianxun/article/details/102756981