• flink接入mqtt数据源


    Flink 没有原生的 MQTT 数据源,但可以通过自定义数据源(继承 RichSourceFunction)来接入 MQTT 消息。

    1. package com.agioe.flink.source.mqtt;
    2. import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    3. import org.eclipse.paho.mqttv5.client.*;
    4. import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
    5. import org.eclipse.paho.mqttv5.common.MqttException;
    6. import org.eclipse.paho.mqttv5.common.MqttMessage;
    7. import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
    8. import java.io.ByteArrayOutputStream;
    9. import java.io.ObjectOutputStream;
    10. import java.io.Serializable;
    11. import java.nio.charset.StandardCharsets;
    12. import java.util.concurrent.ArrayBlockingQueue;
    13. import java.util.concurrent.BlockingQueue;
    14. public class MqttSource extends RichSourceFunction {
    15. //存储服务
    16. private static MqttClient client;
    17. //存储订阅主题
    18. private static MqttTopic mqttTopic;
    19. //阻塞队列存储订阅的消息
    20. private BlockingQueue queue = new ArrayBlockingQueue<>(10);
    21. private StartupParams startupParams;
    22. public MqttSource(StartupParams startupParams) {
    23. this.startupParams = startupParams;
    24. }
    25. //包装连接的方法
    26. private void connect() throws MqttException {
    27. String username = startupParams.pro.getProperty("mqtt.username");
    28. String password = startupParams.pro.getProperty("mqtt.password");
    29. //配置连接参数
    30. MqttConfig mqttConfigBean = new MqttConfig(null, null, "tcp://192.168.15.13:1883", "mqtt-client", "testtopic/#");
    31. //连接mqtt服务器
    32. client = new MqttClient(mqttConfigBean.getHostUrl(), mqttConfigBean.getClientId(), new MemoryPersistence());
    33. MqttConnectionOptions options = new MqttConnectionOptions();
    34. options.setUserName(mqttConfigBean.getUsername());
    35. options.setPassword(mqttConfigBean.getPassword() == null ? null : mqttConfigBean.getPassword().getBytes(StandardCharsets.UTF_8));
    36. // options.setCleanSession(false); //是否清除session
    37. // options.setSessionExpiryInterval();
    38. // 设置超时时间
    39. options.setConnectionTimeout(30);
    40. // 设置会话心跳时间
    41. options.setKeepAliveInterval(20);
    42. try {
    43. String[] msgtopic = mqttConfigBean.getMsgTopic();
    44. //订阅消息
    45. int[] qos = new int[msgtopic.length];
    46. for (int i = 0; i < msgtopic.length; i++) {
    47. qos[i] = 0;
    48. }
    49. client.setCallback(new MsgCallback(client, options, msgtopic, qos) {
    50. });
    51. client.connect(options);
    52. client.subscribe(msgtopic, qos);
    53. System.out.println("MQTT连接成功:" + mqttConfigBean.getClientId() + ":" + client);
    54. } catch (Exception e) {
    55. System.out.println("MQTT连接异常:" + e);
    56. }
    57. }
    58. //实现MqttCallback,内部函数可回调
    59. class MsgCallback implements MqttCallback {
    60. private MqttClient client;
    61. private MqttConnectionOptions options;
    62. private String[] topic;
    63. private int[] qos;
    64. public MsgCallback() {
    65. }
    66. public MsgCallback(MqttClient client, MqttConnectionOptions options, String[] topic, int[] qos) {
    67. this.client = client;
    68. this.options = options;
    69. this.topic = topic;
    70. this.qos = qos;
    71. }
    72. //连接失败回调该函数
    73. @Override
    74. public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
    75. System.out.println("MQTT连接断开,发起重连");
    76. while (true) {
    77. try {
    78. Thread.sleep(1000);
    79. client.connect(options);
    80. //订阅消息
    81. client.subscribe(topic, qos);
    82. System.out.println("MQTT重新连接成功:" + client);
    83. break;
    84. } catch (Exception e) {
    85. e.printStackTrace();
    86. continue;
    87. }
    88. }
    89. }
    90. @Override
    91. public void mqttErrorOccurred(MqttException e) {
    92. }
    93. //收到消息回调该函数
    94. @Override
    95. public void messageArrived(String s, MqttMessage message) throws Exception {
    96. System.out.println();
    97. //订阅消息字符
    98. String msg = new String(message.getPayload());
    99. byte[] bymsg = getBytesFromObject(msg);
    100. System.out.println("topic:" + topic);
    101. queue.put(msg);
    102. }
    103. @Override
    104. public void deliveryComplete(IMqttToken iMqttToken) {
    105. }
    106. @Override
    107. public void connectComplete(boolean b, String s) {
    108. System.out.println("MQTT重新连接成功:" + client);
    109. }
    110. @Override
    111. public void authPacketArrived(int i, MqttProperties mqttProperties) {
    112. }
    113. //对象转化为字节码
    114. public byte[] getBytesFromObject(Serializable obj) throws Exception {
    115. if (obj == null) {
    116. return null;
    117. }
    118. ByteArrayOutputStream bo = new ByteArrayOutputStream();
    119. ObjectOutputStream oo = new ObjectOutputStream(bo);
    120. oo.writeObject(obj);
    121. return bo.toByteArray();
    122. }
    123. }
    124. //flink线程启动函数
    125. @Override
    126. public void run(final SourceContext ctx) throws Exception {
    127. connect();
    128. //利用死循环使得程序一直监控主题是否有新消息
    129. while (true) {
    130. //使用阻塞队列的好处是队列空的时候程序会一直阻塞到这里不会浪费CPU资源
    131. ctx.collect(queue.take());
    132. }
    133. }
    134. @Override
    135. public void cancel() {
    136. }
    137. /**
    138. * 订阅某个主题
    139. *
    140. * @param topic
    141. * @param qos
    142. */
    143. public void subscribe(String topic, int qos) {
    144. try {
    145. System.out.println("topic:" + topic);
    146. client.subscribe(topic, qos);
    147. } catch (MqttException e) {
    148. e.printStackTrace();
    149. }
    150. }
    151. public MqttClient getClient() {
    152. return client;
    153. }
    154. public void setClient(MqttClient client) {
    155. this.client = client;
    156. }
    157. public MqttTopic getMqttTopic() {
    158. return mqttTopic;
    159. }
    160. public void setMqttTopic(MqttTopic mqttTopic) {
    161. this.mqttTopic = mqttTopic;
    162. }
    163. }
    /**
     * Immutable holder for the settings needed to connect to an MQTT broker and subscribe to
     * one or more topics.
     */
    public class MqttConfig implements Serializable {

        // Login user name; may be null.
        private final String username;
        // Login password; may be null.
        private final String password;
        // Broker address including the port.
        private final String hostUrl;
        // Client id; per the original author's note it must not collide with another connection,
        // otherwise connecting fails.
        private final String clientId;
        // Comma-separated list of topic filters to subscribe to; may be null.
        private final String msgTopic;

        /**
         * @param username login user name, may be {@code null}
         * @param password login password, may be {@code null}
         * @param hostUrl broker address including the port
         * @param clientId client id presented to the broker
         * @param msgTopic comma-separated topic filters, may be {@code null}
         */
        public MqttConfig(String username, String password, String hostUrl, String clientId, String msgTopic) {
            this.username = username;
            this.password = password;
            this.hostUrl = hostUrl;
            this.clientId = clientId;
            this.msgTopic = msgTopic;
        }

        /** @return the login user name, possibly {@code null} */
        public String getUsername() {
            return username;
        }

        /** @return the login password, possibly {@code null} */
        public String getPassword() {
            return password;
        }

        /** @return the client id */
        public String getClientId() {
            return clientId;
        }

        /** @return the broker URL */
        public String getHostUrl() {
            return hostUrl;
        }

        /**
         * Splits the configured topic string on commas.
         *
         * <p>Whitespace around each entry is removed and blank entries are skipped, so
         * {@code "a, b,,c"} yields {@code ["a", "b", "c"]}.
         *
         * @return the topic filters; an empty array (never {@code null}) when no topic is set
         */
        public String[] getMsgTopic() {
            if (msgTopic == null) {
                return new String[0];
            }
            java.util.List<String> topics = new java.util.ArrayList<>();
            for (String part : msgTopic.split(",")) {
                String trimmed = part.trim();
                if (!trimmed.isEmpty()) {
                    topics.add(trimmed);
                }
            }
            return topics.toArray(new String[0]);
        }
    }

    在 Flink 作业的 main 函数中添加该数据源:

        // Register the custom MQTT source with the job; `env` and `stream` are declared
        // elsewhere in the main function (not shown here).
        stream = env.addSource(new MqttSource(StartupParams.instance));

  • 相关阅读:
    C++11继承构造函数在类中的使用
    代码检测利器“利特莫斯”之优化血泪史
    多线程和并发编程(3)—AQS和ReentrantLock实现的互斥锁
    OpenCV3图像处理笔记
    2022-11-27 ARM- 用C语言实现stm32的三盏灯的点亮
    企业上云原来如此简单,华为云带你体验云上风采
    SpringBoot的多环境切换(已废除)
    nginx优化--压缩
    inno setup自定义卸载程序和美化
    Unity中的PostProcessScene:深入解析与实用案例
  • 原文地址:https://blog.csdn.net/qq_35642849/article/details/134007075