结束了硬盘的管理,就到了内存之间的管理,内存上存数据是为了更快的进行访问。
使用内存管理先前的数据,对于这个 MQ 来说,内存存储数据为主;硬盘存储数据为辅(主要存在目的还是为了持久化,以防重启或宕机之后,数据不丢失)
我们存在哪些组件进行管理呢?该用啥数据结构呢?
关于选用的数据结构,我们这里组件不只有一个,有多个交换机、队列等等,所以我们这里选用了 ConcurrentHashMap 这个线程安全的哈希表。
交换机
Key : 交换机的名字 ; Value : 交换机
相关的代码:
private ConcurrentHashMap < String , Exchange > exchangeMap = new ConcurrentHashMap <>();
队列
Key : 队列的名字 ; Value : 队列
相关的代码:
private ConcurrentHashMap < String , MSGQueue > queueMap = new ConcurrentHashMap <>();
绑定
绑定使用了嵌套的 哈希表;
Key : 交换机的名字 ; Value 作为一个 哈希表 (Key : 队列名字 ; Value : 绑定关系)
相关的代码:
private ConcurrentHashMap < String , ConcurrentHashMap < String , Binding >> bindingsMap = new ConcurrentHashMap <>();
消息
Key : MessageId ; Value :Message 对象
相关的代码:
private ConcurrentHashMap < String , Message > messageMap = new ConcurrentHashMap <>();
表示队列于消息之间的关联
Key : 队列名字; Value :存消息的链表
private ConcurrentHashMap < String , LinkedList < Message >> queueMessageMap = new ConcurrentHashMap <>();
带确认消息集合
这个集合主要是存储了哪些消息被消费者取走了,但是还没有应答
Key : 队列名字; Value : 作为嵌套的哈希表(Key:MesssageId ; Value : Message对象)
private ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> queue = new ConcurrentHashMap<>();
关于应答(ACK):
我们设置了两中应答模式;
对于上述数据结构都是 添加、获取、删除 这三种操作。
交换机的管理很简单,我们就不多介绍了,直接看看代码即可。

队列的管理很简单,这里也就不多介绍了,直接看看代码即可。

绑定的管理没那么简单,我们来看看:
添加绑定:

获取绑定:
我们获取绑定写两个版本:
1. 根据 exchangeName 和 queueName 确定唯一一个 binding

2. 根据 exchangeName 获取全部的 binding

删除绑定:

关于消息的 添加、根据 id 查询,根据 id 删除都很简单,看一眼就好:

发送消息到指定队列
这个就要看队列和消息之间的关联了:
代码的注解我都写在上面了,正真难的方法我会重点介绍。

从队列中获取指定的消息

获取指定队列的消息个数

添加未确认的消息

删除待确认的消息(已经确认过的消息)

获取到指定的待确认消息

这个是个重点,需要从硬盘中读取数据,如果机器宕机了或其他异常情况,就需要把硬盘中的数据恢复到内存中。
这个方法就是从硬盘上读取数据, 把硬盘中之前持久化存储的各个维度的数据都恢复到内存中。
这个方法是个重点方法,大致的流程如下:
每一次恢复都是一次for循环遍历。
注意!! 针对 "未确认的消息" 这部分内存中的数据, 不需要从硬盘恢复。 之前考虑硬盘存储的时候, 也没设定这一块;
一旦在等待 ack 的过程中, 服务器重启了, 此时这些 "未被确认的消息", 就恢复成 "未被取走的消息";
这个消息在硬盘上存储的时候, 就是当做 "未被取走"。
来看看代码:

对上述内存管理做一个小小的总结
借助内存中的一些数据结构,对交换机、队列、绑定、消息,广泛使用了哈希表、链表、嵌套等数据结构等。
在线程安全那一块,我们需要考虑,需不需要加锁,不加锁会造成什么后果,锁应该加到哪里?
这些问题需要谨慎考虑清楚,再去使用。
此外,上述代码都是交由 MemoryDataCenter 这个类进行管理的。
我们还需要一个类,来管理所有硬盘上的数据;
这个硬盘上一章已经说了;包括文件部分和数据库不放呢。
这一层相对硬盘来说属于上层逻辑,上层逻辑要操作硬盘,它们是不关心当前数据是存在数据库还是文件中的。
这个类其实又是一层封装,这一层封装的含义就是为了在内存中调用硬盘上的数据。
本层的代码还是很简单的。
参数
参数就只有两个:
用来管理数据库中的数据的 DataBaseManager 的实例化对象
用来管理数据文件中的数据 MessageFileManager 的实例化对象
方法
初始化方法:

至于其他方法都是一个调用,很简单,这里就不一一列举,简单的看一看就好,我会把完整代码连接放在这里。
