• 仿牛客网项目第五,六章:异步消息系统和分布式搜索引擎(详细步骤和思路)


    1. Kafka:构建TB级异步消息系统

    1.0 同步/异步消息的区别

    1. 同步消息系统:
      1)定义:双向通信,需要保证发送方向接受者发送消息,接受者接收消息,这个时间段内两边不能做其他的事情,需要同时在线。发送程序和接收程序必须同步运行,否则会发送失败
      2)**优缺点:**时效性较强,可以立即得到结果;耦合度高,性能和吞吐能力下降。
      3)应用场景:时序上有严格执行关系; 需要进行原子操作;没有耗时的操作
      4)实现方式消息处理=BIO(同步阻塞IO)与NIO(同步非阻塞IO),消息传输=TCP/IP与UDP/IP。Socket进行实现。

    2. 异步消息系统:
      1)定义:单向通信,发送端发送消息不需要接收方在线,可以后续推送。
      2)优缺点:执行效率高,节约时间,耦合度低;缺点是不利于对流程进行控制,且实现更加复杂。
      3)应用场景:没有严格时序关系,没有原子操作,有耗时的操作。
      4)实现方式:AOI(异步阻塞IO);消息队列,Kafka

    1.1 项目的目的

    **异步消息系统:**以异步方式实现消息的传递。

    1. 需要实现功能:当其他用户给你点赞,发帖子和评论的时候,你能够在系统消息栏接收到消息。
    2. 实现方法有很多种,比较传统的是阻塞队列实现,大型项目中常常使用Kafka进行实现。

    1. 2 阻塞队列实现异步消息系统

    1. 目的:解决线程通信的问题,比较原始的方法。

    2. 问题:生产者和消费者两者消耗不匹配,会占用系统资源,故需要阻塞队列(生产者生产一些消息后,如果消费者还没接收,就会存入阻塞队列中存起来,之后生产者就去做别的事情了,消费者慢慢消费)

    3. 使用对象:BlockingQueue(使用其接口)
      (1):解决线程通信问题
      (2):阻塞方法:put和take

    4. 生产者和消费者模式
      (1):生产者:产生数据的线程
      (2):消费者:使用数据的线程

    5. 消息队列实现类
      (1):ArrayBlockingQueue:测试中用到的
      (2):LinkedBlockingQueue
      (3):PriorityBlockingQueue,SynchronousQueue,DelayQueue等

    6. 阻塞队列实现消息队列步骤
      1)生产者实现Runnable接口,重写run方法,通过put把消息放进去。
      2)消费者实现Runnable接口,重写run方法,通过take接收消息。
      3)调用函数,新建一个消息队列,BlockingQueue。新建一个线程生产者,对消息进行生产放入消息队列中。新建多干个线程消费者,把消息队列中的消息进行输出。

    1.4 Kafka入门

    1. Kafka简介:
      (1):是一个分布式的流媒体平台
      (2):应用:消息系统,日志收集,用户行为追踪和流式处理
    2. Kafka特点
      (1):高吞吐量;消息持久化(存在数据库中);高可靠性;高扩展性
    3. Kafka术语

    (1)Broker:服务器
    (2)Zookeeper:独立软件,管理集群(集群中,不同节点的通信)
    (3)Topic:发布订阅模式的空间
    (4)Partition:对Topic的分区,提高并发能力
    (5)Offset:消息在分区内存存放的索引序列
    (6)Leader Replica:主副本,调用数据
    (7)FollowerReplica:从副本,主要是对主副本的文件备份,提高容错率。
    5. 使用kafka
    1)安装程序
    2)配置zookeeper.properties与service.properties
    3)启动zookeeper:使用之前的配置文件
    F:\kafka\kafka>bin\windows\zookeeper-server-start.bat config\zookeeper.properties
    4)启动kafka:使用之前的配置文件
    F:\kafka\kafka>bin\windows\kafka-server-start.bat config\server.properties

    1.5 Spring整合Kafka

    1. 引入依赖:spring-kafka
    2. 配置Kafka:配置server和consumer(生产者和消费者)
    spring.kafka.bootstrap-servers=localhost:9092
    spring.kafka.consumer.group-id=community-consumer-group
    spring.kafka.consumer.enable-auto-commit=true
    spring.kafka.consumer.auto-commit-interval=3000
    
    • 1
    • 2
    • 3
    • 4
    1. 访问Kafka:
      (1):生产者(主动发消息):
     public void sendMessage(String topic, String content) {
            kafkaTemplate.send(topic, content);
        }
    
    • 1
    • 2
    • 3

    (2):消费者(被动调用,接收消息):(使用注解表明主题)

    @KafkaListener(topics = {"test"})
        public void handleMessage(ConsumerRecord record) {
            System.out.println(record.value());
        }
    
    • 1
    • 2
    • 3
    • 4

    1.6 发送系统通知

    1.6.1 介绍

    1. 为什么使用消息队列
      1)如果采用消息队列,那么评论、点赞、关注三类不同的事,可以定义三类不同的主题(评论、点赞、关注)
      2)发生相应的事件可以将其包装成一条消息放入对应的队列里。那么当前的线程可以继续处理下一条消息,
      3)不用处理后续的业务(后续由消费者线程处理)
      4)面向事件驱动编程(管道中传递的是事件)
    2. 触发事件:
      (1):评论后,发布通知
      (2):点赞后,发布通知
      (3):关注后,发布通知
    3. 处理事件:
      (1):封装事件对象,而不是字符串,更加灵活
      (2):开发事件的生产者
      (3):开发事件的消费者

    1.6.2 实现发送消息和接收消息功能

    1. 创建Event实体类:上面三个事件其实很相似,故可以抽象出一个事件实体类(传递的信息对象就是一个个Event实体对象
    1. 新建Event文件夹,常见方法:EventProducer和EventConsumer
    2. EventProducer
      1)kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
      2)只有一个方法,调用kafka,传递主题和信息,然后转换为json数据
      3)传递的信息为event对象(即上面定义的)
    1. EventConsumer
        //核心方法:调用kafka的接收消息功能
        @KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})
        public void handleCommentMessage(ConsumerRecord record) {
    
    • 1
    • 2
    • 3

    1)核心方法:调用的是kafka的接收消息功能,对象为recoder
    2)具体:把recoder转化为Event对象,然后通过这个对象获取事件到的基本信息,封装成一个message,存入到message数据库表格里面。
    3)其中重要信息为:把event的data数据保存好(即在显示栏会显示的信息),然后封装到message中的content里面,便于展示。

    1.6.3 发送消息和接收消息功能(三类)

    1. 修改CommentController:发送评论
      (1):需要把评论的数据封装为一个envet对象,然后调用producer方法。这里需要显示跳转到评论,故可以写一个方法,根据评论的id获取评论信息。

    2. 修改LikeController:点赞*
      (1):需要把点赞的数据封装为一个envet对象,然后调用producer方法。这里需要显示跳转到帖子,这里在点赞开头多传入一个参数,即postId。即从前端页面多返回个帖子的id。

    3. 修改FollowController:关注*
      (1):需要把关注的数据封装为一个envet对象,然后调用producer方法。关注的是人,不需要显示额外信息。

    4. 运行注意事项
      1)如果kafka运行不正常,把数据中的kafka日志删除

    1.7 显示系统通知()

    1.7.1 通知列表:显示评论,点赞和关注三类通知

    1. Dao层,MessageMapper,并书写sql进行实现
       // 查询某个主题下最新的通知
        Message selectLatestNotice(int userId, String topic);
        // 查询某个主题所包含的通知数量
        int selectNoticeCount(int userId, String topic);
        // 查询未读的通知的数量
        int selectNoticeUnreadCount(int userId, String topic);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    1. Service层:MessageService进行书写。直接调用Dao层,进行方法的实现。
    2. Controller层:MessageController层,书写请求:getNoticeList
      1)查询评论类通知
      2)查询点赞类通知
      3)查询关注类通知
      4)把上述数据返回给前端,就能够在消息通知进行展示数据了。

    1.7.2. 通知详情:分页显示某一类主题所包含的通知

    1. Dao层:MessageMapper,并书写sql进行实现
    // 查询某个主题所包含的通知列表
     List<Message> selectNotices(int userId, String topic, int offset, int limit);
    
    • 1
    • 2
    1. Service层:MessageService进行书写。直接调用Dao层,进行方法的实现。
    2. Controller层:MessageController层,书写请求:getNoticeDetail
      1)对不同类事件的详情页进行展示。对展示出来的信息标记为已读。

    1.7.3. 未读消息:在页面头部显示所有的未读消息数量

    1. 通过拦截器进行计算,因为在每一个登录后的页面,都会显示未读消息,故在每一次访问页面的时候,都会进行统计未读消息的个数,显示在页面上面。
      1)新建:新建一个拦截器MessageIntercepter
      2)实现方法:拦截器实现HandlerInceptor方法
      3)重写posthandle方法:即在调用Controller之前进行操作
      4)重写内容:先判断是否已经登陆了,再计算私信和系统通知的未读消息,然后传入前端,进行数据展示。
      5)配置:在拦截器的配置文件(WebMvcConfig)中书写拦截的范围。

    2. Elasticsearch,分布式搜索引擎

    2.0 为什么要搜索引擎

    1. 数据库进行搜索有什么缺陷?
      1)数据库搜索可以针对索引进行搜索速度非常快。
      2)如果进行模糊搜索,不走索引,就会很慢。
      3)对于一个句子,数据库无法进行搜索,并且提取中间的分词进行搜索
    2. Elasticsearch的优势
      1)能够对一个句子进行分词,然后根据所分得词进行联合搜索。
      2)由于Elasticsearch是分布式的,所以需要从各个节点都拉取对应的数据,然后最终统一合成给客户端
    3. Elasticsearch如何工作
      1)可以实现快速的“模糊匹配”/“相关性查询”,实际上是你写入数据到Elasticsearch的时候会进行分词。

    2.1 Elasticsearch(分布式搜索)

    1. Elasticserach简介
      (1):一个分布式的,Restful风格的搜索引擎
      (2):支持对各种类型数据的检索
      (3):搜索速度快,可以提供实时的搜索服务
      (4):便于水平扩展,每秒可以处理PB级的海量数据
    2. Elasticserach术语

      (1):索引Index:=MySQL中的数据库Database
      (2):索引Index类型:=MySQL中的表Table
      (3):文档:=MySQL中的数据行Row
      (4):字段:=MySQL中的数据列Column
      (5):集群:若干个节点构成集群,对外提供索引和搜索功能。有共同的cluster.name。
      (6):节点:一个集群若干个节点,节点即运行Elasticserach的机器。分为主节点和副节点。如果主节点挂了,会选举出一个新的主节点。
      (7):分片:一个index类型即表能够存储在多个节点中,这种操作叫做分片。四个接待您
      (8):副本:分片分为主分片和副分片,实现高可用性。

    (9)写入数据的流程:
    新来一个数据,是写入到主分片的,副本分片会备份主分片的信息。

    2.2 Spring整合Elasticsearch

    1. 引入依赖:spring-boot-starter-data-elasticserach
    2. 配置Elasticsearch:cluster-name集群名字; cluster-nodes节点(是分布式的,不过这里这有一个节点,说明IP地址和端口号9300(TCP端口);注:9200(HTTP端口)
    3. 注意:redis和Elasticsearch都是基于netty的,故需要配置一下,不然会报错。
    4. Spring Data Elasticsearch(查找帖子的两种方法)
      (1):Elasticsearch Template
      (2):Elasticsearch Repository(简单,把帖子转存到Elasticsearch服务器中)
    5. ES链接MYSQL
      (1)修改帖子的实体类:加上注解Document,然后就会直接把帖子的表格映射到es服务器中。
      (2)字段:添加与es中类型相匹配。尤其注意两个字段title和content,后续主要也是靠这两个进行检索,需要添加存储和搜索的分词器。
    @Document(indexName = "discusspost", type = "_doc", shards = 6, replicas = 3)
    indexName:索引名字,type:类型;shards:分片;replicas :副本
    作用:实体DiscussPost和es中的索引discusspost有了对应的关系。
    
    • 1
    • 2
    • 3
    @Document(indexName = "discusspost", type = "_doc", shards = 6, replicas = 3)
    public class DiscussPost {
    
        @Id
        private int id;
    
        @Field(type = FieldType.Integer)
        private int userId;
    
        // 互联网校招(重要:因为就靠这个搜索)
        //analyzer:存储使用的分词器;searchAnalyzer:搜索使用的分词器
        @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
        private String title;
    
        @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
        private String content;
    
        @Field(type = FieldType.Integer)
        private int type;
    
        @Field(type = FieldType.Integer)
        private int status;
    
        @Field(type = FieldType.Date)
        private Date createTime;
    
        @Field(type = FieldType.Integer)
        private int commentCount;
    
        @Field(type = FieldType.Double)
        private double score;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    1. 针对es的Dao层
      1)只需要新建一个接口,并且继承子接口,里面已经封装好了ES的增删改查操作。其中两个参数分别为:对应的实体类和主键的类型(Integer)
    @Repository
    public interface DiscussPostRepository extends ElasticsearchRepository<DiscussPost, Integer> {
    }
    
    • 1
    • 2
    • 3

    2)访问ES的逻辑:在ES中搜索,数据来源是mysql数据库里面,然后转存到ES服务器的数据库里;后面的话会先做初始化操作,即把mysql中的数据全部转存到ES服务器中,以后如果mysql有新增,在es没有所搜到的时候,会先存到es中去。
    3)增加数据:调用DiscussPostRepository的save方法。(PostMan中的URL:localhost:9230/discusspost/_doc/231,指向对应的索引名字和类型)
    4)更新数据:修改一点内容,再次save存储,会进行覆盖。
    5)删除:进行删除操作,deleteById和deleteAll。
    6)查找(有两个类Dao层可选:Elasticsearch RepositoryElasticsearch Template
    (1)通过方法1:Elasticsearch Repository
    (2)指定搜索的对象searchQuery:内容withQuery,排序方法ithSort,分页查询withPageable,哪些属性高亮显示withHighlightFields。
    (3)discussRepository.search(searchQuery):通过discussRepository的search,获取搜索结果。
    (4)缺点:找到了高亮内容,但是不直接范围,而是分为两部分数据返回。

        @Test
        public void testSearchByRepository() {
            SearchQuery searchQuery = new NativeSearchQueryBuilder()
                    .withQuery(QueryBuilders.multiMatchQuery("互联网寒冬", "title", "content"))
                    .withSort(SortBuilders.fieldSort("type").order(SortOrder.DESC))
                    .withSort(SortBuilders.fieldSort("score").order(SortOrder.DESC))
                    .withSort(SortBuilders.fieldSort("createTime").order(SortOrder.DESC))
                    .withPageable(PageRequest.of(0, 10))
                    .withHighlightFields(
                            new HighlightBuilder.Field("title").preTags("").postTags(""),
                            new HighlightBuilder.Field("content").preTags("").postTags("")
                    ).build();
    
            // elasticTemplate.queryForPage(searchQuery, class, SearchResultMapper)
            // 底层获取得到了高亮显示的值, 但是没有返回.
    
            Page<DiscussPost> page = discussRepository.search(searchQuery);
            System.out.println(page.getTotalElements());
            System.out.println(page.getTotalPages());
            System.out.println(page.getNumber());
            System.out.println(page.getSize());
            for (DiscussPost post : page) {
                System.out.println(post);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    -----------------------------------
    
    • 1

    (2)通过方法2:Elasticsearch Template
    (2)指定搜索的对象searchQuery:内容withQuery,排序方法ithSort,分页查询withPageable,哪些属性高亮显示withHighlightFields。(和上一个方法一样)
    (3)Page page = elasticTemplate.queryForPage(searchQuery, DiscussPost.class, new SearchResultMapper() :重写queryForPage方法

    @Test
        public void testSearchByTemplate() {
            SearchQuery searchQuery = new NativeSearchQueryBuilder()
                    .withQuery(QueryBuilders.multiMatchQuery("互联网寒冬", "title", "content"))
                    .withSort(SortBuilders.fieldSort("type").order(SortOrder.DESC))
                    .withSort(SortBuilders.fieldSort("score").order(SortOrder.DESC))
                    .withSort(SortBuilders.fieldSort("createTime").order(SortOrder.DESC))
                    .withPageable(PageRequest.of(0, 10))
                    .withHighlightFields(
                            new HighlightBuilder.Field("title").preTags("").postTags(""),
                            new HighlightBuilder.Field("content").preTags("").postTags("")
                    ).build();
    
            Page<DiscussPost> page = elasticTemplate.queryForPage(searchQuery, DiscussPost.class, new SearchResultMapper() {
                @Override
                public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> aClass, Pageable pageable) {
                    SearchHits hits = response.getHits();
                    if (hits.getTotalHits() <= 0) {
                        return null;
                    }
    
                    List<DiscussPost> list = new ArrayList<>();
                    for (SearchHit hit : hits) {
                        DiscussPost post = new DiscussPost();
    
                        String id = hit.getSourceAsMap().get("id").toString();
                        post.setId(Integer.valueOf(id));
    
                        String userId = hit.getSourceAsMap().get("userId").toString();
                        post.setUserId(Integer.valueOf(userId));
    
                        String title = hit.getSourceAsMap().get("title").toString();
                        post.setTitle(title);
    
                        String content = hit.getSourceAsMap().get("content").toString();
                        post.setContent(content);
    
                        String status = hit.getSourceAsMap().get("status").toString();
                        post.setStatus(Integer.valueOf(status));
    
                        String createTime = hit.getSourceAsMap().get("createTime").toString();
                        post.setCreateTime(new Date(Long.valueOf(createTime)));
    
                        String commentCount = hit.getSourceAsMap().get("commentCount").toString();
                        post.setCommentCount(Integer.valueOf(commentCount));
    
                        // 处理高亮显示的结果
                        HighlightField titleField = hit.getHighlightFields().get("title");
                        if (titleField != null) {
                            post.setTitle(titleField.getFragments()[0].toString());
                        }
    
                        HighlightField contentField = hit.getHighlightFields().get("content");
                        if (contentField != null) {
                            post.setContent(contentField.getFragments()[0].toString());
                        }
    
                        list.add(post);
                    }
    
                    return new AggregatedPageImpl(list, pageable,
                            hits.getTotalHits(), response.getAggregations(), response.getScrollId(), hits.getMaxScore());
                }
            });
    
            System.out.println(page.getTotalElements());
            System.out.println(page.getTotalPages());
            System.out.println(page.getNumber());
            System.out.println(page.getSize());
            for (DiscussPost post : page) {
                System.out.println(post);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    1. 综合
      (1)使用Elasticsearch Repository的save,delete等功能完成增删改操作。不推荐使用其search功能,因为高亮词汇不能正常显示。会分批进行输出。
      (2)使用Elasticsearch Template,重写queryForPage功能,完成增删改操作。

    3.3 开发社区搜索功能

    3.3.1 理论步骤

    1. 搜索服务:
      (1):新建帖子时,将帖子保存至Elasticsearch 服务器
      (2):从Elasticsearch 服务器删除帖子
      (3):从Elasticsearch 服务器搜索帖子(核心是为了搜索功能)
    2. 发布事件
      (1):发布帖子时,将帖子异步的提交到Elasticsearch 服务器
      (2):增加评论的时候,将帖子异步的提交到Elasticsearch 服务器
      (3):在消费主键中添加一个方法,消费帖子发布事件
      (注意):异步方式,不用等待,并行实现功能。
    3. 显示结果
      (1):在控制器中处理搜索请求,在HTML上显示搜索结果。

    3.3.2 实际步骤

    1. Service层(之前展示了Dao层)
      1)service层ElasticsearchService:
      (1)新增,删除和修改:向es服务器提交数据,即为新增;再提交一次则为修改。使用Elasticsearch Repository的save,delete。
      (2)搜索:使用Elasticsearch Repository,重写其中queryForPage方法。
      2)修改发帖和评论部分,增加消息发布事件(采用异步,并行处理,加强效率)
      (1)在DiscussPostController中,触发事件发布事件
      (2)具体内容:eventProducer.fireEvent(event);event里面存入帖子的内容和发布的主题。
      (3)在CommentController中,触发帖子评论发布事件
      (2)具体内容:eventProducer.fireEvent(event);event里面存入帖子的内容和发布的主题。
      3)消费发帖:EventConsumer中新建handlePublishMessage
      (1)目的:针对kafka中主题为TOPIC_PUBLISH的事件消息进行消费。
      (2)逻辑:先参数校验,然后存入es的数据库中。
    2. Controller层(调用)
      1)新建Controller:SearchController
      (1)目的:展现所有结果的展示列表。
      (2)逻辑:通过输入需要搜索的关键词,调用搜索的方法,得到搜索的内容,进行封装,展示给页面。
      (3)注意:一开始展示的页面时第一页,但是后面如果想更改页面,那么就需要在前端修改page页的内容,传回给后端,后端收到了进行换页展示。
    3. 前端的数据展示调用(这里就省略一下了)

    文章参考:
    牛客网Java项目:https://www.nowcoder.com/study/live/246
    代码记录:https://gitee.com/xkh-dasiy/newcoder
    漫画形式说明Linux的五种IO模型
    Kafka【入门】就这一篇!
    计网 - 网络 I/O 模型:BIO、NIO 和 AIO 有什么区别?
    Elasticsearch术语及概念熟悉
    什么是 Elasticsearch?一篇搞懂

  • 相关阅读:
    为什么要打日志?怎么打日志?打什么日志?
    详解LockSupport的使用
    腾讯云国际站-阿里云OSS如何迁移到腾讯云COS?腾讯云cos迁移教程
    【云原生】服务网格kiali开发环境搭建问题解析
    javaee SpringMVC 乱码问题解决
    Spring 循环依赖
    Tomcat调优【精简版】
    移动端开发:WebView介绍和使用、JSBridge等
    Linux日志管理
    杰理之 DAC详细配置【篇】
  • 原文地址:https://blog.csdn.net/qq_42974034/article/details/126023471