同步阻塞:服务器实现模式一个连接一个线程,既客户端有连接请求时,服务器就需要启动一个线程进行处理,如果这个连接不任何事情会造成不必要的线程线程开销
适用场景: 连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序简单易理解
**同步非阻塞:**服务器实现一个模式为一个线程处理多个请求(连接),即可达发送的连接请求,都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理。
适用场景:连接数目多且连接比较短,eg:聊天服务器,弹幕系,服务期间通讯。
基本介绍:是哟个线程从某个通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就说明都不会获取,而不是保持线程阻塞,所以直至数据变得可以读取之前,该线程可以继续做其他的事情。
异步非阻塞:AIO引入异步通道的概念,采用Proactor模式,简化了程序编写,有效的请求才其启动线程,它的特点是先由操作系统完成后才通知服务端程序去处理,一般适用于链接数较多的且连接时间较长的应用。
适用场景: 连接数目多 且 链接比较长 ,eg: 相册服务器
- 服务器端启动一个 ServiceSocket
- 客户端启动 Socket 对服务器进行通信,默认情况下服务器需要对每个客户 建立一个线程与之通讯
- 客户端发出请求后,先咨询服务器是否有线程响应,如果没有则还会等待,或者被拒绝
- 如果有响应,客户端线程会的等待请求结束后,在继续执行
服务器端:
package com.atguigu.bio;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BIOService {
public static void main(String[] args) throws Exception {
//线程池机制
//思路
//1. 创建一个线程池
//2 如果有客户端连接,就创建一个线程,与之通讯(单独写一个方法)
//1. 创建一个线程池
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
//创建一个serversocket
ServerSocket serverSocket = new ServerSocket(6666);
System.out.println("服务器启动");
while (true) {
// 监听,等待客户端连接
final Socket socket = serverSocket.accept();
System.out.println("连接到一个客户端");
//就创建一个线程,与之通讯(单独写一个方法)
newCachedThreadPool.execute(new Runnable() {
public void run() {
handler(socket);
}
});
}
}
//编写一个handler方法,和客户端通讯
public static void handler(Socket socket) {
try {
System.out.println("线程信息 id = " + Thread.currentThread().getId() +
"线程名字 =" + Thread.currentThread().getName());
byte[] bytes = new byte[1024];
//通过socker 获取输入流
InputStream inputStream = socket.getInputStream();
//循环的读取客户端的发送的数据
while (true) {
System.out.println("线程信息 id = " + Thread.currentThread().getId() +
"线程名字 =" + Thread.currentThread().getName());
int read = inputStream.read(bytes);
if (read != -1) {
System.out.print(new String(bytes, 0, read));//输出客户端发送的数据
} else {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
System.out.println("关闭和client连接");
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
客户端: cmd telnet 127.0.0.1 6666
NIO三大核心部分 : Channnel(通道) Buffer(缓冲区) Selector(选着器)
- 每个channel 都会对应一个Buffer
- Selector对对应一个线程,一个线程可以对应多个channel
- 该图反应了有三个channel注册到 该selector
- 程序切换到哪个channel是有时间决定的,Event就是一个重要的概念
- Selector会根据不同的事件,在各个通道上切换
- Buffer就是一个内存块,底层是有一个数组
- 数据的读取写入是通过Buffer,这个和BIO,BIO中要么是输入流,或者是输出流 不能双向,但是NIO的Buffer 是可以读也可以写,需要flip方法的切换
- channel是双向的,可以返回底层操作系统的情况、
Buffer支持类型化的put 和 get,put放入什么数据类型,get就应该使用响应的数据类型来取出,否则可嫩硅油 BufferUnderflowException 异常
属性 | 描述 |
---|---|
Capacity | 容量,即可以容纳的最大数量,在缓冲区创建时被设定并且不能改变 |
Limit | 表示缓冲区的当前终点,不能对缓冲区超过极限的位置进行读写操作。且极限是可以修改的 |
Postion | 位置,下一个要被读写的元素的索引,每次读写缓冲区数据都会被改变数值,为下一次的读写准备 |
Mark | 标记 |
通道可以同时进行读写,而流只能读or之后hi能写
通道可以实现异步读写数据
通道可以从缓存读数据,也可以写数据到缓存
基本介绍:
FileChannel主要用来对本地文件进行IO操作,常见的方法有:
public int read(ByteBuffer dst) ,从通道读取数据并放到缓冲区
public int write(ByteBuffer src),把缓冲区的数据写到通道中
public long transferFrom(ReadableByteChannel src,long position,long count),从目标通道中复制数据到当前通道
public long tansferTo(long postion,long count,WritableByteChannel target),吧数据从当前通道复制给目标通道
package com.atguigu.nio;
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class NIOFileChannel01 {
public static void main(String[] args) throws Exception{
String str = "四达时代";
//创建一个输出流
FileOutputStream fileOutputStream = new FileOutputStream("D:\\file01.txt");
//通过fileOutputStream 获取 对应FileChannel
// 这个fileChannel 真实类型是 FileChannelImpl
FileChannel fileChannel = fileOutputStream.getChannel();
//创建一个缓冲区 ByteBuffer 再分配1024空间
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//将 str 放入 Buffer
byteBuffer.put(str.getBytes());
//对buffer ,进行flip 功能反转
byteBuffer.flip();
//把缓冲区的数据 写入 FileChannel中
fileChannel.write(byteBuffer);
//关闭流
fileOutputStream.close();
}
}
package com.atguigu.nio;
import java.io.File;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class NIOFileChannel02 {
public static void main(String[] args) throws Exception {
File file = new File("D:\\file01.txt");
FileInputStream fileInputStream = new FileInputStream(file);
//通过fileInputStream 获取对应的ileChannel
FileChannel channel = fileInputStream.getChannel();
//创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
//将 通道的数据 读入到Buffer
channel.read(byteBuffer);
// 将字节转出 字符串
System.out.println(new String(byteBuffer.array()));
fileInputStream.close();
}
}
Java的NIO,用非阻塞的IO方式,可以使用一个线程,处理多个的客户端链接,就会使用Selector(选择器)
Selector 能够检测多个组测的通道上是否有事件发生(注意:多个channel以事件的方式可以注册到同意哦个Selector),如果有事件发生,便获取事件然后针对每个事件进行响应的处理。这样就可以只用一个单线程曲棍里多个通道,也就是管理多个连接和请求。
只有在链接真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必每个连接都创建一个线程
避免了多线程之间的上下文切换导致额开销。
监控所有注册的通道,当其中有io流
方法 | 说明 |
---|---|
open() | 得到一个选择器对象 |
selcet(long timeout) | 阻塞 xxxx时间,在xxxx毫秒后返回 |
select | 阻塞 |
wakeup() | 唤醒selector |
selectorNow() | 不阻塞,立马返还 |
Seletor、SelectionKey、ServerScokerChannel和SocketChannle 关系图梳理
下图说明:
当客户端连接时,会通过ServerSocketChannel 得到SocketChannel
Seletor进行监听,select方法,返回有事件发生的通道个数
将socketChannel注册到Selector上,register(Selector sel,int ops)方法
,一个selector上可以注册多个SocketChannel
register(Selector sel,int ops) Seletor 是对应的seletor ,ops 是xxx事件
注册后返回一个SeletionKey,会和该Seletor 关联(集合)
Seletor 进行监听 select 方法,返回有事件发生的通道的个数
进一步得到各个SelectionKey(有事件发生)
在通过SelectionKey 反向获取SocketChannel,方法channel()
可以得到channel
解释:
优点:模型简单、没有多线程、进程通信、竞争的问题,全部在一个线程中完成
缺点:性能问题,只有一个线程,无法完全发挥多核CPU的性能,
说明:
优点:充分利用多核CPU的处理
缺点:多线程数据共享,访问比较复杂,reactor处理所有事件的监听和响应,在单线程运行,在高并发场景容易出现性能瓶颈
**工作原理:**针单Reactor多线程模型中,Reactor在单线程中运行,高并发场景下容易成为性能瓶颈,可以让Reactor在多线程中运行
说明:
优点:父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理
父线程与子线程的数据交互简单,Reactor主线程只需要把新连接传给子线程,子线程无需返回数据
缺点:编程复杂的较高
说明:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Y0Nb2Dap-1668606912814)(D:/Typora/%E7%AC%94%E8%AE%B0%E5%9B%BE/)]
说明:
上述流程代码
Netty客户端,
package com.atguigu.netty.simple;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) throws Exception {
//客户端只需要一个事件循环组
NioEventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootSTRAP 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) //设置看客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
}
});
System.out.println("客户端 OK");
//启动客户端 去连接服务器端
//关于
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
Netty客户端Handler (会被客户端调用)
package com.atguigu.netty.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client" + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,服务器", CharsetUtil.UTF_8));
}
//当通道有读取事件时,会触发
/**
* @param ctx 上下文对象,含有管道 pipeline (管道里面含有好多handler) ,通道 channel(通道注重读和写), 地址
* @param msg 就是客户端发送的数据 ,默认是Object
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server ctx =" + ctx);
//将msg 转换ByteBuf
//ByteBuf 是 Netty 提供的 , 不是NIO 的ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器端地址:" + ctx.channel().remoteAddress());
}
//处理异常.一般需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Netty服务器端,
package com.atguigu.netty.simple;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
//服务端1
public class NettyServer {
public static void main(String[] args) throws Exception {
//创建BoosGroup 和 WorkerGroup
// 说明
// 1. 创建俩个线程,bossGroup 和 workerGroup
// 2. bossGroup 只是处理连接请求, 真正的和客户端业务处理,会交给workerGroup完成
// 3. 俩个都是无限循环
EventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来进行设置
bootstrap.group(bossGroup, workerGroup) //设置俩个线程组
.channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) //设置线程队列得到的连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道测试对象(匿名对象)
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyServerHandler());
}
}); //给我们得到 workerGroup 的EventLoop 对应的管道设置处理器
System.out.println("===================服务器 is ready");
//绑定一个端口并且同步,生成一个channelFuture 对象
//启动服务器并绑定端口
ChannelFuture cf = bootstrap.bind(6668).sync();
//对关闭通道进行监听
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
Netty服务器端Handler (会被服务器端调用)
package com.atguigu.netty.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.nio.ByteBuffer;
/**
* 说明:
* 1.我们自定义一个 Handler 需要继续netty 规定好的某个HandlerAdapter
* 2.这时我们自定义一个Handler,才能称为一个handler
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//读取数据实际(这里我们可以读取客户端发送的信息)
/**
* @param ctx 上下文对象,含有管道 pipeline (管道里面含有好多handler) ,通道 channel(通道注重读和写), 地址
* @param msg 就是客户端发送的数据 ,默认是Object
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server ctx =" + ctx);
//将msg 转换ByteBuf
//ByteBuf 是 Netty 提供的 , 不是NIO 的ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:" + ctx.channel().remoteAddress());
}
//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//将数据写入到缓冲中,并刷新
//一般讲,我们这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端", CharsetUtil.UTF_8));
}
//处理异常.一般需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
用户自定义的普通任务
用户自定义定时任务
非当前Reactor线程调用Channel的各种方法
eg: 在推送系统的业务线程里面,根据用户的标识,找到对应的Channel引用,然后调用write类方法向该用户推送消息,就会进入这种场景,最终的Write会提交到任务队里后被异步消费
Netty抽象出俩组线程池,BossGroup专门负责接受客户端连接,WorkerGroup专门负责网络读写操作
NioEventLoop 表示一个不断循环执行处理任务的线程,每个NioEventKLoop 都有一个selector,用于监听绑定在其他的socket网络通道
NioEventLoop内部采用串行化设计,从消息得到读取 -> 解码 -> 处理-> 编码 ->发送,始终由IO线程NioEventLoop负责
NioEventLoopGroup 下包含多个NioEventLoop
每个NioEventLoop 中包含一个Selector,一个taskQueue
每个NioEventLoop的Selector 上可以注册监听多个 Niochannel
每个NioChannel 只会绑定在唯一的NioEventChannle
每个NioChannel 都绑定有一个自己的hannelPipeline
基本介绍
异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者
Netty中的I/O操作时异步的,包括Bind、Write、Connect等操作会简单的返回一个ChannelFuture
调用者并不能立刻获得结果,而是通过Future-Listener机制,用户可以方便的主动获取or通过通知机制获得IO操作的结果
Netty的异步模型是建立在future 和 callback就是回调。重点说Futture,他的核心思想是:假设一个方法fun,计算过程可能非常的耗时,等待fun返回显然不合适,那么可以调用fun的时候,立马返回一个Future,后续可以勇敢Future区监听分发fun的处理过程(eg:Future-Listener机制)
public interface ChannelFuture extends Future<Void>
我们可以添加监听器,当监听的事件发生时,就会通知到监听器
Bootstrap 意思是 引导,一个Netty应用通常由一个Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类
注意点:
ChannelPipeline是一个重点
我们经常需要自定义一个Handler类去继承ChannelInboundHandlerAdapter,然后通过重写相应方法实现业务逻辑
如下图:
Netty在创建Channel实例后,一般都需要设置 ChhannelOptiion参数
ChannelOption参数如下:
ChannlOption.SO_BACKLOG
对应TCP\IP协议 listen函数中的backlog参数,用来初始化服务器的可连接队列大小。服务器粗处理 客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小
ChannelOption.SO_KEEPALIVE
一直保持连接活动状态
public NioEventLoopGroup() 构造方法
public Future<?> shutdownGacefully() 断开连接,关闭线程
package com.atguigu.netty.groupchat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class GroupChatServer {
private int port; //监听端口
public GroupChatServer(int port) {
this.port = port;
}
//编写run方法,处理客户端请求
public void run() throws Exception{
//创建俩俩个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup wokerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, wokerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
//获取到pipeline
ChannelPipeline pipeline = ch.pipeline();
//向pipeline加入解码器
pipeline.addLast("decoder", new StringDecoder());
//向pipeline加入编码器
pipeline.addLast("encoder", new StringEncoder());
//加入自己的业务处理handler
pipeline.addLast(new GroupChatServerHandler());
}
});
System.out.println("Netty 服务器启动");
//异步处理
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//监听关闭事件
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
wokerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception{
new GroupChatServer(7000).run();
}
}
package com.atguigu.netty.groupchat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
//定义一个channel组 管理所有的channel
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
* 将该客户加入聊天的信息推送 给 其他在线的客户端该方法
*/
//handlerAdded 表示连接建立 ,一旦连接 第一个被执行
//逻辑: 将当前channel 加入到 channelGroup中
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
// 将 channelGroup 中所有的channel 遍历 ,并发送消息. 我们不需要自己遍历
channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + "加入聊天"+sdf.format(new java.util.Date())+ "\n");
channelGroup.add(channel);
}
/**
* 断开连接,将xx客户离开后的信息推送给当前在线的客户
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + "离开了"+sdf.format(new java.util.Date())+ "\n");
System.out.println("channelGroup size = " + channelGroup.size());
}
/**
* 表示channel 处于活动状态,提示XX上线
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 上线了");
}
/**
* 表示channel 处于非活动状态,提示XX下线
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 离线了");
}
/**
* 读取数据
* @param ctx
* @param msg
* @throws Exception
*/
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
//获取到当前channel
Channel channel = ctx.channel();
//这时我们遍历channelGroupp ,根据不同的情况,回送不同的信息
channelGroup.forEach(ch ->{
if(channel != ch){
ch.writeAndFlush("[客户]"+ channel.remoteAddress() + " 发送了信息" + msg + "\n");
}else{ //回显自己发送的信息给自己
ch.writeAndFlush("[自己]发送了信息" + msg + "\n");
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//关闭通道
ctx.close();
}
}
package com.atguigu.netty.groupchat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
public class GroupChatClient {
private final String host;
private final int port;
public GroupChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//得到pipeline
ChannelPipeline pipeline = ch.pipeline();
//加入相关handler
//向pipeline加入解码器
pipeline.addLast("decoder", new StringDecoder());
//向pipeline加入编码器
pipeline.addLast("encoder", new StringEncoder());
//加入自己的业务处理handler
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture future = bootstrap.connect(host, port).sync();
//得到channel
Channel channel = future.channel();
System.out.println("--------"+channel.localAddress()+ "----------");
//客户端需要输入信息,创建一个扫描器
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()){
String msg = scanner.nextLine();
//通过channel 发送到服务器端
channel.writeAndFlush(msg+ "\r\n");
}
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception{
new GroupChatClient("127.0.0.1",7000).run();
}
}
package com.atguigu.netty.groupchat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg.trim());
}
}
客户端每隔一段时间发送PING消息给服务端,服务端接受到后回复PONG消息。客户端如果在一定时间内没有收到PONG响应,则认为连接断开,服务端如果在一定时间内没有收到来自客户端的PING请求,则认为连接已经断开。通过这种来回的PING-PONG消息机制侦测连接的活跃性。
注意:
如果心跳机制这边 设置时间,谁的时间最短谁会被触发,如下图 最先触发的 读写空闲
package com.atguigu.netty.heartbeat;
import com.atguigu.netty.groupchat.GroupChatServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class MyServer {
public static void main(String[] args) throws Exception {
//创建俩俩个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup wokerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, wokerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.INFO)) //日志处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
//获取到pipeline
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(3,5,1, TimeUnit.SECONDS));
//加入自己的业务处理handler
pipeline.addLast(new MyServerHnandler());
}
});
System.out.println("Netty 服务器启动");
//异步处理
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
//监听关闭事件
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
wokerGroup.shutdownGracefully();
}
}
}
package com.atguigu.netty.heartbeat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
public class MyServerHnandler extends ChannelInboundHandlerAdapter {
/**
*
* @param ctx 上下文
* @param evt 事件
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
// 将 evt 向下转型
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()){
case READER_IDLE:
eventType = "读空闲";
break;
case WRITER_IDLE:
eventType = "写空闲";
break;
case ALL_IDLE:
eventType = "读写空闲";
break;
}
System.out.println(ctx.channel().remoteAddress() + "--超时时间--"+ eventType);
//检测到空闲 就会关闭
ctx.channel().close();
}
}
}
编写网络应用程序时,因为数据在网络中传输都是二进制字节码数据,在发送数据时就需要编码,接收数据是就需解码
codec(编解码器)对的组成部分有俩个:decoder(解码器) 和 encoder(编码器).
encoder负责把业务数据转换成字节码数据,decoder负责把字节码数据转换成业务数据
就像发快递一样,自己(C)发快递要包装(编码),别人(S)接受你的快递(数据),要拆包(解码)
- **服务消费方(client)**以本地调用方式调用服务
- client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
- client stub 将消息进行编码并发送到服务端
- server stub 收到信息后进行解码
- server stub 根据结果码结果调用本地服务
- 本地服务执行并将结果返回给 server stub
- server stub将返回结果进行编码并发送至消费放
- client stub 接受到消息并进行解码
- 服务消费方(client)得到结果
RPC的目标 就是将2-8这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用
,Protobuf 是将类的定义使用proto文件进行扫描.说明 在idea中编写proto文件时,会自动提示是否下载 ptotot编写插件,可以让语法高亮
5. 然后通过proto.exe 编译器根据proto自动生成. java文件
就像发快递一样,自己(C)发快递要包装(编码),别人(S)接受你的快递(数据),要拆包(解码)
- **服务消费方(client)**以本地调用方式调用服务
- client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
- client stub 将消息进行编码并发送到服务端
- server stub 收到信息后进行解码
- server stub 根据结果码结果调用本地服务
- 本地服务执行并将结果返回给 server stub
- server stub将返回结果进行编码并发送至消费放
- client stub 接受到消息并进行解码
- 服务消费方(client)得到结果
RPC的目标 就是将2-8这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用