同步消息系统:
1)定义:双向通信,需要保证发送方向接受者发送消息,接受者接收消息,这个时间段内两边不能做其他的事情,需要同时在线。发送程序和接收程序必须同步运行,否则会发送失败。
2)**优缺点:**时效性较强,可以立即得到结果;耦合度高,性能和吞吐能力下降。
3)应用场景:时序上有严格执行关系; 需要进行原子操作;没有耗时的操作
4)实现方式:消息处理=BIO(同步阻塞IO)与NIO(同步非阻塞IO),消息传输=TCP/IP与UDP/IP。Socket进行实现。
异步消息系统:
1)定义:单向通信,发送端发送消息不需要接收方在线,可以后续推送。
2)优缺点:执行效率高,节约时间,耦合度低;缺点是不利于对流程进行控制,且实现更加复杂。
3)应用场景:没有严格时序关系,没有原子操作,有耗时的操作。
4)实现方式:AOI(异步阻塞IO);消息队列,Kafka
**异步消息系统:**以异步方式实现消息的传递。
目的:解决线程通信的问题,比较原始的方法。
问题:生产者和消费者两者消耗不匹配,会占用系统资源,故需要阻塞队列(生产者生产一些消息后,如果消费者还没接收,就会存入阻塞队列中存起来,之后生产者就去做别的事情了,消费者慢慢消费)
使用对象:BlockingQueue(使用其接口)
(1):解决线程通信问题
(2):阻塞方法:put和take
生产者和消费者模式
(1):生产者:产生数据的线程
(2):消费者:使用数据的线程
消息队列实现类
(1):ArrayBlockingQueue:测试中用到的
(2):LinkedBlockingQueue
(3):PriorityBlockingQueue,SynchronousQueue,DelayQueue等
阻塞队列实现消息队列步骤
1)生产者实现Runnable接口,重写run方法,通过put把消息放进去。
2)消费者实现Runnable接口,重写run方法,通过take接收消息。
3)调用函数,新建一个消息队列,BlockingQueue。新建一个线程生产者,对消息进行生产放入消息队列中。新建多干个线程消费者,把消息队列中的消息进行输出。

(1)Broker:服务器
(2)Zookeeper:独立软件,管理集群(集群中,不同节点的通信)
(3)Topic:发布订阅模式的空间
(4)Partition:对Topic的分区,提高并发能力
(5)Offset:消息在分区内存存放的索引序列
(6)Leader Replica:主副本,调用数据
(7)FollowerReplica:从副本,主要是对主副本的文件备份,提高容错率。
5. 使用kafka
1)安装程序
2)配置zookeeper.properties与service.properties
3)启动zookeeper:使用之前的配置文件
F:\kafka\kafka>bin\windows\zookeeper-server-start.bat config\zookeeper.properties
4)启动kafka:使用之前的配置文件
F:\kafka\kafka>bin\windows\kafka-server-start.bat config\server.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000
public void sendMessage(String topic, String content) {
kafkaTemplate.send(topic, content);
}
(2):消费者(被动调用,接收消息):(使用注解表明主题)
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record) {
System.out.println(record.value());
}
//核心方法:调用kafka的接收消息功能
@KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})
public void handleCommentMessage(ConsumerRecord record) {
1)核心方法:调用的是kafka的接收消息功能,对象为recoder
2)具体:把recoder转化为Event对象,然后通过这个对象获取事件到的基本信息,封装成一个message,存入到message数据库表格里面。
3)其中重要信息为:把event的data数据保存好(即在显示栏会显示的信息),然后封装到message中的content里面,便于展示。
修改CommentController:发送评论
(1):需要把评论的数据封装为一个envet对象,然后调用producer方法。这里需要显示跳转到评论,故可以写一个方法,根据评论的id获取评论信息。
修改LikeController:点赞*
(1):需要把点赞的数据封装为一个envet对象,然后调用producer方法。这里需要显示跳转到帖子,这里在点赞开头多传入一个参数,即postId。即从前端页面多返回个帖子的id。
修改FollowController:关注*
(1):需要把关注的数据封装为一个envet对象,然后调用producer方法。关注的是人,不需要显示额外信息。
运行注意事项
1)如果kafka运行不正常,把数据中的kafka日志删除
// 查询某个主题下最新的通知
Message selectLatestNotice(int userId, String topic);
// 查询某个主题所包含的通知数量
int selectNoticeCount(int userId, String topic);
// 查询未读的通知的数量
int selectNoticeUnreadCount(int userId, String topic);
// 查询某个主题所包含的通知列表
List<Message> selectNotices(int userId, String topic, int offset, int limit);


(9)写入数据的流程:
新来一个数据,是写入到主分片的,副本分片会备份主分片的信息。

@Document(indexName = "discusspost", type = "_doc", shards = 6, replicas = 3)
indexName:索引名字,type:类型;shards:分片;replicas :副本
作用:实体DiscussPost和es中的索引discusspost有了对应的关系。
@Document(indexName = "discusspost", type = "_doc", shards = 6, replicas = 3)
public class DiscussPost {
@Id
private int id;
@Field(type = FieldType.Integer)
private int userId;
// 互联网校招(重要:因为就靠这个搜索)
//analyzer:存储使用的分词器;searchAnalyzer:搜索使用的分词器
@Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
private String title;
@Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
private String content;
@Field(type = FieldType.Integer)
private int type;
@Field(type = FieldType.Integer)
private int status;
@Field(type = FieldType.Date)
private Date createTime;
@Field(type = FieldType.Integer)
private int commentCount;
@Field(type = FieldType.Double)
private double score;
@Repository
public interface DiscussPostRepository extends ElasticsearchRepository<DiscussPost, Integer> {
}
2)访问ES的逻辑:在ES中搜索,数据来源是mysql数据库里面,然后转存到ES服务器的数据库里;后面的话会先做初始化操作,即把mysql中的数据全部转存到ES服务器中,以后如果mysql有新增,在es没有所搜到的时候,会先存到es中去。
3)增加数据:调用DiscussPostRepository的save方法。(PostMan中的URL:localhost:9230/discusspost/_doc/231,指向对应的索引名字和类型)
4)更新数据:修改一点内容,再次save存储,会进行覆盖。
5)删除:进行删除操作,deleteById和deleteAll。
6)查找(有两个类Dao层可选:Elasticsearch Repository和Elasticsearch Template)
(1)通过方法1:Elasticsearch Repository
(2)指定搜索的对象searchQuery:内容withQuery,排序方法ithSort,分页查询withPageable,哪些属性高亮显示withHighlightFields。
(3)discussRepository.search(searchQuery):通过discussRepository的search,获取搜索结果。
(4)缺点:找到了高亮内容,但是不直接范围,而是分为两部分数据返回。
@Test
public void testSearchByRepository() {
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.multiMatchQuery("互联网寒冬", "title", "content"))
.withSort(SortBuilders.fieldSort("type").order(SortOrder.DESC))
.withSort(SortBuilders.fieldSort("score").order(SortOrder.DESC))
.withSort(SortBuilders.fieldSort("createTime").order(SortOrder.DESC))
.withPageable(PageRequest.of(0, 10))
.withHighlightFields(
new HighlightBuilder.Field("title").preTags("").postTags(""),
new HighlightBuilder.Field("content").preTags("").postTags("")
).build();
// elasticTemplate.queryForPage(searchQuery, class, SearchResultMapper)
// 底层获取得到了高亮显示的值, 但是没有返回.
Page<DiscussPost> page = discussRepository.search(searchQuery);
System.out.println(page.getTotalElements());
System.out.println(page.getTotalPages());
System.out.println(page.getNumber());
System.out.println(page.getSize());
for (DiscussPost post : page) {
System.out.println(post);
}
}
-----------------------------------
(2)通过方法2:Elasticsearch Template
(2)指定搜索的对象searchQuery:内容withQuery,排序方法ithSort,分页查询withPageable,哪些属性高亮显示withHighlightFields。(和上一个方法一样)
(3)Page page = elasticTemplate.queryForPage(searchQuery, DiscussPost.class, new SearchResultMapper() :重写queryForPage方法。
@Test
public void testSearchByTemplate() {
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.multiMatchQuery("互联网寒冬", "title", "content"))
.withSort(SortBuilders.fieldSort("type").order(SortOrder.DESC))
.withSort(SortBuilders.fieldSort("score").order(SortOrder.DESC))
.withSort(SortBuilders.fieldSort("createTime").order(SortOrder.DESC))
.withPageable(PageRequest.of(0, 10))
.withHighlightFields(
new HighlightBuilder.Field("title").preTags("").postTags(""),
new HighlightBuilder.Field("content").preTags("").postTags("")
).build();
Page<DiscussPost> page = elasticTemplate.queryForPage(searchQuery, DiscussPost.class, new SearchResultMapper() {
@Override
public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> aClass, Pageable pageable) {
SearchHits hits = response.getHits();
if (hits.getTotalHits() <= 0) {
return null;
}
List<DiscussPost> list = new ArrayList<>();
for (SearchHit hit : hits) {
DiscussPost post = new DiscussPost();
String id = hit.getSourceAsMap().get("id").toString();
post.setId(Integer.valueOf(id));
String userId = hit.getSourceAsMap().get("userId").toString();
post.setUserId(Integer.valueOf(userId));
String title = hit.getSourceAsMap().get("title").toString();
post.setTitle(title);
String content = hit.getSourceAsMap().get("content").toString();
post.setContent(content);
String status = hit.getSourceAsMap().get("status").toString();
post.setStatus(Integer.valueOf(status));
String createTime = hit.getSourceAsMap().get("createTime").toString();
post.setCreateTime(new Date(Long.valueOf(createTime)));
String commentCount = hit.getSourceAsMap().get("commentCount").toString();
post.setCommentCount(Integer.valueOf(commentCount));
// 处理高亮显示的结果
HighlightField titleField = hit.getHighlightFields().get("title");
if (titleField != null) {
post.setTitle(titleField.getFragments()[0].toString());
}
HighlightField contentField = hit.getHighlightFields().get("content");
if (contentField != null) {
post.setContent(contentField.getFragments()[0].toString());
}
list.add(post);
}
return new AggregatedPageImpl(list, pageable,
hits.getTotalHits(), response.getAggregations(), response.getScrollId(), hits.getMaxScore());
}
});
System.out.println(page.getTotalElements());
System.out.println(page.getTotalPages());
System.out.println(page.getNumber());
System.out.println(page.getSize());
for (DiscussPost post : page) {
System.out.println(post);
}
}
文章参考:
牛客网Java项目:https://www.nowcoder.com/study/live/246
代码记录:https://gitee.com/xkh-dasiy/newcoder
漫画形式说明Linux的五种IO模型
Kafka【入门】就这一篇!
计网 - 网络 I/O 模型:BIO、NIO 和 AIO 有什么区别?
Elasticsearch术语及概念熟悉
什么是 Elasticsearch?一篇搞懂