一、简介
MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的环境而设计的发布/订阅型消息协议,为此,它需要一个消息中间件(broker)。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛,在很多情况下都可以使用,包括受限的环境,如:机器与机器(M2M)通信和物联网(IoT)。它在通过卫星链路通信的传感器、偶尔拨号的医疗设备、智能家居以及一些小型化设备中已广泛使用。
MQTT协议在物联网中应用广泛,下面以插件的方式将其集成到IoTBrowser平台,通过JS API即可启动broker,并由客户端实现发布、订阅等功能。

二、 开发插件
- 添加引用
- 添加MQTTNet,在NuGet搜索MQTTNet
- 添加Core,路径:\IoTBrowser\src\app_x64\Core.dll
- 添加Infrastructure,路径:\IoTBrowser\src\app_x64\Infrastructure.dll
- 添加Newtonsoft,路径:\IoTBrowser\src\app_x64\Newtonsoft.Json.dll
- 开发MqttHostCom和MqttClientCom插件
1.MqttHostCom 服务端 broker
using DDS.IoT.Com;
using System;
using System.Collections.Generic;
using System.IO.Ports;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace DDS.IoT.Mqtt
{
    /// <summary>
    /// IoTBrowser driver plugin of type "mqttHostCom" that runs an MQTT host
    /// (broker) by delegating to <c>MqttHostService</c>.
    /// </summary>
    public class MqttHostCom : ComBase
    {
        /// <summary>Driver type key used by callers to select this plugin.</summary>
        public override string Type => "mqttHostCom";

        /// <summary>Display name of the driver.</summary>
        public override string Name => "Mqtt主机";

        // Service instance created by Init and released by Close.
        private MqttHostService hostService;

        /// <summary>
        /// Creates the host service and starts it with the supplied settings.
        /// </summary>
        /// <param name="port">Stored in <c>Port</c>; not otherwise used by this class.</param>
        /// <param name="baudRate">Unused by this driver.</param>
        /// <param name="extendData">Settings string handed unchanged to <c>MqttHostService.StartAsync</c>.</param>
        /// <returns>Always <c>true</c>.</returns>
        public override bool Init(int port, int baudRate = 9600, string extendData = null)
        {
            this.Port = port;

            // Release a service left over from an earlier Init so it is not leaked.
            hostService?.Dispose();

            hostService = new MqttHostService();
            hostService.PushId = this.Id;

            // NOTE(review): the result of StartAsync is not awaited or observed here,
            // so a start-up failure would not change the return value - confirm this is intended.
            // NOTE(review): OnPushData is passed by value at this moment; handlers subscribed
            // after Init may not be visible to the service - verify against MqttHostService.
            hostService.StartAsync(extendData, OnPushData);

            Console.WriteLine("初始化MqttHostCom驱动程序成功!");
            return true;
        }

        /// <summary>Raised to push data to subscribers; handed to the host service in Init.</summary>
        public override event PushData OnPushData;

        /// <summary>
        /// Marks the driver as open. No connection work happens here.
        /// </summary>
        /// <returns><c>true</c> on success; <c>false</c> if marking the driver open throws.</returns>
        public override bool Open()
        {
            try
            {
                IsOpen = true;
                return true;
            }
            catch (Exception ex)
            {
                string msg = string.Format("MqttHostCom串口打开失败:{0} ", ex.Message);
                Console.WriteLine(msg);
                return false;
            }
        }

        /// <summary>
        /// Disposes the host service and clears all event subscribers.
        /// Safe to call before Init or more than once.
        /// </summary>
        /// <returns>Always <c>true</c>.</returns>
        public override bool Close()
        {
            // Null-conditional: the service is absent before Init and after a previous Close.
            hostService?.Dispose();
            hostService = null;
            IsOpen = false;
            OnPushData = null;
            return true;
        }

        /// <summary>
        /// The host driver implements no commands.
        /// </summary>
        /// <param name="name">Ignored.</param>
        /// <param name="data">Ignored.</param>
        /// <returns>Always an empty string.</returns>
        public override string Command(string name, string data)
        {
            return string.Empty;
        }
    }
}
2.MqttClientCom 客户端
实现发布和订阅接口
using DDS.IoT.Com;
using System;
using System.Collections.Generic;
using System.IO.Ports;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace DDS.IoT.Mqtt
{
    /// <summary>
    /// IoTBrowser driver plugin of type "mqttClientCom" that exposes MQTT publish
    /// and subscribe commands by delegating to <c>MqttClientService</c>.
    /// </summary>
    public class MqttClientCom : ComBase
    {
        /// <summary>Driver type key used by callers to select this plugin.</summary>
        public override string Type => "mqttClientCom";

        /// <summary>Display name of the driver.</summary>
        public override string Name => "Mqtt客户端";

        // Service instance created by Init and released by Close.
        private MqttClientService mqttClientService;

        /// <summary>
        /// Creates the client service and starts it with the supplied settings.
        /// </summary>
        /// <param name="port">Stored in <c>Port</c>; not otherwise used by this class.</param>
        /// <param name="baudRate">Unused by this driver.</param>
        /// <param name="extendData">Settings string handed unchanged to <c>MqttClientService.MqttClientStart</c>.</param>
        /// <returns>Always <c>true</c>.</returns>
        public override bool Init(int port, int baudRate = 9600, string extendData = null)
        {
            // Release a service left over from an earlier Init so it is not leaked.
            mqttClientService?.Close();

            mqttClientService = new MqttClientService();
            mqttClientService.PushId = this.Id;
            this.Port = port;

            // NOTE(review): OnPushData is passed by value at this moment; handlers subscribed
            // after Init may not be visible to the service - verify against MqttClientService.
            mqttClientService.MqttClientStart(extendData, this.OnPushData);

            Console.WriteLine("初始化MqttClientCom驱动程序成功!");
            return true;
        }

        /// <summary>Raised to push data to subscribers; handed to the client service in Init.</summary>
        public override event PushData OnPushData;

        /// <summary>
        /// Opens the client service and marks the driver as open.
        /// </summary>
        /// <returns><c>true</c> on success; <c>false</c> if opening throws (the error is written to the console).</returns>
        public override bool Open()
        {
            try
            {
                mqttClientService.Open();
                IsOpen = true;
                return true;
            }
            catch (Exception ex)
            {
                string msg = string.Format("MqttClientCom串口打开失败:{0} ", ex.Message);
                Console.WriteLine(msg);
                return false;
            }
        }

        /// <summary>
        /// Closes the client service and clears all event subscribers.
        /// Safe to call before Init or more than once.
        /// </summary>
        /// <returns>Always <c>true</c>.</returns>
        public override bool Close()
        {
            // Null-conditional: the service is absent before Init and after a previous Close.
            mqttClientService?.Close();
            mqttClientService = null;
            IsOpen = false;
            OnPushData = null;
            return true;
        }

        /// <summary>
        /// Executes a named command against the client service.
        /// </summary>
        /// <param name="name">
        /// "Publish" or "Subscribe"; any other name is ignored.
        /// </param>
        /// <param name="data">
        /// JSON object with the command arguments.
        /// Publish reads <c>topic</c>, <c>data</c>, <c>level</c> (default 1) and <c>retain</c> (default false).
        /// Subscribe reads <c>topic</c> and <c>level</c> (default 0).
        /// </param>
        /// <returns>
        /// The service call's result converted to a string, or an empty string for an unknown command.
        /// </returns>
        /// <exception cref="InvalidOperationException">A known command is issued before Init or after Close.</exception>
        /// <exception cref="ArgumentException">A known command is issued without usable JSON arguments or without a topic.</exception>
        public override string Command(string name, string data)
        {
            switch (name)
            {
                case "Publish":
                {
                    EnsureInitialized();
                    dynamic args = ParseArguments(name, data);
                    string topic = args.topic;
                    string payload = args.data;
                    int? level = args.level;
                    bool? retain = args.retain;
                    EnsureTopic(name, topic);
                    return mqttClientService.Publish(topic, payload, level ?? 1, retain ?? false).ToString();
                }

                case "Subscribe":
                {
                    EnsureInitialized();
                    dynamic args = ParseArguments(name, data);
                    string topic = args.topic;
                    int? level = args.level;
                    EnsureTopic(name, topic);
                    return mqttClientService.Subscribe(topic, level ?? 0).ToString();
                }

                default:
                    // Unknown commands are ignored without touching the payload.
                    return string.Empty;
            }
        }

        // Fails with a clear message instead of a NullReferenceException when the service is missing.
        private void EnsureInitialized()
        {
            if (mqttClientService == null)
            {
                throw new InvalidOperationException("MqttClientCom尚未初始化或已关闭");
            }
        }

        // Parses the JSON argument object of a command, rejecting missing or null payloads.
        private static dynamic ParseArguments(string name, string data)
        {
            if (string.IsNullOrWhiteSpace(data))
            {
                throw new ArgumentException(string.Format("命令{0}缺少参数", name), nameof(data));
            }

            object parsed = Newtonsoft.Json.JsonConvert.DeserializeObject<dynamic>(data);
            if (parsed == null)
            {
                // A JSON "null" literal deserializes to null.
                throw new ArgumentException(string.Format("命令{0}的参数无效", name), nameof(data));
            }

            return parsed;
        }

        // Rejects a missing topic before it reaches the MQTT service.
        private static void EnsureTopic(string name, string topic)
        {
            if (string.IsNullOrEmpty(topic))
            {
                throw new ArgumentException(string.Format("命令{0}缺少topic", name), "data");
            }
        }
    }
}
3.前端测试
<!DOCTYPE HTML PUBLIC "-//WAPFORUM//DTD XHTML Mobile 1.0//EN" "http://www.wapforum.org/DTD/xhtml-mobile10.dtd">
<html>
<head lang="en">
<title>Mqtt</title>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width,initial-scale=1.0,maximum-scale=1.0,user-scalable=0">
<meta name="format-detection" content="telephone=no">
<meta name="renderer" content="webkit">
<META HTTP-EQUIV="pragma" CONTENT="no-cache">
<META HTTP-EQUIV="Cache-Control" CONTENT="no-cache, must-revalidate">
<META HTTP-EQUIV="expires" CONTENT="0">
<link rel="alternate icon" type="image/png" href="favicon.png">
<link rel="stylesheet" href="/scripts/amazeui/amazeui.min.css" />
<link rel="stylesheet" href="../css/main.css" />
<script src="/scripts/jquery-3.3.1.min.js"></script>
<script src="/scripts/amazeui/amazeui.min.js"></script>
<script src="/scripts/jquery.signalR-2.4.1.min.js"></script>
<style>
.list {
width: 1000px !important;
}
.am-form {
width: 100%;
background: #fff;
}
.refresh-port {
width: 80px !important;
height: 30px !important;
}
#msg, #msgWrite {
clear: both;
}
.am-u-sm-4 {
padding: 3px;
}
</style>
<script type="text/javascript">
var hostid;// id of the MQTT host driver; assigned from ar.Data in startHost's onOpen callback
var clientid;// id of the MQTT client driver; assigned from ar.Data in startClient's onOpen callback
/**
 * Opens the 'mqttHostCom' driver through dds.iot.com.open, passing the text of
 * the #txtHostArgs input as extendData.
 * Example extendData: JSON.stringify({ server: "*", port: 1883 })
 */
function startHost() {
    // Logs every payload received from the host driver.
    function handleReceive(res) {
        addMsg('host:' + JSON.stringify(res.data));
        console.log('host', res.data);
    }

    // Remembers the driver id on success, otherwise shows the error message.
    function handleOpen(ar) {
        if (!ar.Success) {
            alert(ar.Message);
            return;
        }
        hostid = ar.Data;
        addMsg('连接成功!');
    }

    var options = {
        type: 'mqttHostCom',
        port: 1,
        baudRate: 1,
        extendData: $('#txtHostArgs').val(),
        onReceive: handleReceive,
        onOpen: handleOpen
    };
    dds.iot.com.open(options);
}
/** Closes the host driver whose id was stored in hostid. */
function closeHost() {
    var id = hostid;
    dds.iot.com.close(id);
}
/**
 * Opens the 'mqttClientCom' driver through dds.iot.com.open, passing the text of
 * the #txtClientArgs input as extendData.
 * Example extendData:
 *   JSON.stringify({ server: "localhost", port: 1883, clientid: "1", username: "", password: "" })
 */
function startClient() {
    // Logs every payload received from the client driver.
    function handleReceive(res) {
        addMsg('client:' + JSON.stringify(res.data));
        console.log('client', res.data);
    }

    // Remembers the driver id on success, otherwise shows the error message.
    function handleOpen(ar) {
        if (!ar.Success) {
            alert(ar.Message);
            return;
        }
        clientid = ar.Data;
        addMsg('连接成功!');
    }

    var options = {
        type: 'mqttClientCom',
        port: 1,
        baudRate: 1,
        extendData: $('#txtClientArgs').val(),
        onReceive: handleReceive,
        onOpen: handleOpen
    };
    dds.iot.com.open(options);
}
/**
 * Sends the "Subscribe" command for the topic typed into #txtTopic (level 0)
 * to the client driver and logs the outcome.
 */
function subscribe() {
    var request = {
        id: clientid,
        name: "Subscribe",
        data: { topic: $('#txtTopic').val(), level: 0 }
    };

    dds.iot.com.exeCommand(request, function (ar) {
        var text = ar.Success ? '订阅成功!' : '操作失败:' + ar.Message;
        addMsg(text);
    });
}
/**
 * Sends the "Publish" command to the client driver, using #txtTopic as the
 * topic and #txtContents as the payload, and logs the outcome.
 */
function publish() {
    var request = {
        id: clientid,
        name: "Publish",
        data: { topic: $('#txtTopic').val(), data: $('#txtContents').val() }
    };

    dds.iot.com.exeCommand(request, function (ar) {
        var text = ar.Success ? '发布成功!' : '操作失败:' + ar.Message;
        addMsg(text);
    });
}
/** Closes the client driver whose id was stored in clientid. */
function closeClient() {
    var id = clientid;
    dds.iot.com.close(id);
}
// Cached jQuery wrapper of the #msg log textarea.
var $msg;

// Returns the log textarea, looking it up on demand so that logging also works
// when nothing has populated $msg yet.
function getMsgBox() {
    if (!$msg || !$msg.length) {
        $msg = $('#msg');
    }
    return $msg;
}

/**
 * Appends one line to the log textarea.
 * The first line is written as-is; later lines are separated by "\n".
 */
function addMsg(msg) {
    var box = getMsgBox();
    var current = box.val();
    box.val(current ? current + "\n" + msg : String(msg));
}

/** Empties the log textarea. */
function clearLog() {
    getMsgBox().val('');
}
// Handler for the 'dds.window.init' event (window initialisation; window size and
// title can be adjusted here). Caches the log textarea in $msg.
// Uses .on() because .bind() is deprecated as of jQuery 3.0.
$(document).on('dds.window.init', function (e, win) {
    $msg = $("#msg");
});
</script>
</head>
<body>
<div class="fun_bd" style="padding:10px;">
<form class="am-form">
<h3>数据读取</h3>
<fieldset>
<div class="am-form-group">
<label for="doc-ipt-email-1" class="am-u-sm-4">主机参数</label>
<div class="am-u-sm-6">
<input id="txtHostArgs" type="text" value='{ server: "*", port: 1883 }' />
</div>
<button onclick="startHost()" class="am-btn-primary" type="button">启动主机</button>
<button onclick="closeHost()" class="am-btn-danger" type="button">关闭主机</button>
</div>
<div class="am-form-group">
<label for="doc-ipt-email-1" class="am-u-sm-4">客户端参数</label>
<div class="am-u-sm-6">
<input id="txtClientArgs" type="text" value='{ server: "localhost", port: 1883, clientid: "1", username: "", password:""}' />
</div>
<button onclick="startClient()" class="am-btn-primary" type="button">启动客户端</button>
<button onclick="closeClient()" class="am-btn-danger" type="button">关闭客户端</button>
</div>
<div class="am-form-group">
<label for="doc-ipt-email-1" class="am-u-sm-4">主题</label>
<div class="am-u-sm-6">
<input id="txtTopic" type="text" value="/dds/iot/mqtt/test" />
</div>
<button onclick="subscribe()" class="am-btn-primary" type="button">订阅主题</button>
</div>
<div class="am-form-group">
<label for="doc-ipt-email-1" class="am-u-sm-4">主题内容</label>
<div class="am-u-sm-6">
<input id="txtContents" type="text" value="测试测试" />
</div>
<button onclick="publish()" class="am-btn-primary" type="button">发布主题</button>
</div>
<textarea id="msg" rows="18"></textarea>
<div class="am-form-group">
<button onclick="clearLog()" class="am-btn-default" type="button">清除日志</button>
</div>
</fieldset>
</form>
</div>
</body>
</html>
代码地址:https://gitee.com/yizhuqing/IoTBrowser
基于Chromium内核使用H5快速开发工控系统界面,使用JS API,前端人员即可完成界面展示与硬件控制。系统自带串口、RFID、电子秤等硬件协议支持,并且支持二次定制开发。可以用来开发人机界面(HMI)或数据采集与监视控制系统(SCADA)。使用H5或Vue可以本地打包离线应用,也可以在线加载Web网页来控制设备硬件。
