一.使用docker搭建Emqx
1.拉取emqx镜像
docker pull emqx/emqx:5.7
2.运行
docker run -d --name emqx emqx/emqx:5.7
3.拷贝 docker中 etc data log 到宿主机的 /opt/emqx 下
mkdir -p /opt/emqx docker cp emqx:/opt/emqx/etc /opt/emqx docker cp emqx:/opt/emqx/log /opt/emqx docker cp emqx:/opt/emqx/data /opt/emqx
4.重新部署
docker rm -f emqx ## 授权目录 chmod -R 777 /opt/emqx/data /opt/emqx/log /opt/emqx/etc docker run -d --memory 2G --read-only --name emqx -v /opt/emqx/data:/opt/emqx/data -v /opt/emqx/etc:/opt/emqx/etc -v /opt/emqx/log:/opt/emqx/log -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.7
5.打开 1883(TCP)、8083、8084、8883(SSL)、18083 服务器安全组(端口);浏览器输入IP:18083 默认账号密码:admin public
二.配置ssl/tls双向认证
1.生成自签名的CA key和证书
cd /opt/emqx/etc/certs openssl genrsa -out ca.key 2048 openssl req -x509 -new -nodes -key ca.key -sha256 -days 36500 -out ca.pem -subj "/C=CN/ST=ZheJiang/L=HangZhou/O=HY/CN=SelfCA"
2.生成服务器端的key和证书
openssl genrsa -out emqx.key 2048 openssl req -new -key ./emqx.key -config /etc/ssl/openssl.cnf -out emqx.csr
注意这里因为 openssl.cnf 目录不同可能报错
执行以下指令 openssl version -a 获取 openssl.cnf 目录 (如果没有则需要安装 openssl)
修改一下 openssl.cnf 目录继续进行下一步
openssl req -new -key ./emqx.key -config /etc/pki/tls/openssl.cnf -out emqx.csr
## 注意 openssl.cnf 目录地址
openssl x509 -req -in ./emqx.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out emqx.pem -days 36500 -sha256 -extensions v3_req -extfile /etc/pki/tls/openssl.cnf
3.生成客户端key和证书
openssl genrsa -out client.key 2048 openssl req -new -key client.key -out client.csr -subj "/C=CN/ST=ZheJiang/L=HangZhou/O=HY/CN=client" openssl x509 -req -days 36500 -in client.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out client.pem
4.修改配置文件
通过以上三步生成了以下九个文件
修改 vim /opt/emqx/etc/emqx.conf 文件:在末尾 加入以下配置
listeners.tcp.default { bind = "0.0.0.0:1883" max_connections = 512000 } listeners.ssl.default { bind = "0.0.0.0:8883" max_connections = 512000 ssl_options { keyfile = "/opt/emqx/etc/certs/emqx.key" certfile = "/opt/emqx/etc/certs/emqx.pem" cacertfile = "/opt/emqx/etc/certs/ca.pem" ## password = "123456" verify = verify_peer fail_if_no_peer_cert = true } }
5.重启emqx
docker restart emqx
6.测试工具连接
下载 MQTTX桌面连接工具:https://mqttx.app/zh/downloads ;双向认证需要 ca 证书、客户端证书、客户端证书key,如下图所示
没有配置客户端认证,所以密码可以为空
到此,MQTT搭建、双向认证已经完成!
三.HTTP客户端认证
1.EMQX认证简介
EMQX认证:MQTT每次连接时会先走这里进行一个认证过程,EMQX提供了Password-Base、JWT、SCRAM三种认证方式;其中Password-Base又提供了 内置数据库、MySQL、MongoDB、PostgreSQL、Redis、LDAP、HTTP等多种认证过程,前几种都是基于数据库的认证,通过连接时去执行对应的SQL来判断登录用户有没有在数据库中存在来进行简单的认证。HTTP 则是通过连接时调用自定义的HTTP/HTTPS 接口来实现认证,使用HTTP认证可以更灵活的根据自己的业务流程来进行更复杂的认证。
2.具体认证流程
2.1.前置条件
2.2.规约 参照阿里云IOT平台 https://help.aliyun.com/zh/iot/user-guide/establish-mqtt-connections-over-tcp?spm=a2c4g.11186623.0.0.6212282cO9B7oJ
2.2.1假设:
clientId = 666666,productkey=a1PzPc1bRRN,deviceName = 5D3B393432C3, timestamp=1719309618,signmethod=hmacsha1
连接时MQTT的三个入参:
clientid
|
666666|signmethod=hmacsha1,timestamp=1719309618| |
username
|
5D3B393432C3&a1PzPc1bRRN |
username
|
hmacSha1 (clientid666666devicename5D3B393432C3productkeya1PzPc1bRRNtimestamp1719309618).toHexString |
2.2.2描述:
clientid:使用设备clientid + 签名方法 + 时间戳组成
username:设备DN + & + 产品PK组成
password: clientid666666devicename5D3B393432C3productkeya1PzPc1bRRNtimestamp1719309618 字符串使用设备的 deviceSecret 为秘钥进行hmacSha1加密,然后转16进制字符串
2.3服务端收到MQTT连接信息:
根据 username 信息获取到设备PK和DN ---> 到数据库查询秘钥信息 ----> 根据 clientid参数获取clientid、时间戳、签名方法 ----> 使用从数据库获取的秘钥对参数进行加密得到新得password ----> 比对两个密码是否相同 ---> 完成验证设备连接到MQTT。
3.控制台配置
4.HTTP认证代码示例
/** * @description: MQTT认证接口 * @author: zhouhong * @date: 2024-06-20 18:20 */ @Log4j2 @RestController @RequestMapping("/mqtt") public class MqttAuthController { @Resource private IotDeviceMapper iotDeviceMapper; @RequestMapping("/check") public MqttAuthRes check(@RequestBody MqttAuthParam mqttAuthParam) { log.info("1.入参:" + mqttAuthParam.toString()); MqttAuthRes authRes = new MqttAuthRes(); if (ObjectUtil.isNotNull(mqttAuthParam)) { // 超级用户、特殊用户,不需要进行设备 PK、DN 验证 if ("username".equals(mqttAuthParam.getUsername()) && "************".equals(mqttAuthParam.getPassword())) { authRes.setResult("allow"); authRes.setIs_superuser(true); return authRes; } else if ("admin".equals(mqttAuthParam.getUsername()) && "************".equals(mqttAuthParam.getPassword())) { authRes.setResult("allow"); authRes.setIs_superuser(true); return authRes; } else { // 对普通设备进行鉴权认证 // 1. 根据PK、DN查询数据库设备的DS信息 String username = mqttAuthParam.getUsername(); if (!username.contains("&")) { throw new RuntimeException("设备登录MQTT账号格式错误"); } String[] nameArr = username.split("&"); String dn = nameArr[0]; String pk = nameArr[1]; log.info("3.PK、DN" + pk + "|" + dn); String deviceSecret = iotDeviceMapper.getDeviceSecretByPkDn(pk, dn); log.info("4.查询到的设备秘钥信息" + deviceSecret); if (ObjectUtil.isNotNull(deviceSecret)) { // 3. 校验加密后的结果与传进来的password是否一致 String client = mqttAuthParam.getClientid(); String[] clientArr = client.split("\\|"); String clientid = clientArr[0]; String otherParam = clientArr[1]; // 时间戳 String timestampStr = otherParam.split(",")[1]; String timestamp = timestampStr.split("=")[1]; // 加密方法 String signmethodStr = otherParam.split(",")[0]; String signmethod = signmethodStr.split("=")[1]; String hexStr = "clientid"+clientid+"devicename"+dn+"productkey"+pk+"timestamp"+timestamp; log.info("5.加密前的字符串" + hexStr); // 对 hexStr 使用deviceSecret 进行加密 String password = ""; switch (signmethod.toLowerCase()) { case "hmacsha1" -> { password = HmacUtil.encrypt(hexStr, deviceSecret, HmacUtil.HMAC_SHA1); } case "hmacsha256" -> { password = HmacUtil.encrypt(hexStr, deviceSecret, HmacUtil.HMAC_SHA256); } case "hmacsha512" -> { password = HmacUtil.encrypt(hexStr, deviceSecret, HmacUtil.HMAC_SHA512); } case "hmacmd5" -> { password = HmacUtil.encrypt(hexStr, deviceSecret, HmacUtil.HMAC_MD5); } default -> { } } log.info("6.加密后的字符串" + password); if (!Objects.equals(password.toUpperCase(), "") && password.equalsIgnoreCase(mqttAuthParam.getPassword())) { authRes.setResult("allow"); authRes.setIs_superuser(true); log.info("7.鉴权成功"); return authRes; } else { authRes.setResult("deny"); } } } } authRes.setResult("deny"); return authRes; } }
四. Java基于双向认证连接MQTT
public static SSLSocketFactory getSocketFactory(final String caCrtFile, final String crtFile, final String keyFile, final String password) throws Exception { InputStream caInputStream = null; InputStream crtInputStream = null; InputStream keyInputStream = null; try { Security.addProvider(new BouncyCastleProvider()); CertificateFactory cf = CertificateFactory.getInstance("X.509"); caInputStream = new ClassPathResource(caCrtFile).getInputStream(); X509Certificate caCert = null; while (caInputStream.available() > 0) { caCert = (X509Certificate) cf.generateCertificate(caInputStream); } crtInputStream = new ClassPathResource(crtFile).getInputStream(); X509Certificate cert = null; while (crtInputStream.available() > 0) { cert = (X509Certificate) cf.generateCertificate(crtInputStream); } keyInputStream = new ClassPathResource(keyFile).getInputStream(); PEMParser pemParser = new PEMParser(new InputStreamReader(keyInputStream)); Object object = pemParser.readObject(); PEMDecryptorProvider decProv = new JcePEMDecryptorProviderBuilder().build(password.toCharArray()); JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC"); KeyPair key; if (object instanceof PEMEncryptedKeyPair) { key = converter.getKeyPair(((PEMEncryptedKeyPair) object).decryptKeyPair(decProv)); } else { key = converter.getKeyPair((PEMKeyPair) object); } pemParser.close(); KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType()); caKs.load(null, null); caKs.setCertificateEntry("ca-certificate", caCert); TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509"); tmf.init(caKs); KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); ks.load(null, null); ks.setCertificateEntry("certificate", cert); ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(), new java.security.cert.Certificate[]{cert}); KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); kmf.init(ks, password.toCharArray()); SSLContext context = SSLContext.getInstance("TLSv1.2"); context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); return context.getSocketFactory(); } finally { if (null != caInputStream) { caInputStream.close(); } if (null != crtInputStream) { crtInputStream.close(); } if (null != keyInputStream) { keyInputStream.close(); } } }
这里只提供解析证书的代码,连接时直接放进去即可,如果有需要可以私信我发全代码