• 从零手搓一个【消息队列】创建核心类, 数据库设计与实现



    创建 Spring Boot 项目, Spring Boot 2 系列版本, Java 8 , 引入 MyBatis, Lombok 依赖

    提示:是正在努力进步的小菜鸟一只,如有大佬发现文章欠佳之处欢迎批评指点~ 废话不多说,直接上干货!

    整体目录结构 :
    在这里插入图片描述

    本文主要实现 server 包


    一、创建核心类

    上篇文章 分析了项目需求, 介绍了项目中重要的核心概念和核心 API, 以及重要板块

    一个消息队列中需要的交换机, 队列, 绑定, 消息等核心概念, 以面向对象的思想, 在server.core 包下创建出来对应的类
    在这里插入图片描述


    1, 交换机

    @Data
    public class Exchange {
        // 身份标识(唯一, RabbitMQ 就是以 name 作为身份标识的)
        private String name;
    
        // 三种交换机类型
        private ExchangeTypeEnum type = ExchangeTypeEnum.DIRECT;
    
        // 是否需要持久化存储
        private boolean durable = false;
    
        // 是否(交换机没人使用时)自动删除 ------------------>先不实现
        private boolean autoDelete = false;
    
        // 创建交换机时, 指定的参数选项 ------------------>先不实现
        // 数据库中存储 String 类型, 需要序列化
        private Map<String, Object> arguments = new HashMap<>();
    
        /**
         * 实现序列化, 修改 getter()和 setter(), 供数据库使用
         */
        public String getArguments(){
            ObjectMapper objectMapper = new ObjectMapper();
            // 序列化 往数据库里写
            try {
                return objectMapper.writeValueAsString(arguments);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            return "{}";
        }
    
        public void setArguments(String arguments) {
            ObjectMapper objectMapper = new ObjectMapper();
            // 反序列化 从数据库里读
            try {
                this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String, Object>>() {});
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }
    
    
        /**
         * 便于 测试/ 代码内部调用 时使用
         */
        public Object getArguments(String key) {
            return arguments.get(key);
        }
    
        public void setArguments(String key, Object value) {
            arguments.put(key, value);
        }
    
        public void setArguments(Map<String, Object> arguments) {
            this.arguments = arguments;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    本项目未实现 autoDelete 和 arguments


    2, 交换机类型

    这是一个枚举类, 包含直接交换机, 扇出交换机, 主题交换机

    public enum ExchangeTypeEnum {
        DIRECT(0),
        FANOUT(1),
        TOPIC(2);
        private final int type;
    
        ExchangeTypeEnum(int type) {
            this.type = type;
        }
    
        public int getType() {
            return type;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    3, 队列

    类名不设为 Queue, 防止和标准库中的 Queue 冲突

    @Data
    public class MessageQueue {
        // 唯一标识
        private String name;
    
        // 是否需要持久化存储
        private boolean durable = false;
    
        // 是否为独有(如果是独有, 只能被一个消费者使用) ------------------>先不实现
        private boolean exclusive = false;
    
        // 是否(队列没人使用时)自动删除 ------------------>先不实现
        private boolean autoDelete = false;
    
        // 创建队列时, 指定的参数选项 ------------------>先不实现
        // 数据库中存储 String 类型, 需要序列化
        private Map<String, Object> arguments = new HashMap<>();
    
        /**
         * 实现序列化, 修改 getter()和 setter(), 供数据库使用
         */
        public String getArguments(){
            ObjectMapper objectMapper = new ObjectMapper();
            // 序列化 往数据库里写
            try {
                return objectMapper.writeValueAsString(arguments);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            return "{}";
        }
    
        public void setArguments(String arguments) {
            ObjectMapper objectMapper = new ObjectMapper();
            // 反序列化 从数据库里读
            try {
                this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String, Object>>() {});
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 便于 测试/ 代码内部调用 时使用
         */
        public Object getArguments(String key) {
            return arguments.get(key);
        }
    
        public void setArguments(String key, Object value) {
            arguments.put(key, value);
        }
    
        public void setArguments(Map<String, Object> arguments) {
            this.arguments = arguments;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    暂不实现 exclusive, autoDelete, arguments


    4, 绑定

    @Data
    public class Binding {
        // 绑定的消息队列标识
        private String queueName;
    
        // 绑定的交换机标识
        private String exchangeName;
    
        // 绑定的 key
        private String bindingKey;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    bindingKey 是在创建交换机和队列的绑定时指定的, 生产者发布消息时, 需额外指定一个 routingKey
    如果是直接交换机, routingKey 作为队列的唯一标识
    如果是扇出交换机, routingKey 为 null, 无需使用
    如果是主题交换机, routingKey 需和 bindingKey 匹配


    5, 交换机转发 & 绑定规则

    在此先不展示, 在后续文章中对应的部分再展示 防止思路混淆


    6, 消息

    @Data
    public class Message implements Serializable {
        // 属性
        private BasicProperties basicProperties = new BasicProperties();
    
        // 正文
        private byte[] body;
    
        // 消息存储在文件中的偏移量(字节, 约定 "[,)" 区间 )
        private transient long offsetBegin = 0;
        private transient long offsetEnd = 0;
    
        // 是否合法(逻辑删除的标记, 0x1 有效, 0x0 无效)
        private byte isValid = 0x1;
    
    
        // 提供工厂方法, 封装 Message 类的创建过程
        public static Message createMessage(String routingKey, BasicProperties basicProperties, byte[] body) {
            Message message = new Message();
            if (basicProperties != null) {
                message.setBasicProperties(basicProperties);
            }
            message.setMessageId("M$" + UUID.randomUUID().toString().replace("-", ""));
            message.setRoutingKey(routingKey);
            message.setBody(body);
            return message;
        }
    
    	// 下面这些方法是为了封装 basicProperties 中的 getter()和 setter()
        public String getMessageId() {
            return basicProperties.getMessageId();
        }
    
        public void setMessageId(String id) {
            basicProperties.setMessageId(id);
        }
    
        public String getRoutingKey() {
            return basicProperties.getRoutingKey();
        }
    
        public void setRoutingKey(String key) {
            basicProperties.setRoutingKey(key);
        }
    
        public int getDeliverMode() {
            return basicProperties.getDeliverMode();
        }
    
        public void setDeliverMode(int value) {
            basicProperties.setDeliverMode(value);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • Message 需要实现 Serializable 接⼝. 后续需要把 Message 写⼊⽂件以及进⾏⽹络传输.
    • basicProperties 是消息的属性信息. body 是消息体.
    • offsetBeg 和 offsetEnd 表⽰消息在消息⽂件中所在的起始位置和结束位置. 这⼀块具体的设计后⾯再详细介绍. 使⽤ transient 关键字避免属性被序列化.
    • isValid ⽤来表⽰消息在⽂件中是否有效. 这⼀块具体的设计后⾯再详细介绍.
    • createMessage() 相当于⼀个⼯⼚⽅法, ⽤来创建⼀个 Message 实例. messageId 通过UUID 的⽅式⽣成.

    文件中的数据不相当于一个顺序表, 如果要真正删除一条消息, 是不是需要把后面的数据整体往前挪动? 这无疑是个低效操作, 因此, 对于 “删除消息” 这种高频操作, 逻辑删除显然是更优解, 但也不能让消息无限制的堆在文件中, 所以后面会参考 JVM 的 GC , 自主实现清理文件的功能


    7, 消息属性

    这个类作为Message 类的 引用类型的成员属性, 也需要实现 Serializable 接⼝, 否则 message 对象不能被序列化

    @Data
    public class BasicProperties implements Serializable {
        // 消息的唯一标识(UUID)
        private String messageId;
    
        // 和 bindingKey 匹配(如果交换机为 DIRECT, 该值就是队列名, 如果交换机为 FANOUT, 该值为 null )
        private String routingKey;
    
        // 是否要消息持久化( RabbitMQ 就是使用 1 表示不持久化, 2 表示持久化)
        private int deliverMode = 2;
    
        // ... 其他属性暂不考虑
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    二、数据库设计


    1, 使用 SQLite

    对于 Exchange, MSGQueue, Binding, 我们使⽤数据库进⾏持久化保存.

    此处我们使⽤的数据库是 SQLite, 是⼀个更轻量的数据库

    SQLite 只是⼀个动态库(当然, 官⽅也提供了可执⾏程序 exe), 我们在 Java 中直接引⼊ SQLite 依赖, 即可直接使⽤, 不必安装其他的软件.

    MySQL 是一个客户端服务器结构的程序, SQLite 相当于直接操作本地的硬盘文件

    • 在pom.xml文件中的 “dependencies” 标签中拷贝 :
    		<dependency>
    			<groupId>org.xerialgroupId>
    			<artifactId>sqlite-jdbcartifactId>
    			<version>3.41.0.1version>
    		dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 在 resource 目录下创建 application.yml 文件配置SQLite数据源, 拷贝:
    spring:
      datasource:
        url: jdbc:sqlite:./data/meta.db # 注意这个路径
        username:# 不需要
        password:# 不需要
        driver-class-name: org.sqlite.JDBC
    
    mybatis:
      mapper-locations: classpath:mapper/**Mapper.xml
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    数据库文件的位置就是 ./data/meta.db, 数据库的数据就在这里


    2, 使用 MyBatis

    实现 mapper 包
    在这里插入图片描述


    2.1, 创建 Interface

    在 server.mapper 包下定义一个 MetaMapper 接口, 需要提供 交换机, 队列, 绑定 的建表, 插入, 删除 , 查询的 API (抽象方法)

    不需要使用sql 语句建库, 创建出 ./data/meta.db 这个文件就相当于建库了, 后面再写创建文件的操作

    @Mapper
    public interface MetaMapper {
        /**
         * 建表
         */
        void createExchangeTable();
    
        void createQueueTable();
    
        void createBindingTable();
    
        /**
         * exchange 表
         */
        void insertExchange(Exchange exchange);
    
        void deleteExchange(String exchangeName);
    
        List<Exchange> selectAllExchanges();
    
        /**
         * queue 表
         */
        void insertQueue(MessageQueue queue);
    
        void deleteQueue(String queueName);
    
        List<MessageQueue> selectAllQueues();
    
        /**
         * binding 表
         */
        void insertBinding(Binding binding);
    
        void deleteBinding(Binding binding);
    
        List<Binding> selectAllBindings();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    2.2, 创建 xml 文件

    在 resource 目录下, 新建一个 mapper 包, 创建 MetaMapper.xml 文件, 在这个文件中编写 sql 语句, 实现上述在 MetaMapper 接口中的抽象方法

    在 xml 文件中拷贝:

    
    DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
    <mapper namespace="com.example.demo.server.mapper.MetaMapper">
    
    mapper>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    其中 namespace 这个字段的值要对应刚才定义的 MetaMapper 接口 的路径


    • 建表(使用 update 标签即可)
        <update id="createExchangeTable">
            create table if not exists exchange (
                name varchar(50) primary key,
                type int,
                durable boolean,
                autoDelete boolean,
                arguments varchar(1024)
            );
        update>
    
        <update id="createQueueTable">
            create table if not exists queue (
            name varchar(50) primary key,
            durable boolean,
            exclusive boolean,
            autoDelete boolean,
            arguments varchar(1024)
            );
        update>
    
        <update id="createBindingTable">
            create table if not exists binding (
            exchangeName varchar(50),
            queueName varchar(50),
            bindingKey varchar(256)
            );
        update>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    • exchange 表的增删查的 sql
        <insert id="insertExchange" parameterType="com.example.demo.server.core.Exchange">
            insert into exchange values(#{name}, #{type}, #{durable}, #{autoDelete}, #{arguments});
        insert>
    
        <select id="selectAllExchanges" resultType="com.example.demo.server.core.Exchange">
            select * from exchange;
        select>
    
        <delete id="deleteExchange" parameterType="java.lang.String">
            delete from exchange where name = #{exchangeName};
        delete>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    • queue 表的增删查的 sql
    <insert id="insertQueue" parameterType="com.example.demo.server.core.MessageQueue">
            insert into queue values(#{name}, #{durable}, #{exclusive}, #{autoDelete}, #{arguments});
        insert>
    
        <select id="selectAllQueues" resultType="com.example.demo.server.core.MessageQueue">
            select * from queue;
        select>
    
        <delete id="deleteQueue" parameterType="java.lang.String">
            delete from queue where name = #{queueName};
        delete>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    • queue 表的增删查的 sql
    	<insert id="insertBinding" parameterType="com.example.demo.server.core.Binding">
            insert into binding values(#{exchangeName}, #{queueName}, #{bindingKey});
        insert>
    
        <select id="selectAllBindings" resultType="com.example.demo.server.core.Binding">
            select * from binding;
        select>
    
        <delete id="deleteBinding" parameterType="com.example.demo.server.core.Binding">
            delete from binding where exchangeName = #{exchangeName} and queueName = #{queueName};
        delete>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    三、硬盘管理 – 数据库

    实现 datacenter 包中的 DataBaseManager 类
    在这里插入图片描述

    datacenter 这个包中整合硬盘上的数据管理 + 内存上的数据管理
    硬盘上的数据管理又整合了 数据库中的数据管理 + 文件中的数据管理


    1, 创建 DataBaseManager 类

    成员属性需要 MetaMapper 的对象, 用来封装刚才编写的数据库的 API

    但并不使用 SpringBoot 的依赖注入 (@AutoWired), 而是使用以来查找的方式获取到 metaMapper

    public class DataBaseManager {
        private MetaMapper metaMapper;
      
    }
    
    • 1
    • 2
    • 3
    • 4

    在启动类中初始化容器

    @SpringBootApplication
    public class DemoApplication {
    	public static ConfigurableApplicationContext context;
    	public static void main(String[] args) throws IOException {
    		context = SpringApplication.run(DemoApplication.class, args);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2, init() 初始化数据库

    对于 DataBaseManager 类的初始化工作, 不仅仅是对成员属性的初始化, 而是需要一些额外的业务逻辑, 这种情况就不使用构造方法了, 而是单独定义一个方法

    初始化工作: metaMapper + 建库建表 + 插入默认数据

    如果是第一次启动服务器, 没有数据库则建库建表
    如果是重启服务器, 已有数据库则不做处理

    MyBatis 在第一次创建数据表的时候就会创建出 ./data/meta.db 这个文件, 但前提是要有 ./data 这个目录, 所以要先手动创建

        public void init() {
            this.metaMapper = DemoApplication.context.getBean(MetaMapper.class);
    
            if(!isDBExists()) {
                // 创建目录
                File file = new File("./data");
                file.mkdirs();
                // 创建数据表
                createTable();
                // 插入数据
                insertDefaultData();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    3, insertDefaultData() 插入默认数据

    创建一个默认的交换机(直接交换机)

        public void insertDefaultData() {
            Exchange exchange = new Exchange();
            exchange.setName("");
            exchange.setType(ExchangeTypeEnum.DIRECT);
            exchange.setDurable(false);
            exchange.setAutoDelete(false);
            metaMapper.insertExchange(exchange);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    4, createTable() 创建数据表

        public void createTable() {
            metaMapper.createExchangeTable();
            metaMapper.createQueueTable();
            metaMapper.createBindingTable();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    5, isDBExists() 数据库是否存在

    SQLite 是一个轻量级数据库, 操作 SQLite 相当于操作本地的硬盘文件, 所以检查数据库是否存在就是检查数据库文件是否存在

        public boolean isDBExists() {
            File file = new File("./data/meta.db");
            return file.exists();
        }
    
    • 1
    • 2
    • 3
    • 4

    6, deleteTables() 删除数据表

    同上, 删除数据库就是删除文件, 先删文件再删目录

        public void deleteTables(){
            File file = new File("./data/meta.db");
            file.delete();
    
            File dir = new File("./data");
            dir.delete();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    7, 封装数据库的增删查操作

    public void insertExchange(Exchange exchange) {
            metaMapper.insertExchange(exchange);
        }
    
        public List<Exchange> selectAllExchanges() {
            return metaMapper.selectAllExchanges();
        }
    
        public void deleteExchange(String exchangeName) {
            metaMapper.deleteExchange(exchangeName);
        }
    
        public void insertQueue(MessageQueue queue) {
            metaMapper.insertQueue(queue);
        }
    
        public List<MessageQueue> selectAllQueues() {
            return metaMapper.selectAllQueues();
        }
    
        public void deleteQueue(String queueName) {
            metaMapper.deleteQueue(queueName);
        }
    
        public void insertBinding(Binding binding) {
            metaMapper.insertBinding(binding);
        }
    
        public List<Binding> selectAllBindings() {
            return metaMapper.selectAllBindings();
        }
    
        public void deleteBinding(Binding binding) {
            metaMapper.deleteBinding(binding);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    四、小结

    本文主要实现了两点 :

    • 1, 根据面向对象思想, 创建出了交换机, 队列, 绑定, 消息, 等核心概念的类
    • 2, 持久化存储 --> 硬盘管理 --> 数据库
      • 2.1, 数据库设计, 使用 SQLite, 并结合 MyBatis 编写了交换机, 队列, 绑定的建表, 增, 删, 查的 sql
      • 2.2, 使用 DataBaseManager 这个类管理数据库中的数据, 因为仅仅有sql语句不足以支撑所有的业务逻辑, 还需要对数据库的初始化, 判断存在, 删除等做进一步的封装

    篇幅有限, 目前为止, 持久化存储 --> 硬盘管理 --> 文件 这个板块还没实现

    下篇会实现消息在文件上的存储( 文件管理 : MessageFileManager 类)


  • 相关阅读:
    【BOOST C++ 19 应用库】(5)序列数据封装和优化
    理解STM32的低功耗模式
    RabbitMQ消息中间件
    Qt设置horizontal line 和vertical line的颜色
    力扣001-两数之和
    HSDC和独立生成树相关
    OSGeoLive 15.0 版本发布
    【VMware vCenter】使用Reduced Downtime Update (RDU)升级更新vCenter Server。
    【附源码】计算机毕业设计JAVA旅行指南网站
    基于Levy飞行策略的改进樽海鞘群算法-附代码
  • 原文地址:https://blog.csdn.net/yzhcjl_/article/details/133431202