• Reactor的Publisher与Subscriber


    Project Reactor介绍

    在计算机中,响应式变成或者反应式编程(Reactive Programming)是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地变大静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。

    作用

    Reactor希望用少量、有限个数的线程来满足高负载的需要。 IO阻塞浪费系统性能,只有纯异步处理才能发挥系统的全部性能。JDK的异步API较为难用,成为异步编程的瓶颈。

    响应式编程特性
    • Responsive(响应式)
    • Resilient(弹性)
    • Message Driven(消息驱动)
    • asynchronous request(异步请求)
    • non-blocking(非阻塞)
    • Backpressure(背压)
    数据处理流程

    image-20221030094515383

    测试代码

    Subscriber增强类:

    public class LoggingSubscriber<T> implements Subscriber<T> {
        private static final Logger log = LoggerFactory.getLogger(LoggingSubscriber.class);
    
        private Subscription subscription;
        private long requested;
        private long received;
        private CountDownLatch finished = new CountDownLatch(1);
    
        @Override
        public void onComplete() {
            log.info("onComplete: sub={}", subscription.hashCode());
            finished.countDown();
        }
    
        @Override
        public void onError(Throwable t) {
            log.error("Error: sub={}, message={}", subscription.hashCode(), t.getMessage(),t);
            finished.countDown();
        }
    
        @Override
        public void onNext(T value) {
            log.info("onNext: sub={}, value={}", subscription.hashCode(), value);
            this.received++;
            this.requested++;
            subscription.request(1);
        }
    
        @Override
        public void onSubscribe(Subscription sub) {
            log.info("onSubscribe: sub={}", sub.hashCode());
            this.subscription = sub;
            this.received = 0;
            this.requested = 1;
            sub.request(1);
        }
        
        
        public long getRequested() {
            return requested;
        }
        
        public long getReceived() {
            return received;
        }
    
        /**
         * 阻塞调用者,直到发布者发出所有对象或产生错误
         */
        public void block() {
            try {
                finished.await(10, TimeUnit.SECONDS);
            }
            catch(InterruptedException iex) {
                throw new RuntimeException(iex);
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    使用Streams处理数据
    public class SteamTest {
        private static Logger log = LoggerFactory.getLogger(SteamTest.class);
        public static void main(String[] args) {
    
            Publisher<String> pub = Streams.publish(Arrays.asList("hello", "hello again"));
            LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
            pub.subscribe(sub);
            sub.block();
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    限制请求数量
    public class YieldTest {
        private static Logger log = LoggerFactory.getLogger(SteamTest.class);
    
        public static void main(String[] args) {
            /**
             * 限制对象创建数量
             * 接收yieldRequest对象并返回下一个要发出的对象的Function参数
             */
            Publisher<String> pub = Streams.yield((t) -> {
                System.out.println(t.getRequestNum());
                return t.getRequestNum() < 5 ? "hello" : null;
            });
    
            LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
            pub.subscribe(sub);
            sub.block();
            assertEquals(5, sub.getReceived());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    周期性请求
    public class PeriodicTest {
        private static Logger log = LoggerFactory.getLogger(SteamTest.class);
    
        public static void main(String[] args) {
            /**
             * 周期性做请求
             */
            ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
            Publisher<String> pub = Streams.periodically(executor, Duration.ofSeconds(1), (t) -> {
                return t < 5 ? String.format("hello %d", t) : null;
            });
    
            LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
            pub.subscribe(sub);
            sub.block();
            assertEquals(5, sub.getReceived());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    一些核心概念

    Operators-Publisher/Subscriber
    • Flux是一个标准的Reactive Streams规范中的Publisher,它代表一个包含了[0…N]个元素的异步序列流。在Reactive Streams规范中,针对流中每个元素,订阅者将会监听这三个事件:onNextonCompleteonError

    • Mono是一个特殊的Flux,它代表一个仅包含1个元素的异步序列流。因为只有一个元素,所以订阅者只需要监听onCompleteonError

    Backpressure
    • Subscription
    • onRequest()、onCancel()、onDispose()
    线程调度Schedulers
    • immediate()/single()/newSingle()
    • Elastic()/parallel()/newParallel()
    错误处理
    • onError/onErrorReturn/onErrorResume
    • doOnError/doFinally
  • 相关阅读:
    第十五届蓝桥杯题解-数字接龙
    【工具 & 技巧 & 笔试】PyCharm搜索快捷键大总结 | 【笔试题分享】2023美团算法策略方向题目 解析思路及实例代码(含时间复杂度分析)| 动态规划 求解:人在地图中行走 获得最大金币数量
    城市排水管网监测方案(新型排水管网智能监测技术)
    力扣刷题 day54:10-24
    Livox SLAM(带LIO+闭环检测优化)
    SSM整合之Mybatis笔记(MyBatis的分页插件(代码生成器))(P060—P062)
    SparkSql批量插入或更新,保存数据到Mysql中
    springboot 整合 dubbo 实现分组聚合
    java基于微信小程序的驾校预约管理系统+ssm+uinapp+Mysql+计算机毕业设计
    聊聊HttpClient的close
  • 原文地址:https://blog.csdn.net/weixin_42313773/article/details/127595704