一篇文章理解WebSocket原理
一文搞懂四种 WebSocket 使用方式
Spring Boot 中的 WebSocketSession 是什么,原理,如何使用
HTTP是客户端向服务器发起请求,服务器返回响应给客户端的一种模式。
特点:
1.只能是客户端向服务器发起请求,是单向的。
2.服务器不能主动发送数据给客户端。
半双工通信的局限性也从中体现出来,同一时刻数据的传输只能是单向的,想在某一段时间内监听服务器是否有新数据的更新就要不停的从客户端这边发起请求,如果服务器有数据更新那么就会返回响应。那么这种做法是特别消耗性能的,想到一种更优的办法就是监听服务器如果有数据改变就立刻返回响应,不需要客户端一直不停的请求。
举个例子,HTTP协议就是,小明要去超市买薯片,老板说没有,过了一会小明又跑来超市买薯片,老板还是说没有,这样反反复复过了很多次,超市进货的薯片终于到了,小明也拿到薯片了。这样感觉是不是特别麻烦呢?如果使用WebSocket协议就是,小明把他的电话和地址给了超市老板,当超市进货的薯片到了后,老板第一时间给小明打电话告诉他薯片到了,小明可以自己来拿,也可以超市老板送货上门。这样是不是就更省时更省事呢?
WbeSocket 是 Html5 开始提供的一种浏览器与服务器之间进行全双工通信的协议(websocket协议本质上是一个基于tcp的协议),它实现了浏览器与服务器全双工通信,能更好的节省服务器资源和带宽并达到实时通讯的目的,属于应用层,基于TCP协议,并且复用HTTP握手通道,是一个持久化的协议
简单来说,建立一个Websocket连接,客户端浏览器首先要向服务器发起一个HTTP请求,这个请求头中包含了特殊的"Upgrade: WebSocket"信息表明这是一个从HTTP升级到WebSocket的请求,服务器解析之后返回响应给客户端并建立了WebSocket连接。


相同点:
都是基于TCP协议的,都是可靠性传输协议。
都是应用层协议
不同点:
WebSocket是全双工通信协议,模拟Socket协议,可以双向发送或接收信息。
HTTP是单向通信的。
WebSocket是需要浏览器和服务器握手建立连接的。
HTTP是浏览器发请求向服务器的连接,而服务器则不会提前知道这个连接。
WebSocket在建立握手是,数据是通过HTTP传输的,但是建立了连接后,传输则不需要HTTP协议。
总体过程:
客户端发起HTTP请求,请过三次握手后与服务器建立TCP连接,HTTP请求中包含了WebSocket的版本号信息:Upgrade、Connection、WebSocket-Version等。
服务器接收到客户端的握手请求后,使用HTTP协议返回响应给客户端。
最后,客户端收到连接成功消息后,可以借助TCP传输协议和服务器进行全双工通信。
WebSocket约定了一个通信的规范,通过一个握手机制,将客户端与服务器端进行一个类似TCP的连接,实现了通信。
在使用WebSocket之前,客户端与服务器端的交互是基于HTTP协议的短连接或长连接。
WebSocket的协议名是"ws",是一种全新的协议,不属于HTTP无状态协议。
WebSocket和socket的区分:从本质上来说,socket并不是一个新的协议,它只是为了便于程序员进行网络编程而对tcp/ip协议族通信机制的一种封装。
事件 | 说明 |
open | 连接建立时触发 |
message | 客户端接收到服务器消息时触发 |
error | 通信出现错误时触发 |
close | 连接关闭时触发 |
send | 客户端给服务器发送数据 |
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
public class SocketServer extends WebSocketServer {
public static void main(String[] args) throws InterruptedException, IOException {
int port = 8887; // 843 flash policy port
SocketServer s = new SocketServer(port);
s.start();
System.out.println("ChatServer started on port: " + s.getPort());
BufferedReader sysIn = new BufferedReader(new InputStreamReader(System.in));
while (true) {
String in = sysIn.readLine();
s.broadcast(in);
if (in.equals("exit")) {
s.stop(1000);
break;
}
}
}
public SocketServer(int port) {
super(new InetSocketAddress(port));
}
@Override
public void onOpen(WebSocket conn, ClientHandshake handshake) {
conn.send("Welcome to the server!"); // This method sends a message to the new client
broadcast("new connection: " + handshake
.getResourceDescriptor()); // This method sends a message to all clients connected
System.out.println(
conn.getRemoteSocketAddress().getAddress().getHostAddress() + " entered the room!");
}
@Override
public void onClose(WebSocket conn, int code, String reason, boolean remote) {
broadcast(conn + " has left the room!");
System.out.println(conn + " has left the room!");
}
@Override
public void onMessage(WebSocket conn, String message) {
broadcast(message);
System.out.println(conn + ": " + message);
}
@Override
public void onError(WebSocket conn, Exception ex) {
ex.printStackTrace();
if (conn != null) {
// some errors like port binding failed may not be assignable to a specific
// websocket
}
}
@Override
public void onStart() {
System.out.println("Server started!");
setConnectionLostTimeout(0);
setConnectionLostTimeout(100);
}
}
启动服务
http://www.websocket-test.com/
进入此网站,连接本地websokcet服务ws://127.0.0.1:8887


可以互相发送消息
WebSocketSession 是一个 WebSocket 连接的会话对象。每当客户端与服务器建立一个 WebSocket 连接时,服务器都会创建一个新的 WebSocketSession 对象。WebSocketSession 对象代表了服务器和客户端之间的一个持久连接,可以用来发送和接收消息。
WebSocketSession 接口定义了一组用于与客户端进行通信的方法。这些方法包括:
void sendMessage(TextMessage message):发送文本消息。
void sendMessage(BinaryMessage message):发送二进制消息。
void sendMessage(PongMessage message):发送 Pong 消息。
void close():关闭 WebSocket 连接。
boolean isOpen():检查 WebSocket 连接是否打开。
WebSocketSession 还提供了一些其他的方法,例如获取会话 ID、获取远程地址等。
在使用 WebSocketSession 之前,我们需要了解一些 WebSocket 的原理。
WebSocket 协议是一个基于 HTTP 的协议。在客户端和服务器建立 WebSocket 连接之前,客户端和服务器之间首先要建立一个普通的 HTTP 连接。当客户端发送一个包含 WebSocket 握手信息的 HTTP 请求时,服务器会将其升级为 WebSocket 连接。在升级完成后,客户端和服务器之间的通信就变成了基于 WebSocket 协议的双向通信。
在 Spring Boot 中,使用 WebSocketSession 进行通信的过程与上述原理类似。当客户端和服务器建立 WebSocket 连接时,服务器会创建一个新的 WebSocketSession 对象。客户端和服务器之间的通信就是通过这个 WebSocketSession 对象进行的。
在 Spring Boot 中使用 WebSocketSession 需要进行以下步骤:
添加依赖
首先,我们需要在项目中添加 Spring Boot 的 WebSocket 依赖。在 Maven 中,可以通过以下方式添加依赖:
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-websocketartifactId>
dependency>
接下来,我们需要配置 WebSocket。在 Spring Boot 中,可以通过实现 WebSocketConfigurer 接口来配置 WebSocket。WebSocketConfigurer 接口定义了一个 configureWebSocket 方法,我们可以在这个方法中注册 WebSocket 处理器和拦截器。
下面是一个示例 WebSocketConfigurer 的实现:
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
private WebSocketHandler webSocketMessageHandler;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(webSocketMessageHandler, "/websocket")
.addInterceptors(new WebSocketInterceptor())
.setAllowedOrigins("*");
}
}
在上面的示例中,我们实现了 WebSocketConfigurer 接口,并注册了一个 WebSocket 处理器。在 registerWebSocketHandlers 方法中,我们调用了 addHandler 方法来注册 WebSocket 处理器,并指定了 WebSocket 的路径。在这个示例中,WebSocket 的路径是 “/websocket”。setAllowedOrigins 方法用于设置允许的来源,这里设置为 “*” 表示允许所有来源。
接下来,我们需要实现 WebSocket 处理器。WebSocket 处理器负责处理客户端发送的消息,并向客户端发送响应消息。在 Spring Boot 中,可以通过实现 WebSocketHandler 接口来实现 WebSocket 处理器。
public interface WebSocketHandler {
/**
* Invoked after WebSocket negotiation has succeeded and the WebSocket connection is
* opened and ready for use.
* @throws Exception this method can handle or propagate exceptions; see class-level
* Javadoc for details.
*/
void afterConnectionEstablished(WebSocketSession session) throws Exception;
/**
* Invoked when a new WebSocket message arrives.
* @throws Exception this method can handle or propagate exceptions; see class-level
* Javadoc for details.
*/
void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception;
/**
* Handle an error from the underlying WebSocket message transport.
* @throws Exception this method can handle or propagate exceptions; see class-level
* Javadoc for details.
*/
void handleTransportError(WebSocketSession session, Throwable exception) throws Exception;
/**
* Invoked after the WebSocket connection has been closed by either side, or after a
* transport error has occurred. Although the session may technically still be open,
* depending on the underlying implementation, sending messages at this point is
* discouraged and most likely will not succeed.
* @throws Exception this method can handle or propagate exceptions; see class-level
* Javadoc for details.
*/
void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception;
/**
* Whether the WebSocketHandler handles partial messages. If this flag is set to
* {@code true} and the underlying WebSocket server supports partial messages,
* then a large WebSocket message, or one of an unknown size may be split and
* maybe received over multiple calls to
* {@link #handleMessage(WebSocketSession, WebSocketMessage)}. The flag
* {@link org.springframework.web.socket.WebSocketMessage#isLast()} indicates if
* the message is partial and whether it is the last part.
*/
boolean supportsPartialMessages();
}
afterConnectionEstablished:连接成功后调用。
handleMessage:处理发送来的消息。
handleTransportError: WS 连接出错时调用。
afterConnectionClosed:连接关闭后调用。
supportsPartialMessages:是否支持分片消息。
以上这几个方法重点可以来看一下 handleMessage 方法,handleMessage 方法中有一个 WebSocketMessage 参数,这也是一个接口,我们一般不直接使用这个接口而是使用它的实现类,它有以下几个实现类:
BinaryMessage:二进制消息体
TextMessage:文本消息体
PingMessage: Ping ****消息体
PongMessage: Pong ****消息体
但是由于 handleMessage 这个方法参数是WebSocketMessage,所以我们实际使用中可能需要判断一下当前来的消息具体是它的哪个子类,比如这样:
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
if (message instanceof TextMessage) {
handleTextMessage(session, (TextMessage) message);
}
else if (message instanceof BinaryMessage) {
handleBinaryMessage(session, (BinaryMessage) message);
}
else if (message instanceof PongMessage) {
handlePongMessage(session, (PongMessage) message);
}
else {
throw new IllegalStateException("Unexpected WebSocket message type: " + message);
}
}
下面是一个示例 WebSocketHandler 的实现:
@Component
@Slf4j
public class WebSocketMessageHandler extends TextWebSocketHandler {
/**
* redis 订阅通道名
*/
public static final String CHANNEL_NAME = "msgRedisTopic";
/**
* userId字段名
*/
public static final String USER_ID = "userId";
/**
* 当前节点在线session
*/
protected static final Map<String, WebSocketSession> CLIENTS = new ConcurrentHashMap<>();
@Resource
private RedisTemplate<String, WebSocketMessageDto> redisTemplate;
/**
* 连接成功后调用
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) {
String uid = String.valueOf(session.getAttributes().get(USER_ID));
CLIENTS.put(uid, session);
log.info("uri :" + session.getUri());
log.info("连接建立:uid{} ", uid);
log.info("当前连接服务器客户端数: {}", CLIENTS.size());
log.info("===================================");
}
/**
* 连接关闭后调用
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
String uid = String.valueOf(session.getAttributes().get(USER_ID));
CLIENTS.remove(uid);
log.info("断开连接: uid{}", uid);
log.info("当前连接服务器客户端数: {}", CLIENTS.size());
}
/**
* 处理发送来的消息
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws IOException {
String payload = message.getPayload();
log.info("服务端收到消息:{}", payload);
boolean isValid = JSON.isValidObject(payload);
if (!isValid) {
log.info("服务端收到消息的数据格式不符合要求,要求是json格式");
return;
}
WebSocketMessageDto webSocketMessageDto = JSONUtil.toBean(payload, WebSocketMessageDto.class);
String toUid = webSocketMessageDto.getToUid();
if (CLIENTS.containsKey(toUid)) {
try {
log.info("当前ws服务器内包含客户端uid {},直接发送消息", toUid);
CLIENTS.get(toUid).sendMessage(new TextMessage("收到" + webSocketMessageDto.getSendUid() + "的信息:" + payload));
} catch (Exception e) {
log.error("发送消息给uid:{}失败", toUid, e);
}
} else {
log.warn("当前ws服务器内未找到客户端uid {},推送到redis", toUid);
// 向指定频道发布消息
redisTemplate.convertAndSend(CHANNEL_NAME, webSocketMessageDto);
}
}
}
在上面的示例中,我们将所有连接到服务器的 WebSocketSession 对象保存到一个列表中。
我们实现了 WebSocketHandler 接口,并重写了其中的几个方法。afterConnectionEstablished 方法在建立 WebSocket 连接后被调用,可以在这个方法中进行一些初始化操作。handleMessage 方法用于处理客户端发送的消息,并向客户端发送响应消息。handleTransportError 方法在 WebSocket 传输发生错误时被调用。afterConnectionClosed 方法在 WebSocket 连接关闭后被调用,可以在这个方法中进行一些清理操作。supportsPartialMessages 方法用于设置是否支持部分消息传输。
总结
WebSocketSession 是 Spring Boot 中用于与客户端进行 WebSocket 通信的核心概念。在使用 WebSocketSession 时,我们需要先添加 Spring Boot 的 WebSocket 依赖,然后配置 WebSocket,并实现一个 WebSocket 处理器,最后在处理器中使用 WebSocketSession 进行通信。客户端也可以使用 WebSocketSession 进行通信,非常简单。
总的来说,WebSocketSession 是实现 WebSocket 通信的关键。它提供了一组用于与客户端进行通信的方法,可以用来发送和接收消息。在 Spring Boot 中,使用 WebSocketSession 进行通信非常方便,只需要实现一个 WebSocket 处理器,并使用 WebSocketSession 进行通信即可。
服务做了集群,不同客户端可能连接的服务端是不同的机器,如果按照原先写的代码发送消息就会失败,因为不在同一个服务内找不到另一个客户端,这时候发送消息就会失败,所以用到了redis发布订阅模式,订阅同一个频道的服务,都会收到消息,然后判断此客户在不在当前服务内,再就转发给他
@Data
@AllArgsConstructor
@NoArgsConstructor
public class WebSocketMessageDto implements Serializable {
public static void main(String[] args) {
WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto();
webSocketMessageDto.setToUid("3");
webSocketMessageDto.setSendUid("1");
webSocketMessageDto.setMsg("牛逼");
System.out.println(JSON.toJSONString(webSocketMessageDto));
}
private static final long serialVersionUID = -4291728346293647762L;
private String toUid;
private String sendUid;
private String msg;
}
@Component
@Slf4j
public class WebSocketMessageHandler extends TextWebSocketHandler {
/**
* redis 订阅通道名
*/
public static final String CHANNEL_NAME = "msgRedisTopic";
/**
* userId字段名
*/
public static final String USER_ID = "userId";
/**
* 当前节点在线session
*/
protected static final Map<String, WebSocketSession> CLIENTS = new ConcurrentHashMap<>();
@Resource
private RedisTemplate<String, WebSocketMessageDto> redisTemplate;
/**
* 连接成功后调用
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) {
String uid = String.valueOf(session.getAttributes().get(USER_ID));
CLIENTS.put(uid, session);
log.info("uri :" + session.getUri());
log.info("连接建立:uid{} ", uid);
log.info("当前连接服务器客户端数: {}", CLIENTS.size());
log.info("===================================");
}
/**
* 连接关闭后调用
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
String uid = String.valueOf(session.getAttributes().get(USER_ID));
CLIENTS.remove(uid);
log.info("断开连接: uid{}", uid);
log.info("当前连接服务器客户端数: {}", CLIENTS.size());
}
/**
* 处理发送来的消息
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws IOException {
String payload = message.getPayload();
log.info("服务端收到消息:{}", payload);
boolean isValid = JSON.isValidObject(payload);
if (!isValid) {
log.info("服务端收到消息的数据格式不符合要求,要求是json格式");
return;
}
WebSocketMessageDto webSocketMessageDto = JSONUtil.toBean(payload, WebSocketMessageDto.class);
String toUid = webSocketMessageDto.getToUid();
if (CLIENTS.containsKey(toUid)) {
try {
log.info("当前ws服务器内包含客户端uid {},直接发送消息", toUid);
CLIENTS.get(toUid).sendMessage(new TextMessage("收到" + webSocketMessageDto.getSendUid() + "的信息:" + payload));
} catch (Exception e) {
log.error("发送消息给uid:{}失败", toUid, e);
}
} else {
log.warn("当前ws服务器内未找到客户端uid {},推送到redis", toUid);
// 向指定频道发布消息
redisTemplate.convertAndSend(CHANNEL_NAME, webSocketMessageDto);
}
}
}
把我们实现的websocket消息处理器注册进来,并配置连接路径/websocket**
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
private WebSocketHandler webSocketMessageHandler;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(webSocketMessageHandler, "/websocket")
.addInterceptors(new WebSocketInterceptor())
.setAllowedOrigins("*");
}
}
@Component
@Slf4j
public class WebSocketRedisMessageListener implements MessageListener {
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(Message message, byte[] bytes) {
try {
// 获取消息
byte[] messageBody = message.getBody();
TypeReference<WebSocketMessageDto> reference = new TypeReference<WebSocketMessageDto>() {
};
WebSocketMessageDto webSocketMessageDto = Convert.convert(reference, redisTemplate.getValueSerializer().deserialize(messageBody));
Map<String, WebSocketSession> onlineSessionMap = WebSocketMessageHandler.CLIENTS;
String toUid = webSocketMessageDto.getToUid();
if (onlineSessionMap.containsKey(toUid)) {
String sendUid = webSocketMessageDto.getSendUid();
String msg = webSocketMessageDto.getMsg();
log.info("redis监听消息,{} 收到 {} 的消息:{}", sendUid, toUid, msg);
onlineSessionMap.get(toUid).sendMessage(new TextMessage("收到" + sendUid + "的消息:" + msg));
}
} catch (IOException e) {
log.error("Redis监听消息失败", e);
}
}
}
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// 使用Jackson2JsonRedisSerialize 替换默认序列化(默认采用的是JDK序列化)
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.WRAPPER_ARRAY);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setValueSerializer(jackson2JsonRedisSerializer);
// key 采用 String的序列化
template.setKeySerializer(stringRedisSerializer);
// hash 的key采用String的序列化
template.setHashKeySerializer(stringRedisSerializer);
// hash 的 value 采用 String 的序列化
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory, WebSocketRedisMessageListener webSocketRedisMessageListener) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(factory);
// 订阅(多个)频道:将消息侦听器添加到(可能正在运行的)容器中。如果容器正在运行,侦听器会尽快开始接收(匹配)消息。
//参数1:消息监听器,参数2:消息频道
redisMessageListenerContainer.addMessageListener(webSocketRedisMessageListener, new ChannelTopic(WebSocketMessageHandler.CHANNEL_NAME));
//redisMessageListenerContainer.addMessageListener(webSocketRedisMessageListener2, new ChannelTopic(WebSocketMessageHandler.CHANNEL_NAME2));
return redisMessageListenerContainer;
}
}
业务逻辑是判断请求参数是否完整,获取userId,塞入attributes,在WebSocketMessageHandler的WebSocketSession.getAttributes().get(USER_ID)获取出来**
@Slf4j
@Component
public class WebSocketInterceptor implements HandshakeInterceptor {
/**
* 握手前
* @param attributes 如果该方法通过,可以在WebSocketHandler拿到这里设置的数据,org.springframework.web.socket.WebSocketSession#getAttributes()这个可以拿到这里设置的值
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) {
log.info("uid 握手开始");
// 获得请求参数
Map<String, String> paramMap = HttpUtil.decodeParamMap(request.getURI().getQuery(), Charset.defaultCharset());
String uid = paramMap.get(WebSocketMessageHandler.USER_ID);
if (CharSequenceUtil.isNotBlank(uid)) {
// 放入属性域
attributes.put(WebSocketMessageHandler.USER_ID, uid);
log.info("用户{}握手成功!", uid);
return true;
}
return false;
}
/**
* 握手后
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
log.info("握手完成");
}
}
到此代码结束,下面是启动验证方式


http://www.websocket-test.com/
依旧进入此网站连接本地启动的websocket服务
其中1和11两个客户端连同一台服务
ws://127.0.0.1:8080/websocket?userId=1
ws://127.0.0.1:8080/websocket?userId=11
ws://127.0.0.1:8081/websocket?userId=2
ws://127.0.0.1:8082/websocket?userId=3



在1客户端发送消息给3

1和3连接的服务端不在同一个,通过redis发布,3所在服务端的redis监听消息,输出信息,然后发给3客户端
1服务端

3服务端输出redis监听的消息

3服务端收到发给3客户端
