• RabbitMQ实现数据库与ElasticSearch的数据同步和分享文件过期处理


    🎈 1 参考文档

    RabbitMQ实现数据库与ElasticSearch的数据同步 | Hannya。-CSDN

    企业级开发项目实战——基于RabbitMQ实现数据库、elasticsearch的数据同步 | 波总说先赚它一个小目标-CSDN

    SPringBoot集成RabbitMQ实现30秒过期删除功能 | 军大君-CSDN


    🔍 2 个人需求

    1. 当进行文件上传、文件创建、文件重命名等操作时:

      通过RabbitMQ:

      • 生产者:文件服务,执行上传、创建、重命名等文件操作,将用户文件信息(例如文件名、文件ID等)发送到RabbitMQ新增队列。
      • 消费者:查询服务,监听RabbitMQ新增队列,一旦收到消息,将用户文件信息新增或更新到Elasticsearch中。
    2. 文件删除时:

      通过RabbitMQ:

      • 生产者:文件服务,执行文件删除操作,将用户文件ID发送到RabbitMQ删除队列。
      • 消费者:查询服务,监听 RabbitMQ 队列,一旦收到消息,通过用户文件ID从Elasticsearch中删除相应的用户文件信息。
    3. 根据文件名进行文件模糊查询:

      通过OpenFeign:

      • 生产者:文件服务,查询服务调用文件服务提供的OpenFeign接口,通过用户文件ID从查询该用户文件是否存在。
      • 消费者:查询服务,如果不存在,将数据根据用户文件ID从Elasticsearch中删除。
    4. 分享文件时间到期处理:

      通过RabbitMQ的TTL(生存时间) + 死信队列:

      • 生产者:文件服务, 使用TTL模拟一个“延时队列”,在文件分享时间到期后,将消息传递到死信队列。
      • 消费者:文件服务,死信监听器监听到之后,将分享文件的分享状态改为已过期状态。

    🔈3 声明

    只是提供思路,代码不是很完整,直接复制运行不了。

    最后面有完整网盘项目代码。


    🚀4 OpenFeign相关部分(查询服务)

    4.1 引入依赖

    
    <dependency>
        <groupId>com.alibaba.cloudgroupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discoveryartifactId>
    dependency>
    
    <dependency>
        <groupId>org.springframework.cloudgroupId>
        <artifactId>spring-cloud-starter-openfeignartifactId>
    dependency>
    
    <dependency>
        <groupId>org.springframework.cloudgroupId>
        <artifactId>spring-cloud-starter-loadbalancerartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    4.2 application.yml

    spring:
      # nacos注册的服务名
      application:
        name: netdisk-search
      cloud:
        nacos:
          discovery:
            # 配置注册服务的IP地址
            server-addr: (IP地址):8848
            username: nacos
            password: nacos
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    4.3 FileFeignService 接口

    @FeignClient(name = "netdisk-file", configuration = FeignInterceptor.class)
    public interface FileFeignService {
        @RequestMapping("/file/getUserFile/{userFileId}")
        ResultResponse<Boolean> getUserFile(@PathVariable Long userFileId);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    4.4 @EnableFeignClients 注解

    @ComponentScan(value = "com.cauli.search.*")
    @EnableFeignClients(basePackages = "com.cauli.search")
    @SpringBootApplication
    public class NetdiskSearchApplication {
        public static void main(String[] args) {
            SpringApplication.run(NetdiskSearchApplication.class, args);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    🚀5 Elasticsearch相关部分(查询服务)

    5.1 引入依赖

    
    <dependency>
        <groupId>co.elastic.clientsgroupId>
        <artifactId>elasticsearch-javaartifactId>
        <version>8.0.1version>
    dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.coregroupId>
        <artifactId>jackson-databindartifactId>
        <version>2.12.3version>
    dependency>
    <dependency>
        <groupId>jakarta.jsongroupId>
        <artifactId>jakarta.json-apiartifactId>
        <version>2.0.1version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    5.2 application.yml

    # elasticsearch相关的配置
    elasticsearch:
      # ES网关地址
      hostname: (IP地址)
      # ES网关端口
      port: 9200
      # ES网官方方案
      scheme: http
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    5.3 ElasticSearchConfig 配置类

    @Configuration
    public class ElasticSearchConfig {
        @Value("${elasticsearch.hostname}")
        String hostname;
    
        @Value("${elasticsearch.port}")
        int port;
    
        @Value("${elasticsearch.scheme}")
        String scheme;
    
        @Bean
        public ElasticsearchClient elasticsearchClient(){
            // 创建低级客户端
            RestClient client = RestClient.builder(new HttpHost(hostname, port,scheme)).build();
    		// 创建API客户端,使用Jackson映射器创建传输层
            ElasticsearchTransport transport = new RestClientTransport(client,new JacksonJsonpMapper());
            return new ElasticsearchClient(transport);
        }
    }	
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    5.4 Elasticsearch 服务类和服务实现类

    public interface ElasticsearchService {
        /**
         * 更新ES数据
         *
         * @param fileSearchDTO
         */
        void uploadES(FileSearchDTO fileSearchDTO);
    
        /**
         * 删除ES数据
         *
         * @param userFileId
         */
        void deleteES(Long userFileId);
    
    
        /**
         * 搜索ES数据
         *
         * @return
         */
        List<SearchFileVO> searchES(SearchFileQueryDTO searchFileVO);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    @Slf4j
    @Service
    public class ElasticsearchServiceImpl implements ElasticsearchService {
        @Autowired
        private ElasticsearchClient elasticsearchClient;
    
        @Resource
        private FileFeignService feignService;
    
        private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
                12, // 核心线程数
                20, // 最大线程数
                1, // 线程存活时间
                TimeUnit.SECONDS, // 存活时间单位
                new ArrayBlockingQueue<>(1000) // 任务队列
        );
    
        public void uploadES(FileSearchDTO fileSearchDTO) {
            executor.execute(() -> {
                try {
                    elasticsearchClient.index(i -> i.index("file_search")
                            .id(fileSearchDTO.getUserFileId())
                            .document(fileSearchDTO));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        }
    
        public void deleteES(Long userFileId) {
            executor.execute(() -> {
                try {
                    elasticsearchClient.delete(d -> d
                            .index("file_search")
                            .id(String.valueOf(userFileId)));
                } catch (Exception e) {
                    log.debug("ES删除操作失败,请检查配置");
                }
            });
        }
    
        @Override
        public List<SearchFileVO> searchES(SearchFileQueryDTO searchFileQueryDTO) {
            int pageNum = (int) searchFileQueryDTO.getPageNum() - 1;
            int pageSize = (int) (searchFileQueryDTO.getPageSize() == 0 ? 10 : searchFileQueryDTO.getPageSize());
    
            SearchResponse<FileSearchDTO> search = null;
            try {
                search = elasticsearchClient.search(s -> s
                        .index("file_search")
                        .query(_1 -> _1
                                .bool(_2 -> _2
                                        .must(_3 -> _3
                                                .bool(_4 -> _4
                                                        .should(_5 -> _5
                                                                .match(_6 -> _6
                                                                        .field("fileName")
                                                                        .query(searchFileQueryDTO.getFileName())))
                                                        .should(_5 -> _5
                                                                .wildcard(_6 -> _6
                                                                        .field("fileName")
                                                                        .wildcard("*" + searchFileQueryDTO.getFileName() + "*")))
                                                ))
                                        .must(_3 -> _3
                                                .term(_4 -> _4
                                                        .field("userId")
                                                        .value(StpUtil.getLoginIdAsLong())))
                                ))
                        .from(pageNum)
                        .size(pageSize)
                        .highlight(h -> h
                                .fields("fileName", f -> f.type("plain")
                                        .preTags("").postTags(""))
                                .encoder(HighlighterEncoder.Html)), FileSearchDTO.class);
            } catch (IOException e) {
                e.printStackTrace();
            }
    
            List<SearchFileVO> searchFileVOList = new ArrayList<>();
            if (search != null) {
                for (Hit<FileSearchDTO> hit : search.hits().hits()) {
                    SearchFileVO searchFileVO = new SearchFileVO();
                    BeanUtil.copyProperties(hit.source(), searchFileVO);
                    searchFileVO.setHighLight(hit.highlight());
                    searchFileVOList.add(searchFileVO);
    
                    // 如果文件不存在,也从ES中删除
                    if (!feignService.getUserFile(searchFileVO.getUserFileId()).getData()) {
                        executor.execute(() -> this.deleteES(searchFileVO.getUserFileId()));
    
                    }
                }
            }
            return searchFileVOList;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96

    5.5 ElasticsearchController 前端控制器

    @RestController
    @RequestMapping("/search")
    public class ElasticsearchController {
        @Autowired
        private ElasticsearchService elasticService;
    
        @GetMapping(value = "/searchFile")
        public RestResult<SearchFileVO> searchFile(SearchFileQueryDTO searchFileQueryDTO) {
            List<SearchFileVO> searchFileVOList = elasticService.searchES(searchFileQueryDTO);
            return RestResult.success().dataList(searchFileVOList, searchFileVOList.size());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    5.6 相关实体类

    /**
     * 文件搜索VO
     */
    @Data
    public class SearchFileVO {
        @JsonSerialize(using = ToStringSerializer.class)
        private Long userFileId;
    
        private String fileName;
    
        private String filePath;
    
        private String extendName;
    
        private Long fileSize;
    
        private String fileUrl;
    
        private Map<String, List<String>> highLight;
    
        private Integer isDir;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    /**
     * 文件搜索DTO
     */
    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class FileSearchDTO {
        private String indexName;
    
        private String userFileId;
    
        private String fileId;
    
        private String fileName;
    
        private String content;
    
        private String fileUrl;
    
        private Long fileSize;
    
        private Integer storageType;
    
        private String identifier;
    
        private Long userId;
    
        private String filePath;
    
        private String extendName;
    
        private Integer isDir;
    
        private String deleteTime;
    
        private String deleteBatchNum;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    /**
     * 文件查询条件DTO
     */
    @Data
    public class SearchFileQueryDTO {
        @ApiModelProperty("文件名")
        private String fileName;
    
        @ApiModelProperty("当前页")
        private long pageNum;
    
        @ApiModelProperty("每页数量")
        private long pageSize;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    🚀6 RabbitMQ相关部分

    6.1 生产者部分(文件服务)

    6.1.1 引入依赖

    
    <dependency>
        <groupId>com.alibaba.cloudgroupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discoveryartifactId>
    dependency>
    
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    6.1.2 完整application.yml

    server:
      port: 8083
    spring:
      # MySQL配置
      datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/(数据库名)?characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=true
        username: root
        password: (MySQL密码)
      # nacos注册的服务名
      application:
        name: netdisk-file
      cloud:
        nacos:
          discovery:
            # 配置注册服务的IP地址
            server-addr: (IP地址):8848
            username: nacos
            password: nacos
      # rabbitmq相关的配置
      rabbitmq:
        host: (IP地址)
        port: 5672
        virtual-host: (虚拟主机名,比如:/file)
        username: (用户名,默认:guest)
        password: (密码,默认:guest)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    6.1.3 RabbitMQConfig 配置类

    @Configuration
    public class RabbitMQConfig {
        // 普通交换机
        public static final String FILE_EXCHANGE = "file.exchange";
    
        // 文件保存相关
        public static final String QUEUE_FILE_SAVE = "queue.file.save";
        public static final String KEY_FILE_SAVE = "key.file.save";
    
        // 文件删除相关
        public static final String QUEUE_FILE_REMOVE = "queue.file.remove";
        public static final String KEY_FILE_REMOVE = "key.file.remove";
    
        // 死信相关
        public static final String DEAD_LETTER_EXCHANGE = "deadLetter.exchange";
        public static final String DEAD_LETTER_QUEUE = "deadLetter.queue";
        public static final String KEY_FILE_DEAD_LETTER = "key.file.dead.letter";
    
        //延迟队列
        public static final String DELAY_QUEUE = "delay.queue";
    
        /**
         * 文件保存队列
         *
         * @return
         */
        @Bean
        public Queue queueFileSave() {
            return new Queue(QUEUE_FILE_SAVE);
        }
    
        /**
         * 文件删除队列
         *
         * @return
         */
        @Bean
        public Queue queueFileRemove() {
            return new Queue(QUEUE_FILE_REMOVE);
        }
    
        /**
         * 交换机
         *
         * @return
         */
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(FILE_EXCHANGE);
        }
    
        /**
         * 绑定文件保存队列到交换机
         *
         * @return
         */
        @Bean
        public Binding bindFileSave() {
            return BindingBuilder.bind(queueFileSave()).to(topicExchange()).with(KEY_FILE_SAVE);
        }
    
        /**
         * 绑定文件删除队列到交换机
         *
         * @return
         */
        @Bean
        public Binding bindFileRemove() {
            return BindingBuilder.bind(queueFileRemove()).to(topicExchange()).with(KEY_FILE_REMOVE);
        }
    
        /**
         * 定义延时队列
         *
         * @return
         */
        @Bean
        public Queue delayQueue() {
            //设置死信交换机和路由key
            return QueueBuilder.durable(DELAY_QUEUE)
                    //如果消息过时,则会被投递到当前对应的死信交换机
                    .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
                    //如果消息过时,死信交换机会根据routing-key投递消息到对应的队列
                    .withArgument("x-dead-letter-routing-key", KEY_FILE_DEAD_LETTER)
                    .build();
        }
    
        /**
         * 定义死信交换机
         *
         * @return
         */
        @Bean
        public TopicExchange deadLetterExchange() {
            return new TopicExchange(DEAD_LETTER_EXCHANGE);
        }
    
        /**
         * 定义死信队列
         *
         * @return
         */
        @Bean
        public Queue deadLetterQueue() {
            return new Queue(DEAD_LETTER_QUEUE);
        }
    
        /**
         * 绑定死信队列到死信交换机
         *
         * @return
         */
        @Bean
        public Binding deadLetterBinding() {
            return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(KEY_FILE_DEAD_LETTER);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117

    6.1.4 FileDealComp 文件逻辑处理组件伪代码

    /**
     * 文件逻辑处理组件
     */
    @Slf4j
    @Component
    public class FileDealComp {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
                12, // 核心线程数
                20, // 最大线程数
                1, // 线程存活时间
                TimeUnit.SECONDS, // 存活时间单位
                new ArrayBlockingQueue<>(1000) // 任务队列
        );
    
        /**
         * 更新ES数据
         *
         * @param userFileId
         */
        public void uploadES(Long userFileId) {
            executor.execute(() -> {
                FileSearchDTO fileSearchDTO = new FileSearchDTO();
                // 通过用户文件ID查询用户文件信息
                ...
                // 通过文件ID查询文件信息
                ...
                // 将用户文件信息和文件信息同步到fileSearchDTO对象
                ...  
               	// 消息队列更新ES
                rabbitTemplate.convertAndSend(RabbitMQConfig.FILE_EXCHANGE, RabbitMQConfig.KEY_FILE_SAVE, fileSearchDTO);
            });
        }
    
        /**
         * 删除ES数据
         *
         * @param userFileId
         */
        public void deleteES(Long userFileId) {
            // 消息队列删除ES
            rabbitTemplate.convertAndSend(RabbitMQConfig.FILE_EXCHANGE, RabbitMQConfig.KEY_FILE_REMOVE, userFileId);
        }
       
        /**
         * 分享文件过期处理
         *
         * @param shareBatchNum 分享批次号
         */
        public void expiredShareFile(String shareBatchNum) {
            Share share = new Share();
            // 根据分享批次号获取分享信息
            ...
            // 将分享信息同步到share对象
            ...
            // 定义日期格式
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            long differenceInMillis = 0;
            try {
                // 解析日期字符串为日期对象
                Date shareDate = sdf.parse(share.getShareTime());
                Date endDate = sdf.parse(share.getEndTime());
                // 计算时间差(毫秒数)
                differenceInMillis = endDate.getTime() - shareDate.getTime();
            } catch (ParseException e) {
                e.printStackTrace();
            }
    
            // 存活时间
            String expiration = Long.toString(differenceInMillis);
            // 延时队列
            rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_QUEUE, share, message -> {
                message.getMessageProperties().setExpiration(expiration);
                return message;
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79

    6.1.5 ExpiredShareFileListener 过期的分享文件处理监听器

    @Slf4j
    @Component
    @RabbitListener(queues = "my-dlx-queue")
    public class ExpiredShareFileListener {
        @Autowired
        private ShareService shareService;
    
        // 死信相关
        public static final String DEAD_LETTER_EXCHANGE = "deadLetter.exchange";
        public static final String DEAD_LETTER_QUEUE = "deadLetter.queue";
        public static final String KEY_FILE_DEAD_LETTER = "key.file.dead.letter";
    
        @RabbitListener(bindings = {
                @QueueBinding(
                        key = KEY_FILE_DEAD_LETTER,
                        value = @Queue(value = DEAD_LETTER_QUEUE, durable = "true"),
                        exchange = @Exchange(value = DEAD_LETTER_EXCHANGE, type = ExchangeTypes.TOPIC, ignoreDeclarationExceptions = "true")
                )})
        public void receiveShareMessage(Share share) {
            log.info("监听到文件过期处理操作:{}", share);
    
            // 将share的分享状态改为已过期 → 将share的shareStatus由0改为1
            ...
    
            log.info("操作完成:{}", share);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    6.2 消费者部分(查询服务)

    6.2.1 引入依赖

    
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    6.2.2 完整application.yml

    server:
      port: 8084
    spring:
      # MySQL配置
      datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/(数据库名)?characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=true
        username: root
        password: (MySQL密码)
      # nacos注册的服务名
      application:
        name: netdisk-search
      cloud:
        nacos:
          discovery:
            # 配置注册服务的IP地址
            server-addr: (IP地址):8848
            username: nacos
            password: nacos
      mvc:
        path match:
          matching-strategy: ant_path_matcher
      servlet:
        multipart:
          enabled: true
          # 单个文件最大限制
          max-file-size: 1024MB
          # 多个文件最大限制
          max-request-size: 2048MB
      # rabbitmq相关的配置
      rabbitmq:
        host: (IP地址)
        port: 5672
        virtual-host: (虚拟主机名,比如:/file)
        username: (用户名,默认:guest)
        password: (密码,默认:guest)
    
    # elasticsearch相关的配置
    elasticsearch:
      # ES网关地址
      hostname: (IP地址)
      # ES网关端口
      port: 9200
      # ES网官方方案
      scheme: http
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    6.2.3 FileMQListener 文件处理消息队列监听

    @Slf4j
    @Component
    public class FileMQListener {
        // 普通交换机
        public static final String FILE_EXCHANGE = "file.exchange";
    
        // 文件保存相关
        public static final String QUEUE_FILE_SAVE = "queue.file.save";
        public static final String KEY_FILE_SAVE = "key.file.save";
    
        // 文件删除相关
        public static final String QUEUE_FILE_REMOVE = "queue.file.remove";
        public static final String KEY_FILE_REMOVE = "key.file.remove";
    
        @Autowired
        private ElasticsearchService elasticsearchService;
    
        /**
         * 监听文件信息添加操作
         *
         * @param fileSearchDTO
         */
        @RabbitListener(bindings = {@QueueBinding(
                key = KEY_FILE_SAVE,
                value = @Queue(value = QUEUE_FILE_SAVE, durable = "true"),
                exchange = @Exchange(value = FILE_EXCHANGE, type = ExchangeTypes.TOPIC, ignoreDeclarationExceptions = "true"))})
        public void receiveFileSaveMessage(FileSearchDTO fileSearchDTO) {
            try {
                log.info("监听到文件信息添加操作:{}", fileSearchDTO);
                
                // 更新ES数据
                elasticsearchService.uploadES(fileSearchDTO);
                
                log.info("添加完成:{}", fileSearchDTO);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    
        /**
         * 监听文件信息删除操作
         *
         * @param userFileId
         */
        @RabbitListener(bindings = {@QueueBinding(
                key = KEY_FILE_REMOVE,
                value = @Queue(value = QUEUE_FILE_REMOVE, durable = "true"),
                exchange = @Exchange(value = FILE_EXCHANGE, type = ExchangeTypes.TOPIC, ignoreDeclarationExceptions = "true"))})
        public void receiveFileDeleteMessage(Long userFileId) {
            try {
                log.info("监听到文件信息删除操作:{}", userFileId);
                
                // 删除ES数据
                elasticsearchService.deleteES(userFileId);
                
                log.info("文件信息删除完成:{}", userFileId);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    📫7 代码仓库

    netdisk-cloud | Gitee

  • 相关阅读:
    MariaDB Tutorial
    tomcat探究二搭建简单的servlet容器
    CPU上下文切换
    【Linux】 - Linux中查看命令文档的命令
    获取依赖库的N种方法
    Java进阶:Docker
    2D物理引擎 Box2D for javascript Games 第四章 将力作用到刚体上
    C#教学辅助系统网站as.net+sqlserver
    讲解Windows系统中如何使用Python读取图片的元数据【Metadata】
    MIMO-UNet复现,DeepRFT复现及总结
  • 原文地址:https://blog.csdn.net/ManGooo0/article/details/132675938