• SpringBoot分布式Netty集群,通过Redis发布/订阅广播


    一、前言

            之前做用springboot+websocket做双向通讯时,websocket的session存在无法序列化导致集群不能通过共享session来实现,后来采取了记录需要推送客户端ip,然后用http去请求web接口这个不友好的方式。当然需求只需要做扫码登录,这种方式影响也不会有什么影响。但集群问题一直没解决就在心里埋下了个种子。

    二、正文

            用netty搭建websocket集群服务,因为netty中的channel是在本地的需要整合rabbitmq或者redis等发布/订阅模式来实现消息发送(或者Channel共享,具体还没考究母鸡能不能实现)。这里用redis做广播(如果用redis需要考虑消息丢失和ack等情况,这里只做演示)

    三、流程图 

            根据流程图,用户端client1想给client2发送消息,如果是单机的话就很简单,直接拿到client2的Channel发送Message就行了。但如果是server集群的话,就需要广播消息,让每个集群节点都收到内容,Channel不在本地的话就忽略。

                

    四、环境搭建

    1、pom文件需要的依赖

    1. <!-- 缓存begin -->
    2. <dependency>
    3. <groupId>org.springframework.boot</groupId>
    4. <artifactId>spring-boot-starter-data-redis</artifactId>
    5. </dependency>
    6. <!-- redisson -->
    7. <dependency>
    8. <groupId>org.redisson</groupId>
    9. <artifactId>redisson</artifactId>
    10. <version>2.13.0</version>
    11. </dependency>
    12. <!-- netty -->
    13. <dependency>
    14. <groupId>io.netty</groupId>
    15. <artifactId>netty-all</artifactId>
    16. <version>4.1.29.Final</version>
    17. </dependency>
    18. <!--lombok 依赖包 -->
    19. <dependency>
    20. <groupId>org.projectlombok</groupId>
    21. <artifactId>lombok</artifactId>
    22. <optional>true</optional>
    23. </dependency>

    2、RedisConfig配置文件,注意:要与MessageListener实现类统一序列化

    1. import org.springframework.context.annotation.Bean;
    2. import org.springframework.context.annotation.Configuration;
    3. import org.springframework.data.redis.connection.RedisConnectionFactory;
    4. import org.springframework.data.redis.core.RedisTemplate;
    5. import org.springframework.data.redis.serializer.RedisSerializer;
    6. import java.net.UnknownHostException;
    7. /**
    8. * @Description
    9. * @Author kele
    10. * @Data 2023/8/31 16:34
    11. */
    12. @Configuration
    13. public class RedisConfig {
    14. @Bean
    15. public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
    16. // 将template 泛型设置为 <String, Object>
    17. RedisTemplate<String, Object> template = new RedisTemplate();
    18. // 连接工厂,不必修改
    19. template.setConnectionFactory(redisConnectionFactory);
    20. /*
    21. * 序列化设置
    22. */
    23. // key、hash的key 采用 String序列化方式
    24. template.setKeySerializer(RedisSerializer.string());
    25. template.setHashKeySerializer(RedisSerializer.string());
    26. // value、hash的value 采用 Jackson 序列化方式
    27. template.setValueSerializer(RedisSerializer.json());
    28. template.setHashValueSerializer(RedisSerializer.json());
    29. template.afterPropertiesSet();
    30. return template;
    31. }
    32. }

    3.定义redis订阅监听类配置

    RedisMessageListenerConfiguration .java

    1. import org.springframework.beans.factory.annotation.Autowired;
    2. import org.springframework.context.annotation.Bean;
    3. import org.springframework.context.annotation.Configuration;
    4. import org.springframework.data.redis.connection.RedisConnectionFactory;
    5. import org.springframework.data.redis.listener.PatternTopic;
    6. import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    7. import java.util.Arrays;
    8. import java.util.List;
    9. /**
    10. * @Description
    11. * @Author kele
    12. * @Data 2023/8/31 16:07
    13. */
    14. @Configuration
    15. public class RedisMessageListenerConfiguration {
    16. @Autowired
    17. private LifeRedisMessageListener lifeRedisMessageListener;
    18. @Autowired
    19. private RedisConnectionFactory redisConnectionFactory;
    20. /**
    21. * 配置订阅关系
    22. */
    23. @Bean
    24. public RedisMessageListenerContainer container() {
    25. RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    26. container.setConnectionFactory(redisConnectionFactory);
    27. //订阅频道
    28. List topicList = Arrays.asList(new PatternTopic("life.*"),new PatternTopic("*.life"));
    29. container.addMessageListener(lifeRedisMessageListener, topicList);
    30. return container;
    31. }
    32. }

    LifeRedisMessageListener .java

    1. import com.alibaba.fastjson.JSONObject;
    2. import com.na.integration.socket.websocket.WebSocketHandler;
    3. import com.na.model.dto.NettyRedisConnectionDto;
    4. import com.na.common.utils.JSONUtils;
    5. import io.netty.channel.Channel;
    6. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    7. import lombok.extern.slf4j.Slf4j;
    8. import org.springframework.beans.factory.annotation.Autowired;
    9. import org.springframework.data.redis.connection.Message;
    10. import org.springframework.data.redis.connection.MessageListener;
    11. import org.springframework.data.redis.core.RedisTemplate;
    12. import org.springframework.data.redis.serializer.RedisSerializer;
    13. import org.springframework.stereotype.Component;
    14. import java.time.LocalDateTime;
    15. import java.util.Map;
    16. /**
    17. * @Description
    18. * @Author kele
    19. * @Data 2023/8/31 16:07
    20. */
    21. @Component
    22. @Slf4j
    23. public class LifeRedisMessageListener implements MessageListener {
    24. @Autowired
    25. private RedisTemplate redisTemplate;
    26. @Override
    27. public void onMessage(Message message, byte[] pattern) {
    28. //需要在加载bean的时候配置相同的redis序列化器,否则会乱码
    29. RedisSerializer keySerializer = redisTemplate.getKeySerializer();
    30. RedisSerializer valueSerializer = redisTemplate.getValueSerializer();
    31. log.info("----------Life接收到发布者消息----------");
    32. log.info("|频道:{}", keySerializer.deserialize(message.getChannel()));
    33. log.info("|当前监听器绑定的pattern:{}", new String(pattern));
    34. log.info("|消息内容:{}", valueSerializer.deserialize(message.getBody()));
    35. log.info("---------------------------------");
    36. //反序列化
    37. JSONObject jsonObject = JSONUtils.toJsonObj((String) valueSerializer.deserialize(message.getBody()));
    38. NettyRedisConnectionDto dto = new NettyRedisConnectionDto();
    39. Long id = Long.valueOf(jsonObject.get("id").toString());
    40. dto.setId(id);
    41. dto.setChannel((Channel) jsonObject.get("key"));
    42. dto.setSendMessage(jsonObject.get("sendMessage").toString());
    43. dto.setSendId(Long.valueOf(jsonObject.get("sendId").toString()));
    44. /**
    45. * 发送内容
    46. */
    47. //获取所有连接的客户端,如果是集群环境使用redis的hash数据类型存储即可
    48. Map<String, Channel> channelMap = WebSocketHandler.getChannelMap();
    49. //获取与用户主键绑定的channel,如果是集群环境使用redis的hash数据类型存储即可
    50. Map<Long, String> clientMap = WebSocketHandler.getClientMap();
    51. //解决问题六,websocket集群中一个客户端向其他客户端主动发送消息,如何实现?
    52. String v = clientMap.get(dto.getSendId());
    53. Channel channel = null;
    54. try {
    55. channel = channelMap.get(v);
    56. } catch (NullPointerException e) {
    57. log.error("消息id:" + id + "通道不在本地");
    58. return;
    59. }
    60. Channel finalChannel = channel;
    61. channel.eventLoop().execute(() -> finalChannel.writeAndFlush(new TextWebSocketFrame(
    62. Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + "来自id为"
    63. + dto.getId() + ",发送的内容message=" + dto.getSendMessage())));
    64. // redisTemplate.opsForValue().get("");
    65. }

    4、消息接收对象

    1. import lombok.Data;
    2. import lombok.experimental.Accessors;
    3. import java.io.Serializable;
    4. @Data
    5. @Accessors(chain = true)
    6. public class MessageRequest implements Serializable {
    7. private Long unionId;
    8. private Integer current = 1;
    9. private Integer size = 10;
    10. }

    5、websocket通道初始化器

    1. import io.netty.channel.ChannelInitializer;
    2. import io.netty.channel.ChannelPipeline;
    3. import io.netty.channel.socket.SocketChannel;
    4. import io.netty.handler.codec.http.HttpObjectAggregator;
    5. import io.netty.handler.codec.http.HttpServerCodec;
    6. import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    7. import io.netty.handler.stream.ChunkedWriteHandler;
    8. import org.springframework.beans.factory.annotation.Autowired;
    9. import org.springframework.beans.factory.annotation.Value;
    10. import org.springframework.stereotype.Component;
    11. /**
    12. * @Description websocket通道初始化器
    13. **/
    14. @Component
    15. public class WebsocketChannelInitializer extends ChannelInitializer<SocketChannel> {
    16. @Autowired
    17. private WebSocketHandler webSocketHandler;
    18. @Value("${websocket.url}")
    19. private String websocketUrl;
    20. @Override
    21. protected void initChannel(SocketChannel socketChannel) throws Exception {
    22. //获取pipeline通道
    23. ChannelPipeline pipeline = socketChannel.pipeline();
    24. //因为基于http协议,使用http的编码和解码器
    25. pipeline.addLast(new HttpServerCodec());
    26. //是以块方式写,添加ChunkedWriteHandler处理器
    27. pipeline.addLast(new ChunkedWriteHandler());
    28. /*
    29. 说明
    30. 1\. http数据在传输过程中是分段, HttpObjectAggregator ,就是可以将多个段聚合
    31. 2\. 这就就是为什么,当浏览器发送大量数据时,就会发出多次http请求
    32. */
    33. pipeline.addLast(new HttpObjectAggregator(8192));
    34. /* 说明
    35. 1\. 对应websocket ,它的数据是以 帧(frame) 形式传递
    36. 2\. 可以看到WebSocketFrame 下面有六个子类
    37. 3\. 浏览器请求时 ws://localhost:7000/msg 表示请求的uri
    38. 4\. WebSocketServerProtocolHandler 核心功能是将 http协议升级为 ws协议 , 保持长连接
    39. 5\. 是通过一个 状态码 101
    40. */
    41. pipeline.addLast(new WebSocketServerProtocolHandler(websocketUrl));
    42. //自定义的handler ,处理业务逻辑
    43. pipeline.addLast(webSocketHandler);
    44. }
    45. }

    6、定义Handler处理器

    1. import com.alibaba.fastjson.JSON;
    2. import com.na.enums.ResultCode;
    3. import com.na.exceptions.RRException;
    4. import com.na.utils.RedisLockUtil;
    5. import io.netty.channel.Channel;
    6. import io.netty.channel.ChannelHandler;
    7. import io.netty.channel.ChannelHandlerContext;
    8. import io.netty.channel.SimpleChannelInboundHandler;
    9. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    10. import io.netty.util.concurrent.Future;
    11. import lombok.extern.slf4j.Slf4j;
    12. import org.apache.commons.lang.StringUtils;
    13. import org.springframework.beans.factory.annotation.Autowired;
    14. import org.springframework.data.redis.core.RedisTemplate;
    15. import org.springframework.stereotype.Component;
    16. import javax.annotation.Resource;
    17. import java.io.IOException;
    18. import java.time.LocalDateTime;
    19. import java.util.Map;
    20. import java.util.Optional;
    21. import java.util.concurrent.ConcurrentHashMap;
    22. import java.util.concurrent.TimeUnit;
    23. /**
    24. * @Description websocket处理器
    25. **/
    26. @Slf4j
    27. @Component
    28. @ChannelHandler.Sharable//保证处理器,在整个生命周期中就是以单例的形式存在,方便统计客户端的在线数量
    29. public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    30. @Autowired
    31. private RedisLockUtil redisLockUtil;
    32. //通道map,存储channel,用于群发消息,以及统计客户端的在线数量,解决问题问题三,如果是集群环境使用redis的hash数据类型存储即可
    33. private static Map<String, Channel> channelMap = new ConcurrentHashMap<>();
    34. //任务map,存储future,用于停止队列任务
    35. private static Map<String, Future> futureMap = new ConcurrentHashMap<>();
    36. //存储channel的id和用户主键的映射,客户端保证用户主键传入的是唯一值,解决问题四,如果是集群中需要换成redis的hash数据类型存储即可
    37. private static Map<Long, String> clientMap = new ConcurrentHashMap<>();
    38. @Resource
    39. private RedisTemplate<String, Object> redisTemplate;
    40. /**
    41. * 客户端发送给服务端的消息
    42. *
    43. * @param ctx
    44. * @param msg
    45. * @throws Exception
    46. */
    47. @Override
    48. protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
    49. try {
    50. //接受客户端发送的消息
    51. MessageRequest messageRequest = JSON.parseObject(msg.text(), MessageRequest.class);
    52. //每个channel都有id,asLongText是全局channel唯一id
    53. String key = ctx.channel().id().asLongText();
    54. //存储channel的id和用户的主键
    55. clientMap.put(messageRequest.getUnionId(), key);
    56. log.info("接受客户端的消息......" + ctx.channel().remoteAddress() + "-参数[" + messageRequest.getUnionId() + "]");
    57. if (!channelMap.containsKey(key)) {
    58. //使用channel中的任务队列,做周期循环推送客户端消息,解决问题二和问题五
    59. Future future = ctx.channel().eventLoop().scheduleAtFixedRate(new WebsocketRunnable(ctx, messageRequest), 0, 10, TimeUnit.SECONDS);
    60. //存储客户端和服务的通信的Chanel
    61. channelMap.put(key, ctx.channel());
    62. //存储每个channel中的future,保证每个channel中有一个定时任务在执行
    63. futureMap.put(key, future);
    64. } else {
    65. //每次客户端和服务的主动通信,和服务端周期向客户端推送消息互不影响 解决问题一
    66. ctx.channel().writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + "wdy"));
    67. }
    68. } catch (Exception e) {
    69. log.error("websocket服务器推送消息发生错误:", e);
    70. }
    71. }
    72. /**
    73. * 注册时执行
    74. */
    75. @Override
    76. public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    77. super.channelRegistered(ctx);
    78. log.info("--channelRegistered注册时执行--" + ctx.channel().id().toString());
    79. }
    80. /**
    81. * 客户端连接时候的操作
    82. *
    83. * @param ctx
    84. * @throws Exception
    85. */
    86. @Override
    87. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    88. log.info("一个客户端连接......" + ctx.channel().remoteAddress() + Thread.currentThread().getName());
    89. }
    90. /**
    91. * 离线时执行
    92. */
    93. @Override
    94. public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    95. super.channelUnregistered(ctx);
    96. log.info("--channelUnregistered离线时执行--" + ctx.channel().id().toString());
    97. }
    98. /**
    99. * 客户端掉线时的操作
    100. *
    101. * @param ctx
    102. * @throws Exception
    103. */
    104. @Override
    105. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    106. String key = ctx.channel().id().asLongText();
    107. //移除通信过的channel
    108. channelMap.remove(key);
    109. //移除和用户绑定的channel
    110. clientMap.remove(key);
    111. //关闭掉线客户端的future
    112. Optional.ofNullable(futureMap.get(key)).ifPresent(future -> {
    113. future.cancel(true);
    114. futureMap.remove(key);
    115. });
    116. log.info("一个客户端移除......" + ctx.channel().remoteAddress());
    117. ctx.close(); //断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机
    118. }
    119. /**
    120. * 从客户端收到新的数据、读取完成时调用
    121. */
    122. @Override
    123. public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
    124. log.info("--channelReadComplete从客户端收到新的数据--");
    125. ctx.flush();
    126. }
    127. /**
    128. * 发生异常时执行的操作
    129. * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
    130. *
    131. * @param ctx
    132. * @param cause
    133. * @throws Exception
    134. */
    135. @Override
    136. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    137. String key = ctx.channel().id().asLongText();
    138. //移除通信过的channel
    139. channelMap.remove(key);
    140. //移除和用户绑定的channel
    141. clientMap.remove(key);
    142. //移除定时任务
    143. Optional.ofNullable(futureMap.get(key)).ifPresent(future -> {
    144. future.cancel(true);
    145. futureMap.remove(key);
    146. });
    147. //关闭长连接
    148. ctx.close();
    149. log.info("异常发生 " + cause.getMessage());
    150. }
    151. public static Map<String, Channel> getChannelMap() {
    152. return channelMap;
    153. }
    154. public static Map<String, Future> getFutureMap() {
    155. return futureMap;
    156. }
    157. public static Map<Long, String> getClientMap() {
    158. return clientMap;
    159. }
    160. /**
    161. * redission 防止重复在线,多个实例的本地缓存是否存在同一个id和与幂等性,这样会导致想要接收方混乱的bug
    162. * @param key
    163. * @param v
    164. */
    165. private void addChannelMap(String key, Channel v) {
    166. try {
    167. //定义keykey的锁
    168. redisLockUtil.lock(key, key, 10000, 3, 5000);
    169. WebSocketHandler.channelMap.put(key, v);
    170. } finally {
    171. redisLockUtil.unlock(key.toString(), key.toString());
    172. }
    173. }
    174. private void addFutureMap(String key, Future v) {
    175. try {
    176. //定义keykey的锁
    177. redisLockUtil.lock(key, key, 10000, 3, 5000);
    178. WebSocketHandler.futureMap.put(key, v);
    179. } finally {
    180. redisLockUtil.unlock(key.toString(), key.toString());
    181. }
    182. }
    183. private void addClientMap(Long key, String v) {
    184. try {
    185. //定义keykey的锁
    186. redisLockUtil.lock(key.toString(), key.toString(), 10000, 3, 5000);
    187. WebSocketHandler.clientMap.put(key, v);
    188. } finally {
    189. redisLockUtil.unlock(key.toString(), key.toString());
    190. }
    191. }
    192. public static void sendMessage(String key, String message) {
    193. if (StringUtils.isEmpty(key)) {
    194. throw new RRException(ResultCode.PARAM_NOT_NULL);
    195. }
    196. Channel channel = channelMap.get(key);
    197. if (channel == null) {
    198. throw new RRException(ResultCode.ID_IS_NULL);
    199. }
    200. try {
    201. channel.writeAndFlush(message);
    202. } catch (Exception e) {
    203. throw new RRException(ResultCode.SOME_USERS_SEND_FAIL);
    204. }
    205. }
    206. }

    7、websocket初始化器

    1. import io.netty.bootstrap.ServerBootstrap;
    2. import io.netty.channel.ChannelFuture;
    3. import io.netty.channel.EventLoopGroup;
    4. import io.netty.channel.nio.NioEventLoopGroup;
    5. import io.netty.channel.socket.nio.NioServerSocketChannel;
    6. import io.netty.handler.logging.LogLevel;
    7. import io.netty.handler.logging.LoggingHandler;
    8. import lombok.extern.slf4j.Slf4j;
    9. import org.springframework.beans.factory.annotation.Value;
    10. import org.springframework.scheduling.annotation.Async;
    11. import org.springframework.stereotype.Component;
    12. import javax.annotation.Resource;
    13. /**
    14. * @Description websocket初始化器
    15. **/
    16. @Slf4j
    17. @Component
    18. public class WebsocketInitialization {
    19. @Resource
    20. private WebsocketChannelInitializer websocketChannelInitializer;
    21. @Value("${websocket.port}")
    22. private Integer port;
    23. @Async
    24. public void init() throws InterruptedException {
    25. //bossGroup连接线程组,主要负责接受客户端连接,一般一个线程足矣
    26. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    27. //workerGroup工作线程组,主要负责网络IO读写
    28. EventLoopGroup workerGroup = new NioEventLoopGroup();
    29. try {
    30. //启动辅助类
    31. ServerBootstrap serverBootstrap = new ServerBootstrap();
    32. //bootstrap绑定两个线程组
    33. serverBootstrap.group(bossGroup, workerGroup);
    34. //设置通道为NioChannel
    35. serverBootstrap.channel(NioServerSocketChannel.class);
    36. //可以对入站\出站事件进行日志记录,从而方便我们进行问题排查。
    37. serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
    38. //设置自定义的通道初始化器,用于入站操作
    39. serverBootstrap.childHandler(websocketChannelInitializer);
    40. //启动服务器,本质是Java程序发起系统调用,然后内核底层起了一个处于监听状态的服务,生成一个文件描述符FD
    41. ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
    42. //异步
    43. channelFuture.channel().closeFuture().sync();
    44. } finally {
    45. bossGroup.shutdownGracefully();
    46. workerGroup.shutdownGracefully();
    47. }
    48. }
    49. }

    8、自定义消息发送类

    1. import io.netty.channel.ChannelHandlerContext;
    2. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    3. import lombok.extern.slf4j.Slf4j;
    4. import java.time.LocalDateTime;
    5. /**
    6. * @Description 向客户端发送消息
    7. **/
    8. @Slf4j
    9. public class WebsocketRunnable implements Runnable {
    10. private ChannelHandlerContext channelHandlerContext;
    11. private MessageRequest messageRequest;
    12. public WebsocketRunnable(ChannelHandlerContext channelHandlerContext,MessageRequest messageRequest) {
    13. this.channelHandlerContext = channelHandlerContext;
    14. this.messageRequest = messageRequest;
    15. }
    16. @Override
    17. public void run() {
    18. try {
    19. log.info(Thread.currentThread().getName()+"--"+LocalDateTime.now().toString());
    20. channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame(LocalDateTime.now().toString()));
    21. } catch (Exception e) {
    22. log.error("websocket服务器推送消息发生错误:",e);
    23. }
    24. }
    25. }

    9、如果使用redission来加锁添加配置,不使用(跳过)

    RedissonConfiguration.java配置类
    1. import lombok.extern.slf4j.Slf4j;
    2. import org.redisson.Redisson;
    3. import org.redisson.api.RedissonClient;
    4. import org.redisson.config.ClusterServersConfig;
    5. import org.redisson.config.Config;
    6. import org.redisson.config.SingleServerConfig;
    7. import org.springframework.beans.factory.annotation.Autowired;
    8. import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
    9. import org.springframework.context.annotation.Bean;
    10. import org.springframework.context.annotation.Configuration;
    11. import org.springframework.util.StringUtils;
    12. import java.util.ArrayList;
    13. import java.util.List;
    14. @Slf4j
    15. @Configuration
    16. public class RedissonConfiguration {
    17. @Autowired
    18. private RedisProperties redisProperties;
    19. /**
    20. * 初始化RedissonClient客户端
    21. * 注意:
    22. * 此实例集群为3节点,各节点1主1从
    23. * 集群模式,集群节点的地址须使用“redis://”前缀,否则将会报错。
    24. *
    25. * @return {@link RedissonClient}
    26. */
    27. @Bean
    28. public RedissonClient getRedissonClient() {
    29. Config config = new Config();
    30. if (redisProperties.getCluster() != null) {
    31. //集群模式配置
    32. List<String> nodes = redisProperties.getCluster().getNodes();
    33. List<String> clusterNodes = new ArrayList<>();
    34. for (int i = 0; i < nodes.size(); i++) {
    35. clusterNodes.add("redis://" + nodes.get(i));
    36. }
    37. ClusterServersConfig clusterServersConfig = config.useClusterServers()
    38. .addNodeAddress(clusterNodes.toArray(new String[clusterNodes.size()]));
    39. if (!StringUtils.isEmpty(redisProperties.getPassword())) {
    40. clusterServersConfig.setPassword(redisProperties.getPassword());
    41. }
    42. } else {
    43. //单节点配置
    44. String address = "redis://" + redisProperties.getHost() + ":" + redisProperties.getPort();
    45. SingleServerConfig serverConfig = config.useSingleServer();
    46. serverConfig.setAddress(address);
    47. if (!StringUtils.isEmpty(redisProperties.getPassword())) {
    48. serverConfig.setPassword(redisProperties.getPassword());
    49. }
    50. serverConfig.setDatabase(redisProperties.getDatabase());
    51. }
    52. //看门狗的锁续期时间,默认30000ms,这里配置成15000ms
    53. // config.setLockWatchdogTimeout(15000);
    54. config.setLockWatchdogTimeout(15000);
    55. return Redisson.create(config);
    56. }
    57. }
    RedisLockUtil.java
    1. import org.redisson.api.RLock;
    2. import org.redisson.api.RedissonClient;
    3. import org.slf4j.Logger;
    4. import org.slf4j.LoggerFactory;
    5. import org.springframework.beans.factory.annotation.Autowired;
    6. import org.springframework.core.annotation.Order;
    7. import org.springframework.stereotype.Component;
    8. import java.util.concurrent.TimeUnit;
    9. @Component
    10. //@Order(value = 2)
    11. public class RedisLockUtil {
    12. private final Logger logger = LoggerFactory.getLogger(RedisLockUtil.class);
    13. @Autowired
    14. private RedissonClient redissonClient;
    15. /* @Autowired
    16. public RedisLockUtil(@Qualifier("customRedisson") RedissonClient redissonClient) {
    17. this.redissonClient = redissonClient;
    18. }*/
    19. /**
    20. * 源码
    21. * 1.固定有效期的锁:超过有效期leaseTime后,自动释放锁。
    22. *
    23. * public void lock(long leaseTime, TimeUnit unit) {
    24. * try {
    25. * this.lockInterruptibly(leaseTime, unit);
    26. * } catch (InterruptedException var5) {
    27. * Thread.currentThread().interrupt();
    28. * }
    29. * }
    30. * 2.没有有效期的锁:默认30秒,然后采用Watchdog进行续期,直到业务逻辑执行完毕。
    31. *
    32. * public void lock() {
    33. * try {
    34. * this.lockInterruptibly();
    35. * } catch (InterruptedException var2) {
    36. * Thread.currentThread().interrupt();
    37. * }
    38. * }
    39. * ————————————————
    40. */
    41. /**
    42. * 加锁
    43. * @param key 锁的 key
    44. * @param value value ( key + value 必须保证唯一)
    45. * @param expire key 的过期时间,单位 ms
    46. * @param retryTimes 重试次数,即加锁失败之后的重试次数
    47. * @param retryInterval 重试时间间隔,单位 ms
    48. * @return 加锁 true 成功
    49. */
    50. public RLock lock(String key, String value, long expire, int retryTimes, long retryInterval) {
    51. logger.info("locking... redisK = {}", key);
    52. RLock fairLock = redissonClient.getFairLock(key + ":" + value);
    53. try {
    54. boolean tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);//是否加锁成功
    55. if (tryLock) {
    56. logger.info("locked... redisK = {}", key);
    57. return fairLock;
    58. } else {
    59. //重试获取锁
    60. logger.info("retry to acquire lock: [redisK = {}]", key);
    61. int count = 0;
    62. while(count < retryTimes) {
    63. try {
    64. Thread.sleep(retryInterval);
    65. tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);
    66. if(tryLock) {
    67. logger.info("locked... redisK = {}", key);
    68. return fairLock;
    69. }
    70. logger.warn("{} times try to acquire lock", count + 1);
    71. count++;
    72. } catch (Exception e) {
    73. logger.error("acquire redis occurred an exception", e);
    74. break;
    75. }
    76. }
    77. logger.info("fail to acquire lock {}", key);
    78. }
    79. } catch (Throwable e1) {
    80. logger.error("acquire redis occurred an exception", e1);
    81. }
    82. return fairLock;
    83. }
    84. /**
    85. * 加锁
    86. * @param key 锁的 key
    87. * @param value value ( key + value 必须保证唯一)
    88. * @param expire key 的过期时间,单位 ms
    89. * @param retryTimes 重试次数,即加锁失败之后的重试次数
    90. * @param retryInterval 重试时间间隔,单位 ms
    91. * @return 加锁 true 成功
    92. */
    93. public boolean lock2(String key, String value, long expire, int retryTimes, long retryInterval) {
    94. logger.info("locking... redisK = {}", key);
    95. RLock fairLock = redissonClient.getFairLock(key + ":" + value);
    96. try {
    97. boolean tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);
    98. if (tryLock) {
    99. logger.info("locked... redisK = {}", key);
    100. return true;
    101. } else {
    102. //重试获取锁
    103. logger.info("retry to acquire lock: [redisK = {}]", key);
    104. int count = 0;
    105. while(count < retryTimes) {
    106. try {
    107. Thread.sleep(retryInterval);
    108. tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);
    109. if(tryLock) {
    110. logger.info("locked... redisK = {}", key);
    111. return true;
    112. }
    113. logger.warn("{} times try to acquire lock", count + 1);
    114. count++;
    115. } catch (Exception e) {
    116. logger.error("acquire redis occurred an exception", e);
    117. break;
    118. }
    119. }
    120. logger.info("fail to acquire lock {}", key);
    121. return false;
    122. }
    123. } catch (Throwable e1) {
    124. logger.error("acquire redis occurred an exception", e1);
    125. return false;
    126. }
    127. }
    128. /**
    129. * 加锁
    130. * @param key 锁的 key
    131. * @param value value ( key + value 必须保证唯一)
    132. * @param expire key 的过期时间,单位 ms
    133. * @return 加锁 true 成功
    134. */
    135. public boolean lockCheck(String key, String value, long expire) {
    136. logger.info("locking... redisK = {}", key);
    137. RLock fairLock = redissonClient.getFairLock(key + ":" + value);
    138. boolean tryLock = false;
    139. try {
    140. tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);
    141. } catch (Throwable e1) {
    142. logger.error("acquire redis occurred an exception", e1);
    143. }
    144. return tryLock;
    145. }
    146. /**
    147. * 加锁
    148. * @param key 锁的 key
    149. * @param value value ( key + value 必须保证唯一)
    150. * @param expire key 的过期时间,单位 ms
    151. * @return 加锁 true 成功
    152. */
    153. public boolean lockDog(String key, String value, long expire) {
    154. logger.info("locking... redisK = {}", key);
    155. RLock fairLock = redissonClient.getFairLock(key + ":" + value);
    156. boolean tryLock = false;
    157. try {
    158. fairLock.tryLock(0, TimeUnit.MILLISECONDS);
    159. } catch (Throwable e1) {
    160. logger.error("acquire redis occurred an exception", e1);
    161. }
    162. return tryLock;
    163. }
    164. /**
    165. * 释放KEY
    166. * @return 释放锁 true 成功
    167. */
    168. public boolean unlock(String key, String value) {
    169. RLock fairLock = redissonClient.getFairLock(key + ":" + value);
    170. try {
    171. //如果这里抛异常,后续锁无法释放
    172. if (fairLock.isLocked()) {
    173. fairLock.unlock();
    174. logger.info("release lock success");
    175. return true;
    176. }
    177. } catch (Throwable e) {
    178. logger.error("release lock occurred an exception", e);
    179. }finally {
    180. fairLock.unlock();
    181. }
    182. return false;
    183. }
    184. }

     10、定义controller,自定义发送消息

    1. import com.na.integration.socket.websocket.WebSocketHandler;
    2. import com.na.model.dto.NettyRedisConnectionDto;
    3. import com.na.model.vo.NettyRedisConnectionVo;
    4. import com.na.common.utils.JSONUtils;
    5. import io.netty.channel.Channel;
    6. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    7. import org.springframework.beans.factory.annotation.Autowired;
    8. import org.springframework.data.redis.core.RedisTemplate;
    9. import org.springframework.web.bind.annotation.RequestBody;
    10. import org.springframework.web.bind.annotation.RequestMapping;
    11. import org.springframework.web.bind.annotation.RestController;
    12. import java.time.LocalDateTime;
    13. import java.util.List;
    14. import java.util.Map;
    15. @RequestMapping("ws")
    16. @RestController
    17. public class WebsocketController {
    18. @Autowired
    19. private RedisTemplate redisTemplate;
    20. /**
    21. * 群发消息
    22. *
    23. * @param idList 要把消息发送给其他用户的主键
    24. */
    25. @RequestMapping("hello1")
    26. private Map hello(List<Long> idList) {
    27. //获取所有连接的客户端,如果是集群环境使用redis的hash数据类型存储即可
    28. Map<String, Channel> channelMap = WebSocketHandler.getChannelMap();
    29. //获取与用户主键绑定的channel,如果是集群环境使用redis的hash数据类型存储即可
    30. Map<Long, String> clientMap = WebSocketHandler.getClientMap();
    31. //解决问题六,websocket集群中一个客户端向其他客户端主动发送消息,如何实现?
    32. idList.stream().forEach(id -> {
    33. String v = clientMap.get(id);
    34. Channel channel = channelMap.get(v);
    35. channel.eventLoop().execute(() -> channel.writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + "wdy")));
    36. });
    37. redisTemplate.convertAndSend("life.all", "hello publish/subscribe");
    38. return clientMap;
    39. }
    40. /**
    41. * 向redis 发布/订阅模式发送消息 可采用广播消息集群监听
    42. * 需要考虑 接收方是否在线,不在线的情况是缓存还是延迟推送
    43. * 需要考虑是否重复在线,多个实例的本地缓存是否存在同一个id,这样会导致想要接收方混乱的bug
    44. */
    45. @RequestMapping("sendMessage")
    46. private Map sendMessage(@RequestBody NettyRedisConnectionVo vo) {
    47. //获取所有连接的客户端,如果是集群环境使用redis的hash数据类型存储即可
    48. Map<String, Channel> channelMap = WebSocketHandler.getChannelMap();
    49. //获取与用户主键绑定的channel,如果是集群环境使用redis的hash数据类型存储即可
    50. Map<Long, String> clientMap = WebSocketHandler.getClientMap();
    51. //解决问题六,websocket集群中一个客户端向其他客户端主动发送消息,如何实现?
    52. String v = clientMap.get(vo.getSendId());
    53. Channel channel = null;
    54. if (v != null) {
    55. channel = channelMap.get(v);
    56. Channel finalChannel = channel;
    57. //需要发送的 与redis监听定义不同的内容方便测试分辨
    58. channel.eventLoop().execute(() -> finalChannel.writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + ",message=" + vo.getSendMessage())));
    59. } else {
    60. channel = (Channel) redisTemplate.opsForValue().get("id");
    61. //封装序列化
    62. NettyRedisConnectionDto dto = new NettyRedisConnectionDto()
    63. .setId(vo.getId())
    64. .setSendMessage(vo.getSendMessage())
    65. .setSendId(vo.getSendId())
    66. .setChannel(channel);
    67. redisTemplate.convertAndSend("life.all", JSONUtils.bean2JSONObject(dto));
    68. }
    69. return clientMap;
    70. }
    71. }

    11、启动类配置(与springBoot启动类区分)

    也可以启动方法代码写在springboot启动类里

    WebsocketApplication.java
    1. import com.na.integration.socket.websocket.WebsocketInitialization;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.stereotype.Component;
    4. import javax.annotation.PostConstruct;
    5. import javax.annotation.Resource;
    6. @Slf4j
    7. @Component
    8. public class WebsocketApplication {
    9. @Resource
    10. private WebsocketInitialization websocketInitialization;
    11. @PostConstruct
    12. public void start() {
    13. try {
    14. log.info(Thread.currentThread().getName() + ":websocket启动中......");
    15. websocketInitialization.init();
    16. log.info(Thread.currentThread().getName() + ":websocket启动成功!!!");
    17. } catch (Exception e) {
    18. log.error("websocket发生错误:",e);
    19. }
    20. }
    21. }

    12、yml配置,其他配置根据自己实际情况来

    1. websocket:
    2. port: 7000 #端口
    3. url: /msg #访问url

    五、集群测试

    1、启动两个springboot server实例端口分别18088、18089,websocket端口分别是7000、7001,用测试工具创建ws协议请求,

    注意:

    1、unionId需要保持全局唯一

    2、websocket端口和springboot端口不一样

    这里可以看到用户client1已经连接进来了

    2、如果要是用群发或者指定用户的话,就需要用到广播模式。

    指定unionId用户发送请求

    http://localhost:18088/ws/sendMessage

    3 这里可以看到unionId=2的Channel实例在18089服务上从而发送成功,而18088的实例没有unionId=2的Channel就忽略。

    7001客户端也接收到了服务器发来的消息 

    部分代码参照:微服务springcloud环境下基于Netty搭建websocket集群实现服务器消息推送----netty是yyds_netty 集群_码学弟的博客-CSDN博客 

  • 相关阅读:
    ARM编程模型-常用指令集
    学生学python编程---实现贪吃蛇小游戏+原码
    第五章 树和二叉树(下)【哈夫曼树、并查集】
    94后字节P7晒出工资单:狠补了这个,真不错...
    2022年9月8号Java的23设计模式学习(课时一)单例模式
    【爬虫进阶】易班登录加密逆向
    【容器化】Kubernetes(k8s)
    Pyinstaller的生成exe图标
    postgresql-物化视图
    linux 查看 io使用率iotop
  • 原文地址:https://blog.csdn.net/qq_38038507/article/details/132619446