项目需要集成实时消息通讯,所以尝试在项目中集成websocket。技术上选择了Socket.io,前/后端统一使用此开源项目来实现需求。
一、版本
spring cloud: 2022.0.4
注册中心: nacos
Netty-Socket.io : 2.0.9
- <dependency>
- <groupId>com.corundumstudio.socketiogroupId>
- <artifactId>netty-socketioartifactId>
- <version>${netty-socketio.version}version>
- dependency>
前端:vue3、socket.io-client【 4.7.4】
二、关键代码
socket event handler
- @Component
- @Slf4j
- public class NettySocketEventHandler {
-
- @Autowired
- private SocketIOServer socketIoServer;
-
- @Autowired
- private SocketClientService socketClientService;
-
- @Value("${socketio.application.name}")
- private String serverName;
-
- @Value("${socketio.reg-server}")
- private String host;
-
- @Autowired
- private NacosDiscoveryProperties nacosDiscoveryProperties;
-
-
- private void start() throws Exception {
- //注册到Nacos里
- registerNamingService(serverName, String.valueOf(socketIoServer.getConfiguration().getPort()));
- }
-
- /**
- * 注册到 nacos 服务中
- *
- * @param nettyName netty服务名称
- * @param nettyPort netty服务端口
- */
- private void registerNamingService(String nettyName, String nettyPort) {
- try {
- log.info("-------------- register socket server {} {}", nettyName, nettyPort);
- NamingService namingService = NamingFactory.createNamingService(nacosDiscoveryProperties.getServerAddr());
- // 注册到nacos
- Instance instance = new Instance();
- instance.setIp(host);
- instance.setPort(socketIoServer.getConfiguration().getPort());
- instance.setServiceName(nettyName);
- instance.setWeight(1.0);
- Map
map = new HashMap<>(); - map.put("preserved.register.source", "SPRING_CLOUD");
- instance.setMetadata(map);
- namingService.registerInstance(nettyName, instance);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @PostConstruct
- private void autoStartup() {
- try {
- socketIoServer.start();
- start();
- log.info("-------------- start socket server ----------");
- } catch (Exception ex) {
- log.error("SocketIOServer启动失败", ex);
- }
- }
-
- @PreDestroy
- private void autoStop() {
- socketIoServer.stop();
- }
-
- //socket事件消息接收入口
- @OnEvent(value = MessageConstant.SOCKET_EVENT_NAME) //value值与前端自行商定
- public void onEvent(SocketIOClient client, AckRequest ackRequest, SendMessageDTO data) {
- // client.sendEvent("message_event", "已成功接收数据"); //向前端发送接收数据成功标识
- log.info("socket event {}", JSON.toJSONString(data));
- }
-
- //socket添加@OnDisconnect事件,客户端断开连接时调用,刷新客户端信息
- @OnDisconnect
- public void onDisconnect(SocketIOClient client) {
-
- String userId = client.getHandshakeData().getSingleUrlParam("userId");
- UUID sessionId = client.getSessionId();
-
- log.info("socket Disconnect {} {}", userId, sessionId);
- socketClientService.deleteSessionClientByUserId(userId, sessionId);
-
- log.info("socket Disconnect {} {}", userId, sessionId);
- client.disconnect();
- }
-
- //socket添加connect事件,当客户端发起连接时调用
- @OnConnect
- public void onConnect(SocketIOClient client) {
- // log.info("socket onConnect {}", JSON.toJSONString(client));
- if (client != null) {
- HandshakeData client_mac = client.getHandshakeData();
- String userId = client_mac.getSingleUrlParam("userId");
- // 处理业务
- } else {
- log.error("客户端为空");
- }
- }
-
- }
socket client service
- @Component
- public class SocketClientService {
- private static ConcurrentHashMap
> concurrentHashMap = new ConcurrentHashMap<>(); -
-
- /**
- * 保存客户端实例,发送消息时使用
- *
- * @param userId 用户ID
- * @param sessionId 用户连接的session,可能存在多个页面连接
- * @param socketIOClient 客户的实例
- */
- public void saveClient(String userId, UUID sessionId, SocketIOClient socketIOClient) {
- HashMap
sessionIdClientCache = concurrentHashMap.get(userId); - if (sessionIdClientCache == null) {
- sessionIdClientCache = new HashMap<>();
- }
- sessionIdClientCache.put(sessionId, socketIOClient);
- concurrentHashMap.put(userId, sessionIdClientCache);
- }
-
- /**
- * 获取用户的客户端实例
- *
- * @param userId 用户的ID
- * @return HashMap
- */
- public HashMap
getUserClient(String userId) { - return concurrentHashMap.get(userId);
- }
-
- /**
- * 获取所有客户端,不区分用户
- *
- * @return 集合
- */
- public Collection
> getAllClient() { - return concurrentHashMap.values();
- }
-
- /**
- * 删除用户的某个页面的连接
- *
- * @param userId 用户ID
- * @param sessionId 页面的sessionID
- */
- public void deleteSessionClientByUserId(String userId, UUID sessionId) {
- if(concurrentHashMap.get(userId) != null){
- concurrentHashMap.get(userId).remove(sessionId);
- }
- }
-
- /**
- * 删除用户的所有连接的实例
- *
- * @param userId 用户的ID
- */
- public void deleteUserCacheByUserId(String userId) {
- concurrentHashMap.remove(userId);
- }
- }
socket config
- @Data
- @Configuration
- @ConfigurationProperties(prefix = "socketio")
- public class SocketIOConfig {
-
- private String host;
- private Integer port;
- private int bossCount;
- private int workCount;
- private boolean allowCustomRequests;
- private int upgradeTimeout;
- private int pingTimeout;
- private int pingInterval;
-
- @Bean
- public SocketIOServer socketIOServer() {
- SocketConfig socketConfig = new SocketConfig();
- socketConfig.setTcpNoDelay(true);
- socketConfig.setSoLinger(0);
- com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
- config.setSocketConfig(socketConfig);
- config.setHostname(host);
- config.setPort(port);
- config.setBossThreads(bossCount);
- config.setWorkerThreads(workCount);
- config.setAllowCustomRequests(allowCustomRequests);
- config.setUpgradeTimeout(upgradeTimeout);
- config.setPingTimeout(pingTimeout);
- config.setPingInterval(pingInterval);
- return new SocketIOServer(config);
- }
-
- @Bean
- public SpringAnnotationScanner springAnnotationScanner() {
- return new SpringAnnotationScanner(socketIOServer());
- }
- }
nacos 里的网关的配置【关键:StripPrefix 需要是0,否则长连接,并不能连接上】
- # socket
- - id: socket-service
- uri: lb://socket-service
- predicates:
- - Path=/socket.io/**
- filters:
- - StripPrefix=0
socket.io的配置
- socketio:
- application:
- name: socket-service
- reg-server: 127.0.0.1
- host: 127.0.0.1
- port: 16001
- # 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
- maxFramePayloadLength: 1048576
- # 设置http交互最大内容长度
- maxHttpContentLength: 1048576
- # socket连接数大小(如只监听一个端口boss线程组为1即可)
- bossCount: 1
- workCount: 100
- allowCustomRequests: true
- # 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
- upgradeTimeout: 100000
- # Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
- pingTimeout: 6000000
- # Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
- pingInterval: 25000
三、问题
1、socket.io与其它微服务在同一个web容器里,这时候是2个端口。所以socket.io另注册了一个服务名。
2、解决分布式的问题。我是采用了后端增加消息中间件来分发。
有问题可以私信我。