Nacos长轮询的基本思路是通过Servlet3.0后提供的异步处理能力,把请求的任务添加至队列中,在有数据发生变更时,从队列中取出相应请求,然后响应请求,负责拉取数据的接口通过延时任务完成超时处理,如果等到设定的超时时间还没有数据变更时,就主动推送超时信息完成响应。
关于心跳检测参考:https://blog.csdn.net/sunquan291/article/details/126556366
如下是源码的实现:
@RestController
@RequestMapping("/nacos")
public class NacosLongPollingController extends HttpServlet {
@Autowired
private NacosLongPollingService nacosLongPollingService;
@RequestMapping("/pull")
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
String dataId = req.getParameter("dataId");
if (StringUtils.isEmpty(dataId)) {
throw new IllegalArgumentException("请求参数异常,dataId能为空");
}
nacosLongPollingService.doGet(dataId, req, resp);
}
@GetMapping("/push")
public Result push(@RequestParam("dataId") String dataId, @RequestParam("data") String data) {
if (StringUtils.isEmpty(dataId) || StringUtils.isEmpty(data)) {
throw new IllegalArgumentException("请求参数异常,dataId和data均不能为空");
}
nacosLongPollingService.push(dataId, data);
return new Result(data);
}
}
package com.example.demo.control;
public interface NacosLongPollingService {
void doGet(String dataId, HttpServletRequest req, HttpServletResponse resp);
void push(String dataId, String data);
Map<String, String> getDataStage();
}
package com.example.demo.control;
@Service
public class NacosLongPollingServiceImpl implements NacosLongPollingService {
private Map<String, String> dataStage = new HashMap<>();
final ScheduledExecutorService scheduler;
final Queue<NacosPullTask> nacosPullTasks;
public NacosLongPollingServiceImpl() {
scheduler = new ScheduledThreadPoolExecutor(1, r -> {
Thread t = new Thread(r);
t.setName("NacosLongPollingTask");
t.setDaemon(true);
return t;
});
nacosPullTasks = new ConcurrentLinkedQueue<>();
scheduler.scheduleAtFixedRate(() -> System.out.println("线程存活状态:" + new Date()), 0L, 5, TimeUnit.SECONDS);
}
@Override
public void doGet(String dataId, HttpServletRequest req, HttpServletResponse resp) {
// 一定要由当前HTTP线程调用,如果放在task线程容器会立即发送响应 相当于拿到一个future
final AsyncContext asyncContext = req.startAsync();
scheduler.execute(new NacosPullTask(nacosPullTasks, scheduler, asyncContext, dataId, req, resp));
}
@Override
public void push(String dataId, String data) {
scheduler.schedule(new NacosPushTask(this,dataId, data, nacosPullTasks), 0L, TimeUnit.MILLISECONDS);
}
@Override
public Map<String, String> getDataStage() {
return dataStage;
}
}
package com.example.demo.control;
@Slf4j
public class NacosPullTask implements Runnable {
Queue<NacosPullTask> nacosPullTasks;
ScheduledExecutorService scheduler;
AsyncContext asyncContext;
String dataId;
HttpServletRequest req;
HttpServletResponse resp;
Future<?> asyncTimeoutFuture;
public NacosPullTask(Queue<NacosPullTask> nacosPullTasks, ScheduledExecutorService scheduler, AsyncContext asyncContext, String dataId, HttpServletRequest req, HttpServletResponse resp) {
this.nacosPullTasks = nacosPullTasks;
this.scheduler = scheduler;
this.asyncContext = asyncContext;
this.dataId = dataId;
this.req = req;
this.resp = resp;
}
@Override
public void run() {
asyncTimeoutFuture = scheduler.schedule(() -> {
//超时等待10s
log.info("10秒后超时结束长轮询任务:" + new Date());
nacosPullTasks.remove(NacosPullTask.this);
sendResponse("time-out");
}, 10, TimeUnit.SECONDS);
nacosPullTasks.add(this);
}
public void sendResponse(String result) {
System.out.println("发送响应:" + new Date());
//取消等待执行的任务,避免已经响完了,还有资源被占用
if (asyncTimeoutFuture != null) {
//设置为true会立即中断执行中的任务,false对执行中的任务无影响,但会取消等待执行的任务
asyncTimeoutFuture.cancel(false);
}
//设置页码编码
resp.setContentType("application/json; charset=utf-8");
resp.setCharacterEncoding("utf-8");
//禁用缓存
resp.setHeader("Pragma", "no-cache");
resp.setHeader("Cache-Control", "no-cache,no-store");
resp.setDateHeader("Expires", 0);
resp.setStatus(HttpServletResponse.SC_OK);
//输出Json流
sendJsonResult(result);
}
private void sendJsonResult(String result) {
Result<String> pojoResult = new Result<>();
pojoResult.setCode(200);
pojoResult.setSuccess(!StringUtils.isEmpty(result));
pojoResult.setData(result);
PrintWriter writer = null;
try {
writer = asyncContext.getResponse().getWriter();
writer.write(pojoResult.toString());
writer.flush();
} catch (IOException e) {
e.printStackTrace();
} finally {
asyncContext.complete();
if (null != writer) {
writer.close();
}
}
}
}
package com.example.demo.control;
public class NacosPushTask implements Runnable {
private String dataId;
private String data;
private Queue<NacosPullTask> nacosPullTasks;
NacosLongPollingService nacosLongPollingService;
public NacosPushTask(NacosLongPollingService service, String dataId, String data, Queue<NacosPullTask> nacosPullTasks) {
this.nacosLongPollingService = service;
this.dataId = dataId;
this.data = data;
this.nacosPullTasks = nacosPullTasks;
}
@Override
public void run() {
Iterator<NacosPullTask> iterator = nacosPullTasks.iterator();
while (iterator.hasNext()) {
NacosPullTask nacosPullTask = iterator.next();
if (dataId != null && dataId.equals(nacosPullTask.dataId)) {
if (nacosLongPollingService.getDataStage().containsKey(dataId)) {
if (!nacosLongPollingService.getDataStage().get(dataId).equals(data)) {
//并且发生了修改
iterator.remove();
nacosPullTask.sendResponse(data);
}
} else {
iterator.remove();
nacosPullTask.sendResponse(data);
}
}
}
nacosLongPollingService.getDataStage().put(dataId, data);
}
}
拉数据
修改数据
结果:拉数据时,请求会被hold住10秒,超时后会自动返回,如果10秒内对应key的数据变化,则会将数据立即返回。
其原理图如下所示:
那么当config配置进行更新变化的时候就需要相关的应用进行跟着相关变化,这里就有个问题了那客户端怎么知道配置变化了呢,客户端是什么时候进行更新的呢?涉及更新方式就有2种,推和拉;
客户端主动从服务端定时拉取配置,如果有变化则进行替换。
服务端主动把变化的内容发送给客户端。
两种方式各有利弊,比如对于推的模式来讲,就需要服务端与客户端进行长连接,那么这种就会出现服务端需要耗费大量资源维护这个链接,并且还得加入心跳机制来维护连接有效性。而对于拉的模式则需要客户端定时去服务端访问,那么就会存在时间间隔,也就保证不了数据的实时性。那nacos采用哪种模式呢?nacos是采用了拉模式是一种特殊的拉模式,也就是我们通常听的长轮询机制。
如果客户端拉取发现客户端与服务端配置是一致的(其实是通过MD5判断的)那么服务端会先拿住这个请求不返回,直到这段时间内配置有变化了才把刚才拿住的请求返回。他的步骤是nacos服务端收到请求后检查配置是否发生变化,如果没有则开启定时任务,延迟29.5s执行。同时把当前客户端的连接请求放入队列。那么此时服务端并没有将结果返回给客户端,当有以下2种情况的时候才触发返回。
就是等待29.5s后触发自动检查
在29.5s内有配置进行了更改
经过这2种情况才完成这次的pull操作。这种的好处就是保证了客户端的配置能及时变化更新,也减少了轮询给服务端带来的压力。所以之前文章我们说过这个长链接回话超时时间默认是30s。