• Spring和Netty整合详解


    Spring和Netty整合详解

    本篇主要介绍netty如何跟Spring配合,其实真的很没必要将netty和Spring牵扯在一起,我们完全可以用netty做出一个spring的;然而在《Spring环境下使用Netty写Socket和Http详解》一篇中,因为没怎么用到Spring,遭到部分网友质疑,因此这一篇着重介绍如何跟Spring做配合。

    官方主页

    Spring

    Netty

    一、概述

    Netty是目前最流行的由JBOSS提供的一个Java开源框架NIO框架,Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

    相比JDK原生NIO,Netty提供了相对十分简单易用的API,非常适合网络编程。Netty是完全基于NIO实现的,所以Netty是异步的。

    Mina同样也是一款优秀的NIO框架,而且跟Netty是出自同一个人之手,但是Netty要晚一点,优点更多一些,想了解更多可以直接搜索mina和netty比较。

    使用Netty,我们可以作为Socket服务器,也可以用来做Http服务器,这里,我们将这两种方式都详细介绍一下。

    Git地址:
    Gitee

    首发地址:
    品茗IT-首发

    品茗IT 提供在线支持:

    一键快速构建Spring项目工具

    一键快速构建SpringBoot项目工具

    一键快速构建SpringCloud项目工具

    一站式Springboot项目生成

    如果大家正在寻找一个java的学习环境,或者在开发中遇到困难,可以加入我们的java学习圈,点击即可加入,共同学习,节约学习时间,减少很多在学习中遇到的难题。

    二、依赖Jar包

    本文假设你已经引入Spring必备的一切了,已经是个Spring mvc项目了,如果不会搭建,可以打开这篇文章看一看《Spring和Spring Mvc 5整合详解》

    我们假定你已经建好了Spring环境。这里只引入Netty的jar包。

    
    	io.netty
    	netty-all
    	4.1.17.Final
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    完整依赖:

    
    
    	4.0.0
    	
    		cn.pomit
    		SpringWork
    		0.0.1-SNAPSHOT
    	
    	Netty
    	jar
    	Netty
    	http://maven.apache.org
    	
    		
    			io.netty
    			netty-all
    			4.1.17.Final
    		
    		
    			org.springframework
    			spring-core
    		
    		
    			org.springframework
    			spring-context
    		
    	
    	
    		Netty
    	
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    父pom管理了所有依赖jar包的版本,地址:
    https://www.pomit.cn/spring/SpringWork/pom.xml

    三、Netty服务器配置

    我们写一个tcp的server,作为线程运行。

    NettyServer :

    package cn.pomit.springwork.netty.server;
    
    
    import org.springframework.beans.factory.annotation.Autowired;
    
    import cn.pomit.springwork.netty.config.NettyServerInitializer;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class NettyServer implements Runnable {
    	private int port;
    	private Channel channel;
    	private EventLoopGroup bossGroup;
    	private EventLoopGroup workerGroup;
    	private Thread nserver;
    	@Autowired
    	NettyServerInitializer nettyServerInitializer;
    
    	public void init() {
    		nserver = new Thread(this);
        	nserver.start();
    	}
    
    	public void destory() {
    		System.out.println("destroy server resources");
    		if (null == channel) {
    			System.out.println("server channel is null");
    		}
    		bossGroup.shutdownGracefully();
    		workerGroup.shutdownGracefully();
    		channel.closeFuture().syncUninterruptibly();
    		bossGroup = null;
    		workerGroup = null;
    		channel = null;
    	}
    
    	public int getPort() {
    		return port;
    	}
    
    	public void setPort(int port) {
    		this.port = port;
    	}
    
    	@Override
    	public void run() {
    		bossGroup = new NioEventLoopGroup();
    		workerGroup = new NioEventLoopGroup();
    		System.out.println(Thread.currentThread().getName() + "----位置4");
    		try {
    			ServerBootstrap b = new ServerBootstrap();
    			b.group(bossGroup, workerGroup);
    			b.channel(NioServerSocketChannel.class);
    			b.childHandler(nettyServerInitializer);
    
    			// 服务器绑定端口监听
    			ChannelFuture f = b.bind(port).sync();
    			// 监听服务器关闭监听
    			f.channel().closeFuture().sync();
    			channel = f.channel();
    			// 可以简写为
    			/* b.bind(portNumber).sync().channel().closeFuture().sync(); */
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} finally {
    			bossGroup.shutdownGracefully();
    			workerGroup.shutdownGracefully();
    		}
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75

    代码贴出来之后,我们还是要讲下里面的一些重点:

    bossGroup和workerGroup就是为了建立线程池,这个自行百度。

    以上使用了

    f.channel().closeFuture().sync();
    
    • 1

    这部分代码是阻塞了当前主线程一直等待结束,后面的代码就不能执行了。

    两次sync的不同:两次sync虽然都是针对ChannelFuture的,但是两次的ChannelFuture不一样,
    原理就不细说了,我也不知道,毕竟不是专精这方面的。sync换成await是一样的。

    这里面NettyServerInitializer使用Spring的bean注入功能。

    四、启动Server

    启动NettyServer。

    4.1 Spring的xml配置

    
    
    
    	
    	
    	
    	
    	
    		
    		
    		
    			
    				classpath:netty.properties
    			
    		
    	
    	
    	
    		
    	
    	
    	
    		
    	
    	
    	
    		
    	
    	
    	
    		
    	
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    配置文件netty.properties:

    netty_port=4444
    
    • 1

    4.2 注解启动

    只是@Service注解和@PostConstruct注解的运用而已。

    package cn.pomit.springwork.netty.server;
    
    
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import cn.pomit.springwork.netty.config.NettyServerInitializer;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    @Service
    public class NettyServer implements Runnable {
    	private int port;
    	private Channel channel;
    	private EventLoopGroup bossGroup;
    	private EventLoopGroup workerGroup;
    	private Thread nserver;
    	@Autowired
    	NettyServerInitializer nettyServerInitializer;
    
    	@PostConstruct
    	public void init() {
    		nserver = new Thread(this);
        	nserver.start();
    	}
    
    	@PreDestroy
    	public void destory() {
    		System.out.println("destroy server resources");
    		if (null == channel) {
    			System.out.println("server channel is null");
    		}
    		bossGroup.shutdownGracefully();
    		workerGroup.shutdownGracefully();
    		channel.closeFuture().syncUninterruptibly();
    		bossGroup = null;
    		workerGroup = null;
    		channel = null;
    	}
    
    	public int getPort() {
    		return port;
    	}
    
    	public void setPort(int port) {
    		this.port = port;
    	}
    
    	@Override
    	public void run() {
    		bossGroup = new NioEventLoopGroup();
    		workerGroup = new NioEventLoopGroup();
    		System.out.println(Thread.currentThread().getName() + "----位置4");
    		try {
    			ServerBootstrap b = new ServerBootstrap();
    			b.group(bossGroup, workerGroup);
    			b.channel(NioServerSocketChannel.class);
    			b.childHandler(nettyServerInitializer);
    
    			// 服务器绑定端口监听
    			ChannelFuture f = b.bind(port).sync();
    			// 监听服务器关闭监听
    			f.channel().closeFuture().sync();
    			channel = f.channel();
    			// 可以简写为
    			/* b.bind(portNumber).sync().channel().closeFuture().sync(); */
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} finally {
    			bossGroup.shutdownGracefully();
    			workerGroup.shutdownGracefully();
    		}
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82

    五、NettyServerInitializer

    NettyServerInitializer是对NettyServer的handler初始化配置。可以作为Spring的bean处理,所以这里用了@Component将NettyServerInitializer声明为Spring的bean.

    NettyServerInitializer:

    package cn.pomit.springwork.netty.config;
    
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.Delimiters;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    @Component
    public class NettyServerInitializer extends ChannelInitializer {
    	@Autowired
    	NettyServerHandler nettyServerHandler;
    
    	@Override
    	protected void initChannel(SocketChannel ch) throws Exception {
    		ChannelPipeline pipeline = ch.pipeline();
    
    		// 以("
    ")为结尾分割的 解码器
    		pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
    
    		// 字符串解码 和 编码
    		pipeline.addLast("decoder", new StringDecoder());
    		pipeline.addLast("encoder", new StringEncoder());
    		System.out.println(Thread.currentThread().getName() + "----位置5");
    		// 自己的逻辑Handler
    		pipeline.addLast("handler", nettyServerHandler);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    六、Netty的共享ChannelHandler

    netty的ChannelHandler是不能共享的。是一个线程一个的,如果之间将ChannelHandler作为Spring的bean管理,是会报错的。

    因此,需要用@Sharable注解对ChannelHandler做声明,然后再由Spring进行管理。

    NettyServerHandler:

    package cn.pomit.springwork.netty.config;
    
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.stereotype.Component;
    
    import cn.pomit.springwork.netty.handler.Handler;
    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    @Component
    @Sharable
    public class NettyServerHandler extends SimpleChannelInboundHandler{
    	@Autowired(required = false)
    	@Qualifier("closeFutureHandler")
    	public Handler closeFutureHandler;
    	@Autowired(required = false)
    	@Qualifier("exceptionFutureHandler")
    	public Handler exceptionFutureHandler;
    	@Autowired
    	@Qualifier("bussinessFutureHandler")
    	public Handler bussinessFutureHandler;
    	
    	@Override
    	protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {		
    //		System.out.println(((HandlerServiceImp) exportServiceMap.get("helloWorldService")).test());
    //		// 返回客户端消息 - 我已经接收到了你的消息
    		System.out.println(Thread.currentThread().getName()+"----位置6");
    //		handlerService.handle(msg);
    		String retMsg = bussinessFutureHandler.hander(msg);
    		ctx.writeAndFlush(retMsg);
    
    	}
    
    	@Override
    	public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    		System.out.println(ctx.channel().remoteAddress() + " channelRegistered " );
    		super.channelRegistered(ctx);
    	}
    
    	@Override
    	public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    		System.out.println(ctx.channel().remoteAddress() + " channelUnregistered " );
    		super.channelUnregistered(ctx);
    		if(closeFutureHandler !=null){
    			closeFutureHandler.hander(ctx.name());
    		}
    	}
    
    	@Override
    	public void channelActive(ChannelHandlerContext ctx) throws Exception {
    		System.out.println(ctx.channel().remoteAddress() + " channelActive " );
    		super.channelActive(ctx);
    	}
    
    	@Override
    	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    		System.out.println(ctx.channel().remoteAddress() + " channelInactive " );
    		super.channelInactive(ctx);
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println(ctx.channel().remoteAddress() + " exceptionCaught :" + cause.getMessage() );
    		super.exceptionCaught(ctx, cause);
    		if(exceptionFutureHandler !=null){
    			exceptionFutureHandler.hander(cause.getMessage());
    		}
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    这里面使用了上面xml中定义的三个业务处理Handler。

    七、业务Handler

    7.1 Handler接口

    Handler :

    package cn.pomit.springwork.netty.handler;
    
    
    public interface Handler {
    	public static String COMMONRET="200";
    	public String hander(String msg);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    7.2 BussinessFutureHandler

    BussinessFutureHandler:

    package cn.pomit.springwork.netty.handler;
    
    public class BussinessFutureHandler implements Handler {
    	private Handler nextHandler;
    
    	public Handler getNextHandler() {
    		return nextHandler;
    	}
    
    	public void setNextHandler(Handler nextHandler) {
    		this.nextHandler = nextHandler;
    	}
    
    	@Override
    	public String hander(String msg) {
    		System.out.println("接收到信息:" + msg);
    		if (nextHandler != null) {
    			nextHandler.hander(msg);
    		}
    		return msg;
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    7.3 CloseFutureHandler

    CloseFutureHandler:

    package cn.pomit.springwork.netty.handler;
    
    public class CloseFutureHandler implements Handler {
    	private Handler nextHandler;
    
    	public Handler getNextHandler() {
    		return nextHandler;
    	}
    
    	public void setNextHandler(Handler nextHandler) {
    		this.nextHandler = nextHandler;
    	}
    
    	@Override
    	public String hander(String msg) {
    		System.out.println(msg + "正在关闭。");
    		if (nextHandler != null) {
    			nextHandler.hander(msg);
    		}
    		return COMMONRET;
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    7.4 ExceptionFutureHandler

    ExceptionFutureHandler:

    package cn.pomit.springwork.netty.handler;
    
    public class ExceptionFutureHandler implements Handler {
    	private Handler nextHandler;
    
    	public Handler getNextHandler() {
    		return nextHandler;
    	}
    
    	public void setNextHandler(Handler nextHandler) {
    		this.nextHandler = nextHandler;
    	}
    
    	@Override
    	public String hander(String msg) {
    		System.out.println("出现异常,异常信息:" + msg);
    		if (nextHandler != null) {
    			nextHandler.hander(msg);
    		}
    		
    		return COMMONRET;
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    至此,Spring整合netty简单的处理方式就完成了。

    快速构建项目

    Spring组件化构建

    喜欢这篇文章么,喜欢就加入我们一起讨论Spring技术吧!

  • 相关阅读:
    微信小程序提示确认框 wx.showModal
    数字化转型“黑话”知多少?一文让你不仅听得懂、还会落地执行
    《计算几何》学习笔记
    【网页前端】CSS样式表进阶之图像的灵活使用与拓展知识
    zookeeper+kafka消息队列群集部署
    c++实现多重继承
    文件上传与安全狗
    【AGC】增长服务1-远程配置示例
    目标检测 YOLO 系列模型
    空气中PM2.5问题的研究(matlab代码)
  • 原文地址:https://blog.csdn.net/m0_67401134/article/details/126516511