• 【深入解析spring cloud gateway】08 Reactor 知识扫盲


    一、响应式编程概述

    1.1 背景知识

    为了应对高并发服务器端开发场景,在2009 年,微软提出了一个更优雅地实现异步编程的方式——Reactive Programming,我们称之为响应式编程。随后,Netflix 和LightBend 公司提供了RxJava 和Akka Stream 等技术,使得Java 平台也有了能够实现响应式编程的框架。
    在2017 年9 月28 日,Spring 5 正式发布。Spring 5 发布最大的意义在于,它将响应式编程技术的普及向前推进了一大步。而同时,作为在背后支持Spring 5 响应式编程的框架Spring Reactor,也进入了里程碑式的3.1.0 版本。

    1.2 什么是响应式编程

    响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。
    响应式编程基于reactor(Reactor 是一个运行在 Java8 之上的响应式框架)的思想,当你做一个带有一定延迟的才能够返回的io操作时,不会阻塞,而是立刻返回一个流,并且订阅这个流,当这个流上产生了返回数据,可以立刻得到通知并调用回调函数处理数据。
    电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化。
    响应式传播核心特点之一:变化传播:一个单元格变化之后,会像多米诺骨牌一样,导致直接和间接引用它的其他单元格均发生相应变化。

    1.3 基于 Reactor 实现

    Reactor 是一个运行在 Java8 之上满足 Reactice 规范的响应式框架,它提供了一组响应式风格的 API。
    Reactor 有两个核心类: Flux 和 Mono,这两个类都实现 Publisher 接口。
    Flux 类似 RxJava 的 Observable,它可以触发零到多个事件,并根据实际情况结束处理或触发错误。
    Mono 最多只触发一个事件,所以可以把 Mono 用于在异步任务完成时发出通知。
    Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:元素值,错误信号,完成信号;错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。
    三种信号的特点:
    错误信号和完成信号都是终止信号,不能共存
    如果没有发送任何元素值,而是直接发送错误或者完成信号,表示是空数据流
    如果没有错误信号,也没有完成信号,表示是无限数据流

    Mono 原理图如下:
    在这里插入图片描述

    Flux原理图如下:
    在这里插入图片描述

    结合上面两个图,发现Mono和Flux非常相似。只是Mono只接收一个元素,而Flux接收多个元素

    二、示例代码

    2.1 Mono

    package com.reactor.demo;
    
    import lombok.extern.slf4j.Slf4j;
    import org.junit.Test;
    import reactor.core.publisher.Mono;
    
    import java.time.Duration;
    import java.util.Optional;
    import java.util.function.Consumer;
    import java.util.function.Function;
    
    @Slf4j
    public class MonoTest {
        @Test
        public void test1() {
            //just用法
            Mono.just("hello world").subscribe(System.out::println);
            //runnable创建mono
            Mono<Void> sinkMono = Mono.fromRunnable(() -> System.out.println("runnable"));
            //这句不会输出
            sinkMono.doOnNext(unused -> System.out.println("void success"));
            //这句也不会输出
            sinkMono.subscribe(o -> System.out.println("void result" + o));
    
            //创建一个不包含任何元素,只发布结束消息的序列。,这里的hello empty是不会输出的。
            Mono.empty()
                    //输出“empty的入参是null”
                    .doOnSuccess(o -> System.out.println("empty的入参是" + o))
                    //这句不会输出
                    .subscribe(o -> System.out.println("hello empty"));
            //empty里面至少还有一个结束消息,而never则是真的啥都没有。"never的入参是"不会输出 ,这里的hello never也不会输出
            Mono.never().doOnSuccess(o -> System.out.println("never的入参是" + o)).subscribe(o -> System.out.println("hello never"));
        }
    
        @Test
        public void test2() {
            //传入supplier
            Mono.fromSupplier(() -> "Hello supplier").subscribe(System.out::println);
            //传入optional
            Mono.justOrEmpty(Optional.of("Hello optional")).subscribe(System.out::println);
            //通过sink来创建一个正常执行的Mono
            Mono.create(sink -> sink.success("Hello sink")).subscribe(System.out::println);
            //通过sink来创建一个抛出异常的Mono
            Mono.create(sink -> sink.error(new RuntimeException("sink error"))).subscribe(System.out::println);
            //defer的入参实际上是一个Mono工厂
            Mono.defer(() -> Mono.just("hello defer")).subscribe(System.out::println);
        }
    
        @Test
        public void test3() {
            //callable,有返回值
            Mono.fromCallable(() -> "callable").subscribe(System.out::println);
            //runnable无返回值
            Mono<Void> mono = Mono.fromRunnable(() -> System.out.println("run"));
            //下面的hello runnable是不会输出的。因为subscribe一个Mono,不会产生任何结果
            mono.subscribe(o -> System.out.println("hello runnable"));
        }
    
        @Test
        public void test4() {
            //延迟3秒输出
            Mono.delay(Duration.ofSeconds(3)).doOnNext(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) {
                    System.out.println(aLong);
                }
            }).block();
    
        }
    
        @Test
        public void test5() {
            //直接输出了异常
            Mono.error(new RuntimeException("这是一个异常")).subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object o) {
                    System.out.println("error:" + o);
                }
            });
    
            Mono.defer(() -> {
                return Mono.error(new RuntimeException("这是第二个异常"));
            }).subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object o) {
                    System.out.println("defer error:" + o);
                }
            });
        }
    
        @Test
        public void test6() {
            //通过map可以对元素进行转换
            Mono.just("just one").map(new Function<String, Integer>() {
                @Override
                public Integer apply(String s) {
                    return 1;
                }
            }).doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) {
                    System.out.println("转换后的结果:" + integer);
                }
            }).subscribe();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106

    2.1 Flux

    package com.reactor.demo;
    
    import org.junit.Test;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.function.Consumer;
    
    public class FluxTest {
    
        /**
         * 基本用法
         */
        @Test
        public void test1() {
            //通过just传入可变的参数,依次输出
            Flux.just("hello", "world", "just").doOnNext(System.out::println)
                    .doOnComplete(() -> System.out.println("just over")).subscribe();
            //传入一个范围
            Flux.range(100, 10)
                    .doOnNext(System.out::println).doOnComplete(() -> System.out.println("OK")).subscribe();
            //传入list
            Flux.fromIterable(Arrays.asList("01", "02", "03")).doOnNext(System.out::println).subscribe();
            //传入一个数组
            Flux.fromArray(new Object[]{"obj1", "obj2"}).doOnNext(System.out::println).subscribe();
        }
    
    
        /**
         * 处理空值
         */
        @Test
        public void testEmpty() {
            //如果序列是个空的,就给个默认值
            Flux.empty().defaultIfEmpty(1).doOnNext(System.out::println).subscribe();
            //如果序列是空的,就用新序列代替
            Flux.empty().switchIfEmpty(Mono.just("100")).doOnNext(System.out::println).subscribe();
        }
    
    
        /**
         * 序列在执行时的一些监听方法doOnXXXX
         */
        @Test
        public void testDoOn() {
            System.out.println("----------");
            Flux.range(100, 10)
                    .doOnNext(System.out::println).doOnComplete(() -> System.out.println("OK"));
    
            System.out.println("----------");
            Flux.range(100, 10).doFirst(() -> System.out.println("第一个执行开始")).subscribe();
    
            System.out.println("----------");
            Flux.range(100, 10).doFinally(it -> System.out.println("终止信号的类型为" + it.name())).subscribe();
    
            System.out.println("----------");
            Flux.range(100, 10).doOnSubscribe(it -> System.out.println("该序列已被订阅")).subscribe();
    
            System.out.println("----------");
    
            Flux.range(100, 10).doOnRequest(value -> System.out.println("doOnRequest:" + value)).subscribe();
    
            //在完成或者error时,也就是序列终止时执行runnable
            System.out.println("----------");
            Flux.range(100, 10).doOnTerminate(() -> System.out.println("doOnTerminate")).subscribe();
    
            //doOnEach每次向下游传播,都会得到一个信号类型,可以根据该信号类型执行一些操作
            System.out.println("----------");
            Flux.range(100, 10).doOnEach(it -> System.out.println("doOnEach:" + it)).subscribe();
        }
    
    
        /**
         * filter用法
         */
        @Test
        public void testFilter() {
    
            System.out.println("----------");
            //将上游的数据进行类型判断,符合该类型的数据将流向下游
            Flux.just(new Object(), "Hello", 1)
                    .ofType(String.class).doOnNext(System.out::println)
                    .doOnComplete(() -> System.out.println("过滤String示例")).subscribe();
    
            System.out.println("----------");
            //过滤数据
            Flux.range(100, 10)
                    .filter(it -> it > 105)
                    .doOnComplete(() -> System.out.println("取出大于105示例")).subscribe();
    
            System.out.println("----------");
            //将重复数据过滤,重复数据在整个序列中只保留一个
            Flux.range(100, 10)
                    .concatWith(Flux.just(100, 100, 100))
                    .distinct().doOnNext(System.out::println)
                    .doOnComplete(() -> System.out.println("去除重复数字示例")).subscribe();
    
            System.out.println("----------");
    
            //将后来的重复数据过滤,如下,第二个flux拼接到第一个序列时,只会把第二个元素本身的重复元素过滤
            Flux.range(100, 10)
                    .concatWith(Flux.just(100, 100, 100))
                    .distinctUntilChanged().doOnNext(System.out::println)
                    .doOnComplete(() -> System.out.println("将后来的重复数据过滤")).subscribe();
    
            System.out.println("----------");
            //在序列的开始获取5个元素,
            // limitRequest为true时,则不管该序列会发射多少元素,该参数会向上传递背压,则上游序列只会发出设定的5个元素
            //为false时,则不控制上有元素可以发出N个元素
            Flux.range(100, 10).take(5, false)
                    .doOnComplete(() -> System.out.println("在序列的开始获取5个元素")).subscribe();
    
            System.out.println("----------");
            //参数为时间单位,意味着take获取元素,只会在该时间限制内获取。
            Flux.range(100, 10).take(Duration.ofSeconds(10))
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            System.out.println("当前时间戳为:" + System.currentTimeMillis() + ",数字为:" + integer);
                        }
                    })
                    .doOnComplete(() -> System.out.println("在指定时间内获取元素"))
                    .subscribe(System.out::println);
    
            System.out.println("----------");
            //获取最后的N位元素
            Flux.range(100, 10).takeLast(2)
                    .doOnComplete(() -> System.out.println("获取最后的2位元素"))
                    .subscribe(System.out::println);
    
            System.out.println("----------");
            //获取元素,知道符合条件后停止向下游发送数据,包括条件本身,也就是当it>105的元素也会被发布至下游
            Flux.range(100, 10).takeUntil(it -> it > 105)
                    .doOnComplete(() -> System.out.println("一直取数,直到大于105结束"))
                    .subscribe(System.out::println);
    
            System.out.println("----------");
            //获取元素,当元素符合该断言时,如果不符合直接终止,不包含条件本身
            Flux.range(100, 10).takeWhile(it -> it < 105)
                    .doOnComplete(() -> System.out.println("取出小于105示例"))
                    .subscribe(System.out::println);
    
            System.out.println("----------");
            //获取指定某个位置的一个元素
            Flux.range(100, 10).elementAt(0)
                    .doOnSuccess(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer i) {
                            System.out.println("获取指定某个位置的一个元素:" + i);
                        }
                    })
                    .subscribe();
    
            System.out.println("----------");
            //获取最后一个元素,last()如果为空则抛出异常,last(1)如果为空则发出默认值
            Flux.range(100, 10)
                    .takeWhile(it -> it > 105).last(1)
                    .subscribe(System.out::println);
    
            System.out.println("----------");
            //跳至第几秒开始执行
            Flux.range(100, 10)
                    .skip(Duration.ofSeconds(5)).subscribe(System.out::println);
    
            System.out.println("----------");
            //跳至第几个元素开始执行
            Flux.range(100, 10)
                    .skip(5).subscribe(System.out::println);
    
            System.out.println("----------");
            //从开始跳到最后第N个元素结束
            Flux.range(100, 10).skipLast(5).subscribe(System.out::println);
    
            System.out.println("----------");
            //跳至满足条件的地方开始执行,从第一个元素开始,知道满足条件,开始发送至下游
            Flux.range(100, 10).skipUntil(it -> it > 105).subscribe(System.out::println);
    
            System.out.println("----------");
            //每隔一段时间抽取样本数(取在这个时间的最后一个元素),如果相隔实现大于序列的执行时间,则去最后一元素
            Flux.range(100, 100000000).sample(Duration.ofMillis(100)).subscribe(System.out::println);
    
            System.out.println("----------");
            //每隔一段时间抽取样本数(取在这个时间的第一个元素),如果相隔实现大于序列的执行时间,则取第一个元素
            Flux.range(100, 10).sampleFirst(Duration.ofMillis(100)).subscribe(System.out::println);
    
            System.out.println("----------");
            //只获取一个元素,single()如果为空或者超多一个,抛出异常,single(1)如果为空返回默认值,如果多个抛出异常,singleOrEmpty()可以允许为空
            Flux.range(100, 10).single(1).subscribe(System.out::println);
        }
    
    
        /**
         * 当被订阅后如果发生异常,则stream会停止运行
         * 此时可以通过处理error来决定如何处理异常
         * 可以将异常跳过、将异常替换等
         */
        @Test
        public void testErrorHandle() {
            System.out.println("----------");
            Flux.just(1, 2, 3, 0, 5, 4).map(it -> {
                        it = 100 / it;
                        return it;
                    })
                    //报错后返回,并停止运行
                    .onErrorResume(e -> {
                        return Mono.just(10000);
                    })
                    .doFinally(type -> {
                        System.out.println(type);
                    })
                    .subscribe(System.out::println);
    
            System.out.println("----------");
            Flux.just(1, 2, 3).doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) {
                    System.out.println(integer);
                    if (integer == 2) {
                        throw new RuntimeException("触发异常");
                    }
                }
            }).doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) {
                    System.out.println("doOnError:" + throwable.getMessage());
                }
            }).subscribe();
    
    
            System.out.println("----------");
            Flux.just(1, 2, 3, 0, 5, 4).map(it -> {
                        it = 100 / it;
                        return it;
                    })
                    //报错后继续运行,并执行相关操作
                    .onErrorContinue((e, it) -> {
                        System.out.println(e.getMessage());
                    })
                    .doFinally(type -> {
                        System.out.println(type);
                    })
                    .subscribe(System.out::println);
        }
    
        @Test
        public void flatMapTest() {
            //输出50,100
            Flux.just(5, 10).flatMap(x -> Flux.just(x * 10)).toStream().forEach(System.out::println);
    
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254

    Mono源码分析

    以下面的代码为例,来分析一下Mono源码

    @Test
    public void test0() {
        //just用法
        Mono.just("hello world").subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) {
                System.out.println("accept:" + s);
            }
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    Mono.just返回了个啥

    原来是静态方法,返回了个MonoJust对象,入参作为构造参数传入

    	public static <T> Mono<T> just(T data) {
    		return onAssembly(new MonoJust<>(data));
    	}
    
    • 1
    • 2
    • 3

    MonoJust原来是继承Mono的
    在这里插入图片描述
    有两个要点
    1、有一个value字段来保存入参
    2、一个subscribe方法,执行了什么动作,后面再分析

    final class MonoJust<T> 
    extends Mono<T>
    		implements Fuseable.ScalarCallable<T>, Fuseable, SourceProducer<T>  {
    
    	final T value;
    
    	MonoJust(T value) {
    		this.value = Objects.requireNonNull(value, "value");
    	}
    
    	@Override
    	public T call() throws Exception {
    		return value;
    	}
    
    	@Override
    	public T block(Duration m) {
    		return value;
    	}
    
    	@Override
    	public T block() {
    		return value;
    	}
    
    	@Override
    	public void subscribe(CoreSubscriber<? super T> actual) {
    		actual.onSubscribe(Operators.scalarSubscription(actual, value));
    	}
    
    	@Override
    	public Object scanUnsafe(Attr key) {
    		if (key == Attr.BUFFERED) return 1;
    		if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
    		return null;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    Mono.subscribe到底做了什么

    subscribe方法入参是一个Consumer对象,这里可以理解为一个回调方法

    	public final Disposable subscribe(Consumer<? super T> consumer) {
    		Objects.requireNonNull(consumer, "consumer");
    		return subscribe(consumer, null, null);
    	}
    
    • 1
    • 2
    • 3
    • 4

    继续往下,调用了三个参数的subscribe方法

    	public final Disposable subscribe(
    			@Nullable Consumer<? super T> consumer,
    			@Nullable Consumer<? super Throwable> errorConsumer,
    			@Nullable Runnable completeConsumer) {
    		return subscribe(consumer, errorConsumer, completeConsumer, (Context) null);
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这里除了我们入参的第一个consumer,还有另外两个consumer。还有一个context,即上下文,这里也是空的。
    errorConsumer:在出错的时候的回调方法
    completeConsumer:完成时的回调方法,这里是一个Runnable

    	public final Disposable subscribe(
    			@Nullable Consumer<? super T> consumer,
    			@Nullable Consumer<? super Throwable> errorConsumer,
    			@Nullable Runnable completeConsumer,
    			@Nullable Context initialContext) {
    		return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,
    				completeConsumer, null, initialContext));
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    这里将几个consumer封装成了一个LambdaMonoSubscriber。
    接着往下看subscribeWith方法

    	public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
    		subscribe(subscriber);
    		return subscriber;
    	}
    
    • 1
    • 2
    • 3
    • 4

    继续往下分析

    @Override
    	@SuppressWarnings("unchecked")
    	public final void subscribe(Subscriber<? super T> actual) {
    		CorePublisher publisher = Operators.onLastAssembly(this);
    		CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
    	//省略部分代码
    			publisher.subscribe(subscriber);
    		}
    		catch (Throwable e) {
    			Operators.reportThrowInSubscribe(subscriber, e);
    			return;
    		}
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    省略了部分代码,publisher.subscribe(subscriber),
    publisher 即当前mono对象,MonoJust实现了这个方法
    这里又回到MonoJust里了

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
    	actual.onSubscribe(Operators.scalarSubscription(actual, value));
    }
    
    • 1
    • 2
    • 3
    • 4

    这里的actual是什么,是上面的LambdaMonoSubscriber
    这里又把actual和value封装成Operators.scalarSubscription
    看一下LambdaMonoSubscriber定义
    LambdaMonoSubscriber主要就是定义了一系列consumer,即回调勾子

    final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {
    
    	final Consumer<? super T>            consumer;
    	final Consumer<? super Throwable>    errorConsumer;
    	final Runnable                       completeConsumer;
    	final Consumer<? super Subscription> subscriptionConsumer;
    	final Context                        initialContext;
    
    	volatile Subscription subscription;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    看一下LambdaMonoSubscriber.onSubscribe方法

    	@Override
    	public final void onSubscribe(Subscription s) {
    		if (Operators.validate(subscription, s)) {
    			this.subscription = s;
    
    			if (subscriptionConsumer != null) {
    				try {
    					subscriptionConsumer.accept(s);
    				}
    				catch (Throwable t) {
    					Exceptions.throwIfFatal(t);
    					s.cancel();
    					onError(t);
    				}
    			}
    			else {
    				s.request(Long.MAX_VALUE);
    			}
    
    		}
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    我们的示例,会进入s.request(Long.MAX_VALUE);这个逻辑
    这个s是什么:Operators.scalarSubscription
    再来看看request方法

    		@Override
    		public void request(long n) {
    			if (validate(n)) {
    				if (ONCE.compareAndSet(this, 0, 1)) {
    					Subscriber<? super T> a = actual;
    					a.onNext(value);
    					if(once != 2) {
    						a.onComplete();
    					}
    				}
    			}
    		}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    即调用了LambdaMonoSubscriber的onNext和complete方法
    LambdaMonoSubscriber.next

    @Override
    	public final void onNext(T x) {
    		Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
    		if (s == Operators.cancelledSubscription()) {
    			Operators.onNextDropped(x, this.initialContext);
    			return;
    		}
    		if (consumer != null) {
    			try {
    				consumer.accept(x);
    			}
    			catch (Throwable t) {
    				Exceptions.throwIfFatal(t);
    				s.cancel();
    				doError(t);
    			}
    		}
    		if (completeConsumer != null) {
    			try {
    				completeConsumer.run();
    			}
    			catch (Throwable t) {
    				Operators.onErrorDropped(t, this.initialContext);
    			}
    		}
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    这里主要是调用了consumer.accept(x);。这个consumer即我们最开始入参的那个回调方法
    onComplete同理,即运行completeConsumer这个runnable

    	@Override
    	public final void onComplete() {
    		Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
    		if (s == Operators.cancelledSubscription()) {
    			return;
    		}
    		if (completeConsumer != null) {
    			try {
    				completeConsumer.run();
    			}
    			catch (Throwable t) {
    				Operators.onErrorDropped(t, this.initialContext);
    			}
    		}
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    如果出错的情况下,会执行

    	@Override
    	public final void onError(Throwable t) {
    		Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
    		if (s == Operators.cancelledSubscription()) {
    			Operators.onErrorDropped(t, this.initialContext);
    			return;
    		}
    		doError(t);
    	}
    
    	void doError(Throwable t) {
    		if (errorConsumer != null) {
    			errorConsumer.accept(t);
    		}
    		else {
    			Operators.onErrorDropped(Exceptions.errorCallbackNotImplemented(t), this.initialContext);
    		}
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    到这里,就分析完了

    总结一下

    • Mono.Just构造了一个MonoJust对象,用于接收入参value。
    • Mono.subscribe方法,会接收一个Consumer,可以理解为回调方法
    • Mono.subscribe方法,最终会调用LambdaMonoSubscriber里面的onNext方法。onNext执行的,即我们传入的回调consumer
    • 在正常执行完时,会执行LambdaMonoSubscriber中的onComplete。这个runnable也是通过入参传过来的。
    • 在执行异常时,会执行LambdaMonoSubscriber中的onError。这个回调方法,也是通过入参传过来的。

    体会一下,下面的过程,最好调试一下

        @Test
        public void test0() {
            //just用法
            Mono.just("hello world").doOnNext(new Consumer<String>() {
                @Override
                public void accept(String s) {
                    System.out.println("doOnNext1:"+s);
                }
            }).doOnNext(new Consumer<String>() {
                @Override
                public void accept(String s) {
                    System.out.println("doOnNext2:"+s);
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) {
                    System.out.println("subscribe:" + s);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) {
                    System.out.println("subscribe exception:" + throwable.getMessage());
                }
            }, new Runnable() {
                @Override
                public void run() {
                    System.out.println("subscribe complete");
                }
            });
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    doOnNext1:hello world
    doOnNext2:hello world
    subscribe:hello world
    subscribe complete
    
    • 1
    • 2
    • 3
    • 4

    执行流程如下
    在这里插入图片描述

    参考文章

  • 相关阅读:
    Redis源码阅读02-数据结构sds
    服务器操作集合
    FFmpeg安卓平台编译
    如何评估以及优化谷歌ads
    【MySQL】表的约束
    【图像分割】基于神经气体网络 (NGN)实现图像分割附matlab代码
    codeforces:C. Almost All Multiples【构造 + 贪心】
    java计算机毕业设计工作流流程编辑OA系统源码+mysql数据库+系统+lw文档+部署
    模拟退火补完计划
    Java通过Lettuce访问Redis主从,哨兵,集群
  • 原文地址:https://blog.csdn.net/suyuaidan/article/details/132712085