• netty入门


    Reactor 模式

    事件驱动模型

    Netty是 一个异步事件驱动的网络应用程序框架, 用于快速开发可维护的高性能协议服务器和客户端。

    事件驱动模型主要应用在图形用户界面(GUI)、网络服务和 Web 前端上。

    比如编写图形用户界面程序, 要给界面上每个按钮都添加监听函数, 而该函数只有在相应的按钮被用户点击的事件发生时才会执行, 开发者并不需要事先确定事件何时发生, 只需要编写事件的响应函数即可。监听函数或者响应函数就是所谓的事件处理器(event handler), 类似的事件还有鼠标移动、按下、松开、双击等等, 这就是事件驱动。

    事件驱动的程序一般都有一个主循环(main loop)或称事件循环(event loop), 该循环不停地做两件事: 事件监测和事件处理。首先要监测是否发生了事件, 如果有事件发生则调用相应的事件处理程序, 处理完毕再继续监测新事件。


    Reactor 是什么

    关于 reactor 是什么

    1. 事件驱动(event handling)

    2. 可以处理一个或多个输入源(one or more inputs)

    3. 通过 Service Handler 同步的将输入事件(Event)采用多路复用分发给相应的 Request Handler(多个)处理

    只提供一个阻塞对象service handler(reactor)

    常见的网络服务中,如果每一个客户端都维持一个与登陆服务器的连接。那么服务器将维护多个和客户端的连接以出来和客户端的 contnect 、read、write ,特别是对于长链接的服务,有多少个 c 端,就需要在 s 端维护同等的 IO 连接。这对服务器来说是一个很大的开销。


    单 Reactor 单线程模型


    单 Reactor 多线程模型

    Reactor对象通过select()监控客户端请求事件,收到事件后,通过dispatch()进行分发。worker线程池会分配独立线程完成真正的业务,并将结果返回给handler。

    优点:可以充分利用多核cpu的处理能力。

    缺点是reactor处理所有的事件监听和响应,此reactor是单线程运行,在高并发场景容易出现性能瓶颈。


    主从Reactor 多线程模型

    Reactor主线程只处理连接请求

    Reactor子线程可以有多个,每一个去处理io的读取和业务处理

    优点1:父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。
    优点2:父线程与子线程的数据交互简单,Reactor主线程只需要把新连接传给子线程,子线程无需返回数据。


    Netty模型

    简单版:

    进阶版:

    bossgroup中的每个线程

    workgroup中的每个线程

    pipeline


    Netty组件入门

    ChannelInitializer

    创建一个 ChannelInitializer(通道初始化),这里主要就是管理自定义 Handler,最终把这些 Handler 组装成一条双向链表,Channel 有事件时则触发链表进行业务处理逻辑


    ChannelPipeline

    ChannelPipeline 可认为是一个管道,是管理业务 Handler,通俗理解是保存 ChannelHandler 的 List 集合,负责编排ChannelHandler以使ChannelHandler能有效的协同作业

    ChannelPipeline 底层设计是采用责任链设计模式,作用是管理 Handler 双向链表,包括入站和出站,主要是拦截 inboud 和 outbound 事件,然后每个 handler 节点负责处理具体逻辑


    ChannelHandler

    ChannelHandler: 在Netty中作为处理Channel中的事件以及数据的一种方式。

    对于入站与出站消息又分别使用ChannelInboundHandler与ChannelOutboundHandler来处理

    • ChannelOutboundHandler:处理出站数据并且允许拦截所有的操作
    • ChannelInboundHandler:处理入站数据以及各种状态的变化

    ChannelInboundHandler 的实现类 ChannelInboundHandlerAdapter 处理入站 I/O 事件

    ChannelOutboundHandler 的实现类 ChannelOutboundHandlerAdapter 处理出站 I/O 事件

    ChannelHandler生命周期:

    类型描述
    handlerAdded当把ChannelHandler添加到ChannelPipeline中时被调用
    handlerRemoved当把ChannelHandler在ChannelPipeline中移除时调用
    exceptionCaught当ChannelHandler在处理过程中出现异常时调用

    ChannelInboundHandler

    ChannelInboundHandler中可以获取网络数据并处理各种事件,列出能处理的各种事件

    入站事件是被动接收事件,例如接收远端数据,通道注册成功,通道变的活跃等等。
    事件说明
    channelRegistered当Channel注册到它的EventLoop并且能够处理I/O时调用
    channelUnregistered当Channel从它的EventLoop中注销并且无法处理任何I/O时调用
    channelActive当Channel处理于活动状态时被调用
    channelInactive不再是活动状态且不再连接它的远程节点时被调用
    channelReadComplete当Channel上的一个读操作完成时被调
    channelRead当从Channel读取数据时被调用
    channelWritabilityChanged当Channel的可写状态发生改变时被调用
    userEventTriggered当ChannelInboundHandler.fireUserEventTriggered()方法被调用时触发

    Netty另外还提供了一个类来简化这一过程SimpleChannelInboundHandler类,

    1. @Sharable// 表示它可以被多个channel安全地共享。在实际运行时,是起不到什么作用的
    2. public class ClientHandler extends SimpleChannelInboundHandler {
    3. @Override
    4. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    5. System.err.println(msg);
    6. }
    7. @Override
    8. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    9. cause.printStackTrace();
    10. ctx.close();
    11. }
    12. }

    继承SimpleChannelInboundHandler后,处理入站的数据我们只需要重新实现channelRead0()方法。

    当channelRead真正被调用的时候我们的逻辑才会被处理。这里使用的是模板模式,让主要的处理逻辑保持不变,让变化的步骤通过接口实现来完成

    ChannelOutboundHandler

    出站的数据和操作由ChannelOutboundHandler接口处理

    出站事件是主动触发事件,例如绑定,注册,连接,断开,写入等等。

    ChannelOutboundHandler定义的方法列出

    方法描述
    bind当请求将Channel绑定到本地地址时被调用
    connet当请求将Channel连接到远程节点时被调用
    disconnect当请求将Channel从远程节点断开时调用
    close当请求关闭Channel时调用
    deregister当请求将Channel从它的EventLoop注销时调用
    read当请求从Channel中读取数据时调用
    flush当请求通过Channel将入队数据冲刷到远程节点时调用
    write当请求通过Channel将数据写入远程节点时被调用

    ChannelHandlerContext

    ChannelHandlerContext 是对 ChannelHandler 的封装

    当一个 ChannelHandler 添加到管道ChannelPipeline时,由ChannelPipeline创建一个包裹 ChannelHandler 的上下文ChannelHandlerContext对象添加进去。

    ChannelHandler 中可以得到这个上下文对象ChannelHandlerContext,这样它就可以:

    可以向上游或下游传递事件,实现责任链的功能,将事件传递给下一个处理器ChannelHandler 处理。

    责任传播和终止机制:以 ChannelInboundHandlerAdapter 的 channelRead 方法为例

    ChannelHandlerContext 会默认调用 fireChannelRead 方法将事件默认传递到下一个处理器。

    如果我们重写了 ChannelInboundHandlerAdapter 的 channelRead 方法,并且没有调用 fireChannelRead 进行事件传播,那么表示此次事件传播已终止。

    ChannelHandlerContext 负责保存责任链节点上下文信息

    首先ChannelHandlerContext是一个AttributeMap,可以用来存储多个数据。

    可以从ChannelHandlerContext中获取到对应的channel,handler和pipline:

    Channel channel();
    ChannelHandler handler();
    ChannelPipeline pipeline();


    Channel

    Channel:Netty 的网络通信组件,客户端和服务端建立连接之后会维持一个 Channel。读写网络数据等等都需要Channel这个组件的参与

    Channel生命同期:

    状态描述
    ChannelUnregisteredChannel已经被创建,但还未注册到EventLoop
    ChannelRegisteredChannel已经被注册到EventLoop
    ChannelActiveChannel已经处理活动状态并可以接收与发送数据
    ChannelInactiveChannel没有连接到远程节点

    关系梳理

    1. 一个客户端对应一个 Channel;
    2. 一个 Channel 对应一个 ChannelPipeline;
    3. 一个 ChannelPipeline 又维护了一个双向链表,其中链表的节点是 ChannelHandlerContext
    4. 一个 ChannelHandlerContext 则关联一个 ChannelHandler(自定义 Handler);
    5. ChannelHandler 则是自定义 Handler 的一个顶级抽象接口。

    入站和出站的顺序:

    1. 入站: 事件会从链表的头部一直往后执行,ChannelInboundHandlerAdapter 子类会被触发;
    2. 出站: 事件会从链表的尾部一直往前执行,ChannelOutboundHandlerAdapter 子类会被触发。

    Hello World案例

    1、依赖引入

    1. <dependency>
    2. <groupId>io.nettygroupId>
    3. <artifactId>netty-allartifactId>
    4. <version>4.1.39.Finalversion>
    5. dependency>

    2、服务端编写

    NioEventLoopGroup是EventLoopGroup的具体实现,

    NioEventLoopGroup是线程组,包含一组NIO线程,专门用于网络事件的处理

    netty的实例化,需要启动两个EventLoopGroup

    一个boss,一个worker

    1. EventLoopGroup bossGroup = new NioEventLoopGroup();
    2. EventLoopGroup workerGroup = new NioEventLoopGroup();

    初始化两个线程: 一个线程(boss)负责接受新的连接,一个(worker)负责处理读写;

    bootstrap:直译为"引导"

    Netty 中 ServerBootStrap 和 Bootstrap 引导器是最经典的建造者模式实现,在构建过程中需要设置非常多的参数。

    1. //服务器端
    2. public class HelloServer {
    3. public static void main(String[] args) {
    4. //创建boss线程组 用于服务端接受客户端的连接
    5. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    6. //创建 worker 线程组 用于进行 SocketChannel 的数据读写
    7. EventLoopGroup workerGroup = new NioEventLoopGroup();
    8. //创建 ServerBootstrap 对象,用于设置服务端的启动配置
    9. ServerBootstrap b = new ServerBootstrap()
    10. .group(bossGroup, workerGroup)
    11. //选择服务器的ServerSocketChannel实现
    12. .channel(NioServerSocketChannel.class)
    13. //通道初始化
    14. .childHandler(new ChannelInitializer() {
    15. @Override
    16. protected void initChannel(NioSocketChannel channel) throws Exception {
    17. //添加具体的handler
    18. channel.pipeline().addLast(new StringDecoder());
    19. channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
    20. @Override
    21. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    22. System.out.println(msg);
    23. }
    24. });
    25. }
    26. });
    27. //服务端启动绑定端口号
    28. ChannelFuture future = b.bind(8888);
    29. }
    30. }


    3、ChannelFuture

    获取一个ChannelFuture对象,它的作用是用来保存 Channel 异步操作的结果(咋感觉那么像promise对象??)

    在 Netty 中所有的 IO 操作都是异步的,不能立刻得到 IO 操作的执行结果,但是可以通过注册一个监听器来监听其执行结果。在 Netty 当中是通过 ChannelFuture 来实现异步结果的监听

    Netty 是异步操作,无法知道什么时候执行完成,在 Netty 当中 Bind 、Write 、Connect 等操作会简单的返回一个 ChannelFuture,来进行执行结果的监听。

    1. //ChannelFuture 用来保存 Channel 异步操作的结果
    2. ChannelFuture future = b.connect(new InetSocketAddress("localhost", 8888));
    3. //等待异步操作执行完毕
    4. future.sync();
    5. ------------------------------
    6. //服务端启动绑定端口号
    7. ChannelFuture future = b.bind(8888);
    8. future.addListener(new GenericFutureListenersuper Void>>() {
    9. public void operationComplete(Futuresuper Void> future) {
    10. if (future.isSuccess()) {
    11. System.out.println("端口绑定成功!");
    12. } else {
    13. System.err.println("端口绑定失败!");
    14. }
    15. }
    16. });
    序号方法描述
    1addListener注册监听器,当操作已完成 (isDone 方法返回完成),将会通知指定的监听器;如果 Future 对象已完成,则通知指定的监听器
    2removeListener移除监听器
    3sync等待异步操作执行完毕
    4await等待异步操作执行完毕
    5isDone判断当前操作是否完成
    6isSuccess判断已完成的当前操作是否成功
    7isCancellable判断已完成的当前操作是否被取消
    8cause获取已完成的当前操作失败的原因

    sync () 和 await () 都是等待异步操作执行完成,那么它们有什么区别呢?

    1. sync () 会抛出异常,建议使用 sync ();
    2. await () 不会抛出异常,主线程无法捕捉子线程执行抛出的异常。

    Future 可以通过四个核心方法来判断任务的执行情况。

    状态说明
    isDone()任务是否执行完成,无论成功还是失败
    isSuccess()任务是否执行采购
    isCancelled()任务是否被取消
    cause()获取执行异常信息
    1. if(future.isDone()){
    2. if(future.isSuccess()){
    3. System.out.println("执行成功...");
    4. }else if(future.isCancelled()){
    5. System.out.println("任务被取消...");
    6. }else if(future.cause()!=null){
    7. System.out.println("执行出错:"+future.cause().getMessage());
    8. }
    9. }

    4、客户端编写

    连接的时候第一个参数为 IP 或者域名,第二个参数为端口号

    1. public class HelloClient {
    2. public static void main(String[] args) throws InterruptedException {
    3. Bootstrap b = new Bootstrap()
    4. //EventLoop即(selector,thread)
    5. .group(new NioEventLoopGroup())
    6. //选择客户端实现
    7. .channel(NioSocketChannel.class)
    8. //添加处理器
    9. .handler(new ChannelInitializer() {
    10. @Override
    11. protected void initChannel(NioSocketChannel channel) throws Exception {
    12. channel.pipeline().addLast(new StringEncoder());
    13. }
    14. });
    15. //ChannelFuture 用来保存 Channel 异步操作的结果
    16. ChannelFuture future = b.connect(new InetSocketAddress("localhost", 8888));
    17. //等待异步操作执行完毕.sync () 会抛出异常
    18. future.sync();
    19. Channel channel = future.channel();
    20. channel.writeAndFlush("hello,world");
    21. }
    22. }

    Channe组件:Netty 的网络通信组件,客户端和服务端建立连接之后会维持一个 Channel


    5、失败重连机制

    在网络情况差的情况下,客户端第一次连接可能会连接失败,这个时候我们可能会尝试重新连接

    1. private static void connect(Bootstrap bootstrap, String host, int port) {
    2. bootstrap.connect(host, port).addListener(future -> {
    3. if (future.isSuccess()) {
    4. System.out.println("连接成功!");
    5. } else {
    6. //获取EventLoopGroup
    7. EventLoopGroup thread=bootstrap.config().group();
    8. //每隔5秒钟重连一次
    9. thread.schedule(new Runnable() {
    10. public void run() {
    11. //递归调用连接方法
    12. connect(bootstrap, host, port);
    13. }
    14. }, 5, TimeUnit.SECONDS);
    15. }
    16. });
    17. }

    bootstrap.config().group() 

    bootstrap.config().group() 返回的是我们最开始配置的线程模型 workerGroup,调用 workerGroup 的 schedule 方法即可实现定时任务逻辑。

    通常情况下,连接建立失败不会立即重新连接,而是会通过一个指数退避的方式,根据一定是的时间间隔,比如 2 的次幂来简历连接,到达一定次数之后就放弃连接

    1. private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
    2. bootstrap.connect(host, port).addListener(future -> {
    3. if (future.isSuccess()) {
    4. System.out.println("连接成功!");
    5. } else if (retry == 0) {
    6. System.err.println("重试次数已用完,放弃连接!");
    7. } else {
    8. // 第几次重连
    9. int order = (MAX_RETRY - retry) + 1;
    10. // 本次重连的间隔
    11. int delay = 1 << order;
    12. System.err.println(new Date() + ": 连接失败,第 " + order + " 次重连……");
    13. bootstrap.config()
    14. .group()
    15. .schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);
    16. }
    17. });
    18. }
    19. // 调用方式
    20. // MAX_RETRY 最大重试次数
    21. connect(bootstrap, "127.0.0.1", 8070, MAX_RETRY);

    6、Hello World 测试

    1.只启动客户端HelloClient,会报错

    2、只启动服务端HelloServer,不会报错

    3、启动服务端后,再次启动客户端


    DevOps

    DevOps这个词,其实就是Development和Operations两个词的组合。

    在DevOps的流程下,运维人员会在项目开发期间就介入到开发过程中,了解开发人员使用的系统架构和技术路线,从而制定适当的运维方案。而开发人员也会在运维的初期参与到系统部署中,并提供系统部署的优化建议。

    微服务

    所谓“微服务”,就是将原来黑盒化的一个整体产品进行拆分(解耦),从一个提供多种服务的整体,拆成各自提供不同服务的多个个体。

    微服务架构下,不同的工程师可以对各自负责的模块进行处理,例如开发、测试、部署、迭代。


    虚拟化

    虚拟化,其实就是一种敏捷的云计算服务。它从硬件上,将一个系统“划分”为多个系统,系统之间相互隔离,为微服务提供便利。


    容器

    在操作系统上划分为不同的“运行环境”(Container),占用资源更少,部署速度更快。

    虚拟化和容器,其实为DevOps提供了很好的前提条件。开发环境和部署环境都可以更好地隔离了,减小了相互之间的影响。


    云平台

    公有云

    安全组:防火墙

  • 相关阅读:
    docker使用入门
    【论文阅读】半监督时序动作检测 Semi-Supervised Action Detection
    系统架构师2022年案例分析考前
    《深入了解java虚拟机》高效并发读书笔记——Java内存模型,线程,线程安全 与锁优化
    【C++】topk问题
    MyBatis实现多层级collection嵌套查询
    8个独立键盘驱动程
    信息安全实验四:Ip包监视程序实现
    MySQL 查询 唯一约束 对应的字段,列名称合并
    idea 中Maven项目转Gradle项目
  • 原文地址:https://blog.csdn.net/m0_56799642/article/details/133838177