响应式编程基于 Project Reactor(Reactor 是一个运行在 Java8 之上的响应式框架)的思想,当你做一个带有一定延迟的才能够返回的 IO 操作时,不会阻塞,而是立刻返回一个流,并且订阅这个流,当这个流上产生了返回数据,可以立刻得到通知并调用回调函数处理数据。本文以 Reactive 方式访问 Redis 为例介绍 Project Reactor 响应式流框架~
目录
①初步环境搭建见于 【Spring 04】 链接的 2.1 小节
②然后新增 RedisConfig 继承 RedisReactiveAutoConfiguration 类
2.2 基于 ReactiveStringRedisTemplate 实现 CRUD
Reactor 是一个运行在 Java8 之上满足 Reactice 规范的响应式框架,它提供了一组响应式风格的 API,主要目的:希望用少量、有限个数的线程来满足高负载的需要。IO阻塞浪费系统性能,只有纯异步处理才能发挥系统的全部性能。
介绍一下最重要的两个类,可能单纯的介绍会让大家觉得云里雾里,但是看到后边 Redis 的实践内容就会恍然大悟了。
Reactor 有两个核心类: Flux 和 Mono,这两个类都实现 Publisher 接口。

简单来说,Mono 表示 0~1 的序列, Flux用来表示 0~N 个元素序列,
本文 Reactive 方式访问 Redis 为例,因为
路径:src/main/java/com/yinyu/redisdemo/config/RedisConfig.java
- @Configuration
- public class RedisConfig extends RedisReactiveAutoConfiguration {
-
- }
本文采用 ReactiveStringRedisTemplate,展示性更强,当然ReactiveRedisTemplate 也可使用,这两者的区别类似 RedisTemplate 和 StringRedisTemplate,见于 【Spring 04】 链接的 2.3.1 。
opsForValue、opsForList、opsForSet、opsForZSet、opsForHash 等具体方法的操作也与 StringRedisTemplate 一致,见于 【Spring 04】 链接的 2.3.1 小节,主要是返回内容有所区别(见下文),本文选择重要的内容,并简单介绍几个操作符~
路径:src/test/java/com/yinyu/redisdemo/reactiveStringRedisTemplateTest.java
首先注入 ReactiveStringRedisTemplate 类:
- @Autowired
- private ReactiveStringRedisTemplate reactiveStringRedisTemplate;
Ⅰmono.block() -- 返回 mono 内的元素
此处用到了 Reactor 的 Mono,同时 Mono 包含 Boolean,用于返回插入操作是否成功~
因为 Boolean 被封装在 Mono 内,所以无法直接得知 True 还是 False,那么 mono.block() 就起到了这个作用,简单来说是接触了 Mono 的这层封装,从而返回 Boolean。
- @Test
- public void reactiveOpsForValueSetTest1() {
- Mono
mono = reactiveStringRedisTemplate.opsForValue().set("human","yinyu"); - System.out.println(mono.block());
- }
插入成功 👇

控制台输出成功 👇

Ⅱ mono.subscribe() -- 订阅
所有的操作只有在订阅的那一刻才开始进行!!!详情链接:【Reactor学】
本文只用到 subscribe(System.out::println) ,里边就是函数式编程写法~
- @Test
- public void reactiveOpsForValueSetTest1() {
- Mono
mono = reactiveStringRedisTemplate.opsForValue().set("Chinese","yinyu"); - mono.subscribe(System.out::println);
- }
插入成功 👇

控制台输出成功 👇

Ⅲ mono.subscribe().dispose()
表示彻底停止正在推送数据中的Flux或Mono流
- @Test
- public void reactiveOpsForValueSetTest1() {
- Mono
mono = reactiveStringRedisTemplate.opsForValue().set("Chinese","yinyu"); - mono.subscribe(System.out::println).dispose();
- }
接下来都用 subscribe(System.out::println) 控制台输出,返回的是 append 后字符串的长度
- @Test
- public void reactiveOpsForValueAppendTest() {
- Mono
mono = reactiveStringRedisTemplate.opsForValue().append("human","+java"); - mono.subscribe(System.out::println);
- }
新增字符到末尾成功👇

控制台输出 append 后字符串的长度👇

根据 key 删除记录,返回 Mono 包装的 Boolean
- @Test
- public void reactiveOpsForValueDeleteTest() {
- Mono
mono = reactiveStringRedisTemplate.opsForValue().delete("human"); - mono.subscribe(System.out::println);
- }
根据 key 查询记录,返回 Mono 包装的 String
- @Test
- public void reactiveOpsForValueGetTest() {
- Mono
mono = reactiveStringRedisTemplate.opsForValue().get("human"); - mono.subscribe(System.out::println);
- }
查询成功:

Ⅰ reactiveopsForList 新增操作
新增 OpsForList 记录也能输出 Mono
- @Test
- public void reactiveOpsForListTest1() {
- ReactiveListOperations
listOperations = reactiveStringRedisTemplate.opsForList(); - //1、没有使用 subscribe()
- listOperations.leftPush("reactiveList", "hello1");
- //2、直接调用 subscribe()
- listOperations.leftPush("reactiveList", "world2").subscribe();
- //3、对输出的 mono 使用 subscribe()
- Mono
mono = listOperations.leftPush("reactiveList", "yinyu3"); - mono.subscribe(System.out::println);
- }
数据库界面 👇,可以看到未调用 subscribe() 的步骤未执行,这就是“所有的操作只有在订阅的那一刻才开始进行!!”的含义,同时用 block() 代替也可实现。

Ⅱ 查询操作
Flux
- @Test
- public void reactiveOpsForListTest2() {
- Flux
flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list - flux.subscribe(System.out::println);
- }
查询成功 👇

给每个新增的步骤加上 subscribe() ,需要注意的的是查询返回的 Mono 包装的是 Object
- @Test
- public void reactiveOpsForHashTest(){
- //1、reactiveOpsForHash 新增操作
- ReactiveHashOperations
hashOperations = reactiveStringRedisTemplate.opsForHash(); - hashOperations.put("Reactivekey", "hashkey1", "hello").subscribe();
- hashOperations.put("Reactivekey", "hashkey2", "world").subscribe();
- hashOperations.put("Reactivekey", "hashkey3", "java").subscribe();
- //2、reactiveOpsForHash 查询操作
- Mono
- mono2.subscribe(System.out::println);
- }
新增成功 👇

查询成功 👇

Ⅰ flux 经过 buffer 方法,转换成 list 传递给订阅者,buffer(1)表述1个元素成1个list,简单来说是对每个元素进行了列表封装。
- @Test
- public void bufferTest(){
- //opsForList() 查询
- Flux
flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list - flux.buffer(1).subscribe(System.out::println);
- }
查询成功 👇

Ⅱ 每经过一段时间,传递给订阅者一次数据,用到 Duration.ofSeconds(1)(1秒钟的延迟)
- @Test
- public void bufferTest(){
- //opsForList() 查询
- Flux
flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list - flux.buffer(Duration.ofSeconds(1)).subscribe(System.out::println);
- }
它缓存 Flux/Mono 前面步骤的结果,直到调用 cache() 方法为止
- @Test
- @SneakyThrows
- public void cacheTest1(){
- Flux
flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list - var cached = flux.cache(Duration.ofSeconds(2));
- cached.subscribe(System.out::println);
- }
本文举几个相对常用的操作符,详情的话还请看我之前写的这篇文章:【Java8:Stream流详解】
Ⅰmap()
接受一个函数作为参数,这个函数会被应用到每个元素上,并将其映射成一个新的元素。
- @Test
- public void mapTest(){
- //查询
- Flux
flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list - //map--对 Flux 里的每个元素加上 " Good",然后输出
- flux.map(e -> e + " Good").subscribe(System.out::println);
- //map--对 buffer(1) 后的每个列表元素里的 String 加上 " Great",然后输出每个列表元素里的 String
- flux.buffer(1).map(List->List.stream().map(e-> e + " Great")).subscribe(e->e.forEach(System.out::println));
- }
操作成功 👇

Ⅱ filter()
过滤出符合条件的记录~
- @Test
- public void filterTest(){
- //查询
- Flux
flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list - flux.filter(e->e.equals("yinyu")).subscribe(System.out::println);
- }
过滤成功 👇

Ⅲ take()
指定发送事件个数,以下为指定第一个事件 👇
- @Test
- public void takeTest(){
- //查询
- Flux
flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list - flux.take(1).subscribe(System.out::println);
- }
指定成功 👇

其实,Spring Data 也为 Reactive 形式访问数据库提供了支持,类似 CrudRepository ,只是多了 Reactor 响应式框架的内容,至于对数据流(Mono、Flux等)的操作参考前文即可,但可惜的是该形式不被 Redis 支持 👇,不过 MongoDB是可以用 ReactiveCrudRepository 的。

Flux、Mono、Reactor 实战(史上最全)_架构师-尼恩的博客-CSDN博客