Netty 是由 JBOSS 提供的一个 Java 开源框架,所以在使用得时候首先得导入Netty的maven坐标
- <dependency>
- <groupId>io.nettygroupId>
- <artifactId>netty-allartifactId>
- <version>4.1.42.Finalversion>
- dependency>
- package com.lagou.demo;
-
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
-
- /**
- * Netty服务端
- */
- public class NettyServer {
- public static void main(String[] args) throws InterruptedException {
- // 1. 创建bossGroup线程组: 处理网络事件--连接事件
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- // 2. 创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数
- EventLoopGroup workerGroup = new NioEventLoopGroup(2);
- // 3. 创建服务端启动助手
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- // 4. 设置bossGroup线程组和workerGroup线程组
- serverBootstrap.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class) // 5. 设置服务端通道实现为NIO
- .option(ChannelOption.SO_BACKLOG, 128) // 6. 参数设置
- .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) // 6. 参数设置
- .childHandler(new ChannelInitializer
() {// 7. 创建一个通道初始化对象 - @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- // 8. 向pipeline中添加自定义业务处理handler
- ch.pipeline().addLast(new NettyServerHandler());
- }
- });
-
- // 9. 启动服务端并绑定端口,同时将异步改为同步
- // ChannelFuture future = serverBootstrap.bind(9999).sync();
- ChannelFuture future = serverBootstrap.bind(9999);
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- System.out.println("端口绑定成功");
- } else {
- System.out.println("端口绑定失败");
- }
- }
- });
- System.out.println("服务端启动成功");
- // 10. 关闭通道(并不是真正意义上的关闭,而是监听通道关闭的状态)和关闭连接池
- future.channel().closeFuture().sync();
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- }
- package com.lagou.demo;
-
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandler;
- import io.netty.util.CharsetUtil;
-
- /**
- * 自定义处理handler
- */
- public class NettyServerHandler implements ChannelInboundHandler {
-
- /**
- * 通道读取事件
- *
- * @param ctx
- * @param msg
- * @throws Exception
- */
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf byteBuf = (ByteBuf) msg;
- System.out.println("客户端发送过来的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
- }
-
- /**
- * 通道读取完毕事件
- *
- * @param ctx
- * @throws Exception
- */
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是Netty服务端", CharsetUtil.UTF_8)); // 消息出站
-
- }
-
- /**
- * 通道异常事件
- * @param ctx
- * @param cause
- * @throws Exception
- */
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
-
- @Override
- public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
-
- }
-
- @Override
- public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
-
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
-
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-
- }
-
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
-
- }
-
- @Override
- public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
-
- }
-
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
-
- }
-
- @Override
- public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
-
- }
- }
客户端实现步骤:
- package com.lagou.demo;
-
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
-
- /**
- * Netty客户端
- */
- public class NettyClient {
- public static void main(String[] args) throws InterruptedException {
- // 1. 创建线程组
- EventLoopGroup group = new NioEventLoopGroup();
- // 2. 创建客户端启动助手
- Bootstrap bootstrap = new Bootstrap();
- // 3. 设置线程组
- bootstrap.group(group)
- .channel(NioSocketChannel.class) // 4. 设置客户端通道实现为NIO
- .handler(new ChannelInitializer
() { // 5. 创建一个通道初始化对象 - @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- // 6. 向pipeline中添加自定义业务处理handler
- ch.pipeline().addLast(new NettyClientHandler());
- }
- });
- // 7. 启动客户端,等待连接服务端,同时将异步改为同步
- ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
- // 8. 关闭通道和关闭连接池
- channelFuture.channel().closeFuture().sync();
- group.shutdownGracefully();
- }
- }
- package com.lagou.demo;
-
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelFutureListener;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandler;
- import io.netty.util.CharsetUtil;
-
- /**
- * 客户端处理类
- */
- public class NettyClientHandler implements ChannelInboundHandler {
- /**
- * 通道就绪事件
- *
- * @param ctx
- * @throws Exception
- */
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- // ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端", CharsetUtil.UTF_8));
- ChannelFuture future = ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端", CharsetUtil.UTF_8));
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- System.out.println("数据发送成功");
- }else {
- System.out.println("数据发送失败");
- }
- }
- });
- }
-
- /**
- * 通道读就绪事件
- *
- * @param ctx
- * @param msg
- * @throws Exception
- */
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf byteBuf = (ByteBuf) msg;
- System.out.println("服务端发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
- }
-
- @Override
- public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
-
- }
-
- @Override
- public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
-
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-
- }
-
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
-
- }
-
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
-
- }
-
- @Override
- public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
-
- }
-
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
-
- }
-
- @Override
- public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
-
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) throws Exception {
-
- }
- }
异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。

Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个ChannelFuture。调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得IO 操作结果. Netty 的异步模型是建立在 future 和 callback 的之上的。callback 就是回调。重点说 Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun 返回显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future 去监控方法 fun 的处理过程(即 : Future-Listener 机制)
表示异步的执行结果, 可以通过它提供的方法来检测执行是否完成,ChannelFuture 是他的一个子接口. ChannelFuture 是一个接口 ,可以添加监听器,当监听的事件发生时,就会通知到监听器
当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态, 注册监听函数来执行完成后的操作。
常用方法有:
| 方法名 | 说明 |
| sync | 阻塞等待程序结果返回 |
| isDone | 来判断当前操作是否完成 |
| isSuccess | 来判断已完成的当前操作是否成功 |
| getCause | 来获取已完成的当前操作失败的原因 |
| isCancelled | 来判断已完成的当前操作是否被取消 |
| addListener | 来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果Future 对象已完成,则通知指定的监听器 |
给Future添加监听器,监听操作结果
代码实现:
- // 服务端部分代码
- ChannelFuture future = serverBootstrap.bind(9999);
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- System.out.println("端口绑定成功");
- } else {
- System.out.println("端口绑定失败");
- }
- }
- });
-
-
- // 客户端部分代码
- ChannelFuture future = ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端", CharsetUtil.UTF_8));
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- System.out.println("数据发送成功");
- }else {
- System.out.println("数据发送失败");
- }
- }
- });