前面我们已经将三个类和三个类的操作模块进行的单元测试全部通过。只有消息类未进行测试,这是因为消息是特殊的,需要持久化存储。接下来创建第二个消息队列比较重要的部分(数据持久化):
- 实现消息存储在硬盘上
- 对于消息的操作不需要大量的增删改查
- 对于文件的操作效率要比数据库高
在数据库所在的文件夹中创建一些文件或文件夹来存储对应的消息。
在每个文件夹中分配两个文件用于存储不一样的数据
①保存消息的具体内容
data.txt
- 二进制格式
- 每个消息分为两部分
- 前4字节表示Message的长度
- 后面的就是Message内容
定位消息使用Message偏移量
offsetBeg
开始offsetEnd
结束
②保存消息的统计信息
state.txt
- 使用文本文件存储,里面包含两列,使用’\t’分割
- 第一列包含总消息数目
- 第二列表示有效消息数目
例如:
2000\t1500
垃圾回收机制一共有五种方法:
- 引用计数
- 跟随每个对象的引用次数,如果当前对象引用次数为0,表示不再引用该对象,可以被回收。
- 标记清除
- 通过根节点遍历,标记所有可达对象,然后对其他直接进行清除操作。
- 复制算法
- 将内存空间划分为两个大小相同的内存,每次只使用一半的内存,然后当内存使用超过一半时将不是垃圾的对象拷贝到另一半的空间中,然后清空旧区域所有对象。
- 标记整理
- 结合了标记清除和复制算法的优点,首先标记可达对象,再将存活的对象复制到另一部分,然后清除未被标记的内存。
- 分代回收思想
- 根据对象存在的时间不同,将对象划分为不同的代,一半新创建的对象,通常只存活较短的时间,而对于长时间存活的对象,采用较长的垃圾回收周期。
使用逻辑删除原理,标记消息的存在还是已经删除。虽然这样可以有效的使用数据,不过随着时间越来越久,数据量也越来越大,我们就需要对数据进行垃圾回收(JVM
)
使用复制算法来解决这个问题,复制算法有效的前提是文件中有效数据较少,无效数据多,将有效数据复制到一个新文件中。
何时触发,有效数据不多时,进行复制算法
大量信息时的解决方案:如果某个队列中,消息特别多,而且都是有效消息,可能就会导致这个消息数据文件特别大,之后针对这个文件进行的操作就会比较慢。
- 文件拆分:当单个文件达到一定阈值时,就会将这个文件拆分为两个文件,慢慢的,就会形成许多文件。
- 文件合并:每个单独的文件都会进行
GC
,如果检查后,发现这个文件变小了许多,就可能会和相邻的文件进行合并操作,节省空间。
第一步,只需要创建一个内部类囊括了统计文件数据和三个方法:
内部类:封装统计文件的两个数据,总消息数和有效消息数。
- 获取队列文件位置
- 获取统计消息文件位置
- 获取消息数据文件位置
具体实现代码:
public class MessageFileManger {
//定义内部类来表示队列的统计信息
static public class State{
public int totalCount;//总消息数
public int validCount;//有效消息数
}
public void init(){
//初始化,扩展!
}
//创建两个文件
//1.获取文件目录
private String getQueueDir(String queueName){
return "./data/"+queueName;
}
//2.创建统计文件
private String getQueueStatePath(String queueName){
return getQueueDir(queueName)+"/queue_state.txt";
}
//3.创建消息文件
private String getQueueDataPath(String queueName){
return getQueueDir(queueName)+"/queue_data.txt";
}
}
因为统计文件是由二进制文件构成,所以这里可以直接使用inputStream
和OutputStream
来进行文件读写操作。在之前的基础上操作文件数据。
读取文件:
//读取统计消息文件
//读取统计消息文件
private Stat readStat(String queueName){
//由于当前消息统计文件是文本文件。可以直接使用Scanner读取文件
Stat stat = new Stat();
try(InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))) {
Scanner scanner = new Scanner(inputStream);
stat.totalCount = scanner.nextInt();
stat.validCount = scanner.nextInt();
return stat;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
写入文件操作:
//写入统计消息文件
private void writeStat(String queueName,Stat stat){
//使用 printWrite来写入文件
//使用OutPutStream会将原文件清空,相对于新数据会覆盖旧数据
try(OutputStream outputStream = new FileOutputStream(getQueueDataPath(queueName),true)){
//参数`true`意思是在重新写入文件时将消息写在文件末尾
PrintWriter writer = new PrintWriter(outputStream);
writer.write(stat.totalCount+"\t"+stat.validCount);
writer.flush();//手动提交
}catch (IOException e) {
e.printStackTrace();
}
}
在创建文件目录时,创建可能会失败,而我们不能再依靠原来的方案来进行异常抛出,引入一个新方法来统一将异常进行处理。
自定义异常:
public class MqException extends Exception{
public MqException(String reason){
super(reason);
}
}
接着我们便继续创建每个消息队列专属的文件目录和文件:
//1, 创建每个消息队列专属的文件目录及文件
public void createQueueFiles(String queueName) throws IOException, MqException {
//1. 创建队列以及对应的消息目录
File baseDir = new File(getQueueDir(queueName));
if(!baseDir.exists()){
boolean ok = baseDir.mkdirs();
if(!ok){
throw new MqException("[MessageFileManger] 创建目录失败! queueName = "+queueName);
}
}
//2. 创建数据文件
File dataFile = new File(getQueueDataPath(queueName));
if(!dataFile.exists()){
boolean ok = dataFile.createNewFile();
if(!ok){
throw new MqException("[MessageFileManger] 创建数据文件失败! queueName = "+queueName);
}
}
//3.创建统计文件
File stateFile = new File(getQueueStatePath(queueName));
if(!stateFile.exists()){
boolean ok = stateFile.createNewFile();
if(!ok){
throw new MqException("[MessageFileManger] 创建数据文件失败! queueName = "+queueName);
}
}
//4. 初始化统计文件 ----> 0/t0
State state = new State();
state.validCount = 0;
state.totalCount = 0;
writeState(queueName,state);
}
有创建便有删除操作:
//2. 删除每个消息队列专属的目录和文件
public void deleteQueueFiles(String queueName) throws MqException {
//1.先将文件删除,再删除目录
File stateFile = new File(getQueueStatePath(queueName));
boolean state = stateFile.delete();
File dataFile = new File(getQueueDataPath(queueName));
boolean data = dataFile.delete();
File baseFile = new File(getQueueDir(queueName));
boolean delDir = baseFile.delete();
if(!state || !data || !delDir){
throw new MqException("[MessageFileManger] 删除目录失败! queueName = "+queueName);
}
}
当需要操作一个消息队列时,我们必须提前检查消息队列的文件是否正常:
//3. 检查目录和文件是否存在,如果要对一个消息队列进行操作,就应该先判定
public boolean checkFileExits(String queueName){
//实际上只需要判断两个文件的存在即可,因为只要文件存在,目录便一定存在
File dataFile = new File(getQueueDataPath(queueName));
if(!dataFile.exists()){
return false;
}
File stateFile = new File(getQueueStatePath(queueName));
if(!stateFile.exists()){
return false;
}
return true;
}
在整理消息之前,不要忘了数据文件中存储的是二进制数据!对于二进制数据,就必须要进行序列化和反序列化。
序列化的好处
将一个结构体转化成一个字符串或者字节数组存储在系统中,序列化后方便存储。
经过这个的这样的序列化后,结构体中的信息是不会丢失的,这样后面可以进行反序列化取出原来的数据。
使用二进制的序列化方式,针对Message对象进行序列化,如果一个对象能够反序列化或序列化,需要让这个类实现 Serializable
接口。
在公共类文件夹中创建一个序列化和反序列化的文件。
import java.io.*;
//这个类中的逻辑,不仅仅是针对消息队列的反序列化和序列化操作,对于java任意数据都通过以下逻辑进行序列化操作
// 如果一个对象能够反序列化或序列化,需要让这个类实现 Serializable 接口
public class BinaryTool {
//把一个对象序列化成一个数组
public static byte[] toBytes(Object object){
//这个流对象相对于一个变长的字节数组
// 可以把 object 序列化的数据给写入到 byteArrayOutputStream ,再统一转成 byte[]
try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){
try(ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)) {
// 此处的 writeObject 就会把该对象进行序列化,生成二进制的字节数据,就会写到 objectOutPutStream
// 由于 objectOutPutStream 关联了 ByteArrayOutPutStream ,最终结果写入到 byteArray
outputStream .writeObject(object);
//这个操作就是把 byteArrayOutputStream 中持有的二进制数据取出来,转成byte[]
return byteArrayOutputStream.toByteArray();
}
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
//把一个数组序列化成一个对象
public static Object fromBytes(byte[] data) {
Object ret = null;
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {
try (ObjectInputStream inputStream = new ObjectInputStream(byteArrayInputStream)) {
//这里的readObject是在数组中读取数据并进行反序列化
ret = inputStream.readObject();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
} catch (IOException e) {
e.printStackTrace();
}
return ret;
}
}
注意使用序列化的对象需要实现serializable
接口。
实现完接口后,在对项目的版本号进行更新操作。
//验证开发者版本号
private static final long serialVersionUID=1L;
准备好消息队列的文件后,接下来就是想办法将消息存储到队列上,这里整理了对消息存储到文件的几个步骤。
使用两个方法来进行发送和获取:
//消息的写入与读取
//1.在创建消息序列化和反序列化的基础上,创建发送消息方法
public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {
//1. 检查当前队列是否存在
if(!checkFileExits(queue.getName())){
throw new MqException("[MessageFileManger] 队列不存在!无法发送消息到队列 queueName = "+queue.getName());
}
//2.对将要发送的消息进行序列化处理
byte[] messageBinary = BinaryTool.toBytes(message);
//发送消息时可能存在线程安全问题,需要进行加锁
synchronized (queue){
//3. 获取队列中数据文件的长度,用来计算 Message 中的偏移量 offsetBeg 和 offsetEnd
// 写入数据到队列的末尾 因为前面的len存储数据长度用了4个字节,所以 offsetEnd和offsetBeg都需要经过计算后+4
// 3.1 得到数据文件
File datafile = new File(getQueueDataPath(queue.getName()));
// 3.2 设置偏移量,预留出消息位置
message.setOffsetBeg(datafile.length()+4);
message.setOffsetEnd(datafile.length()+4 + messageBinary.length);
//3.3 追加数据到文件中
try(OutputStream outputStream = new FileOutputStream(datafile,true)){
try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
//写入文件长度,前4个字节
dataOutputStream.writeInt(messageBinary.length);
//写入消息数据
dataOutputStream.write(messageBinary);
}
}
//更新消息统计文件
State state = readState(queue.getName());
state.totalCount +=1;
state.validCount +=1;
writeState(queue.getName(),state);
}
}
在写入文件中存在着线程安全问题,在两个线程同时执行这段代码时会出现特别要注意在发送消息时可能会发生线程安全问题——脏读问题。如果多个线程对一个消息队列发送消息,可能会进行消息覆盖,所以我们这里需要加锁操作。
解决方法:
以队列对象为加锁单位即可。
如果是两个线程,往一个队列中写消息,此时需要阻塞等待。如果是两个线程往不同队列中写消息不需要阻塞等待。
删除步骤
这里是逻辑删除信息,也就是将硬盘上存储的数据的 isValid 属性改为 0 1. 先把文件中数据读取出来,还原为Message对象 2. 把isValid属性改为 0 3. 在把信息重新写入文件中
- 1
- 2
- 3
- 4
这里的message参数必须包含有效的 offsetBeg
和 offsetEnd
来标识信息的位置
此处读取信息不能使用FileOutputStream
和FileInputStream
,这两个类是从文件头进行读写,这里我们需要指定位置进行读取信息,这里使用RandomAccessFile
,内存支持随机访问,只有这样才能更准确定位到消息。
具体代码实现:
//逻辑删除消息
// 这里我们只需要将文章中的 valid 改成 0 即可
public void deleteMessage(MSGQueue queue,Message message) throws IOException, ClassNotFoundException {
//要知道删除的是哪一个队列和那条消息
//注意!这里也需要加锁操作,因为 不能并发操作一个消息的删除工作,例如一个消息抢先一步删除,后续程序可能会出现异常情况
synchronized (queue) {
//设置可读写权限
try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")) {
//1. 得到具体的消息
byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.read(bufferSrc);
//2. 将原来的二进制数据转为 Message 对象
Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);
//3. 修改 valid 参数
diskMessage.setIsValid((byte) 0x0);
//4.重新覆盖当时的文件
byte[] buffDest = BinaryTool.toBytes(diskMessage);
// 5. 重新调整文件位置
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.write(buffDest);
}
//因为删除了一条消息,所以统计文件也应该更新
State state = readState(queue.getName());
if(state.validCount > 0){
state.validCount -= 1;
}
writeState(queue.getName(),state);
}
}
将文件中的消息加载到内存中,当然是读取队列中的有效消息。
这个方法是在程序准备启动时执行的,计算机需要将硬盘中的有效消息加载到内存中。
这里我们使用链表这种数据结构,主要是为了后期删除时可以从头部进行删除,避免浪费更多的空间。
- 得到消息文件。
- 读取文件,记录文件开始光标,得到文件长度,计算文件结尾光标位置。将这一段封装为一个消息进行保存。在消息存储在硬盘上时进行了序列化加密操作,所以在取出消息时,也应该对消息进行反序列化操作。
- 判断消息是否为有效消息,如果有效则加入链表中。
- 循环进行扫描该消息队列。
//加载有效消息
public LinkedList<Message> loadAllMangeFromQueue(String queueName) throws IOException, MqException{
//返回参数
LinkedList<Message> messages = new LinkedList<>();
//1. 得到消息文件
try(InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))){
try(DataInputStream dataInputStream = new DataInputStream(inputStream)){
//记录文件光标,慢慢向后移动,相对于得到下一个文件的开始光标
long currentOffSet = 0;
//循环查找
while (true){
//1.得到消息总长度
int messageSize = dataInputStream.readInt();
//2. 按照消息总长度+光标得到消息的位置
byte[] buffer = new byte[messageSize];
int actualSize = dataInputStream.read(buffer);
//对比读取消息的实际长度和消息总长度
if(messageSize != actualSize){
throw new MqException("[MessageFileManger] 消息读取错误,消息长度不符合规范!queueName ="+queueName);
}
//3.将数据封装为一个消息
Message message = (Message)BinaryTool.fromBytes(buffer);
//4.判断数据的有效性
if(message.getIsValid() != 0x1){
currentOffSet += (4+messageSize);
//直接跳过,无效数据
continue;
}
//5.有效数据加入链表中,注意设置消息的 OffsetBeg 和 OffSetEnd 找到在文件中具体位置
message.setOffsetBeg(currentOffSet+4);
message.setOffsetEnd(currentOffSet+4+messageSize);
currentOffSet += (4+messageSize);
messages.add(message);
}
}catch (EOFException | ClassNotFoundException e){
System.out.println("[MessageFileManger] 恢复Message数据完成!");
}
}
return messages;
}
这里并不像逻辑删除那样,这个删除的数据是已经经过消费者进行使用了,所以在这里是真正的将Message进行删除。因为逻辑删除存在一系列的缺点,比如无效消息过多导致资源浪费,以及在查找消息时需要更多的时间来排除无效消息。需要注意这里并不需要将isvalid
设置为0x0
。直接在硬盘以及内存中将文件内容进行修改即可。
- 使用复制算法进行垃圾回收。
- 判断文件是否需要进行垃圾回收,根据统计文件中的有效消息和总消息,如果总消息大于2000且有效消息小于总消息的一半,那么就进行垃圾回收。
- 首先创建一个新文件。
- 在旧文件中得到有效消息然后写入新文件中,按照一个消息的格式写入。
- 删除旧文件,将新文件命名为旧文件名。
- 更新统计文件数据。
//检查是否需要进行垃圾回收
public boolean checkGC(String queueName){
State state = readState(queueName);
assert state != null;
return state.totalCount > 2000 && state.validCount *2 < state.totalCount;
}
//创建一个临时文件作为复制目的
private String getQueueDataNewPath(String queueName){
return getQueueDataPath(queueName)+"/queue_data_new.txt";
}
//复制算法实现
//写入文件!写入文件操作是存在线程安全问题的,当两个线程对同一个消息队列进行写入时可能会出现错误,所以我们需要针对队列进行加锁
public void gc(MSGQueue queue) throws MqException, IOException {
synchronized (queue) {
//1. 创建一个新文件
File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));
if (queueDataNewFile.exists()) {
//判断新文件的存在性,注意此刻并没有进行创建操作
throw new MqException("[MessageFileManger] gc失败,新文件已经存在! queueName = " + queue.getName());
}
boolean ok = queueDataNewFile.createNewFile();
if (!ok) {
throw new MqException("[MessageFileManger] gc时文件创建失败! queueName = " + queue.getName());
}
//2.读取旧文件中的有效对象
LinkedList<Message> messages = loadAllMangeFromQueue(queue.getName());
//3. 把有效消息写入新文件
try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)) {
try (DataOutputStream dataInputStream = new DataOutputStream(outputStream)) {
for (Message m : messages) {
byte[] buffer = BinaryTool.toBytes(m);
//按照格式写入新文件
dataInputStream.writeInt(buffer.length);
dataInputStream.write(buffer);
}
}
}
//4.删除旧文件,将新文件重新命名
File queueOldFile = new File(getQueueDataPath(queue.getName()));
ok = queueOldFile.delete();
if (!ok) {
throw new MqException("[MessageFileManger] gc时旧文件删除失败! queueName = " + queue.getName());
}
ok = queueDataNewFile.renameTo(queueOldFile);
if (!ok) {
throw new MqException("[MessageFileManger] gc时新文件修改文件名失败! queueName = " + queue.getName());
}
//5.更新统计文件
State state = readState(queue.getName());
state.validCount = messages.size();
state.totalCount = messages.size();
writeState(queue.getName(), state);
System.out.println("[MessageFileManger] gc执行完毕! queueName = " + queue.getName());
}
}
MessageFileManger
总结:
- 设计了目录结构和文件格式
- 实现了目录创建和删除
- 实现了统计文件的读写
- 实现了消息的写入,写入数据文件
- 消息删除 ==》随机访问文件
- 加载所有消息
- 垃圾回收机制===》复制算法
测试文件管理功能,每写完一个功能都需要进行一个单元测试,为了避免后续操作的错误,我们应该将当前功能测试完毕再进行后续文件的编码。
文件操作测试用例:
- 做好每个用例的准备工作
- 创建队列以备后续使用
- 做好收尾工作
- 释放准备工作中创建的资源
具体实现:
@SpringBootTest
public class MessageFileMangeTest {
private MessageFileManger messageFileManger ;
private static final String queueName1 = "testQueue1";
private static final String queueName2 = "testQueue2";
@BeforeEach
public void setUp() throws IOException, MqException {
messageFileManger = new MessageFileManger();
messageFileManger.createQueueFiles(queueName1);
messageFileManger.createQueueFiles(queueName2);
}
@AfterEach
public void tearDown() throws MqException {
messageFileManger.deleteQueueFiles(queueName1);
messageFileManger.deleteQueueFiles(queueName2);
}
}
功能一:创建消息文件和统计文件
//1.测试文件是否可以被创建
@Test
public void testCreateFile(){
File queueDataFile1 = new File("./Data/" + queueName1 + "/queue_data.txt");
Assertions.assertEquals(true,queueDataFile1.isFile());
File queueStateFile1 = new File("./data/"+ queueName1+"/queue_state.txt");
Assertions.assertEquals(true,queueStateFile1.isFile());
File queueDataFile2 = new File("./Data/" + queueName2 + "/queue_data.txt");
Assertions.assertEquals(true,queueDataFile2.isFile());
File queueStateFile2 = new File("./data/"+ queueName2+"/queue_state.txt");
Assertions.assertEquals(true,queueStateFile2.isFile());
}
测试结果:
功能二:统计文件的正常写入和读取操作(利用反射使用 private方法)
@Test
public void testReadWriteState(){
MessageFileManger.State state = new MessageFileManger.State();
state.totalCount = 1000;
state.validCount = 600;
//使用反射调用 private 方法
ReflectionTestUtils.invokeMethod(messageFileManger,"writeState",queueName1,state);
//写入state文件后读取
MessageFileManger.State newState = ReflectionTestUtils.invokeMethod(messageFileManger,"readState",queueName1);
Assertions.assertEquals(1000,newState.totalCount);
Assertions.assertEquals(600,newState.validCount);
}
功能三:测试发送数据到队列
//1创建一个队列
private MSGQueue createQueue(String queueName) {
MSGQueue queue = new MSGQueue();
queue.setName(queueName);
queue.setDurable(true);//持久化
queue.setAutoDelete(false);//不使用删除
queue.setExclusive(false);//只属于一个消费者
return queue;
}
//2.初始化一个消息
private Message createTestMessage(String contain) {
return Message.createMessage("testRoutingKey", null, contain.getBytes());
}
@Test
public void testSendMessage() throws IOException, MqException {
MSGQueue queue = createQueue(queueName1);//调用了创建队列函数
Message message = createTestMessage("testMessage");//调用了初始化消息函数
//调用完发送消息,下一步应该检查统计文件和消息文件的内容
messageFileManger.sendMessage(queue,message);
//检查State
MessageFileManger.State state = ReflectionTestUtils.invokeMethod(messageFileManger, "readState", queueName1);
Assertions.assertEquals(1,state.validCount);
Assertions.assertEquals(1,state.totalCount);
//检查消息文件
LinkedList<Message> messages = messageFileManger.loadAllMangeFromQueue(queueName1);
Assertions.assertEquals(1,messages.size());
Message message1 = messages.get(0);
Assertions.assertEquals(message.getMessageId(),message1.getMessageId());
Assertions.assertEquals(message.getRoutingKey(),message1.getRoutingKey());
Assertions.assertEquals(message.getDeliverMode(),message1.getDeliverMode());
Assertions.assertArrayEquals(message.getBody(),message1.getBody());
}
功能四:测试在队列中读取消息
@Test
public void testLoadAllMessageFromQueue() throws IOException, MqException {
MSGQueue queue = createQueue(queueName1);
LinkedList<Message> exceptMessages = new LinkedList<>();//存入实际数据
//插入数据
for (int i = 0; i < 100; i++) {
Message message = createTestMessage("testMessage" + i);
messageFileManger.sendMessage(queue,message);
exceptMessages.add(message);
}
//读取消息并对比
LinkedList<Message> actualMessages = messageFileManger.loadAllMangeFromQueue(queue.getName());
Assertions.assertEquals(exceptMessages.size(),actualMessages.size());//对比大小
for (int i = 0; i < exceptMessages.size(); i++) {
//对比内容
Message exceptMessage = exceptMessages.get(i);
Message actualMessage = actualMessages.get(i);
System.out.println("["+i+"]actualMessages = "+actualMessage);
Assertions.assertEquals(exceptMessage.getMessageId(),actualMessage.getMessageId());
Assertions.assertEquals(exceptMessage.getRoutingKey(),actualMessage.getRoutingKey());
Assertions.assertEquals(exceptMessage.getDeliverMode(),actualMessage.getDeliverMode());
Assertions.assertEquals(0x1,actualMessage.getIsValid());
}
}
功能五:测试删除消息数据
//测试删除消息功能
@Test
public void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {
MSGQueue queue = createQueue(queueName1);
LinkedList<Message> exceptMessages = new LinkedList<>();
for (int i = 0; i < 10; i++) {
Message testMessage = createTestMessage("testMessage" + i);
exceptMessages.add(testMessage);
messageFileManger.sendMessage(queue,testMessage);
}
//删除三个消息
messageFileManger.deleteMessage(queue,exceptMessages.get(7));
messageFileManger.deleteMessage(queue,exceptMessages.get(8));
messageFileManger.deleteMessage(queue,exceptMessages.get(9));
//对比剩余消息
LinkedList<Message> actualMessages = messageFileManger.loadAllMangeFromQueue(queue.getName());
Assertions.assertEquals(7,actualMessages.size());
for (int i = 0; i < 7 ; i++) {
Message exceptMessage = exceptMessages.get(i);
Message actualMessage = actualMessages.get(i);
System.out.println("["+i+"]actualMessages = "+actualMessage);
Assertions.assertEquals(exceptMessage.getMessageId(),actualMessage.getMessageId());
Assertions.assertEquals(exceptMessage.getRoutingKey(),actualMessage.getRoutingKey());
Assertions.assertEquals(exceptMessage.getDeliverMode(),actualMessage.getDeliverMode());
Assertions.assertEquals(0x1,actualMessage.getIsValid());
}
}
功能六:测试垃圾回收,真正的删除内存中的消息
//1.写入消息
//2.删除一半消息
//3.手动GC
@Test
public void testGC() throws IOException, MqException, ClassNotFoundException {
MSGQueue queue = createQueue(queueName1);
LinkedList<Message> exceptMessages = new LinkedList<>();
for (int i = 0; i < 100; i++) {
Message testMessage = createTestMessage("testMessage" + i);
exceptMessages.add(testMessage);
messageFileManger.sendMessage(queue,testMessage);
}
//获取GC前文件
File beforeGC= new File("./data" + queueName1 + "queue_data.txt");
long begGCLength = beforeGC.length();
System.out.println(begGCLength);
//删除下标为偶数的文件
for (int i = 0; i < 100; i+=2) {
messageFileManger.deleteMessage(queue,exceptMessages.get(i));
}
//手动调用垃圾回收
messageFileManger.gc(queue);
//重新读取消息并验证消息的准确性
List<Message> actualMessage = messageFileManger.loadAllMangeFromQueue(queue.getName());
System.out.println(exceptMessages.size()+"==="+actualMessage.size());
Assertions.assertEquals(50,actualMessage.size());
//对比两个数组之间的消息是否一致
for (int i = 0; i < actualMessage.size(); i++) {
Message actualMessages = actualMessage.get(i);
Message exceptMessage = exceptMessages.get(i * 2 + 1);
Assertions.assertEquals(exceptMessage.getMessageId(),actualMessages.getMessageId());
Assertions.assertEquals(exceptMessage.getRoutingKey(),actualMessages.getRoutingKey());
Assertions.assertEquals(exceptMessage.getDeliverMode(),actualMessages.getDeliverMode());
Assertions.assertEquals(0x1,actualMessages.getIsValid());
}
File afterGCFile = new File("./data/" + queueName1 + "queue_data.txt");
long aftGCLength = afterGCFile.length();
System.out.println(aftGCLength);
Assertions.assertTrue(begGCLength>=aftGCLength);
}