Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
Netty是由JBOSS提供的一个java开源框架,是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端;
Netty 是对Java的高级网络编程的封装,隐藏了Java背后的复杂性,提供了一个易于使用的 API 客户端/服务器框架;底层是JDK里面的java.net.、java.nio.、java.util.concurrent.* 包下的封装;
官网:https://netty.io/
Github:https://github.com/netty/netty
Dubbo、RocketMQ、Spark、ElasticSearch、Cassandra、Flink、gPRC、xxl-job、lettuce、redission等等都使用了Netty;
请求 - 响应
客户端 - 服务端
<!-- netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.51.Final</version>
</dependency>
Netty 实现一个服务端需要两个核心步骤:
1、一个服务端 handler:这个组件实现了服务端的业务逻辑,决定了连接创建后和接收到信息后该如何处理;
2、ServerBootstrap: 这个是配置服务端的启动代码,最少需要设置服务器绑定的端口,用来监听连接请求;
public class EchoServer {
public static final int PORT = 1234;
public static void main(String[] args) {
EchoServer echoServer = new EchoServer();
echoServer.run();
}
/**
* web程序下,在webListener、servlet.init 可以调一下该方法,那么netty服务端就启动起来了
*/
public void run() {
final EchoServerHandler echoServerHandler = new EchoServerHandler();
final EchoServerHandler2 echoServerHandler2 = new EchoServerHandler2();
//1、创建一个线程池
EventLoopGroup boosLoopGroup = new NioEventLoopGroup();
EventLoopGroup workLoopGroup = new NioEventLoopGroup();
try {
//2、启动引导类
ServerBootstrap bootstrap = new ServerBootstrap();
//3、给启动引导类做一些配置:
// - NioServerSocketChannel
// - childHandler
// - bind端口
bootstrap.group(boosLoopGroup, workLoopGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
System.out.println("服务端启动中.....");
}
})
.childOption(ChannelOption.SO_REUSEADDR, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
//channelPipeline.addLast(new LoggingHandler(LogLevel.INFO));
//netty的粘包和拆包
channelPipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4));
channelPipeline.addLast(new LengthFieldPrepender(4));
channelPipeline.addLast(new StringEncoder());
channelPipeline.addLast(new StringDecoder());
channelPipeline.addLast(echoServerHandler);
channelPipeline.addLast(echoServerHandler2);
}
});
//4、绑定一个端口,返回未来的通道
ChannelFuture channelFuture = bootstrap.bind(PORT).sync();
//5、当channel被关闭的时候会通知此处关闭chanel(closeFuture方法)
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
Future future = boosLoopGroup.shutdownGracefully();
workLoopGroup.shutdownGracefully();
//阻塞等待
future.syncUninterruptibly();
System.out.println("1111111111");
}
}
}
ChannelHandler将在某些事件中触发调用,可以实现ChannelInboundHandler 接口或者继承ChannelInboundHandlerAdapter
类,用来定义处理入站事件的方法。ChannelInboundHandlerAdapter这个类 提供了默认ChannelInboundHandler 的实现,所以只需要覆盖下面的方法:
channelRead() - 每个信息入站都会调用;
channelReadComplete() - 通知处理器最后的 channelread() 是当前批处理中的最后一条消息时调用;
exceptionCaught()- 读操作时捕获到异常时调用;
/**
* 服务端的handler,用于业务处理
*
* 基于事件的异步通信框架
*
*/
@ChannelHandler.Sharable
public class EchoServerHandler2 extends ChannelInboundHandlerAdapter {
public EchoServerHandler2() {
System.out.println("EchoServerHandler2构造方法执行.......");
}
/**
* 通道是活跃的,有数据过来了,触发该方法
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("channelRead数据读取方法执行2.......");
// msg 这个就是客户端发送过来的数据: ByteBuf包装的
ByteBuf byteBuf = (ByteBuf) msg;
String message = byteBuf.toString(CharsetUtil.UTF_8);
System.out.println("接收到的消息2:" + message);
//也可以根据实际情况然后给客户端一个响应,比如把接收到的数据原封不动地写回去
ctx.writeAndFlush(msg);
}
/**
* 数据读取完的方法
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelReadComplete2方法执行.......");
//数据读完了,就把通道关闭
//ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
/**
* 发生异常,触发该方法
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exceptionCaught方法执行2.......");
}
}
ServerBootstrap引导服务器启动服务端;
监听和接收进来的连接请求;
配置 Channel来通知一个入站消息的 EchoServerHandler 实例;
public class EchoClient {
public static final String IP = "127.0.0.1";
public static final int PORT = 1234;
public static void main(String[] args) {
EchoClient echoClient = new EchoClient();
echoClient.run();
}
public void run() {
//1、创建一个线程池
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
//2、启动引导类 (客户端需要new Bootstrap)
Bootstrap bootstrap = new Bootstrap();
//3、给启动引导类做一些配置:
// - NioServerSocketChannel
// - childHandler
// - bind端口
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
//netty的粘包和拆包
channelPipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4));
channelPipeline.addLast(new LengthFieldPrepender(4));
channelPipeline.addLast(new StringEncoder());
channelPipeline.addLast(new StringDecoder());
channelPipeline.addLast(new EchoClientHandler());
}
});
//绑定一个端口,返回未来的通道 .bind() --> udp
ChannelFuture channelFuture = bootstrap.connect(IP, PORT).sync();
//连接之后再加option就无效了
//bootstrap.option();
//得到一个通道
Channel channel = channelFuture.channel();
//从控制台输入数据,往服务端发送
Scanner scanner = new Scanner(System.in);
for (;;) {
String line = scanner.nextLine();
if (line.equals("bye")) {
break;
}
//数据需要写到一个ByteBuf, 然后把ByteBuf写到服务端
channel.writeAndFlush(Unpooled.copiedBuffer(line, CharsetUtil.UTF_8));
}
//4、当channel被关闭的时候会通知此处关闭chanel(closeFuture方法)
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
跟编写服务端一样,也是通过ChannelInboundHandler 来处理数据,也可以采用 SimpleChannelInboundHandler 或者
ChannelInboundHandlerAdapter来处理所有的任务,需要覆盖三个方法:
channelActive() - 服务器的连接被建立后调用
channelRead0() - 数据后从服务器接收到调用
exceptionCaught() - 捕获一个异常时调用
ByteBuf(Netty的字节容器)‘
/**
* 客户端的handler,用于业务处理
*
* 基于事件的异步通信框架,触发该类中方法
*
*/
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
public EchoClientHandler() {
System.out.println("EchoServerHandler构造方法执行.......");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered方法执行.......");
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelUnregistered方法执行.......");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive方法执行.......");
StringBuffer stringBuffer = new StringBuffer();
for (int i=0; i<100; i++) {
stringBuffer.append("sdkfhsfbsdfsdhbfsdfjksdjkfhsdjkfhhhhhhhhhhhhhhhhhhhhhhhhhhhhh");
}
//通道被激活了,我们也可以往服务端写数据
for (int i=0; i<20; i++) {
ctx.writeAndFlush(stringBuffer.toString());
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive方法执行.......");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("channelRead方法执行.......");
// msg 这个就是客户端发送过来的数据: ByteBuf包装的
//ByteBuf byteBuf = (ByteBuf) msg;
String message = (String) msg;
//String message = byteBuf.toString(CharsetUtil.UTF_8);
System.out.println("客户端接收到的消息:" + message);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelReadComplete方法执行.......");
//数据读完了,就把通道关闭
//ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("userEventTriggered方法执行.......");
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelWritabilityChanged方法执行.......");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exceptionCaught方法执行.......");
}
@Override
protected void ensureNotSharable() {
System.out.println("ensureNotSharable方法执行.......");
}
@Override
public boolean isSharable() {
System.out.println("isSharable方法执行.......");
return super.isSharable();
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded方法执行.......");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved方法执行.......");
}
}
Netty 的核心组件包括:
Netty应用程序通过设置 Bootstrap类开始,该类提供了一个用于应用程序网络层配置的容器;
Bootstrap有以下两种类型:
抽象通用的引导类AbstractBootstrap,具体的客户端或服务器引导类分别是Bootstrap 和 ServerBootstrap 处理,这两个类可以进行流式编程;
其方法说明如下:
1、Bootstrap group(EventLoopGroup)
设置用于处理Channel所有事件的 EventLoopGroup
;
2、Bootstrap channel(Class extends C>)
channel()方法指定了Channel的实现类;
3、Bootstrap localAddress(SocketAddress)
指定Channel应该绑定到的本地地址,如果没有指定,则由操作系统创建一个随机的地址,也可以通过bind()或者connect()方法指定localAddress;
4、
,其将被应用到每一个新创建的Channel 的 ChannelConfig
,这些选项将会通过bind()或者connect()方法设置到Channel,这个方法在 Channel已经被创建后再调用将不会有任何的效果,支持的
ChannelOption 取决于使用的Channel类型;
5、Bootstrap handler(ChannelHandler)
设置将被添加到ChannelPipeline 以接收事件通知的ChannelHandler;
6、Bootstrap remoteAddress(SocketAddress) 设置远程地址,也可以通过 connect()方法来指定;
7、ChannelFuture connect()
连接到远程节点并返回一个ChannelFuture,其将会在连接操作完成后接收到通知;
8、ChannelFuture bind()
绑定Channel并返回一个ChannelFuture,其将会在绑定操作完成后接收到通知;
注意:
在引导的过程中,在调用bind()或者connect()方法之前,必须调用以下方法来设置所需的组件:
group();
channel();
handler();
否则将会导致IllegalStateException异常;
服务端启动引导类ServerBootStrap
1、group()方法设置ServerBootstra要用的EventLoopGroup,这个 EventLoopGroup将用于 ServerChannel 和被接受的子 Channel 的 I/O 处理;
2、Channel()方法,设置将要被实例化的ServerChannel类;
3、localAddress()方法,指定ServerChannel应该绑定到的本地地址,如果没有指定,则将由操作系统使用一个随机地址,也可以通过 bind()方法来指定该 localAddress;
4、option()方法,指定要应用到新创建的ServerChannel的ChannelConfig的ChannelOption,这些选项将会通过bind()方法设置到Channel,在bind()方法被调用之后,设置或者改变ChannelOption都不会有任何的效果,所支持的ChannelOption取决于所使用的Channel类型;
5、childOption()方法,指定当子Channel被接受时,应用到子Channel的 ChannelConfig的ChannelOption,所支持的ChannelOption取决于所使用的Channel的类型;
6、handler()方法,设置被添加到ServerChannel的ChannelPipeline中的ChannelHandler,更加常用的方法参见childHandler();
7、childHandler()方法,设置将被添加到已被接受的子Channel的 ChannelPipeline中的ChannelHandler,handler()方法和 childHandler()方法之间的区别是:前者所添加的 ChannelHandler由接受子Channel 的 ServerChannel 处理,而childHandler()方法所添加的 ChannelHandler 将由已被接受的子 Channel处理,其代表一个绑定到远程节点的套接字;
8、bind()方法, 绑定ServerChannel并且返回一个ChannelFuture,其将会在绑定操作完成后收到通知(带着成功或者失败的结果);
Channel是底层网络传输操作的接口,如读,写,连接,绑定等等,相当于一个Socket,Channel定义了与Socket 丰富的交互操作集:bind, close, config, connect, isActive, isOpen, isWritable, read, write 等等,常用的 Channel 类型:
NioSocketChannel
,异步的客户端TCP Socket连接;
NioServerSocketChannel
,异步的服务器端TCP Socket连接;
NioDatagramChannel
,异步的UDP连接;
NioSctpChannel
,异步的客户端Sctp连接;
NioSctpServerChannel
,异步的服务器端Sctp连接;
ChannelHandler由特定事件触发,ChannelHandler可专用于几乎所有的动作,包括将一个对象转为字节(或相反),执行过程中抛出的异常处理;
常用的一个接口是 ChannelInboundHandler
,这个类型接收到入站事件(包括接收到的数据)可以处理应用程序逻辑,当你需要提供响应时,你也可以从 ChannelInboundHandler
写出数据,所有业务逻辑经常存活于一个或者多个 ChannelInboundHandler
中;
实现业务处理的handler有两种方式,
@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
}
}
这种实现方式,消息接收后,正确的做法是调用
ReferenceCountUtil.release(msg);方法释放已接收的消息,避免内存溢出问题;
public class CoderClientHandler extends SimpleChannelInboundHandler
这种实现方式,消息接收后,不需要释放已接收的消息,另外这种实现方式,如果有指定的编码器和解码器,可以通过类的泛型直接得到泛型对象,不需要通过ByteBuf读取字节流;
ChannelPipeline就是 ChannelHandler链的容器;
ChannelPipeline提供了一个容器给ChannelHandler链并提供了一个API 用于管理沿着链入站和出站事件的流动,每个Channel都有自己的ChannelPipeline,ChannelHandler 是如何设置到ChannelPipeline?主要是实现了ChannelHandler的抽象 ChannelInitializer,ChannelInitializer子类通过ServerBootstrap 进行注册,当它的方法initChannel()被调用时,这个对象将设置自定义的ChannelHandler集到pipeline,当这个操作完成时,ChannelInitializer子类则从ChannelPipeline 自动删除自身;
所以ChannelPipeline保存ChannelHandler的List,用于处理或拦截 Channel 的入站事件和出站操作;
它实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及Channel中各个的ChannelHandler 如何相互交互;
EventLoop用于处理 Channel 的 I/O 操作,维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用 NioEventLoop 的run方法,执行 I/O任务和非I/O任务:
NioEventLoopGroup
,主要管理 eventLoop 的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个 Channel 上的事件,而一个Channel只对应于一个线程;
Netty 所有的 I/O 操作都是异步的,因为一个操作可能无法立即返回,我们需要有一种方法在以后确定它的结果,出于这个目的,Netty 提供了接口 ChannelFuture,它的addListener方法注册了一个ChannelFutureListener,当操作完成时,可以被通知(不管成功与否);
可以想象成一个 ChannelFuture 对象作为一个未来执行操作结果的占位符,何时执行取决于一些因素,因此不可能预测与精确,但我们可以肯定的是,它会被执行;
保存 Channel 相关的所有上下文信息,同时关联一个ChannelHandler 对象;
ctx.fireChannelRegistered();
ctx.fireChannelUnregistered();
ctx.fireChannelActive();
ctx.fireChannelInactive();
ctx.fireChannelRead(msg);
ctx.fireChannelReadComplete();
ctx.fireUserEventTriggered(evt);
ctx.fireExceptionCaught(cause);
调用EventLoopGroup.shutdownGracefully()方法,它将处理任何挂起的事件和任务,并关闭EventLoopGroup,随后释放所有活动的线程;
这个方法调用将会返回一个Future,该Future将在关闭完成时接收到通知,shutdownGracefully()方法也是一个异步的操作,可以调用阻塞等待方法直到它完成,或者向所返回的Future注册一个监听器以在关闭完成时获得通知;
Future<?> future = group.shutdownGracefully();
// block until the group has shutdown
future.syncUninterruptibly();
另外显式地关闭活动的Channel ,可以调用Channel.close()方法;
1、浏览器 --> netty服务器 (http协议、webscoket协议),不需要自定义协议(也就是说不需要自定义编码器和解码器,netty已经提供好了)
2、Netty客户端 --> Netty服务端 (可以用netty内置的编码器和解码器,也可以自己开发编码器和解码器,自己开发编码器和解码器相当于是自定义私有协议,自定义私有协议就是自己定义数据包的格式,那么就需要自己开发编码器和解码器,像Dubbo的rpc调用网络通信是自定义协议)
3、其他的netty代码开发都是模板式的,可以多看一下那些类里面的方法;