• Spring定时任务+webSocket实现定时给指定用户发送消息


    生命无罪,健康万岁,我是laity。

    我曾七次鄙视自己的灵魂:

    第一次,当它本可进取时,却故作谦卑;

    第二次,当它在空虚时,用爱欲来填充;

    第三次,在困难和容易之间,它选择了容易;

    第四次,它犯了错,却借由别人也会犯错来宽慰自己;

    第五次,它自由软弱,却把它认为是生命的坚韧;

    第六次,当它鄙夷一张丑恶的嘴脸时,却不知那正是自己面具中的一副;

    第七次,它侧身于生活的污泥中,虽不甘心,却又畏首畏尾。

    Spring定时任务 + webSocket实现定时给指定用户发送消息:类似于消息中心;
    相信有需求的小伙伴读此文章可以有一定的帮助或者思路

    逻辑思路

    在做这个业务的时候也遇到了很多的坑,但是现在我帮你踩完了。

    • 使用Spring定时任务框架(@Scheduled)和websocket网络通信协议框架来进行定时向指定用户的客户端推送消息;
    • 编写并设置webSocketConfig配置文件(百度上的常规配置即可);
    • 基于@ServerEndpoint(value = “/websocket/{userId}”)注解标记路径并让前端进行连接服务器端
    • SocketServer中都是使用ConcurrentHashMap集合来进行接收和存储
      • 它是Java 5中引入了ConcurrentHashMap这个并发集合类
      • ConcurrentHashMap通过将一个大的数据结构分解为多个小的数据结构,然后对每个小的数据结构加锁来实现线程安全。这种方式称为分段锁。分段锁我记得是有问题的:性能不好(什么原因导致的有时间去查:查询热点数据),在JDK8时弃用后采用CAS和synchronized组合的方式来保证并发安全。
      • Java 8中的ConcurrentHashMap在内部实现上采用了数组+链表+红黑树的数据结构,也使得其具有较好的查找和遍历性能
      • 有兴趣的小伙伴可以去阅读下:一文看懂 jdk8 中的 ConcurrentHashMap,个人感觉写的很不错。
    • 集合clients用于记录连接的客户端(session)、key为生成的uuid;集合conn用于存放clients数据,key为用户唯一标识(id)。同时也充分使用了websocket的生命周期进行资源释放等操作。
    • 自定义方法sendMessageByUserId(Integer userId, String message),基于userId获取conn中的set集合,基于iterator进行集合迭代,使用hasNext()返回的Boolean进行判断是否继续进行循环遍历,.next()获取其中uuid数据(指针移动),通过uuid取出存在clients集合中的session进行发送数据
    • ===========================================以上是WebSocket实现思路
    • 基于@EnableScheduling注解开启定时任务,基于@Scheduled(cron = “……”)注解和cron表达式进行任务执行。
    • 业务逻辑:因不同的人业务不同,所以这里我省略掉了我的业务逻辑,有需求的小伙伴可以私信我进一步沟通;
    • 调用websocket的sendMessageUserById()方法来进行返回数据;

    代码实现

    依赖

    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-websocketartifactId>
        <version>2.7.0version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    webSocketServer.java

    /**
     * @Project: JavaLaity
     * @Description: 服务端
     */
    @Slf4j
    @Component
    @ServerEndpoint(value = "/websocket/{userId}")
    public class WebSocketServer {
    
        /**
         * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象,若要实现服务器端与单一客户端通信的话,可以使用Map来存放,其中key可以为用户标识
         */
        private static final CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
    
        // public static ThreadLocal clientUser = new ThreadLocal<>();
        /**
         * 用于记录连接的客户端 (sid,session)
         */
        public static Map<String, Session> clients = new ConcurrentHashMap<>();
        /**
         * 基于userId关联sid(用于解决同一用户id,在多个web端连接的问题)
         */
        public static Map<Integer, Set<String>> conn = new ConcurrentHashMap<>();
    
        /**
         * 与某个客户端的连接会话,需要通过它来给客户端发送数据
         */
        private Session session;
    
        private String sid = null;
    
    
        // 当WebSocket连接建立时,会调用标注有@OnOpen的方法
        @OnOpen
        public void onOpen(Session session, @PathParam("userId") Integer userId) {
            System.out.println("WebSocket opened: " + session.getId());
            this.session = session;
            this.sid = UUID.randomUUID().toString();
            clients.put(this.sid, session);
    
            Set<String> clientSet = conn.get(userId);
            if (clientSet == null) {
                clientSet = new HashSet<>();
                conn.put(userId, clientSet);
            }
            clientSet.add(this.sid);
            // 加入set中
            webSocketSet.add(this);
        }
    
        // 给所有当前连接的用户发送消息
        public void sendMessageAll(String message) {
            try {
                if (webSocketSet.size() != 0) {
                    for (WebSocketServer item : webSocketSet) {
                        if (item != null) {
                            // 同步锁,解决多线程下发送消息异常关闭
                            // 避免高并发下多处频繁调用sendMessage()方法发送消息而导致的webSocket挂掉,我们在sendMessage这个方法里面加入同步锁,
                            // 锁住session,这样就能保障webSocket有条不絮的向前端推送消息
                            synchronized (item.session) {
                                item.session.getBasicRemote().sendText(message);
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 根据用户ID发送给某一个用户
         */
        public void sendMessageByUserId(Integer userId, String message) {
            if (userId != null) {
                Set<String> clientSet = conn.get(userId);
                if (clientSet != null) {
                    Iterator<String> iterator = clientSet.iterator();
                    while (iterator.hasNext()) {
                        String sid = iterator.next();
                        Session session = clients.get(sid);
                        if (session != null) {
                            try {
                                session.getBasicRemote().sendText(message);
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            }
        }
    
        // TODO 当浏览器关闭触发该事件
        @OnClose
        public void OnClose(Session session) {
            log.info(this.sid + "断开连接");
            clients.remove(this.sid);
            webSocketSet.remove(this);
        }
    
        // 当收到客户端(前端)发送的消息时,会调用@OnMessage方法
        @OnMessage
        public void OnMessage(String message, Session session) {
            System.out.println("Received message:" + message);
        }
    
        // TODO 发生错误时调用
        @OnError
        public void OnError(Session session, Throwable error) {
            System.out.println("发生错误!");
            error.printStackTrace();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114

    AideScheduleTask.java

    /**
     * @Project: JavaLaity
     * @Description: 实现业务逻辑的位置
     */
    @Configuration
    public class AideScheduleTask {
    
        @Autowired
        WebSocketServer webSocketServer;
    
        @Autowired
        RedisCache redisCache;
    
        // 测试
        @Scheduled(cron = "0/20 * * * * ?")
        private void testTask() throws Exception {
            // 使用ThreadLocal - 哈哈哈,弃用了,行不通!。
            // TODO: 你需要的业务逻辑
            SysUser sysUser = new SysUser().setId(1)
            if (sysUser.getId() != null) {
                webSocketServer.sendMessageByUserId(sysUser.getId(), "这个是老大可以发:role = ");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    注意问题

    • 在写Corn表达式时需要注意的点:

      • 外国对于周几的定义和中国是不一样的,中国1-7对应周一到周日,外国的1-7对应的是周日到周六
      • 国内:1-7 MON,TUE,WED,THU,FRI,SAT,SUN
      • 国外:1-7 SUN,MON,TUE,WED,THU,FRI,SAT √
    • 个人认为有性能问题以及数据库压力的大问题,这里本人就只是简单的分享下设计思路,不同业务需求,实现策略不同。也有肯定公司不在乎这种性能问题。

    愿每个人都能遵循自己的时钟,做不后悔的选择。我是Laity,正在前行的Laity。

  • 相关阅读:
    SQLite实现的学生管理系统
    vue简单案例----小张记事本
    拓世大模型 | 立足行业所需,发力终端,缔造智能无限可能
    若依RuoYi-Vue分离版—PageHelper分页的坑
    深度解析Socks5代理与IP代理的网络应用
    通讯网关软件009——利用CommGate X2MQTT实现MQTT访问ODBC数据源
    【一起来学C++】————(7)多态
    node.js: socket.io服务端和客户端交互示例
    【SSA-LSTM】基于SSA-LSTM预测研究(Python代码实现)
    Stable Diffusion 模型下载:Juggernaut(主宰、真实、幻想)
  • 原文地址:https://blog.csdn.net/duyun0/article/details/134093179