1.引入RocketMQ依赖:首先,在pom.xml文件中添加RocketMQ的依赖:
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.2.0</version> <!-- 版本号根据实际情况调整 -->
- </dependency>
2.配置RocketMQ连接信息:在application.properties或application.yml中配置RocketMQ的连接信息,包括Name Server地址等:
- spring:
- application:
- name: ${sn.publish}
- cloud:
- stream:
- rocketmq:
- binder:
- name-server: ${rocket-mq.name-server}
- bindings:
- output:
- producer:
- group: testSocket
- sync: true
- bindings:
- output:
- destination: test-topic
- content-type: application/json
3.消息发布组件
- @Component
- public class MqSourceComponent {
- @Resource
- Source source;
-
- public void publishNotify(SampleNotifyDTO notify) {
- source.output().send(MessageBuilder.withPayload(notify).build());
- }
- }
4.消息发布控制器
- @RestController
- @Api(tags = "rocketmq")
- public class MqController {
- @Resource
- MqSourceComponent mq;
-
- @ApiOperation(value = "测试发布消息")
- @PostMapping("test-publish")
- public JsonVO
testSend(SampleNotifyDTO notify) { - mq.publishNotify(notify);
- return JsonVO.success("消息已发送");
- }
- }
项目结构:

接下来是websocket模块的搭建
1. 依赖添加
-
org.apache.rocketmq -
rocketmq-spring-boot-starter -
2.2.0
2.application.yml配置文件
- server:
- port: ${sp.ws}
- spring:
- application:
- name: ${sn.ws}
- cloud:
- stream:
- rocketmq:
- binder:
- name-server: ${rocket-mq.name-server}
- bindings:
- input:
- destination: test-topic
- content-type: application/json
- group: testSocket
3.将应用程序绑定到消息代理
@EnableBinding(Sink.class): 这是Spring Cloud Stream的注解,它用于将应用程序绑定到消息代理(如Kafka、RabbitMQ等)。Sink.class是Spring Cloud Stream提供的预定义输入通道,允许你接收消息。通过这个注解,你的应用程序可以监听消息通道,并定义消息处理逻辑。
- @SpringBootApplication
- @EnableDiscoveryClient
- @EnableBinding(Sink.class)
- public class WsApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(WsApplication.class, args);
- }
-
- }
4.消息订阅组件
监听消息通道中的消息,一旦有消息到达,就会触发listenNotify方法,该方法负责处理消息并通过chat服务发送响应。
- @Component
- @Slf4j
- public class MqListenComponent {
- @Resource
- ChatService chat;
-
- @StreamListener(Sink.INPUT)
- public void listenNotify(SampleNotifyDTO notify) {
- log.info(notify.toString());
- chat.sendMessage(notify.getClientId(), notify);
- }
- }
5.消息通知服务
- package com.zeroone.star.ws.service;
-
- import cn.hutool.json.JSONUtil;
- import lombok.SneakyThrows;
- import org.springframework.stereotype.Component;
-
- import javax.websocket.*;
- import javax.websocket.server.ServerEndpoint;
- import java.io.IOException;
- import java.util.concurrent.ConcurrentHashMap;
-
-
- @Component
- @ServerEndpoint("/chat")
- public class ChatService {
- /**
- * 连接会话池
- */
- private static ConcurrentHashMap
SESSION_POOL = new ConcurrentHashMap<>(); -
- @OnOpen
- public void onOpen(Session session) throws IOException {
- // 判断客户端对象是否存在
- if (SESSION_POOL.containsKey(session.getQueryString())) {
- CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "ID冲突,连接拒绝");
- session.getUserProperties().put("reason", closeReason);
- session.close();
- return;
- }
- // 将客户端对象存储到会话池
- SESSION_POOL.put(session.getQueryString(), session);
- System.out.println("客户端(" + session.getQueryString() + "):开启了连接");
- }
-
- @OnMessage
- public String onMessage(String msg, Session session) throws IOException {
- // 解析消息 ==> ID::消息内容
- String[] msgArr = msg.split("::", 2);
- // 处理群发消息,ID==all表示群发
- if ("all".equalsIgnoreCase(msgArr[0])) {
- for (Session one : SESSION_POOL.values()) {
- // 排除自己
- if (one == session) {
- continue;
- }
- // 发送消息
- one.getBasicRemote().sendText(msgArr[1]);
- }
- }
- // 指定发送
- else {
- // 获取接收方
- Session target = SESSION_POOL.get(msgArr[0]);
- if (target != null) {
- target.getBasicRemote().sendText(msgArr[1]);
- }
- }
- return session.getQueryString() + ":消息发送成功";
- }
-
- @OnClose
- public void onClose(Session session) {
- // 连接拒绝关闭会话
- Object reason = session.getUserProperties().get("reason");
- if (reason instanceof CloseReason) {
- CloseReason creason = (CloseReason) reason;
- if (creason.getCloseCode() == CloseReason.CloseCodes.CANNOT_ACCEPT) {
- System.out.println("拒绝客户(" + session.getQueryString() + "):关闭连接");
- return;
- }
- }
- // 从会话池中移除会话
- SESSION_POOL.remove(session.getQueryString());
- System.out.println("客户端(" + session.getQueryString() + "):关闭连接");
- }
-
- @OnError
- public void onError(Session session, Throwable throwable) {
- System.out.println("客户端(" + session.getQueryString() + ")错误信息:" + throwable.getMessage());
- }
-
- @SneakyThrows
- public void sendMessage(String id, Object message) {
- // 群发
- if ("all".equalsIgnoreCase(id)) {
- for (Session one : SESSION_POOL.values()) {
- // 发送消息
- one.getBasicRemote().sendText(JSONUtil.toJsonStr(message));
- }
- }
- // 指定发送
- else {
- // 获取接收方
- Session target = SESSION_POOL.get(id);
- if (target != null) {
- target.getBasicRemote().sendText(JSONUtil.toJsonStr(message));
- }
- }
- }
- }
项目结构:
