• Spring Cloud + Nacos 集成Netty Socket.IO


            项目需要集成实时消息通讯,所以尝试在项目中集成websocket。技术上选择了Socket.io,前/后端统一使用此开源项目来实现需求。

    一、版本

    spring cloud:  2022.0.4

    注册中心: nacos 

    Netty-Socket.io : 2.0.9

    1. <dependency>
    2. <groupId>com.corundumstudio.socketiogroupId>
    3. <artifactId>netty-socketioartifactId>
    4. <version>${netty-socketio.version}version>
    5. dependency>

    前端:vue3、socket.io-client【 4.7.4】

    二、关键代码

    socket event handler

    1. @Component
    2. @Slf4j
    3. public class NettySocketEventHandler {
    4. @Autowired
    5. private SocketIOServer socketIoServer;
    6. @Autowired
    7. private SocketClientService socketClientService;
    8. @Value("${socketio.application.name}")
    9. private String serverName;
    10. @Value("${socketio.reg-server}")
    11. private String host;
    12. @Autowired
    13. private NacosDiscoveryProperties nacosDiscoveryProperties;
    14. private void start() throws Exception {
    15. //注册到Nacos里
    16. registerNamingService(serverName, String.valueOf(socketIoServer.getConfiguration().getPort()));
    17. }
    18. /**
    19. * 注册到 nacos 服务中
    20. *
    21. * @param nettyName netty服务名称
    22. * @param nettyPort netty服务端口
    23. */
    24. private void registerNamingService(String nettyName, String nettyPort) {
    25. try {
    26. log.info("-------------- register socket server {} {}", nettyName, nettyPort);
    27. NamingService namingService = NamingFactory.createNamingService(nacosDiscoveryProperties.getServerAddr());
    28. // 注册到nacos
    29. Instance instance = new Instance();
    30. instance.setIp(host);
    31. instance.setPort(socketIoServer.getConfiguration().getPort());
    32. instance.setServiceName(nettyName);
    33. instance.setWeight(1.0);
    34. Map map = new HashMap<>();
    35. map.put("preserved.register.source", "SPRING_CLOUD");
    36. instance.setMetadata(map);
    37. namingService.registerInstance(nettyName, instance);
    38. } catch (Exception e) {
    39. throw new RuntimeException(e);
    40. }
    41. }
    42. @PostConstruct
    43. private void autoStartup() {
    44. try {
    45. socketIoServer.start();
    46. start();
    47. log.info("-------------- start socket server ----------");
    48. } catch (Exception ex) {
    49. log.error("SocketIOServer启动失败", ex);
    50. }
    51. }
    52. @PreDestroy
    53. private void autoStop() {
    54. socketIoServer.stop();
    55. }
    56. //socket事件消息接收入口
    57. @OnEvent(value = MessageConstant.SOCKET_EVENT_NAME) //value值与前端自行商定
    58. public void onEvent(SocketIOClient client, AckRequest ackRequest, SendMessageDTO data) {
    59. // client.sendEvent("message_event", "已成功接收数据"); //向前端发送接收数据成功标识
    60. log.info("socket event {}", JSON.toJSONString(data));
    61. }
    62. //socket添加@OnDisconnect事件,客户端断开连接时调用,刷新客户端信息
    63. @OnDisconnect
    64. public void onDisconnect(SocketIOClient client) {
    65. String userId = client.getHandshakeData().getSingleUrlParam("userId");
    66. UUID sessionId = client.getSessionId();
    67. log.info("socket Disconnect {} {}", userId, sessionId);
    68. socketClientService.deleteSessionClientByUserId(userId, sessionId);
    69. log.info("socket Disconnect {} {}", userId, sessionId);
    70. client.disconnect();
    71. }
    72. //socket添加connect事件,当客户端发起连接时调用
    73. @OnConnect
    74. public void onConnect(SocketIOClient client) {
    75. // log.info("socket onConnect {}", JSON.toJSONString(client));
    76. if (client != null) {
    77. HandshakeData client_mac = client.getHandshakeData();
    78. String userId = client_mac.getSingleUrlParam("userId");
    79. // 处理业务
    80. } else {
    81. log.error("客户端为空");
    82. }
    83. }
    84. }

    socket client service

    1. @Component
    2. public class SocketClientService {
    3. private static ConcurrentHashMap> concurrentHashMap = new ConcurrentHashMap<>();
    4. /**
    5. * 保存客户端实例,发送消息时使用
    6. *
    7. * @param userId 用户ID
    8. * @param sessionId 用户连接的session,可能存在多个页面连接
    9. * @param socketIOClient 客户的实例
    10. */
    11. public void saveClient(String userId, UUID sessionId, SocketIOClient socketIOClient) {
    12. HashMap sessionIdClientCache = concurrentHashMap.get(userId);
    13. if (sessionIdClientCache == null) {
    14. sessionIdClientCache = new HashMap<>();
    15. }
    16. sessionIdClientCache.put(sessionId, socketIOClient);
    17. concurrentHashMap.put(userId, sessionIdClientCache);
    18. }
    19. /**
    20. * 获取用户的客户端实例
    21. *
    22. * @param userId 用户的ID
    23. * @return HashMap
    24. */
    25. public HashMap getUserClient(String userId) {
    26. return concurrentHashMap.get(userId);
    27. }
    28. /**
    29. * 获取所有客户端,不区分用户
    30. *
    31. * @return 集合
    32. */
    33. public Collection> getAllClient() {
    34. return concurrentHashMap.values();
    35. }
    36. /**
    37. * 删除用户的某个页面的连接
    38. *
    39. * @param userId 用户ID
    40. * @param sessionId 页面的sessionID
    41. */
    42. public void deleteSessionClientByUserId(String userId, UUID sessionId) {
    43. if(concurrentHashMap.get(userId) != null){
    44. concurrentHashMap.get(userId).remove(sessionId);
    45. }
    46. }
    47. /**
    48. * 删除用户的所有连接的实例
    49. *
    50. * @param userId 用户的ID
    51. */
    52. public void deleteUserCacheByUserId(String userId) {
    53. concurrentHashMap.remove(userId);
    54. }
    55. }

    socket config

    1. @Data
    2. @Configuration
    3. @ConfigurationProperties(prefix = "socketio")
    4. public class SocketIOConfig {
    5. private String host;
    6. private Integer port;
    7. private int bossCount;
    8. private int workCount;
    9. private boolean allowCustomRequests;
    10. private int upgradeTimeout;
    11. private int pingTimeout;
    12. private int pingInterval;
    13. @Bean
    14. public SocketIOServer socketIOServer() {
    15. SocketConfig socketConfig = new SocketConfig();
    16. socketConfig.setTcpNoDelay(true);
    17. socketConfig.setSoLinger(0);
    18. com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
    19. config.setSocketConfig(socketConfig);
    20. config.setHostname(host);
    21. config.setPort(port);
    22. config.setBossThreads(bossCount);
    23. config.setWorkerThreads(workCount);
    24. config.setAllowCustomRequests(allowCustomRequests);
    25. config.setUpgradeTimeout(upgradeTimeout);
    26. config.setPingTimeout(pingTimeout);
    27. config.setPingInterval(pingInterval);
    28. return new SocketIOServer(config);
    29. }
    30. @Bean
    31. public SpringAnnotationScanner springAnnotationScanner() {
    32. return new SpringAnnotationScanner(socketIOServer());
    33. }
    34. }

    nacos 里的网关的配置【关键:StripPrefix 需要是0,否则长连接,并不能连接上】

    1. # socket
    2. - id: socket-service
    3. uri: lb://socket-service
    4. predicates:
    5. - Path=/socket.io/**
    6. filters:
    7. - StripPrefix=0

    socket.io的配置

    1. socketio:
    2. application:
    3. name: socket-service
    4. reg-server: 127.0.0.1
    5. host: 127.0.0.1
    6. port: 16001
    7. # 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
    8. maxFramePayloadLength: 1048576
    9. # 设置http交互最大内容长度
    10. maxHttpContentLength: 1048576
    11. # socket连接数大小(如只监听一个端口boss线程组为1即可)
    12. bossCount: 1
    13. workCount: 100
    14. allowCustomRequests: true
    15. # 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
    16. upgradeTimeout: 100000
    17. # Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
    18. pingTimeout: 6000000
    19. # Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
    20. pingInterval: 25000

    三、问题

         1、socket.io与其它微服务在同一个web容器里,这时候是2个端口。所以socket.io另注册了一个服务名。

         2、解决分布式的问题。我是采用了后端增加消息中间件来分发。

    有问题可以私信我。

  • 相关阅读:
    Pr:更改剪辑的速度和持续时间
    useRef 定义的 ref 在控制台可以打印但是页面不生效?
    考虑阶梯式碳交易机制与电制氢的综合能源系统热电优化附Matlab代码
    JDK、JRE 和 JVM 的区别和联系
    上周热点回顾(11.28-12.4)
    保姆级教程 --redis启动命令
    element-ui在vue中如何实现校验两个复选框至少选择一个!
    IO流【】【】【】
    四目大视场四目夜视镜 --TFN TD401 大视场头盔四目夜视仪 夜视镜 无需转头微光夜视系统 四目四管
    【C++】STL常用容器:string类(详解及模拟实现)
  • 原文地址:https://blog.csdn.net/saperliu/article/details/136619465