这里使用开源库 hivemq-mqtt-client,它是 Java 生态中的一个 MQTT 客户端框架,要求 Java 8。在 Android 上使用问题不大,但需要一些额外配置(见下方),其中 packagingOptions 必须配置,否则编译不过。由于框架用到了 Java 8 新增的语言特性,minSdk 需设置为 24(即 Android 7.0);如果要兼容 Android 7.0 以下的系统,可以参考官方文档《Installation on Android》(https://hivemq.github.io/hivemq-mqtt-client/docs/installation/android/)配置语法脱糖(desugaring)。
// Android build settings needed to use hivemq-mqtt-client.
android {
    defaultConfig {
        // Android 7.0 (API 24); supporting older releases requires the desugaring setup
        // described in the library's Android installation guide.
        minSdk 24
    }
    compileOptions {
        // Gradle's JavaVersion enum spells Java 8 as VERSION_1_8; it has no VERSION_8 constant.
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }
    kotlinOptions {
        // Kotlin names the Java 8 bytecode target "1.8".
        jvmTarget = '1.8'
    }
    packagingOptions {
        resources {
            // Per the accompanying notes, the build does not succeed unless these files are excluded.
            excludes += ['META-INF/INDEX.LIST', 'META-INF/io.netty.versions.properties']
        }
    }
}
-
dependencies {
    // HiveMQ MQTT client library, pinned to an explicit version.
    implementation("com.hivemq:hivemq-mqtt-client:1.3.3")
}
刚开始在自动重连这块花了好多时间,最后才发现是设置用户名和密码的位置不对:用户名和密码一定要在初始化 Client(也就是配置自动重连)的地方设置,而不是在发起连接的时候设置!下面是一个简单的使用示例:
MqttManager.kt
import android.util.Log
import com.hivemq.client.mqtt.datatypes.MqttQos
import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedContext
import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedListener
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedListener
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck
import java.util.UUID
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import java.util.function.Consumer
-
/**
 * Base type for observers of MQTT events.
 *
 * Every callback defaults to a no-op, so subclasses override only the events they care about.
 */
open class MqttListener {
    /** Called when the client reports that a connection has been established. */
    open fun onConnected() = Unit

    /** Called when the client reports that the connection has been lost or closed. */
    open fun onDisconnected() = Unit

    /** Called once a subscribe pass has finished; [topics] are the topic filters being reported. */
    open fun onSubscribed(vararg topics: String) = Unit

    /** Called for each incoming message, with the [topic] it arrived on and its raw payload [data]. */
    open fun onReceiveMessage(topic: String, data: ByteArray) = Unit

    /** Called after an outgoing message to [topic] has been handed off, with its raw payload [data]. */
    open fun onSendMessage(topic: String, data: ByteArray) = Unit
}
-
/**
 * Singleton wrapper around a hivemq-mqtt-client MQTT 5 asynchronous client.
 *
 * Documentation:
 * - https://github.com/hivemq/hivemq-mqtt-client
 * - https://hivemq.github.io/hivemq-mqtt-client/docs/installation/android/
 *
 * The manager owns one client, subscribes to a fixed set of topics each time a connection is
 * established, and forwards connection, subscription, receive and send events to the registered
 * [MqttListener]s. Listener callbacks are invoked on whichever thread delivered the event; this
 * class does not dispatch them to any particular thread.
 */
class MqttManager private constructor() : MqttClientConnectedListener, MqttClientDisconnectedListener {
    /** Daemon worker pool used for message callbacks and for post-subscribe notification. */
    private val executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()) { task ->
        Thread(task).apply { isDaemon = true }
    }

    /**
     * Registered listeners. Copy-on-write so that callbacks can iterate safely while other
     * threads add or remove listeners.
     */
    private val listeners = CopyOnWriteArrayList<MqttListener>()

    /** Topic filters subscribed to after every successful connection. */
    private val subTopics = arrayOf("top1", "top2", "top3")

    private val mqttAsynClient: Mqtt5AsyncClient = Mqtt5Client.builder()
        .identifier(UUID.randomUUID().toString())
        .serverHost(SERVER_HOST)
        .serverPort(SERVER_PORT)
        .addConnectedListener(this)
        .addDisconnectedListener(this)
        // Credentials must be configured here, when the client is built, for reconnects to succeed.
        .simpleAuth()
        .username(USERNAME)
        .password(PASSWORD.toByteArray())
        .applySimpleAuth()
        // Enable automatic reconnect with the library's default settings.
        .automaticReconnectWithDefaultConfig()
        .buildAsync()

    /** Forwards every message received on a subscription to [processReceivedMessage]. */
    private val callback = Consumer<Mqtt5Publish> { publish ->
        processReceivedMessage(publish.topic.toString(), publish.payloadAsBytes)
    }

    /** Registers [listener]; registering the same listener again has no effect. */
    fun addMqttListener(listener: MqttListener) {
        listeners.addIfAbsent(listener)
    }

    /** Unregisters [listener]; does nothing if it was never registered. */
    fun removeMqttListener(listener: MqttListener) {
        listeners.remove(listener)
    }

    /** Client callback: notifies listeners, then subscribes to all configured topics. */
    override fun onConnected(context: MqttClientConnectedContext) {
        Log.i(TAG, "onConnected()")
        listeners.forEach { it.onConnected() }
        subscribeAll()
    }

    /**
     * Subscribes to every topic in [subTopics] and, once all attempts have finished, reports the
     * topics that were accepted through [MqttListener.onSubscribed].
     *
     * Nothing here blocks a thread: each attempt is reduced to "topic or null" as it completes,
     * and the final notification is scheduled on [executor] only after all attempts are done.
     * Listeners are not notified when no topic was accepted.
     */
    private fun subscribeAll() {
        val attempts = subTopics.map { topic ->
            subscribe(topic).handle { subAck, error ->
                // A failed future carries no SubAck, so the error must be checked first.
                val accepted = error == null && subAck.reasonCodes.none { it.isError }
                when {
                    accepted -> Log.i(TAG, "subscribe success")
                    error != null -> Log.e(TAG, "subscribe() - topic=$topic failed", error)
                    else -> Log.e(
                        TAG, "subscribe() - reasonCodes=[${subAck.reasonCodes.joinToString(", ")}]" +
                            ", reasonString=${subAck.reasonString}"
                    )
                }
                if (accepted) topic else null
            }
        }
        CompletableFuture.allOf(*attempts.toTypedArray()).thenRunAsync({
            // Every attempt has already completed, so join() returns immediately.
            val subscribed = attempts.mapNotNull { it.join() }
            if (subscribed.size == attempts.size) {
                Log.i(TAG, "subscribeAll() - 全部订阅成功")
            }
            if (subscribed.isNotEmpty()) {
                val topics = subscribed.toTypedArray()
                listeners.forEach { it.onSubscribed(*topics) }
            }
        }, executor)
    }

    /** Client callback: logs the client state and notifies listeners of the disconnect. */
    override fun onDisconnected(context: MqttClientDisconnectedContext) {
        Log.e(
            TAG, "onDisconnected() - isConnected=${mqttAsynClient.state.isConnected}" +
                ", isConnectedOrReconnect=${mqttAsynClient.state.isConnectedOrReconnect}"
        )
        listeners.forEach { it.onDisconnected() }
    }

    /**
     * Starts an asynchronous connect with a clean session and a 30-second keep-alive.
     *
     * The outcome is only logged; listeners learn about success through [onConnected].
     */
    fun connect() {
        mqttAsynClient
            .connectWith()
            .cleanStart(true)
            .keepAlive(30)
            .send()
            .whenComplete { connAck, error ->
                when {
                    error != null -> Log.e(TAG, "connect() - failed", error)
                    connAck.reasonCode == Mqtt5ConnAckReasonCode.SUCCESS -> Log.i(TAG, "connect() - SUCCESS")
                    else -> Log.e(TAG, "connect() - ${connAck.reasonCode}")
                }
            }
    }

    /** Starts an asynchronous disconnect and logs the outcome. */
    fun disconnect() {
        mqttAsynClient.disconnect().whenComplete { _, error ->
            if (error == null) {
                Log.i(TAG, "disconnect()")
            } else {
                Log.e(TAG, "disconnect() - failed", error)
            }
        }
    }

    /** Hands a received message to every registered listener. */
    private fun processReceivedMessage(topic: String, data: ByteArray) {
        listeners.forEach { it.onReceiveMessage(topic, data) }
    }

    /**
     * Subscribes to [topic] at QoS 0 and routes incoming messages to the listeners via [executor].
     *
     * @return the client's future for the subscribe request
     */
    fun subscribe(topic: String): CompletableFuture<Mqtt5SubAck> {
        return mqttAsynClient.subscribeWith()
            .topicFilter(topic)
            .noLocal(true) // we do not want to receive our own messages
            .qos(MqttQos.AT_MOST_ONCE)
            .callback(callback)
            .executor(executor)
            .send()
    }

    /** Unsubscribes from [topic] asynchronously and logs the outcome. */
    fun unsubscribe(topic: String) {
        mqttAsynClient.unsubscribeWith()
            .topicFilter(topic)
            .send()
            .whenComplete { unsubAck, error ->
                if (error == null) {
                    Log.i(TAG, "unsubscribe() - $unsubAck")
                } else {
                    Log.e(TAG, "unsubscribe() - topic=$topic failed", error)
                }
            }
    }

    /**
     * Publishes [payload] to [topic] at QoS 0.
     *
     * Listeners receive [MqttListener.onSendMessage] once the publish future completes normally;
     * a failed publish is logged and not reported to listeners.
     */
    fun publish(topic: String, payload: ByteArray) {
        mqttAsynClient.publishWith()
            .topic(topic)
            .qos(MqttQos.AT_MOST_ONCE)
            .payload(payload)
            .send()
            .whenComplete { result, error ->
                if (error != null) {
                    Log.e(TAG, "publish() - topic=$topic failed", error)
                } else {
                    val sent = result.publish.payloadAsBytes
                    listeners.forEach { it.onSendMessage(topic, sent) }
                }
            }
    }

    companion object {
        private const val TAG = "MqttManager"

        private const val SERVER_HOST = "example.com"

        // 1883 means plain TCP; do not prefix the host with "tcp://", otherwise the connection fails.
        private const val SERVER_PORT = 1883

        // NOTE(review): sample credentials compiled into the source; real ones should come from
        // configuration or secure storage instead.
        private const val USERNAME = "admin"
        private const val PASSWORD = "123456"

        /** The single shared manager instance. */
        val instance = MqttManager()
    }
}