• 响应式编程-基本介绍


    响应式编程-基本介绍

    什么是响应式编程

    响应式宣言(The Reactive Manifesto) 对响应式系统进行了定义:响应式系统是具备以下特质即时响应性(Responsive)、回弹性(Resilient)、弹性(Elastic)以及消息驱动(Message Driven)的系统。

    响应式是一种思维模式,核心在于异步和非阻塞。最初起源于IO模型的Reactor模型,并形成理念产生了各类响应式框架,贯穿整个系统使用的编程模式,从系统接受输入开始到完成完整的系统功能输出结果的完整过程中都是响应式的。

    https://img-blog.csdnimg.cn/img_convert/db39eeaa35fc389a72b2ba7653cc49ea.png

    1 主从多线程模型的Reactor线程模型

    响应式的底层逻辑如下:阻塞是非常浪费和影响性能的,要构建一个无阻塞的系统。响应式编程有三个最重要的特点:1. 事件驱动的处理模式。 2. 只有事件/消息真正被订阅时才会执行,在此之前都是对事件/消息处理流的一种编排。3. 事件/消息处理流可以对背压进行管理。

    理解响应式编程:

    1. 以监听者模式来说,当需要发布一个事件时,通常会调用所欲listener事件处理接口

            for(Listener listener :  listeners) {

                listener.onEvent(event);

            }

    1. 以上例子中如果某个listener功能发生阻塞,将造成发布者线程阻塞。为改善阻塞,引入多线程/异步的方式:

    ExecutorService executor = Executors.newCachedThreadPool();

            for(Listener listener :  listeners) {

                executor.execute(()->{

                    listener.onEvent(event);

                });

        }

    1. 在上面使用线程池的情况下, 线程数量受服务器限制,当事件大量增多是,可能会发生线程池处理队列满而发生阻塞的情况,如网关中涌入大量请求时的场景,因此即使每一个连接或每一个请求都做异步处理,调度线程总会在某时刻无法获取空闲线程而阻塞事件分发线程。
    2. Listener处理事件如果是响应式编程, listener将对事件的处理编排成一个执行流并返回,但不会执行他,只有当执行流被订阅时才真正执行。

    比如 onEvent将返回一个MonoFlux对象。

    Mono result = Mono.fromSupplier(this::test);

    1. 响应式和多线程、异步的差别在哪:
      1. 响应式是惰性的,也就是在被订阅之前不会执行任何操作。而Feature或线程池总是会执行某些操作,比如放入任务队列, 申请线程等。
      2. 当收到响应式的发布者对象时,由调用者可以将其封装在响应式链条中,从而构建一个完整的响应链。
      3. 当需要订阅时(真正执行时),可以方便的使用Scheduler进行异步订阅处理。

    背压是指来自后端的压力,当消息消费者的消费能力无法跟上消息发送者的能力,造成消息的积压,甚至可能由于持续的压力造成消息消费者的崩溃。类似于DDOS拒绝服务攻击的概念。异步消息驱动的系统需要能够进行背压管理,如引入消息中间件,消息将安全存储在消息中间件中,直到被可靠消费。而在响应式编程中也需要提供背压的处理机制避免消息积压和系统阻塞。

    响应式编程的使用场景

    响应式编程并不会提高程序的运行速度,由于存在一定的学习门和思维模式转变的原因,在业务系统的开发中并没有显示器重要性而得到应用。但在IO密集型的适用场景应用中得到了广泛的应用,如网关应用:zuul2 spring cloud gateway等。

    Java 响应式流规范

           Reactive-Streams(https://www.reactive-streams.org/) 是Java语言的响应式流编程模型,并已被JDK9采用,以java.util.concurrent.Flow API提供。

          

    Reactive-Stream中核心编程模型抽象:

    图2 Reactor-Steam消息处理流程

    消息的处理流程如下:

    1. 当对publisher进行订阅时,publisher将Subscription对象发送到消费者,消费者适用Subscription控制消息的推送。
    2. 消费者调用Subscription.request(n)方法表示,可以消费n条消息, 消息发送者消息推送至消费者直至消息数量达到n(调用消费者的onXXX方法)。
    3. 消费者持续调用Subscription.request(n)方法像消息生产者反馈自身消费能力,使得消息推送在背压得到管理的情况下持续进行。

    下面是React-Stream规范定义的核心接口:

           Publisher接口:一个发送无限序列的发布者。

    public interface Publisher {

        void subscribe(Subscriber var1);

    }

           Subscriber接口: 定义特定事件触发的处理方法。包括onSubscribe:Publisher进行订阅时;onNext:消费消息;onError:处理异常信号;onComplete:处理结束信号。

    public static interface Subscriber {

        public void onSubscribe(Subscription subscription);

        public void onNext(T item);

        public void onError(Throwable throwable);

        public void onComplete();

    }

           Subscription接口:Subscriber对Publisher的消息消费请求, 只有调用SubScription接口方法,消息才从Publisher传递至Subscriber,并调用Subscriber相应接口方法。

    public interface Subscription {

        void request(long var1);

        void cancel();

    }

           Subscription的request接口提供了从订阅者到发起者的反馈,表示可以消费n个消息,在进行request之前,发送者并不会推送消息至消费者,从而实现了背压的管理。

    Reactor

    Reactor

    Reactor是一个Reactor-Stream规范的实现者,已被使用在了从多的框架项目中,如Spring MVC, Spring webFlux,Spring cloud gateway等。

    官方文档地址如下Reactor 3 Reference Guide

    下面简单介绍一下Flux,Mono和Scheduler的概念, 其他特性详见官方文档。

    Flux和Mono

    Flux和Mono是Reactor Stream publisher的标准实现。都表示一个异步的消息序列,Flux表示一个产生0或无限个消息的序列(持续产生消息直至产生一个Complete消息),而Mono表示一个0-1个消息的序列。

    图3 Flux会持续推送消息直至发送完成信号

    图4 Mono只会产生0-1个消息

    Scheduler

    Reactor, 和 RxJava一样,也可以被认为是 并发无关(concurrency agnostic) 的。指的是它并不强制要求使用并发模式,将选择权交给开发者。

    Reactor可以方便的再publishOn()或SubscribeOn()方法中使用Scheduler对象来是的线程的发布或订阅在

    PublishOn方法将其后面的转换操作和消息消费操作在Scheduler的线程中异步执行,而SubscribeOn()则将整个执行链条都在Scheduler线程中异步执行。

    基本示例

    Flux ints = Flux.range(1, 4)

          .map(i -> {

            if (i <= 3) return i;

            throw new RuntimeException("Got to 4");

          });

    ints.subscribe(i -> System.out.println(i),

          error -> System.err.println("Error: " + error));

    1. Flux.rang(1,4)表示生成一个1,2,3,4的整数序列
    2. .map操作表示Flux序列将经过一个转换
    3. Ints.subscribe之前不会执行任何操作,subscribe将处罚消息流的执行,onNext()方法是打印消息元素,error方法是打印“Error:”+error

    下面是Reactor-Netty-http模块中一个Http请求的Reactor使用示例:

    public Mono send() {

            if (this.markSentHeaderAndBody()) {

                HttpMessage request = this.newFullEmptyBodyMessage();

                return FutureMono.deferFuture(() -> {

                    return this.channel().writeAndFlush(request);

                });

            } else {

                return Mono.empty();

            }

        }

    调用该Send方法会构建一个Mono对象,当对该对象进行订阅时,将实际执行一个异步操作:调用this.channel().writeAndFlsh(request)发送http请求。

    总结

    本文简单的介绍了响应式编程的概念及其适应场景,及Java响应式编程规范Reactor-Streaming,及其实现者Reactor进行了接收, 后续将从Spring Web,Spring Flux,Spring Cloud Gateway等主流框架中对Reactor的使用来挖掘响应式编程的魅力。

  • 相关阅读:
    06、HSMS协议介绍
    买电脑常识——电脑性能
    怎么制作网站才能脱颖而出
    C/C++—Inline关键词
    【php快速入门】学习笔记
    程序员的 Windows 工具箱「GitHub 热点速览」
    anaconda 安装 pytorch 和 tensorflow
    SQL多个字段拼接组合成新字段的常用方法
    第四章:存储子系统 [计算机组成原理笔记](自用)
    java 二维数组与稀疏数组之间相互转换
  • 原文地址:https://blog.csdn.net/dreamsofa/article/details/134274051