• Android - MQTT客户端


    什么是MQTT

    MQTT是机器对机器(M2M)/物联网(IoT)连接协议。它被设计为一个极其轻量级的发布/订阅消息传输协议。

    限制:
    MQTT协议传输内容格式目前只支持2中,一个为UTF-8和askii,传输的内容最大为256M

    Android 客户端的使用

    依赖

        implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
        implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.1'
    
    • 1
    • 2

    代码

    
    @Route(path = "/main/MqttActivity")
    public class MqttActivity extends AppCompatActivity {
    
        /**
         * 服务端地址
         */
        private String serverUri;
    
        /**
         * 连接时使用的clientId, 必须唯一, 一般加时间戳 (作为客户端的标识)
         */
        private String clientId;
    
        /**
         * 消息持久化
         * <p>
         * 可以 内存存储或者文件存储,这里用的是文件存储
         */
        private MqttClientPersistence mqttClientPersistence;
        /**
         * 声明一个MQTT客户端对象
         */
        private MqttAndroidClient mqttAndroidClient;
    
    
        private MqttConnectOptions mqttConnectOptions;
    
    
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_mqtt);
    
            newClient();
    
            setClientConfig();
    
            connectMqtt();
    
    
        }
    
        @Override
        protected void onDestroy() {
            super.onDestroy();
            disconnectMqtt();
        }
    
        private void disconnectMqtt() {
            try {
                mqttAndroidClient.disconnect();
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    
        private String topicFilter;
        private int qos;
    
        /**
         * 连接成功后,可以订阅了
         */
        private void subscribeTopic() {
            try {
                mqttAndroidClient.subscribe(topicFilter, qos);
                startKeepAliveAlarm();
            } catch (MqttException e) {
                e.printStackTrace();
                //订阅失败可以重连
            }
        }
    
    
        private static final String MQTT_ACTION_KEEP_ALIVE = "MQTT 保活广播";
        private static final int MQTT_KEEP_ALIVE = 15 * 1000; //心跳包时间,毫秒
    
        /**
         * 开启保活的 Alarm
         */
        private void startKeepAliveAlarm() {
            //使用 AlarmManager 进行保活,通过 PendingIntent 调用到 广播或者服务,实现真正的保活
            AlarmManager mAlarmManager = (AlarmManager) getSystemService(ALARM_SERVICE);
            Intent intent = new Intent(MQTT_ACTION_KEEP_ALIVE);
            PendingIntent pi = PendingIntent.getBroadcast(this, 0, intent, PendingIntent.FLAG_CANCEL_CURRENT);
            mAlarmManager.cancel(pi);
            mAlarmManager.setWindow(AlarmManager.RTC_WAKEUP, System.currentTimeMillis() + MQTT_KEEP_ALIVE, MQTT_KEEP_ALIVE, pi);
        }
    
        public static final byte[] MQTT_KEEP_ALIVE_MESSAGE = {0}; // 心跳包发送内容
        private String topicPublish;
    
        /**
         * 用于保活调用,被 {@link MqttActivity#startKeepAliveAlarm()} 触发
         */
        private void keepAlive() {
            MqttMessage message = new MqttMessage(MQTT_KEEP_ALIVE_MESSAGE);
            message.setQos(0);
            topicPublish = "这是保活的topic";
            try {
                mqttAndroidClient.publish(topicPublish, message);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    
        private void connectMqtt() {
            try {
                mqttAndroidClient.connect(mqttConnectOptions, new IMqttActionListener() {
                    /**
                     * 连接成功
                     * @param asyncActionToken -
                     */
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        subscribeTopic();
                    }
    
                    /**
                     * 连接失败
                     * @param asyncActionToken -
                     * @param exception -
                     */
                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
    
                    }
                });
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 验证HTTPS
         */
        private SSLSocketFactory sslSocketFactory;
    
        private void setClientConfig() {
    
            mqttConnectOptions = new MqttConnectOptions();
            //设置自动重连
            mqttConnectOptions.setAutomaticReconnect(true);
            // 缓存,
            mqttConnectOptions.setCleanSession(false);
            // 设置超时时间,单位:秒
            mqttConnectOptions.setConnectionTimeout(15);
            // 心跳包发送间隔,单位:秒
            mqttConnectOptions.setKeepAliveInterval(15);
            // 用户名
            mqttConnectOptions.setUserName("username");
            // 密码
            mqttConnectOptions.setPassword("password".toCharArray());
            //版本
            mqttConnectOptions.setMqttVersion(4);
            //HTTPS
            mqttConnectOptions.setSocketFactory(sslSocketFactory);
        }
    
        private void newClient() {
    
            mqttClientPersistence = new MqttDefaultFilePersistence(getCacheDir().getAbsolutePath());
            mqttAndroidClient = new MqttAndroidClient(getApplicationContext(), serverUri, clientId, mqttClientPersistence);
    
            mqttAndroidClient.setCallback(new MqttCallback() {
    
                /**
                 *当与服务器的连接丢失时       ,将调用此方法。
                 *@param cause 导致连接丢失的原因。
                 */
                @Override
                public void connectionLost(Throwable cause) {
    
                }
    
                /**
                 * 当来自服务器的消息到达时,调用此方法。
                 *
                 * MQTT客户端同步调用此方法。
                 * 直到此方法干净地返回,才会将确认发送回服务器。
                 *
                 * 如果此方法的实现引发异常,则客户端将被关闭。下次重新连接客户端时,服务器将重新传递任何QoS 1或2消息。
                 *
                 * 在运行此方法的实现时到达的任何其他消息将在内存中建立,然后在网络上备份。
                 *
                 * 如果应用程序需要持久化数据,那么它应该确保在从该方法返回之前持久化数据,因为从该方法返回之后,消息被视为已传递,并且不可复制。
                 *
                 * 可以在该回调的实现中发送新消息(例如,对此消息的响应),但该实现不能断开客户端的连接,因为无法发送对正在处理的消息的确认,并且会发生死锁。
                 *
                 * @param topic 邮件主题的名称已发布到
                 * @param message 实际消息
                 * @throws Exception 如果发生终端错误,则应关闭客户端。
                 */
                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
    
                }
    
                /**
                 * 在完成邮件传递并收到所有确认时调用。对于QoS 0消息,一旦消息被传递到网络,就会调用它。对于QoS 1,当接收到PUBACK时调用它;对于QoS 2,当接收到PUBCOMP时调用它。该令牌将与发布消息时返回的令牌相同。
                 * @param token 与邮件关联的传递令牌。
                 */
                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
    
                }
            });
        }
    
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211

    操作1.创建新连接 - newClient

    需要参数:

    服务端地址 - serverUri
    连接时使用的clientId, 必须唯一, 一般加时间戳 (作为客户端的标识) - clientId
    消息持久化(文件存储) - mqttClientPersistence
    MQTT客户端 - mqttAndroidClient

    mqttAndroidClient 是Android 专用的,除此之外还可以用 MqttAsyncClient = = 基本一致

    public interface MqttCallback {
    
    	public void connectionLost(Throwable cause);
    
    	public void messageArrived(String topic, MqttMessage message) throws Exception;
    
    	public void deliveryComplete(IMqttDeliveryToken token);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    MqttCallback 是消息回调接口,订阅的消息动态由此返回

    操作2.设置参数 - setClientConfig

    需要参数:

    如需验证HTTPS - SSLSocketFactory

    设置自动重连 -
    缓存 是否会话持久化 - cleanSession
    超时时间,单位:秒 - connectionTimeout
    心跳包发送间隔,单位:秒 -
    用户名
    密码
    版本 - MqttVersion

    操作3.连接 - connectMqtt

    public interface IMqttActionListener {
    
    	public void onSuccess(IMqttToken asyncActionToken );
    
    	public void onFailure(IMqttToken asyncActionToken, Throwable exception);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    返回成功 、 失败

    操作4.订阅 - subscribeTopic

    参数

    主题 - topic

    服务质量 - qos

    “至多一次 Qos level=0”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
    “至少一次Qos level=1”,确保消息到达,但消息重复可能会发生。
    “只有一次Qos level=2”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。

    操作5.心跳- startKeepAliveAlarm

    订阅的同时也要做心跳保活

    参考地址

    基于MQTT协议的 org.eclipse.paho.client.mqttv3 源码学习(一)
    https://blog.csdn.net/chenbifeng/article/details/25050693

    基于MQTT协议的 org.eclipse.paho.client.mqttv3 源码学习(二)
    https://blog.csdn.net/chenbifeng/article/details/25067761

    Android实现MQTT客户端:https://www.jianshu.com/p/f2a043aeaa6d

    Android Mqtt 客户端paho使用心得:https://www.jianshu.com/p/0d2086c4f6f4

    基于paho在android平台上实现MQTT Client间的简单通信 - CSDN博客
    http://itindex.net/detail/58745-paho-android-%E5%B9%B3%E5%8F%B0

    MQTT基本应用(Mosquitto+Eclipse Paho):https://blog.csdn.net/AhaQianxun/article/details/102756981

  • 相关阅读:
    printf函数中表达式的结合顺序
    layui子界面操作数据后主界面刷新怎么操作
    pytorch 多GPU训练
    本机spark 通idea连接Oracle的坑
    安卓学习:广播
    打不过就加入 | 动植物泛基因组研究(一)
    Redis客户端常见异常
    猿创征文|聊一聊我在字节跳动做项目质量改进的经验
    并发编程基础概念
    裁员浪潮,回顾一下自己去年的毕业吧(二)
  • 原文地址:https://blog.csdn.net/weixin_35691921/article/details/124409848