• TeamTalk梳理概括


    即时通讯重点概括

    展开聊聊单聊消息流转流程

    • 消息如何封装
    1. 怎么解决半包、粘包问题?
    2. 消息流转流程介绍下?
    • 消息序号(msg_id )在哪里生成以及生成方式
    • 怎么保证数据的不丢失以及重复包?
    1. 接收端收到数据后(收到消息区别于阅读消息)如何应答?
    2. 消息发送后服务器怎么应答?
    3. 消息发送时的seq有什么作用(业务层的ack机制)

    展开聊聊群聊消息流转流程

    • 如何推送群聊
    • 群消息计数器(msg_id )
    • 群会话如何更新(每有一个人发送消息,则其他人都需要更新会话消息)

    群成员管理

    • 如何创建群
    • 如何删除群
    • 怎么使用redis管理群成员
      消息未读计数是怎么实现的?
    • 服务器怎么保留消息未读计数(redis 单聊和群聊机制不同)
    • 客户端的未读消息计数从何而来
    • 客户端未读消息计数清0时向服务器发送了什么?服务器又是怎么清除未读消息计数(单聊和群聊机制不同)

    数据库

    • 数据库表设计(表达笼统)
    • 密码存储方式
    • 未读消息如何体现
    • 聊天消息分表问题(单聊和群聊消息表)
    • 最近会话表

    MySQL连接池设计

    • 为什么使用连接池
    • 连接池设置多大合适?

    redis连接池设计

    • 为什么使用连接池
    • 连接池设置多大合适?

    文件传输原理

    • 在线传输和离线传输有什么区别

    实时性

    • Http(登录、图片服务)
    • Socket
    • websocket
      展开聊聊登录流程

    并发能力

    • 如何做到百万并发
    • 如何做到千万并发

    db_proxy_server reactor响应处理流程

    1. 数据入口 reactor CProxyConn:: HandlePduBuf
    2. 怎么初始化epoll+线程池
    3. 任务封装
    4. 把任务放入线程池
    5. 执行任务
    6. 把要回应的数据放入回复列表CProxyConn::SendResponsePdulist
    7. epoll所在线程读取回复列表的数据发给请求端

    单聊消息

    消息如何封装?如何保证对端完整解析一帧消息?协议格式?

    1. 答:消息封装采用包头(Header)+包体(Body)的格式。包头自定义格式如下代码所示,包体采用protobuf序列化。
    typedef struct {
       uint32_t    length;        // the whole pdu length
       uint16_t    version;       // pdu version number
       uint16_t    flag;          // not used
       uint16_t    service_id;    //
       uint16_t    command_id;    //
       uint16_t    seq_num;       // 包序号
       uint16_t    reversed;      // 保留
    } PduHeader_t;
    
    1. 答:
    • 采用tcp保证数据传输可靠性
    • 通过包头的 length 字段标记一帧消息的长度
    • 通过service id 和 command id区分不同的命令(比如登录、退出等)
    • 解决数据TCP粘包(包头长度字段)、半包(放入网络库的缓冲区)问题
    void CImConn::OnRead()
    {
        for (;;)
        {
                    uint32_t free_buf_len = m_in_buf.GetAllocSize() - m_in_buf.GetWriteOffset();
                    if (free_buf_len < READ_BUF_SIZE)
                            m_in_buf.Extend(READ_BUF_SIZE);
    
                    int ret = netlib_recv(m_handle, m_in_buf.GetBuffer() + m_in_buf.GetWriteOffset(), READ_BUF_SIZE);
                    if (ret <= 0)
                            break;
    
                    m_recv_bytes += ret;
                    m_in_buf.IncWriteOffset(ret);
    
                    m_last_recv_tick = get_tick_count();
            }
    
        CImPdu* pPdu = NULL;
            try
        {
                    while ( ( pPdu = CImPdu::ReadPdu(m_in_buf.GetBuffer(), m_in_buf.GetWriteOffset()) ) )
                    {
                uint32_t pdu_len = pPdu->GetLength();
                
                            HandlePdu(pPdu);
    
                            m_in_buf.Read(NULL, pdu_len);
                            delete pPdu;
                pPdu = NULL;
    //                        ++g_recv_pkt_cnt;
                    }
            } catch (CPduException& ex) {
                    log("!!!catch exception, sid=%u, cid=%u, err_code=%u, err_msg=%s, close the connection ",
                                    ex.GetServiceId(), ex.GetCommandId(), ex.GetErrorCode(), ex.GetErrorMsg());
            if (pPdu) {
                delete pPdu;
                pPdu = NULL;
            }
            OnClose();
            }
    }
    
    message IMMsgData{
            //cmd id:                0x0301
            required uint32 from_user_id = 1;    //消息发送方
            required uint32 to_session_id = 2;    //消息接受方
            required uint32 msg_id = 3;
            required uint32 create_time = 4; 
            required IM.BaseDefine.MsgType msg_type = 5;
            required bytes msg_data = 6;
            optional bytes attach_data = 20;
    }
    
    message IMMsgDataAck{
            //cmd id:                0x0302
            required uint32 user_id = 1;    //发送此信令的用户id
            required uint32 session_id = 2;                                
            required uint32 msg_id = 3;
            required IM.BaseDefine.SessionType session_type = 4;
    }
    

    消息序号(msg_id )为什么使用redis生成?

    1. 消息ID(msg_id )的作用是防止消息乱序。
    2. 消息ID(msg_id )为什么这么设计?
      答:msg_id 存储在 unread 连接池所在的redis数据库。单聊 msg_id 的 key涉及到 nRelateId。nRelateId 从关系表(IMRelationShip :两个用户id的映射关系)中获取。
    /**
     *  获取会话关系ID
     *  对于群组,必须把nUserBId设置为群ID
     *
     *  @param nUserAId  <#nUserAId description#>
     *  @param nUserBId  <#nUserBId description#>
     *  @param bAdd      <#bAdd description#>
     *  @param nStatus 0 获取未被删除会话,1获取所有。
     */
    uint32_t CRelationModel::getRelationId(uint32_t nUserAId, uint32_t nUserBId, bool bAdd)
    {
        uint32_t nRelationId = INVALID_VALUE;
        if (nUserAId == 0 || nUserBId == 0) {
            log("invalied user id:%u->%u", nUserAId, nUserBId);
            return nRelationId;
        }
        CDBManager* pDBManager = CDBManager::getInstance();
        CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_slave");
        if (pDBConn)
        {
            uint32_t nBigId = nUserAId > nUserBId ? nUserAId : nUserBId;
            uint32_t nSmallId = nUserAId > nUserBId ? nUserBId : nUserAId;
            string strSql = "select id from IMRelationShip where smallId=" + int2string(nSmallId) + " and bigId="+ int2string(nBigId) + " and status = 0";
            
            CResultSet* pResultSet = pDBConn->ExecuteQuery(strSql.c_str());
            if (pResultSet)
            {
                while (pResultSet->Next())
                {
                    nRelationId = pResultSet->GetInt("id");
                }
                delete pResultSet;
            }
            else
            {
                log("there is no result for sql:%s", strSql.c_str());
            }
            pDBManager->RelDBConn(pDBConn);
            if (nRelationId == INVALID_VALUE && bAdd)
            {
                nRelationId = addRelation(nSmallId, nBigId);
            }
        }
        else
        {
            log("no db connection for teamtalk_slave");
        }
        return nRelationId;
    }
    
    1. 群聊和单聊msg_id 的区别:key设置不同。
    uint32_t CMessageModel::getMsgId(uint32_t nRelateId)
    {
        uint32_t nMsgId = 0;
        CacheManager* pCacheManager = CacheManager::getInstance();
        CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");
        if(pCacheConn)
        {
            string strKey = "msg_id_" + int2string(nRelateId);
            nMsgId = pCacheConn->incrBy(strKey, 1);
            pCacheManager->RelCacheConn(pCacheConn);
        }
        return nMsgId;
    }
    
    /**
     *  获取一个群组的msgId,自增,通过redis控制
     *  @param nGroupId 群Id
     *  @return 返回msgId
     */
    uint32_t CGroupMessageModel::getMsgId(uint32_t nGroupId)
    {
        uint32_t nMsgId = 0;
        CacheManager* pCacheManager = CacheManager::getInstance();
        CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");
        if(pCacheConn)
        {
            // TODO
            string strKey = "group_msg_id_" + int2string(nGroupId);
            nMsgId = pCacheConn->incrBy(strKey, 1);
            pCacheManager->RelCacheConn(pCacheConn);
        }
        else
        {
            log("no cache connection for unread");
        }
        return nMsgId;
    }
    

    展开聊聊单聊消息流转流程

    答:两个用户A给B发消息,用户A把聊天消息封装好以后发送给MsgServer;同时把消息进行持久化,将聊天消息发给这个 DBProxy(数据库代理服务),存储消息成功后,DBProxyServer组包应答MsgServer,MsgServer收到回复后组包应答Client A。如果 A 和 B 两个用户不在同一个 MsgServer 上,那么会通过这个 RouteServer 去中转Pdu包数据(广播给所有的MsgServer,MsgServer再广播给Client B),B收到消息后应答MsgServer,至此,流程结束。然后如果是一些热点数据,我们同时也会写Redis。
    群聊消息

    展开聊聊群聊消息流转流程

    群聊消息流转

    void CGroupChat::HandleGroupMessage(CImPdu* pPdu)
    {
        IM::Message::IMMsgData msg;
        CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));
            uint32_t from_user_id = msg.from_user_id();
            uint32_t to_group_id = msg.to_session_id();
            string msg_data = msg.msg_data();
        uint32_t msg_id = msg.msg_id();
        if (msg_id == 0) {
            log("HandleGroupMsg, write db failed, %u->%u. ", from_user_id, to_group_id);
            return;
        }
        uint8_t msg_type = msg.msg_type();
        CDbAttachData attach_data((uchar_t*)msg.attach_data().c_str(), msg.attach_data().length());
    
        log("HandleGroupMsg, %u->%u, msg id=%u. ", from_user_id, to_group_id, msg_id);
    
        CMsgConn* pFromConn = CImUserManager::GetInstance()->GetMsgConnByHandle(from_user_id,
                                            attach_data.GetHandle());
        if (pFromConn)
        {
            //接收反馈
            IM::Message::IMMsgDataAck msg2;
            msg2.set_user_id(from_user_id);
            msg2.set_session_id(to_group_id);
            msg2.set_msg_id(msg_id);
            msg2.set_session_type(::IM::BaseDefine::SESSION_TYPE_GROUP);
            CImPdu pdu;
            pdu.SetPBMsg(&msg2);
            pdu.SetServiceId(SID_MSG);
            pdu.SetCommandId(CID_MSG_DATA_ACK);
            pdu.SetSeqNum(pPdu->GetSeqNum());
            pFromConn->SendPdu(&pdu);
        }
        
        CRouteServConn* pRouteConn = get_route_serv_conn();
        if (pRouteConn)
        {
            pRouteConn->SendPdu(pPdu);
        }
        
        // 服务器没有群的信息,向DB服务器请求群信息,并带上消息作为附件,返回时在发送该消息给其他群成员
        //IM::BaseDefine::GroupVersionInfo group_version_info;
        CPduAttachData pduAttachData(ATTACH_TYPE_HANDLE_AND_PDU, attach_data.GetHandle(), pPdu->GetBodyLength(), pPdu->GetBodyData());
        
        IM::Group::IMGroupInfoListReq msg3;
        msg3.set_user_id(from_user_id);
        IM::BaseDefine::GroupVersionInfo* group_version_info = msg3.add_group_version_list();
        group_version_info->set_group_id(to_group_id);
        group_version_info->set_version(0);
        msg3.set_attach_data(pduAttachData.GetBuffer(), pduAttachData.GetLength());
        CImPdu pdu;
        pdu.SetPBMsg(&msg3);
        pdu.SetServiceId(SID_GROUP);
        pdu.SetCommandId(CID_GROUP_INFO_REQUEST);
        CDBServConn* pDbConn = get_db_serv_conn();
        if(pDbConn)
        {
            pDbConn->SendPdu(&pdu);
        }
    }
    void CGroupChat::HandleGroupInfoResponse(CImPdu* pPdu)
    {
        IM::Group::IMGroupInfoListRsp msg;
        CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));
    
        uint32_t user_id = msg.user_id();
        uint32_t group_cnt = msg.group_info_list_size();
        CPduAttachData pduAttachData((uchar_t*)msg.attach_data().c_str(), msg.attach_data().length());
        
        log("HandleGroupInfoResponse, user_id=%u, group_cnt=%u. ", user_id, group_cnt);
    
        //此处是查询成员时使用,主要用于群消息从数据库获得msg_id后进行发送,一般此时group_cnt = 1
        if (pduAttachData.GetPduLength() > 0 && group_cnt > 0)
        {
            IM::BaseDefine::GroupInfo group_info = msg.group_info_list(0);
            uint32_t group_id = group_info.group_id();
            log("GroupInfoRequest is send by server, group_id=%u ", group_id);
            
            std::set group_member_set;
            for (uint32_t i = 0; i < group_info.group_member_list_size(); i++)
            {
                uint32_t member_user_id = group_info.group_member_list(i);
                group_member_set.insert(member_user_id);
            }
            if (group_member_set.find(user_id) == group_member_set.end())
            {
                log("user_id=%u is not in group, group_id=%u. ", user_id, group_id);
                return;
            }
            
            IM::Message::IMMsgData msg2;
            CHECK_PB_PARSE_MSG(msg2.ParseFromArray(pduAttachData.GetPdu(), pduAttachData.GetPduLength()));
            CImPdu pdu;
            pdu.SetPBMsg(&msg2);
            pdu.SetServiceId(SID_MSG);
            pdu.SetCommandId(CID_MSG_DATA);
            
            //Push相关
            IM::Server::IMGroupGetShieldReq msg3;
            msg3.set_group_id(group_id);
            msg3.set_attach_data(pdu.GetBodyData(), pdu.GetBodyLength());
            for (uint32_t i = 0; i < group_info.group_member_list_size(); i++)
            {
                uint32_t member_user_id = group_info.group_member_list(i);
                
                msg3.add_user_id(member_user_id);
                
                CImUser* pToImUser = CImUserManager::GetInstance()->GetImUserById(member_user_id);
                if (pToImUser)
                {
                    CMsgConn* pFromConn = NULL;
                    if( member_user_id == user_id )
                    {
                        uint32_t reqHandle = pduAttachData.GetHandle();
                        if(reqHandle != 0)
                            pFromConn = CImUserManager::GetInstance()->GetMsgConnByHandle(user_id, reqHandle);
                    }
                    
                    pToImUser->BroadcastData(pdu.GetBuffer(), pdu.GetLength(), pFromConn);
                }
            }
            
            CImPdu pdu2;
            pdu2.SetPBMsg(&msg3);
            pdu2.SetServiceId(SID_OTHER);
            pdu2.SetCommandId(CID_OTHER_GET_SHIELD_REQ);
            CDBServConn* pDbConn = get_db_serv_conn();
            if (pDbConn)
            {
                pDbConn->SendPdu(&pdu2);
            }
        }
        else if (pduAttachData.GetPduLength() == 0)
        {
            //正常获取群信息的返回
            CMsgConn* pConn = CImUserManager::GetInstance()->GetMsgConnByHandle(user_id, pduAttachData.GetHandle());
            if (pConn)
            {
                msg.clear_attach_data();
                pPdu->SetPBMsg(&msg);
                pConn->SendPdu(pPdu);
            }
        }
    }
    

    同步群组聊天信息:群会话如何更新(每有一个人发送消息,则其他人都需要更新会话消息)
    分析:
    如果和单聊类似,实时更新会有大量操作数据库的成本。
    某个群成员发消息时,存储消息成功后,会更新群的最新发言时间。
    后续优化,将session放在redis中查询不方便???所以要分库分表???

    void CGroupModel::updateGroupChat(uint32_t nGroupId)
    {
        CDBManager* pDBManager = CDBManager::getInstance();
        CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_master");
        if(pDBConn)
        {
            uint32_t nNow = (uint32_t)time(NULL);
            string strSql = "update IMGroup set lastChated=" + int2string(nNow) + " where id=" + int2string(nGroupId);
            pDBConn->ExecuteUpdate(strSql.c_str());
            pDBManager->RelDBConn(pDBConn);
        }
        else
        {
            log("no db connection for teamtalk_master");
        }
    }
    

    CSyncCenter类,群会话的更新-独立的线程

    CSyncCenter :: doSyncGroupChat

    1. 根据时间节点将需要更新会话的群id和最近的聊天时间读取出来放到map
    2. 保存当前时间到CSyncCenter
    3. 根据群id从redis读取群成员,然后遍历群成员更新会话信息。
    /**
     *  开启内网数据同步以及群组聊天记录同步
     */
    void CSyncCenter::startSync()
    {
    #ifdef _WIN32
        (void)CreateThread(NULL, 0, doSyncGroupChat, NULL, 0, &m_nGroupChatThreadId);
    #else
        (void)pthread_create(&m_nGroupChatThreadId, NULL, doSyncGroupChat, NULL);
    #endif
    }
    //谈取更新的时间大于之前更新的时间点,把对应的群id-nLastChat读取出来放到mapChangedGroup存储
    /**
     *  同步群组聊天信息
     *
     *  @param arg NULL
     *
     *  @return NULL
     */
    void* CSyncCenter::doSyncGroupChat(void* arg)
    {
        m_bSyncGroupChatRuning = true;
        CDBManager* pDBManager = CDBManager::getInstance();
        map mapChangedGroup;
        do{
            mapChangedGroup.clear();
            CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_slave");
            if(pDBConn)
            {
                string strSql = "select id, lastChated from IMGroup where status=0 and lastChated >=" + int2string(m_pInstance->getLastUpdateGroup());
                CResultSet* pResult = pDBConn->ExecuteQuery(strSql.c_str());
                if(pResult)
                {
                    while (pResult->Next()) {
                        uint32_t nGroupId = pResult->GetInt("id");
                        uint32_t nLastChat = pResult->GetInt("lastChated");
                        if(nLastChat != 0)
                        {
                            mapChangedGroup[nGroupId] = nLastChat;
                        }
                    }
                    delete pResult;
                }
                pDBManager->RelDBConn(pDBConn);
            }
            else
            {
                log("no db connection for teamtalk_slave");
            }
            m_pInstance->updateLastUpdateGroup(time(NULL));
            for (auto it=mapChangedGroup.begin(); it!=mapChangedGroup.end(); ++it)
            {
                uint32_t nGroupId =it->first;
                list lsUsers;
                uint32_t nUpdate = it->second;
                 // 读取该群的群成员
                CGroupModel::getInstance()->getGroupUser(nGroupId, lsUsers);
                //遍历群成员,更新Session
                for (auto it1=lsUsers.begin(); it1!=lsUsers.end(); ++it1)
                {
                    uint32_t nUserId = *it1;
                    uint32_t nSessionId = INVALID_VALUE;
                    nSessionId = CSessionModel::getInstance()->getSessionId(nUserId, nGroupId, IM::BaseDefine::SESSION_TYPE_GROUP, true);
                    if(nSessionId != INVALID_VALUE)
                    {
                        CSessionModel::getInstance()->updateSession(nSessionId, nUpdate);
                    }
                    else
                    {
                        CSessionModel::getInstance()->addSession(nUserId, nGroupId, IM::BaseDefine::SESSION_TYPE_GROUP);
                    }
                }
            }
    //    } while (!m_pInstance->m_pCondSync->waitTime(5*1000));
        } while (m_pInstance->m_bSyncGroupChatWaitting && !(m_pInstance->m_pCondGroupChat->waitTime(5*1000)));
    //    } while(m_pInstance->m_bSyncGroupChatWaitting);
        m_bSyncGroupChatRuning = false;
        return NULL;
    }
    

    怎么保证数据的不丢失以及重复包?

    1. 包头的seq_num字段(包序号),未回复消息列表
    2. 业务层的ack机制,收到数据会回复ack

    消息未读计数是怎么实现的?

    服务器怎么保留消息未读计数(单聊和群聊)
    7. 单聊和群聊消息未读计数机制为什么不同?
    答:加入该群很大,有1000人,999个人的未读消息计数都+1,效率低下。
    8. 单聊消息未读计数机制

    • key设计:“unread_” + int2string(nToId);field:int2string(nFromId),value:自增1
    • 发送消息时,将消息写入 mysql 消息表成功后,更新redis。
    void CMessageModel::incMsgCount(uint32_t nFromId, uint32_t nToId)
    {
            CacheManager* pCacheManager = CacheManager::getInstance();
            // increase message count
            CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");
            if (pCacheConn) {
                    pCacheConn->hincrBy("unread_" + int2string(nToId), int2string(nFromId), 1);
                    pCacheManager->RelCacheConn(pCacheConn);
            } else {
                    log("no cache connection to increase unread count: %d->%d", nFromId, nToId);
            }
    }
    9. 群聊消息未读计数机制
    - 群总的消息计数key设计:int2string(nGroupId) + _im_group_msg;
    - field:count
    - 群内某个成员已经读取的消息计数key设计:
    int2string(nUserId) + "_" + int2string(nGroupId) + _im_user_group
    - field:count
    所以:群内某个成员未读消息计数 = 群总消息数量 - 该成员已经读取的消息数量
    
    #define     GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX    "_im_group_msg"
    #define     GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX     "_im_user_group"
    #define     GROUP_COUNTER_SUBKEY_COUNTER_FIELD          "count"
    增加群消息计数
    /**
     *  增加群消息计数
     *  @param nUserId  用户Id
     *  @param nGroupId 群组Id
     *  @return 成功返回true,失败返回false
     */
    bool CGroupMessageModel::incMessageCount(uint32_t nUserId, uint32_t nGroupId)
    {
        bool bRet = false;
        CacheManager* pCacheManager = CacheManager::getInstance();
        CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");
        if (pCacheConn)
        {
            string strGroupKey = int2string(nGroupId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;
            pCacheConn->hincrBy(strGroupKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD, 1);
            map mapGroupCount;
            bool bRet = pCacheConn->hgetAll(strGroupKey, mapGroupCount);
            if(bRet)
            {
                string strUserKey = int2string(nUserId) + "_" + int2string(nGroupId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;
                string strReply = pCacheConn->hmset(strUserKey, mapGroupCount);
                if(!strReply.empty())
                {
                    bRet = true;
                }
                else
                {
                    log("hmset %s failed !", strUserKey.c_str());
                }
            }
            else
            {
                log("hgetAll %s failed!", strGroupKey.c_str());
            }
            pCacheManager->RelCacheConn(pCacheConn);
        }
        else
        {
            log("no cache connection for unread");
        }
        return bRet;
    }
    获取用户群未读消息计数
    /**
     *  获取用户群未读消息计数
     *  @param nUserId       用户Id
     *  @param nTotalCnt     总条数
     *  @param lsUnreadCount 每个会话的未读信息包含了条数,最后一个消息的Id,最后一个消息的类型,最后一个消息的类容
     */
    void CGroupMessageModel::getUnreadMsgCount(uint32_t nUserId, uint32_t &nTotalCnt, list& lsUnreadCount)
    {
        list lsGroupId;
        CGroupModel::getInstance()->getUserGroupIds(nUserId, lsGroupId, 0);
        uint32_t nCount = 0;
        
        CacheManager* pCacheManager = CacheManager::getInstance();
        CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");
        if (pCacheConn)
        {
            for(auto it=lsGroupId.begin(); it!=lsGroupId.end(); ++it)
            {
                uint32_t nGroupId = *it;
                string strGroupKey = int2string(nGroupId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;
                string strGroupCnt = pCacheConn->hget(strGroupKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD);
                if(strGroupCnt.empty())
                {
    //                log("hget %s : count failed !", strGroupKey.c_str());
                    continue;
                }
                uint32_t nGroupCnt = (uint32_t)(atoi(strGroupCnt.c_str()));
                
                string strUserKey = int2string(nUserId) + "_" + int2string(nGroupId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;
                string strUserCnt = pCacheConn->hget(strUserKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD);
                
                uint32_t nUserCnt = ( strUserCnt.empty() ? 0 : ((uint32_t)atoi(strUserCnt.c_str())) );
                if(nGroupCnt >= nUserCnt) {
                    nCount = nGroupCnt - nUserCnt;
                }
                if(nCount > 0)
                {
                    IM::BaseDefine::UnreadInfo cUnreadInfo;
                    cUnreadInfo.set_session_id(nGroupId);
                    cUnreadInfo.set_session_type(IM::BaseDefine::SESSION_TYPE_GROUP);
                    cUnreadInfo.set_unread_cnt(nCount);
                    nTotalCnt += nCount;
                    string strMsgData;
                    uint32_t nMsgId;
                    IM::BaseDefine::MsgType nType;
                    uint32_t nFromId;
                    getLastMsg(nGroupId, nMsgId, strMsgData, nType, nFromId);
                    if(IM::BaseDefine::MsgType_IsValid(nType))
                    {
                        cUnreadInfo.set_latest_msg_id(nMsgId);
                        cUnreadInfo.set_latest_msg_data(strMsgData);
                        cUnreadInfo.set_latest_msg_type(nType);
                        cUnreadInfo.set_latest_msg_from_user_id(nFromId);
                        lsUnreadCount.push_back(cUnreadInfo);
                    }
                    else
                    {
                        log("invalid msgType. userId=%u, groupId=%u, msgType=%u, msgId=%u", nUserId, nGroupId, nType, nMsgId);
                    }
                }
            }
            pCacheManager->RelCacheConn(pCacheConn);
        }
        else
        {
            log("no cache connection for unread");
        }
    }
    清除未读消息
    单聊和群聊清除未读消息都是调用如下函数
    1. 单聊直接删掉key
    2. 群聊将该成员的已读消息数量设置成群总消息数量。
    m_handler_map.insert(make_pair(uint32_t(CID_MSG_READ_ACK), DB_PROXY::clearUnreadMsgCounter));
    void CUserModel::clearUserCounter(uint32_t nUserId, uint32_t nPeerId, IM::BaseDefine::SessionType nSessionType)
    {
        if(IM::BaseDefine::SessionType_IsValid(nSessionType))
        {
            CacheManager* pCacheManager = CacheManager::getInstance();
            CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");
            if (pCacheConn)
            {
                // Clear P2P msg Counter
                if(nSessionType == IM::BaseDefine::SESSION_TYPE_SINGLE)
                {
                    int nRet = pCacheConn->hdel("unread_" + int2string(nUserId), int2string(nPeerId));
                    if(!nRet)
                    {
                        log("hdel failed %d->%d", nPeerId, nUserId);
                    }
                }
                // Clear Group msg Counter
                else if(nSessionType == IM::BaseDefine::SESSION_TYPE_GROUP)
                {
                    string strGroupKey = int2string(nPeerId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;
                    map mapGroupCount;
                    bool bRet = pCacheConn->hgetAll(strGroupKey, mapGroupCount);
                    if(bRet)
                    {
                        string strUserKey = int2string(nUserId) + "_" + int2string(nPeerId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;
                        string strReply = pCacheConn->hmset(strUserKey, mapGroupCount);
                        if(strReply.empty()) {
                            log("hmset %s failed !", strUserKey.c_str());
                        }
                    }
                    else
                    {
                        log("hgetall %s failed!", strGroupKey.c_str());
                    }
                    
                }
                pCacheManager->RelCacheConn(pCacheConn);
            }
            else
            {
                log("no cache connection for unread");
            }
        }
        else{
            log("invalid sessionType. userId=%u, fromId=%u, sessionType=%u", nUserId, nPeerId, nSessionType);
        }
    }
    

    群成员管理

    1. 为什么使用 redis 管理群成员?
      答:发群消息需要通知群成员,群成员很多从redis获取,提高效率。
    2. 群成员管理使用redis 缓存设计,以hash为存储结构,
    • key 为 “group_member_”+int2string(nGroupId);
    • field 为 userId(用户id)
    • Value 为 创建时间:int2string(nCreated)
    1. 加入成员:insertNewMember,插入mysql数据库的同时也插入redis缓存
        /**
         *  修改群成员,增加或删除
         *
         *  @param pPdu      收到的packet包指针
         *  @param conn_uuid 该包过来的socket 描述符
         */
        void modifyMember(CImPdu* pPdu, uint32_t conn_uuid)
        {
            IM::Group::IMGroupChangeMemberReq msg;
            IM::Group::IMGroupChangeMemberRsp msgResp;
            if(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()))
            {
                uint32_t nUserId = msg.user_id();
                uint32_t nGroupId = msg.group_id();
                IM::BaseDefine::GroupModifyType nType = msg.change_type();
                if (IM::BaseDefine::GroupModifyType_IsValid(nType) &&
                    CGroupModel::getInstance()->isValidateGroupId(nGroupId)) {
                    
                    CImPdu* pPduRes = new CImPdu;
                    
                    uint32_t nCnt = msg.member_id_list_size();
                    set setUserId;
                    for(uint32_t i=0; i lsCurUserId;
                    bool bRet = CGroupModel::getInstance()->modifyGroupMember(nUserId, nGroupId, nType, setUserId, lsCurUserId);
                    msgResp.set_user_id(nUserId);
                    msgResp.set_group_id(nGroupId);
                    msgResp.set_change_type(nType);
                    msgResp.set_result_code(bRet?0:1);
                    if(bRet)
                    {
                        for(auto it=setUserId.begin(); it!=setUserId.end(); ++it)
                        {
                            msgResp.add_chg_user_id_list(*it);
                        }
                        
                        for(auto it=lsCurUserId.begin(); it!=lsCurUserId.end(); ++it)
                        {
                            msgResp.add_cur_user_id_list(*it);
                        }
                    }
                    log("userId=%u, groupId=%u, result=%u, changeCount:%u, currentCount=%u",nUserId, nGroupId,  bRet?0:1, msgResp.chg_user_id_list_size(), msgResp.cur_user_id_list_size());
                    msgResp.set_attach_data(msg.attach_data());
                    pPduRes->SetPBMsg(&msgResp);
                    pPduRes->SetSeqNum(pPdu->GetSeqNum());
                    pPduRes->SetServiceId(IM::BaseDefine::SID_GROUP);
                    pPduRes->SetCommandId(IM::BaseDefine::CID_GROUP_CHANGE_MEMBER_RESPONSE);
                    CProxyConn::AddResponsePdu(conn_uuid, pPduRes);
                }
                else
                {
                    log("invalid groupModifyType or groupId. userId=%u, groupId=%u, groupModifyType=%u", nUserId, nGroupId, nType);
                }
                
            }
            else
            {
                log("parse pb failed");
            }
        }
    bool CGroupModel::insertNewMember(uint32_t nGroupId, set& setUsers)
    {
        bool bRet = false;
        uint32_t nUserCnt = (uint32_t)setUsers.size();
        if(nGroupId != INVALID_VALUE &&  nUserCnt > 0)
        {
            CDBManager* pDBManager = CDBManager::getInstance();
            CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_slave");
            if (pDBConn)
            {
                uint32_t nCreated = (uint32_t)time(NULL);
                // 获取 已经存在群里的用户
                string strClause;
                bool bFirst = true;
                for (auto it=setUsers.begin(); it!=setUsers.end(); ++it)
                {
                    if(bFirst)
                    {
                        bFirst = false;
                        strClause = int2string(*it);
                    }
                    else
                    {
                        strClause += ("," + int2string(*it));
                    }
                }
                string strSql = "select userId from IMGroupMember where groupId=" + int2string(nGroupId) + " and userId in (" + strClause + ")";
                CResultSet* pResult = pDBConn->ExecuteQuery(strSql.c_str());
                set setHasUser;
                if(pResult)
                {
                    while (pResult->Next()) {
                        setHasUser.insert(pResult->GetInt("userId"));
                    }
                    delete pResult;
                }
                else
                {
                    log("no result for sql:%s", strSql.c_str());
                }
                pDBManager->RelDBConn(pDBConn);
                
                pDBConn = pDBManager->GetDBConn("teamtalk_master");
                if (pDBConn)
                {
                    CacheManager* pCacheManager = CacheManager::getInstance();
                    CacheConn* pCacheConn = pCacheManager->GetCacheConn("group_member");
                    if (pCacheConn)
                    {
                        // 设置已经存在群中人的状态
                        if (!setHasUser.empty())
                        {
                            strClause.clear();
                            bFirst = true;
                            for (auto it=setHasUser.begin(); it!=setHasUser.end(); ++it) {
                                if(bFirst)
                                {
                                    bFirst = false;
                                    strClause = int2string(*it);
                                }
                                else
                                {
                                    strClause += ("," + int2string(*it));
                                }
                            }
                            
                            strSql = "update IMGroupMember set status=0, updated="+int2string(nCreated)+" where groupId=" + int2string(nGroupId) + " and userId in (" + strClause + ")";
                            pDBConn->ExecuteUpdate(strSql.c_str());
                        }
                        strSql = "insert into IMGroupMember(`groupId`, `userId`, `status`, `created`, `updated`) values\
                        (?,?,?,?,?)";
                        
                        //插入新成员
                        auto it = setUsers.begin();
                        uint32_t nStatus = 0;
                        uint32_t nIncMemberCnt = 0;
                        for (;it != setUsers.end();)
                        {
                            uint32_t nUserId = *it;
                            if(setHasUser.find(nUserId) == setHasUser.end())
                            {
                                CPrepareStatement* pStmt = new CPrepareStatement();
                                if (pStmt->Init(pDBConn->GetMysql(), strSql))
                                {
                                    uint32_t index = 0;
                                    pStmt->SetParam(index++, nGroupId);
                                    pStmt->SetParam(index++, nUserId);
                                    pStmt->SetParam(index++, nStatus);
                                    pStmt->SetParam(index++, nCreated);
                                    pStmt->SetParam(index++, nCreated);
                                    pStmt->ExecuteUpdate();
                                    ++nIncMemberCnt;
                                    delete pStmt;
                                }
                                else
                                {
                                    setUsers.erase(it++);
                                    delete pStmt;
                                    continue;
                                }
                            }
                            ++it;
                        }
                        if(nIncMemberCnt != 0)
                        {
                            strSql = "update IMGroup set userCnt=userCnt+" + int2string(nIncMemberCnt) + " where id="+int2string(nGroupId);
                            pDBConn->ExecuteUpdate(strSql.c_str());
                        }
                        
                        //更新一份到redis中
                        string strKey = "group_member_"+int2string(nGroupId);
                        for(auto it = setUsers.begin(); it!=setUsers.end(); ++it)
                        {
                            pCacheConn->hset(strKey, int2string(*it), int2string(nCreated));
                        }
                        pCacheManager->RelCacheConn(pCacheConn);
                        bRet = true;
                    }
                    else
                    {
                        log("no cache connection");
                    }
                    pDBManager->RelDBConn(pDBConn);
                }
                else
                {
                    log("no db connection for teamtalk_master");
                }
            }
            else
            {
                log("no db connection for teamtalk_slave");
            }
        }
        return bRet;
    }
    
    1. 删除成员:removeMember;从mysql数据库删除的同时,也从redis缓存删除
    bool CGroupModel::removeMember(uint32_t nGroupId, set &setUser, list& lsCurUserId)
    {
       if(setUser.size() <= 0)
       {
           return true;
       }
       bool bRet = false;
       CDBManager* pDBManager = CDBManager::getInstance();
       CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_master");
       if(pDBConn)
       {
           CacheManager* pCacheManager = CacheManager::getInstance();
           CacheConn* pCacheConn = pCacheManager->GetCacheConn("group_member");
           if (pCacheConn)
           {
               string strClause ;
               bool bFirst = true;
               for(auto it= setUser.begin(); it!=setUser.end();++it)
               {
                   if (bFirst) {
                       bFirst = false;
                       strClause = int2string(*it);
                   }
                   else
                   {
                       strClause += ("," + int2string(*it));
                   }
               }
               string strSql = "update IMGroupMember set status=1 where  groupId =" + int2string(nGroupId) + " and userId in(" + strClause + ")";
               pDBConn->ExecuteUpdate(strSql.c_str());
               
               //从redis中删除成员
               string strKey = "group_member_"+ int2string(nGroupId);
               for (auto it=setUser.begin(); it!=setUser.end(); ++it) {
                   string strField = int2string(*it);
                   pCacheConn->hdel(strKey, strField);
               }
               pCacheManager->RelCacheConn(pCacheConn);
               bRet = true;
           }
           else
           {
               log("no cache connection");
           }
           pDBManager->RelDBConn(pDBConn);
           if (bRet)
           {
               getGroupUser(nGroupId,lsCurUserId);
           }
       }
       else
       {
           log("no db connection for teamtalk_master");
       }
       return bRet;
    }
    

    redis连接池设计
    6. 为什么使用连接池?
    答:对象复用,减小频繁创建链接释放链接的开销时间。
    7. CacheInstances=unread,group_set,token,sync,group_member 5 个连接池
    8. 为什么分开不同的db redis?
    答:方便扩展。
    9. pool_name的意义?
    答:抽象,不必关注redis是否分布式。

    class CacheManager {
    public:
            virtual ~CacheManager();
    
            static CacheManager* getInstance();
    
            int Init();
            CacheConn* GetCacheConn(const char* pool_name);
            void RelCacheConn(CacheConn* pCacheConn);
    private:
            CacheManager();
    
    private:
            static CacheManager*         s_cache_manager;
            map        m_cache_pool_map;
    };
    int CacheManager::Init()
    {
            CConfigFileReader config_file("dbproxyserver.conf");
    
        //CacheInstances=unread,group_set,token,sync,group_member
            char* cache_instances = config_file.GetConfigName("CacheInstances");
            if (!cache_instances) {
                    log("not configure CacheIntance");
                    return 1;
            }
    
            char host[64];
            char port[64];
            char db[64];
        char maxconncnt[64];
            CStrExplode instances_name(cache_instances, ',');
            for (uint32_t i = 0; i < instances_name.GetItemCnt(); i++) {
                    char* pool_name = instances_name.GetItem(i);
                    //printf("%s", pool_name);
                    snprintf(host, 64, "%s_host", pool_name);
                    snprintf(port, 64, "%s_port", pool_name);
                    snprintf(db, 64, "%s_db", pool_name);
            snprintf(maxconncnt, 64, "%s_maxconncnt", pool_name);
    
                    char* cache_host = config_file.GetConfigName(host);
                    char* str_cache_port = config_file.GetConfigName(port);
                    char* str_cache_db = config_file.GetConfigName(db);
            char* str_max_conn_cnt = config_file.GetConfigName(maxconncnt);
                    if (!cache_host || !str_cache_port || !str_cache_db || !str_max_conn_cnt) {
                            log("not configure cache instance: %s", pool_name);
                            return 2;
                    }
    
                    CachePool* pCachePool = new CachePool(pool_name, cache_host, atoi(str_cache_port),
                                    atoi(str_cache_db), atoi(str_max_conn_cnt));
                    if (pCachePool->Init()) {
                            log("Init cache pool failed");
                            return 3;
                    }
    
                    m_cache_pool_map.insert(make_pair(pool_name, pCachePool));
            }
    
            return 0;
    }
    
  • 相关阅读:
    面经|快手经营分析二面
    acwing 861. 二分图的最大匹配(匈牙利算法)
    「SpringCloud Alibaba」Nacos服务注册和中心配置
    【store商城项目02】登录功能的开发
    深度学习 opencv python 实现中国交通标志识别 计算机竞赛
    LeetCode·每日一题·779.第K个语法符合·递归
    数据可视化学习:Matplotlib概述
    计组 | 各程序员、用户 可见 / 不可见 寄存器总结
    centos7的忘记root管理员账号的登录密码
    RP9-变量
  • 原文地址:https://blog.csdn.net/COOL_jack/article/details/142307099