• 分布式Netty集群方案 加代码 SpringBoot 版


    目录

    单机netty是怎么通信的?

    多节点集群netty是怎么通信的呢?

    netty集群是怎么搭建的呢?

    连接上的 client 的 channelId 怎么存入 redis 中?

    在集群模式中 客户端1向客户端2发送信息

    演示效果


    完整的讲解 netty 集群的搭建部署。从0讲解每一步,比如存入的数据是什么样的?多节点消息是怎么通信的?让没有搭建过的童鞋没有疑惑。例如:“唉,我存入redis中的实际是什么?我有没有写对?”

    首先说下单机版 netty 的操作

    普通的 springboot netty 项目,都是 springboot 项目启动加载完成后,启动netty 服务。代码如下

    1. @Component
    2. public class StartUpRunner implements ApplicationRunner {
    3. // 启动 netty 服务的代码
    4. @Override
    5. public void run(ApplicationArguments args) throws Exception {
    6. ServerBootstrap.bind(7177).sync();
    7. }
    8. }
    1. // netty pipeline,配置 websocket,访问路径是 socket.io
    2. @Override
    3. protected void initChannel(NioSocketChannel ch) throws Exception {
    4. ChannelPipeline pipeline = ch.pipeline();
    5. pipeline.addLast(new HttpServerCodec())
    6. .addLast(new ChunkedWriteHandler())
    7. .addLast(new HttpObjectAggregator(1024 * 1024))
    8. .addLast(new WebSocketServerProtocolHandler("/socket.io", null, false, 1024 * 1024 * 50, false, true, 10000L));
    9. }

    然后前端页面访问 ws://localhost:7177/socket.io 进行通讯。

    这样一个单机版本的就 netty 服务就起来了。

    单机netty是怎么通信的?

    client1 连接上 nettyServer,client2 同样连接上了 nettyServer。client1给client2发送信息,client1 查找ChannelGroup找到client2的 channel 信道把消息写入信道中,这样单机版的发送信息就完成了。

    // 如果想让这个单机版的netty服务,集群部署呢?

    首先我们把单机版的netty启动两份,把第一份的 tomcat 端口设置成 7111,netty 端口设置成 7177,第二份的 tomcat 端口设置成 7222,netty 端口设置成 7277,这样就启动了多节点集群服务。如果在多台服务器中,因为ip不同所以端口不用变动。

    client1 连接上 nettyServer1。client2 连接上 nettyServer2。client1给client2发送信息,client1找client2的channel信道发送信息,client1发现它找不到client2的channel。那就不能发送信息啊,要怎么才能发送信息呢?

    我们知道连接上netty服务的客户端 channel 都是存在本地的 自定义 map 或者 netty 提供的 ChannelGroup 里面。所以我的第一想法是 让连接上的客户端channel从存到本地,改成存到分布式缓存redis中,这样启动多个netty的时候,大家都是从redis中把对应客户端所属的channel拿下来发送消息。

    实操之后发现这种方法思路是对的,但 channel 存入redis 没反应。不报错,redis 中也没有数据。

    这里要说的知识点是 channel 是和机器绑定的,不能被序列化和反序列化,存入不了redis中,即使存进去了拿下来也是不能使用的。

    多节点集群netty是怎么通信的呢?

    channel 不能存到redis里面,但是 channel 的 id 是可以存到redis里面的,redis 中存入 client 的信息 key是用户id,value是channelId。

    client1 连接上 nettyServer1。client2 连接上 nettyServer2。client1给client2发送信息,client1 拿client2的用户id,到redis上获取对应client2的 channelId,通过channelId查找本节点中的ChannelGroup,在本节点中找到了 client2 的channel,就可以发送信息了。在本节点中没有找到client2的 channel,就向 消息中间件mq 中广播 client2的channelId,所有nettyServer订阅此消息,nettyServer2收到了mq中的 channelId消息,然后nettyServer2拿着channelId查找ChannelGroup,它找到了channel,通过 channel 向 client2 发送了信息"你真棒"。这样就完成了集群netty的通信。

    上面一直有提到集群,那么

    netty集群是怎么搭建的呢?

    这里介绍 3 种方式

    1:zookeeper 注册中心 

    需要额外编码网关服务实现负载均衡

    2:nginx                               

    只需要配置nginx的负载均衡即可

    3:nacos 注册中心

    需要集成nacos注册中心和gateway网关

    • zookeeper 注册中心

    zookeeper 的方式,是通过 zk 的临时编号节点EPHEMERAL_SEQUENTIAL。在netty服务启动的时候,创建 一个 zk 的临时编号节点,把 netty 服务的 ip 和 端口存入节点中。当 netty 服务停止后,zk 的心跳检测到来时会删除临时节点。zk教程传送门

    当 zk 节点存在的时候,表示有nettyServer,需要编写代码获取zk节点下的nettyServer节点,当有client 请求进来时,拿到这些节点下 netty 的 ip和port,连接对应的 netty 服务。如果需要轮询/IP等负载均衡的话,需要自己实现相应的算法。

    • nginx

    nginx 只需要配置负载均衡即可,nginx 本身就是一个强大的服务器,其他的不用我们考虑

    • nacos 注册中心

    1:集成 nacos,在 netty 服务启动成功后,把 netty 服务注册到 nacos 中,代码如下:

    1. @Component
    2. public class StartUpRunner implements ApplicationRunner {
    3. @Autowired
    4. private NacosDiscoveryProperties nacosDiscoveryProperties;
    5. // 启动 netty 服务的代码
    6. @Override
    7. public void run(ApplicationArguments args) throws Exception {
    8. ServerBootstrap.bind(7177).sync();
    9. registerNamingService("netty-customer-server", "7177");
    10. }
    11. /**
    12. * 注册到 nacos 服务中
    13. *
    14. * @param nettyName netty服务名称
    15. * @param nettyPort netty服务端口
    16. */
    17. private void registerNamingService(String nettyName, String nettyPort) {
    18. try {
    19. NamingService namingService = NamingFactory.createNamingService(nacosDiscoveryProperties.getServerAddr());
    20. InetAddress address = InetAddress.getLocalHost();
    21. namingService.registerInstance(nettyName, address.getHostAddress(), Integer.parseInt(nettyPort));
    22. } catch (Exception e) {
    23. throw new RuntimeException(e);
    24. }
    25. }
    26. }

    访问 nacos 控制台

    2:集成 gateway 实现负载均衡,配置如下:

    1. # 网关服务的端口号
    2. server:
    3. port: 7777
    4. spring:
    5. cloud:
    6. nacos:
    7. # 配置 nacos 服务地址
    8. discovery:
    9. server-addr: 127.0.0.1:8848
    10. gateway:
    11. routes:
    12. - id: netty-customer-service
    13. uri: lb://netty-customer-server
    14. predicates:
    15. - Path=/socket.io/**
    16. discovery:
    17. locator:
    18. enabled: true

    前端 websocket 访问地址:ws://127.0.0.1:7777/socket.io

    • 负载均衡效果

    当 client1 访问 网关,网关把 client1 路由到 nettyServer1 上,client2 访问 网关,网关把 client2 路由到 nettyServer2 上

    连接上的 client 的 channelId 怎么存入 redis 中?

    上面已经讲过存入逻辑了,这里直接贴代码了

    1. @Override
    2. protected void channelRead0(ChannelHandlerContext ctx, String msg) {
    3. // 把 string 转成 实体类型 TODO
    4. // 把 用户id:channelId 存入redis
    5. redisService.hSet("im_netty_channel_id", "userId", ctx.channel().id());
    6. }
    7. @Override
    8. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    9. log.error("{} 异常断开,异常信息 {}", ctx.channel(), cause.getMessage());
    10. ctx.channel().close();
    11. cause.printStackTrace();
    12. }
    13. /**
    14. * channel 连接成功时 触发
    15. */
    16. @Override
    17. public void channelActive(ChannelHandlerContext ctx) {
    18. channelGroup.add(ctx.channel());
    19. }
    20. /**
    21. * channel 断开连接时 触发
    22. */
    23. @Override
    24. public void channelInactive(ChannelHandlerContext ctx) {
    25. channelGroup.remove(ctx.channel());
    26. }

    在集群模式中 客户端1向客户端2发送信息

    向 mq 广播队列中发送信息

    1. @Override
    2. protected void channelRead0(ChannelHandlerContext ctx, String msg) {
    3. // 从 redis 中获取 client2 的 channelId
    4. Object channelId = redisService.hGet("im_netty_channel_id", "收件人用户id");
    5. // 根据 channelId 从 channelGroup 中查找 channel
    6. Channel toChannel = channelGroup.find((ChannelId) channelId);
    7. if (null != toChannel) {
    8. // 找到了 channel 发送信息
    9. toChannel.writeAndFlush(new TextWebSocketFrame("你真棒"));
    10. } else {
    11. // 没有找到 channel,向 mq 广播队列中发送消息
    12. rabbitmqImService.sendMessageMQ(channelId);
    13. }
    14. }

    mq 的广播队列,监听消息

    1. @RabbitHandler
    2. @RabbitListener
    3. public void messageHandle(Object channelId) {
    4. try {
    5. Channel toChannel = channelGroup.find((ChannelId) channelId);
    6. if (null != toChannel) {
    7. System.err.println(" toChannel 有值");
    8. toChannel.writeAndFlush(new TextWebSocketFrame("你真棒"));
    9. }
    10. } catch (Exception e) {
    11. throw new RuntimeException("消费失败 {}", e);
    12. }
    13. }

    演示效果

    启动 网关服务,启动 2 个 netty 节点

    client1 连接上 nettyServer1 ,client2 连接上 nettyServer2 

    client1 向 client2 发送信息

    nettyServer1 没有找到 client2 的信道,向 mq 中广播信息

    广播节点收到信息,在 nettyServer2 中找到channel,向channel中发送信息

    client2 收到信息

    代码传送门  netty集群

  • 相关阅读:
    字符与字符串
    借鸡下蛋:室内定位之基于众包采集的 WiFi 指纹地图
    Symbol
    05-读写锁、阻塞队列及四组API、同步队列
    Java并发编程之ReentrantLock重入锁原理解析
    C++内存管理以及模板的引入
    产品经理书籍推荐
    Maven - 客户端 Nexus 配置
    CodeTON Round 3 (Div. 1 + Div. 2, Rated, Prizes!) A~D 题解
    Stable Diffusion 生成个性图片指南
  • 原文地址:https://blog.csdn.net/a1053765496/article/details/128063700