一、什么是响应式编程
1.1 什么是WebFlux
WebFlux是从Spring Framework5.0以后开始引入的响应式web编程框架。与传统的Spring mvc不同WebFlux不需要Servlet API,在完全异步且无阻塞的通过Reactor项目实现Reactive Streams 规范。
WebFlux可以在有限资源下提高系统的吞吐量和伸缩性,这意味着在资源相同的情况下WebFlux可以处理更多的请求。
1.2 MVC和WebFlux的比较
(1) 工作方式:
mvc的工作流程:主线程接收到请求(request)-> ..... -> 返回数据. 整个过程是单线程阻塞的,在处理好数据后才返回数据,如果用户请求比较多,那么吞吐量就比较低。
(2)WebFlux:
WebFlux的工作流程:主线程得到请求-> 立即返回数据与函数的组合(Mono或Flux) -> 开启一个新Work线程准备数据 -> 执行业务操作 --> Work线程工作完成 ---> 返回数据。
| 区别 | Spring mvc | Spring WebFlux |
| 地址映射 | @Controller @RequestMapping 等 | Router Functions 提供函数式的API,用于创建Router Handler Filter
|
| 数据流 | Servlet API | Reactive Streams: 一种支持背压的异步数据量标准。WebFlux默认使用的是Reactor。 |
| 容器 | Tomcat Jetty Undertow | Tomcat Jetty Netty Undertow |
| IO 模型 | 同步的、阻塞的IO | 异步非阻塞的IO |
| 吞吐量 | 低 | 高 |
| 数据库 | Sql NoSql | 支持NoSql,不支持Sql |
| 请求和响应 | HttpServletRequest和HttpServletResponse | ServletResponse和ServletRequest |
| 业务处理性能 | 相同 | 相同 |
2. 认识Mono和Flux
2.1 什么是Mono和Flux
Mono和Flux是Reactor中的两个基本概念。
Mono和Flux都是事件发布者,为消费者提供订阅接口。当有事件发生时,Mono和Flux会回调消费者的相应方法,然后通知消费者相应的事件。这也是响应式编程模型。
Mono和Flux用于处理异步数据流,它不是MVC中直接返回String或Object,而是将异步数据流包装成Mono或Flux对象。
2.2 Mono和Flux的区别
Flux可以发送多个item(例如:列表)。这些item可以经过若干算子(operators)后才被订阅。Mono只能发送一个item(例如:根据id查询)
Mono 主要用于返回单个数据,Flux用于返回多个数据。
二、开发WebFlux的流程:
2.1 配置WebFlux依赖:
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-webfluxartifactId>
- dependency>
2.2 编写控制器:
- mport org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
- import reactor.core.publisher.Mono;
-
- @RestController
- public class HelloController {
-
- @GetMapping("/")
- public Mono
hello(){ - return Mono.just("Hello WebFlux Test dev!");
- }
- }

2.3 添加mongo依赖并创建实体类:
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-data-mongodb-reactiveartifactId>
- dependency>
- @Data
- @AllArgsConstructor
- @NoArgsConstructor
- @Document
- public class User {
- @Id
- private String id;
- private String name;
- private Integer age;
- }
2.4 创建DAO
- import com.example.webfluxdemo.entity.User;
- import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
-
- public interface UserMongoDao extends ReactiveMongoRepository
{ - }
2.5 编写Handler:
- import org.springframework.http.MediaType;
- import org.springframework.stereotype.Component;
- import org.springframework.web.reactive.function.server.ServerRequest;
- import org.springframework.web.reactive.function.server.ServerResponse;
- import reactor.core.publisher.Mono;
-
- @Component
- public class HelloWorldHandler {
-
- public Mono
sayHello(ServerRequest serverRequest){ - return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN)
- .body(Mono.just("This is WebFlux Demo!"),String.class);
- }
- }
2.6 编写路由:
- @Configuration
- public class Router {
-
- @Resource
- private HelloWorldHandler helloWorldHandler;
-
- @Bean
- public RouterFunction
getString(){ - return route(GET("/hello"),req->helloWorldHandler.sayHello(req));
- }
- }
2.7 编写控制器:
- mport com.example.webfluxdemo.dao.UserMongoDao;
- import com.example.webfluxdemo.entity.User;
- import org.springframework.http.HttpStatus;
- import org.springframework.http.MediaType;
- import org.springframework.http.ResponseEntity;
- import org.springframework.web.bind.annotation.*;
- import reactor.core.publisher.Flux;
- import reactor.core.publisher.Mono;
-
- import javax.annotation.PostConstruct;
- import javax.annotation.Resource;
- import java.time.Duration;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.stream.Collectors;
-
- @RestController
- @RequestMapping("/user")
- public class UserController {
-
- @Resource
- private UserMongoDao userMongoDao;
-
- @GetMapping(value ="/list")
- public Flux
findAll(){ - return userMongoDao.findAll();
- }
-
- @PostMapping("")
- public Mono
create( User user){ - return this.userMongoDao.save(user);
- }
-
- @GetMapping("/{id}")
- public Mono
> getUserById(@PathVariable("id") String id){ - return this.userMongoDao.findById(id)
- .map(getUser -> ResponseEntity.ok(getUser))
- .defaultIfEmpty(ResponseEntity.notFound().build());
- }
- @DeleteMapping("/{id}")
- public Mono
> delete(@PathVariable("id")String id){ - return userMongoDao.findById(id)
- .flatMap(existingUser ->
- userMongoDao.delete(existingUser)
- .then(Mono.just(new ResponseEntity
(HttpStatus.OK))) - )
- .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
- }
-
- @PutMapping("/{id}")
- public Mono update(@PathVariable("id")String id,User user){
- return this.userMongoDao.findById(id)
- .flatMap(existingUser-> {
- existingUser.setName(user.getName());
- existingUser.setAge(user.getAge());
- return userMongoDao.save(user);
- })
- .map(update-> new ResponseEntity<>(update,HttpStatus.OK))
- .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
- }
-
- @GetMapping(value ="/listdelay",produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
- public Flux
getAll(){ - return userMongoDao.findAll().delayElements(Duration.ofSeconds(1));
- }
- }

