反应式流的核心是Observable和Observer,Observable表示一个数据流,而Observer则表示这个数据流的消费者。Observable在数据流上产生事件,而Observer则对这些事件进行响应。反应式流的数据流是一种推式的流,Observable发布事件时不需要等待Observer接收,Observable会把事件推送给Observer,而不是Observer去轮询Observable。
Java的反应式流通常基于Reactor或RxJava等库,这些库提供了丰富的函数式编程API和运算符,可以非常方便地处理异步事件。这些库都提供了类似于Observable和Observer的抽象概念,可以用来描述和处理异步数据流。同时还提供了常用的运算符,包括map、filter、reduce等,这些运算符可以方便地对数据流进行变换和过滤。
反应式流还有一个重要的概念是背压(backpressure),它是指在高并发和大流量情况下,消费者无法处理生产者产生的数据流,导致数据积压的情况。为了解决这个问题,反应式流引入了背压机制,生产者会在发送数据前先询问消费者的处理能力,如果消费者没有处理能力,生产者会等待一段时间或者缓存数据,等待消费者处理完数据后再继续发送。
反应式流已经被广泛应用于大规模的互联网应用中,包括机器学习、数据分析、网络爬虫等领域。它的优点在于处理高并发和大流量的数据流时,能够更加高效地利用系统资源,提高系统的性能和可扩展性。
总之,反应式流是Java编程中的一个重要概念,它可以帮助我们更好地处理异步和事件驱动的数据流,提高系统的性能和可扩展性。
- package com.example.jdk9.react;
-
- import java.util.concurrent.Flow.*;
-
- public class PublisherSubscriberDemo {
- public static void main(String[] args) {
- SimplePublisher<String> publisher = new SimplePublisher<>();
- SimpleSubscriber<String> subscriber1 = new SimpleSubscriber<>();
- SimpleSubscriber<String> subscriber2 = new SimpleSubscriber<>();
- publisher.subscribe(subscriber1);
- publisher.subscribe(subscriber2);
- publisher.submit("hello");
- publisher.submit("world");
- publisher.close();
- }
- }
-
- class SimplePublisher
implements Publisher { - private Subscription subscription;
-
- @Override
- public void subscribe(Subscriber super T> subscriber) {
- subscriber.onSubscribe(new Subscription() {
-
- @Override
- public void request(long n) {
-
- }
-
- @Override
- public void cancel() {
-
- // nothing to do
- }
- });
-
- this.subscription = new Subscription() {
- private boolean cancelled = false;
-
- @Override
- public void request(long n) {
- // nothing to do
- }
-
- @Override
- public void cancel() {
- this.cancelled = true;
- }
-
- public boolean isCancelled() {
- return this.cancelled;
- }
- };
-
- subscriber.onSubscribe(this.subscription);
- }
-
- public void submit(T item) {
- subscriptionLimitedQueue.offer(item);
- subscription.request(1);
- }
-
- public void close() {
- while (!subscriptionLimitedQueue.isEmpty()) {
- subscriptionLimitedQueue.poll();
- }
- subscription.cancel();
- }
-
- private SubscriptionLimitedQueue
subscriptionLimitedQueue = new SubscriptionLimitedQueue<>(2); -
- static class SubscriptionLimitedQueue
{ - private final int limit;
- private int size = 0;
- private Node
head; - private Node
tail; -
- public SubscriptionLimitedQueue(int limit) {
- this.limit = limit;
- }
-
- private static class Node
{ - final T item;
- Node
next; -
- Node(T item, Node
next) { - this.item = item;
- this.next = next;
- }
- }
-
- public void offer(T item) {
- Node
node = new Node<>(item, null); - if (head == null) {
- head = node;
- tail = head;
- } else {
- tail.next = node;
- tail = tail.next;
- }
- size++;
- if (size > limit) {
- Node
newHead = head.next; - head.next = null;
- head = newHead;
- size--;
- }
- }
-
- public boolean isEmpty() {
- return size == 0;
- }
-
- public T poll() {
- if (isEmpty()) {
- return null;
- }
- T item = head.item;
- Node
newHead = head.next; - head.next = null;
- head = newHead;
- size--;
- return item;
- }
- }
- }
-
- class SimpleSubscriber
implements Subscriber { - private Subscription subscription;
-
- @Override
- public void onSubscribe(Subscription subscription) {
- this.subscription = subscription;
- System.out.println("订阅成功");
- subscription.request(1);
- }
-
- @Override
- public void onNext(T item) {
- System.out.println("Received item: " + item);
- subscription.request(1);
- }
-
- @Override
- public void onError(Throwable throwable) {
- throwable.printStackTrace();
- }
-
- @Override
- public void onComplete() {
- System.out.println("Done");
- }
- }
这段代码演示了使用Flow API来发布和订阅消息的过程,它包含以下类和接口:
具体解释:
运行结果:

第一步,引入依赖:
- <dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-core</artifactId>
- <version>3.5.11</version>
- </dependency>
第二步,编写代码:
- package com.example.jdk9.react;
-
- import reactor.core.publisher.Flux;
- import reactor.core.publisher.Mono;
-
- public class ReactiveStreamExample {
- public static void main(String[] args) {
- Flux<Integer> stream = Flux.range(1, 10);
-
- stream
- .map(i -> i * 2)
- .filter(i -> i % 3 == 0)
- .flatMap(i -> Mono.just(i).zipWith(Mono.just(i * 3)))
- .subscribe(System.out::println);
- }
- }
上面的代码首先创建了一个从1到10的数字列表,然后通过map操作符将每个元素乘以2,再使用filter操作符过滤掉不能被3整除的元素。接下来,使用flatMap操作符来创建一个新的流,该流将原始元素和该元素乘以3的结果合并在一起。最后,使用subscribe方法来订阅这个流并打印出每个元素的值。
这个例子展示了Reactor库中的一些常见操作符,包括map、filter和flatMap。通过这些操作符的链式调用,我们可以轻松地对数据流进行复杂的操作。在实际的应用中,我们可以根据具体的需求选择不同的操作符来实现所需的数据处理逻辑。
- package com.example.jdk9.react;
-
- import reactor.core.publisher.Flux;
-
- public class PublisherSubscriberExample {
- public static void main(String[] args) {
- // 创建发布者
- Flux<Integer> publisher = Flux.just(1, 2, 3, 4, 5);
-
- // 订阅者1:打印每个元素
- publisher.subscribe(System.out::println);
-
- // 订阅者2:计算元素的总和并打印
- publisher.reduce(0, Integer::sum)
- .subscribe(total -> System.out.println("Sum = " + total));
- }
- }