• Springboot+Websocket+JWT实现的即时通讯模块


    场景

    目前做了一个接口:邀请用户成为某课程的管理员,于是我感觉有能在用户被邀请之后能有个立马通知他本人的机(类似微博、朋友圈被点赞后就有立马能收到通知一样),于是就琢磨琢磨搞了一套。

    涉及技术栈

    • Springboot
    • Websocket 协议
    • JWT
    • (非必要)RabbitMQ 消息中间件

    Websocket 协议

    ⭐推荐阅读:Websocket 协议简介

    WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。
    image.png

    为什么使用Websocket?
    因为普通的http协议一个最大的问题就是:通信只能由客户端发起,服务器响应(半双工),而我们希望可以全双工通信。

    因此一句话总结就是:建立websocket(以下简称为ws)连接是为了让服务器主动向前端发消息,而无需等待前端的发起请求调用接口。

    业务逻辑

    我们现在有:

    • 用户A
    • 用户B
    • Springboot服务器
    • 场景:用户A调用接口邀请用户B成为课程成员
    • 涉及数据库MySQL的数据表:
      • course_member_invitation,记录课程邀请记录,其形式如下(忽略时间等列):
    id course_id account_id admin_id is_accepted bind_message_id
    邀请id 课程id 受邀用户id 邀请人id(因其本身为课程管理员) 受邀用户是否接受了邀请 绑定的消息id
    • course_message,记录消息记录,其形式如下(忽略时间等列):
    id type account_id source_id is_read is_ignored
    消息id 消息类型 收信人用户id 发信人用户id 是否已读 收信人是否忽略
    • (图中没有体现)course_message_type,记录消息类型,其形式如下
    id name description
    消息类型id 消息类型名称 描述
    • 涉及RabbitMQ(因不是重点,所以此处暂不讨论,最后一章叙述)

    rabbitMQ.png
    业务步骤主要涉及两个方法addCourseMemberInvitationsendMessage和一个组件CourseMemberInvitationListener,分别做:
    addCourseMemberInvitation:

    1. 用户A调用接口,邀请用户B成为某门课程的管理员
    2. Springboot服务器收到请求,将这一请求生成邀请记录、消息记录,写入下表:
      • course_member_invitation
      • course_message
    3. 写入DB后,调用sendMessage处理发送消息的业务。
    4. 将执行的结果返回给用户A

    sendMessage

    1. 将消息记录放入RabbitMQ中对应的消息队列。

    CourseMemberInvitationListener:

    1. 持续监听其绑定的消息队列
    2. 一旦消息队列中有新消息,就尝试通过ws连接发送消息。
      1. 用户B在线,则可发送。
      2. 否则,则消费掉该消息,待用户上线后从DB中读入。

    在Springboot中配置Websocket

    • pom.xml文件
    <!-- WebSocket相关 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    
    • Websocket Server组件配置初步:com.xxxxx.course.webSocket.WebSocketServer
    /**
     * 进行前后端即时通信
     * https://blog.csdn.net/qq_33833327/article/details/105415393
     * session: https://www.codeleading.com/article/6950456772/
     * @author jojo
     */
    @ServerEndpoint(value = "/ws/{uid}",configurator = WebSocketConfig.class) //响应路径为 /ws/{uid} 的连接请求
    @Component
    public class WebSocketServer {
        /**
         * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的
         */
        private static int onlineCount = 0;
    
        /**
         * concurrent 包的线程安全Set,用来存放每个客户端对应的 myWebSocket对象
         * 根据 用户id 来获取对应的 WebSocketServer 示例
         */
        private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    
        /**
         * 与某个客户端的连接会话,需要通过它来给客户端发送数据
         */
        private Session session;
    
        /**
         * 用户id
         */
        private String accountId ="";
    
        /**
         * logger
         */
        private static Logger LOGGER = LoggerUtil.getLogger();
    
    
        /**
         * 连接建立成功调用的方法
         *
         * @param session
         * @param uid 用户id
         */
        @OnOpen
        public void onOpen(Session session, @PathParam("uid") String uid) {
    
            this.session = session;
    
            //设置超时,同httpSession
            session.setMaxIdleTimeout(3600000);
    
            this.accountId = uid;
    
            //存储websocket连接,存在内存中,若有同一个用户同时在线,也会存,不会覆盖原有记录
            webSocketMap.put(accountId, this);
            LOGGER.info("webSocketMap -> " + JSON.toJSONString(webSocketMap.toString()));
    
            addOnlineCount(); // 在线数 +1
            LOGGER.info("有新窗口开始监听:" + accountId + ",当前在线人数为" + getOnlineCount());
    
            try {
                sendMessage(JSON.toJSONString("连接成功"));
            } catch (IOException e) {
                e.printStackTrace();
                throw new ApiException("websocket IO异常!!!!");
            }
    
        }
    
        /**
         * 关闭连接
         */
    
        @OnClose
        public void onClose() {
            if (webSocketMap.get(this.accountId) != null) {
                webSocketMap.remove(this.accountId);
                subOnlineCount(); // 人数 -1
                LOGGER.info("有一连接关闭,当前在线人数为:" + getOnlineCount());
            }
        }
    
        /**
         * 收到客户端消息后调用的方法
         * 这段代码尚未有在使用,可以先不看,在哪天有需求时再改写启用
        * @param message 客户端发送过来的消息
         * @param session
         */
        @OnMessage
        public void onMessage(String message, Session session) {
            LOGGER.info("收到来自用户 [" + this.accountId + "] 的信息:" + message);
    
            if (!StringTools.isNullOrEmpty(message)) {
                try {
                    // 解析发送的报文
                    JSONObject jsonObject = JSON.parseObject(message);
                    // 追加发送人(防窜改)
                    jsonObject.put("fromUserId", this.accountId);
                    String toUserId = jsonObject.getString("toUserId");
                    // 传送给对应 toUserId 用户的 WebSocket
                    if (!StringTools.isNullOrEmpty(toUserId) && webSocketMap.containsKey(toUserId)) {
                        webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
                    } else {
                        // 否则不在这个服务器上,发送到 MySQL 或者 Redis
                        LOGGER.info("请求的userId:" + toUserId + "不在该服务器上");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
        }
    
        /**
         * @param session
         * @param error
         */
        @OnError
        public void onError(Session session, Throwable error) {
            LOGGER.error("用户错误:" + this.accountId + ",原因:" + error);
        }
    
        /**
         * 实现服务器主动推送
         *
         * @param message 消息字符串
         * @throws IOException
         */
        public void sendMessage(String message) throws IOException {
            //需要使用同步机制,否则多并发时会因阻塞而报错
            synchronized(this.session) {
                try {
                    this.session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    LOGGER.error("发送给用户 ["+this.accountId +"] 的消息出现错误",e.getMessage());
                    throw e;
                }
            }
        }
    
    
        /**
         * 点对点发送
         * 指定用户id
         * @param message 消息字符串
         * @param userId 目标用户id
         * @throws IOException
         */
        public static void sendInfo(String message, String userId) throws Exception {
    
            Iterator entrys = webSocketMap.entrySet().iterator();
            while (entrys.hasNext()) {
                Map.Entry entry = (Map.Entry) entrys.next();
                if (entry.getKey().toString().equals(userId)) {
                    webSocketMap.get(entry.getKey()).sendMessage(message);
                    LOGGER.info("发送消息到用户id为 [" + userId + "] ,消息:" + message);
                    return;
                }
            }
            //错误说明用户没有在线,不用记录log
            throw new Exception("用户没有在线");
        }
    
    
        private static synchronized int getOnlineCount() {
            return onlineCount;
        }
    
        private static synchronized void addOnlineCount() {
            WebSocketServer.onlineCount++;
        }
    
        private static synchronized void subOnlineCount() {
            WebSocketServer.onlineCount--;
        }
    }
    

    几点说明:

    • onOpen方法:服务器与前端建立ws连接成功时自动调用。
    • sendInfo方法:是服务器通过用户id向指定用户发送消息的方法,其为静态公有方法,因此可供各service调用。调用的例子:
    // WebSocket 通知前端
    try {
        //调用WebsocketServer向目标用户推送消息
        WebSocketServer.sendInfo(JSON.toJSONString(courseMemberInvitation),courseMemberInvitation.getAccountId().toString());
        LOGGER.info("send to "+courseMemberInvitation.getAccountId().toString());
    } 
    
    • @ServerEndpoint注解:
    @ServerEndpoint(value = "/ws/{uid}",configurator = WebSocketConfig.class) //响应路径为 /ws/{uid} 的连接请求
    

    这么注解之后,前端只用发起 ws://xxx.xxx:xxxx/ws/{uid} 即可开启ws连接(或者wss协议,增加TLS), 比如前端js代码这么写:

    <script>
        var socket;
    
    	/* 启动ws连接 */
        function openSocket() {
            if(typeof(WebSocket) == "undefined") {
                console.log("您的浏览器不支持WebSocket");
            }else{
                console.log("您的浏览器支持WebSocket");
                
                //实现化WebSocket对象,指定要连接的服务器地址与端口  建立连接
                var socketUrl="http://xxx.xxx.xxx:xxxx/ws/"+$("#uid").val();
                socketUrl=socketUrl.replace("https","ws").replace("http","ws"); //转换成ws协议
                console.log("正在连接:"+socketUrl);
                if(socket!=null){
                    socket.close();
                    socket=null;
                }
                socket = new WebSocket(socketUrl);
                
                /* websocket 基本方法 */
                
                //打开事件
                socket.onopen = function() {
                    console.log(new Date()+"websocket已打开,正在连接...");
                    //socket.send("这是来自客户端的消息" + location.href + new Date());
                };
                
                //获得消息事件
                socket.onmessage = function(msg) {
                    console.log(msg.data);
                    //发现消息进入    开始处理前端触发逻辑
                };
                
                //关闭事件
                socket.onclose = function() {
                    console.log(new Date()+"websocket已关闭,连接失败...");
                    //重新请求token
                };
                
                //发生了错误事件
                socket.onerror = function() {
                    console.log("websocket连接发生发生了错误");
                }
            }
        
        }
    
    	/* 发送消息 */
        function sendMessage() {
            if(typeof(WebSocket) == "undefined") {
                console.log("您的浏览器不支持WebSocket");
            }else {
                console.log("您的浏览器支持WebSocket");
                console.log('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
                socket.send('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
            }
        }
    </script>
    

    存在的问题

    一切看起来很顺利,我只要放个用户id进去,就可以想跟谁通讯就跟谁通讯咯!
    但设想一个场景, 我是小明,uid为250,我想找uid为520的小花聊天,理论上我只要发起ws://xxx.xxx:xxxx/ws/250请求与服务器连接,小花也发起ws://xxx.xxx:xxxx/ws/520与服务器建立ws连接,我们就能互发消息了吧!
    这时候出现了uid为1的小黄,他竟然想挖墙脚!?他竟然学过js,自己发了ws://xxx.xxx:xxxx/ws/520跟服务器建立ws连接,而小花根本不想和我发消息,所以实际上是小黄冒充了小花,把小花NTR了(实际上人家并不在乎😥),跟我愉快地聊天?!
    那怎么办啊?我怎么才能知道在跟我Websocket的究竟是美女小花还是黄毛小黄啊??!
    这就引入了JWT!

    JWT——JSON WEB TOKEN

    可以看到后端会响应/ws/{token}的连接请求,前端可以发/ws/{token}的连接请求,一开始写的时候看网上的都是用/ws/{userId}来建立该id的用户与服务器的ws连接,但这样的话可能就很不安全,无法保证使用某个id建立的ws确实就是真实用户发起的连接。(小花被小黄NTR的悲惨故事)
    所以在调研了很多公开的解决方案,看到有改用令牌(token)来建立ws连接的说法,同时验证用户身份(事实上一些其他接口也可以用令牌(token)来保证接口安全性),于是打算自己试试看,未必是最好的,甚至可能有点头痛医头,脚痛医脚,但总归是个经验,记录一下。。

    //Websocket Server
    @ServerEndpoint(value = "/ws/{token}",configurator = WebSocketConfig.class) //响应路径为 /ws/{token} 的连接请求
    @Component
    public class WebSocketServer {
        ...
    }
    

    js:

    var socketUrl="http://xxx.xxx.xxx.xxx:xxxx/ws/"+$("#token").val();
    socketUrl=socketUrl.replace("https","ws").replace("http","ws"); //转换成ws协议
    ....
    socket = new WebSocket(socketUrl);
    

    业务逻辑

    image.png

    为什么用JWT

    最初考虑的是用/ws/{userId}来建立ws连接,然后在后台拿session中的user来对比用户id,判断合法性。
    结果发现ws的session和http的session是不同的,还不好拿,可能得想办法把http的session存到redis或者DB(也可以存在内存中,只是可能又要消耗内存资源),在建立ws连接之前去拿出来验证合法性。后面查到了还有JWT这种好东西。

    JWT好在哪里?

    ⭐推荐阅读:什么是 JWT -- JSON WEB TOKEN

    我的总结:

    • token可以过期
    • 验证token可以不用存在redis或者DB或者内存,完全依赖算法即可
    • 只要从前端请求中拿到token,后端就可以根据封装好的算法验证这个token合不合法,有没有被篡改过(这点很重要),过期了没有
    • 可以将用户id、用户名等非敏感数据一同封装到token中,后端拿到token后可以解码拿到数据,只要这个token合法,这些发来的数据就是可信的(小黄就算自己发明了token也不作数),是没有被篡改的(小黄就算把我小花的token偷走把用户id改成自己的也没用,后台可以算出来被改过),可以建立ws连接,调用websocket server进行通讯。

      教程
      JWT教程

      整合到本项目中

    pom.xml

    <!-- JWT 相关     -->
    <dependency>
        <groupId>com.auth0</groupId>
        <artifactId>java-jwt</artifactId>
        <version>3.12.1</version>
    </dependency>
    
    <!--   base64     -->
    <dependency>
        <groupId>commons-codec</groupId>
        <artifactId>commons-codec</artifactId>
        <version>1.12</version>
    </dependency>
    

    token的前两个部分是由base64编码的,所以需要codec进行解码。

    实现一个JWT工具类

    目前基本当作工具使用
    com.xxxx.course.util.JWTUtil

    /**
     * @author jojo
     * JWT 令牌工具类
     */
    public class JWTUtil {
    
        /**
         * 默认本地密钥
         * @notice: 非常重要,请勿泄露
         */
        private static final String SECRET = "doyoulikevanyouxi?" //乱打的
    
        /**
         * 默认有效时间单位,为分钟
         */
        private static final int TIME_TYPE = Calendar.MINUTE;
    
        /**
         * 默认有效时间长度,同http Session时长,为60分钟
         */
        private static final int TIME_AMOUNT = 600;
    
        /**
         * 全自定生成令牌
         * @param payload payload部分
         * @param secret 本地密钥
         * @param timeType 时间类型:按Calender类中的常量传入:
         *         Calendar.YEAR;
         *         Calendar.MONTH;
         *         Calendar.HOUR;
         *         Calendar.MINUTE;
         *         Calendar.SECOND;等
         * @param expiredTime 过期时间,单位由 timeType 决定
         * @return 令牌
         */
        public static String generateToken(Map<String,String> payload,String secret,int timeType,int expiredTime){
            JWTCreator.Builder builder = JWT.create();
    
            //payload部分
            payload.forEach((k,v)->{
                builder.withClaim(k,v);
            });
            Calendar instance = Calendar.getInstance();
            instance.add(timeType,expiredTime);
    
            //设置超时时间
            builder.withExpiresAt(instance.getTime());
    
            //签名
            return builder.sign(Algorithm.HMAC256(secret)).toString();
        }
    
        /**
         * 生成token
         * @param payload payload部分
         * @return 令牌
         */
        public static String generateToken(Map<String,String> payload){
            return generateToken(payload,SECRET,TIME_TYPE,TIME_AMOUNT);
        }
    
    
        省略了重载方法....
            
    
        /**
         * 验证令牌合法性
         * @param token 令牌
         * @return
         */
        public static void verify(String token) {
            //如果有任何验证异常,此处都会抛出异常
            JWT.require(Algorithm.HMAC256(SECRET)).build().verify(token);
        }
    
        /**
         * 自定义密钥解析
         * @param token 令牌
         * @param secret 密钥
         * @return 结果
         */
        public static DecodedJWT parseToken(String token,String secret) {
            DecodedJWT decodedJWT = JWT.require(Algorithm.HMAC256(secret)).build().verify(token);
            return decodedJWT;
        }
    
        /**
         * 解析令牌
         * 当令牌不合法将抛出错误
         * @param token
         * @return
         */
        public static DecodedJWT parseToken(String token) {
            return parseToken(token,SECRET);
        }
    
        /**
         * 解析令牌获得payload,值为claims形式
         * @param token
         * @param secret
         * @return
         */
        public static Map<String,Claim> getPayloadClaims(String token,String secret){
            DecodedJWT decodedJWT = parseToken(token,secret);
            return decodedJWT.getClaims();
        }
    
        /**
         * 默认解析令牌获得payload,值为claims形式
         * @param token 令牌
         * @return
         */
        public static Map<String,Claim> getPayloadClaims(String token){
            return getPayloadClaims(token,SECRET);
        }
    
        /**
         * 解析令牌获得payload,值为String形式
         * @param token 令牌
         * @return
         */
        public static Map<String,String> getPayloadString(String token,String secret){
            Map<String, Claim> claims = getPayloadClaims(token,secret);
            Map<String,String> payload = new HashMap<>();
            claims.forEach((k,v)->{
                if("exp".equals(k)){
                    payload.put(k,v.asDate().toString());
                }
                else {
                    payload.put(k, v.asString());
                }
            });
    
            return payload;
        }
        /**
         * 默认解析令牌获得payload,值为String形式
         * @param token 令牌
         * @return
         */
        public static Map<String,String> getPayloadString(String token){
            return getPayloadString(token,SECRET);
        }
    
    
        /**
         * 通过用户实体生成令牌
         * @param user 用户实体
         * @return
         */
        public static String generateUserToken(Account user){
            return generateUserToken(user.getId());
        }
    
        /**
         * 通过用户id生成令牌
         * @param accountId 用户id
         * @return
         */
        public static String generateUserToken(Integer accountId){
            return generateUserToken(accountId.toString());
        }
    
    
            /**
             *  通过用户id生成令牌
             * @param accountId 用户id
             * @return
             */
        public static String generateUserToken(String accountId){
            Map<String,String> payload = new HashMap<>();
            payload.put("accountId",accountId);
            return generateToken(payload);
        }
    
        /**
         * 从令牌中解析出用户id
         * @param token 令牌
         * @return
         */
        public static String parseUserToken(String token){
            Map<String, String> payload = getPayloadString(token);
            return payload.get("accountId");
        }
    
    }
    

    调整登陆 service 中,登陆时返回一个token

    com.xxxx.course.service.impl.AccountServiceImpl

    public JSONObject login(){
        登陆成功...
        ...
        //生成并放入通信令牌token,令牌中带有用户id,用以鉴别身份
        String token = JWTUtil.generateUserToken(user);
        jsonObject.put("token",token);
        ...
        后续操作...
        return jsonObject; 
    }
    
    

    WebSocket 连接握手时进行身份验证

    之后前端只要携带token进行ws连接即可,写了个ws的配置类,继承了一个websocket连接的监听器ServerEndpointConfig.Configurator,进行token的验证。
    com.XXXXX.course.config.webSocket.WebSocketConfig

    /**
     * 开启 WebSocket 支持,进行前后端即时通讯
     * https://blog.csdn.net/qq_33833327/article/details/105415393
     * session配置:https://www.codeleading.com/article/6950456772/
     * @author jojo
     */
    @Configuration
    public class WebSocketConfig extends ServerEndpointConfig.Configurator implements WebSocketConfigurer {
    
        /**
         * logger
         */
        private static final Logger LOGGER = LoggerUtil.getLogger();
    
        /**
         * 监听websocket连接,处理握手前行为
         * @param sec
         * @param request
         * @param response
         */
        @Override
        public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
    
            String[] path = request.getRequestURI().getPath().split("/");
            String token = path[path.length-1];
    
            //todo 验证用户令牌是否有效
            try {
                JWTUtil.verify(token);
            } catch (Exception e) {
                LOGGER.info("拦截了非法连接",e.getMessage());
                return;
            }
            super.modifyHandshake(sec, request, response);
        }
    
    
        @Bean
        public ServerEndpointExporter serverEndpointExporter(){
            return new ServerEndpointExporter();
        }
    
        ...
    }
    

    这样,每次服务器建立ws连接前,都要验证token的合法性,仅仅通过JWTUtil.verify(token);即可!当token不合法,就会抛出异常。

    再配合重写websocket serveronOpen方法,应该就能进行身份可信的通信了!

    /**
         * 连接建立成功调用的方法
         *
         * @param session
         * @param token 用户令牌
         */
        @OnOpen
        public void onOpen(Session session, @PathParam("token") String token) {
    
            this.session = session;
            this.token = token;
    
            //设置超时,同httpSession
            session.setMaxIdleTimeout(3600000);
    
            //解析令牌,拿取用户信息
            Map<String, String> payload = JWTUtil.getPayloadString(token);
            String accountId = payload.get("accountId");
            this.accountId = accountId;
    
            //存储websocket连接,存在内存中,若有同一个用户同时在线,也会存,不会覆盖原有记录
            webSocketMap.put(accountId, this);
            LOGGER.info("webSocketMap -> " + JSON.toJSONString(webSocketMap.toString()));
    
            addOnlineCount(); // 在线数 +1
            LOGGER.info("有新窗口开始监听:" + accountId + ",当前在线人数为" + getOnlineCount());
            ...
    

    (非必须)RabbitMQ消息中间件

    教程

    为什么我要用RabbitMQ

    • 正经理由:
      • 可以将写DB与发送消息两件事情异步处理,这样响应会更快。
      • 未来可以拓展为集群
    • 真正理由:
      • 人傻
      • 闲的

    整合到本项目中

    pom.xml

    <!-- rabbitMQ 相关-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    RabbitMQ 配置类

    • com.xxxx.course.config.rabbitMQ.RabbitMQConfig
    /**
     * @author jojo
     */
    @Configuration
    public class RabbitMQConfig {
    	
        /**
        * 指定环境
        */
        @Value("${spring.profiles.active}")
        private String env; 
    
        /**
         * logger
         */
        public static final Logger LOGGER = LoggerUtil.getLogger();
    
        /**
         * 交换机名
         */
        public String MEMBER_INVITATION_EXCHANGE = RabbitMQConst.MEMBER_INVITATION_EXCHANGE;
    
    
        /**
         * 交换机队列
         */
        public String MEMBER_INVITATION_QUEUE = RabbitMQConst.MEMBER_INVITATION_QUEUE;
    
    
        /**
         * 声明 课程成员邀请消息 交换机
         * @return
         */
        @Bean("memberInvitationDirectExchange")
        public Exchange memberInvitationDirectExchange(){
            //根据项目环境起名,比如开发环境会带dev字样
            String exchangeName = RabbitUtil.generateRabbitName(env,MEMBER_INVITATION_EXCHANGE);
            return ExchangeBuilder.directExchange(exchangeName).durable(true).build();
        }
    	
        
    
        /**
         * 声明 课程成员邀请消息 队列
         * @return
         */
        @Bean("memberInvitationQueue")
        public Queue memberInvitationQueue(){
            //同上
            String queueName = RabbitUtil.generateRabbitName(env,MEMBER_INVITATION_QUEUE);
            return QueueBuilder.durable(queueName).build();
        }
    
        /**
         * 课程成员邀请消息的队列与交换机绑定
         * @param queue
         * @param exchange
         * @return
         */
        @Bean
        public Binding memberInvitationBinding(@Qualifier("memberInvitationQueue") Queue queue,@Qualifier("memberInvitationDirectExchange") Exchange exchange){
            String queueName = RabbitUtil.generateRabbitName(env,MEMBER_INVITATION_QUEUE);
            return BindingBuilder.bind(queue).to(exchange).with(queueName).noargs();
        }
        
        /**
        * Springboot启动时, 验证队列名根据环境命名正确
        */
        @Bean
        public void verify(){
            Queue memberInvitationQueue = SpringUtil.getBean("memberInvitationQueue", Queue.class);
            Exchange memberInvitationDirectExchange = SpringUtil.getBean("memberInvitationDirectExchange", Exchange.class);
    
            LOGGER.info("消息队列 ["+memberInvitationQueue.getName()+"] 创建成功");
            LOGGER.info("消息交换器 ["+memberInvitationDirectExchange.getName()+"] 创建成功");
    
            //放入映射中存储
            RabbitMQConst.QUEUE_MAP.put(MessageConst.MEMBER_INVITATION,memberInvitationQueue.getName());
            RabbitMQConst.EXCHANGE_MAP.put(MessageConst.MEMBER_INVITATION,memberInvitationDirectExchange.getName());
        }
    
        /**
         * 自定义messageConverter使得消息中携带的pojo序列化成json格式
         * @return
         */
        @Bean
        public MessageConverter messageConverter(){
            return new Jackson2JsonMessageConverter();
        }
    
    }
    

    项目运行后,在 RabbitMQ服务器 中就会出现刚刚注册的队列与交换器(图是旧的,没有体现根据环境命名队列,但是其实做到了):
    image.png

    课程管理成员邀请接口

    • com.xxxx.course.service.impl.CourseMemberInvitationServiceImpl
      • 首先在接口中注入操作rabbitMQ的Bean
    @Autowired
    RabbitTemplate rabbitTemplate;
    
    • 在课程管理员业务代码中加入向rabbitMQ发送消息的逻辑
      • CourseMemberInvitationServiceImpl
    @Override
    @Transactional(rollbackFor = Exception.class) //开启事务,以防万一
    public JSONObject addCourseMemberInvitation(Integer courseId, String username, String requestIp) {
    
        //检查课程是否存在
        courseService.hasCourse(courseId);
    
        //检查用户是否已加入课程平台
        accountService.hasAccount(username);
    
        /* 若存在则查看邀请记录是否已经存在 */
    
        //获取用户id
        Account account = accountService.getAccountByUsernameOrEmail(username);
    
        //检查用户名是否存在
        if(account==null){
            JSONObject result = new JSONObject();
            result.put(RESULT,FAILED);
            result.put(MSG,"用户不存在");
            return result;
        }
    
        Integer accountId = account.getId();
    
        //获得发出邀请人的id
        Account user = (Account) SecurityUtils.getSubject().getSession().getAttribute("user");
        Integer adminId = user.getId();
    
        //检查是否自己邀请自己,是则不再执行
        hasInvitedOneself(accountId,adminId);
    
        //检查是否已经邀请过,是则不再执行
        hasInvited(courseId,accountId,adminId);
    
        /* 若不存在则新建邀请记录 */
    
        CourseMemberInvitation courseMemberInvitation = new CourseMemberInvitation();
        courseMemberInvitation.setCourseId(courseId);
        courseMemberInvitation.setAccountId(accountId);
        courseMemberInvitation.setAdminId(adminId);
        courseMemberInvitation.setCreateTime(new Date());
        courseMemberInvitation.setCreateIp(requestIp);
    
        //新建消息
        CourseMessage courseMessage = courseMessageService.newMessage(MessageConst.MEMBER_INVITATION, accountId, adminId);
    
        //绑定邀请记录与消息记录
        courseMemberInvitation.setBindMessageId(courseMessage.getId());
    
        //插入数据库(这里用的是MybatisPlus)
        int insertResult = courseMemberInvitationDao.insert(courseMemberInvitation);
    
        //根据数据库插入返回值封装json
        JSONObject result = insertCourseMemberInvitationResult(insertResult, courseMemberInvitation);
    
        if(result.get(RESULT).equals(FAILED)){
            //若数据库操作没有成功,则直接返回json
            return result;
        }
    
    
        /* 发送消息 */
        courseMessageService.sendMessage(courseMessage);
    
        //根据插入情况返回json
        return result;
    }
    
    • courseMessageService中实现的sendMessage方法:
    @Autowired
    RabbitTemplate rabbitTemplate;
    
    @Override
    public void sendMessage(CourseMessage courseMessage) {
        //尝试发送
        //将消息放入rabbitMQ
        storeInRabbitMQ(courseMessage);
    }
    
    private void storeInRabbitMQ(CourseMessage courseMessage){
        //将消息放入rabbitMQ
        String exchangeName = (String) RabbitMQConst.EXCHANGE_MAP.get(courseMessage.getType());
        String routeKey = (String) RabbitMQConst.QUEUE_MAP.get(courseMessage.getType());
        try {
            //送到rabbitMQ队列中
            rabbitTemplate.convertAndSend(exchangeName,routeKey,courseMessage);
        }
        catch (Exception e){
            LOGGER.error("插入rabbitMQ失败",e);
        }
    }
    
    • com.xxxx.course.rabbitMQ.listener.CourseMemberInvitationListener

    该类是用以监听_课程成员邀请_消息的,即是在rabbitMQ服务器建立的member_invitation队列。

    /**
     * @author jojo
     */
    @Component
    public class CourseMemberInvitationListener {
    
        @Autowired
        MessageHandler messageHandler;
    
        /**
         * logger
         */
        public static final Logger LOGGER = LoggerUtil.getLogger();
    
        /**
         * spEL表达式
         * 一旦队列中有新消息,这个方法就会被触发
         */
        @RabbitListener(queues = "#{memberInvitationQueue.name}")
        public void listenCourseMemberInvitation(Message message){
            messageHandler.handleMessage(message);
        }
    
    
    }
    
    • com.xxxx.course.rabbitMQ.MessageHandler, 该类是用来处理监听事件的:
    @Service
    public class MessageHandler {
    
        @Autowired
        MessageConverter messageConverter;
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        /**
         * logger
         */
        public static final Logger LOGGER = LoggerUtil.getLogger();
    
        /**
         * 队列消息处理业务
         * @param message
         */
        public void handleMessage(Message message){
            CourseMessage courseMessage = (CourseMessage) messageConverter.fromMessage(message);
    
            // WebSocket 通知前端
            try {
                //将消息发给指定用户
                WebSocketServer.sendInfo(JSON.toJSONString(courseMessage),courseMessage.getAccountId().toString());
            } catch (Exception e) {
                //消息存在数据库中了,待用户上线后再获取
                LOGGER.info("发送消息id为 ["+courseMessage.getId()+"] 的消息给->消息待收方id为 ["+courseMessage.getAccountId().toString()+"] 的用户,但其不在线上。");
            }
        }
    }
    

    这样做应该就可以用RabbitMQ了。

    为什么不用HttpSession来验证用户真实性?

    如果要验证一个用户的真实性,为什么不直接用HttpSession来验证?比如在登录时,将用户的id存入HttpSession中,然后在Websocket握手连接之前,验明前端通过/ws/{userId}中的userId与HttpSession中的用户id进行比较就行了

    这属实是灵魂拷问了

    经过自己实验之后,发现按照很多方案,都没办法在ws连接的时候中,拿到HttpSession,比如在WebSocketConfig中按如下代码:

    	@Override
    20     public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
    21         ...
        	   //尝试获取HttpSession
        	   HttpSession httpSession = (HttpSession)request.getHttpSession();
    22         sec.getUserProperties().put(HttpSession.class.getName(),httpSession);
        	   ...
    23     }
    

    调试一番发现httpSession都是null,于是跟前端佬聊了聊,得到结论是wshttp连接是两个连接,当然拿不到啦
    哎,我太菜了,就先这么认为好了

    那或者能否把sessionId对应的userid存数据库(mysql或者redis),每次ws连接时再验证?
    那就不如JWT了,JWT不需要存储。

    更理想的情况是,我能在WebSocketServer这个模块中拿到httpSession的实体,这样子验证用户真实性就很好做了,但是暂时还没找到方法,需要再研究研究。
    暂时想到这些,如果有更好的做法会再更新

    总结

    本文的难点是ws的认证问题,虽然用超级好用的JWT解决了,但是随之而来的还有很多问题,比如:

    1. 注销登录等场景下 token 还有效
      与之类似的具体相关场景有:
      1. 退出登录;
      2. 修改密码;
      3. 服务端修改了某个用户具有的权限或者角色;
      4. 用户的帐户被删除/暂停。
      5. 用户由管理员注销;
    2. token 的续签问题
      token 有效期一般都建议设置的不太长,那么 token 过期后如何认证,如何实现动态刷新 token,避免用户经常需要重新登录?

    这些问题还是有待解决是😂
    本文就当记录一下自己的胡作非为吧😂

    总之,至少小花再也不怕被小黄NTR了

  • 相关阅读:
    Vscode中注释变成繁体的解决方法
    【去除若依首页】有些小项目不需要首页,去除方法
    学习笔记(15)跨域
    简述信息都有哪些特征?
    (pytorch进阶之路)IDDPM之diffusion实现
    [附源码]计算机毕业设计springboot文具商城购物系统
    Html将Json对象在页面结构化显示与Json文件生成下载
    【深入理解Kotlin协程】协程作用域、启动模式、调度器、异常和取消【使用篇】
    猿创征文|九、BLE蓝牙通信流程(建立连接,广播,扫描,断开连接)
    Vue---8种组件传值方式总结,总有一款适合你
  • 原文地址:https://www.cnblogs.com/excelsiorly/p/15900410.html