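This post covers how to use Netty together with Spring. Strictly speaking there is little need to tie the two together, since an application can be built on Netty alone. However, the earlier post 《Spring环境下使用Netty写Socket和Http详解》 (Using Netty for Socket and HTTP in a Spring Environment) barely used Spring and drew criticism from some readers, so this one focuses on the Spring integration.
Netty is one of the most popular open-source Java NIO frameworks and originated at JBoss. It provides an asynchronous, event-driven network application framework and tools for quickly developing high-performance, highly reliable network servers and clients.
Compared with the JDK's native NIO, Netty offers a much simpler API, which makes it well suited to network programming. Its standard transport is built on NIO, and its I/O operations are asynchronous.
Mina is another excellent NIO framework, created by the same author as Netty. Netty came somewhat later and has more advantages; search for comparisons of Mina and Netty to learn more.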
Netty can be used as a Socket server or as an HTTP server. This post walks through the Socket (TCP) server.
Source code (Git):
Gitee
First published at:
品茗IT (first publication)
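This post assumes that everything Spring needs is already in place and that you have a working Spring MVC project. If you are unsure how to set one up, see 《Spring和Spring Mvc 5整合详解》 (Integrating Spring and Spring MVC 5).
With the Spring environment in place, the only thing to add is the Netty jar.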
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.17.Final</version>
</dependency>
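Full pom:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>cn.pomit</groupId>
        <artifactId>SpringWork</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>Netty</artifactId>
    <packaging>jar</packaging>
    <name>Netty</name>
    <url>http://maven.apache.org</url>
    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.17.Final</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
    </dependencies>
    <build>
        <finalName>Netty</finalName>
    </build>
</project>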
The parent pom manages the versions of all dependency jars. It is available at:
https://www.pomit.cn/spring/SpringWork/pom.xml
We write a TCP server and run it on its own thread.
NettyServer:
package cn.pomit.springwork.netty.server;

import org.springframework.beans.factory.annotation.Autowired;

import cn.pomit.springwork.netty.config.NettyServerInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer implements Runnable {
    private int port;
    // Written by the server thread and read by destory(), hence volatile
    private volatile Channel channel;
    private volatile EventLoopGroup bossGroup;
    private volatile EventLoopGroup workerGroup;
    private Thread nserver;

    @Autowired
    NettyServerInitializer nettyServerInitializer;

    // Starts the server on its own thread so the caller is not blocked
    public void init() {
        nserver = new Thread(this);
        nserver.start();
    }

    public void destory() {
        System.out.println("destroy server resources");
        Channel ch = channel;
        if (ch == null) {
            System.out.println("server channel is null");
        } else {
            // Closing the channel also releases closeFuture().sync() in run()
            ch.close().syncUninterruptibly();
        }
        EventLoopGroup boss = bossGroup;
        EventLoopGroup worker = workerGroup;
        if (boss != null) {
            boss.shutdownGracefully();
        }
        if (worker != null) {
            worker.shutdownGracefully();
        }
        bossGroup = null;
        workerGroup = null;
        channel = null;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    @Override
    public void run() {
        // Local references keep the finally block safe after destory() nulls the fields
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        bossGroup = boss;
        workerGroup = worker;
        System.out.println(Thread.currentThread().getName() + "----position 4");
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss, worker);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(nettyServerInitializer);
            // Bind the port and wait until the bind has completed
            ChannelFuture f = b.bind(port).sync();
            // Store the channel before blocking, otherwise destory() never sees it
            channel = f.channel();
            // Block this thread until the server channel is closed
            f.channel().closeFuture().sync();
            // Shorthand when the channel reference is not needed:
            /* b.bind(port).sync().channel().closeFuture().sync(); */
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
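With the code in place, a few key points are worth explaining:
bossGroup and workerGroup are both event-loop thread pools: bossGroup accepts incoming connections, and workerGroup handles I/O on the connections that have been accepted.
The NettyServer code calls
f.channel().closeFuture().sync();
This blocks the calling thread until the server channel is closed, which is why the server runs on its own thread. Anything placed after this call, such as an assignment to the channel field, runs only once the channel has closed.
The two sync calls differ: both are made on a ChannelFuture, but not on the same one. The first belongs to the bind operation and returns once the port is bound; the second is the channel's close future and returns only when the channel closes.
Replacing sync with await also waits for completion, but the two are not identical: sync rethrows the cause if the operation failed, whereas await simply returns.
NettyServerInitializer is injected here as a Spring bean.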
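The ordering around the blocking call is easy to get wrong, so here is a minimal JDK-only sketch of it. It does not use Netty: `FakeChannel`, `published` and `runOnce` are hypothetical names, and a `CompletableFuture` stands in for the channel's close future. The point it illustrates is that the channel reference should be published before the thread blocks, so that a shutdown hook on another thread can find it and close it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class BlockingOrderSketch {
    /** Hypothetical stand-in for a server channel: only a future that completes on close. */
    static final class FakeChannel {
        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        void close() {
            closeFuture.complete(null);
        }
    }

    /** Plays the role of the channel field that a shutdown hook reads. */
    static final AtomicReference<FakeChannel> published = new AtomicReference<>();

    /** Returns true if the channel was visible while the server thread was still blocked. */
    static boolean runOnce() {
        published.set(null);
        CountDownLatch bound = new CountDownLatch(1);
        Thread server = new Thread(() -> {
            FakeChannel ch = new FakeChannel(); // stands in for the result of the bind
            published.set(ch);                  // publish BEFORE blocking
            bound.countDown();
            ch.closeFuture.join();              // blocks until close() is called
        }, "server-sketch");
        server.start();
        try {
            bound.await();
            FakeChannel ch = published.get();
            boolean visible = ch != null && server.isAlive();
            ch.close();                         // what a shutdown hook would do
            server.join(2000);
            return visible && !server.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("channel visible before shutdown: " + runOnce());
    }
}
```

If the `published.set(ch)` line were moved below the `join()` call, the other thread would find nothing to close, which is the situation a shutdown method has to guard against.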
Start NettyServer. The Spring configuration loads the properties file from:
classpath:netty.properties
The configuration file netty.properties:
netty_port=4444
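In a Spring project the placeholder mechanism resolves this value, but it can help to see what the lookup amounts to. The following JDK-only sketch is an illustration, not part of the project: `NettyPortConfig`, `readPort` and the fallback port are hypothetical, and only the key name `netty_port` comes from the file above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class NettyPortConfig {
    /** Reads netty_port from properties text, falling back to defaultPort when the key is absent. */
    static int readPort(String propertiesText, int defaultPort) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String raw = props.getProperty("netty_port");
        if (raw == null || raw.trim().isEmpty()) {
            return defaultPort;
        }
        int port = Integer.parseInt(raw.trim());
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("netty_port out of range: " + port);
        }
        return port;
    }

    public static void main(String[] args) {
        System.out.println(readPort("netty_port=4444\n", 8080));
    }
}
```

Validating the range early gives a clearer error than a failed bind later on.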
All that remains is applying the @Service, @PostConstruct and @PreDestroy annotations:
package cn.pomit.springwork.netty.server;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import cn.pomit.springwork.netty.config.NettyServerInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

@Service
public class NettyServer implements Runnable {
    // Assumes netty.properties is loaded by the Spring configuration;
    // without an injected value the port would stay 0
    @Value("${netty_port}")
    private int port;
    // Written by the server thread and read by destory(), hence volatile
    private volatile Channel channel;
    private volatile EventLoopGroup bossGroup;
    private volatile EventLoopGroup workerGroup;
    private Thread nserver;

    @Autowired
    NettyServerInitializer nettyServerInitializer;

    // Runs after dependency injection; starts the server on its own thread
    @PostConstruct
    public void init() {
        nserver = new Thread(this);
        nserver.start();
    }

    // Runs when the Spring context shuts down
    @PreDestroy
    public void destory() {
        System.out.println("destroy server resources");
        Channel ch = channel;
        if (ch == null) {
            System.out.println("server channel is null");
        } else {
            // Closing the channel also releases closeFuture().sync() in run()
            ch.close().syncUninterruptibly();
        }
        EventLoopGroup boss = bossGroup;
        EventLoopGroup worker = workerGroup;
        if (boss != null) {
            boss.shutdownGracefully();
        }
        if (worker != null) {
            worker.shutdownGracefully();
        }
        bossGroup = null;
        workerGroup = null;
        channel = null;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    @Override
    public void run() {
        // Local references keep the finally block safe after destory() nulls the fields
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        bossGroup = boss;
        workerGroup = worker;
        System.out.println(Thread.currentThread().getName() + "----position 4");
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss, worker);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(nettyServerInitializer);
            // Bind the port and wait until the bind has completed
            ChannelFuture f = b.bind(port).sync();
            // Store the channel before blocking, otherwise destory() never sees it
            channel = f.channel();
            // Block this thread until the server channel is closed
            f.channel().closeFuture().sync();
            // Shorthand when the channel reference is not needed:
            /* b.bind(port).sync().channel().closeFuture().sync(); */
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
NettyServerInitializer sets up the handlers for every channel that NettyServer accepts. It can be managed as a Spring bean, so @Component is used here to declare it as one.
NettyServerInitializer:
package cn.pomit.springwork.netty.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

@Component
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Autowired
    NettyServerHandler nettyServerHandler;

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Frame decoder that splits the stream on line delimiters ("\n" or "\r\n")
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        // String decoding and encoding
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        System.out.println(Thread.currentThread().getName() + "----position 5");
        // Our own business-logic handler
        pipeline.addLast("handler", nettyServerHandler);
    }
}
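To make the role of the "framer" stage concrete, here is a minimal JDK-only sketch of line-delimited framing. It is a simplified illustration, not Netty's implementation: `LineFramerSketch` and `feed` are hypothetical names, UTF-8 is an assumption, and the handling of oversized frames is reduced to throwing an exception.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LineFramerSketch {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
    private final int maxFrameLength;

    LineFramerSketch(int maxFrameLength) {
        this.maxFrameLength = maxFrameLength;
    }

    /** Accepts one chunk of bytes and returns every complete line it finishes, without delimiters. */
    List<String> feed(byte[] chunk) {
        List<String> frames = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                byte[] raw = pending.toByteArray();
                pending.reset();
                int len = raw.length;
                if (len > 0 && raw[len - 1] == '\r') {
                    len--; // treat "\r\n" the same as "\n"
                }
                frames.add(new String(raw, 0, len, StandardCharsets.UTF_8));
            } else {
                pending.write(b);
                if (pending.size() > maxFrameLength) {
                    pending.reset();
                    throw new IllegalStateException("frame exceeds " + maxFrameLength + " bytes");
                }
            }
        }
        return frames;
    }

    public static void main(String[] args) {
        LineFramerSketch framer = new LineFramerSketch(8192);
        System.out.println(framer.feed("hel".getBytes(StandardCharsets.UTF_8)));
        System.out.println(framer.feed("lo\r\nwor".getBytes(StandardCharsets.UTF_8)));
        System.out.println(framer.feed("ld\n".getBytes(StandardCharsets.UTF_8)));
    }
}
```

TCP delivers a byte stream with no message boundaries, so a message may arrive split across reads or merged with the next one. Buffering until a delimiter appears is what lets the later handlers receive exactly one message per call.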
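By default a Netty ChannelHandler instance cannot be shared: each channel's pipeline expects its own instance, and adding the same non-sharable instance to a second pipeline raises an error. A handler managed as a singleton Spring bean is exactly such a shared instance.
Therefore the ChannelHandler has to be declared with the @Sharable annotation before Spring manages it. The annotation is only a declaration, so the handler itself must be safe to use from several channels at once.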
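The reason sharing is restricted becomes clearer with a small JDK-only sketch. It does not use Netty, and `BufferingHandler` is a hypothetical handler that keeps state in a field. When one instance serves two connections their data is mixed, whereas one instance per connection keeps it apart.

```java
class SharedHandlerSketch {
    /** Hypothetical handler that buffers input in a field, which makes it stateful. */
    static final class BufferingHandler {
        private final StringBuilder buffer = new StringBuilder();

        String onData(String chunk) {
            buffer.append(chunk);
            return buffer.toString();
        }
    }

    /** One instance shared by two connections: their data ends up interleaved. */
    static String sharedResult() {
        BufferingHandler shared = new BufferingHandler();
        shared.onData("he");          // connection A
        shared.onData("XY");          // connection B
        return shared.onData("llo");  // connection A again
    }

    /** One instance per connection: each buffer only sees its own data. */
    static String perConnectionResult() {
        BufferingHandler a = new BufferingHandler();
        BufferingHandler b = new BufferingHandler();
        a.onData("he");
        b.onData("XY");
        return a.onData("llo");
    }

    public static void main(String[] args) {
        System.out.println("shared:         " + sharedResult());        // prints heXYllo
        System.out.println("per connection: " + perConnectionResult()); // prints hello
    }
}
```

A handler that only holds references to stateless collaborators, with no per-connection fields, avoids this problem and is a reasonable candidate for @Sharable.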
NettyServerHandler:
package cn.pomit.springwork.netty.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import cn.pomit.springwork.netty.handler.Handler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

@Component
@Sharable
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {
    // Optional: invoked when a channel is unregistered
    @Autowired(required = false)
    @Qualifier("closeFutureHandler")
    public Handler closeFutureHandler;

    // Optional: invoked when an exception reaches this handler
    @Autowired(required = false)
    @Qualifier("exceptionFutureHandler")
    public Handler exceptionFutureHandler;

    // Required: handles every decoded message
    @Autowired
    @Qualifier("bussinessFutureHandler")
    public Handler bussinessFutureHandler;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(Thread.currentThread().getName() + "----position 6");
        // Pass the message to the business handler and send its result back to the client
        String retMsg = bussinessFutureHandler.hander(msg);
        ctx.writeAndFlush(retMsg);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " channelRegistered ");
        super.channelRegistered(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " channelUnregistered ");
        super.channelUnregistered(ctx);
        if (closeFutureHandler != null) {
            closeFutureHandler.hander(ctx.name());
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " channelActive ");
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " channelInactive ");
        super.channelInactive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " exceptionCaught :" + cause.getMessage());
        super.exceptionCaught(ctx, cause);
        if (exceptionFutureHandler != null) {
            exceptionFutureHandler.hander(cause.getMessage());
        }
    }
}
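This class uses the three business Handlers that are defined as beans in the Spring XML configuration.
Handler:
package cn.pomit.springwork.netty.handler;

public interface Handler {
    // Default return value for handlers that have no result of their own
    public static String COMMONRET = "200";

    // Processes one message and returns the reply
    public String hander(String msg);
}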
BussinessFutureHandler:
package cn.pomit.springwork.netty.handler;

public class BussinessFutureHandler implements Handler {
    // Optional next handler in the chain
    private Handler nextHandler;

    public Handler getNextHandler() {
        return nextHandler;
    }

    public void setNextHandler(Handler nextHandler) {
        this.nextHandler = nextHandler;
    }

    @Override
    public String hander(String msg) {
        System.out.println("Message received: " + msg);
        if (nextHandler != null) {
            nextHandler.hander(msg);
        }
        // Echo the message back as the reply
        return msg;
    }
}
CloseFutureHandler:
package cn.pomit.springwork.netty.handler;

public class CloseFutureHandler implements Handler {
    // Optional next handler in the chain
    private Handler nextHandler;

    public Handler getNextHandler() {
        return nextHandler;
    }

    public void setNextHandler(Handler nextHandler) {
        this.nextHandler = nextHandler;
    }

    @Override
    public String hander(String msg) {
        System.out.println(msg + " is closing.");
        if (nextHandler != null) {
            nextHandler.hander(msg);
        }
        return COMMONRET;
    }
}
ExceptionFutureHandler:
package cn.pomit.springwork.netty.handler;

public class ExceptionFutureHandler implements Handler {
    // Optional next handler in the chain
    private Handler nextHandler;

    public Handler getNextHandler() {
        return nextHandler;
    }

    public void setNextHandler(Handler nextHandler) {
        this.nextHandler = nextHandler;
    }

    @Override
    public String hander(String msg) {
        System.out.println("An exception occurred: " + msg);
        if (nextHandler != null) {
            nextHandler.hander(msg);
        }
        return COMMONRET;
    }
}
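Each of the three classes carries a nextHandler property, which allows handlers to be chained. The sketch below shows how such a chain behaves. It is standalone and illustrative: the nested `Handler` interface mirrors the one in this post so that the example compiles by itself, while `RecordingHandler` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class HandlerChainSketch {
    /** Standalone copy of the Handler contract used in this post. */
    interface Handler {
        String COMMONRET = "200";

        String hander(String msg);
    }

    /** Hypothetical handler that records what it saw, then delegates to the next one. */
    static final class RecordingHandler implements Handler {
        private final String name;
        private final List<String> log;
        private Handler nextHandler;

        RecordingHandler(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        void setNextHandler(Handler nextHandler) {
            this.nextHandler = nextHandler;
        }

        @Override
        public String hander(String msg) {
            log.add(name + ":" + msg);
            if (nextHandler != null) {
                nextHandler.hander(msg); // the next handler's return value is ignored
            }
            return COMMONRET;
        }
    }

    /** Wires two handlers together and returns the order in which they ran. */
    static List<String> run(String msg) {
        List<String> log = new ArrayList<>();
        RecordingHandler first = new RecordingHandler("first", log);
        RecordingHandler second = new RecordingHandler("second", log);
        first.setNextHandler(second);
        first.hander(msg);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run("ping")); // prints [first:ping, second:ping]
    }
}
```

As in the classes above, only the first handler's return value reaches the caller, so a chained handler cannot change the reply in this design.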
With that, a simple integration of Netty with Spring is complete.
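To try the server out, a small client helps. The following is a minimal JDK-only sketch under stated assumptions: `LineClientSketch`, `sendLine` and `startStubEcho` are hypothetical names, UTF-8 is assumed, and the stub server exists only so the example runs without Netty. With the pipeline in this post the frame decoder removes the line delimiter and the business handler returns the message unchanged, so the reply should arrive without a trailing newline. For that reason the client performs one plain read instead of waiting for a full line.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class LineClientSketch {
    /** Sends one newline-terminated message and returns whatever a single read yields. */
    static String sendLine(String host, int port, String msg) {
        try (Socket socket = new Socket(host, port)) {
            socket.setSoTimeout(3000); // do not hang forever if no reply arrives
            OutputStream out = socket.getOutputStream();
            out.write((msg + "\n").getBytes(StandardCharsets.UTF_8));
            out.flush();
            byte[] buf = new byte[8192];
            int n = socket.getInputStream().read(buf); // one read, enough for a short reply
            return n < 0 ? "" : new String(buf, 0, n, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Throwaway stand-in server: echoes one line back without a delimiter, then exits. */
    static int startStubEcho() {
        try {
            ServerSocket server = new ServerSocket(0); // any free port
            Thread t = new Thread(() -> {
                try (ServerSocket s = server; Socket conn = s.accept()) {
                    BufferedReader in = new BufferedReader(
                            new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8));
                    String line = in.readLine();
                    if (line != null) {
                        conn.getOutputStream().write(line.getBytes(StandardCharsets.UTF_8));
                        conn.getOutputStream().flush();
                    }
                } catch (IOException ignored) {
                    // the stub is best-effort only
                }
            }, "stub-echo");
            t.setDaemon(true);
            t.start();
            return server.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Pass a port (for example 4444) to talk to a running server, or none to use the stub
        int port = args.length > 0 ? Integer.parseInt(args[0]) : startStubEcho();
        System.out.println("reply: " + sendLine("127.0.0.1", port, "hello"));
    }
}
```

Passing 4444 as the first argument points the client at the port from netty.properties, provided the server is running locally.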