• Spring Boot 2 中通过 WebSocket 发送 STOMP 消息


    描述

    在这篇博客中,我们将了解如何设置应用程序以通过 WebSocket 连接,发送和接收 STOMP 消息。我们将以 Spring Boot 2 为基础,因为它包含对 STOMP 和 WebSocket 的支持,并且它还提供了一个简单的消息代理

    连接 websocket

    根据 WebSocket RFC,WebSocket 协议支持在受控环境中运行不受信任的代码的客户端与选择加入来自该代码的通信的远程主机之间的双向通信。 那么,这句话是什么意思呢? 简而言之,WebSockets 在客户端和服务器之间创建了一个连接,允许在客户端和服务器之间来回发送消息,而无需重新打开连接或使用长轮询来获取来自服务器的更新。

    因此,一旦建立 WebSocket 连接,它就会保持打开状态并允许传输数据。 这是 WebSocket 相对于传统 HTTP 请求的一大好处,传统的 HTTP 请求需要重新打开连接并发送一堆标头和 cookie 数据才能成功完成请求,这种开销减少甚至消除。

    Data 通过 WebSocket 作为包含有效负载的消息传输,该有效负载由一个或多个帧组成。 帧由头和主体组成,其中头描述了帧和主体中的应用数据。 基于帧的系统将非有效负载数据减少到最低限度并显着减少延迟,从而允许以非常快速和可靠的方式传输数据。 这可以使 data 在快节奏的环境中平稳运行。

    为了建立 WebSocket 连接,一个传统的 HTTP 请求被发送到包含 “升级” 标头的服务器。 如果服务器支持该协议,它会通过在响应中添加 “升级” 标头来执行升级。 因此,socket 现在被打开并允许客户端和服务器之间的 “实时” 通信。 请注意,在此握手之后,初始 HTTP 连接将替换为 WebSocket 连接。 WebSocket URLS 分别使用 ws 和 wss 进行常规连接和安全连接。

    消息代理 (stomp broker)

    消息代理是一种调解应用程序之间通信的架构模式。

    消息代理最简单的可视化是中枢神经系统,它将子系统连接到一个大网络上。消息代理负责接收和发送消息,将它们路由到正确的目的地,并在标准消息协议之间转换消息,稍后会详细介绍。

    使用消息代理而不是直接连接系统的主要好处是数据流的处理不是由系统本身处理的。消息代理充当其他系统之间的中介,允许发送者在不知道接收者在哪里、有多少个接收者的情况下发送消息,甚至不必担心是否会收到消息。消息代理通过依赖存储和有序消息的消息队列来保证传递消息,直到这些消息被应用程序使用。

    已知的消息代理: Apache Kafka 和 RabbitMQ, RocketMQ

    通过 WebSocket 进行简单文本的消息传递

    WebSocket 是一种消息传递体系结构,不要求任何特定的消息传递协议。它将字节流转换为消息流。应用程序本身需要转换每条消息的含义。出于这个原因,WebSocket RFC 定义了子协议的使用,允许应用程序选择客户端和服务器都能转换的消息格式。这种格式可以是自定义的、某些特定的或标准的消息传递协议。

    标准消息传递协议的一个示例是 STOMP。 STOMP 是简单(或流式)文本的消息传递协议。它提供了一种可互操作的有线格式,以使客户端能够与消息代理进行通信,只要它们都识别 STOMP。因此,使用 STOMP 可以在多种语言、多平台和代理之间进行广泛的消息传递。

    STOMP 定义了一些映射到 WebSocket 帧的帧类型:

    • CONNECT:客户端用来向服务器发起流的帧
    • SUBSCRIBE:用于在给定目的地注册订阅的框架。发送到此目的地的消息将被路由到此目的地上的所有活动订阅
    • UNSUBSCRIBE:用于删除现有订阅的框架。删除后,客户端将不再是发送到目的地的消息的接收者;
    • ACK:用于确认订阅消息消费的帧,如果支持,则从消息队列中删除消息
    • SEND:用于将消息发送到消息传递系统中的特定目的地的帧。它由所需的标头 “destination” 组成,该标头指示将消息发送到何处;帧的主体是要发送的消息。 这些命令允许管理客户端和服务器之间的消息传输。注意,WebSocket 连接本身并不是由 STOMP 直接维护的,而是由 WebSocket 客户端本身维护的。因此,只要客户端和 / 或服务器不断开与 WebSocket 的连接,WebSocket 连接就会保持打开状态。 STOMP 客户端将通过 WebSocket 连接发送到服务器和客户端的指令来处理帧。

    Spring Boot 2 示例

    我们将使用 Spring Boot 2 为 WebSocket 握手 STOMP 端点,并在其上发送和接收消息。 Spring 的简单消息代理将用于处理消息的编码和解码以及管理客户端订阅。

    项目依赖

    1. <dependency>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-websocket</artifactId>
    4. </dependency>

    服务端配置

    1. @Slf4j
    2. @Configuration
    3. @EnableWebSocketMessageBroker
    4. public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    5. @Override
    6. public void configureMessageBroker(MessageBrokerRegistry config) {
    7. config
    8. .setApplicationDestinationPrefixes("/app")
    9. .setUserDestinationPrefix("/user")
    10. .enableStompBrokerRelay("/topic", "/queue")
    11. .setRelayHost("127.0.0.1")
    12. .setRelayPort(61613)
    13. .setSystemLogin("admin")
    14. .setSystemPasscode("a123456")
    15. .setClientLogin("admin")
    16. .setClientPasscode("a123456")
    17. .setSystemHeartbeatSendInterval(10000L)
    18. .setSystemHeartbeatReceiveInterval(10000L)
    19. ;
    20. }
    21. @Override
    22. public void registerStompEndpoints(StompEndpointRegistry registry) {
    23. // Not withSockJs. This will allow you to use ws://localhost:8080/test to establish websocket connection
    24. // withSockJS. This will allow you to use http://localhost:8080/test to establish websocket connection
    25. registry.addEndpoint("/ws")
    26. .setAllowedOriginPatterns("*")
    27. .withSockJS()
    28. .setHeartbeatTime(60_000L)
    29. .setInterceptors(new UserHandshakeInterceptor())
    30. ;
    31. }
    32. @Override
    33. public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
    34. registration.setSendTimeLimit(15 * 1000)
    35. .setSendBufferSizeLimit(512 * 1024)
    36. .setMessageSizeLimit(128 * 1024)
    37. ;
    38. }
    39. @Override
    40. public void configureClientInboundChannel(ChannelRegistration registration) {
    41. registration.interceptors(new TokenChannelInterceptor());
    42. }
    43. }

    上述代码片段简单描述:

    @EnableWebSocketMessageBroker 将通过 WebSocket 连接启用代理支持的消息传递。

    registerStompEndpoints 允许将 HTTP URL 配置为 WebSocket 握手的端点。因此,端点 /ws 将可供客户端发送 HTTP 请求以将连接升级为 WebSocket 连接。

    configureMessageBroker 被配置为允许发生两件事:

    1. 用于订阅和广播的 Spring 内置消息代理以及目的地以 “/topic” 或 “/queue” 为前缀的消息的路由将被启用。topic 将用于路由公共消息,queue 的地用于私人消息。请注意,queue 目标并不要求严格用于私人消息,而 topic 则用于公共消息。此约定是可选的。
    2. 定义一个前缀应用程序。目的地以 “/app” 为前缀的消息被路由到使用 @MessageMapping 注释的 @Controller 方法。

    例如,如果客户端向 “/app/message” 发送消息,则将调用带有 “/message” 注释的控制器方法。

    处理消息

    可以添加一个示例来处理消息。 在下面的代码片段中,描述了一个示例处理路由到前缀为 “/app” 的目的地的消息。

    1. @Slf4j
    2. @Controller
    3. @RequiredArgsConstructor
    4. public class MessagingController {
    5. private final SimpMessagingTemplate simpMessagingTemplate;
    6. /**
    7. * [@SendTo("/topic/mural")]
    8. */
    9. @MessageMapping("/demo/message")
    10. public void send(@Payload DemoMessage message, Principal user) {
    11. log.info("===>message: {}", message);
    12. GlobalUserPrincipal userPrincipal = (GlobalUserPrincipal) user;
    13. log.info("===>user: {}", userPrincipal);
    14. }
    15. @MessageMapping("/chat/message")
    16. public void send(@Payload ChatMessage message, Principal user) {
    17. log.info("===>message: {}", message);
    18. GlobalUserPrincipal userPrincipal = (GlobalUserPrincipal) user;
    19. log.info("===>user: {}", userPrincipal);
    20. message.setTime(LocalDateTime.now());
    21. simpMessagingTemplate.convertAndSend("/topic/chat." + message.getTo(), message);
    22. }
    23. }

    消息流

    下图展示简单内置代理在应用程序中的消息流。 

    当通过 WebSocket 连接接收到消息时,它们被解码为 STOMP 帧并转换为 Spring-message-representations。然后将这些 Spring 消息发送到 channel 进行进一步处理。

    在 Spring 应用程序示例配置中,发送到目的地 “/topic” 和 “/queue” 的 STOMP 消息直接路由到消息代理,而 “/app” 消息路由到服务被 controllers 处理

    调用通过 @MessageMapping 注解方法,将处理 Spring 消息的有效负载,并通过代理通道向 SimpleBroker 发送 Spring 消息来响应 topic 和 queue 上的连接客户端。 SimpleBroker 将根据消息发送到的目的地确保正确的订阅将接收消息。

    在客户端通过向 http://localhost:8080/ws 发送带有 “Upgrade” 标头的 HTTP 请求建立 WebSocket 连接 

     示例:客户端向服务发送消息,服务向所有订阅的客户端广播回复,包括原始发送者。

    1. 客户端发送一个 SUBSCRIBE 帧,其目的标头为 “/topic/reply”。 收到并解码后,消息将被路由到消息代理,该代理将存储客户端订阅。
    2. 客户端发送一个 SEND 帧到 “/app/message” 目的地。 “/app” 前缀被去除,“/message” 将被 controller 中 @MessageMapping 接收
    3. 方法的返回值变成 Spring 消息的 payload,将发送到 “/topic/reply” 目的地的消息代理。
    4. 消息代理找到并向所有匹配的订阅发送消息。 Spring 消息被编码为 STOMP 帧并通过 WebSocket 连接发送。
  • 相关阅读:
    mybatis和mybatiplus中Error attempting to get column ‘xx‘ from result set
    MQTT协议消息代理服务公网远程连接
    Python 何时传的是值,何时传的是引用?
    BGP进阶:BGP 综合实验二
    jdk8 | Function<T,R>实践应用
    20230921研发面经总结
    深度学习入门(5) - RNN
    C语言学习之路(基础篇)—— 文件操作(上)
    jupyter使用tensorflow遇到的问题
    【数据库】形式化关系查询语言(一):关系代数Relational Algebra
  • 原文地址:https://blog.csdn.net/li6151770/article/details/126493904