1.如何在项目中使用kafka?
1.1)因为kafka的使用依赖于zookeeper(https://mp.weixin.qq.com/s/geR3pDw_Yjhmu8KMsXQosg在kafka v2.8版本后将zookeeper也集成在了服务中在kafka v2.8版本后官网取消了kafka依赖zookeeper集群的机制,采用内置kraft的方式),配置zookeeper的信息,即需要在kafka/config/zookeeper.properties配置zookeeper服务运行期间产生的数据存放位置dataDir,
1.2)在kafka/config/server.properties 中配置kafka服务运行期间产生的log文件的位置,(注意:Kafka 使用消息日志(Log)来保存数据)
1.3)运行zookeeper和kafka服务,
- ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
- ./bin/kafka-server-start.sh ./config/server.properties
1.4)在spring中引入kafka依赖
- <dependency>
- <groupId>org.springframework.kafkagroupId>
- <artifactId>spring-kafkaartifactId>
- dependency>
spring中如何使用kafka?
发送消息:利用KafkaTemplate类->kafkaTemplate.send(topic, message);
消费消息:利用注解@KafkaListener(topics = {**},groupId = "")
1.5)应用:如何在项目中使用kafka实现发布系统通知功能?
用户进行关注,评论,回复等行为都是时刻在发生的.如果当某个用户进行关注/评论/回复(上游服务)后,系统立即对此请求进行处理,即向被关注/评论/回复的用户发送系统通知(下游服务)这一功能,当用户关注,评论,回复行为异常活跃时,就会拖垮服务器或者数据库.
利用kafka消息引擎的异步,解耦,流量削峰的特性,来实现项目中系统通知的功能.
1.5.1)将用户进行关注/评论/回复的行为抽象为事件Event对象
- @Component
- public class Event {
- String topic;//事件的主题(点赞/关注/回复)
- int userId;//事件的发起者
- int entityType;//被点赞/关注/回复的实体类型(项目中只有三种:用户,帖子,评论)
- int entityId;//被点赞/关注/回复的实体id
- int entityUserId;//被点赞/关注/回复的实体的作者(实体类型为用户时,entityId==entityUserId)
- Map
data = new HashMap<>();//事件中其他额外需要装的数据 -
- /**
- 省略get/set方法
- */
- }
1.5.2)写生产者Producer类
- @Component
- public class EventProducer {
- @Autowired
- private KafkaTemplate kafkaTemplate;
-
- public void fireEvent(Event event){
- kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
- }
- }
1.5.3)写消费者Consumer 类
- @Component
- public class EventConsumer implements CommunityConst {
-
-
- @Autowired
- private MessageService messageService;
-
-
- @KafkaListener(topics = {KAFKA_TOPIC_COMMENT,KAFKA_TOPIC_LIKE,KAFKA_TOPIC_FOLLOW})
- public void handleCommentMessage(ConsumerRecord record){
- //spring监听到以上某一个主题下有消息了,就会自动调用此方法,并将消息封装到ConsumerRecord对象中
- if(record ==null || record.value()!=null) {
- logger.error("event is null");
- }
- Event event = JSONObject.parseObject( record.value().toString(), Event.class);
- if(event == null){
- logger.error("event is not null, but it has a wrong form");
- }
- //produce message
- Message message = new Message();
- message.setFromId(SYSTEM_USER_ID);
- message.setToId(event.getEntityUserId());
- message.setConversationId(event.getTopic());
- message.setCreateTime(new Date());
-
- Map
content = new HashMap<>(); - content.put("userId",event.getUserId());
- content.put("entityType",event.getEntityType());
- content.put("entityId",event.getEntityId());
- if(event.getData()!=null){
- for(Map.Entry
entry: event.getData().entrySet()){ - content.put(entry.getKey(),entry.getValue());
- }
- }
- message.setContent(JSONObject.toJSONString(content));
- //add a system message
- messageService.sendMessage(message);//插入一条消息到数据库消息表中
-
- }
-
- }
1.5.4)什么时刻触发消息的发送
以评论行为为例,
-
- @Controller
- @RequestMapping(path = "/comment")
- public class CommentController implements CommunityConst {
- @Autowired
- private HostHolder hostHolder;
- @Autowired
- private CommentService commentService;
- @Autowired
- private EventProducer eventProducer;
- @Autowired
- private DiscussPostService discussPostService;
- @Autowired
- private RedisTemplate redisTemplate;
-
- @RequestMapping(path = "/add/{PostId}",method = RequestMethod.POST)
- public String addComments(@PathVariable("PostId") int postId, Comment comment){
- //1.
- comment.setUserId(hostHolder.getUser().getId());
- comment.setCreateTime(new Date());
- comment.setStatus(0);
- //2.
- commentService.addComment(comment);
- //3.send system comment message
- Event event = new Event()
- .setTopic(KAFKA_TOPIC_COMMENT)
- .setUserId(hostHolder.getUser().getId())
- .setEntityType(comment.getEntityType())
- .setEntityId(comment.getEntityId())
- .setData("postId",postId);
- if(comment.getEntityType() == ENTITY_TYPE_POST){
- //the author of post
- event.setEntityUserId(discussPostService.findDiscussPostById(comment.getEntityId()).getUserId());
- }else if(comment.getEntityType() == ENTITY_TYPE_COMMENT){
- //the author of comment
- event.setEntityUserId(commentService.findCommentById(comment.getEntityId()).getUserId());
- }
- eventProducer.fireEvent(event);
-
-
- }
其他两类业务代码类似.
1.5.4)展示某个用户的通知列表
- @RequestMapping(path = "/notice/list",method = RequestMethod.GET)
- public String getNoticeList(Model model){
- User user = hostHolder.getUser();
- //comment
- Map
map = new HashMap<>(); - Message leastNotice = messageService.findLeastNotice(KAFKA_TOPIC_COMMENT, user.getId());
- if(leastNotice!=null){
- map.put("leastNotice",leastNotice);
- HashMap content = JSONObject.parseObject(leastNotice.getContent(), HashMap.class);
- map.put("entityType",content.get("entityType"));
- map.put("entityId",content.get("entityId"));
- map.put("user",userService.findUserById((Integer) content.get("userId")));
- map.put("postId",content.get("postId"));
-
- int noticeCount = messageService.findNoticeCount(KAFKA_TOPIC_COMMENT, user.getId());
- int unReadNoticeCount = messageService.findUnReadNoticeCount(KAFKA_TOPIC_COMMENT, user.getId());
- map.put("noticeCount",noticeCount);
- map.put("unReadNoticeCount",unReadNoticeCount);
- model.addAttribute("comment",map);
- }
-
-
- //like
- map = new HashMap<>();
- leastNotice = messageService.findLeastNotice(KAFKA_TOPIC_LIKE, user.getId());
- if(leastNotice!=null){
- map.put("leastNotice",leastNotice);
- HashMap content = JSONObject.parseObject(leastNotice.getContent(), HashMap.class);
- map.put("entityType",content.get("entityType"));
- map.put("entityId",content.get("entityId"));
- map.put("user",userService.findUserById((Integer) content.get("userId")));
- map.put("postId",content.get("postId"));
-
- int noticeCount = messageService.findNoticeCount(KAFKA_TOPIC_LIKE, user.getId());
- int unReadNoticeCount = messageService.findUnReadNoticeCount(KAFKA_TOPIC_LIKE, user.getId());
- map.put("noticeCount",noticeCount);
- map.put("unReadNoticeCount",unReadNoticeCount);
- model.addAttribute("like",map);
- }
-
-
- //follow
- map = new HashMap<>();
- leastNotice = messageService.findLeastNotice(KAFKA_TOPIC_FOLLOW, user.getId());
- if(leastNotice!=null){
- map.put("leastNotice",leastNotice);
- HashMap content = JSONObject.parseObject(leastNotice.getContent(), HashMap.class);
- map.put("entityType",content.get("entityType"));
- map.put("entityId",content.get("entityId"));
- map.put("user",userService.findUserById((Integer) content.get("userId")));
- int noticeCount = messageService.findNoticeCount(KAFKA_TOPIC_FOLLOW, user.getId());
- int unReadNoticeCount = messageService.findUnReadNoticeCount(KAFKA_TOPIC_FOLLOW, user.getId());
- map.put("noticeCount",noticeCount);
- map.put("unReadNoticeCount",unReadNoticeCount);
- model.addAttribute("follow",map);
- }
-
-
- //
- model.addAttribute("unreadConversationCount",messageService.findUnreadLetterCount(user.getId(), null));
- model.addAttribute("unreadNoticeCount",messageService.findUnReadNoticeCount(null,user.getId()));
-
- return "/site/notice";
- }
1.5.5)某类通知的详情
.....
总结:到此就实现了在spring项目中用kafka作为消息引擎系统来实现系统通知的功能.