• Java中使用Map存储在线用户的集合(登录新增、退出移除)-SpringBoot中集成websocket示例


    场景

    在做服务端与客户端进行通讯时,比如websocket通讯与IM通讯等,需要自定义用户连接/登录时存储用户

    会话信息,断开连接/退出时移除用户会话信息,获取所以当前用户,群发消息等功能。

    所以需要来对用户/客户端信息进行存储。

    若依前后端分离版本地搭建开发环境并运行项目的教程:

    若依前后端分离版手把手教你本地搭建环境并运行项目_霸道流氓气质的博客-CSDN博客_前后端分离项目本地运行

    在上面搭建起来前后端分离的架构基础上,比如集成websocket做一个聊天室功能,需要获取并存取所有的

    连接用户的数据,就需要在连接打开时存储用户,连接关闭时移除用户。

    注:

    博客:
    霸道流氓气质的博客_CSDN博客-C#,架构之路,SpringBoot领域博主
    关注公众号
    霸道的程序猿
    获取编程相关电子书、教程推送与免费下载。

    实现

    1、若依中集成websocket实现获取所有用户

    可参考官方文档-插件集成

    插件集成 | RuoYi

    添加websocket依赖

    1. <!-- SpringBoot Websocket -->
    2. <dependency> 
    3.    <groupId>org.springframework.boot</groupId> 
    4.    <artifactId>spring-boot-starter-websocket</artifactId> 
    5. </dependency>

    其他参考官方说明,这里可以借助其示例代码中存储所有用户信息的实现思路

    新建类WebSocketUsers用来存储所有客户端用户集合

    1. package com.chrisf.websocket;
    2. import java.io.IOException;
    3. import java.util.Collection;
    4. import java.util.Map;
    5. import java.util.Set;
    6. import java.util.concurrent.ConcurrentHashMap;
    7. import javax.websocket.Session;
    8. import org.slf4j.Logger;
    9. import org.slf4j.LoggerFactory;
    10. /**
    11.  * websocket 客户端用户集
    12.  *
    13.  * @author ruoyi
    14.  */
    15. public class WebSocketUsers
    16. {
    17.     /**
    18.      * WebSocketUsers 日志控制器
    19.      */
    20.     private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketUsers.class);
    21.     /**
    22.      * 用户集
    23.      */
    24.     private static Map<String, Session> USERS = new ConcurrentHashMap<String, Session>();
    25.     /**
    26.      * 存储用户
    27.      *
    28.      * @param key 唯一键
    29.      * @param session 用户信息
    30.      */
    31.     public static void put(String key, Session session)
    32.     {
    33.         USERS.put(key, session);
    34.     }
    35.     /**
    36.      * 移除用户
    37.      *
    38.      * @param session 用户信息
    39.      *
    40.      * @return 移除结果
    41.      */
    42.     public static boolean remove(Session session)
    43.     {
    44.         String key = null;
    45.         boolean flag = USERS.containsValue(session);
    46.         if (flag)
    47.         {
    48.             Set<Map.Entry<String, Session>> entries = USERS.entrySet();
    49.             for (Map.Entry<String, Session> entry : entries)
    50.             {
    51.                 Session value = entry.getValue();
    52.                 if (value.equals(session))
    53.                 {
    54.                     key = entry.getKey();
    55.                     break;
    56.                 }
    57.             }
    58.         }
    59.         else
    60.         {
    61.             return true;
    62.         }
    63.         return remove(key);
    64.     }
    65.     /**
    66.      * 移出用户
    67.      *
    68.      * @param key
    69.      */
    70.     public static boolean remove(String key)
    71.     {
    72.         LOGGER.info("\n 正在移出用户 - {}", key);
    73.         Session remove = USERS.remove(key);
    74.         if (remove != null)
    75.         {
    76.             boolean containsValue = USERS.containsValue(remove);
    77.             LOGGER.info("\n 移出结果 - {}", containsValue ? "失败" : "成功");
    78.             return containsValue;
    79.         }
    80.         else
    81.         {
    82.             return true;
    83.         }
    84.     }
    85.     /**
    86.      * 获取在线用户列表
    87.      *
    88.      * @return 返回用户集合
    89.      */
    90.     public static Map<String, Session> getUsers()
    91.     {
    92.         return USERS;
    93.     }
    94.     /**
    95.      * 群发消息文本消息
    96.      *
    97.      * @param message 消息内容
    98.      */
    99.     public static void sendMessageToUsersByText(String message)
    100.     {
    101.         Collection<Session> values = USERS.values();
    102.         for (Session value : values)
    103.         {
    104.             sendMessageToUserByText(value, message);
    105.         }
    106.     }
    107.     /**
    108.      * 发送文本消息
    109.      *
    110.      * @param userName 自己的用户名
    111.      * @param message 消息内容
    112.      */
    113.     public static void sendMessageToUserByText(Session session, String message)
    114.     {
    115.         if (session != null)
    116.         {
    117.             try
    118.             {
    119.                 session.getBasicRemote().sendText(message);
    120.             }
    121.             catch (IOException e)
    122.             {
    123.                 LOGGER.error("\n[发送消息异常]", e);
    124.             }
    125.         }
    126.         else
    127.         {
    128.             LOGGER.info("\n[你已离线]");
    129.         }
    130.     }
    131. }

    可以看到这里用的

    private static Map<String, Session> USERS = new ConcurrentHashMap<String, Session>();

    来存取所有的用户会话数据。

    然后在连接建立成功的回调中添加用户

    WebSocketUsers.put(session.getId(), session);

    在连接关闭和抛出异常时删除用户

    WebSocketUsers.remove(sessionId);

    完整的WebsocketServer代码

    1. package com.chrisf.websocket;
    2. import java.util.concurrent.Semaphore;
    3. import javax.websocket.OnClose;
    4. import javax.websocket.OnError;
    5. import javax.websocket.OnMessage;
    6. import javax.websocket.OnOpen;
    7. import javax.websocket.Session;
    8. import javax.websocket.server.ServerEndpoint;
    9. import org.slf4j.Logger;
    10. import org.slf4j.LoggerFactory;
    11. import org.springframework.stereotype.Component;
    12. /**
    13.  * websocket 消息处理
    14.  *
    15.  * @author ruoyi
    16.  */
    17. @Component
    18. @ServerEndpoint("/chatroom/websocket/message")
    19. public class WebSocketServer
    20. {
    21.     /**
    22.      * WebSocketServer 日志控制器
    23.      */
    24.     private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);
    25.     /**
    26.      * 默认最多允许同时在线人数100
    27.      */
    28.     public static int socketMaxOnlineCount = 100;
    29.     private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);
    30.     /**
    31.      * 连接建立成功调用的方法
    32.      */
    33.     @OnOpen
    34.     public void onOpen(Session session) throws Exception
    35.     {
    36.         boolean semaphoreFlag = false;
    37.         // 尝试获取信号量
    38.         semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore);
    39.         if (!semaphoreFlag)
    40.         {
    41.             // 未获取到信号量
    42.             LOGGER.error("\n 当前在线人数超过限制数- {}", socketMaxOnlineCount);
    43.             WebSocketUsers.sendMessageToUserByText(session, "当前在线人数超过限制数:" + socketMaxOnlineCount);
    44.             session.close();
    45.         }
    46.         else
    47.         {
    48.             // 添加用户
    49.             WebSocketUsers.put(session.getId(), session);
    50.             LOGGER.info("\n 建立连接 - {}", session);
    51.             LOGGER.info("\n 当前人数 - {}", WebSocketUsers.getUsers().size());
    52.             WebSocketUsers.sendMessageToUserByText(session, "连接成功");
    53.         }
    54.     }
    55.     /**
    56.      * 连接关闭时处理
    57.      */
    58.     @OnClose
    59.     public void onClose(Session session)
    60.     {
    61.         LOGGER.info("\n 关闭连接 - {}", session);
    62.         // 移除用户
    63.         WebSocketUsers.remove(session.getId());
    64.         // 获取到信号量则需释放
    65.         SemaphoreUtils.release(socketSemaphore);
    66.     }
    67.     /**
    68.      * 抛出异常时处理
    69.      */
    70.     @OnError
    71.     public void onError(Session session, Throwable exception) throws Exception
    72.     {
    73.         if (session.isOpen())
    74.         {
    75.             // 关闭连接
    76.             session.close();
    77.         }
    78.         String sessionId = session.getId();
    79.         LOGGER.info("\n 连接异常 - {}", sessionId);
    80.         LOGGER.info("\n 异常信息 - {}", exception);
    81.         // 移出用户
    82.         WebSocketUsers.remove(sessionId);
    83.         // 获取到信号量则需释放
    84.         SemaphoreUtils.release(socketSemaphore);
    85.     }
    86.     /**
    87.      * 服务器接收到客户端消息时调用的方法
    88.      */
    89.     @OnMessage
    90.     public void onMessage(String message, Session session)
    91.     {
    92.         String msg = message.replace("你", "我").replace("吗", "");
    93.         WebSocketUsers.sendMessageToUserByText(session, msg);
    94.     }
    95. }

    其他代码

    WebsocketConfig

    1. package com.chrisf.websocket;
    2. import org.springframework.context.annotation.Bean;
    3. import org.springframework.context.annotation.Configuration;
    4. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
    5. /**
    6.  * websocket 配置
    7.  *
    8.  * @author ruoyi
    9.  */
    10. @Configuration
    11. public class WebSocketConfig
    12. {
    13.     @Bean
    14.     public ServerEndpointExporter serverEndpointExporter()
    15.     {
    16.         return new ServerEndpointExporter();
    17.     }
    18. }

    SemaphoreUtils

    1. package com.chrisf.websocket;
    2. import java.util.concurrent.Semaphore;
    3. import org.slf4j.Logger;
    4. import org.slf4j.LoggerFactory;
    5. /**
    6.  * 信号量相关处理
    7.  *
    8.  * @author ruoyi
    9.  */
    10. public class SemaphoreUtils
    11. {
    12.     /**
    13.      * SemaphoreUtils 日志控制器
    14.      */
    15.     private static final Logger LOGGER = LoggerFactory.getLogger(SemaphoreUtils.class);
    16.     /**
    17.      * 获取信号量
    18.      *
    19.      * @param semaphore
    20.      * @return
    21.      */
    22.     public static boolean tryAcquire(Semaphore semaphore)
    23.     {
    24.         boolean flag = false;
    25.         try
    26.         {
    27.             flag = semaphore.tryAcquire();
    28.         }
    29.         catch (Exception e)
    30.         {
    31.             LOGGER.error("获取信号量异常", e);
    32.         }
    33.         return flag;
    34.     }
    35.     /**
    36.      * 释放信号量
    37.      *
    38.      * @param semaphore
    39.      */
    40.     public static void release(Semaphore semaphore)
    41.     {
    42.         try
    43.         {
    44.             semaphore.release();
    45.         }
    46.         catch (Exception e)
    47.         {
    48.             LOGGER.error("释放信号量异常", e);
    49.         }
    50.     }
    51. }

    2、这里的群发消息直接采用的循环所有的用户发送消息,如果用户量较大并且群发消息的频率较高

    可以修改群发消息的逻辑集成自定义线程池。

    SpringBoot集成websocket通过自定义线程池实现高频率、多用户下的群发消息。

    修改上面的WebSocketUsers类

    1. package com.ruoyi.websocket.websocketConfig;
    2. import cn.hutool.core.thread.ExecutorBuilder;
    3. import cn.hutool.core.thread.ThreadFactoryBuilder;
    4. import org.slf4j.Logger;
    5. import org.slf4j.LoggerFactory;
    6. import javax.websocket.Session;
    7. import java.io.IOException;
    8. import java.util.Collection;
    9. import java.util.Map;
    10. import java.util.Set;
    11. import java.util.concurrent.ConcurrentHashMap;
    12. import java.util.concurrent.ExecutorService;
    13. import java.util.concurrent.LinkedBlockingQueue;
    14. /**
    15.  * websocket 客户端用户集
    16.  *
    17.  * @author ruoyi
    18.  */
    19. public class WebSocketUsers {
    20.     /**
    21.      * WebSocketUsers 日志控制器
    22.      */
    23.     private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketUsers.class);
    24.     private static ExecutorService pool = ExecutorBuilder.create()
    25.             .setCorePoolSize(WebSocketServer.socketMaxOnlineCount)
    26.             .setMaxPoolSize(WebSocketServer.socketMaxOnlineCount * 2)
    27.             .setWorkQueue(new LinkedBlockingQueue<>(WebSocketServer.socketMaxOnlineCount * 3))
    28.             .setThreadFactory(ThreadFactoryBuilder.create().setNamePrefix("Websocket-Pool-").build())
    29.             .build();
    30.     /**
    31.      * 用户集
    32.      */
    33.     private static Map<String, Session> USERS = new ConcurrentHashMap<String, Session>();
    34.     /**
    35.      * 存储用户
    36.      *
    37.      * @param key     唯一键
    38.      * @param session 用户信息
    39.      */
    40.     public static void put(String key, Session session) {
    41.         USERS.put(key, session);
    42.     }
    43.     /**
    44.      * 移除用户
    45.      *
    46.      * @param session 用户信息
    47.      * @return 移除结果
    48.      */
    49.     public static boolean remove(Session session) {
    50.         String key = null;
    51.         boolean flag = USERS.containsValue(session);
    52.         if (flag) {
    53.             Set<Map.Entry<String, Session>> entries = USERS.entrySet();
    54.             for (Map.Entry<String, Session> entry : entries) {
    55.                 Session value = entry.getValue();
    56.                 if (value.equals(session)) {
    57.                     key = entry.getKey();
    58.                     break;
    59.                 }
    60.             }
    61.         } else {
    62.             return true;
    63.         }
    64.         return remove(key);
    65.     }
    66.     /**
    67.      * 移出用户
    68.      *
    69.      * @param key
    70.      */
    71.     public static boolean remove(String key) {
    72.         LOGGER.info("\n 正在移出用户 - {}", key);
    73.         Session remove = USERS.remove(key);
    74.         if (remove != null) {
    75.             boolean containsValue = USERS.containsValue(remove);
    76.             LOGGER.info("\n 移出结果 - {}", containsValue ? "失败" : "成功");
    77.             return containsValue;
    78.         } else {
    79.             return true;
    80.         }
    81.     }
    82.     /**
    83.      * 获取在线用户列表
    84.      *
    85.      * @return 返回用户集合
    86.      */
    87.     public static Map<String, Session> getUsers() {
    88.         return USERS;
    89.     }
    90.     /**
    91.      * 群发消息文本消息
    92.      *
    93.      * @param message 消息内容
    94.      */
    95.     public static void sendMessageToUsersByText(String message) {
    96.         Collection<Session> values = USERS.values();
    97.         for (Session value : values) {
    98.             pool.submit(new Runnable() {
    99.                 @Override
    100.                 public void run() {
    101.                     sendMessageToUserByText(value, message);
    102.                 }
    103.             });
    104.         }
    105.     }
    106.     /**
    107.      * 发送文本消息
    108.      *
    109.      * @param userName 自己的用户名
    110.      * @param message  消息内容
    111.      */
    112.     public static void sendMessageToUserByText(Session session, String message) {
    113.         if (session != null) {
    114.             try {
    115.                 synchronized (session) {
    116.                     session.getBasicRemote().sendText(message);
    117.                 }
    118.             } catch (IOException e) {
    119.                 e.printStackTrace();
    120.             }
    121.         } else {
    122.             LOGGER.info("\n[你已离线]");
    123.         }
    124.     }
    125. }

    这里需要用到Hutool的ExecutorBuilder,所以需要引入项目依赖

    1.         <dependency>
    2.             <groupId>cn.hutool</groupId>
    3.             <artifactId>hutool-all</artifactId>
    4.             <version>5.8.3</version>
    5.         </dependency>

    这块具体的说明可以参考

    Java中使用Hutool的ExecutorBuilder实现自定义线程池:

    Java中使用Hutool的ExecutorBuilder实现自定义线程池_霸道流氓气质的博客-CSDN博客

    3、以上是集成websocket协议并存储所有的用户,其他类似的比如集成IM的也可以修改后复用

    若依(基于SpringBoot的权限管理系统)集成MobileIMSDK实现IM服务端的搭建:

    若依(基于SpringBoot的权限管理系统)集成MobileIMSDK实现IM服务端的搭建_霸道流氓气质的博客-CSDN博客_mobileimsdk

    比如在上面的基础上实现在IM中存储所有用户的信息并实现群发功能。

    新建IMUsers存储所有的im用户

    1. package com.chrisf.imextend;
    2. import cn.hutool.core.thread.ExecutorBuilder;
    3. import cn.hutool.core.thread.ThreadFactoryBuilder;
    4. import com.chrisf.sdk.protocal.Protocal;
    5. import com.chrisf.sdk.utils.LocalSendHelper;
    6. import io.netty.channel.Channel;
    7. import java.util.Collection;
    8. import java.util.Map;
    9. import java.util.Set;
    10. import java.util.concurrent.ConcurrentHashMap;
    11. import java.util.concurrent.ExecutorService;
    12. import java.util.concurrent.LinkedBlockingQueue;
    13. /**
    14.  * im 客户端用户集
    15.  *
    16.  * @author badao
    17.  */
    18. public class ImUsers {
    19.     private static ExecutorService pool = ExecutorBuilder.create()
    20.             .setCorePoolSize(20)//初始20个线程
    21.             .setMaxPoolSize(40)//最大40个线程
    22.             .setWorkQueue(new LinkedBlockingQueue<>(60))//有界等待队列,最大等待数是60
    23.             .setThreadFactory(ThreadFactoryBuilder.create().setNamePrefix("IM-Pool-").build())//设置线程前缀
    24.             .build();
    25.     /**
    26.      * 用户集
    27.      */
    28.     private static Map<String, Channel> USERS = new ConcurrentHashMap<String, Channel>();
    29.     /**
    30.      * 存储用户
    31.      *
    32.      * @param key     唯一键
    33.      * @param session 用户信息
    34.      */
    35.     public static void put(String key, Channel session) {
    36.         USERS.put(key, session);
    37.     }
    38.     /**
    39.      * 移除用户
    40.      *
    41.      * @param session 用户信息
    42.      * @return 移除结果
    43.      */
    44.     public static boolean remove(Channel session) {
    45.         String key = null;
    46.         boolean flag = USERS.containsValue(session);
    47.         if (flag) {
    48.             Set<Map.Entry<String, Channel>> entries = USERS.entrySet();
    49.             for (Map.Entry<String, Channel> entry : entries) {
    50.                 Channel value = entry.getValue();
    51.                 if (value.equals(session)) {
    52.                     key = entry.getKey();
    53.                     break;
    54.                 }
    55.             }
    56.         } else {
    57.             return true;
    58.         }
    59.         return remove(key);
    60.     }
    61.     /**
    62.      * 移出用户
    63.      *
    64.      * @param key
    65.      */
    66.     public static boolean remove(String key) {
    67.         Channel remove = USERS.remove(key);
    68.         if (remove != null) {
    69.             boolean containsValue = USERS.containsValue(remove);
    70.             return containsValue;
    71.         } else {
    72.             return true;
    73.         }
    74.     }
    75.     /**
    76.      * 获取在线用户列表
    77.      *
    78.      * @return 返回用户集合
    79.      */
    80.     public static Map<String, Channel> getUsers() {
    81.         return USERS;
    82.     }
    83.     /**
    84.      * 群发消息文本消息
    85.      *
    86.      * @param protocal 消息内容
    87.      */
    88.     public static void sendMessageToUsersByText(Protocal protocal) {
    89.         Collection<Channel> values = USERS.values();
    90.         for (Channel value : values) {
    91.             pool.submit(() -> sendMessageToUserByText(value, protocal));
    92.         }
    93.     }
    94.     /**
    95.      * 发送消息
    96.      *
    97.      * @param session
    98.      * @param protocal
    99.      */
    100.     public static void sendMessageToUserByText(Channel session, Protocal protocal) {
    101.         if (session != null) {
    102.             synchronized (session) {
    103.                 try {
    104.                     LocalSendHelper.sendData(session,protocal,null);
    105.                 } catch (Exception e) {
    106.                     e.printStackTrace();
    107.                 }
    108.             }
    109.         } else {
    110.         }
    111.     }
    112. }

    在登录回调中新增用户

    1.  @Override
    2.  public void onUserLoginSucess(String userId, String extra, Channel session)
    3.  {
    4.   //添加用户
    5.   ImUsers.put(userId,session);
    6.   logger.debug("【IM_回调通知OnUserLoginAction_CallBack】用户:"+userId+" 上线了!");
    7.  }

    在登出回调中移除用户

    1.  @Override
    2.  public void onUserLogout(String userId, Object obj, Channel session)
    3.  {
    4.   //移除用户
    5.   ImUsers.remove(userId);
    6.   logger.debug("【DEBUG_回调通知OnUserLogoutAction_CallBack】用户:"+userId+" 离线了!");
    7.  }

    在需要群发时群发消息

    1.   if(ImTypeUEnum.CHAT_ROOM.getCode() == typeu)
    2.   {
    3.    ImUsers.sendMessageToUsersByText(p);
    4.   }

  • 相关阅读:
    【C语言】栈和队列的相互实现
    c++ - 第10节 - list类
    java php nodejs python旅游网站设计与开发需求分析Springboot SpringcloudVue汇总一览
    当npm下载库失败时可以用cnpm替代
    stata做莫兰指数,结果不显示是怎么回事,求解答
    百日筑基第五天-关于maven
    利用css var函数让你的组件样式输出规范样式API,可定制性更高;
    机器学习__02__机器学习工程实践
    【毕业设计】基于深度学习的植物识别算法 - cnn opencv python
    pnpm和yarn2 pnp的对比
  • 原文地址:https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/126992039