• Android MQTT开发之 Hivemq MQTT Client


    使用一个开源库:hivemq-mqtt-client,这是Java生态的一个MQTT客户端框架,需要Java 8,Android上使用的话问题不大,需要一些额外的配置,下面列出了相关的配置,尤其是 packagingOptions,不然编译不过,因为框架使用了Java8新增的语言特性,所以 minSdk 设置为24,即Android7.0,如果要兼容Android7.0以下系统,可以参考这份详细文档配置一下语法脱糖的SDK: Installation on Android

    1. android {
    2. defaultConfig {
    3. minSdk 24
    4. }
    5. compileOptions {
    6. sourceCompatibility JavaVersion.VERSION_8
    7. targetCompatibility JavaVersion.VERSION_8
    8. }
    9. kotlinOptions {
    10. jvmTarget = '8'
    11. }
    12. packagingOptions {
    13. resources {
    14. excludes += ['META-INF/INDEX.LIST', 'META-INF/io.netty.versions.properties']
    15. }
    16. }
    17. }
    18. dependencies {
    19. implementation 'com.hivemq:hivemq-mqtt-client:1.3.3'
    20. }

    刚开始在自动连接这块花了好多时间,最后才发现是设置用户名和密码的地方不对,一定要在设置自动重连(初始化Client)的地方设置,而不是连接的时候!下面是一个简单的使用示例代码

    MqttManager.kt

    1. import android.util.Log
    2. import com.hivemq.client.mqtt.datatypes.MqttQos
    3. import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedContext
    4. import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedListener
    5. import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext
    6. import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedListener
    7. import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient
    8. import com.hivemq.client.mqtt.mqtt5.Mqtt5Client
    9. import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode
    10. import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish
    11. import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck
    12. import java.util.UUID
    13. import java.util.concurrent.CompletableFuture
    14. import java.util.concurrent.Executors
    15. import java.util.function.Consumer
    16. open class MqttListener {
    17. open fun onConnected() {}
    18. open fun onDisconnected() {}
    19. open fun onSubscribed(vararg topics: String) {}
    20. open fun onReceiveMessage(topic: String, data: ByteArray) {}
    21. open fun onSendMessage(topic: String, data: ByteArray) {}
    22. }
    23. /*
    24. 文档
    25. https://github.com/hivemq/hivemq-mqtt-client
    26. https://hivemq.github.io/hivemq-mqtt-client/docs/installation/android/
    27. */
    28. class MqttManager private constructor() : MqttClientConnectedListener, MqttClientDisconnectedListener {
    29. private val executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()) {
    30. Thread(it).apply { isDaemon = true }
    31. }
    32. private val mqttAsynClient: Mqtt5AsyncClient = Mqtt5Client.builder()
    33. .identifier(UUID.randomUUID().toString())
    34. .serverHost(SERVER_HOST)
    35. .serverPort(SERVER_PORT)
    36. .addConnectedListener(this)
    37. .addDisconnectedListener(this)
    38. .simpleAuth()//在初始化的时候设置账号密码,重连才能成功
    39. .username(USERNAME)
    40. .password(PASSWORD.toByteArray())
    41. .applySimpleAuth()
    42. .automaticReconnectWithDefaultConfig()//自动重连
    43. .buildAsync()
    44. private val listeners = mutableListOf()
    45. private val subTopics
    46. get() = arrayOf("top1", "top2", "top3")
    47. fun addMqttListener(listener: MqttListener) {
    48. if (!listeners.contains(listener)) {
    49. listeners.add(listener)
    50. }
    51. }
    52. fun removeMqttListener(listener: MqttListener) {
    53. listeners.remove(listener)
    54. }
    55. override fun onConnected(context: MqttClientConnectedContext) {
    56. Log.i(TAG, "onConnected()")
    57. for (l in listeners) {
    58. l.onConnected()
    59. }
    60. subscribeAll()
    61. }
    62. private fun subscribeAll() {
    63. CompletableFuture.supplyAsync({
    64. val futures = subTopics.map(::subscribe)
    65. .map {
    66. it.thenCompose {
    67. CompletableFuture.supplyAsync({
    68. val success = !it.reasonString.isPresent
    69. if (success) {
    70. Log.i(TAG, "subscribe success")
    71. } else {
    72. Log.e(
    73. TAG, "subscribe() - reasonCodes=[${it.reasonCodes.joinToString(", ")}]" +
    74. ", reasonString=${it.reasonString}"
    75. )
    76. }
    77. success
    78. }, executor)
    79. }
    80. }
    81. .toTypedArray()
    82. CompletableFuture.allOf(*futures).join()//等待所有订阅结果
    83. if(futures.all { it.get() }) {
    84. Log.i(TAG, "subscribeAll() - 全部订阅成功")
    85. }
    86. for (l in listeners) {
    87. l.onSubscribed(*subTopics)
    88. }
    89. }, executor)
    90. }
    91. override fun onDisconnected(context: MqttClientDisconnectedContext) {
    92. Log.e(
    93. TAG, "onDisconnected() - isConnected=${mqttAsynClient.state.isConnected}" +
    94. ", isConnectedOrReconnect=${mqttAsynClient.state.isConnectedOrReconnect}"
    95. )
    96. for (l in listeners) {
    97. l.onDisconnected()
    98. }
    99. }
    100. fun connect() {
    101. mqttAsynClient
    102. .connectWith()
    103. .cleanStart(true)
    104. .keepAlive(30)
    105. .send()
    106. .thenAccept {
    107. if (it.reasonCode == Mqtt5ConnAckReasonCode.SUCCESS) {
    108. Log.i(TAG, "connect() - SUCCESS")
    109. } else {
    110. Log.e(TAG, "connect() - ${it.reasonCode}")
    111. }
    112. }
    113. }
    114. fun disconnect() {
    115. mqttAsynClient.disconnect().thenAccept {
    116. Log.i(TAG, "disconnect()")
    117. }
    118. }
    119. private val callback = Consumer {
    120. val topic = it.topic.toString()
    121. val data = it.payloadAsBytes
    122. processReceivedMessage(topic, data)
    123. }
    124. private fun processReceivedMessage(topic: String, data: ByteArray) {
    125. //处理接收的数据
    126. for (l in listeners) {
    127. l.onReceiveMessage(topic, data)
    128. }
    129. }
    130. fun subscribe(topic: String): CompletableFuture {
    131. return mqttAsynClient.subscribeWith()
    132. .topicFilter(topic)
    133. .noLocal(true)// we do not want to receive our own message
    134. .qos(MqttQos.AT_MOST_ONCE)
    135. .callback(callback)
    136. .executor(executor)
    137. .send()
    138. }
    139. fun unsubscribe(topic: String) {
    140. mqttAsynClient.unsubscribeWith()
    141. .topicFilter(topic)
    142. .send().thenAccept {
    143. Log.i(TAG, "unsubscribe() - $it")
    144. }
    145. }
    146. /**
    147. * 发送数据
    148. */
    149. fun publish(topic: String, payload: ByteArray) {
    150. mqttAsynClient.publishWith()
    151. .topic(topic)
    152. .qos(MqttQos.AT_MOST_ONCE)
    153. .payload(payload)
    154. .send()
    155. .thenAccept { mqtt5PublishResult ->
    156. mqtt5PublishResult.publish.let { mqtt5Publish ->
    157. // val topic = mqtt5Publish.topic.toString()
    158. val data = mqtt5Publish.payloadAsBytes
    159. for (l in listeners) {
    160. l.onSendMessage(topic, data)
    161. }
    162. }
    163. }
    164. }
    165. companion object {
    166. private const val TAG = "MqttManager"
    167. private const val SERVER_HOST = "example.com"
    168. private const val SERVER_PORT = 1883 // 1883即TCP协议,host不要再加上"tcp://",否则连不成功
    169. private const val USERNAME = "admin"
    170. private const val PASSWORD = "123456"
    171. val instance = MqttManager()
    172. }
    173. }

  • 相关阅读:
    试试这2个流动图片制作方法让你的图片动起来吧
    当语文课本上的古诗词遇上拓世AI,文生图绘就东方美学画卷
    安卓可视大屏寻呼台 兼容标准sip协议
    长篇图解java反射机制及其应用场景
    【CSS】Tailwind CSS
    【博客475】alertmanager集群如何同步数据
    编写程序,建立一动态链表,其中包含学生学号,姓名,年龄,输入一个学生学号删除相对应的节点
    动态编译库 Natasha 5.0 兼容版本发布
    C笔记:引用调用,通过指针传递
    elementUI elfrom表单验证无效、不起作用常见原因
  • 原文地址:https://blog.csdn.net/jjf19891208/article/details/134437931