最近项目中用到了视频 AI 分析。由于 SDK 涉及保密,不便透露具体实现,这里仅分享定时分析任务的设计思路,供参考。
受 AI 服务器性能上限的限制,同一时间只能对 64 路 RTSP 流运行 1 种算法,或者对 8 路 RTSP 流运行 8 种算法,因此定时任务做如下设计。
-
-
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONArray;
- import com.alibaba.fastjson.JSONObject;
- import com.ewaycloud.jw.ai.service.AiService;
- import com.ewaycloud.jw.camera.entity.Camera;
- import com.ewaycloud.jw.camera.mapper.CameraMapper;
- import com.ewaycloud.jw.camera.service.CameraService;
- import com.ewaycloud.jw.cases.dto.CaseDTO;
- import com.ewaycloud.jw.cases.service.CaseService;
- import com.ewaycloud.jw.channel.service.HikService;
- import com.ewaycloud.jw.task.entity.Task;
- import com.ewaycloud.jw.task.mapper.TaskMapper;
- import org.springframework.scheduling.annotation.EnableScheduling;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
- import java.util.Date;
- import java.util.List;
-
- /**
- * AI分析 定时任务 处理类
- *
- * @author gwh
- * @date 2024-04-14 13:59:17
- */
- @Component
- @EnableScheduling
- public class AiHandlerTask {
-
- @Resource
- AiService aiService;
- @Resource
- TaskService taskService;
- @Resource
- CameraService cameraService;
- @Resource
- private TaskMapper taskMapper;
-
- @Resource
- private CameraMapper cameraMapper;
- @Resource
- private HikService hkService;
-
- @Resource
- private CaseService caseService;
-
- /**
- * 注解中的Cron表达式: {秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)}
- * 注意:日和周其中的一个必须为"?"
- * 10/5 20 10 * * ? 每天10点20分第10秒以后,每3秒执行一次,到10点21分就不会执行了
- *
- * AI算法分析任务: 每5秒执行一次
- */
- // @Scheduled(cron = "0/5 * * * * ?")
- public void startTask(){
- // System.out.println("AI分析定时任务执行 每隔5秒执行一次:" + new Date());
- //查询要执行的任务
- List
aiTasks = taskMapper.findAiTasks(null); - if (null != aiTasks) {
- for(Task vo:aiTasks){
- if (null != vo.getDeptId()) {
- //查询某谈话室下边的摄像头列表(flag是1 谈话人特写 和2 被谈话人特写 的)
- List
cameraList = cameraMapper.findCamersByDeptId(vo.getDeptId()); - if (null != cameraList && cameraList.size()>0) {
- for(Camera camera:cameraList) {
- //根据摄像头编码cameraCode,调用海康接口拉流
- String cameraCode = camera.getCameraCode();
- try {
- //根据cameraCode、开始时间、结束时间 调用海康接口 拉回放流
- //查询时间(IOS8601格式yyyy-MM-dd'T'HH:mm:ss.SSSzzz,和结束时间相差不超过三天
- JSONObject data = hkService.playbackURLs( cameraCode, vo.getStartTime(), vo.getEndTime());
- //谈话人特写AI分析
- if (null != data && null != data.getString("url")) {
- String rtspUrl = data.getString("url");
- //疑似肢体冲突
- // startAiTask(rtspUrl, 1L, vo.getStartTime(), vo.getEndTime(), vo);
- //玩手机分析
- // startAiTask(rtspUrl, 2L, vo.getStartTime(), vo.getEndTime(), vo);
- //倒地分析
- // startAiTask(rtspUrl, 3L, vo.getStartTime(), vo.getEndTime(), vo);
- //人数异常
- startAiTask(rtspUrl, 5L, vo.getStartTime(), vo.getEndTime(), vo);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
- }
- // System.out.println("AI分析定时任务执行 每隔10秒执行一次:: " + new Date());
- }
-
- //执行拉流调用AI分析的方法
- public void startAiTask(String rtspUrl, Long aiId, String startTime, String endTime, Task vo) {
- //调用AI分析接口
- if (null != rtspUrl) {
- //调用海康AI算法分析
- String aiResponse = "";
- if (aiId == 1) {//疑似肢体冲突
- aiResponse = aiService.indoorPhysicalConfront(rtspUrl, startTime, endTime);
- vo.setBreakName("疑似肢体冲突");
- vo.setAiId(1L);
- } else if (aiId == 2) {//玩手机
- aiResponse = aiService.playCellphone(rtspUrl, startTime, endTime);
- vo.setBreakName("玩手机");
- vo.setAiId(2L);
- } else if (aiId == 3) {//倒地
- aiResponse = aiService.failDown(rtspUrl, startTime, endTime);
- vo.setBreakName("倒地");
- vo.setAiId(3L);
- } else if (aiId == 4) {//人员站立
- aiResponse = aiService.Standup(rtspUrl,startTime, endTime);
- vo.setBreakName("人员站立");
- vo.setAiId(4L);
- } else if (aiId == 5) {//人数异常
- aiResponse = aiService.PeopleNumChange(rtspUrl, startTime, endTime);
- vo.setBreakName("人数异常");
- vo.setAiId(5L);
- } else if (aiId == 6) {//声强突变
- aiResponse = aiService.audioAbnormal(rtspUrl, startTime, endTime);
- vo.setBreakName("声强突变");
- vo.setAiId(6L);
- } else if (aiId == 7) {//超时滞留
- aiResponse = aiService.overtimeTarry(rtspUrl, startTime, endTime);
- vo.setBreakName("超时滞留");
- vo.setAiId(7L);
- } else if (aiId == 8) {//攀高
- aiResponse = aiService.reachHeight(rtspUrl, startTime, endTime);
- vo.setBreakName("攀高");
- vo.setAiId(8L);
- }
- JSONObject aiResponseJSONObject = JSON.parseObject(aiResponse);
- // System.out.println("AI分析定时任务返回aiResponseJSONObject:" + aiResponseJSONObject);
- String taskId = "";
- String taskStatus = "";
- if (null != aiResponseJSONObject && null != aiResponseJSONObject.getString("taskID") ){
- taskId = aiResponseJSONObject.getString("taskID");
- //调用海康查询任务状态接口获取AI分析任务状态
- String result = aiService.queryTaskVideoStatus(taskId);
- JSONObject resultJSONObject = JSON.parseObject(result);
- JSONArray statusJSONArray = resultJSONObject.getJSONArray("status");
- JSONObject statusJSONObject = (JSONObject) statusJSONArray.get(0);
- taskStatus = statusJSONObject.getString("taskStatus");
- //将AI分析结果taskStatus插入task表中,更新任务表,状态:1 未执行, 2等待, 3 正在执行 , 4 已完成
- vo.setTaskState(Integer.parseInt(taskStatus));
- vo.setTaskId(taskId); //保存 海康返回的 taskID
- //如果任务完成,关闭rtsp流
- if ("4".equals(taskStatus)) {
- //根据caseId更新案件表的 task_state =1 , ai任务状态(0:未执行 1:已执行)
- Long caseId = vo.getCaseId();
- CaseDTO caseDTO = new CaseDTO();
- caseDTO.setCaseId(caseId);
- caseDTO.setCaseState(1);
- caseService.updCaseInfo(caseDTO);
- //关闭rtsp流
- try {
- hkService.clearPlayUrls(rtspUrl);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- System.out.println("AI分析定时任务返回 taskId:" + taskId +" breakName: "+ vo.getBreakName() +" taskStatus: "+ taskStatus);
- //更新任务表, 根据caseId 和taskId查询任务,如果有更新,没有插入
- Task dto = new Task();
- dto.setCaseId(vo.getCaseId());
- dto.setTaskId(vo.getTaskId());
- List
tasks = taskMapper.findTasks(dto); - if(null != tasks && tasks.size()>0){
- for(Task po : tasks){
- vo.setId(po.getId());
- vo.setUpdateTime(new Date());
- taskService.updateById(po);
- }
- }else {
- vo.setCreateTime(new Date());
- vo.setUpdateTime(new Date());
- taskMapper.insert(vo);
- }
- }
- }
-
- }
-
-
- import com.baomidou.mybatisplus.extension.service.IService;
- import com.ewaycloud.jw.ai.entity.Ai;
- import com.ewaycloud.jw.task.entity.Task;
-
- import java.util.List;
-
- /**
- * AI对接
- *
- * @author gwh
- * @date 2024-03-13 13:49:09
- */
- public interface AiService extends IService
{ -
- String getAiDeviceInfo();
-
- /**
- * 创建--疑似肢体冲突事件--分析视频分析任务
- *
- */
- String indoorPhysicalConfront(String streamUrl, String startTime, String endTime);
-
- /**
- * 创建--玩手机--分析视频分析任务
- *
- */
- String playCellphone(String streamUrl, String startTime, String endTime);
-
- /**
- * 创建--倒地检测--分析视频分析任务
- *
- */
- String failDown(String streamUrl, String startTime, String endTime);
-
- /**
- * 创建--人员站立--分析视频分析任务
- *
- */
- String Standup(String streamUrl, String startTime, String endTime);
-
- /**
- * 创建--人数异常--分析视频分析任务
- *
- */
- String PeopleNumChange(String streamUrl, String startTime, String endTime);
-
-
- /**
- * 创建--声强突变--分析视频分析任务
- *
- */
- String audioAbnormal(String streamUrl, String startTime, String endTime);
-
-
- /**
- * 创建--超时滞留--分析视频分析任务
- *
- */
- String overtimeTarry(String streamUrl, String startTime, String endTime);
-
- /**
- * 创建--攀高--分析视频分析任务
- *
- */
- String reachHeight(String streamUrl, String startTime, String endTime);
-
-
- /**
- * 查询分析视频分析任务状态
- *
- */
- String queryTaskVideoStatus(String taskId);
-
-
-
- }
-
-
- import com.ewaycloud.jw.ai.entity.AiResolveResult;
- import com.ewaycloud.jw.ai.mapper.AiResolveResultMapper;
- import com.ewaycloud.jw.task.entity.ContentTypeEnum;
- import com.ewaycloud.jw.task.entity.Task;
- import com.ewaycloud.jw.task.mapper.TaskMapper;
- import com.mysql.cj.util.StringUtils;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
-
- import javax.annotation.Resource;
- import java.io.ByteArrayOutputStream;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.net.ServerSocket;
- import java.net.Socket;
- import java.util.Date;
- import java.util.List;
-
- /**
- * @author gwhui
- * @date 2024/1/18 17:38
- * @desc 监听处理线程
- */
- @Slf4j
- @Service
- public class ListenThread implements Runnable {
- private final AlarmDataParser alarmDataParser = new AlarmDataParser();
-
- private static TaskMapper taskMapper;
- @Resource
- public void setVerificDao(TaskMapper taskMapper) {
- ListenThread.taskMapper = taskMapper;
- }
- private static AiResolveResultMapper aiResolveResultMapper;
- @Resource
- public void setVerificDao(AiResolveResultMapper aiResolveResultMapper) {
- ListenThread.aiResolveResultMapper = aiResolveResultMapper;
- }
-
- @Override
- public void run() {
- // int listenPort = propertiesUtil.getIntegerProperty("custom.isapi.listen.port", 9999);
- int listenPort =10006;
- try {
- ServerSocket serverSocket = new ServerSocket(listenPort);
- System.out.println("启动监听, 监听端口:" + listenPort);
- while (!Thread.currentThread().isInterrupted()) {
- Socket accept = serverSocket.accept();
- accept.setKeepAlive(true);
- System.out.println("设备(客户端)信息:" + accept.getInetAddress().getHostAddress());
- if (accept.isConnected()) {
- handleData(accept);
- }
- accept.close();
- }
- serverSocket.close();
- System.out.println("停止监听完成");
- } catch (InterruptedException e) {
- // 线程被中断的处理逻辑
- System.out.println("停止监听完成: " + e.getMessage());
- } catch (Exception e) {
- System.out.println("监听创建异常: " + e.getMessage());
- }
- }
-
- @Transactional(rollbackFor = Exception.class)
- public synchronized void handleData(Socket accept) throws Exception {
- InputStream inputData = accept.getInputStream();
- OutputStream outputData = accept.getOutputStream();
-
- // 输出数据
- ByteArrayOutputStream byOutputData = new ByteArrayOutputStream();
-
- byte[] buffer = new byte[2 * 1024 * 1024];
- int length = 0;
-
- // 持续接收处理数据直到接收完毕
- String recvAlarmData = "";
- while ((length = inputData.read(buffer)) > 0) {
- byOutputData.write(buffer, 0, length);
-
- String recvData = byOutputData.toString();
- recvAlarmData = recvAlarmData + recvData;
-
- // 获取boundary
- String strBoundary = "boundary=";
- int beginIndex = recvData.indexOf(strBoundary);
- beginIndex += strBoundary.length();
- int lenIndex = recvData.indexOf("\r\n", beginIndex);
- String strBoundaryMark = recvData.substring(beginIndex, lenIndex);
-
- if (recvAlarmData.contains("--" + strBoundaryMark.trim() + "--")) {
- //表单结束符判断接收结束
- break;
- }
- }
- // System.out.println("==============recvAlarmData========>> "+recvAlarmData);
- if(null != recvAlarmData){
- String taskId = null;
- int index = recvAlarmData.indexOf("
" ); - if(index != -1){
- taskId = recvAlarmData.substring(index + 8, index + 40);
- }
- //获取服务器返回的图片
- String bkgUrl = null;
- int indexStartBkgUrl = recvAlarmData.indexOf("
" ); - int indexEndBkgUrl = recvAlarmData.indexOf("");
- if(indexStartBkgUrl != -1){
- bkgUrl = recvAlarmData.substring(indexStartBkgUrl+8, indexEndBkgUrl);
- bkgUrl =bkgUrl.replaceAll("&","&");
- }
- System.out.println("===AIrecieveData===>>taskId: "+taskId +" bkgUrl: "+ bkgUrl);
- //根据taskId查询 任务信息
- if(!StringUtils.isNullOrEmpty(taskId)){
- Task task = taskMapper.finTaskByTaskId(taskId);
- if(null != task){
- AiResolveResult vo = new AiResolveResult();
- vo.setCreateTime(new Date());
- vo.setUpdateTime(new Date());
- vo.setTaskId(taskId); //保存海康返回的 taskId
- vo.setBreakName(task.getBreakName());
- vo.setAiId(task.getAiId());
- vo.setDeptId(task.getDeptId());
- vo.setCameraId(task.getCameraId());
- vo.setBreakTypeId(task.getAiId());
- vo.setRiskTime(task.getTalkTime());
- vo.setTalkAddress(task.getTalkAddress());
- vo.setTalkAddressName(task.getTalkAddressName());
- vo.setTalkUnit(task.getTalkUnit());
- vo.setTalkUnitName(task.getTalkUnitName());
- vo.setPhoto(bkgUrl); //保存海康返回的图片
- vo.setCaseId(task.getCaseId());
- vo.setCaseName(task.getCaseName());
- vo.setInterviewerName(task.getInterviewerName());
- //根据taskId查询任务结果表,如果有做更新操作,没有做插入操作
- List
aiResolveResults = aiResolveResultMapper.findAiResults(vo); - if(null != aiResolveResults && aiResolveResults.size()>0){
- for(AiResolveResult aiResolveResult:aiResolveResults){
- if(null != aiResolveResult){
- aiResolveResult.setPhoto(vo.getPhoto());
- aiResolveResultMapper.updateById(aiResolveResult);
- }
- }
- }else {
- aiResolveResultMapper.insert(vo);
- }
- }
-
- }
- }
- String response = "HTTP/1.1 200 OK" +
- "\r\n" +
- "Connection: close" +
- "\r\n\r\n";
- outputData.write(response.getBytes());
- outputData.flush();
- outputData.close();
- inputData.close();
- //解析数据
- response = parseAlarmInfoByte(byOutputData);
- System.out.println("==============response========>> "+response);
- }
-
- private String parseAlarmInfoByte(ByteArrayOutputStream byOutputData) throws Exception {
- // 事件报文字节
- byte[] byAlarmDataInfo = byOutputData.toByteArray();
- int iDataLen = byAlarmDataInfo.length;
-
- String szBoundaryMark = "boundary=";
- String szContentTypeMark = "Content-Type: ";
- int iTypeMarkLen = szContentTypeMark.getBytes("UTF-8").length;
- String szContentLenMark = "Content-Length: ";
- int iLenMarkLen = szContentLenMark.getBytes("UTF-8").length;
- String szContentLenMark2 = "content-length: ";
- int iLenMarkLen2 = szContentLenMark2.getBytes("UTF-8").length;
- int iContentLen = 0;
- String szEndMark = "\r\n";
- int iMarkLen = szEndMark.getBytes("UTF-8").length;
- String szEndMark2 = "\r\n\r\n";
- int iMarkLen2 = szEndMark2.getBytes("UTF-8").length;
- String szJson = "text/json";
- String szJpg = "image/jpeg";
-
- int iStartBoundary = doDataSearch(byAlarmDataInfo, szBoundaryMark.getBytes("UTF-8"), 0, byAlarmDataInfo.length);
- iStartBoundary += szBoundaryMark.getBytes("UTF-8").length;
- int iEndBoundary = doDataSearch(byAlarmDataInfo, szEndMark.getBytes("UTF-8"), iStartBoundary, byAlarmDataInfo.length);
- byte[] byBoundary = new byte[iEndBoundary - iStartBoundary];
- System.arraycopy(byAlarmDataInfo, iStartBoundary, byBoundary, 0, iEndBoundary - iStartBoundary);
-
- String szBoundaryEndMark = "--" + new String(byBoundary).trim() + "--";
- int iDateEnd = doDataSearch(byAlarmDataInfo, szBoundaryEndMark.getBytes("UTF-8"), 0, byAlarmDataInfo.length);
-
- String szBoundaryMidMark = "--" + new String(byBoundary).trim();
- int iBoundaryMidLen = szBoundaryMidMark.getBytes("UTF-8").length;
- int startIndex = iEndBoundary;
-
- String szContentType = "";
-
- int[] iBoundaryPos = new int[11]; //boundary个数,这里最大解析10个
- int iBoundaryNum = 0;
- for (iBoundaryNum = 0; iBoundaryNum < 10; iBoundaryNum++) {
- startIndex = doDataSearch(byAlarmDataInfo, szBoundaryMidMark.getBytes("UTF-8"), startIndex, iDateEnd);
- if (startIndex < 0) {
- break;
- }
- startIndex += iBoundaryMidLen;
- iBoundaryPos[iBoundaryNum] = startIndex;
- }
- iBoundaryPos[iBoundaryNum] = iDateEnd;//最后一个是结束符
-
- for (int i = 0; i < iBoundaryNum; i++) {
- // Content-Type
- int iStartType = doDataSearch(byAlarmDataInfo, szContentTypeMark.getBytes("UTF-8"), iBoundaryPos[i], iBoundaryPos[i + 1]);
- if (iStartType > 0) {
- iStartType += iTypeMarkLen;
- int iEndType = doDataSearch(byAlarmDataInfo, szEndMark.getBytes("UTF-8"), iStartType, iBoundaryPos[i + 1]);
- if (iEndType > 0) {
- byte[] byType = new byte[iEndType - iStartType];
- System.arraycopy(byAlarmDataInfo, iStartType, byType, 0, iEndType - iStartType);
- szContentType = new String(byType).trim();
- }
- }
-
- // Content-Length
- int iStartLength = doDataSearch(byAlarmDataInfo, szContentLenMark.getBytes("UTF-8"), iBoundaryPos[i], iBoundaryPos[i + 1]);
- if (iStartLength > 0) {
- iStartLength += iLenMarkLen;
- int iEndLength = doDataSearch(byAlarmDataInfo, szEndMark.getBytes("UTF-8"), iStartLength, iBoundaryPos[i + 1]);
- if (iEndLength > 0) {
- byte[] byLength = new byte[iEndLength - iStartLength];
- System.arraycopy(byAlarmDataInfo, iStartLength, byLength, 0, iEndLength - iStartLength);
- iContentLen = Integer.parseInt(new String(byLength).trim());
- }
- }
-
- // Content-Length(兼容错误大小写)
- int iStartLength2 = doDataSearch(byAlarmDataInfo, szContentLenMark2.getBytes("UTF-8"), iBoundaryPos[i], iBoundaryPos[i + 1]);
- if (iStartLength2 > 0) {
- iStartLength2 += iLenMarkLen2;
- int iEndLength2 = doDataSearch(byAlarmDataInfo, szEndMark.getBytes("UTF-8"), iStartLength2, iBoundaryPos[i + 1]);
- if (iEndLength2 > 0) {
- byte[] byLength2 = new byte[iEndLength2 - iStartLength2];
- System.arraycopy(byAlarmDataInfo, iStartLength2, byLength2, 0, iEndLength2 - iStartLength2);
- iContentLen = Integer.parseInt(new String(byLength2).trim());
- }
- }
-
- // 通过\r\n\r\n判断报文数据起始位置
- int iStartData = doDataSearch(byAlarmDataInfo, szEndMark2.getBytes("UTF-8"), iBoundaryPos[i], iBoundaryPos[i + 1]);
- if (iStartData > 0) {
- iStartData += iMarkLen2;
-
- // 有的报文可能没有Content-Length
- if (iContentLen <= 0) {
- iContentLen = iBoundaryPos[i + 1] - iStartData;
- }
-
- // 截取数据内容
- byte[] byData = new byte[iContentLen];
- System.arraycopy(byAlarmDataInfo, iStartData, byData, 0, iContentLen);
-
- // 根据类型处理数据
- int contentType = ContentTypeEnum.getEventType(szContentType);
- String storeFolder = System.getProperty("user.dir") + "\\output\\listen\\event\\";
- switch (contentType) {
- case ContentTypeEnum.APPLICATION_JSON:
- case ContentTypeEnum.APPLICATION_XML: {
- String rawContent = new String(byData).trim();
- alarmDataParser.parseAlarmInfo(contentType, storeFolder, rawContent, null);
- break;
- }
- case ContentTypeEnum.IMAGE_JPEG:
- case ContentTypeEnum.IMAGE_PNG:
- case ContentTypeEnum.VIDEO_MPG:
- case ContentTypeEnum.VIDEO_MPEG4:
- case ContentTypeEnum.APPLICATION_ZIP: {
- alarmDataParser.parseAlarmInfo(contentType, storeFolder, null, byData);
- break;
- }
- default: {
- System.out.println("未匹配到可以解析的content-type, 请自行补全处理!");
- }
- }
- }
- }
- // 响应报文
- String response = "";
-
- // 消费交易事件 (实际如果没有消费机设备可以不需要消费机的处理代码)
- String eventType = "";
- String eventConfirm = "";
- if (eventType.equals("ConsumptionEvent") || eventType.equals("TransactionRecordEvent") || eventType.equals("HealthInfoSyncQuery")) {
- response = "HTTP/1.1 200 OK" +
- "\r\n" +
- "Content-Type: application/json; charset=\"UTF-8\"" +
- "\r\n" +
- "Content-Length: " + eventConfirm.length() +
- "\r\n\r\n" + eventConfirm +
- "\r\n";
- } else {
- response = "HTTP/1.1 200 OK" +
- "\r\n" +
- "Connection: close" +
- "\r\n\r\n";
- }
-
- return response;
- }
-
- private int doDataSearch(byte[] bySrcData, byte[] keyData, int startIndex, int endIndex) {
- if (bySrcData == null || keyData == null || bySrcData.length <= startIndex || bySrcData.length < keyData.length) {
- return -1;
- }
-
- if (endIndex > bySrcData.length) {
- endIndex = bySrcData.length;
- }
-
- int iPos, jIndex;
- for (iPos = startIndex; iPos < endIndex; iPos++) {
- if (bySrcData.length < keyData.length + iPos) {
- break;
- }
-
- for (jIndex = 0; jIndex < keyData.length; jIndex++) {
- if (bySrcData[iPos + jIndex] != keyData[jIndex]) {
- break;
- }
- }
- if (jIndex == keyData.length) {
- return iPos;
- }
- }
- return -1;
- }
-
-
- }



