更多ruoyi-nbcio功能请看演示系统
gitee源代码地址
前后端代码: https://gitee.com/nbacheng/ruoyi-nbcio
演示地址:RuoYi-Nbcio后台管理系统
1、重写websocketserver,如下,这里主要是用到了redis的订阅机制,具体也可以看相关文章:
- package com.ruoyi.common.websocket;
-
- import cn.hutool.json.JSONUtil;
- import lombok.extern.slf4j.Slf4j;
-
- import com.alibaba.fastjson.JSONObject;
- import com.ruoyi.common.constant.WebsocketConst;
- import com.ruoyi.common.core.domain.BaseProtocol;
- import com.ruoyi.common.redis.NbcioRedisClient;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
- import javax.websocket.*;
- import javax.websocket.server.PathParam;
- import javax.websocket.server.ServerEndpoint;
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.concurrent.CopyOnWriteArraySet;
- import java.util.concurrent.atomic.AtomicInteger;
- import com.ruoyi.common.base.BaseMap;
-
- // @ServerEndpoint 声明并创建了webSocket端点, 并且指明了请求路径
- // id 为客户端请求时携带的参数, 用于服务端区分客户端使用
-
- /**
- * @ServerEndpoint 声明并创建了websocket端点, 并且指明了请求路径
- * userId 为客户端请求时携带的用户userId, 用于区分发给哪个用户的消息
- * @author nbacheng
- * @date 2023-09-20
- */
-
- @Component
- @Slf4j
- @ServerEndpoint("/websocket/{userId}") //此注解相当于设置访问URL
- public class WebSocketServer {
-
- private Session session;
-
- private String userId;
-
- private static final String REDIS_TOPIC_NAME = "socketHandler";
-
- @Resource
- private NbcioRedisClient nbcioRedisClient;
-
- /**
- * 缓存 webSocket连接到单机服务class中(整体方案支持集群)
- */
- private static CopyOnWriteArraySet
webSockets = new CopyOnWriteArraySet<>(); - private static Map
sessionPool = new HashMap(); -
-
- @OnOpen
- public void onOpen(Session session, @PathParam(value = "userId") String userId) {
- try {
- this.session = session;
- this.userId = userId;
- webSockets.add(this);
- sessionPool.put(userId, session);
- log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
- } catch (Exception e) {
- }
- }
-
- @OnClose
- public void onClose() {
- try {
- webSockets.remove(this);
- sessionPool.remove(this.userId);
- log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
- } catch (Exception e) {
- }
- }
-
-
- /**
- * 服务端推送消息
- *
- * @param userId
- * @param message
- */
- public void pushMessage(String userId, String message) {
- Session session = sessionPool.get(userId);
- if (session != null && session.isOpen()) {
- try {
- synchronized (session){
- log.info("【websocket消息】 单点消息:" + message);
- session.getBasicRemote().sendText(message);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- /**
- * 服务器端推送消息
- */
- public void pushMessage(String message) {
- try {
- webSockets.forEach(ws -> ws.session.getAsyncRemote().sendText(message));
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
-
- @OnMessage
- public void onMessage(String message) {
- //todo 现在有个定时任务刷,应该去掉
- log.debug("【websocket消息】收到客户端消息:" + message);
- JSONObject obj = new JSONObject();
- //业务类型
- obj.put(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK);
- //消息内容
- obj.put(WebsocketConst.MSG_TXT, "心跳响应");
- for (WebSocketServer webSocket : webSockets) {
- webSocket.pushMessage(message);
- }
- }
-
- /**
- * 后台发送消息到redis
- *
- * @param message
- */
- public void sendMessage(String message) {
- log.info("【websocket消息】广播消息:" + message);
- BaseMap baseMap = new BaseMap();
- baseMap.put("userId", "");
- baseMap.put("message", message);
- nbcioRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
- }
-
- /**
- * 此为单点消息
- *
- * @param userId
- * @param message
- */
- public void sendMessage(String userId, String message) {
- BaseMap baseMap = new BaseMap();
- baseMap.put("userId", userId);
- baseMap.put("message", message);
- nbcioRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
- }
-
- /**
- * 此为单点消息(多人)
- *
- * @param userIds
- * @param message
- */
- public void sendMessage(String[] userIds, String message) {
- for (String userId : userIds) {
- sendMessage(userId, message);
- }
- }
-
- }
2、RedisConfig增加监听器
- /**
- * redis 监听配置
- *
- * @param redisConnectionFactory redis 配置
- * @return
- */
- @Bean
- public RedisMessageListenerContainer redisContainer(RedisConnectionFactory redisConnectionFactory, RedisReceiver redisReceiver, MessageListenerAdapter commonListenerAdapter) {
- RedisMessageListenerContainer container = new RedisMessageListenerContainer();
- container.setConnectionFactory(redisConnectionFactory);
- container.addMessageListener(commonListenerAdapter, new ChannelTopic(GlobalConstants.REDIS_TOPIC_NAME));
- return container;
- }
-
-
- @Bean
- MessageListenerAdapter commonListenerAdapter(RedisReceiver redisReceiver) {
- MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(redisReceiver, "onMessage");
- messageListenerAdapter.setSerializer(jacksonSerializer());
- return messageListenerAdapter;
- }
-
- private Jackson2JsonRedisSerializer jacksonSerializer() {
- Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
- ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
- objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
- jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
- return jackson2JsonRedisSerializer;
- }
3、RedisReceiver 如下:
- package com.ruoyi.common.redis;
-
-
- import org.springframework.stereotype.Component;
-
- import com.ruoyi.common.base.BaseMap;
- import com.ruoyi.common.constant.GlobalConstants;
- import com.ruoyi.common.redis.listener.NbcioRedisListener;
- import com.ruoyi.common.utils.SpringContextHolder;
-
- import cn.hutool.core.util.ObjectUtil;
- import lombok.Data;
-
- /**
- * 接受消息并调用业务逻辑处理器
- * @author nbacheng
- * @date 2023-09-20
- */
- @Component
- @Data
- public class RedisReceiver {
-
- public void onMessage(BaseMap params) {
- Object handlerName = params.get(GlobalConstants.HANDLER_NAME);
- NbcioRedisListener messageListener = SpringContextHolder.getHandler(handlerName.toString(), NbcioRedisListener.class);
- if (ObjectUtil.isNotEmpty(messageListener)) {
- messageListener.onMessage(params);
- }
- }
-
- }