• Netty入门案例 与 Netty异步模型


    Netty入门案例

    Netty 是由 JBOSS 提供的一个 Java 开源框架,所以在使用得时候首先得导入Netty的maven坐标

    1. <dependency>
    2. <groupId>io.nettygroupId>
    3. <artifactId>netty-allartifactId>
    4. <version>4.1.42.Finalversion>
    5. dependency>

    1、Netty服务端编写

    服务端实现步骤:

    1. 创建bossGroup线程组: 处理网络事件--连接事件
    2. 创建workerGroup线程组: 处理网络事件--读写事件
    3. 创建服务端启动助手
    4. 设置bossGroup线程组和workerGroup线程组
    5. 设置服务端通道实现为NIO
    6. 参数设置
    7. 创建一个通道初始化对象
    8. 向pipeline中添加自定义业务处理handler
    9. 启动服务端并绑定端口,同时将异步改为同步
    10. 关闭通道和关闭连接池

    代码实现:

    1. package com.lagou.demo;
    2. import io.netty.bootstrap.ServerBootstrap;
    3. import io.netty.channel.*;
    4. import io.netty.channel.nio.NioEventLoopGroup;
    5. import io.netty.channel.socket.SocketChannel;
    6. import io.netty.channel.socket.nio.NioServerSocketChannel;
    7. /**
    8. * Netty服务端
    9. */
    10. public class NettyServer {
    11. public static void main(String[] args) throws InterruptedException {
    12. // 1. 创建bossGroup线程组: 处理网络事件--连接事件
    13. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    14. // 2. 创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数
    15. EventLoopGroup workerGroup = new NioEventLoopGroup(2);
    16. // 3. 创建服务端启动助手
    17. ServerBootstrap serverBootstrap = new ServerBootstrap();
    18. // 4. 设置bossGroup线程组和workerGroup线程组
    19. serverBootstrap.group(bossGroup, workerGroup)
    20. .channel(NioServerSocketChannel.class) // 5. 设置服务端通道实现为NIO
    21. .option(ChannelOption.SO_BACKLOG, 128) // 6. 参数设置
    22. .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) // 6. 参数设置
    23. .childHandler(new ChannelInitializer() {// 7. 创建一个通道初始化对象
    24. @Override
    25. protected void initChannel(SocketChannel ch) throws Exception {
    26. // 8. 向pipeline中添加自定义业务处理handler
    27. ch.pipeline().addLast(new NettyServerHandler());
    28. }
    29. });
    30. // 9. 启动服务端并绑定端口,同时将异步改为同步
    31. // ChannelFuture future = serverBootstrap.bind(9999).sync();
    32. ChannelFuture future = serverBootstrap.bind(9999);
    33. future.addListener(new ChannelFutureListener() {
    34. @Override
    35. public void operationComplete(ChannelFuture future) throws Exception {
    36. if (future.isSuccess()) {
    37. System.out.println("端口绑定成功");
    38. } else {
    39. System.out.println("端口绑定失败");
    40. }
    41. }
    42. });
    43. System.out.println("服务端启动成功");
    44. // 10. 关闭通道(并不是真正意义上的关闭,而是监听通道关闭的状态)和关闭连接池
    45. future.channel().closeFuture().sync();
    46. bossGroup.shutdownGracefully();
    47. workerGroup.shutdownGracefully();
    48. }
    49. }

    自定义服务端handle

    1. package com.lagou.demo;
    2. import io.netty.buffer.ByteBuf;
    3. import io.netty.buffer.Unpooled;
    4. import io.netty.channel.ChannelHandlerContext;
    5. import io.netty.channel.ChannelInboundHandler;
    6. import io.netty.util.CharsetUtil;
    7. /**
    8. * 自定义处理handler
    9. */
    10. public class NettyServerHandler implements ChannelInboundHandler {
    11. /**
    12. * 通道读取事件
    13. *
    14. * @param ctx
    15. * @param msg
    16. * @throws Exception
    17. */
    18. @Override
    19. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    20. ByteBuf byteBuf = (ByteBuf) msg;
    21. System.out.println("客户端发送过来的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    22. }
    23. /**
    24. * 通道读取完毕事件
    25. *
    26. * @param ctx
    27. * @throws Exception
    28. */
    29. @Override
    30. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    31. ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是Netty服务端", CharsetUtil.UTF_8)); // 消息出站
    32. }
    33. /**
    34. * 通道异常事件
    35. * @param ctx
    36. * @param cause
    37. * @throws Exception
    38. */
    39. @Override
    40. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    41. cause.printStackTrace();
    42. ctx.close();
    43. }
    44. @Override
    45. public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    46. }
    47. @Override
    48. public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    49. }
    50. @Override
    51. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    52. }
    53. @Override
    54. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    55. }
    56. @Override
    57. public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
    58. }
    59. @Override
    60. public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    61. }
    62. @Override
    63. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    64. }
    65. @Override
    66. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    67. }
    68. }

    2、Netty客户端编写

    客户端实现步骤:

    1. 创建线程组
    2. 创建客户端启动助手
    3. 设置线程组
    4. 设置客户端通道实现为NIO
    5. 创建一个通道初始化对象
    6. 向pipeline中添加自定义业务处理handler
    7. 启动客户端,等待连接服务端,同时将异步改为同步
    8. 关闭通道和关闭连接池

    代码实现:

    1. package com.lagou.demo;
    2. import io.netty.bootstrap.Bootstrap;
    3. import io.netty.channel.ChannelFuture;
    4. import io.netty.channel.ChannelInitializer;
    5. import io.netty.channel.EventLoopGroup;
    6. import io.netty.channel.nio.NioEventLoopGroup;
    7. import io.netty.channel.socket.SocketChannel;
    8. import io.netty.channel.socket.nio.NioSocketChannel;
    9. /**
    10. * Netty客户端
    11. */
    12. public class NettyClient {
    13. public static void main(String[] args) throws InterruptedException {
    14. // 1. 创建线程组
    15. EventLoopGroup group = new NioEventLoopGroup();
    16. // 2. 创建客户端启动助手
    17. Bootstrap bootstrap = new Bootstrap();
    18. // 3. 设置线程组
    19. bootstrap.group(group)
    20. .channel(NioSocketChannel.class) // 4. 设置客户端通道实现为NIO
    21. .handler(new ChannelInitializer() { // 5. 创建一个通道初始化对象
    22. @Override
    23. protected void initChannel(SocketChannel ch) throws Exception {
    24. // 6. 向pipeline中添加自定义业务处理handler
    25. ch.pipeline().addLast(new NettyClientHandler());
    26. }
    27. });
    28. // 7. 启动客户端,等待连接服务端,同时将异步改为同步
    29. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
    30. // 8. 关闭通道和关闭连接池
    31. channelFuture.channel().closeFuture().sync();
    32. group.shutdownGracefully();
    33. }
    34. }

    自定义客户端handle

    1. package com.lagou.demo;
    2. import io.netty.buffer.ByteBuf;
    3. import io.netty.buffer.Unpooled;
    4. import io.netty.channel.ChannelFuture;
    5. import io.netty.channel.ChannelFutureListener;
    6. import io.netty.channel.ChannelHandlerContext;
    7. import io.netty.channel.ChannelInboundHandler;
    8. import io.netty.util.CharsetUtil;
    9. /**
    10. * 客户端处理类
    11. */
    12. public class NettyClientHandler implements ChannelInboundHandler {
    13. /**
    14. * 通道就绪事件
    15. *
    16. * @param ctx
    17. * @throws Exception
    18. */
    19. @Override
    20. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    21. // ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端", CharsetUtil.UTF_8));
    22. ChannelFuture future = ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端", CharsetUtil.UTF_8));
    23. future.addListener(new ChannelFutureListener() {
    24. @Override
    25. public void operationComplete(ChannelFuture future) throws Exception {
    26. if (future.isSuccess()) {
    27. System.out.println("数据发送成功");
    28. }else {
    29. System.out.println("数据发送失败");
    30. }
    31. }
    32. });
    33. }
    34. /**
    35. * 通道读就绪事件
    36. *
    37. * @param ctx
    38. * @param msg
    39. * @throws Exception
    40. */
    41. @Override
    42. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    43. ByteBuf byteBuf = (ByteBuf) msg;
    44. System.out.println("服务端发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    45. }
    46. @Override
    47. public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    48. }
    49. @Override
    50. public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    51. }
    52. @Override
    53. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    54. }
    55. @Override
    56. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    57. }
    58. @Override
    59. public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
    60. }
    61. @Override
    62. public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    63. }
    64. @Override
    65. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    66. }
    67. @Override
    68. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    69. }
    70. @Override
    71. public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) throws Exception {
    72. }
    73. }

    Netty异步模型

    1、基本介绍

            异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。

            Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个ChannelFuture。调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得IO 操作结果. Netty 的异步模型是建立在 future 和 callback 的之上的。callback 就是回调。重点说 Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun 返回显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future 去监控方法 fun 的处理过程(即 : Future-Listener 机制) 

    2、Future 和Future-Listener

    Future

            表示异步的执行结果, 可以通过它提供的方法来检测执行是否完成,ChannelFuture 是他的一个子接口. ChannelFuture 是一个接口 ,可以添加监听器,当监听的事件发生时,就会通知到监听器

            当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态, 注册监听函数来执行完成后的操作。

    常用方法有:

    方法名说明
    sync阻塞等待程序结果返回
    isDone来判断当前操作是否完成
    isSuccess来判断已完成的当前操作是否成功
    getCause来获取已完成的当前操作失败的原因
    isCancelled来判断已完成的当前操作是否被取消
    addListener来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果Future 对象已完成,则通知指定的监听器

    Future-Listener 机制

            给Future添加监听器,监听操作结果

    代码实现:

    1. // 服务端部分代码
    2. ChannelFuture future = serverBootstrap.bind(9999);
    3. future.addListener(new ChannelFutureListener() {
    4. @Override
    5. public void operationComplete(ChannelFuture future) throws Exception {
    6. if (future.isSuccess()) {
    7. System.out.println("端口绑定成功");
    8. } else {
    9. System.out.println("端口绑定失败");
    10. }
    11. }
    12. });
    13. // 客户端部分代码
    14. ChannelFuture future = ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端", CharsetUtil.UTF_8));
    15. future.addListener(new ChannelFutureListener() {
    16. @Override
    17. public void operationComplete(ChannelFuture future) throws Exception {
    18. if (future.isSuccess()) {
    19. System.out.println("数据发送成功");
    20. }else {
    21. System.out.println("数据发送失败");
    22. }
    23. }
    24. });
  • 相关阅读:
    Vue3+NodeJS 接入文心一言, 发布一个 VSCode 大模型问答插件
    云智慧联合北航提出智能运维(AIOps)大语言模型及评测基准
    当你使用ChatGPT时,选择合适的提示(prompt)是引导对话方向的关键
    [设计模式] 静态代理居然能解决这种问题,我惊讶了!
    如何理解Java中眼花缭乱的各种并发锁?
    C++中的多态和虚函数及多态原理
    Spring MVC 十一:中文乱码
    windows 深度学习环境部署
    使用 ES 实现疫情地图或者外卖点餐功能(含代码及数据)
    2024年特种设备作业人员考试题库及答案(流动式起重机Q2)
  • 原文地址:https://blog.csdn.net/weixin_52851967/article/details/126107209