消息推送(push
)通常是指网站的运营工作等人员,通过某种工具对用户当前网页或移动设备APP进行的主动消息推送。推送的场景比较多,比如有人关注我的公众号,这时我就会收到一条推送消息,以此来吸引我点击打开应用,消息推送一般又分为web端消息推送
和移动端消息推送
。
另外注意主流浏览器只支持6个连接
详情可以参考:我有 7种 实现web实时消息推送的方案,7种
短轮询
客户端定期向服务器发送请求。如果服务器有更新,它会向客户端发送响应并关闭连接。如果服务器没有更新,它也会向客户端发送一个响应并关闭连接。
长轮询
客户端向服务器发送请求。如果服务器有更新,它会向客户端发送响应并关闭连接。如果服务器没有更新,它会保持连接直到更新可用。当更新可用时,服务器向客户端发送响应并关闭连接。如果更新在某个超时时间内不可用,服务器会向客户端发送响应并关闭连接。
iframe流
iframe流就是在页面中插入一个隐藏的标签,通过在
src
中请求消息数量API接口,由此在服务端和客户端之间创建一条长连接,服务端持续向iframe
传输数据
SSE (推荐的方式)
SSE 是一种在基于浏览器的 Web 应用程序中仅从服务器向客户端发送文本消息的技术。SSE基于 HTTP 协议中的持久连接, 具有由 W3C 标准化的网络协议和 EventSource 客户端接口,作为 HTML5 标准套件的一部分
MQTT
该协议将消息的发布者(publisher
)与订阅者(subscriber
)进行分离,因此可以在不可靠的网络环境中,为远程连接的设备提供可靠的消息服务,使用方式与传统的MQ有点类似,可以做智能家居通讯
websocket
WebSocket 是一种在 Web 应用程序中实现同时、双向、实时通信的技术。WebSocket 基于 HTTP 以外的协议(TCP),因此可能需要额外设置网络基础设施(代理服务器、NAT、防火墙等)
三方平台 (例如极光推送)
SSE(Server Sent Event
),直译为服务器发送事件,顾名思义,也就是客户端可以获取到服务器发送的事件。我们常见的 http 交互方式是客户端发起请求,服务端响应,然后一次请求完毕;但是在 sse 的场景下,客户端发起请求,连接一直保持,服务端有数据就可以返回数据给客户端,这个返回可以是多次间隔的方式
长连接
服务端可以向客户端推送信息
从 sse 的特点出发,我们可以大致的判断出它的应用场景,需要轮询获取服务端最新数据的 case 下,多半是可以用它的。比如显示当前网站在线的实时人数,法币汇率显示当前实时汇率,电商大促的实时成交额等等
WebSocket可以参考:SpringBoot项目整合WebSocket几种方式
sse 是单通道,只能服务端向客户端发消息;而 webscoket 是双通道
sse | websocket |
---|---|
http 协议 | 独立的 websocket 协议 |
轻量,使用简单 | 相对复杂 |
默认支持断线重连 | 需要自己实现断线重连 |
文本传输 | 二进制传输 |
支持自定义发送的消息类型 | - |
使用 @RestController
注解创建一个控制器类(Controller)
创建一个方法来创建一个客户端连接,它返回一个 SseEmitter,处理 GET 请求并产生(produces)文本/事件流 (text/event-stream
)
创建一个新的 SseEmitter, 保存它并从方法中返回
在另一个线程中异步发送事件, 先拿到保存的 SseEmitter 并根据需要多次调用调用SseEmitter.send()
方法
完成事件发送, 调用 SseEmitter.complete()
方法
要异常完成发送事件,请调用 SseEmitter.completeWithError()
方法
可以参考:WebFlux入门详解
使用 @RestController
注解创建一个控制器类(Controller)
创建一个方法来创建一个客户端连接,它返回一个 Flux,处理 GET 请求并产生(produces)文本/事件流 (text/event-stream
)
创建一个新的 Flux对象且在方法中返回它
在web
端消息推送功能中,由于传统的HTTP
协议是由客户端主动发起请求,服务端才会响应。基本的ajax
轮询技术便是如此。而在SSE
中,浏览发送一个请求给服务端,通过响应头中的Content-Type:text/event-stream
等向客户端声名这是一个长连接,发送的是流数据,这样客户端就不会关闭连接,一直等待服务端发送数据。
如果服务器返回的数据中包含了事件标识符,浏览器会记录最后一次接收的事件的标识符。如果与服务器的连接中断,当浏览器再次进行连接时,会通过头来声明最后一次接收的事件的标识符。服务器端可以通过浏览器发送的事件标识符来确定从哪个事件来继续连接
保证数据的完整性 :客户端在每次接收到消息时,会把消息的id
字段作为内部属性 Last-Event-ID
储存起来。SSE
默认支持断线重连机制,在连接断开时会 触发EventSource
的error
事件,同时自动重连。再次连接成功时 EventSource
会把Last-Event-ID
属性作为请求头发送给服务器,这样服务器就可以根据这个Last-Event-ID
作出相应的处理。这里需要注意⚠️的是,id
字段不是必须的,服务器有可能不会在消息中带上id
字段,这样子客户端就不会存在Last-Event-ID
这个属性。所以为了保证数据可靠,我们需要在每条消息上带上id
字段。
SSE
不支持IE
浏览器
后端代码
@RestController
@CrossOrigin
@RequestMapping("/sse")
public class SseEmitterServer {
private static final Logger logger = LoggerFactory.getLogger(SseEmitterServer.class);
/**
* 当前连接数,不准确,如果用户多次刷新会导致偏少
*/
private static AtomicInteger count = new AtomicInteger(0);
/**
* 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面
*/
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
/**
* 创建用户连接并返回 SseEmitter
*
* @param userId 用户ID
* @return SseEmitter
*/
@GetMapping(path = "/subscribe/{userId}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter subscribe(@PathVariable("userId") String userId) throws IOException {
// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(30_000L);
// 设置前端的重试时间为15s,如果不加这个发送一下,前端就不会显示连接成功
sseEmitter.send("连接成功");
// 注册回调
sseEmitter.onCompletion(completionCallBack(userId));
sseEmitter.onError(errorCallBack(userId));
sseEmitter.onTimeout(timeoutCallBack(userId));
sseEmitterMap.put(userId, sseEmitter);
// 数量+1
count.getAndIncrement();
logger.info("创建新的sse连接,当前用户:{}", userId);
return sseEmitter;
}
/**
* 给指定用户发送信息 -- 单播
*/
@GetMapping(path = "sendMessage")
public void sendMessage(String userId, String message) {
if (sseEmitterMap.containsKey(userId)) {
try {
// sseEmitterMap.get(userId).send(message, MediaType.APPLICATION_JSON);
System.out.println(userId + "==" + message);
sseEmitterMap.get(userId).send(message);
} catch (IOException e) {
logger.error("用户[{}]推送异常:{}", userId, e.getMessage());
removeUser(userId);
}
}
}
/**
* 向多人发布消息 -- 组播
* @param groupId 开头标识
* @param message 消息内容
*/
public void groupSendMessage(String groupId, String message) {
if (MapUtils.isNotEmpty(sseEmitterMap)) {
/*Set ids = sseEmitterMap.keySet().stream().filter(m -> m.startsWith(groupId)).collect(Collectors.toSet());
batchSendMessage(message, ids);*/
sseEmitterMap.forEach((k, v) -> {
try {
if (k.startsWith(groupId)) {
v.send(message, MediaType.APPLICATION_JSON);
}
} catch (IOException e) {
logger.error("用户[{}]推送异常:{}", k, e.getMessage());
removeUser(k);
}
});
}
}
/**
* 群发所有人 -- 广播
*/
public void batchSendMessage(String message) {
sseEmitterMap.forEach((k, v) -> {
try {
v.send(message, MediaType.APPLICATION_JSON);
} catch (IOException e) {
logger.error("用户[{}]推送异常:{}", k, e.getMessage());
removeUser(k);
}
});
}
/**
* 群发消息
*/
public void batchSendMessage(String message, Set<String> ids) {
ids.forEach(userId -> sendMessage(userId, message));
}
/**
* 移除用户连接
*/
@GetMapping("/close/{userId}")
public void removeUser(@PathVariable("userId") String userId) {
sseEmitterMap.remove(userId);
// 数量-1
count.getAndDecrement();
logger.info("移除用户:{}", userId);
}
/**
* 获取当前连接信息
*/
public List<String> getIds() {
return new ArrayList<>(sseEmitterMap.keySet());
}
/**
* 获取当前连接数量
*/
@GetMapping("/getUserCount")
public int getUserCount() {
return count.intValue();
}
private Runnable completionCallBack(String userId) {
return () -> {
logger.info("结束连接:{}", userId);
// removeUser(userId);
};
}
private Runnable timeoutCallBack(String userId) {
return () -> {
logger.info("连接超时:{}", userId);
removeUser(userId);
};
}
private Consumer<Throwable> errorCallBack(String userId) {
return throwable -> {
logger.info("连接异常:{}", userId);
removeUser(userId);
};
}
}
前端代码
DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>SseEmittertitle>
head>
<body>
<button onclick="closeSse()">关闭连接button>
<div id="message">div>
body>
<script>
let source = null;
// 用时间戳模拟登录用户
const userId = new Date().getTime();
if (window.EventSource) {
// 建立连接
source = new EventSource('http://localhost:8080/sse/subscribe/' + userId);
setMessageInnerHTML("连接用户=" + userId);
/**
* 连接一旦建立,就会触发open事件
* 另一种写法:source.onopen = function (event) {}
*/
source.addEventListener('open', function(e) {
setMessageInnerHTML("建立连接。。。");
}, false);
/**
* 客户端收到服务器发来的数据
* 另一种写法:source.onmessage = function (event) {}
*/
source.addEventListener('message', function(e) {
setMessageInnerHTML(e.data);
});
/**
* 如果发生通信错误(比如连接中断),就会触发error事件
* 或者:
* 另一种写法:source.onerror = function (event) {}
*/
source.addEventListener('error', function(e) {
if (e.readyState === EventSource.CLOSED) {
setMessageInnerHTML("连接关闭");
} else if (e.target.readyState === EventSource.CONNECTING) {
console.log('Connecting...');
}else {
console.log(e);
}
}, false);
} else {
setMessageInnerHTML("你的浏览器不支持SSE");
}
// 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据
window.onbeforeunload = function() {
closeSse();
};
// 关闭Sse连接
function closeSse() {
source.close();
const httpRequest = new XMLHttpRequest();
httpRequest.open('GET', 'http://localhost:8080/sse/close/' + userId, true);
httpRequest.send();
console.log("close");
}
// 将消息显示在网页上
function setMessageInnerHTML(innerHTML) {
document.getElementById('message').innerHTML += innerHTML + '
';
}
script>
html>
https://zhuanlan.zhihu.com/p/444011262