• Sprint Cloud Stream整合RocketMq和websocket实现消息发布订阅


    1.引入RocketMQ依赖:首先,在pom.xml文件中添加RocketMQ的依赖:

    1. <dependency>
    2. <groupId>org.apache.rocketmq</groupId>
    3. <artifactId>rocketmq-spring-boot-starter</artifactId>
    4. <version>2.2.0</version> <!-- 版本号根据实际情况调整 -->
    5. </dependency>

    2.配置RocketMQ连接信息:在application.propertiesapplication.yml中配置RocketMQ的连接信息,包括Name Server地址等:

    1. spring:
    2. application:
    3. name: ${sn.publish}
    4. cloud:
    5. stream:
    6. rocketmq:
    7. binder:
    8. name-server: ${rocket-mq.name-server}
    9. bindings:
    10. output:
    11. producer:
    12. group: testSocket
    13. sync: true
    14. bindings:
    15. output:
    16. destination: test-topic
    17. content-type: application/json

    3.消息发布组件

    1. @Component
    2. public class MqSourceComponent {
    3. @Resource
    4. Source source;
    5. public void publishNotify(SampleNotifyDTO notify) {
    6. source.output().send(MessageBuilder.withPayload(notify).build());
    7. }
    8. }

    4.消息发布控制器

    1. @RestController
    2. @Api(tags = "rocketmq")
    3. public class MqController {
    4. @Resource
    5. MqSourceComponent mq;
    6. @ApiOperation(value = "测试发布消息")
    7. @PostMapping("test-publish")
    8. public JsonVO testSend(SampleNotifyDTO notify) {
    9. mq.publishNotify(notify);
    10. return JsonVO.success("消息已发送");
    11. }
    12. }

    项目结构:

    接下来是websocket模块的搭建

    1. 依赖添加

    1. org.apache.rocketmq
    2. rocketmq-spring-boot-starter
    3. 2.2.0

    2.application.yml配置文件

    1. server:
    2. port: ${sp.ws}
    3. spring:
    4. application:
    5. name: ${sn.ws}
    6. cloud:
    7. stream:
    8. rocketmq:
    9. binder:
    10. name-server: ${rocket-mq.name-server}
    11. bindings:
    12. input:
    13. destination: test-topic
    14. content-type: application/json
    15. group: testSocket

    3.将应用程序绑定到消息代理

    @EnableBinding(Sink.class): 这是Spring Cloud Stream的注解,它用于将应用程序绑定到消息代理(如Kafka、RabbitMQ等)。Sink.class是Spring Cloud Stream提供的预定义输入通道,允许你接收消息。通过这个注解,你的应用程序可以监听消息通道,并定义消息处理逻辑。

    1. @SpringBootApplication
    2. @EnableDiscoveryClient
    3. @EnableBinding(Sink.class)
    4. public class WsApplication {
    5. public static void main(String[] args) {
    6. SpringApplication.run(WsApplication.class, args);
    7. }
    8. }

    4.消息订阅组件

    监听消息通道中的消息,一旦有消息到达,就会触发listenNotify方法,该方法负责处理消息并通过chat服务发送响应。

    1. @Component
    2. @Slf4j
    3. public class MqListenComponent {
    4. @Resource
    5. ChatService chat;
    6. @StreamListener(Sink.INPUT)
    7. public void listenNotify(SampleNotifyDTO notify) {
    8. log.info(notify.toString());
    9. chat.sendMessage(notify.getClientId(), notify);
    10. }
    11. }

    5.消息通知服务

    1. package com.zeroone.star.ws.service;
    2. import cn.hutool.json.JSONUtil;
    3. import lombok.SneakyThrows;
    4. import org.springframework.stereotype.Component;
    5. import javax.websocket.*;
    6. import javax.websocket.server.ServerEndpoint;
    7. import java.io.IOException;
    8. import java.util.concurrent.ConcurrentHashMap;
    9. @Component
    10. @ServerEndpoint("/chat")
    11. public class ChatService {
    12. /**
    13. * 连接会话池
    14. */
    15. private static ConcurrentHashMap SESSION_POOL = new ConcurrentHashMap<>();
    16. @OnOpen
    17. public void onOpen(Session session) throws IOException {
    18. // 判断客户端对象是否存在
    19. if (SESSION_POOL.containsKey(session.getQueryString())) {
    20. CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "ID冲突,连接拒绝");
    21. session.getUserProperties().put("reason", closeReason);
    22. session.close();
    23. return;
    24. }
    25. // 将客户端对象存储到会话池
    26. SESSION_POOL.put(session.getQueryString(), session);
    27. System.out.println("客户端(" + session.getQueryString() + "):开启了连接");
    28. }
    29. @OnMessage
    30. public String onMessage(String msg, Session session) throws IOException {
    31. // 解析消息 ==> ID::消息内容
    32. String[] msgArr = msg.split("::", 2);
    33. // 处理群发消息,ID==all表示群发
    34. if ("all".equalsIgnoreCase(msgArr[0])) {
    35. for (Session one : SESSION_POOL.values()) {
    36. // 排除自己
    37. if (one == session) {
    38. continue;
    39. }
    40. // 发送消息
    41. one.getBasicRemote().sendText(msgArr[1]);
    42. }
    43. }
    44. // 指定发送
    45. else {
    46. // 获取接收方
    47. Session target = SESSION_POOL.get(msgArr[0]);
    48. if (target != null) {
    49. target.getBasicRemote().sendText(msgArr[1]);
    50. }
    51. }
    52. return session.getQueryString() + ":消息发送成功";
    53. }
    54. @OnClose
    55. public void onClose(Session session) {
    56. // 连接拒绝关闭会话
    57. Object reason = session.getUserProperties().get("reason");
    58. if (reason instanceof CloseReason) {
    59. CloseReason creason = (CloseReason) reason;
    60. if (creason.getCloseCode() == CloseReason.CloseCodes.CANNOT_ACCEPT) {
    61. System.out.println("拒绝客户(" + session.getQueryString() + "):关闭连接");
    62. return;
    63. }
    64. }
    65. // 从会话池中移除会话
    66. SESSION_POOL.remove(session.getQueryString());
    67. System.out.println("客户端(" + session.getQueryString() + "):关闭连接");
    68. }
    69. @OnError
    70. public void onError(Session session, Throwable throwable) {
    71. System.out.println("客户端(" + session.getQueryString() + ")错误信息:" + throwable.getMessage());
    72. }
    73. @SneakyThrows
    74. public void sendMessage(String id, Object message) {
    75. // 群发
    76. if ("all".equalsIgnoreCase(id)) {
    77. for (Session one : SESSION_POOL.values()) {
    78. // 发送消息
    79. one.getBasicRemote().sendText(JSONUtil.toJsonStr(message));
    80. }
    81. }
    82. // 指定发送
    83. else {
    84. // 获取接收方
    85. Session target = SESSION_POOL.get(id);
    86. if (target != null) {
    87. target.getBasicRemote().sendText(JSONUtil.toJsonStr(message));
    88. }
    89. }
    90. }
    91. }

    项目结构:

  • 相关阅读:
    Leetcode 76. 最小覆盖子串
    ChatGPT聊图像超分
    【DS基础】栈与队列
    Android Studio初学者实例:RecyclerView学习--模仿今日头条--续
    web前端期末大作业——基于Bootstrap响应式汽车经销商4S店官网21页
    MapReduce序列化【用户流量使用统计】
    【Java基础】注释、关键字、常量及数据类型
    TensorFlow 的基本概念和使用场景
    【SpringBoot】Spring常用注解总结
    Kafka环境搭建与相关启动命令
  • 原文地址:https://blog.csdn.net/Candy___i/article/details/134083129