• 如何在项目中使用kafka?


    1.如何在项目中使用kafka?

    1.1)因为kafka的使用依赖于zookeeper(https://mp.weixin.qq.com/s/geR3pDw_Yjhmu8KMsXQosg在kafka v2.8版本后将zookeeper也集成在了服务中在kafka v2.8版本后官网取消了kafka依赖zookeeper集群的机制,采用内置kraft的方式),配置zookeeper的信息,即需要在kafka/config/zookeeper.properties配置zookeeper服务运行期间产生的数据存放位置dataDir,

     1.2)在kafka/config/server.properties 中配置kafka服务运行期间产生的log文件的位置,(注意:Kafka 使用消息日志(Log)来保存数据)

    1.3)运行zookeeper和kafka服务,

    1. ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
    2. ./bin/kafka-server-start.sh ./config/server.properties

     1.4)在spring中引入kafka依赖

    1. <dependency>
    2. <groupId>org.springframework.kafkagroupId>
    3. <artifactId>spring-kafkaartifactId>
    4. dependency>

    spring中如何使用kafka?

    发送消息:利用KafkaTemplate类->kafkaTemplate.send(topic, message);

    消费消息:利用注解@KafkaListener(topics = {**},groupId = "")

    1.5)应用:如何在项目中使用kafka实现发布系统通知功能?

    用户进行关注,评论,回复等行为都是时刻在发生的.如果当某个用户进行关注/评论/回复(上游服务)后,系统立即对此请求进行处理,即向被关注/评论/回复的用户发送系统通知(下游服务)这一功能,当用户关注,评论,回复行为异常活跃时,就会拖垮服务器或者数据库.

    利用kafka消息引擎的异步,解耦,流量削峰的特性,来实现项目中系统通知的功能.

    1.5.1)将用户进行关注/评论/回复的行为抽象为事件Event对象

    1. @Component
    2. public class Event {
    3. String topic;//事件的主题(点赞/关注/回复)
    4. int userId;//事件的发起者
    5. int entityType;//被点赞/关注/回复的实体类型(项目中只有三种:用户,帖子,评论)
    6. int entityId;//被点赞/关注/回复的实体id
    7. int entityUserId;//被点赞/关注/回复的实体的作者(实体类型为用户时,entityId==entityUserId)
    8. Map data = new HashMap<>();//事件中其他额外需要装的数据
    9. /**
    10. 省略get/set方法
    11. */
    12. }

    1.5.2)写生产者Producer类

    1. @Component
    2. public class EventProducer {
    3. @Autowired
    4. private KafkaTemplate kafkaTemplate;
    5. public void fireEvent(Event event){
    6. kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
    7. }
    8. }

    1.5.3)写消费者Consumer 类

    1. @Component
    2. public class EventConsumer implements CommunityConst {
    3. @Autowired
    4. private MessageService messageService;
    5. @KafkaListener(topics = {KAFKA_TOPIC_COMMENT,KAFKA_TOPIC_LIKE,KAFKA_TOPIC_FOLLOW})
    6. public void handleCommentMessage(ConsumerRecord record){
    7. //spring监听到以上某一个主题下有消息了,就会自动调用此方法,并将消息封装到ConsumerRecord对象中
    8. if(record ==null || record.value()!=null) {
    9. logger.error("event is null");
    10. }
    11. Event event = JSONObject.parseObject( record.value().toString(), Event.class);
    12. if(event == null){
    13. logger.error("event is not null, but it has a wrong form");
    14. }
    15. //produce message
    16. Message message = new Message();
    17. message.setFromId(SYSTEM_USER_ID);
    18. message.setToId(event.getEntityUserId());
    19. message.setConversationId(event.getTopic());
    20. message.setCreateTime(new Date());
    21. Map content = new HashMap<>();
    22. content.put("userId",event.getUserId());
    23. content.put("entityType",event.getEntityType());
    24. content.put("entityId",event.getEntityId());
    25. if(event.getData()!=null){
    26. for(Map.Entry entry: event.getData().entrySet()){
    27. content.put(entry.getKey(),entry.getValue());
    28. }
    29. }
    30. message.setContent(JSONObject.toJSONString(content));
    31. //add a system message
    32. messageService.sendMessage(message);//插入一条消息到数据库消息表中
    33. }
    34. }

    1.5.4)什么时刻触发消息的发送

    以评论行为为例,

    1. @Controller
    2. @RequestMapping(path = "/comment")
    3. public class CommentController implements CommunityConst {
    4. @Autowired
    5. private HostHolder hostHolder;
    6. @Autowired
    7. private CommentService commentService;
    8. @Autowired
    9. private EventProducer eventProducer;
    10. @Autowired
    11. private DiscussPostService discussPostService;
    12. @Autowired
    13. private RedisTemplate redisTemplate;
    14. @RequestMapping(path = "/add/{PostId}",method = RequestMethod.POST)
    15. public String addComments(@PathVariable("PostId") int postId, Comment comment){
    16. //1.
    17. comment.setUserId(hostHolder.getUser().getId());
    18. comment.setCreateTime(new Date());
    19. comment.setStatus(0);
    20. //2.
    21. commentService.addComment(comment);
    22. //3.send system comment message
    23. Event event = new Event()
    24. .setTopic(KAFKA_TOPIC_COMMENT)
    25. .setUserId(hostHolder.getUser().getId())
    26. .setEntityType(comment.getEntityType())
    27. .setEntityId(comment.getEntityId())
    28. .setData("postId",postId);
    29. if(comment.getEntityType() == ENTITY_TYPE_POST){
    30. //the author of post
    31. event.setEntityUserId(discussPostService.findDiscussPostById(comment.getEntityId()).getUserId());
    32. }else if(comment.getEntityType() == ENTITY_TYPE_COMMENT){
    33. //the author of comment
    34. event.setEntityUserId(commentService.findCommentById(comment.getEntityId()).getUserId());
    35. }
    36. eventProducer.fireEvent(event);
    37. }

    其他两类业务代码类似.

    1.5.4)展示某个用户的通知列表

    1. @RequestMapping(path = "/notice/list",method = RequestMethod.GET)
    2. public String getNoticeList(Model model){
    3. User user = hostHolder.getUser();
    4. //comment
    5. Map map = new HashMap<>();
    6. Message leastNotice = messageService.findLeastNotice(KAFKA_TOPIC_COMMENT, user.getId());
    7. if(leastNotice!=null){
    8. map.put("leastNotice",leastNotice);
    9. HashMap content = JSONObject.parseObject(leastNotice.getContent(), HashMap.class);
    10. map.put("entityType",content.get("entityType"));
    11. map.put("entityId",content.get("entityId"));
    12. map.put("user",userService.findUserById((Integer) content.get("userId")));
    13. map.put("postId",content.get("postId"));
    14. int noticeCount = messageService.findNoticeCount(KAFKA_TOPIC_COMMENT, user.getId());
    15. int unReadNoticeCount = messageService.findUnReadNoticeCount(KAFKA_TOPIC_COMMENT, user.getId());
    16. map.put("noticeCount",noticeCount);
    17. map.put("unReadNoticeCount",unReadNoticeCount);
    18. model.addAttribute("comment",map);
    19. }
    20. //like
    21. map = new HashMap<>();
    22. leastNotice = messageService.findLeastNotice(KAFKA_TOPIC_LIKE, user.getId());
    23. if(leastNotice!=null){
    24. map.put("leastNotice",leastNotice);
    25. HashMap content = JSONObject.parseObject(leastNotice.getContent(), HashMap.class);
    26. map.put("entityType",content.get("entityType"));
    27. map.put("entityId",content.get("entityId"));
    28. map.put("user",userService.findUserById((Integer) content.get("userId")));
    29. map.put("postId",content.get("postId"));
    30. int noticeCount = messageService.findNoticeCount(KAFKA_TOPIC_LIKE, user.getId());
    31. int unReadNoticeCount = messageService.findUnReadNoticeCount(KAFKA_TOPIC_LIKE, user.getId());
    32. map.put("noticeCount",noticeCount);
    33. map.put("unReadNoticeCount",unReadNoticeCount);
    34. model.addAttribute("like",map);
    35. }
    36. //follow
    37. map = new HashMap<>();
    38. leastNotice = messageService.findLeastNotice(KAFKA_TOPIC_FOLLOW, user.getId());
    39. if(leastNotice!=null){
    40. map.put("leastNotice",leastNotice);
    41. HashMap content = JSONObject.parseObject(leastNotice.getContent(), HashMap.class);
    42. map.put("entityType",content.get("entityType"));
    43. map.put("entityId",content.get("entityId"));
    44. map.put("user",userService.findUserById((Integer) content.get("userId")));
    45. int noticeCount = messageService.findNoticeCount(KAFKA_TOPIC_FOLLOW, user.getId());
    46. int unReadNoticeCount = messageService.findUnReadNoticeCount(KAFKA_TOPIC_FOLLOW, user.getId());
    47. map.put("noticeCount",noticeCount);
    48. map.put("unReadNoticeCount",unReadNoticeCount);
    49. model.addAttribute("follow",map);
    50. }
    51. //
    52. model.addAttribute("unreadConversationCount",messageService.findUnreadLetterCount(user.getId(), null));
    53. model.addAttribute("unreadNoticeCount",messageService.findUnReadNoticeCount(null,user.getId()));
    54. return "/site/notice";
    55. }

    1.5.5)某类通知的详情

    .....

    总结:到此就实现了在spring项目中用kafka作为消息引擎系统来实现系统通知的功能.

  • 相关阅读:
    java内存地址
    WPF必须掌握的技能之自定义控件——实战:自制上传文件显示进度按钮
    html中一些简单的css动画 包括滚动进度条 滑动动画
    计算机算法分析与设计(23)---二分搜索算法(C++)
    node.js——模块化
    springboot整合其它项目
    mysql数据库的授权访问
    错过等一年!2022年全国注册城乡规划师考试报名入口开启!
    Python 运算符和表达式
    java毕业设计教学平台(附源码、数据库)
  • 原文地址:https://blog.csdn.net/DYYssb/article/details/126706679