我们核心类主要有四个:
交换机、队列、绑定、消息。这四个核心类还可以继续向下划分。除了这几个还有其他的核心类,先介绍这些,等后面讲到了相关功能实现后面再补充。
关于交换机,我们主要需要实现三种交换机,还有其他一些属性(包括:id、是否持久化、自动删除、参数),思维导图如下图(代码就不演示出来了,具体的代码在末尾我会将马云地址贴出来代码中也有注释):
此外方法中还包括了 getting、setting。
我这里单独设置了一个ExchangeType 利用枚举分别表示不同的交换机。这段代码很简单:
- public enum ExchangeType {
- DIRECT(0),
- FANOUT(1),
- TOPIC(2);
- private final int type;
-
- private ExchangeType(int type) {
- this.type = type;
- }
-
- public int getType() {
- return type;
- }
- }
这里先把几个基础的属性了解清楚,其余的等到后面具体实现相关功能再来详细介绍。
先了解清楚如下几个属性:name、durable、autoDelete、exclusive、arguments。
思维导图如下:
绑定主要是用来连接交换机和队列的。Binding 这个东西是依附于 Exchange 和 Queue 的,对于持久化来说,如果 Exchange 和 Queue 任何一个都没有持久化,此时你针对 Binding 持久化时没有意义的
Binding 其实很简单,只有三个参数交换机的名字、队列的名字、绑定的 Key,方法也是只有 对应的 setting、getting 加上 toString 方法;
思维导图如下:
关于消息,相比于上述的较为复杂,它是由生产者通过网络传输到 BrokerServer ,再由 BrokerServer 推送给消费者,中途需要对消息进行序列化和反序列化操作等等。
属性这里来一个一个介绍,
首先我们来进行一个约定,约定好消息是怎么组成的。
我们这里约定,我们的消息分为两个部分,第一个部分是 BasicProperties ;
另一个部分就是消息本体 body (用于存储具体的消息)、offsetBeg 、 offsetEnd(这两个属性作为两个偏移量,来找到某个具体的消息,并且这两个属性不参与序列化和反序列化,需要用 transient 修饰)、isValid(byte类型用于判断消息是否有效,0为有效,1为无效)
为什么 isValid 这里不采用 布尔类型来表示消息是否有效呢?
主要原因:
BasicProperties 单独写一个类
这个BasicProperties 表示消息具体有哪些属性,消息 Id,routingKey(这属性是根据交换机类型来确定的,是直接交换机,这就表示为队列名、是扇出交换机,这个就无意义、是主题交换机就需要相互匹配)、deliverMode(这表示消息是否需要持久化,这个持久化和上述持久化是不一样的,这个是保存在文件中)
思维导图如下:
我这里并没有采用到 MySQL数据库,而是选用了 SQLlite 数据库,原因如下:
1. 直接在 pom.xml 文件引入 SQLite库 依赖
- <!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc -->
- <dependency>
- <groupId>org.xerial</groupId>
- <artifactId>sqlite-jdbc</artifactId>
- <version>3.41.0.1</version>
- </dependency>
我用的就是上述版本
2. 在 appliaction.yml 配置文件中配置 库信息
- spring:
- datasource:
- url: jdbc:sqlite:./data/meta.db
- username:
- password:
- driver-class-name: org.sqlite.JDBC
上述依赖和配置处理好后,只要程序启动,那么就会自动进行建库操作,接下来就只需要进行建表操作即可。
存放在数据库中的只有三个:交换机、队列、绑定,那么我们只需要建这三种表即可。
3. 配置 MyBatis 文件
- mybatis:
- mapper-locations: classpath:mapper/**Mapper.xml
MyBatis 的配置信息已经提到了这个路径,所以需要去这个路径底下创建一个 interface
4. 创建一个 interface
关于交换机库操作只有这三种:添加交换机、查询所有交换机、删除交换机,分别对应各自的sql 语句。
1. 实现接口方法
同样我们需要先在 mapper 包下写接口方法
- void insertExchange(Exchange exchange);
- List
selectAllExchanges() ; - void deleteExchange(String exchangeName);
2. 在 xml 文件下实现 sql 语句
关于队列库操作也只有这三种:添加队列、查询所有队列、删除队列,分别对应各自的sql 语句;和交换机一样:
1. 实现接口方法
- void insertQueue(MSGQueue queue);
- List
selectAllQueues() ; - void deleteQueue(String queueName);
2. 在 xml 文件下实现 sql 语句
关于绑定库操作也只有这三种:添加绑定、查询所有绑定、删除绑定,分别对应各自的sql 语句;和交换机一样:
1. 实现接口方法
- void insertBinding(Binding binding);
- List
selectAllBindings() ; - void deleteBinding(Binding binding);
2. 在 xml 文件下实现 sql 语句