• 从 0 到 1 ,手把手教你编写《消息队列》项目(Java实现) —— 创建项目 / 创建核心类



    一、创建SpringBoot项目

    在项目中添加这四个依赖!
    在这里插入图片描述


    二、创建核心类

    交换机 :Exchange
    队列 :Queue
    绑定关系: Binding
    消息 :Message
    这些核心类都存在于 BrokerServer 中.

    先创建出服务器与客户端的包.
    再在服务器中创建 core包,用来存放这些核心类.
    在这里插入图片描述


    创建 Exchange类

    首先考虑,咱们在此处共实现了三种交换机类型,所以咱们可以创建一个枚举类来表示交换机类型.

    /**
     * 表示交换机类型
     */
    public enum ExchangeType {
        DIRECT(0),
        FANOUT(1),
        TOPIC(2);
    
        private final int type;
    
        private ExchangeType(int type) {
            this.type = type;
        }
    
        public int getType() {
            return type;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    咱们再考虑,Exchange类中有哪些属性?

    • 1.name,当作交换机的唯一身份标识
    • 2.ExchangeType,表示交换机类型
    • 3.durable,表示这个交换机是否需要持久化存储
    • 4.autoDelete,表示该交换机在无人使用后,是否会自动删除
    • 5.arguments,表示后续的一些拓展功能
    /**
     * 表示一个交换机
     * 交换机的使用者是生产者
     */
    @Data
    public class Exchange {
        // 此处使用 name 作为交换机的身份标识,(唯一的)
        private String name;
    
        // 交换机类型,DIRECT,FANOUT,TOPIC
        private ExchangeType type = ExchangeType.DIRECT;
    
        // 该交换机是否要持久化存储,true表示要,false表示不要
        private boolean durable = false;
    
        // 如果当前交换机,没人使用了,就会自动删除
        // 这个属性暂时放在这(后续代码中没有实现,RabbitMQ中实现了)
        private boolean autoDelete = false;
    
        // arguments 表示的是创建交换机时指定的一些额外参数
        // 这个属性也暂时放在这(后续代码中没有实现,RabbitMQ中实现了)
        // 为了把这个 arguments 存到数据库中,需要将 arguments 转换为 json 格式的字符串
        private Map<String,Object> arguments = new HashMap<>();
    
    
        // 这里的 get set 用于与数据库交互使用
        public String getArguments() {
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                // 将 arguments 按照 JSON 格式 转换成 字符串
                return objectMapper.writeValueAsString(arguments);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            // 如果代码抛出异常,返回一个空的 json 字符串
            return "{}";
        }
    
        public void setArguments(String arguments) {
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                // 将库中的 arguments 按照 JSON 格式解析,转换成 Map 对象
                this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String,Object>>() {});
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }
    
        public void setArguments(Map<String,Object> arguments) {
            this.arguments = arguments;
        }
    
    
        // 这里的 get set ,用来更方便的获取/设置 arguments 中的键值对
        // 这一组 getter setter 是在Java内部代码使用的(比如测试的时候)
        public Object getArguments(String key) {
            return arguments.get(key);
        }
    
        public void setArguments(String key,Object value) {
            arguments.put(key, value);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    创建 MSGQueue类

    MSGQueue类中有哪些属性?
    与Exchange类大差不差.
    直接贴代码

    /**
     * 表示一个存储消息的队列
     * MSG =》Message
     * 消息队列的使用者是消费者
     */
    @Data
    public class MSGQueue {
        // 表示队列的身份标识
        private String name;
    
        // 表示队列是否持久化
        private boolean durable = false;
    
        // true -> 这个队列只能被一个消费者使用,false -> 大家都能使用这个队列
        // 后续代码不实现相关功能
        private boolean exclusive = false;
    
        // true -> 没人使用后,自动删除,false -> 没人使用,不自动删除
        private boolean autoDelete = false;
    
        // 表示扩展参数,后续代码没有实现
        private Map<String,Object> arguments = new HashMap<>();
    
    
        public String getArguments() {
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                return objectMapper.writeValueAsString(arguments);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        public void setArguments(String arguments) {
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String,Object>>() {});
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }
        public void setArguments(Map<String,Object> arguments) {
            this.arguments = arguments;
        }
    
        public Object getArguments(String key) {
    
            return arguments.get(key);
        }
    
        public void setArguments(String key,Object value) {
            arguments.put(key, value);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    创建 Binding类

    /**
     * 表示队列和交换机之间的绑定关系
     */
    @Data
    public class Binding {
        private String exchangeName;
        private String queueName;
        // 主题交换机的匹配key
        private String bindingKey;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    创建Message类

    Message类,大致可以分为三个部分.

    • 消息自身的属性
    • 消息的正文
    • 消息的持久化存储所需属性

    我们新建一个 BasicProperties 类来表示 消息的属性.

    /**
     * 这个类表示消息的属性
     */
    @Data					// 实现 Serializable 接口是为了后续的序列化操作
    public class BasicProperties implements Serializable {
        // 消息的唯一身份标识
        private String messageId;
    
        // 如果当前交换机是 DIRECT,此时 routingKey 表示要转发的队列名
        // 如果当前交换机是 FANOUT,此时 routingKey 无意义
        // 如果当前交换机是 TOPIC,此时 routingKey 就要和bindingKey进行匹配,匹配成功才转发给对应的消息队列
        private String routingKey;
    
        // 这个属性表示消息是否要持久化,1表示不持久化,2 表示持久化
        private int deliverMode = 1;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    持久化存储会在下面讲到,莫慌.

    /**
     * 这个类表示一个消息
     */
    @Data					// 实现 Serializable 接口是为了后续的序列化操作
    public class Message implements Serializable {
        // 消息的属性
        private BasicProperties basicProperties = new BasicProperties();
        // 消息的正文
        private byte[] body;
    
        // 相当于消息的版本号,主要针对 Message 类有改动后,再去反序列化之前旧的 message时,可能会出现错误
        // 因此引入消息版本号,如果版本号不匹配,就不允许反序列化直接报错,来告知程序猿,后续代码中并未实现该功能
        private static final long serialVersionUid = 1L;
    
    
    
        // 下面的属性是持久化存储需要的属性
    
        // 消息存储到文件中,使用一下两个偏移量来确定消息在文件中的位置 [offsetBeg,offsetEnd)
        // 这两个属性不需要 序列化 存储到文件中,存储到文件中后位置就固定了,
        // 这两个属性的作用是让 内存 中的 message 能顺利找到 文件 中的 message
        // 被 transient 修饰的属性,不会被 标准库 的 序列化方式 序列化
        private transient long offsetBeg = 0; // 消息数据的开头举例文件开头的位置偏移(字节)
        private transient long offsetEnd = 0; // 消息数据的结尾举例文件开头的位置偏移(字节)
    
    
        // 使用这个属性表示该消息在文件中是否是有效信息(逻辑删除)
        // 0x1表示有效,0x0表示无效
        private byte isValid = 0x1;
    
        // 创建工厂方法,让工厂方法封装 new Message 对象的过程
        // 该方法创建的 Message 对象,会自动生成唯一的MessageId
        public static Message createMessageWithId(String routingKey,BasicProperties basicProperties,byte[] body) {
            Message message = new Message();
            if (basicProperties != null) {
                message.setBasicProperties(basicProperties);
            }
            message.basicProperties.setRoutingKey(routingKey);
            // 此处生成的 MessageId 以 M- 作为前缀
            message.setMessageId("M-" + UUID.randomUUID());
            message.setBody(body);
    
            // 此处先将 message的核心部分 basicProperties 与 body设置了
            // 而 offsetBeg,offsetEnd,isValid,这些属性是持久化时才设置的
            return message;
        }
    
    
        // 直接获取消息id
        public String getMessageId() {
            return basicProperties.getMessageId();
        }
        // 直接更改消息id
        public void setMessageId(String messageId) {
            basicProperties.setMessageId(messageId);
        }
        // 直接获取 消息的key
        public String getRoutingKey() {
            return basicProperties.getRoutingKey();
        }
        // 直接更改 消息的key
        public void setRoutingKey(String routingKey) {
            basicProperties.setRoutingKey(routingKey);
        }
        // 直接获取 消息的是否持久化存储字段
        public int getDeliverMode() {
            return basicProperties.getDeliverMode();
        }
        // 直接修改 消息的是否持久化存储字段
        public void setDeliverMode(int mode) {
            basicProperties.setDeliverMode(mode);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    在这里插入图片描述

    这些核心类就都建好了,下篇文章就来考虑他们的持久化存储与内存存储!

  • 相关阅读:
    React antd Select 无法在disabled状态下选择并复制输入框内的内容情况分析及解决方案
    java集合
    MySQL-有锁读
    LeetCode116. Populating Next Right Pointers in Each Node
    数据类型详解
    基于springboot的房屋租赁系统
    golang结构与接口方法实现与交互使用示例
    SpringBoot+Dubbo+Nacos 开发实战教程
    CISP模拟试题(三)
    XC6206P332MR(0.25V低压差线性LDO稳压器,稳压输出3.3V,最大电压输入6V,输出电流250mA)
  • 原文地址:https://blog.csdn.net/The_emperoor_man/article/details/133486185