• 消息队列(四):内存管理


    结束了硬盘的管理,就到了内存之间的管理,内存上存数据是为了更快的进行访问。

    使用内存管理先前的数据,对于这个 MQ 来说,内存存储数据为主;硬盘存储数据为辅(主要存在目的还是为了持久化,以防重启或宕机之后,数据不丢失)

    操作核心组件(MemoryDataCenter )

    设计数据结构

    我们存在哪些组件进行管理呢?该用啥数据结构呢?

    关于选用的数据结构,我们这里组件不只有一个,有多个交换机、队列等等,所以我们这里选用了 ConcurrentHashMap 这个线程安全的哈希表。

    交换机

    Key : 交换机的名字 ; Value : 交换机

    相关的代码:

    private ConcurrentHashMap < String , Exchange > exchangeMap = new ConcurrentHas
    hMap <>();

    队列

    Key : 队列的名字 ; Value : 队列

    相关的代码:

    private ConcurrentHashMap < String , MSGQueue > queueMap = new ConcurrentHashMa
    p <>();

    绑定

    绑定使用了嵌套的 哈希表;

    Key : 交换机的名字 ; Value 作为一个 哈希表 (Key : 队列名字 ; Value : 绑定关系)

    相关的代码:

    private ConcurrentHashMap < String , ConcurrentHashMap < String , Binding >> bindi
    ngsMap = new ConcurrentHashMap <>();

    消息

    Key : MessageId ; Value :Message 对象

    相关的代码:

    private ConcurrentHashMap < String , Message > messageMap = new ConcurrentHashM
    ap <>();

    表示队列于消息之间的关联

    Key : 队列名字; Value :存消息的链表

    private ConcurrentHashMap < String , LinkedList < Message >> queueMessageMap = ne
    w ConcurrentHashMap <>();

    带确认消息集合

    这个集合主要是存储了哪些消息被消费者取走了,但是还没有应答

    Key : 队列名字; Value : 作为嵌套的哈希表(Key:MesssageId ; Value : Message对象)

    private ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> queue = new ConcurrentHashMap<>();

    关于应答(ACK):

    我们设置了两中应答模式;

    1. 自动应答:消费者取走消息,这个消息就是被应答了,就可以直接删除
    2. ⼿动应答:消费者取⾛元素,还不算应答,需要消费者再主动调⽤⼀个 basicAck ⽅法,此
      时才算真正应答了,才可以删除消息

    实现交换机的管理

    对于上述数据结构都是 添加、获取、删除 这三种操作。

    交换机的管理很简单,我们就不多介绍了,直接看看代码即可。

    实现队列的管理

    队列的管理很简单,这里也就不多介绍了,直接看看代码即可。

    实现绑定的管理

    绑定的管理没那么简单,我们来看看:

    添加绑定:

    获取绑定:

    我们获取绑定写两个版本:

    1. 根据 exchangeName 和 queueName 确定唯一一个 binding

    2. 根据 exchangeName 获取全部的 binding

    删除绑定:

    实现消息的管理

    关于消息的 添加、根据 id 查询,根据 id 删除都很简单,看一眼就好:

    发送消息到指定队列

    这个就要看队列和消息之间的关联了:

    代码的注解我都写在上面了,正真难的方法我会重点介绍。

    从队列中获取指定的消息

    此处需要进⾏加锁操作,两个线程同时获取的时候破坏链表结构

    获取指定队列的消息个数

    实现待确认集合的管理

    添加未确认的消息

    删除待确认的消息(已经确认过的消息)

    获取到指定的待确认消息

    实现数据从硬盘中恢复

    这个是个重点,需要从硬盘中读取数据,如果机器宕机了或其他异常情况,就需要把硬盘中的数据恢复到内存中。

    这个方法就是从硬盘上读取数据, 把硬盘中之前持久化存储的各个维度的数据都恢复到内存中。

    这个方法是个重点方法,大致的流程如下:

    1. 清空之前集合中的数据
    2. 恢复所有的交换机数据
    3. 恢复所有的队列数据
    4. 恢复所有的绑定数据
    5. 恢复所有消息数据

    每一次恢复都是一次for循环遍历。

    注意!! 针对 "未确认的消息" 这部分内存中的数据, 不需要从硬盘恢复。 之前考虑硬盘存储的时候, 也没设定这一块;

    一旦在等待 ack 的过程中, 服务器重启了, 此时这些 "未被确认的消息", 就恢复成 "未被取走的消息";

    这个消息在硬盘上存储的时候, 就是当做 "未被取走"。

    来看看代码:

    对上述内存管理做一个小小的总结

    总结:

    借助内存中的一些数据结构,对交换机、队列、绑定、消息,广泛使用了哈希表、链表、嵌套等数据结构等。

    在线程安全那一块,我们需要考虑,需不需要加锁,不加锁会造成什么后果,锁应该加到哪里?

    这些问题需要谨慎考虑清楚,再去使用。

    此外,上述代码都是交由 MemoryDataCenter 这个类进行管理的。

    在内存中管理硬盘上的数据(DiskDataCenter)

    我们还需要一个类,来管理所有硬盘上的数据;

    这个硬盘上一章已经说了;包括文件部分和数据库不放呢。

    这一层相对硬盘来说属于上层逻辑,上层逻辑要操作硬盘,它们是不关心当前数据是存在数据库还是文件中的。

    这个类其实又是一层封装,这一层封装的含义就是为了在内存中调用硬盘上的数据。

    本层的代码还是很简单的。

    参数

    参数就只有两个:

    用来管理数据库中的数据的   DataBaseManager  的实例化对象

    用来管理数据文件中的数据   MessageFileManager  的实例化对象

    方法

    初始化方法:

    至于其他方法都是一个调用,很简单,这里就不一一列举,简单的看一看就好,我会把完整代码连接放在这里。

    DiskDataCenter

    MemoryDataCenter

  • 相关阅读:
    【探索Linux】—— 强大的命令行工具 P.11(基础IO,文件操作)
    Linux 安装 Docker +Docker Compose + cucker/get_command_4_run_container
    [oeasy]python0021_python虚拟机的位置_可执行文件_转化为字节形态
    金融机构如何做好中小微物流企业的风控措施?
    php高校师生交流作业信息管理系统mysql
    abc262-D(dp)
    学习Git---20分钟git快速上手
    苹果Audio Classifier使用问题
    Libuv源码解析 - uv_loop整个初始化模块
    25-全局函数和变量
  • 原文地址:https://blog.csdn.net/weixin_67807492/article/details/132862803