接到一个需求,实现方案时需要提供一个HTTP接口,接口需要hold住5-8秒,轮询查询数据库,一旦数据库中值有变化,取出变化的值进行处理,处理完成后返回响应。这不就是长轮询吗,如何优雅的实现呢?
在这之前先简单介绍下长连接和短连接
概念: HTTP长链接是指客户端与服务器在一次TCP连接上可以传输多个HTTP请求和响应。在请求完成后,连接不会立即关闭,而是保持开放状态,等待可能的后续请求。
优势:
应用场景:
概念: HTTP短连接是指每个HTTP请求都需要建立一个新的TCP连接,请求完成后立即关闭连接。
优势:
应用场景:
所谓轮询,即是在一个循环周期内不断发起请求来得到数据的机制。只要有请求的的地方,都可以实现轮询,譬如各种事件驱动模型。它的长短是在于某次请求的返回周期。
短轮询指的是在循环周期内,不断发起请求,每一次请求都立即返回结果,根据新1日数据对比决定是否使用这个结果。
而长轮询及是在请求的过程中,若是服务器端数据并没有更新,那么则将这个连接挂起,直到服务器推送新的数据,再返回,然后再进入循环周期。
一个TCP连接是否为长连接,是通过设置HTTP的Connection Header来决定的,而且是需要两边都设置才有效。而一种轮询方式是否为长轮询,是根据服务端的处理方式来决定的,与客户端没有关系。
连接的长短是通过协议来规定和实现的。而轮询的长短,是服务器通过编程的方式手动挂起请求来实现的。
在 Spring 中,AsyncContext 是用于支持异步处理的一个重要的特性。它允许我们在 servlet 请求处理过程中,将长时间运行的操作放在一个单独的线程中执行,而不会阻塞其他请求的处理。
AsyncContext 在以下两种情况下特别有用:
长时间运行的操作:当我们需要执行一些耗时的操作,例如网络请求、数据库查询或其他 I/O 操作时,通过将这些操作放在一个新的线程中,可以避免阻塞 servlet 容器中的线程,提高应用的并发性能。
推送异步响应:有时候,我们可能需要推送异步产生的响应,而不是等到所有操作都完成后再下发响应。通过 AsyncContext,我们可以在任何时间点上触发异步响应,将结果返回给客户端。
使用 AsyncContext 的步骤如下:
startAsync()
方法,可以获取到当前请求的 AsyncContext 对象,从而启用异步处理模式。- HttpServletRequest request = ...;
- AsyncContext asyncContext = request.startAsync();
start()
方法,在新的线程中执行需要异步处理的任务。- asyncContext.start(() -> {
- // 异步任务逻辑
- });
complete()
方法,以表示异步操作完成。asyncContext.complete();
需要注意的是,我们在使用 AsyncContext 时需要特别注意线程安全。由于异步任务在单独的线程中执行,所以可能存在并发问题。因此,在编写异步任务逻辑时,需要注意线程安全性,使用合适的同步措施。
另外,AsyncContext 也支持超时设置、错误处理、事件监听等功能,这些可以通过相应的方法和回调进行配置。可以根据具体的需求使用这些功能来优化异步处理的逻辑。
总结来说,Spring 的 AsyncContext 提供了方便的异步处理机制,可以提高应用的并发性能,并支持推送异步响应,使得应用更具有响应性和可伸缩性。
- import com.google.common.util.concurrent.ThreadFactoryBuilder;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.RequestBody;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import javax.annotation.Resource;
- import javax.servlet.AsyncContext;
- import javax.servlet.AsyncEvent;
- import javax.servlet.AsyncListener;
- import javax.servlet.http.HttpServletRequest;
- import javax.servlet.http.HttpServletResponse;
- import java.io.IOException;
- import java.util.concurrent.*;
-
- @RestController
- @RequestMapping("/api/test")
- @Slf4j
- public class AsyncTestController {
-
- @Resource
- private RedisTemplate
redisTemplate; -
- private final ExecutorService timeoutChecker = new ThreadPoolExecutor(1,1,1000,TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
- private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d").build();
-
-
- // private static boolean result = false;
-
- @PostMapping("/async")
- public void async(HttpServletRequest request, HttpServletResponse response) {
- // 创建AsyncContext
- AsyncContext asyncContext = request.startAsync(request, response);
- // 设置处理超时时间8s
- asyncContext.setTimeout(8000L);
- // asyncContext监听
- AsyncTestListener asyncListener = new AsyncTestListener(redisTemplate,asyncContext);
- asyncContext.addListener(asyncListener);
- // 定时处理业务,处理成功后asyncContext.complete();完成异步请求
- asyncContext.start(asyncListener);
- }
-
- // 模拟业务处理完成
- @PostMapping("/set")
- public ResultModel notify(String key, String value) {
- redisTemplate.opsForValue().set(key, value);
- return ResultModel.success();
- }
-
- @PostMapping("/get")
- public ResultModel get(String key) {
- String s = redisTemplate.opsForValue().get(key);
- return ResultModel.success(s);
- }
-
- @PostMapping("/del")
- public ResultModel del(String key) {
- redisTemplate.delete(key);
- return ResultModel.success();
- }
- }
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.data.redis.core.RedisTemplate;
-
- import javax.servlet.AsyncContext;
- import javax.servlet.AsyncEvent;
- import javax.servlet.AsyncListener;
- import java.io.IOException;
-
-
- @Slf4j
- public class AsyncTestListener implements AsyncListener,Runnable {
- boolean isComplete;
-
- private RedisTemplate
redisTemplate; - private AsyncContext asyncContext;
- public JdAsyncTestListener(RedisTemplate
redisTemplate, AsyncContext asyncContext) { - this.redisTemplate = redisTemplate;
- this.asyncContext = asyncContext;
- }
-
- @Override
- public void run() {
- try {
- while(true){
- if(isComplete){
- log.info("已经退出");
- break;
- }
- boolean b = redisTemplate.opsForValue().get(1) != null;
- log.info("获取标志位:"+b);
- Thread.sleep(300);
- if (b) {
- asyncContext.getResponse().getWriter().print(1);
- asyncContext.complete();
- }
- }
-
- } catch (IOException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void onComplete(AsyncEvent asyncEvent) throws IOException {
- log.info("结束了");
- isComplete = true;
-
- }
-
- @Override
- public void onTimeout(AsyncEvent asyncEvent) throws IOException {
- log.info("超时了");
- isComplete = true;
- }
-
- @Override
- public void onError(AsyncEvent asyncEvent) throws IOException {
-
- }
-
- @Override
- public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
-
- }
- }
- import com.alibaba.fastjson.JSON;
- import com.google.common.collect.Lists;
- import com.google.common.collect.Maps;
- import com.google.common.util.concurrent.ThreadFactoryBuilder;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.StringUtils;
- import org.apache.dubbo.config.annotation.Reference;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.validation.annotation.Validated;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.RequestBody;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import javax.annotation.Resource;
- import javax.servlet.AsyncContext;
- import javax.servlet.AsyncEvent;
- import javax.servlet.AsyncListener;
- import javax.servlet.http.HttpServletRequest;
- import javax.servlet.http.HttpServletResponse;
- import java.io.IOException;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.*;
-
- @Validated
- @RestController
- @RequestMapping("/api/test")
- @Slf4j
- public class TestController {
-
- @Resource
- private RedisTemplate
redisTemplate; -
- private final ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(10, threadFactory);
- private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d").build();
-
- private static boolean result = false;
-
- private final boolean isTimeout = false;
-
-
- /**
- * 消息
- *
- * @return
- */
- @PostMapping("/test")
- public void callback(@RequestBody TestLongPollRequest testLongPollRequest, HttpServletRequest request, HttpServletResponse response) {
- // 创建AsyncContext
- AsyncContext asyncContext = request.startAsync(request, response);
- String test = LongPollRequest.getctomerId();
- // 设置处理超时时间8s
- asyncContext.setTimeout(8000L);
- // asyncContext监听
- asyncContext.addListener(new AsyncListener() {
- @Override
- public void onComplete(AsyncEvent asyncEvent) throws IOException {
- log.info("onComplete={}", asyncEvent);
- }
-
- @Override
- public void onTimeout(AsyncEvent asyncEvent) throws IOException {
- log.info("onTimeout={}", asyncEvent);
- ConcurrentHashMap
map = new ConcurrentHashMap<>(); - map.put("code", "500"); asyncContext.getResponse().getWriter().print(JSON.toJSONString(map));
- asyncContext.complete();
- }
-
- @Override
- public void onError(AsyncEvent asyncEvent) throws IOException {
- log.info("onError={}", asyncEvent);
- }
-
- @Override
- public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
- log.info("onStartAsync={}", asyncEvent);
- }
- });
- // 定时处理业务,处理成功后asyncContext.complete();完成异步请求
- timeoutChecker.scheduleAtFixedRate(() -> {
- try {
- String redisKey = getRedisKey(customerId);
- String redisValue = redisTemplate.opsForValue().get(redisKey);
- result = StringUtils.isNotBlank(redisValue);
- if (result) {
- //todo 长轮询查询数据库。通过customerId查询
- send(test, redisValue);
- ConcurrentHashMap
map = new ConcurrentHashMap<>(); - map.put("code", "200");
- map.put("msg", redisValue);
- asyncContext.getResponse().getWriter().print(JSON.toJSONString(map));
- asyncContext.complete();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }, 0, 100L, TimeUnit.MILLISECONDS);
- }
-
- /**
- * 发送消息
- */
- private void send(String test, String content) {
-
- }
-
- }