• Execution flow of Mono



    Preface

    This article uses a simple example to walk through how a Mono executes during the publish-subscribe process.

    I. Example

    	@Test
        public void executeProcessTest() {
            Mono.just("hello mono")
                    .filter(v -> v != null)
                    .map(v -> v + " map")
                    .defaultIfEmpty("default value")
                    .subscribe(System.out::println);
        }
    

    II. Flow

    1. Building the publisher

    (1) Mono.just("hello mono")

    Returns a MonoJust that wraps the value.

    public static <T> Mono<T> just(T data) {
    		return onAssembly(new MonoJust<>(data));
    	}
    
    MonoJust(T value) {
    		this.value = Objects.requireNonNull(value, "value");
    	}
    

    (2) filter

    Returns a MonoFilterFuseable that wraps the MonoJust and the predicate. The Fuseable branch is taken because MonoJust is Fuseable.

    public final Mono<T> filter(final Predicate<? super T> tester) {
    		if (this instanceof Fuseable) {
    			return onAssembly(new MonoFilterFuseable<>(this, tester));
    		}
    		return onAssembly(new MonoFilter<>(this, tester));
    	}
    
    MonoFilterFuseable(Mono<? extends T> source, Predicate<? super T> predicate) {
    		super(source);
    		this.predicate = Objects.requireNonNull(predicate, "predicate");
    	}
    

    (3) map

    Returns a MonoMapFuseable that wraps the MonoFilterFuseable and the mapper.

    public final <R> Mono<R> map(Function<? super T, ? extends R> mapper) {
    		if (this instanceof Fuseable) {
    			return onAssembly(new MonoMapFuseable<>(this, mapper));
    		}
    		return onAssembly(new MonoMap<>(this, mapper));
    	}
    
    MonoMapFuseable(Mono<? extends T> source, Function<? super T, ? extends R> mapper) {
    		super(source);
    		this.mapper = Objects.requireNonNull(mapper, "mapper");
    	}
    
    

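    (4) defaultIfEmpty

    Returns a MonoDefaultIfEmpty that wraps the MonoMapFuseable and the defaultValue. The Fuseable.ScalarCallable shortcut at the top of the method does not apply here, because the source at this point is a MonoMapFuseable rather than a scalar source such as MonoJust.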

    public final Mono<T> defaultIfEmpty(T defaultV) {
    	    if (this instanceof Fuseable.ScalarCallable) {
    		    try {
    			    T v = block();
    			    if (v == null) {
    				    return Mono.just(defaultV);
    			    }
    		    }
    		    catch (Throwable e) {
    			    //leave MonoError returns as this
    		    }
    		    return this;
    	    }
    		return onAssembly(new MonoDefaultIfEmpty<>(this, defaultV));
    	}
    
     MonoDefaultIfEmpty(Mono<? extends T> source, T defaultValue) {
            super(source);
            this.defaultValue = Objects.requireNonNull(defaultValue, "defaultValue");
        }
    

    The resulting publisher chain, from the data outward:

    data -> MonoJust -> MonoFilterFuseable -> MonoMapFuseable -> MonoDefaultIfEmpty
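
The wrapping above can be sketched without Reactor. The following is a minimal model, not Reactor's code: `AssemblySketch`, `Node`, and the marker interface are hypothetical stand-ins, and the assumption that the `MonoDefaultIfEmpty` stand-in is not fuseable is made only for illustration. It shows how each operator call wraps the previous publisher and how the `instanceof Fuseable` check picks the variant.

```java
class AssemblySketch {
    /** Marker interface, a hypothetical stand-in for Reactor's Fuseable. */
    interface Fuseable {}

    /** Hypothetical operator node: it only remembers its upstream source. */
    static class Node {
        final Node source; // null for the root publisher

        Node(Node source) {
            this.source = source;
        }

        Node filter() {
            return this instanceof Fuseable ? new MonoFilterFuseable(this) : new MonoFilter(this);
        }

        Node map() {
            return this instanceof Fuseable ? new MonoMapFuseable(this) : new MonoMap(this);
        }

        Node defaultIfEmpty() {
            return new MonoDefaultIfEmpty(this);
        }

        /** Renders the chain from the root publisher to this node. */
        String describe() {
            String self = getClass().getSimpleName();
            return source == null ? self : source.describe() + " -> " + self;
        }
    }

    static final class MonoJust extends Node implements Fuseable {
        MonoJust() { super(null); }
    }

    static final class MonoFilter extends Node {
        MonoFilter(Node source) { super(source); }
    }

    static final class MonoFilterFuseable extends Node implements Fuseable {
        MonoFilterFuseable(Node source) { super(source); }
    }

    static final class MonoMap extends Node {
        MonoMap(Node source) { super(source); }
    }

    static final class MonoMapFuseable extends Node implements Fuseable {
        MonoMapFuseable(Node source) { super(source); }
    }

    // Assumption for this sketch: the defaultIfEmpty stand-in is not fuseable.
    static final class MonoDefaultIfEmpty extends Node {
        MonoDefaultIfEmpty(Node source) { super(source); }
    }

    public static void main(String[] args) {
        System.out.println(new MonoJust().filter().map().defaultIfEmpty().describe());
    }
}
```

Only wrapper objects are created at this stage; no predicate or mapper has been invoked yet.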

    2. Building the subscriber

    This starts from the subscribe() call in the example.

    (1) subscribe()

    The consumer is passed in:

    public final Disposable subscribe(Consumer<? super T> consumer) {
    		Objects.requireNonNull(consumer, "consumer");
    		return subscribe(consumer, null, null);
    	}
    
    public final Disposable subscribe(
    			@Nullable Consumer<? super T> consumer,
    			@Nullable Consumer<? super Throwable> errorConsumer,
    			@Nullable Runnable completeConsumer) {
    		return subscribe(consumer, errorConsumer, completeConsumer, (Context) null);
    	}
    
    public final Disposable subscribe(
    			@Nullable Consumer<? super T> consumer,
    			@Nullable Consumer<? super Throwable> errorConsumer,
    			@Nullable Runnable completeConsumer,
    			@Nullable Context initialContext) {
    		return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,
    				completeConsumer, null, initialContext));
    	}
    

    A LambdaMonoSubscriber is created, wrapping the final consumer.

    (2) subscribeWith()
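
The overload funnel above can be illustrated with the JDK's `java.util.concurrent.Flow` interfaces. This is a simplified sketch rather than Reactor's LambdaMonoSubscriber: `LambdaSubscriberSketch`, `CallbackSubscriber`, and the `subscriber(...)` helpers are hypothetical names, and cancellation and dropped-signal handling are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

class LambdaSubscriberSketch {
    /** Wraps optional callbacks; any callback except the value consumer may be null. */
    static final class CallbackSubscriber<T> implements Flow.Subscriber<T> {
        private final Consumer<? super T> consumer;
        private final Consumer<? super Throwable> errorConsumer;
        private final Runnable completeConsumer;

        CallbackSubscriber(Consumer<? super T> consumer,
                           Consumer<? super Throwable> errorConsumer,
                           Runnable completeConsumer) {
            this.consumer = consumer;
            this.errorConsumer = errorConsumer;
            this.completeConsumer = completeConsumer;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            s.request(Long.MAX_VALUE); // unbounded demand, as in the article's subscriber
        }

        @Override
        public void onNext(T item) {
            if (consumer != null) consumer.accept(item);
        }

        @Override
        public void onError(Throwable t) {
            if (errorConsumer != null) errorConsumer.accept(t);
        }

        @Override
        public void onComplete() {
            if (completeConsumer != null) completeConsumer.run();
        }
    }

    /** Short overload: delegates to the full one with null callbacks. */
    static <T> Flow.Subscriber<T> subscriber(Consumer<? super T> consumer) {
        Objects.requireNonNull(consumer, "consumer");
        return subscriber(consumer, null, null);
    }

    static <T> Flow.Subscriber<T> subscriber(Consumer<? super T> consumer,
                                             Consumer<? super Throwable> errorConsumer,
                                             Runnable completeConsumer) {
        return new CallbackSubscriber<>(consumer, errorConsumer, completeConsumer);
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Flow.Subscriber<String> onlyValue = subscriber(v -> log.add("value:" + v));
        onlyValue.onNext("hello mono map");
        onlyValue.onComplete(); // no completion callback registered, nothing is logged

        Flow.Subscriber<String> full = subscriber(
                v -> log.add("value:" + v),
                e -> log.add("error:" + e.getMessage()),
                () -> log.add("complete"));
        full.onNext("x");
        full.onComplete();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```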

    public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
    		subscribe(subscriber);
    		return subscriber;
    	}
    
    public final void subscribe(Subscriber<? super T> actual) {
    		// Outermost publisher; here it is MonoDefaultIfEmpty
    		CorePublisher publisher = Operators.onLastAssembly(this);
    		// Last subscriber; here it is LambdaMonoSubscriber
    		CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
    		
    		// Link publishers and subscribers: walk up the operator chain, wrapping the subscriber at each step
    		try {
    			if (publisher instanceof OptimizableOperator) {
    				OptimizableOperator operator = (OptimizableOperator) publisher;
    				while (true) {
    					subscriber = operator.subscribeOrReturn(subscriber);
    					if (subscriber == null) {
    						// null means "I will subscribe myself", returning...
    						return;
    					}
    
    					OptimizableOperator newSource = operator.nextOptimizableSource();
    					if (newSource == null) {
    						publisher = operator.source();
    						break;
    					}
    					operator = newSource;
    				}
    			}
    			
    			// Subscribe the assembled subscriber chain to the source publisher
    			publisher.subscribe(subscriber);
    		}
    		catch (Throwable e) {
    			Operators.reportThrowInSubscribe(subscriber, e);
    			return;
    		}
    	}
    

    (3) How publishers and subscribers are linked

    The core call is:

    subscriber = operator.subscribeOrReturn(subscriber);
    

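    a) operator is MonoDefaultIfEmpty, subscriber is LambdaMonoSubscriber

    Returns a DefaultIfEmptySubscriber that wraps LambdaMonoSubscriber as its downstream (actual), so from LambdaMonoSubscriber's point of view it plays the publisher role.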

    @Override
        public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
            return new FluxDefaultIfEmpty.DefaultIfEmptySubscriber<>(actual, defaultValue);
        }
    

    b) operator is MonoMapFuseable, subscriber is DefaultIfEmptySubscriber

    Returns a MapFuseableSubscriber, which plays the publisher role for DefaultIfEmptySubscriber.

    public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual) {
    		if (actual instanceof ConditionalSubscriber) {
    			ConditionalSubscriber<? super R> cs = (ConditionalSubscriber<? super R>) actual;
    			return new FluxMapFuseable.MapFuseableConditionalSubscriber<>(cs, mapper);
    		}
    		return new FluxMapFuseable.MapFuseableSubscriber<>(actual, mapper);
    	}
    

    c) operator is MonoFilterFuseable, subscriber is MapFuseableSubscriber

    Returns a FilterFuseableSubscriber, which plays the publisher role for MapFuseableSubscriber.

    public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
    		if (actual instanceof ConditionalSubscriber) {
    			return new FluxFilterFuseable.FilterFuseableConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual, predicate);
    		}
    		return new FluxFilterFuseable.FilterFuseableSubscriber<>(actual, predicate);
    	}
    

    The subscriber chain at this point, from upstream to downstream:

    FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber -> consumer
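
The linking loop can be modeled in a few lines. This is a sketch under simplifying assumptions, not Reactor's implementation: `LinkingSketch`, `Op`, and `Sub` are hypothetical stand-ins that carry only names, and the "null means I will subscribe myself" branch of `subscribeOrReturn` is omitted.

```java
import java.util.ArrayList;
import java.util.List;

class LinkingSketch {
    /** Hypothetical subscriber stand-in: a name plus the downstream subscriber it forwards to. */
    static final class Sub {
        final String name;
        final Sub actual; // null for the final subscriber

        Sub(String name, Sub actual) {
            this.name = name;
            this.actual = actual;
        }
    }

    /** Hypothetical operator stand-in: knows its upstream source and how to wrap a subscriber. */
    static final class Op {
        final String subscriberName;
        final Op source; // null when the upstream is the root publisher

        Op(String subscriberName, Op source) {
            this.subscriberName = subscriberName;
            this.source = source;
        }

        Sub subscribeOrReturn(Sub actual) {
            return new Sub(subscriberName, actual);
        }
    }

    /** Walks from the outermost operator toward the root, wrapping the subscriber at each step. */
    static Sub link(Op outermost, Sub last) {
        Sub subscriber = last;
        for (Op op = outermost; op != null; op = op.source) {
            subscriber = op.subscribeOrReturn(subscriber);
        }
        return subscriber;
    }

    /** Renders the subscriber chain from upstream to downstream. */
    static String describe(Sub head) {
        List<String> names = new ArrayList<>();
        for (Sub s = head; s != null; s = s.actual) {
            names.add(s.name);
        }
        return String.join(" -> ", names);
    }

    static String demo() {
        Op filter = new Op("FilterFuseableSubscriber", null);
        Op map = new Op("MapFuseableSubscriber", filter);
        Op defaultIfEmpty = new Op("DefaultIfEmptySubscriber", map);
        return describe(link(defaultIfEmpty, new Sub("LambdaMonoSubscriber", null)));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The operators are visited from last to first, so the subscriber created last (the filter one) ends up at the head of the chain, next to the source.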

    3. Establishing the subscription

    publisher.subscribe(subscriber);
    

    At this point publisher is MonoJust and subscriber is FilterFuseableSubscriber.
    A scalar subscription is created, wrapping the FilterFuseableSubscriber and the value.

    	@Override
    	public void subscribe(CoreSubscriber<? super T> actual) {
    		actual.onSubscribe(Operators.scalarSubscription(actual, value));
    	}
    

    onSubscribe() is then called on each subscriber in turn along the chain, which establishes the subscription:

    FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber

    Execution then enters onSubscribe() of LambdaMonoSubscriber:

    	@Override
    	public final void onSubscribe(Subscription s) {
    		if (Operators.validate(subscription, s)) {
    			this.subscription = s;
    
    			if (subscriptionConsumer != null) {
    				try {
    					subscriptionConsumer.accept(s);
    				}
    				catch (Throwable t) {
    					Exceptions.throwIfFatal(t);
    					s.cancel();
    					onError(t);
    				}
    			}
    			else {
    				// Request data
    				s.request(Long.MAX_VALUE);
    			}
    
    		}
    	}
    
    

    4. Requesting data

    Data is requested by calling request() on the subscription:

    s.request(Long.MAX_VALUE);
    

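    That is, the request travels in the reverse direction along the chain below, from LambdaMonoSubscriber back to FilterFuseableSubscriber:

    FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber

    It finally reaches the scalar subscription created in step 3, in the Operators class: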

    		@Override
    		public void request(long n) {
    			if (validate(n)) {
    				if (ONCE.compareAndSet(this, 0, 1)) {
    					Subscriber<? super T> a = actual;
    					// Publish the data
    					a.onNext(value);
    					if(once != 2) {
    						// Signal completion
    						a.onComplete();
    					}
    				}
    			}
    		}
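
The once-only guard in this request() can be tried out in isolation. The following is a minimal sketch on top of `java.util.concurrent.Flow`, not the Reactor class itself: `ScalarSubscriptionSketch`, `OneShot`, and `Recorder` are hypothetical names, the state values 0/1/2 mirror the check in the code above, and non-positive demand is simply ignored here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

class ScalarSubscriptionSketch {
    /** Emits a single value on the first positive request, then completes. */
    static final class OneShot<T> implements Flow.Subscription {
        private final Flow.Subscriber<? super T> actual;
        private final T value;
        // 0 = nothing requested yet, 1 = value emitted, 2 = cancelled
        private final AtomicInteger once = new AtomicInteger();

        OneShot(Flow.Subscriber<? super T> actual, T value) {
            this.actual = actual;
            this.value = value;
        }

        @Override
        public void request(long n) {
            if (n <= 0) {
                return; // simplification: non-positive demand is ignored in this sketch
            }
            if (once.compareAndSet(0, 1)) {
                actual.onNext(value);
                if (once.get() != 2) { // skip completion if onNext triggered a cancel
                    actual.onComplete();
                }
            }
        }

        @Override
        public void cancel() {
            once.set(2);
        }
    }

    /** Records every signal it receives. */
    static final class Recorder implements Flow.Subscriber<String> {
        final List<String> signals = new ArrayList<>();

        @Override public void onSubscribe(Flow.Subscription s) { signals.add("onSubscribe"); }
        @Override public void onNext(String item) { signals.add("onNext(" + item + ")"); }
        @Override public void onError(Throwable t) { signals.add("onError"); }
        @Override public void onComplete() { signals.add("onComplete"); }
    }

    static List<String> demo() {
        Recorder recorder = new Recorder();
        OneShot<String> subscription = new OneShot<>(recorder, "hello mono");
        recorder.onSubscribe(subscription);
        subscription.request(Long.MAX_VALUE);
        subscription.request(1); // second request is a no-op: the value was already emitted
        return recorder.signals;
    }

    static List<String> cancelledDemo() {
        Recorder recorder = new Recorder();
        OneShot<String> subscription = new OneShot<>(recorder, "hello mono");
        recorder.onSubscribe(subscription);
        subscription.cancel();
        subscription.request(1); // nothing is emitted after cancel
        return recorder.signals;
    }

    public static void main(String[] args) {
        System.out.println(demo());
        System.out.println(cancelledDemo());
    }
}
```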
    

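    5. Publishing data

    Starting at FilterFuseableSubscriber, onNext() is called to publish the data. Each subscriber passes it on to its own downstream subscriber until it reaches the last one, LambdaMonoSubscriber.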

        LambdaMonoSubscriber.java
    
    	@Override
    	public final void onNext(T x) {
    		Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
    		if (s == Operators.cancelledSubscription()) {
    			Operators.onNextDropped(x, this.initialContext);
    			return;
    		}
    		if (consumer != null) {
    			try {
    				// Finally invoke the consumer to consume the data
    				consumer.accept(x);
    			}
    			catch (Throwable t) {
    				Exceptions.throwIfFatal(t);
    				s.cancel();
    				doError(t);
    			}
    		}
    		if (completeConsumer != null) {
    			try {
    				completeConsumer.run();
    			}
    			catch (Throwable t) {
    				Operators.onErrorDropped(t, this.initialContext);
    			}
    		}
    	}
    

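    6. Completion

    After the data has passed down the chain and been consumed by the consumer, control returns to a.onComplete() in step 4.

    onComplete() is then called on each subscriber in turn, following the same chain.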
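
To see steps 3 to 6 run end to end, the whole pipeline can be approximated with plain `java.util.concurrent.Flow` types. This is a rough sketch, not Reactor: `MonoFlowSketch`, `Stage`, and the helper methods are hypothetical, fusion and error handling are left out, and the filter is changed to reject empty strings (an assumption of this sketch) so that the default-value branch can also be reached.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;
import java.util.function.Predicate;

class MonoFlowSketch {
    /** Source publisher: emits one value and completes on the first positive request. */
    static <T> Flow.Publisher<T> just(T value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (n > 0 && !done) {
                    done = true;
                    subscriber.onNext(value);
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Base operator subscriber: forwards every signal except onNext to its downstream. */
    abstract static class Stage<T, R> implements Flow.Subscriber<T> {
        final Flow.Subscriber<? super R> actual;

        Stage(Flow.Subscriber<? super R> actual) { this.actual = actual; }

        @Override public void onSubscribe(Flow.Subscription s) { actual.onSubscribe(s); }
        @Override public void onError(Throwable t) { actual.onError(t); }
        @Override public void onComplete() { actual.onComplete(); }
    }

    static <T> Flow.Subscriber<T> filter(Predicate<? super T> p, Flow.Subscriber<? super T> downstream) {
        return new Stage<T, T>(downstream) {
            @Override public void onNext(T item) {
                if (p.test(item)) actual.onNext(item); // rejected items are dropped
            }
        };
    }

    static <T, R> Flow.Subscriber<T> map(Function<? super T, ? extends R> f,
                                         Flow.Subscriber<? super R> downstream) {
        return new Stage<T, R>(downstream) {
            @Override public void onNext(T item) { actual.onNext(f.apply(item)); }
        };
    }

    static <T> Flow.Subscriber<T> defaultIfEmpty(T fallback, Flow.Subscriber<? super T> downstream) {
        return new Stage<T, T>(downstream) {
            private boolean hasValue;

            @Override public void onNext(T item) {
                hasValue = true;
                actual.onNext(item);
            }

            @Override public void onComplete() {
                if (!hasValue) actual.onNext(fallback); // upstream completed without a value
                actual.onComplete();
            }
        };
    }

    /** Runs the pipeline for one input and returns the signals seen by the last subscriber. */
    static List<String> run(String input) {
        List<String> log = new ArrayList<>();
        Flow.Subscriber<String> last = new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                log.add("onSubscribe");
                s.request(Long.MAX_VALUE);
            }
            @Override public void onNext(String item) { log.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        };
        // Link from the last operator back to the first, then subscribe to the source.
        Flow.Subscriber<String> chain = defaultIfEmpty("default value", last);
        chain = map(v -> v + " map", chain);
        chain = filter(v -> !v.isEmpty(), chain);
        just(input).subscribe(chain);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run("hello mono"));
        System.out.println(run(""));
    }
}
```

In the second run the filter drops the empty string, so the default-value stage sees completion without a value and emits the fallback before completing.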

  • Original article: https://blog.csdn.net/RenshenLi/article/details/125469945