• Spring响应式高并发编程


    JDK基础

    响应式编程将大量使用JDK中有关Stream流,Lambda表达式等知识

    Lambda表达式

    只能代替有一个未实现的方法的接口(函数式接口)
    可以直接代替函数式接口的实现类
    具有上下文推导功能,可以根据返回值,参数个数,推断有实现的接口方法
    简化了函数时接口的匿名实现类

    在这里插入图片描述
    语法:

    1. 参数列表不需要指定类型
    2. Lambda的类型是所实现的函数式接口的类型,所以只能用该类型的对象引用
    		QueryUser queryUser = new QueryUser() {
                @Override
                public String queryAll(String username, Integer age) 				   {
                    return username + ":" + age;
                }
            };
    
            QueryUser queryUser1 = (username, age) -> {
                return username + ":" + age;
            };
    
            System.out.println(queryUser.queryAll("a", 2));
            System.out.println(queryUser1.queryAll("a", 1));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    在这里插入图片描述

    双冒号语法

    可以通过全类名::方法名,去引用一个方法
    至于传入方法的参数,将由jvm根据上下文的线索推断

    函数式接口

    JDK内置的函数式接口都位于java.util.function包下
    JDK内置了一些函数式接口,我们只需要通过lambda表达式实现其抽象方法即可
    @FunctionalInterface是检查接口,编译器将在编码阶段就能检查该接口是否为函数式接口

    @FunctionalInterface
    public interface Function<T, R> {
    //使用入参,得到结果并返回
        R apply(T t);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    @FunctionalInterface
    public interface Consumer<T> {
    
    //接收入参,但不会返回结果
        void accept(T t);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    @FunctionalInterface
    public interface Supplier<T> {
    //不用传入参数,会返回结果
        T get();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    @FunctionalInterface
    public interface Predicate<T> {
    
       
        boolean test(T t);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    StreamApi流式处理

    基于事件驱动模型,当事件发生时,将执行相应的回调函数
    可以理解为流式操作就是回调操作,或者是懒操作

    创建流对象

    JDK提供了多个api均可以得到一个流对象
    流对象:支持顺序和并行聚合操作的元素序列。
    Stream接口中定义了一系列针对元素的操作
    包括中间操作以及终结操作
    元素序列都可以转成流对象

    串行流:
    所有的中间操作都在同一个线程内完成

    ArrayList<Integer> views = new ArrayList<>();
    
    views.add(100);
    views.add(10);
    views.add(102);
    
    Stream<Integer> stream = views.stream();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    int[] viewArr = {1, 2, 3, 4, 5};
     IntStream arrStream = Arrays.stream(viewArr);
    
    • 1
    • 2
    Stream<Integer> stream1 = Stream.of(100, 2, 50, 4, 56);
    Stream<Object> stream2 = Stream.builder().add(1).add(20).add(3).build();
    Stream<Object> stream3 = Stream.concat(stream1, stream2);
    
    • 1
    • 2
    • 3

    并行流:
    将流对象由分成多个流对象,并将这些流对象以及中间操作,分配到不同的线程上,由CPU并发执行

    Stream<Integer> parallelStream = List.of(200, 1, 10, 300).parallelStream();
    Stream<Integer> parallel = Stream.of(1, 2, 3, 4, 5).parallel();
    
    • 1
    • 2
    public interface Stream<T> extends BaseStream<T, Stream<T>> {
    
       
        Stream<T> filter(Predicate<? super T> predicate);
    
        
        <R> Stream<R> map(Function<? super T, ? extends R> mapper);
    
      
        IntStream mapToInt(ToIntFunction<? super T> mapper);
    
       
        LongStream mapToLong(ToLongFunction<? super T> mapper);
    
        
        DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper);
    
       
        void forEach(Consumer<? super T> action);
    
       
        T reduce(T identity, BinaryOperator<T> accumulator);
    
       
        Optional<T> findAny();
    
       
    Stream<T> build();
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    中间操作

    流是惰性的,只有指定了终止操作时,所有的中间操作才会别执行

    filter

    入参:传入一个断言对象
    predicate函数式接口提供了一个,根据入参,运行逻辑表达式的方法
    当入参,符合逻辑表达式时将返回真,否则,返回假
    返回真的元素将被放入新流中,否则,将被丢弃

    @FunctionalInterface
    public interface Predicate<T> {
    
        /**
         根据入参执行逻辑表达式。
         参数: t – 输入参数 
         如果输入参数与断言表达式匹配,则返回: true,否则为 false
         */
        boolean test(T t);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    并行流
    public static void main(String[] args) {
    
            Stream<Integer> parallelStream = List.of(200, 1, 10, 300).parallelStream();
    
            Stream<Integer> stream1 = parallelStream.filter((item) -> {
                System.out.println("stream1:"+Thread.currentThread().getName()+":"+item);
                return item % 2 == 0;
            });
    
            Stream<Integer> stream2 = stream1.map((item) -> {
                System.out.println("stream2:"+Thread.currentThread().getName()+":"+item);
                item *= 2;
    
                return item;
            });
    
            stream2.forEach((item)-> {
                System.out.println("stream3:"+Thread.currentThread().getName()+":"+item);
            });
    
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    结果:

    stream1:ForkJoinPool.commonPool-worker-3:300
    stream1:ForkJoinPool.commonPool-worker-1:1
    stream1:main:10
    stream2:ForkJoinPool.commonPool-worker-3:300
    stream1:ForkJoinPool.commonPool-worker-2:200
    stream2:ForkJoinPool.commonPool-worker-2:200
    stream2:main:10
    stream3:ForkJoinPool.commonPool-worker-3:600
    stream3:main:20
    stream3:ForkJoinPool.commonPool-worker-2:400
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    从线程池中取出一个线程,去完成一个元素的所有中间操作
    比如,我这里,最初有四个元素,所以可以看到,整个过程用了四个线程
    单独看各个线程,可以看出,仍是按顺序执行各个元素的中间操作

    在这里插入图片描述

    如果是串行流的话,就是由同一线程去依次逐个完成每个元素的所有中间操作

    在这里插入图片描述

    stream1:main:200
    stream2:main:200
    stream3:main:400
    
    stream1:main:1
    
    stream1:main:10
    stream2:main:10
    stream3:main:20
    
    stream1:main:300
    stream2:main:300
    stream3:main:600
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    终结操作
    收集操作

    collect()方法
    入参:collector对象,该对象包含五个函数式接口方法
    通过这些函数式接口方法,可以把各个元素存入一个容器中

    public interface Collector<T, A, R> {
        /**
         创建存放元素的容器
         */
        Supplier<A> supplier();
    
        /**
         将元素放入容器
         */
        BiConsumer<A, T> accumulator();
    
        /**
         将两个容器合并成一个大容器
         */
        BinaryOperator<A> combiner();
    
        /**
          Perform the final transformation from the intermediate accumulation type
         */
        Function<A, R> finisher();
    
        /**
         Returns a {@code Set} of {@code Collector.Characteristics} indicating
         */
        Set<Characteristics> characteristics();
    
       
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    collectors.toList():
    相当于创建了一个collector对象

    new CollectorImpl<>(ArrayList::new, 
    					List::add,
                       (left, right) -> { left.addAll(right); return left; },
                        CH_ID);
    
    • 1
    • 2
    • 3
    • 4

    阻塞式编程和响应式编程

    大家一开始学的JDK的一些操作,多数是属于阻塞式编程,也就是各条编程语句之间存在高耦合,只有靠前的语句被执行并获得结果,后面的语句才能被执行到
    而响应式编程基于事件驱动,通过多线程共同完成任务,就能避免等待
    经典的模型包括生产者,消费者,以及缓冲队列,最大的特点是异步处理消息,生产者和消费者进行解耦,彼此都不需要等对方回应
    实现的基础是多线程协作
    提高性能的思路是,减少CPU的空闲时间,减少线程切换次数,最大程度增大CPU有效工作时间
    当CPU遇到某个线程阻塞式,马上切到其他线程工作
    所有异步线程是消息互通的,彼此可以通过触发事件,执行回调

    JUC

    JDK的Java.util.concurrent包体现了JDK的高并发思想的实现,是基于事件驱动模型

    flow

    在JUC的flow包下,专门用于构建无阻塞的异步数据流处理
    核心组件:
    发布者(生产者),订阅者(消费者),订阅关系,处理器(生产者+消费者)
    订阅者将监控生产者全生命周期事件

    public final class Flow {
    
        /**
        生产者
         */
        @FunctionalInterface
        public static interface Publisher<T> {
            /**
             订阅
             */
            public void subscribe(Subscriber<? super T> subscriber);
        }
    
        /**
        消费者
         */
        public static interface Subscriber<T> {
            /**
            在为给定订阅调用任何其他订阅服务器方法之前调用的方法。
             */
            public void onSubscribe(Subscription subscription);
    
            /**
            使用订阅的下一项调用的方法
             */
            public void onNext(T item);
    
            /**
            
             */
            public void onError(Throwable throwable);
    
            /**
            
             */
            public void onComplete();
        }
    
        /**
         链接 生产者 和 消费者 的消息控件
         */
        public static interface Subscription {
            /**
            
             */
            public void request(long n);
    
            /**
            
             */
            public void cancel();
        }
    
        /**
       同时充当订阅服务器和发布服务器的组件。
         */
        public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
        }
    
    //默认缓冲区的大小
        static final int DEFAULT_BUFFER_SIZE = 256;
    
        public static int defaultBufferSize() {
            return DEFAULT_BUFFER_SIZE;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    响应式流

    和Stream流类似,reactor框架中也有元素流的概念,其中,0或1个元素称为Mono流,0到N个元素,称为Flux流
    响应式流基于惰性调用,当不执行订阅操作时,流是不会自己执行的

    Flux流

    一个带有rx操作符的响应式流发布者,它会发出0到N个元素,然后发出成功或错误的信号。
    简单的说就是,消费者能拿到什么数据,只由生产者生产什么数据决定,并且,当没有消费者需求时,生产者将不会生产数据,并且,所有消费者拿到的数据都是相同的

    它旨在用于实现和返回类型。尽可能保持使用原始的Publisher作为输入参数。

    如果已知底层Publisher将发出0或1个元素,则应使用Mono。

    请注意,在Flux操作符内部使用的java.util.function /
    lambdas中避免使用状态,因为这些状态可能在多个订阅者之间共享。

    subscribe(CoreSubscriber)是用于上下文传递的内部扩展,用于内部使用。用户提供的Subscriber可以传递给此“subscribe”扩展,但将失去可用的每个订阅的Hooks.onLastOperator。

    泛型 -Flux流所传递的元素类型

    public abstract class Flux<T> implements CorePublisher<T> {
    
    //创建一个Flux,它会发出提供的元素,然后完成。
    
    参数:
    data - 要发出的元素,作为可变参数
    返回:
    一个新的Flux
    		public static <T> Flux<T> just(T... data) {
    			return fromArray(data);
    		}
    
    //订阅此Flux的Consumer,它将分别消耗序列中的所有元素,处理错误并对完成做出反应。此外,一个上下文与订阅相关联。在订阅时,隐式地进行无界请求。
    
    对于一个被动版本,观察并转发传入数据,请参阅doOnNext(Consumer)doOnError(Consumer)doOnComplete(Runnable)doOnSubscribe(Consumer)。
    
    对于一个给予您更多对背压和请求控制的版本,请参阅subscribe(Subscriber),其中包含一个BaseSubscriber。
    
    请记住,由于序列可能是异步的,因此这将立即将控制返回给调用线程。例如,在主线程或单元测试中执行时,这可能会给人一种消费者未被调用的印象。
    
    参数:
    consumer - 在每个值上调用的消费者
    errorConsumer - 在错误信号上调用的消费者
    completeConsumer - 在完成信号上调用的消费者
    initialContext - 与订阅相关联的基础上下文,将可见于上游操作符
    返回:
    一个新的Disposable,可用于取消底层订阅
    
    		public final Disposable subscribe(
    			@Nullable Consumer<? super T> consumer,
    			@Nullable Consumer<? super Throwable> errorConsumer,
    			@Nullable Runnable completeConsumer,
    			@Nullable Context initialContext) {
    		return subscribeWith(
    		new LambdaSubscriber<>(consumer, errorConsumer,
    				completeConsumer,
    				null,
    				initialContext)
    );
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    public static void main(String[] args) {
            Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
            Disposable disposable = flux.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer element) {
                    System.out.println(element);
                }
            });
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    Mono流

    一个具有基本rx操作符的响应式流发布者,通过onNext信号发出最多一个项,然后以onComplete信号终止(成功的Mono,有值或无值),或者仅发出单个onError信号(失败的Mono)。
    大多数Mono实现在调用Subscriber.onNext(T)之后都应立即调用Subscriber.onComplete()。Mono.never()是一个特例:它不发出任何信号,虽然在测试之外没有什么用处,但从技术上讲并不是被禁止的。另一方面,明确禁止使用onNext和onError的组合。
    学习Mono
    API并发现新操作符的推荐方法是通过参考文档,而不是通过此Javadoc(与学习单个操作符相反)。请参阅“我需要哪个操作符?”附录。

    rx操作符将为输入Mono类型提供别名,以保留结果Mono的“最多一个”属性。例如,flatMap返回一个Mono,而有可能会有多个发射的flatMapMany别名。
    应该使用Mono来表示只完成而没有任何值的发布者。 它旨在用于实现和返回类型,输入参数应尽可能使用原始发布者。
    请注意,在Mono操作符内部使用java.util.function /
    lambdas中的状态应该避免,因为这些状态可能在多个订阅者之间共享。
    泛型-此类的单个值的类型

    public abstract class Mono<T> implements CorePublisher<T> {
    	public static <T> Mono<T> just(T data) {
    		return onAssembly(new MonoJust<>(data));
    	}
    
    	public final Disposable subscribe(Consumer<? super T> consumer) {
    		Objects.requireNonNull(consumer, "consumer");
    		return subscribe(consumer, null, null);
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    public static void main(String[] args) {
            Mono<Integer> mono = Mono.just(1);
            mono.subscribe(e -> {
                e += 1;
                System.out.println(Thread.currentThread().getName()+":"+e);
            });
    
            mono.subscribe(e -> {
                System.out.println(Thread.currentThread().getName()+":"+e);
            });
    
            mono.subscribe(e -> {
                System.out.println(Thread.currentThread().getName()+":"+e);
            });
    
            mono.subscribe(e -> {
                System.out.println(Thread.currentThread().getName()+":"+e);
            });
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    结果:

    main:2
    main:1
    main:1
    main:1
    
    • 1
    • 2
    • 3
    • 4

    事件回调

    信号,专门用于表示流中的元素的状态:
    `public enum SignalType {

    /**
     * A signal when the subscription is triggered
     */
    SUBSCRIBE,
    /**
     * A signal when a request is made through the subscription
     */
    REQUEST,
    /**
     * A signal when the subscription is cancelled
     */
    CANCEL,
    /**
     * A signal when an operator receives a subscription
     */
    ON_SUBSCRIBE,
    /**
     * A signal when an operator receives an emitted value
     */
    ON_NEXT,
    /**
     * A signal when an operator receives an error
     */
    ON_ERROR,
    /**
     * A signal when an operator completes
     */
    ON_COMPLETE,
    /**
     * A signal when an operator completes
     */
    AFTER_TERMINATE,
    /**
     * A context read signal
     */
    CURRENT_CONTEXT,
    /**
     * A context update signal
     */
    ON_CONTEXT;
    
    @Override
    public String toString() {
    	switch (this) {
    		case ON_SUBSCRIBE:
    			return "onSubscribe";
    		case ON_NEXT:
    			return "onNext";
    		case ON_ERROR:
    			return "onError";
    		case ON_COMPLETE:
    			return "onComplete";
    		case REQUEST:
    			return "request";
    		case CANCEL:
    			return "cancel";
    		case CURRENT_CONTEXT:
    			return "currentContext";
    		case ON_CONTEXT:
    			return "onContextUpdate";
    		case AFTER_TERMINATE:
    			return "afterTerminate";
    		default:
    			return "subscribe";
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    `

    当信号发出时,将会执行回调函数
    doOnxxx方法专门用来传递信号
    onXXX是一个回调函数,是在接收到信号后执行的操作
    map,filter,distinct等方法,是中间操作,可以得到一个新流
    事件回调:
    doOnComplete,doOnCancel,doOnError,doOnEach,doOnNext

    在这里插入图片描述

    public static void main(String[] args) {
    
            Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5).doOnComplete(() -> System.out.println(Thread.currentThread().getName() + ":"+"Flux流完成了"));;
    
    
    
            flux.subscribe(e -> {
                System.out.println(Thread.currentThread().getName() + ":"+e);
            });
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    main:1
    main:2
    main:3
    main:4
    main:5
    main:Flux流完成了
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
        public static void main(String[] args) {
    
            Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5).doOnComplete(() -> System.out.println(Thread.currentThread().getName() + ":"+"Flux流完成了"));;
    
    
    
            flux.subscribe(e -> {
                System.out.println(Thread.currentThread().getName() + ":"+e);
            });
    
            flux.subscribe(new Subscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription subscription) {
                    System.out.println("生产者,消费者订阅关系建立:" + subscription);
                    subscription.request(6);
                }
    
                @Override
                public void onNext(Integer integer) {
                    System.out.println("下一个元素到达:"+integer);
                }
    
                @Override
                public void onError(Throwable throwable) {
                    System.out.println("接收信号的时候出错了:" + throwable.getMessage());
                }
    
                @Override
                public void onComplete() {
                    System.out.println("接收元素完成了:");
                }
            });
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    main:1
    main:2
    main:3
    main:4
    main:5
    main:Flux流完成了
    main生产者,消费者订阅关系建立:reactor.core.publisher.StrictSubscriber@17c68925
    main下一个元素到达:1
    main下一个元素到达:2
    main下一个元素到达:3
    main下一个元素到达:4
    main下一个元素到达:5
    main:Flux流完成了
    main接收元素完成了:
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    由结果可以得出结论:Subscriber接口定义一系列由生产者信号所触发的回调函数
    消费者可以直接通过信号感知到生产者所生产的元素的状态

    Flux<Integer> flux = Flux.range(1, 50);
    
            flux.subscribe(new BaseSubscriber<Integer>() {
                @Override
                protected Subscription upstream() {
                    return super.upstream();
                }
    
                @Override
                public boolean isDisposed() {
                    return super.isDisposed();
                }
    
                @Override
                public void dispose() {
                    super.dispose();
                }
    
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    super.hookOnSubscribe(subscription);
                }
    
                @Override
                protected void hookOnNext(Integer value) {
                    super.hookOnNext(value);
                }
    
                @Override
                protected void hookOnComplete() {
                    super.hookOnComplete();
                }
    
                @Override
                protected void hookOnError(Throwable throwable) {
                    super.hookOnError(throwable);
                }
    
                @Override
                protected void hookOnCancel() {
                    super.hookOnCancel();
                }
    
                @Override
                protected void hookFinally(SignalType type) {
                    super.hookFinally(type);
                }
    
                @Override
                public String toString() {
                    return super.toString();
                }
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
            Flux<Integer> flux = Flux.range(1, 50);
    
            flux.subscribe(new BaseSubscriber<Integer>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    System.out.println(Thread.currentThread().getName()+"消费者和生产者绑定订阅关系" + subscription);
                    request(1);
                }
    
                @Override
                protected void hookOnNext(Integer value) {
                    System.out.println(Thread.currentThread().getName()+"元素到达" + value);
                    if (value==20){
                        this.cancel();
                        System.out.println("流元素20被取消了");
                    }
                    request(1);
                }
    
                @Override
                protected void hookOnComplete() {
                    System.out.println(Thread.currentThread().getName()+"流正常结束");
                }
    
                @Override
                protected void hookOnError(Throwable throwable) {
                    System.out.println(Thread.currentThread().getName()+"流异常结束" + throwable.getMessage());
                }
    
                @Override
                protected void hookOnCancel() {
                    System.out.println(Thread.currentThread().getName()+"流被取消了");
                }
    
                @Override
                protected void hookFinally(SignalType type) {
                    System.out.println(Thread.currentThread().getName()+"最终执行" + type);
                }
    
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    缓冲区

    生产者可以将多个元素放进缓存中,在一起发给消费者

    public static void main(String[] args) {
    
            Flux<List<Integer>> flux = Flux.range(1, 183).buffer(10);
    
            flux.subscribe(list -> {
                System.out.println(list.size()+"接收到缓冲区中的元素:" + list);
            });
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    10接收到缓冲区中的元素:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    10接收到缓冲区中的元素:[11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
    10接收到缓冲区中的元素:[21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
    10接收到缓冲区中的元素:[31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
    10接收到缓冲区中的元素:[41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
    10接收到缓冲区中的元素:[51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
    10接收到缓冲区中的元素:[61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
    10接收到缓冲区中的元素:[71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
    10接收到缓冲区中的元素:[81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
    10接收到缓冲区中的元素:[91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
    10接收到缓冲区中的元素:[101, 102, 103, 104, 105, 106, 107, 108, 109, 110]
    10接收到缓冲区中的元素:[111, 112, 113, 114, 115, 116, 117, 118, 119, 120]
    10接收到缓冲区中的元素:[121, 122, 123, 124, 125, 126, 127, 128, 129, 130]
    10接收到缓冲区中的元素:[131, 132, 133, 134, 135, 136, 137, 138, 139, 140]
    10接收到缓冲区中的元素:[141, 142, 143, 144, 145, 146, 147, 148, 149, 150]
    10接收到缓冲区中的元素:[151, 152, 153, 154, 155, 156, 157, 158, 159, 160]
    10接收到缓冲区中的元素:[161, 162, 163, 164, 165, 166, 167, 168, 169, 170]
    10接收到缓冲区中的元素:[171, 172, 173, 174, 175, 176, 177, 178, 179, 180]
    3接收到缓冲区中的元素:[181, 182, 183]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    限流操作

    limitRate

    public static void main(String[] args) {
    
            Flux<Integer> flux = Flux.range(1, 183).log().limitRate(50);
    
            flux.subscribe(new BaseSubscriber<Integer>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    System.out.println("订阅关系建立" + subscription);
                    request(1);
                }
    
                @Override
                protected void hookOnNext(Integer value) {
                    System.out.println("下一个元素到达" + value);
                    request(1);
                }
            });
    
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
    [main] INFO reactor.Flux.Range.1 - | request(50)
    [main] INFO reactor.Flux.Range.1 - | onNext(1)
    [main] INFO reactor.Flux.Range.1 - | onNext(2)
    [main] INFO reactor.Flux.Range.1 - | onNext(3)
    [main] INFO reactor.Flux.Range.1 - | onNext(4)
    [main] INFO reactor.Flux.Range.1 - | onNext(5)
    [main] INFO reactor.Flux.Range.1 - | onNext(6)
    [main] INFO reactor.Flux.Range.1 - | onNext(7)
    [main] INFO reactor.Flux.Range.1 - | onNext(8)
    [main] INFO reactor.Flux.Range.1 - | onNext(9)
    [main] INFO reactor.Flux.Range.1 - | onNext(10)
    [main] INFO reactor.Flux.Range.1 - | onNext(11)
    [main] INFO reactor.Flux.Range.1 - | onNext(12)
    [main] INFO reactor.Flux.Range.1 - | onNext(13)
    [main] INFO reactor.Flux.Range.1 - | onNext(14)
    [main] INFO reactor.Flux.Range.1 - | onNext(15)
    [main] INFO reactor.Flux.Range.1 - | onNext(16)
    [main] INFO reactor.Flux.Range.1 - | onNext(17)
    [main] INFO reactor.Flux.Range.1 - | onNext(18)
    [main] INFO reactor.Flux.Range.1 - | onNext(19)
    [main] INFO reactor.Flux.Range.1 - | onNext(20)
    [main] INFO reactor.Flux.Range.1 - | onNext(21)
    [main] INFO reactor.Flux.Range.1 - | onNext(22)
    [main] INFO reactor.Flux.Range.1 - | onNext(23)
    [main] INFO reactor.Flux.Range.1 - | onNext(24)
    [main] INFO reactor.Flux.Range.1 - | onNext(25)
    [main] INFO reactor.Flux.Range.1 - | onNext(26)
    [main] INFO reactor.Flux.Range.1 - | onNext(27)
    [main] INFO reactor.Flux.Range.1 - | onNext(28)
    [main] INFO reactor.Flux.Range.1 - | onNext(29)
    [main] INFO reactor.Flux.Range.1 - | onNext(30)
    [main] INFO reactor.Flux.Range.1 - | onNext(31)
    [main] INFO reactor.Flux.Range.1 - | onNext(32)
    [main] INFO reactor.Flux.Range.1 - | onNext(33)
    订阅关系建立reactor.core.publisher.FluxPublishOn$PublishOnSubscriber@4157f54e
    下一个元素到达1
    下一个元素到达2
    下一个元素到达3
    下一个元素到达4
    下一个元素到达5
    下一个元素到达6
    下一个元素到达7
    下一个元素到达8
    下一个元素到达9
    下一个元素到达10
    下一个元素到达11
    下一个元素到达12
    下一个元素到达13
    下一个元素到达14
    下一个元素到达15
    下一个元素到达16
    下一个元素到达17
    下一个元素到达18
    下一个元素到达19
    下一个元素到达20
    下一个元素到达21
    下一个元素到达22
    下一个元素到达23
    下一个元素到达24
    下一个元素到达25
    下一个元素到达26
    下一个元素到达27
    下一个元素到达28
    下一个元素到达29
    下一个元素到达30
    下一个元素到达31
    下一个元素到达32
    下一个元素到达33
    下一个元素到达34
    下一个元素到达35
    下一个元素到达36
    下一个元素到达37
    下一个元素到达38
    下一个元素到达39
    下一个元素到达40
    下一个元素到达41
    下一个元素到达42
    下一个元素到达43
    下一个元素到达44
    下一个元素到达45
    下一个元素到达46
    下一个元素到达47
    下一个元素到达48
    下一个元素到达49
    下一个元素到达50
    下一个元素到达51
    下一个元素到达52
    下一个元素到达53
    下一个元素到达54
    下一个元素到达55
    下一个元素到达56
    下一个元素到达57
    下一个元素到达58
    下一个元素到达59
    下一个元素到达60
    下一个元素到达61
    下一个元素到达62
    下一个元素到达63
    下一个元素到达64
    下一个元素到达65
    下一个元素到达66
    下一个元素到达67
    下一个元素到达68
    下一个元素到达69
    下一个元素到达70
    下一个元素到达71
    下一个元素到达72
    下一个元素到达73
    下一个元素到达74
    下一个元素到达75
    下一个元素到达76
    下一个元素到达77
    下一个元素到达78
    下一个元素到达79
    下一个元素到达80
    下一个元素到达81
    下一个元素到达82
    下一个元素到达83
    下一个元素到达84
    下一个元素到达85
    下一个元素到达86
    下一个元素到达87
    下一个元素到达88
    下一个元素到达89
    下一个元素到达90
    下一个元素到达91
    下一个元素到达92
    下一个元素到达93
    下一个元素到达94
    下一个元素到达95
    下一个元素到达96
    下一个元素到达97
    下一个元素到达98
    下一个元素到达99
    下一个元素到达100
    下一个元素到达101
    下一个元素到达102
    下一个元素到达103
    下一个元素到达104
    下一个元素到达105
    下一个元素到达106
    下一个元素到达107
    下一个元素到达108
    下一个元素到达109
    下一个元素到达110
    下一个元素到达111
    下一个元素到达112
    下一个元素到达113
    下一个元素到达114
    下一个元素到达115
    下一个元素到达116
    下一个元素到达117
    下一个元素到达118
    下一个元素到达119
    下一个元素到达120
    下一个元素到达121
    下一个元素到达122
    下一个元素到达123
    下一个元素到达124
    下一个元素到达125
    下一个元素到达126
    下一个元素到达127
    下一个元素到达128
    下一个元素到达129
    下一个元素到达130
    下一个元素到达131
    下一个元素到达132
    下一个元素到达133
    下一个元素到达134
    下一个元素到达135
    下一个元素到达136
    下一个元素到达137
    下一个元素到达138
    下一个元素到达139
    下一个元素到达140
    下一个元素到达141
    下一个元素到达142
    下一个元素到达143
    下一个元素到达144
    下一个元素到达145
    下一个元素到达146
    下一个元素到达147
    下一个元素到达148
    下一个元素到达149
    下一个元素到达150
    下一个元素到达151
    下一个元素到达152
    下一个元素到达153
    下一个元素到达154
    下一个元素到达155
    下一个元素到达156
    下一个元素到达157
    下一个元素到达158
    下一个元素到达159
    下一个元素到达160
    下一个元素到达161
    下一个元素到达162
    下一个元素到达163
    下一个元素到达164
    下一个元素到达165
    下一个元素到达166
    下一个元素到达167
    下一个元素到达168
    下一个元素到达169
    下一个元素到达170
    下一个元素到达171
    下一个元素到达172
    [main] INFO reactor.Flux.Range.1 - | onNext(34)
    [main] INFO reactor.Flux.Range.1 - | onNext(35)
    [main] INFO reactor.Flux.Range.1 - | onNext(36)
    [main] INFO reactor.Flux.Range.1 - | onNext(37)
    [main] INFO reactor.Flux.Range.1 - | onNext(38)
    [main] INFO reactor.Flux.Range.1 - | request(38)
    [main] INFO reactor.Flux.Range.1 - | onNext(39)
    [main] INFO reactor.Flux.Range.1 - | onNext(40)
    [main] INFO reactor.Flux.Range.1 - | onNext(41)
    [main] INFO reactor.Flux.Range.1 - | onNext(42)
    [main] INFO reactor.Flux.Range.1 - | onNext(43)
    [main] INFO reactor.Flux.Range.1 - | onNext(44)
    [main] INFO reactor.Flux.Range.1 - | onNext(45)
    [main] INFO reactor.Flux.Range.1 - | onNext(46)
    [main] INFO reactor.Flux.Range.1 - | onNext(47)
    [main] INFO reactor.Flux.Range.1 - | onNext(48)
    [main] INFO reactor.Flux.Range.1 - | onNext(49)
    [main] INFO reactor.Flux.Range.1 - | onNext(50)
    [main] INFO reactor.Flux.Range.1 - | onNext(51)
    [main] INFO reactor.Flux.Range.1 - | onNext(52)
    [main] INFO reactor.Flux.Range.1 - | onNext(53)
    [main] INFO reactor.Flux.Range.1 - | onNext(54)
    [main] INFO reactor.Flux.Range.1 - | onNext(55)
    [main] INFO reactor.Flux.Range.1 - | onNext(56)
    [main] INFO reactor.Flux.Range.1 - | onNext(57)
    [main] INFO reactor.Flux.Range.1 - | onNext(58)
    [main] INFO reactor.Flux.Range.1 - | onNext(59)
    [main] INFO reactor.Flux.Range.1 - | onNext(60)
    [main] INFO reactor.Flux.Range.1 - | onNext(61)
    [main] INFO reactor.Flux.Range.1 - | onNext(62)
    [main] INFO reactor.Flux.Range.1 - | onNext(63)
    [main] INFO reactor.Flux.Range.1 - | onNext(64)
    [main] INFO reactor.Flux.Range.1 - | onNext(65)
    [main] INFO reactor.Flux.Range.1 - | onNext(66)
    [main] INFO reactor.Flux.Range.1 - | onNext(67)
    [main] INFO reactor.Flux.Range.1 - | onNext(68)
    [main] INFO reactor.Flux.Range.1 - | onNext(69)
    [main] INFO reactor.Flux.Range.1 - | onNext(70)
    [main] INFO reactor.Flux.Range.1 - | onNext(71)
    [main] INFO reactor.Flux.Range.1 - | onNext(72)
    [main] INFO reactor.Flux.Range.1 - | onNext(73)
    [main] INFO reactor.Flux.Range.1 - | onNext(74)
    [main] INFO reactor.Flux.Range.1 - | onNext(75)
    [main] INFO reactor.Flux.Range.1 - | onNext(76)
    [main] INFO reactor.Flux.Range.1 - | request(38)
    [main] INFO reactor.Flux.Range.1 - | onNext(77)
    [main] INFO reactor.Flux.Range.1 - | onNext(78)
    [main] INFO reactor.Flux.Range.1 - | onNext(79)
    [main] INFO reactor.Flux.Range.1 - | onNext(80)
    [main] INFO reactor.Flux.Range.1 - | onNext(81)
    [main] INFO reactor.Flux.Range.1 - | onNext(82)
    [main] INFO reactor.Flux.Range.1 - | onNext(83)
    [main] INFO reactor.Flux.Range.1 - | onNext(84)
    [main] INFO reactor.Flux.Range.1 - | onNext(85)
    [main] INFO reactor.Flux.Range.1 - | onNext(86)
    [main] INFO reactor.Flux.Range.1 - | onNext(87)
    [main] INFO reactor.Flux.Range.1 - | onNext(88)
    [main] INFO reactor.Flux.Range.1 - | onNext(89)
    [main] INFO reactor.Flux.Range.1 - | onNext(90)
    [main] INFO reactor.Flux.Range.1 - | onNext(91)
    [main] INFO reactor.Flux.Range.1 - | onNext(92)
    [main] INFO reactor.Flux.Range.1 - | onNext(93)
    [main] INFO reactor.Flux.Range.1 - | onNext(94)
    [main] INFO reactor.Flux.Range.1 - | onNext(95)
    [main] INFO reactor.Flux.Range.1 - | onNext(96)
    [main] INFO reactor.Flux.Range.1 - | onNext(97)
    [main] INFO reactor.Flux.Range.1 - | onNext(98)
    [main] INFO reactor.Flux.Range.1 - | onNext(99)
    [main] INFO reactor.Flux.Range.1 - | onNext(100)
    [main] INFO reactor.Flux.Range.1 - | onNext(101)
    [main] INFO reactor.Flux.Range.1 - | onNext(102)
    [main] INFO reactor.Flux.Range.1 - | onNext(103)
    [main] INFO reactor.Flux.Range.1 - | onNext(104)
    [main] INFO reactor.Flux.Range.1 - | onNext(105)
    [main] INFO reactor.Flux.Range.1 - | onNext(106)
    [main] INFO reactor.Flux.Range.1 - | onNext(107)
    [main] INFO reactor.Flux.Range.1 - | onNext(108)
    [main] INFO reactor.Flux.Range.1 - | onNext(109)
    [main] INFO reactor.Flux.Range.1 - | onNext(110)
    [main] INFO reactor.Flux.Range.1 - | onNext(111)
    [main] INFO reactor.Flux.Range.1 - | onNext(112)
    [main] INFO reactor.Flux.Range.1 - | onNext(113)
    [main] INFO reactor.Flux.Range.1 - | onNext(114)
    [main] INFO reactor.Flux.Range.1 - | request(38)
    [main] INFO reactor.Flux.Range.1 - | onNext(115)
    [main] INFO reactor.Flux.Range.1 - | onNext(116)
    [main] INFO reactor.Flux.Range.1 - | onNext(117)
    [main] INFO reactor.Flux.Range.1 - | onNext(118)
    [main] INFO reactor.Flux.Range.1 - | onNext(119)
    [main] INFO reactor.Flux.Range.1 - | onNext(120)
    [main] INFO reactor.Flux.Range.1 - | onNext(121)
    [main] INFO reactor.Flux.Range.1 - | onNext(122)
    [main] INFO reactor.Flux.Range.1 - | onNext(123)
    [main] INFO reactor.Flux.Range.1 - | onNext(124)
    [main] INFO reactor.Flux.Range.1 - | onNext(125)
    [main] INFO reactor.Flux.Range.1 - | onNext(126)
    [main] INFO reactor.Flux.Range.1 - | onNext(127)
    [main] INFO reactor.Flux.Range.1 - | onNext(128)
    [main] INFO reactor.Flux.Range.1 - | onNext(129)
    [main] INFO reactor.Flux.Range.1 - | onNext(130)
    [main] INFO reactor.Flux.Range.1 - | onNext(131)
    [main] INFO reactor.Flux.Range.1 - | onNext(132)
    [main] INFO reactor.Flux.Range.1 - | onNext(133)
    [main] INFO reactor.Flux.Range.1 - | onNext(134)
    [main] INFO reactor.Flux.Range.1 - | onNext(135)
    [main] INFO reactor.Flux.Range.1 - | onNext(136)
    [main] INFO reactor.Flux.Range.1 - | onNext(137)
    [main] INFO reactor.Flux.Range.1 - | onNext(138)
    [main] INFO reactor.Flux.Range.1 - | onNext(139)
    [main] INFO reactor.Flux.Range.1 - | onNext(140)
    [main] INFO reactor.Flux.Range.1 - | onNext(141)
    [main] INFO reactor.Flux.Range.1 - | onNext(142)
    [main] INFO reactor.Flux.Range.1 - | onNext(143)
    [main] INFO reactor.Flux.Range.1 - | onNext(144)
    [main] INFO reactor.Flux.Range.1 - | onNext(145)
    [main] INFO reactor.Flux.Range.1 - | onNext(146)
    [main] INFO reactor.Flux.Range.1 - | onNext(147)
    [main] INFO reactor.Flux.Range.1 - | onNext(148)
    [main] INFO reactor.Flux.Range.1 - | onNext(149)
    [main] INFO reactor.Flux.Range.1 - | onNext(150)
    [main] INFO reactor.Flux.Range.1 - | onNext(151)
    [main] INFO reactor.Flux.Range.1 - | onNext(152)
    [main] INFO reactor.Flux.Range.1 - | request(38)
    [main] INFO reactor.Flux.Range.1 - | onNext(153)
    [main] INFO reactor.Flux.Range.1 - | onNext(154)
    [main] INFO reactor.Flux.Range.1 - | onNext(155)
    [main] INFO reactor.Flux.Range.1 - | onNext(156)
    [main] INFO reactor.Flux.Range.1 - | onNext(157)
    [main] INFO reactor.Flux.Range.1 - | onNext(158)
    [main] INFO reactor.Flux.Range.1 - | onNext(159)
    [main] INFO reactor.Flux.Range.1 - | onNext(160)
    [main] INFO reactor.Flux.Range.1 - | onNext(161)
    [main] INFO reactor.Flux.Range.1 - | onNext(162)
    [main] INFO reactor.Flux.Range.1 - | onNext(163)
    [main] INFO reactor.Flux.Range.1 - | onNext(164)
    [main] INFO reactor.Flux.Range.1 - | onNext(165)
    [main] INFO reactor.Flux.Range.1 - | onNext(166)
    [main] INFO reactor.Flux.Range.1 - | onNext(167)
    [main] INFO reactor.Flux.Range.1 - | onNext(168)
    [main] INFO reactor.Flux.Range.1 - | onNext(169)
    [main] INFO reactor.Flux.Range.1 - | onNext(170)
    [main] INFO reactor.Flux.Range.1 - | onNext(171)
    [main] INFO reactor.Flux.Range.1 - | onNext(172)
    [main] INFO reactor.Flux.Range.1 - | onNext(173)
    [main] INFO reactor.Flux.Range.1 - | onNext(174)
    [main] INFO reactor.Flux.Range.1 - | onNext(175)
    [main] INFO reactor.Flux.Range.1 - | onNext(176)
    [main] INFO reactor.Flux.Range.1 - | onNext(177)
    [main] INFO reactor.Flux.Range.1 - | onNext(178)
    [main] INFO reactor.Flux.Range.1 - | onNext(179)
    [main] INFO reactor.Flux.Range.1 - | onNext(180)
    [main] INFO reactor.Flux.Range.1 - | onNext(181)
    [main] INFO reactor.Flux.Range.1 - | onNext(182)
    [main] INFO reactor.Flux.Range.1 - | onNext(183)
    [main] INFO reactor.Flux.Range.1 - | onComplete()
    下一个元素到达173
    下一个元素到达174
    下一个元素到达175
    下一个元素到达176
    下一个元素到达177
    下一个元素到达178
    下一个元素到达179
    下一个元素到达180
    下一个元素到达181
    下一个元素到达182
    下一个元素到达183
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322
    • 323
    • 324
    • 325
    • 326
    • 327
    • 328
    • 329
    • 330
    • 331
    • 332
    • 333
    • 334
    • 335
    • 336
    • 337
    • 338
    • 339
    • 340
    • 341
    • 342
    • 343
    • 344
    • 345
    • 346
    • 347
    • 348
    • 349
    • 350
    • 351
    • 352
    • 353
    • 354
    • 355
    • 356
    • 357
    • 358
    • 359
    • 360
    • 361
    • 362
    • 363
    • 364
    • 365
    • 366
    • 367
    • 368
    • 369
    • 370
    • 371
    • 372
    • 373
    • 374

    手动产生流

    通过消费者回调和某些状态逐个生成信号,以编程方式创建 Flux。stateSupplier 可能返回 null。
    参数:
    stateSupplier – 要求每个传入用户为生成器双功能生成器提供初始状态 使用 Reactor 为每个用户提供的 SynchronousSink 以及当前状态,以在每次传递时生成单个信号并返回(新)状态。

    单线程:generate()

    public static void main(String[] args) {
    
            Flux<Object> flux = Flux.generate(() -> 0, (state, sink) -> {
    
                if (state < 20) {
                    sink.next(state);
                } else {
                    sink.complete();
                }
    
                return state + 1;
            }).log();
    
            flux.subscribe(System.out::println);
    
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    [main] INFO reactor.Flux.Generate.1 - | onSubscribe([Fuseable] FluxGenerate.GenerateSubscription)
    [main] INFO reactor.Flux.Generate.1 - | request(unbounded)
    [main] INFO reactor.Flux.Generate.1 - | onNext(0)
    [main] INFO reactor.Flux.Generate.1 - | onNext(1)
    [main] INFO reactor.Flux.Generate.1 - | onNext(2)
    [main] INFO reactor.Flux.Generate.1 - | onNext(3)
    [main] INFO reactor.Flux.Generate.1 - | onNext(4)
    [main] INFO reactor.Flux.Generate.1 - | onNext(5)
    [main] INFO reactor.Flux.Generate.1 - | onNext(6)
    [main] INFO reactor.Flux.Generate.1 - | onNext(7)
    [main] INFO reactor.Flux.Generate.1 - | onNext(8)
    [main] INFO reactor.Flux.Generate.1 - | onNext(9)
    [main] INFO reactor.Flux.Generate.1 - | onNext(10)
    [main] INFO reactor.Flux.Generate.1 - | onNext(11)
    [main] INFO reactor.Flux.Generate.1 - | onNext(12)
    [main] INFO reactor.Flux.Generate.1 - | onNext(13)
    [main] INFO reactor.Flux.Generate.1 - | onNext(14)
    [main] INFO reactor.Flux.Generate.1 - | onNext(15)
    [main] INFO reactor.Flux.Generate.1 - | onNext(16)
    [main] INFO reactor.Flux.Generate.1 - | onNext(17)
    [main] INFO reactor.Flux.Generate.1 - | onNext(18)
    [main] INFO reactor.Flux.Generate.1 - | onNext(19)
    [main] INFO reactor.Flux.Generate.1 - | onComplete()
    0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    异步线程:create

    public static void main(String[] args) {
    
            Flux<Object> flux = Flux.create(fluxSink -> {
                UserServiceImpl userService = new UserServiceImpl(fluxSink);
                userService.online();
            }).log();
    
            flux.subscribe(System.out::println);
    
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    [main] INFO reactor.Flux.Create.1 - onSubscribe(FluxCreate.BufferAsyncSink)
    [main] INFO reactor.Flux.Create.1 - request(unbounded)
    main:用户上线
    main
    [main] INFO reactor.Flux.Create.1 - onNext(main)
    
    • 1
    • 2
    • 3
    • 4
    • 5

    自定义处理器

    处理器将直接返回新流,处理器既是消费者又是生产者

    通过调用具有每个 onNext 的输出接收器的双使用者来处理此 Flux 发出的项目。最多必须执行一个 SynchronousSink.next(Object) 调用,或者执行 0 或 1 个 SynchronousSink.error(Throwable) 或 SynchronousSink.complete()。错误模式支持:当 BiConsumer 抛出异常或通过 SynchronousSink.error(Throwable) 显式发出错误信号时,此运算符支持在错误时恢复(包括启用融合时)。
    Params: handler – 处理 BiConsumer
    Returns:转换后的 Flux

    Flux<Object> flux = Flux.range(100, 150)
                    .handle(((integer, sink) -> {
                        integer *= 2;
                        sink.next(integer);
                    }));
    
            flux.subscribe(System.out::println);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    200
    202
    204
    206
    208
    210
    212
    214
    216
    218
    220
    222
    224
    226
    228
    230
    232
    234
    236
    238
    240
    242
    244
    246
    248
    250
    252
    254
    256
    258
    260
    262
    264
    266
    268
    270
    272
    274
    276
    278
    280
    282
    284
    286
    288
    290
    292
    294
    296
    298
    300
    302
    304
    306
    308
    310
    312
    314
    316
    318
    320
    322
    324
    326
    328
    330
    332
    334
    336
    338
    340
    342
    344
    346
    348
    350
    352
    354
    356
    358
    360
    362
    364
    366
    368
    370
    372
    374
    376
    378
    380
    382
    384
    386
    388
    390
    392
    394
    396
    398
    400
    402
    404
    406
    408
    410
    412
    414
    416
    418
    420
    422
    424
    426
    428
    430
    432
    434
    436
    438
    440
    442
    444
    446
    448
    450
    452
    454
    456
    458
    460
    462
    464
    466
    468
    470
    472
    474
    476
    478
    480
    482
    484
    486
    488
    490
    492
    494
    496
    498
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150

    多线程的协作与调度

    在提供的 Scheduler Scheduler.Worker 上运行 onNext、onComplete 和
    onError。此运算符会影响线程上下文,在该上下文中,它下面的链中的其余运算符将执行该上下文,直到 publishOn
    出现新出现。通常用于快速发布者、慢速使用者方案。flux.publishOn(Schedulers.single()).subscribe()
    放弃支持:此运算符在数据信号取消或触发错误时丢弃其内部排队等待背压的元素。
    Params:
    scheduler – 一个Scheduler,提供 Scheduler.Worker 在哪里发布 delayError – 是否应该在转发任何错误之前消耗缓冲区 预取
    – 异步边界容量
    返回:异步生成的 Flux

    注意:当主线程关闭的时候,所有子线程都将被关闭(粗暴关闭)

    public static void main(String[] args) {
    
            Flux.range(1, 20)
                    .publishOn(Schedulers.boundedElastic())
                    .log()
                    .subscribe(integer -> {
                        System.out.println(Thread.currentThread().getName());
                    });
    
            try {
                System.in.read();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
    
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    常见的 boundedElastic 实例,一个调度程序,它动态创建有限数量的基于 ExecutorService 的 Worker,并在 Worker 关闭后重用它们。如果空闲超过 60 秒,则可以逐出底层守护程序线程。创建的最大线程数受上限限制(默认情况下,是可用 CPU 内核数的 10 倍,请参见DEFAULT_BOUNDED_ELASTIC_SIZE)。可以在每个支持线程上排队和延迟的最大任务提交数是有限制的(默认情况下,有 100K 个附加任务,请参阅DEFAULT_BOUNDED_ELASTIC_QUEUESIZE)。超过该点,将引发 RejectedExecutionException。根据首选项的顺序,支持新 Scheduler.Worker 的线程将从空闲池中选取、重新创建或从繁忙的池中重用。在后一种情况下,尽最大努力选择支持最少工作线程的线程。请注意,如果一个线程支持少量的工作线程,但这些工作线程提交了大量待处理任务,则第二个工作线程最终可能会由同一线程支持,并看到任务被拒绝。在创建工作线程时,支持线程的拾取也是一劳永逸地完成的,因此,尽管另一个支持线程在此期间处于空闲状态,但由于两个工作线程共享同一个支持线程并提交长时间运行的任务,任务可能会延迟。在第一次调用时,只会创建此通用调度程序的一个实例,并对其进行缓存。在后续调用中返回相同的实例,直到它被释放。不能直接释放公共实例,因为它们在调用方之间缓存和共享。但是,它们可以一起关闭,或者由工厂中的更改替换。返回:通用的 boundedElastic 实例,一个 Scheduler,它动态创建工作线程,其上限为后备线程数,之后是排队任务数,可重用线程并逐出空闲线程

    public static void main(String[] args) {
    
            Flux.range(1, 20)
                    .publishOn(Schedulers.fromExecutor(new ThreadPoolExecutor(
                            4,
                            20,
                            60,
                            TimeUnit.SECONDS,
                            new LinkedBlockingDeque<>()
                    )))
                    .log()
                    .subscribe(integer -> {
                        System.out.println(Thread.currentThread().getName());
                    });
    
            try {
                System.in.read();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
    
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
  • 相关阅读:
    安全防御拓扑1
    VSD Viewer 6.16.1(Visio绘图文件阅读器)
    C++之map_set的使用
    UE5 各种moveto
    redis6.2(二)Redis的新数据类型、使用java语言操作Redis
    GeoTools的AStar算法实现,自定义Node及Edge
    C++自学精简教程 目录(必读)
    应用出海新福祉,融云助IM社交迅速对齐海外用户体验
    ACL访问控制列表 基础、创建ACL访问控制列表的两种方式、配置ACL访问控制列表规则、修改ACL规则的默认步长。子网掩码、反掩码、通配符掩码的区别和作用。
    Redis概述
  • 原文地址:https://blog.csdn.net/2301_78960337/article/details/138057858