• 基于若依ruoyi-nbcio增加flowable流程待办消息的提醒,并提供右上角的红字数字提醒(二)


    更多ruoyi-nbcio功能请看演示系统

    gitee源代码地址

    前后端代码: https://gitee.com/nbacheng/ruoyi-nbcio

    演示地址:RuoYi-Nbcio后台管理系统

    1、重写websocketserver,如下,这里主要是用到了redis的订阅机制,具体也可以看相关文章:

    1. package com.ruoyi.common.websocket;
    2. import cn.hutool.json.JSONUtil;
    3. import lombok.extern.slf4j.Slf4j;
    4. import com.alibaba.fastjson.JSONObject;
    5. import com.ruoyi.common.constant.WebsocketConst;
    6. import com.ruoyi.common.core.domain.BaseProtocol;
    7. import com.ruoyi.common.redis.NbcioRedisClient;
    8. import org.slf4j.Logger;
    9. import org.slf4j.LoggerFactory;
    10. import org.springframework.stereotype.Component;
    11. import javax.annotation.Resource;
    12. import javax.websocket.*;
    13. import javax.websocket.server.PathParam;
    14. import javax.websocket.server.ServerEndpoint;
    15. import java.io.IOException;
    16. import java.util.HashMap;
    17. import java.util.Map;
    18. import java.util.concurrent.CopyOnWriteArraySet;
    19. import java.util.concurrent.atomic.AtomicInteger;
    20. import com.ruoyi.common.base.BaseMap;
    21. // @ServerEndpoint 声明并创建了webSocket端点, 并且指明了请求路径
    22. // id 为客户端请求时携带的参数, 用于服务端区分客户端使用
    23. /**
    24. * @ServerEndpoint 声明并创建了websocket端点, 并且指明了请求路径
    25. * userId 为客户端请求时携带的用户userId, 用于区分发给哪个用户的消息
    26. * @author nbacheng
    27. * @date 2023-09-20
    28. */
    29. @Component
    30. @Slf4j
    31. @ServerEndpoint("/websocket/{userId}") //此注解相当于设置访问URL
    32. public class WebSocketServer {
    33. private Session session;
    34. private String userId;
    35. private static final String REDIS_TOPIC_NAME = "socketHandler";
    36. @Resource
    37. private NbcioRedisClient nbcioRedisClient;
    38. /**
    39. * 缓存 webSocket连接到单机服务class中(整体方案支持集群)
    40. */
    41. private static CopyOnWriteArraySet webSockets = new CopyOnWriteArraySet<>();
    42. private static Map sessionPool = new HashMap();
    43. @OnOpen
    44. public void onOpen(Session session, @PathParam(value = "userId") String userId) {
    45. try {
    46. this.session = session;
    47. this.userId = userId;
    48. webSockets.add(this);
    49. sessionPool.put(userId, session);
    50. log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
    51. } catch (Exception e) {
    52. }
    53. }
    54. @OnClose
    55. public void onClose() {
    56. try {
    57. webSockets.remove(this);
    58. sessionPool.remove(this.userId);
    59. log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
    60. } catch (Exception e) {
    61. }
    62. }
    63. /**
    64. * 服务端推送消息
    65. *
    66. * @param userId
    67. * @param message
    68. */
    69. public void pushMessage(String userId, String message) {
    70. Session session = sessionPool.get(userId);
    71. if (session != null && session.isOpen()) {
    72. try {
    73. synchronized (session){
    74. log.info("【websocket消息】 单点消息:" + message);
    75. session.getBasicRemote().sendText(message);
    76. }
    77. } catch (Exception e) {
    78. e.printStackTrace();
    79. }
    80. }
    81. }
    82. /**
    83. * 服务器端推送消息
    84. */
    85. public void pushMessage(String message) {
    86. try {
    87. webSockets.forEach(ws -> ws.session.getAsyncRemote().sendText(message));
    88. } catch (Exception e) {
    89. e.printStackTrace();
    90. }
    91. }
    92. @OnMessage
    93. public void onMessage(String message) {
    94. //todo 现在有个定时任务刷,应该去掉
    95. log.debug("【websocket消息】收到客户端消息:" + message);
    96. JSONObject obj = new JSONObject();
    97. //业务类型
    98. obj.put(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK);
    99. //消息内容
    100. obj.put(WebsocketConst.MSG_TXT, "心跳响应");
    101. for (WebSocketServer webSocket : webSockets) {
    102. webSocket.pushMessage(message);
    103. }
    104. }
    105. /**
    106. * 后台发送消息到redis
    107. *
    108. * @param message
    109. */
    110. public void sendMessage(String message) {
    111. log.info("【websocket消息】广播消息:" + message);
    112. BaseMap baseMap = new BaseMap();
    113. baseMap.put("userId", "");
    114. baseMap.put("message", message);
    115. nbcioRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
    116. }
    117. /**
    118. * 此为单点消息
    119. *
    120. * @param userId
    121. * @param message
    122. */
    123. public void sendMessage(String userId, String message) {
    124. BaseMap baseMap = new BaseMap();
    125. baseMap.put("userId", userId);
    126. baseMap.put("message", message);
    127. nbcioRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
    128. }
    129. /**
    130. * 此为单点消息(多人)
    131. *
    132. * @param userIds
    133. * @param message
    134. */
    135. public void sendMessage(String[] userIds, String message) {
    136. for (String userId : userIds) {
    137. sendMessage(userId, message);
    138. }
    139. }
    140. }

    2、RedisConfig增加监听器

    1. /**
    2. * redis 监听配置
    3. *
    4. * @param redisConnectionFactory redis 配置
    5. * @return
    6. */
    7. @Bean
    8. public RedisMessageListenerContainer redisContainer(RedisConnectionFactory redisConnectionFactory, RedisReceiver redisReceiver, MessageListenerAdapter commonListenerAdapter) {
    9. RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    10. container.setConnectionFactory(redisConnectionFactory);
    11. container.addMessageListener(commonListenerAdapter, new ChannelTopic(GlobalConstants.REDIS_TOPIC_NAME));
    12. return container;
    13. }
    14. @Bean
    15. MessageListenerAdapter commonListenerAdapter(RedisReceiver redisReceiver) {
    16. MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(redisReceiver, "onMessage");
    17. messageListenerAdapter.setSerializer(jacksonSerializer());
    18. return messageListenerAdapter;
    19. }
    20. private Jackson2JsonRedisSerializer jacksonSerializer() {
    21. Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
    22. ObjectMapper objectMapper = new ObjectMapper();
    23. objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    24. objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    25. jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
    26. return jackson2JsonRedisSerializer;
    27. }

    3、RedisReceiver 如下:

    1. package com.ruoyi.common.redis;
    2. import org.springframework.stereotype.Component;
    3. import com.ruoyi.common.base.BaseMap;
    4. import com.ruoyi.common.constant.GlobalConstants;
    5. import com.ruoyi.common.redis.listener.NbcioRedisListener;
    6. import com.ruoyi.common.utils.SpringContextHolder;
    7. import cn.hutool.core.util.ObjectUtil;
    8. import lombok.Data;
    9. /**
    10. * 接受消息并调用业务逻辑处理器
    11. * @author nbacheng
    12. * @date 2023-09-20
    13. */
    14. @Component
    15. @Data
    16. public class RedisReceiver {
    17. public void onMessage(BaseMap params) {
    18. Object handlerName = params.get(GlobalConstants.HANDLER_NAME);
    19. NbcioRedisListener messageListener = SpringContextHolder.getHandler(handlerName.toString(), NbcioRedisListener.class);
    20. if (ObjectUtil.isNotEmpty(messageListener)) {
    21. messageListener.onMessage(params);
    22. }
    23. }
    24. }

  • 相关阅读:
    深度学习环境配置9——Ubuntu下的tensorflow-gpu==2.4.0环境配置
    git学习笔记 - 下载gitlab项目
    python练习5
    [量化投资-学习笔记001]Python+TDengine从零开始搭建量化分析平台-数据存储
    ARP欺骗
    如果你是Java程序员,你会选择Cloud Studio进行云端开发,放弃IDEA吗?
    (仿牛客社区项目)Java开发笔记7.9:优化网站的性能
    GO微服务实战第二十九节 如何追踪分布式系统调用链路的问题?
    Java Class反射
    机器视觉光源案例锦集(一)
  • 原文地址:https://blog.csdn.net/qq_40032778/article/details/133188656