实现人离开房间超过一定时间后,自动关闭空调的联动控制功能。
客流密度摄像机监测客流人数变化,并把消息发布到 MQTT 消息服务器;WEB 后台服务器订阅对应的 MQTT 主题,收到客流密度摄像机发布的消息后进行处理,再通过后台 WebSocket 客户端把关闭空调的指令发送到 WebSocket 服务端;WebSocket 中间件(C 语言开发)据此控制空调关闭。
1.通过mqtt 订阅客流密度主题:topic/whg/areapeoplecount/publish/WHG37020001
注: 此主题配置到 Spring properties 配置文件
接收到客流密度设备推送的订阅消息。
消息格式为:{
organCode: ‘’,
areaCode: ‘’,
deviceCode: ‘’,
insidePeopleNum: ‘’,
dataTime: ‘’
}
创建 PO:AutoCloseAirPo,缓存到全局 Map 里面;开启线程定时轮询 Map 里面的设备是否超时,如果超时则执行关闭空调,并从 Map 里面删除该条记录;期间如果有人进入则刷新计时。
AutoCloseAirPo{
organCode: ‘’,
areaCode: ‘’,
deviceCode: ‘’,
insidePeopleNum: ‘’,
dataTime: ‘’,yyyy-MM-dd HH:mm:ss
startTime: ‘’, yyyy-MM-dd HH:mm:ss
expireTime: 20
}
当收到客流消息判断:
设备类型 type.equals("humanDensity") && insidePeopleNum == 0 && currentDateTime - dataTime > 20 时,向 webSocket 发布订阅消息控制空调关闭,消息格式为:
"CONFIG|" + code + "|" + value + "|END"
创建关联表:ibs_linkagectrl_rel
id ctrled_device_id(被控设备id) ctrl_device_code(客流密度设备code)
自增主键 303 WHG37020001A0001
CREATE TABLE ibs_linkagectrl_rel (
id INT PRIMARY KEY AUTO_INCREMENT COMMENT '自增主键',
ctrled_device_id INT COMMENT '受控设备id',
ctrl_device_code VARCHAR(20) COMMENT '控制设备code'
) COMMENT='设备联动关联表';
INSERT INTO ibs_linkagectrl_rel (ctrled_device_id, ctrl_device_code) VALUES
(303, 'WHG37020001A0001');
根据 客流密度消息返回的ctrl_code 查询出 ctrled_id ,再根据ctrled_id 查询出受控设备,再查询出受控设备的 code
具体sql:
SELECT * FROM ibs_item_parameter WHERE item_id = '303' AND parameter_type = 'QTSZ'
查询出: code = Channel131.Device0403.T008
关闭空调: value = 0
"CONFIG|Channel131.Device0403.T008|0|END"
注: 空调设备类型:DLJNJ 客流密度设备类型:humanDensity
MqttConfig.java
package com.wzw.config.mq;
import com.wzw.common.util.StringUtil;
import com.wzw.service.IbsAlarmRecordService;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
/**
 * Spring Integration wiring for the MQTT client.
 *
 * <p>Declares the Paho connection options and client factory, an outbound channel/handler for
 * publishing, and (when {@code props.linkagectrl.enabled=true}) an inbound adapter, channel and
 * handler for the people-density linkage-control topic.
 */
@Configuration
@ConditionalOnProperty(name = "spring.mqtt.enabled",havingValue = "true")
public class MqttConfig {
    public static final Logger logger = LoggerFactory.getLogger(MqttConfig.class);
    public static final String PEOPLE_TOPIC="topic/whg/peoplecount/publish";
    public static final String ACS_RECORD_TOPIC="topic/whg/acs/record";

    /** Maximum number of TCP probes performed by {@link #checkMqttServer()}. */
    private static final int MAX_CONNECT_ATTEMPTS = 200;
    /** Connect timeout of a single TCP probe, in milliseconds. */
    private static final int CONNECT_TIMEOUT_MS = 100;
    /** Pause between two TCP probes, in milliseconds. */
    private static final long RETRY_DELAY_MS = 1000L;

    @Value("${spring.mqtt.username}")
    private String username;
    @Value("${spring.mqtt.password}")
    private String password;
    @Value("${spring.mqtt.url}")
    private String[] hostUrl;
    @Value("${spring.mqtt.client.id}")
    private String clientId;
    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;
    @Value("${spring.mqtt.completionTimeout:5000}")
    private int completionTimeout;
    // NOTE(review): Paho documents setKeepAliveInterval in seconds; the 30000 default looks
    // like a millisecond value. Left unchanged here - confirm the intended unit.
    @Value("${spring.mqtt.keepAliveInterval:30000}")
    private int keepAliveInterval;
    @Value("${props.linkagectrl.mqtt.humanDensityTopic}")
    private String humanDensityTopic;

    // The three topic getters at the bottom of this class returned fields that were never
    // declared, so the class could not compile. The control/warn placeholders mirror the ones
    // used by MqttSenderTool.
    // NOTE(review): the property name for branchMsgTopic is an assumption - confirm.
    @Value("${spring.mqtt.branchMsgTopic:}")
    private String branchMsgTopic;
    @Value("${spring.mqtt.controlMsgTopic:/topic/controlMsg/smartLight}")
    private String controlMsgTopic;
    @Value("${spring.mqtt.warnMsgTopic:/topic/warnMsg/smartLight}")
    private String warnMsgTopic;

    /**
     * Builds the Paho connection options from the {@code spring.mqtt.*} properties.
     *
     * @return connection options using MQTT 3.1.1
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions(){
        MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(hostUrl);
        mqttConnectOptions.setKeepAliveInterval(keepAliveInterval);
        // setMaxInflight(100) is not supported by the Paho version in use.
        mqttConnectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
        return mqttConnectOptions;
    }

    /**
     * Creates the Paho client factory after probing the broker's TCP port.
     *
     * <p>The probe can block for a long time (up to {@value #MAX_CONNECT_ATTEMPTS} attempts);
     * the factory is created even if the probe fails.
     *
     * @return the client factory shared by the inbound and outbound adapters
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        // Wait for the broker to become reachable before any adapter tries to connect.
        if (!checkMqttServer()) {
            logger.warn("mqtt server is not reachable, the client factory is created anyway");
        }
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /** Outbound (publish) channel. */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /** Inbound channel for linkage-control messages. */
    @Bean
    @ConditionalOnProperty(name = "props.linkagectrl.enabled",havingValue = "true")
    public MessageChannel mqttInputChannelForLinkageCtrl(){
        return new DirectChannel();
    }

    /**
     * Probes the first configured broker URL with a plain TCP connect until it succeeds.
     *
     * <p>Works around the broker (artemis) starting slowly, which otherwise makes the initial
     * MQTT connection fail. The URL is expected to look like {@code scheme://host:port}.
     *
     * @return {@code true} as soon as one probe connects; {@code false} if the URL is missing or
     *         malformed, all attempts fail, or the waiting thread is interrupted
     */
    private boolean checkMqttServer(){
        if (hostUrl == null || hostUrl.length == 0 || hostUrl[0] == null || hostUrl[0].isEmpty()) {
            logger.error("mqtthostUrl is null or empty");
            return false;
        }
        // "scheme://host:port" splits into ["scheme", "//host", "port"].
        String[] parts = hostUrl[0].split(":");
        if (parts.length < 3) {
            logger.error("mqtt hostUrl '{}' is not of the form scheme://host:port", hostUrl[0]);
            return false;
        }
        String host = parts[1].replaceAll("/", "");
        int port;
        try {
            port = Integer.parseInt(parts[2]);
        } catch (NumberFormatException e) {
            logger.error("mqtt hostUrl '{}' has a non-numeric port", hostUrl[0], e);
            return false;
        }

        for (int attempt = 0; attempt < MAX_CONNECT_ATTEMPTS; attempt++) {
            Socket probe = new Socket();
            try {
                probe.connect(new InetSocketAddress(host, port), CONNECT_TIMEOUT_MS);
                if (probe.isConnected()) {
                    logger.info("connect to artemis success!");
                    return true;
                }
                logger.error("connect to artemis failed!");
            } catch (IOException e) {
                // An unreachable broker surfaces as an exception; log it and retry.
                logger.error("connect to artemis failed! {}", e.getMessage());
            } finally {
                try {
                    probe.close();
                } catch (IOException e) {
                    logger.error("close connect to artemis failed! {}", e.getMessage());
                }
            }
            try {
                Thread.sleep(RETRY_DELAY_MS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop probing instead of swallowing it.
                Thread.currentThread().interrupt();
                logger.warn("interrupted while waiting for artemis, giving up the probe");
                return false;
            }
        }
        return false;
    }

    /** Handler that publishes messages arriving on {@code mqttOutboundChannel}. */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId+"out_"+ StringUtil.getUUID_8(), mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        messageHandler.setDefaultQos(0);
        return messageHandler;
    }

    /**
     * Inbound adapter subscribing to {@code <humanDensityTopic>/+} with QoS 2 and forwarding the
     * messages to the linkage-control channel.
     */
    @Bean
    @ConditionalOnProperty(name = "props.linkagectrl.enabled",havingValue = "true")
    public MessageProducer mqttInboundForLinkageCtrl(){
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId+"LinkageCtrl_in_"+StringUtil.getUUID_8(),mqttClientFactory(),humanDensityTopic+"/+");
        adapter.setCompletionTimeout(completionTimeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(2);
        adapter.setOutputChannel(mqttInputChannelForLinkageCtrl());
        return adapter;
    }

    /**
     * Handler for the linkage-control channel; the business logic lives in
     * {@link MqttLinkageCtrlMessageHandler}.
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannelForLinkageCtrl")
    @ConditionalOnProperty(name = "props.linkagectrl.enabled",havingValue = "true")
    public MessageHandler mqttOutLinkageCtrl(){
        MqttLinkageCtrlMessageHandler linkageCtrlMessageHandler = new MqttLinkageCtrlMessageHandler();
        linkageCtrlMessageHandler.setHumanDensityTopic(this.humanDensityTopic);
        return linkageCtrlMessageHandler;
    }

    public String getBranchMsgTopic() {
        return branchMsgTopic;
    }

    public String getControlMsgTopic() {
        return controlMsgTopic;
    }

    public String getWarnMsgTopic() {
        return warnMsgTopic;
    }

    public String getUsername() {
        return username;
    }

    public String getPassword() {
        return password;
    }

    public String[] getHostUrl() {
        return hostUrl;
    }
}
application.properties
# 客流密度联动控制空调设备配置
props.linkagectrl.enabled=true
props.linkagectrl.air.expireTime=600000
props.linkagectrl.wsServerUrl=ws://127.0.0.1:8082/webSocket
props.linkagectrl.mqtt.humanDensityTopic=topic/whg/areapeoplecount/publish
MqttLinkageCtrlMessageHandler.java
package com.wzw.config.mq;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.wzw.common.po.AutoCloseAirPo;
import com.wzw.common.po.AutoCloseAirPoFactory;
import com.wzw.service.IbsItemService;
import com.wzw.vo.IbsItemVO;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @description: mqtt 联动处理类
* @author:houqd
* @time: 2022/6/23 11:31
*/
@Slf4j
public class MqttLinkageCtrlMessageHandler implements MessageHandler {
private static final String HUMAN_DENSITY_DEVICE_TYPE = "humanDensity";
private static Map<String, Object> cache = new ConcurrentHashMap<>();
private final Lock lock = new ReentrantLock();
private String humanDensityTopic;
@Autowired
private IbsItemService ibsItemService;
private ExecutorService executorService;
private ScheduledExecutorService scheduledExecutorService;
@Autowired
private AutoCloseAirPoFactory autoCloseAirPoFactory;
@Autowired
private WebSocketClient webSocketClient;
private volatile boolean isOpen = false;
private Timer timer ;
@PostConstruct
public void init() {
if (null == executorService) {
executorService = new ThreadPoolExecutor(5,5,60L,TimeUnit.SECONDS,new ArrayBlockingQueue<>(100),new ThreadPoolExecutor.AbortPolicy());
// executorService = Executors.newFixedThreadPool(5);
}
if (null == scheduledExecutorService) {
scheduledExecutorService = Executors.newScheduledThreadPool(5);
}
isOpen = webSocketClient.isOpen();
if (!isOpen){
reconnet();
}
while (isOpen){
closeAir();
break;
}
}
public synchronized void reconnet(){
if (webSocketClient!=null&&!webSocketClient.isOpen()){
timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
if (webSocketClient.getReadyState().equals(WebSocket.READYSTATE.NOT_YET_CONNECTED)){
try {
isOpen = webSocketClient.connectBlocking();
if (isOpen){
closeAir();
timer.cancel();
timer=null;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}else if (webSocketClient.getReadyState().equals(WebSocket.READYSTATE.CLOSING)||webSocketClient.getReadyState().equals(WebSocket.READYSTATE.CLOSED)){
try {
isOpen = webSocketClient.reconnectBlocking();
if (isOpen){
closeAir();
timer.cancel();
timer = null;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
},0,1);
}
}
@PreDestroy
public void destroy() {
if (null != executorService) {
executorService = null;
}
if (null != scheduledExecutorService) {
scheduledExecutorService = null;
}
}
// 执行关闭空调指令
private void closeAir() {
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if (cache.size() > 0) {
try {
lock.lock();
for (Map.Entry<String, Object> entry : cache.entrySet()) {
String key = entry.getKey();
AutoCloseAirPo airPo = (AutoCloseAirPo) entry.getValue();
LocalDateTime startTime = airPo.getStartTime();//上次人离开房间时间
LocalDateTime now = LocalDateTime.now();// 当前时间
// TODO isAfter 测试,正式需要修改为isBefore
if (startTime.plusSeconds(airPo.getExpireTime()).isBefore(now)) {// 如果超过超期时间则执行空调关闭
List<Map<Long, String>> closeCmdList = airPo.getCloseCmdList();
for (Map<Long, String> map : closeCmdList) {
for (Map.Entry entry1 : map.entrySet()) {
String closeCmd = (String) entry1.getValue();
// 向webscoket 中间件发送关闭空调指令
webSocketClient.send(closeCmd);
log.info("-------[发送联动控制指令成功:"+closeCmd+"]");
cache.remove(key);
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}, 5, 10, TimeUnit.SECONDS);
}
@Override
public void handleMessage(Message<?> message) throws MessagingException {
//springBoot2.0 中 此处为 mqtt_receivedTopic 。这个问题卡了好久。
String topic = message.getHeaders().get("mqtt_topic").toString();
if (topic.startsWith(humanDensityTopic)) {
dealCtrlAirClose(message);
}
}
/**
* 根据客流密度控制空调关闭
* 接收到客流密度设备推送的订阅消息。
* 消息格式为:{
* organCode: ‘’,
* areaCode: ‘’,
* deviceCode: ‘’,
* insidePeopleNum: ‘’,
* dataTime: ‘’
* }
*
* @param message
*/
private void dealCtrlAirClose(Message<?> message) {
executorService.execute(new Runnable() {
@Override
public void run() {
JsonObject jsonObject = JsonParser.parseString(message.getPayload().toString()).getAsJsonObject();
String deviceCode = jsonObject.get("deviceCode").getAsString();// 客流密度设备编码
int insidePeopleNum = jsonObject.get("insidePeopleNum").getAsInt();// 客流人数
IbsItemVO ctrlDevice = ibsItemService.getIbsItemByCode(deviceCode);
// 客流密度摄像头监测到人数为零时向缓存添加
if (null != ctrlDevice && HUMAN_DENSITY_DEVICE_TYPE.equals(ctrlDevice.getType())) {
// TODO 测试用 insidePeopleNum > 0, 正式环境改为 =0
if (insidePeopleNum == 0) { // 人离开事件
AutoCloseAirPo oldAirPo = (AutoCloseAirPo) cache.get(deviceCode);
if (null == oldAirPo) {
AutoCloseAirPo autoCloseAirPo = autoCloseAirPoFactory.build(jsonObject, ctrlDevice);
cache.put(autoCloseAirPo.getCtrlDeviceCode(), autoCloseAirPo);
} else {
//TODO 刷新缓存
oldAirPo.setStartTime(LocalDateTime.now());
}
} else {// 人进入事件
AutoCloseAirPo oldAirPo = (AutoCloseAirPo) cache.get(deviceCode);
if (null != oldAirPo) {
//TODO 刷新缓存
oldAirPo.setStartTime(LocalDateTime.now());
}
}
}
}
});
}
public String getHumanDensityTopic() {
return humanDensityTopic;
}
public void setHumanDensityTopic(String humanDensityTopic) {
this.humanDensityTopic = humanDensityTopic;
}
}
MqttSenderTool.java
发送消息到mqtt 工具类
package com.wzw.config.mq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
 * Helper for publishing control and warning messages to MQTT through {@code MqttGateway}.
 */
@Component
public class MqttSenderTool {
    /** Value returned by both send methods after the gateway call returns. */
    private static final String RESULT_OK = "OK";
    /** QoS used for control messages. */
    private static final int CONTROL_MSG_QOS = 2;
    /** QoS used for warning messages. */
    private static final int WARN_MSG_QOS = 1;

    @Autowired
    private MqttGateway mqttGateway;

    @Value("${spring.mqtt.controlMsgTopic:/topic/controlMsg/smartLight}")
    private String controlMsgTopic;

    @Value("${spring.mqtt.warnMsgTopic:/topic/warnMsg/smartLight}")
    private String warnMsgTopic;

    /**
     * Publishes a control message on the control topic with QoS 2.
     *
     * @param sendData payload to publish
     * @return the literal {@code "OK"}
     */
    public String sendControlMsg(String sendData) {
        return publish(controlMsgTopic, CONTROL_MSG_QOS, sendData);
    }

    /**
     * Publishes a warning message on the warning topic with QoS 1.
     *
     * @param sendData payload to publish
     * @return the literal {@code "OK"}
     */
    public String sendWarnMsg(String sendData) {
        return publish(warnMsgTopic, WARN_MSG_QOS, sendData);
    }

    /** Forwards the payload to the gateway and reports the fixed result string. */
    private String publish(String topic, int qos, String payload) {
        mqttGateway.sendToMqtt(topic, qos, payload);
        return RESULT_OK;
    }
}
SpringBoot 后台 实现WebSocketClient 配置
pom.xml 里
<!--websocket 后台客户端-->
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.3.8</version>
</dependency>
WebSocketConfig.java
package com.wzw.config;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import java.net.URI;
@Slf4j
@Configuration
public class WebSocketConfig {
@Value("${props.linkagectrl.wsServerUrl}")
private String wsServerUrl;
/**
* 注入ServerEndpointExporter,
* 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
*
* 开发环境使用时才有效,方便调试,运行环境使用tomcat部署时,不需要在此处注册
*/
@Profile({"dev", "test","dev2","local"})
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
@Bean
@ConditionalOnProperty(name = "props.linkagectrl.enabled",havingValue = "true")
public WebSocketClient webSocketClient() {
try {
WebSocketClient webSocketClient = new WebSocketClient(new URI(wsServerUrl),new Draft_6455()) {
@Override
public void onOpen(ServerHandshake handshakedata) {
log.info("[websocket] 连接成功");
}
@Override
public void onMessage(String message) {
log.info("[websocket] 收到消息={}",message);
}
@Override
public void onClose(int code, String reason, boolean remote) {
log.info("[websocket] 退出连接");
}
@Override
public void onError(Exception ex) {
log.info("[websocket] 连接错误={}",ex.getMessage());
}
};
webSocketClient.connect();
// webSocketClient.setConnectionLostTimeout(-1);// 停止心跳检查
return webSocketClient;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
TestWebSocketClientController.java 测试类
package com.wzw.controller;
import com.wzw.config.annotation.IgnoreAuth;
import org.java_websocket.client.WebSocketClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
 * Test-only controller for sending a fixed message through the websocket client.
 *
 * <p>NOTE(review): the endpoint is annotated with {@code @IgnoreAuth}; confirm it is not exposed
 * in production.
 *
 * @author houqd
 * @since 2022/6/22 10:43
 */
@RestController
@RequestMapping("/webSocketClient")
@ConditionalOnProperty(name = "props.linkagectrl.enabled",havingValue = "true")
public class TestWebSocketClientController {
    @Autowired
    private WebSocketClient webSocketClient;

    /**
     * Sends a fixed test message to the websocket server.
     *
     * @return a success message, or a failure message when the websocket is not connected
     */
    @RequestMapping("/send")
    @IgnoreAuth
    public String send(){
        // Sending on a socket that is not open throws; report it instead of failing the request.
        if (!webSocketClient.isOpen()) {
            return "发送订阅失败:webSocket 未连接";
        }
        webSocketClient.send("dechnicWebSocketClient 发送订阅信息成功!");
        return "发送订阅成功";
    }
}
SpringBoot webSocketServer 配置
package com.example.demo.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* @description:
* @author:houqd
* @time: 2022/6/22 15:07
*/
@Slf4j
@Component
@ServerEndpoint("/webSocket")
public class WebSocketServer {
private static CopyOnWriteArrayList<WebSocketServer> list = new CopyOnWriteArrayList<>();
@OnOpen
public void onOpen(Session session) {
log.info("【websocket消息推送模块】客户端已连接!sessionID={}",session.getId());
}
@OnClose
public void onClose(Session session) {
log.info("【websocket消息推送模块】断开连接---- sessionID={}",session.getId());
list.remove(this);
}
@OnError
public void onError(Throwable error){
log.info("ws连接出错:{}", error.getMessage());
error.printStackTrace();
}
@OnMessage
public void onMessage(Session session, String message) {
//前端心跳检测
log.debug("WebSocketServer onMessage: sessionID={}, message={}, ", session.getId(), message);
}
}