• flutter 消息并发时处理,递归查询


    收到新消息的时候执行receiveNewConversation方法

    可以自己模拟一下两条数据插入,延时执行插入会话的操作
    收到一条新的会话消息,先记录会话ID到列表,直到第一条处理完(插入数据库后清理这个会话ID),才处理第二条同会话ID的消息(一直在空递归中)
    (递归处理的时候看能一直监听到global的那个conversation列表是否有某个会话的数据,因为在不断重复执行)

    核心观念是:1.确保处理多条同会话ID的消息时一定是单线程,2.且插入第一条时要卡住,不要让第二条进来,等第一条插入数据库完成后再执行第二条

    注意:如果单纯延时100毫秒再执行第二条同会话ID消息,这种方法依旧是卡不住第二条消息,还是会出现两个会话同时插入数据库,A在12:01执行插入,B在12:02执行插入,A耗时两秒,B耗时一秒,两者都是12:03同时插入消息,导致出现两个会话。

    所以还是要加事务锁,第一条执行后锁住,等完成后再执行第二条

      //新会话限制列表,存放会话ID
      static List<String> newConversationList = [];
    
      //会话队列
      static List<WsImMessageQueue> wsImConversationQueue = [];
    
      //是否正在处理会话递归
      static bool wsImConversationQueueState = false;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    //不存在会话,则保存会话ID到新会话列表,等插完数据库后再清除新会话列表。
    // 第二条会话进来插入到消息列表要等待,等新会话列表没有这个会话ID才把消息插入数据库
    
    
      ///收到新会话处理
      receiveNewConversation(RecvChatMsg recvChatMsg, bool isOffLineMessage) {
        //会话队列添加数据
        Global.wsImConversationQueue.add(WsImMessageQueue(
            recvChatMsg: recvChatMsg, isOffLineMessage: isOffLineMessage));
    
        PrintUtil.prints(
            '$TAG 中台IM消息 插入会话消息到wsImConversationQueue: ${utf8.decode(recvChatMsg.body)}   Global.wsImConversationQueue的长度${Global.wsImConversationQueue.length}');
    
        if (!Global.wsImConversationQueueState) {
          Global.wsImConversationQueueState = true;
          //开始递归
          startProcessingConversationQueue(true);
          PrintUtil.prints(
              '$TAG 中台IM消息 递归完成:${Global.wsImConversationQueueState}');
    
        }
      }
    
      startProcessingConversationQueue(bool isExistMessage) {
        if (isExistMessage) {
          //不为空的时候执行
          if (Global.wsImConversationQueue.isNotEmpty) {
            PrintUtil.prints(
                '$TAG 中台IM消息 === > 开始执行递归方法体');
            //取第一个数据
            if (Global.wsImConversationQueue[0].recvChatMsg != null) {
    
              WsImMessage wsImMessage =
                  addWsImMessage(Global.wsImConversationQueue[0].recvChatMsg);
    
              PrintUtil.prints(
                  '$TAG 中台IM消息 === > 会话列表第一个会话ID${wsImMessage.conversationId}');
    
              //第一条新消息不会被拦截,第二条同会话的消息会被拦截,查询该会话是否在新会话列表里面,如果为true,不给执行,一直执行递归,直到第一条会话插入数据库成功再执行这个判断里面
              if (checkConversationId(wsImMessage.conversationId ?? '') == false) {
                PrintUtil.prints(
                    '$TAG 中台IM消息 === > 第一个数据或者会话列表第一个会话ID已从新会话限制列表里面移除');
                WsImDBUtil().insertMessage(
                    Global.wsImConversationQueue[0].recvChatMsg!,
                    Global.wsImConversationQueue[0].isOffLineMessage);
    
                //如果下一条进来后还是没有查出来赋值,那还是会拦截不到,一般在一秒内完成,应该影响不大
                WsImConversationDb.queryId(wsImMessage.conversationId).then((value) {
                  if (value.isEmpty) {
                    if (wsImMessage.conversationId != null) {
                      //插入到新会话限制列表里面
                      Global.newConversationList.add(wsImMessage.conversationId!);
                      PrintUtil.prints(
                          '$TAG 中台IM消息 === > 没有该会话就添加会话ID到会话队列:${Global.newConversationList}');
                    }
                  } else {
                  }
                });
    
                PrintUtil.prints(
                    '$TAG 中台IM消息 === > 插入消息,新会话限制列表:${Global.newConversationList}');
                Global.wsImConversationQueue
                    .remove(Global.wsImConversationQueue[0]);
                PrintUtil.prints(
                    '$TAG 中台IM消息 === > 移除消息,消息队列还剩:${Global.wsImConversationQueue.length}');
              }
    
              Future.delayed(const Duration(milliseconds: 1000), () {
                //1秒后进行递归
                startProcessingConversationQueue(
                    Global.wsImConversationQueue.isNotEmpty);
                PrintUtil.prints(
                    '$TAG 中台IM消息 === > 递归完成或者下一个递归');
              });
            }
          }
        } else {
          PrintUtil.prints('$TAG 中台IM消息 === > 消息处理完成');
          Global.wsImConversationQueueState = false;
        }
      }
    
      //判断会话ID是否在新会话列表里面,存在说明要等待这会话ID插入数据完成后才进行其他新会话的插入
      bool checkConversationId(String id) {
        bool check = false;
        for (int i = 0; i < Global.newConversationList.length; i++) {
          if (id == Global.newConversationList[i]) {
            check = true;
          }
        }
        PrintUtil.prints(
            '$TAG 中台IM消息 === > 会话列表第一个会话ID在不在新会话限制列表里面:${check}   新会话限制列表:${Global.newConversationList}');
        return check;
      }
    
      ///添加消息内容
      WsImMessage addWsImMessage(RecvChatMsg? recvChatMsg) {
        //默认会话ID是:收到谁消息就是谁的会话ID
        String conId = 'c2c_${recvChatMsg?.fromUserId.toInt()}';
    
        //如果发送者是自己,且接收者不是自己,则是服务器下发的消息,会话ID是对方
        if ('${recvChatMsg?.fromUserId.toInt()}' == Global.userId) {
          if ('${recvChatMsg?.toUserId.toInt()}' != Global.userId) {
            conId = 'c2c_${recvChatMsg?.toUserId.toInt()}';
          }
        }
    
        WsImMessage wsImMessage = WsImMessage(
            userId: int.parse(Global.userId),
            conversationId: conId,
            serverMsgId: recvChatMsg?.serverMsgId.toInt(),
            fromUserId: recvChatMsg?.fromUserId.toInt(),
            toUserId: recvChatMsg?.toUserId.toInt(),
            sendTime: recvChatMsg?.sendTime.toInt(),
            messageBody: WsImMessageBody.fromJson(utf8.decode(recvChatMsg!.body)),
            messageState: WsImMessageState.wsImMessageSendSuccess,
            isSelf: 0,
            isRead: 0);
    
        return wsImMessage;
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
  • 相关阅读:
    聚合大模型场景助力产业升级,WAIC 2024 容联云论坛即将开幕
    力扣第459题 重复的子字符串 巧用erase find c++注释版
    Python基础——下载安装、卸载和系统环境配置
    GoLong的学习之路(二)语法之基本数据类型
    SpringCloud链路追踪SkyWalking-第二章-部署搭建及高可用
    拆解一下任务队列、消息队列、任务调度系统
    Docker-系统环境
    java基于Springboot+vue的医院体检预约挂号系统 elementui
    软件面试笔试复习之C语言
    头条文章_signature
  • 原文地址:https://blog.csdn.net/weixin_44911775/article/details/133862725