码农知识堂 - 1000bd
  •   Python
  •   PHP
  •   JS/TS
  •   JAVA
  •   C/C++
  •   C#
  •   GO
  •   Kotlin
  •   Swift
  • 【Java开发】 Spring 05 :Project Reactor 响应式流框架(以Reactive方式访问Redis为例)


    响应式编程基于 Project Reactor(Reactor 是一个运行在 Java8 之上的响应式框架)的思想,当你做一个带有一定延迟的才能够返回的 IO 操作时,不会阻塞,而是立刻返回一个流,并且订阅这个流,当这个流上产生了返回数据,可以立刻得到通知并调用回调函数处理数据。本文以 Reactive 方式访问 Redis 为例介绍 Project Reactor 响应式流框架~

    目录

    1 Project Reactor 介绍

    2 以 Reactive 方式访问Redis

    2.1 环境搭建

    ①初步环境搭建见于 【Spring 04】 链接的 2.1 小节

    ②然后新增 RedisConfig 继承 RedisReactiveAutoConfiguration 类

    2.2 基于 ReactiveStringRedisTemplate 实现 CRUD

    ① set + Mono 

    ② append + Mono 

    ③ delete + Mono 

    ④ get + Mono 

    ⑤ opsForList + Flux

    ⑥ opsForHash+ Flux

    ⑦ get + Flux + buffer()

    ⑧ get + Flux + cache()

    ⑨  map()、filter()、take()等操作符

    2.3 关于 ReactiveCrudRepository


    1 Project Reactor 介绍

    Reactor 是一个运行在 Java8 之上满足 Reactice 规范的响应式框架,它提供了一组响应式风格的 API,主要目的:希望用少量、有限个数的线程来满足高负载的需要。IO阻塞浪费系统性能,只有纯异步处理才能发挥系统的全部性能。

    介绍一下最重要的两个类,可能单纯的介绍会让大家觉得云里雾里,但是看到后边 Redis 的实践内容就会恍然大悟了。

    Reactor 有两个核心类: Flux 和 Mono,这两个类都实现 Publisher 接口。

    • Flux 可以触发零到多个事件,并根据实际情况结束处理或触发错误。
    • Mono 最多只触发一个事件,所以可以把 Mono 用于在异步任务完成时发出通知。

    简单来说,Mono 表示 0~1 的序列, Flux用来表示 0~N 个元素序列,

    本文 Reactive 方式访问 Redis 为例,因为

    2 以 Reactive 方式访问Redis

    2.1 环境搭建

    ①初步环境搭建见于 【Spring 04】 链接的 2.1 小节

    ②然后新增 RedisConfig 继承 RedisReactiveAutoConfiguration 类

    路径:src/main/java/com/yinyu/redisdemo/config/RedisConfig.java

    1. @Configuration
    2. public class RedisConfig extends RedisReactiveAutoConfiguration {
    3. }

    2.2 基于 ReactiveStringRedisTemplate 实现 CRUD

    本文采用 ReactiveStringRedisTemplate,展示性更强,当然ReactiveRedisTemplate 也可使用,这两者的区别类似 RedisTemplate 和 StringRedisTemplate,见于 【Spring 04】 链接的 2.3.1 。

    opsForValue、opsForList、opsForSet、opsForZSet、opsForHash 等具体方法的操作也与 StringRedisTemplate 一致,见于 【Spring 04】 链接的 2.3.1 小节,主要是返回内容有所区别(见下文),本文选择重要的内容,并简单介绍几个操作符~

    路径:src/test/java/com/yinyu/redisdemo/reactiveStringRedisTemplateTest.java

    首先注入 ReactiveStringRedisTemplate 类:

    1. @Autowired
    2. private ReactiveStringRedisTemplate reactiveStringRedisTemplate;

    ① set + Mono 

    Ⅰmono.block() -- 返回 mono 内的元素

    此处用到了 Reactor 的 Mono,同时 Mono 包含 Boolean,用于返回插入操作是否成功~

    因为 Boolean 被封装在 Mono 内,所以无法直接得知 True 还是 False,那么 mono.block() 就起到了这个作用,简单来说是接触了 Mono 的这层封装,从而返回 Boolean。

    1. @Test
    2. public void reactiveOpsForValueSetTest1() {
    3. Mono mono = reactiveStringRedisTemplate.opsForValue().set("human","yinyu");
    4. System.out.println(mono.block());
    5. }

    插入成功 👇

     控制台输出成功 👇

    Ⅱ mono.subscribe() -- 订阅

    所有的操作只有在订阅的那一刻才开始进行!!!详情链接:【Reactor学】

    本文只用到 subscribe(System.out::println) ,里边就是函数式编程写法~

    1. @Test
    2. public void reactiveOpsForValueSetTest1() {
    3. Mono mono = reactiveStringRedisTemplate.opsForValue().set("Chinese","yinyu");
    4. mono.subscribe(System.out::println);
    5. }

    插入成功 👇

    控制台输出成功 👇

    Ⅲ mono.subscribe().dispose()

    表示彻底停止正在推送数据中的Flux或Mono流

    1. @Test
    2. public void reactiveOpsForValueSetTest1() {
    3. Mono mono = reactiveStringRedisTemplate.opsForValue().set("Chinese","yinyu");
    4. mono.subscribe(System.out::println).dispose();
    5. }

    ② append + Mono 

    接下来都用 subscribe(System.out::println) 控制台输出,返回的是 append 后字符串的长度

    1. @Test
    2. public void reactiveOpsForValueAppendTest() {
    3. Mono mono = reactiveStringRedisTemplate.opsForValue().append("human","+java");
    4. mono.subscribe(System.out::println);
    5. }

    新增字符到末尾成功👇

    控制台输出 append 后字符串的长度👇

    ③ delete + Mono 

    根据 key 删除记录,返回 Mono 包装的 Boolean

    1. @Test
    2. public void reactiveOpsForValueDeleteTest() {
    3. Mono mono = reactiveStringRedisTemplate.opsForValue().delete("human");
    4. mono.subscribe(System.out::println);
    5. }

    ④ get + Mono 

    根据 key 查询记录,返回 Mono 包装的 String

    1. @Test
    2. public void reactiveOpsForValueGetTest() {
    3. Mono mono = reactiveStringRedisTemplate.opsForValue().get("human");
    4. mono.subscribe(System.out::println);
    5. }

    查询成功:

    ⑤ opsForList + Flux

    Ⅰ reactiveopsForList 新增操作

    新增 OpsForList 记录也能输出 Mono,还记得 subscribe() 的作用吗,我们在这做个示范,若第一条列表新增方法没调用 subscribe() 会如何👇

    1. @Test
    2. public void reactiveOpsForListTest1() {
    3. ReactiveListOperations listOperations = reactiveStringRedisTemplate.opsForList();
    4. //1、没有使用 subscribe()
    5. listOperations.leftPush("reactiveList", "hello1");
    6. //2、直接调用 subscribe()
    7. listOperations.leftPush("reactiveList", "world2").subscribe();
    8. //3、对输出的 mono 使用 subscribe()
    9. Mono mono = listOperations.leftPush("reactiveList", "yinyu3");
    10. mono.subscribe(System.out::println);
    11. }

    数据库界面 👇,可以看到未调用 subscribe() 的步骤未执行,这就是“所有的操作只有在订阅的那一刻才开始进行!!”的含义,同时用 block() 代替也可实现。

    Ⅱ 查询操作

    Flux 中 Flus 的作用相当于 List ,接触封装后打印出类似 List 的形式

    1. @Test
    2. public void reactiveOpsForListTest2() {
    3. Flux flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
    4. flux.subscribe(System.out::println);
    5. }

    查询成功 👇

    ⑥ opsForHash+ Flux

    给每个新增的步骤加上 subscribe() ,需要注意的的是查询返回的 Mono 包装的是 Object

    1. @Test
    2. public void reactiveOpsForHashTest(){
    3. //1、reactiveOpsForHash 新增操作
    4. ReactiveHashOperations hashOperations = reactiveStringRedisTemplate.opsForHash();
    5. hashOperations.put("Reactivekey", "hashkey1", "hello").subscribe();
    6. hashOperations.put("Reactivekey", "hashkey2", "world").subscribe();
    7. hashOperations.put("Reactivekey", "hashkey3", "java").subscribe();
    8. //2、reactiveOpsForHash 查询操作
    9. Mono mono2 = reactiveStringRedisTemplate.opsForHash().get("Reactivekey","hashkey2");
    10. mono2.subscribe(System.out::println);
    11. }
    12. 新增成功 👇

       查询成功 👇

      ⑦ get + Flux + buffer()

      Ⅰ flux 经过 buffer 方法,转换成 list 传递给订阅者,buffer(1)表述1个元素成1个list,简单来说是对每个元素进行了列表封装。

      1. @Test
      2. public void bufferTest(){
      3. //opsForList() 查询
      4. Flux flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
      5. flux.buffer(1).subscribe(System.out::println);
      6. }

      查询成功 👇

      Ⅱ 每经过一段时间,传递给订阅者一次数据,用到 Duration.ofSeconds(1)(1秒钟的延迟)

      1. @Test
      2. public void bufferTest(){
      3. //opsForList() 查询
      4. Flux flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
      5. flux.buffer(Duration.ofSeconds(1)).subscribe(System.out::println);
      6. }

      ⑧ get + Flux + cache()

      它缓存 Flux/Mono 前面步骤的结果,直到调用 cache() 方法为止

      1. @Test
      2. @SneakyThrows
      3. public void cacheTest1(){
      4. Flux flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
      5. var cached = flux.cache(Duration.ofSeconds(2));
      6. cached.subscribe(System.out::println);
      7. }

      ⑨  map()、filter()、take()等操作符

      本文举几个相对常用的操作符,详情的话还请看我之前写的这篇文章:【Java8:Stream流详解】

      Ⅰmap()

      接受一个函数作为参数,这个函数会被应用到每个元素上,并将其映射成一个新的元素。

      1. @Test
      2. public void mapTest(){
      3. //查询
      4. Flux flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
      5. //map--对 Flux 里的每个元素加上 " Good",然后输出
      6. flux.map(e -> e + " Good").subscribe(System.out::println);
      7. //map--对 buffer(1) 后的每个列表元素里的 String 加上 " Great",然后输出每个列表元素里的 String
      8. flux.buffer(1).map(List->List.stream().map(e-> e + " Great")).subscribe(e->e.forEach(System.out::println));
      9. }

      操作成功 👇

      Ⅱ filter()

      过滤出符合条件的记录~

      1. @Test
      2. public void filterTest(){
      3. //查询
      4. Flux flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
      5. flux.filter(e->e.equals("yinyu")).subscribe(System.out::println);
      6. }

      过滤成功 👇

      Ⅲ take()

      指定发送事件个数,以下为指定第一个事件 👇

      1. @Test
      2. public void takeTest(){
      3. //查询
      4. Flux flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
      5. flux.take(1).subscribe(System.out::println);
      6. }

      指定成功 👇

      2.3 关于 ReactiveCrudRepository

      其实,Spring Data 也为 Reactive 形式访问数据库提供了支持,类似 CrudRepository ,只是多了 Reactor 响应式框架的内容,至于对数据流(Mono、Flux等)的操作参考前文即可,但可惜的是该形式不被 Redis 支持 👇,不过 MongoDB是可以用 ReactiveCrudRepository 的。

       


      参考文章

      Flux、Mono、Reactor 实战(史上最全)_架构师-尼恩的博客-CSDN博客

      Reactive的方式访问Redis - 腾讯云开发者社区-腾讯云 (tencent.com)

      flux 中的 buffer 的原理__lrs的博客-CSDN博客

    13. 相关阅读:
      mvvm框架下对wpf的DataGrid多选,右键操作
      NOI2022 游记
      【运维日常】infiniband网络架构,容器间跨机器不同网段通信
      如何将项目部署到服务器上(全套教程)
      RK3576:革新智能设备体验的高性能AI芯片
      【Java编程进阶】标识符和关键字
      流行框架:Glide的使用
      2-Dubbo架构设计与底层原理-SPI源码分析
      在CentOs7中设置tomcat应用systemd启动服务
      【数据库数据恢复】SQL Server数据库磁盘空间不足的数据恢复案例
    14. 原文地址:https://blog.csdn.net/weixin_51407397/article/details/127926313
      • 最新文章
      • 攻防演习之三天拿下官网站群
        数据安全治理学习——前期安全规划和安全管理体系建设
        企业安全 | 企业内一次钓鱼演练准备过程
        内网渗透测试 | Kerberos协议及其部分攻击手法
        0day的产生 | 不懂代码的"代码审计"
        安装scrcpy-client模块av模块异常,环境问题解决方案
        leetcode hot100【LeetCode 279. 完全平方数】java实现
        OpenWrt下安装Mosquitto
        AnatoMask论文汇总
        【AI日记】24.11.01 LangChain、openai api和github copilot
      • 热门文章
      • 十款代码表白小特效 一个比一个浪漫 赶紧收藏起来吧!!!
        奉劝各位学弟学妹们,该打造你的技术影响力了!
        五年了,我在 CSDN 的两个一百万。
        Java俄罗斯方块,老程序员花了一个周末,连接中学年代!
        面试官都震惊,你这网络基础可以啊!
        你真的会用百度吗?我不信 — 那些不为人知的搜索引擎语法
        心情不好的时候,用 Python 画棵樱花树送给自己吧
        通宵一晚做出来的一款类似CS的第一人称射击游戏Demo!原来做游戏也不是很难,连憨憨学妹都学会了!
        13 万字 C 语言从入门到精通保姆级教程2021 年版
        10行代码集2000张美女图,Python爬虫120例,再上征途
      Copyright © 2022 侵权请联系2656653265@qq.com    京ICP备2022015340号-1
      正则表达式工具 cron表达式工具 密码生成工具

      京公网安备 11010502049817号