阻塞队列(Blocking Queue)-> 生产者消费者模型 (是在一个进程内)
所谓的消息队列,就是把阻塞队列这样的数据结构,单独提取成了一个程序,进行独立部署~ --------> 生产者消费模型 (进程和进程之间/服务和服务之间)
生产者消费者模型作用:
一个生产者,一个消费者
N个生产者,N个消费者
Broker server 内部也涉及一些关键概念(是为了如何进出队列)
消息队列服务器(Broker Server),要提供的核心API
交换机在转发消息的时候,有一套转发规则的~
提供了几种不同的 交换机类型 (ExchangType)来描述这里不同的转发规则
Rabbit主要实现了四种交换机类型(也是由 AMQP协议定义的)
项目中实现了前三种
有两个关键概念
上述 虚拟机、交换机、队列、绑定、消息,需要存储起来。此时内存和硬盘各存储一份,内存为主,硬盘为辅。
在内存中存储的原因:
对于 MQ 来说,能够高效的转发处理数据,是非常关键的指标! 因此对于使用内存来组织数据,得到的效率,就比放硬盘要高很多
在硬盘中存储原因:
为了防止内存中数据随着进程重启/主机重启而丢失
其他的服务器(生产者/消费者)通过网络,和咱们的 Broker Server 进行交互的。
此处设定,使用 TCP + 自定义的应用层协议 实现 生产者/消费者 和 BrokerServer 之间的交互工作
应用层协议主要工作:就是让客户端可以通过网络,调用 brokerserver 提供的编程接口
因此,客户端这边也要提供上述API,只有服务器是真正干实事的;客户端只是发送/接受响应
虽然调用的客户端的方法,但是实际上好像调用了一个远端服务器的方法一样 (远程调用 RPC)
客户端除了提供上述9个方法之外,还需要提供 4个 额外的方法,支撑其他工作
需要做哪些工作?
上述的这些关键数据,在硬盘中怎么存储,啥格式存储,存储在数据库还是文件?
后续服务器重启了,如何读取这些数据,把内存中内容恢复过来?
模块划分
此处考虑的是更轻量的数据库SQLite, 因为一个完整的 SQLite 数据库,只有一个单独的可执行文件(不到1M)
<dependency>
<groupId>org.xerialgroupId>
<artifactId>sqlite-jdbcartifactId>
<version>3.42.0.0version>
dependency>
spring:
datasource:
url: jdbc:sqlite:./data/meta.db
username:
password:
driver-class-name: org.sqlite.JDBC
上述依赖和配置都弄完后,当程序启动时,会自动建立数据库。所以我们只需要建表就行。
此处我们根据之前的需求分析,建立三张表,此处我们通过 代码形式来建造三张表
mybatis:
mapper-locations: classpath:mapper/**Mapper.xml
void insertExchange(Exchange exchange);
List<Exchange> selectAllExchanges();
void deleteExchange(String exchangeName);
<insert id="insertExchange" parameterType="com.example.mq.mqserver.core.Exchange">
insert into exchange values (#{name},#{type},#{durable},#{autoDelete},#{arguments});
insert>
<select id="selectAllExchanges" resultType="com.example.mq.mqserver.core.Exchange">
select * from exchange;
select>
<delete id="deleteExchange" parameterType="java.lang.String">
delete from exchange where name = #{exchangeName};
delete>
void insertQueue(MSGQueue queue);
List<MSGQueue> selectAllQueues();
void deleteQueue(String queueName);
<insert id="insertQueue" parameterType="com.example.mq.mqserver.core.MSGQueue">
insert into queue values (#{name},#{durable},#{exclusive},#{autoDelete},#{arguments});
insert>
<select id="selectAllQueues" resultType="com.example.mq.mqserver.core.MSGQueue">
select * from queue;
select>
<delete id="deleteQueue" parameterType="java.lang.String">
delete from queue where name = #{queueName};
delete>
void insertBinding(Binding binding);
List<Binding> selectAllBindings();
void deleteBinding(Binding binding);
<insert id="insertBinding" parameterType="com.example.mq.mqserver.core.Binding">
insert into binding values (#{exchangeName},#{queueName},#{bindingKey});
insert>
<select id="selectAllBindings" resultType="com.example.mq.mqserver.core.Binding">
select * from binding;
select>
<delete id="deleteBinding" parameterType="java.lang.String">
delete from binding where exchangeName = #{exchangeName} and queueName = #{queueName};
delete>
在服务器(BrokerServer)启动的时候,能够做出以下逻辑判定:
构造一个类 DataBaseManager
package com.example.mq.mqserver.datacenter;
import com.example.mq.MqApplication;
import com.example.mq.mqserver.core.Binding;
import com.example.mq.mqserver.core.Exchange;
import com.example.mq.mqserver.core.ExchangeType;
import com.example.mq.mqserver.core.MSGQueue;
import com.example.mq.mqserver.mapper.MetaMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.File;
import java.lang.reflect.Field;
import java.util.List;
/**
* 通过这个类,来整合数据库操作
*/
public class DataBaseManager {
private MetaMapper metaMapper;
// 针对数据库进行初始化
public void init(){
// 要做的是从 Spring 获取到现成的对象
metaMapper = MqApplication.context.getBean(MetaMapper.class);
if(!checkDBExists()){
// 数据库不存在,就进行建库建表操作
// 先创建一个 data 目录
File dataDir = new File("./data");
dataDir.mkdirs();
// 创建数据表
createTable();
// 插入默认数据
createDefaultData();
System.out.println("[DataBaseManager] 数据库初始化完成!");
}else {
// 数据库已经存在,则什么都不做
System.out.println("[DataBaseManager] 数据库已经存在!");
}
}
public void deleteDB(){
File file = new File("./data/meta.db");
boolean ret = file.delete();
if (ret){
System.out.println("[DataBaseManager] 删除数据库文件成功!");
}else {
System.out.println("[DataBaseManager] 删除数据库文件失败!");
}
File dataDir = new File("./data");
ret = dataDir.delete();
if (ret){
System.out.println("[DataBaseManager] 删除数据库目录成功!");
}else {
System.out.println("[DataBaseManager] 删除数据库目录失败!");
}
}
private boolean checkDBExists() {
File file = new File("./data/meta.db");
if (file.exists()){
return true;
}
return false;
}
// 这个方法用来建表
// 建库操作并不需要手动执行(不需要手动创建 meta.db 文件)
// 首次执行这里的数据库操作的时候,就会自动创建 meta.db 文件 (mybatis 帮我们完成的)
private void createTable() {
metaMapper.createExchangeTable();
metaMapper.createQueueTable();
metaMapper.createBindingTable();
System.out.println("[DataBaseManager] 创建表完成!");
}
// 给数据库表中,添加默认的值
// 此处主要是添加一个默认的交换机
// RabbitMQ 里有一个这样的设定: 带有一个 匿名 的交换机,类型是 DIRECT
private void createDefaultData() {
// 构造一个默认交换机
Exchange exchange = new Exchange();
exchange.setName("");
exchange.setType(ExchangeType.DIRECT);
exchange.setDurable(true);
exchange.setAutoDelete(false);
metaMapper.insertExchange(exchange);
System.out.println("[DataBaseManager] 创建初始数据完成");
}
// 把其他的数据库操作,也在这个类封装下
public void insertExchange(Exchange exchange){
metaMapper.insertExchange(exchange);
}
public List<Exchange> selectAllExchanges(){
return metaMapper.selectAllExchanges();
}
public void deleteExchange(String exchangeName){
metaMapper.deleteExchange(exchangeName);
}
public void insertQueue(MSGQueue queue){
metaMapper.insertQueue(queue);
}
public List<MSGQueue> selectAllQueues(){
return metaMapper.selectAllQueues();
}
public void deleteQueue(String queueName){
metaMapper.deleteQueue(queueName);
}
public void insertBinding(Binding binding){
metaMapper.insertBinding(binding);
}
public List<Binding> selectAllBindings(){
return metaMapper.selectAllBindings();
}
public void deleteBinding(Binding binding){
metaMapper.deleteBinding(binding);
}
}
Message,如何在硬盘上存储?
所以要把消息直接存储在文件中
以下设定消息具体如何在文件中存储~
消息是依托于队列的,因此存储的时候,就要把 消息 按照 队列 维度展开
此处已经有了一个 data 目录(meta.db就在这个目录中)
在 data 中创建一些子目录,每个队列对应一个子目录,子目录名就是队列名
queue_data.txt:这个文件里面存储的是二进制的数据,我们约定转发到这个队列的队列所有消息都是以二进制的方式进行存储
首先规定前4个字节代表的该消息的长度,后面紧跟着的是消息本体。
对于BrokerServer来说,消息是需要新增和删除的。
生产者生产一个消息,就是新增一个消息
消费者消费一个消息,就是删除一个消息
对于内存中的消息新增删除就比较容易了:使用一些集合类就行
对于文件中新增:
我们采用追加方式,直接在当前文件末尾新增就行
对于文件中删除:
如果采用真正的删除,效率就会非常低。将文件视为顺序表结构,删除就会涉及到一系列的元素搬运。
所以我们采用逻辑删除的方式。根据消息中的一个变量 isValid 判断该消息是否有效,1 为有效消息;0 为
无效消息
那么如何找到每个消息对应在文件中的位置呢? 我们之前在 Message 中设置了两个变量,一个是 offsetBeg,一个是 offsetEnd。
我们存储消息的时候,是同时在内存中存一份和硬盘中存一份。而内存中存到那一份消息,记录了当前的消息的 offsetBeg 和 offsetEnd。通过先找到内存中的消息,再根据该消息的两个变量值,就能找到硬盘中的消息数据了。
随着时间的推移,文件中存放的消息可能会越来越多。并且可能很多消息都是无用的,所以就要针对当前消息数据文件进行垃圾回收。
此处我们采用的复制算法,原理也是比较容易理解的 (复制算法:比较适用的前提是,当前的空间,有效数据不多,大多数都是无效的数据)
直接遍历原有的消息数据文件,把所有的有效数据数据重新拷贝一份到新的文件中,新文件名字和原来文件名字相同,再把旧的文件直接删除掉。
那么垃圾回收的算法有了,何时触发垃圾回收?
此处就要用到我们每个队列目录中,所对应的另一个文件 queue_stat.txt了,使用这个文件来保存消息的统计信息
只存一行数据,用 \t 分割, 左边是 queue_data.txt 中消息的总数目,右边是 queue_data.txt中有效的消息数目。 形如 2000\t1500, 代表该队列总共有2000条消息,其中有效消息为1500条
所以此处我们就约定,当消息总数超过2000条,并且有效消息数目低于总消息数的50%,就处罚一次垃圾回收GC
如果当一个文件消息数目非常的多,而且都是有效信息,此时会导致整个消息的数据文件非常庞大,后续针对这个文件操作就会非常耗时。假设当前文件已经达到10个G了,那么此时如果触发一次GC,整个耗时就会非常高。
对于RabbitMQ来说,解决方案:
文件拆分:当某个文件长度达到一定的阈值的时候,就会拆分成两个文件(拆着拆着就成了很多文件)
文件合并:每个单独的文件都会进行GC,如果GC之后,发现文件变小了,就会和相邻的其他文件合并
这样做,可以保证在消息特别多的时候,也能保证性能上的及时响应
实现思路:
需要定义一个内部类,在表示该队列的统计消息,此处优先考虑 static 静态内部类
static public class Stat {
// 此处直接定义成 public
public int totalCount; // 总的消息数
public int validCount; // 有效消息数
}
private Stat readStat(String queueName) {
Stat stat = new Stat();
try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))) {
Scanner scanner = new Scanner(inputStream);
stat.totalCount = scanner.nextInt();
stat.validCount = scanner.nextInt();
return stat;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
private void writeStat(String queueName, Stat stat) {
// 使用 PrintWrite 来写文件
// OutputStream 打开文件,默认情况下,会直接把源文件清空,此时就相当于 新数据把旧的数据覆盖了
// 加个 参数 true,就会变成追加 new FileOutputStream(getQueueStatPath(queueName),true)
try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.write(stat.totalCount + "\t" + stat.validCount);
printWriter.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
// 创建队列对应的文件目录
public void createQueueFiles(String queueName) throws IOException {
// 1. 先创建队列对应的消息目录
File baseDir = new File(getQueueDir(queueName));
if (!baseDir.exists()) {
// 不存在就创建这个目录
Boolean ok = baseDir.mkdirs();
if (!ok) {
throw new IOException("创建目录失败!baseDir=" + baseDir.getAbsolutePath());
}
}
// 2. 创建队列数据文件
File queueDataFile = new File(getQueueDataPath(queueName));