• 牛客项目(五)-使用kafka实现发送系统通知


    kafka入门以及与spring整合

    在这里插入图片描述

    Message.java

    import java.util.Date;
    
    /**
     * Plain data holder for a message exchanged between two users.
     *
     * <p>All properties are exposed through conventional bean accessors; no
     * validation is performed by this class.
     */
    public class Message {
        private int id;
        private int fromId;
        private int toId;
        private String conversationId;
        private String content;
        private int status;
        private Date createTime;

        /** @return the message id */
        public int getId() {
            return this.id;
        }

        /** @param id the message id */
        public void setId(int id) {
            this.id = id;
        }

        /** @return id of the sender */
        public int getFromId() {
            return this.fromId;
        }

        /** @param fromId id of the sender */
        public void setFromId(int fromId) {
            this.fromId = fromId;
        }

        /** @return id of the recipient */
        public int getToId() {
            return this.toId;
        }

        /** @param toId id of the recipient */
        public void setToId(int toId) {
            this.toId = toId;
        }

        /** @return the conversation identifier */
        public String getConversationId() {
            return this.conversationId;
        }

        /** @param conversationId the conversation identifier */
        public void setConversationId(String conversationId) {
            this.conversationId = conversationId;
        }

        /** @return the message body */
        public String getContent() {
            return this.content;
        }

        /** @param content the message body */
        public void setContent(String content) {
            this.content = content;
        }

        /** @return the status code of the message */
        public int getStatus() {
            return this.status;
        }

        /** @param status the status code of the message */
        public void setStatus(int status) {
            this.status = status;
        }

        /** @return the creation timestamp (the stored instance, not a copy) */
        public Date getCreateTime() {
            return this.createTime;
        }

        /** @param createTime the creation timestamp (stored as given, not copied) */
        public void setCreateTime(Date createTime) {
            this.createTime = createTime;
        }

        /**
         * Renders every field in declaration order; string fields are wrapped in
         * single quotes and {@code null} values print as {@code null}.
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Message{");
            text.append("id=").append(id);
            text.append(", fromId=").append(fromId);
            text.append(", toId=").append(toId);
            text.append(", conversationId='").append(conversationId).append('\'');
            text.append(", content='").append(content).append('\'');
            text.append(", status=").append(status);
            text.append(", createTime=").append(createTime);
            text.append('}');
            return text.toString();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81

    EventConsumer.java

    定义事件消费者

    import com.alibaba.fastjson.JSONObject;
    import edu.npu.newcoder.community.community.entity.DiscussPost;
    import edu.npu.newcoder.community.community.entity.Event;
    import edu.npu.newcoder.community.community.entity.Message;
    import edu.npu.newcoder.community.community.service.DiscussPostService;
    import edu.npu.newcoder.community.community.service.ElasticsearchService;
    import edu.npu.newcoder.community.community.service.MessageService;
    import edu.npu.newcoder.community.community.util.CommunityConstant;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Kafka consumer that converts incoming events into in-site notification
     * messages and stores them through {@link MessageService}.
     */
    @Component
    public class EventConsumer implements CommunityConstant {
        @Autowired
        private MessageService messageService;
        @Autowired
        private DiscussPostService discussPostService;
        @Autowired
        private ElasticsearchService elasticsearchService;

        /**
         * Listens on the comment, like and follow topics and stores one
         * notification message per received event.
         *
         * <p>The record value is parsed as a JSON-encoded {@link Event}. The
         * notification is sent from {@code SYSTEM_USERID} to the event's entity
         * owner, uses the event topic as its conversation id, and carries a JSON
         * object with {@code userId}, {@code entityType}, {@code entityId} plus
         * every extra entry of the event's data map as content.
         *
         * @param record the consumed Kafka record; a {@code null} record or a
         *               {@code null} value is reported and ignored
         */
        @KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})
        public void handleCommentMessage(ConsumerRecord<?, ?> record) {
            if (record == null || record.value() == null) {
                System.out.println("错误发帖");
                return;
            }
            Event event = JSONObject.parseObject(record.value().toString(), Event.class);
            if (event == null) {
                System.out.println("错误发帖");
                return;
            }

            // Build the in-site notification.
            Message message = new Message();
            message.setFromId(SYSTEM_USERID);
            message.setToId(event.getEntityUserId());
            message.setConversationId(event.getTopic());
            message.setCreateTime(new Date());

            // Notification payload: the fixed keys first, then the extra data so
            // that extra entries override a fixed key of the same name.
            Map<String, Object> content = new HashMap<>();
            content.put("userId", event.getUserId());
            content.put("entityType", event.getEntityType());
            content.put("entityId", event.getEntityId());
            Map<String, Object> extra = event.getData();
            if (extra != null) {
                // Guard against a payload that leaves the data map unset.
                content.putAll(extra);
            }
            message.setContent(JSONObject.toJSONString(content));

            System.out.println(message);
            messageService.addMessage(message);
            System.out.println("成功处理事件");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    Event.java

    定义一个事件实体,以方便消息的发送与处理

    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Event entity used for event-driven development on top of the Kafka message
     * queue.
     *
     * <p>Every setter returns {@code this}, so an event can be assembled in a
     * single fluent chain.
     */
    public class Event {
        /** Topic the event is published to. */
        private String topic;
        /** User who triggered the event. */
        private int userId;
        /** Type of the entity the event happened on. */
        private int entityType;
        /** Id of the entity the event happened on. */
        private int entityId;
        /** Author (owner) of that entity. */
        private int entityUserId;
        /** Additional key/value data attached to the event. */
        private Map<String,Object> data = new HashMap<>();

        /** @return the topic of this event */
        public String getTopic() {
            return this.topic;
        }

        /**
         * @param topic the topic of this event
         * @return this event, for chaining
         */
        public Event setTopic(String topic) {
            this.topic = topic;
            return this;
        }

        /** @return id of the user who triggered the event */
        public int getUserId() {
            return this.userId;
        }

        /**
         * @param userId id of the user who triggered the event
         * @return this event, for chaining
         */
        public Event setUserId(int userId) {
            this.userId = userId;
            return this;
        }

        /** @return type of the affected entity */
        public int getEntityType() {
            return this.entityType;
        }

        /**
         * @param entityType type of the affected entity
         * @return this event, for chaining
         */
        public Event setEntityType(int entityType) {
            this.entityType = entityType;
            return this;
        }

        /** @return id of the affected entity */
        public int getEntityId() {
            return this.entityId;
        }

        /**
         * @param entityId id of the affected entity
         * @return this event, for chaining
         */
        public Event setEntityId(int entityId) {
            this.entityId = entityId;
            return this;
        }

        /** @return id of the user who owns the affected entity */
        public int getEntityUserId() {
            return this.entityUserId;
        }

        /**
         * @param entityUserId id of the user who owns the affected entity
         * @return this event, for chaining
         */
        public Event setEntityUserId(int entityUserId) {
            this.entityUserId = entityUserId;
            return this;
        }

        /** @return the live map of additional data (not a copy) */
        public Map<String, Object> getData() {
            return this.data;
        }

        /**
         * Adds (or replaces) a single entry of additional data.
         *
         * @param key   name of the entry
         * @param value value of the entry
         * @return this event, for chaining
         */
        public Event setData(String key,Object value) {
            data.put(key, value);
            return this;
        }

    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    EventProducer.java

    定义事件的生产者

    import com.alibaba.fastjson.JSONObject;
    import edu.npu.newcoder.community.community.entity.Event;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    /**
     * Producer side of the event pipeline: serializes events to JSON and hands
     * them to Kafka.
     */
    @Component
    public class EventProducer {
        // The producer sends its messages through the KafkaTemplate.
        @Autowired
        KafkaTemplate kafkaTemplate;

        /**
         * Publishes the event to the topic it names.
         *
         * @param event the event to publish; it is sent as its JSON representation
         */
        public void fireEvent(Event event){
            String topic = event.getTopic();
            // The event travels as a JSON string.
            String payload = JSONObject.toJSONString(event);
            kafkaTemplate.send(topic, payload);
            System.out.println("成功发送" + topic);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    EventConsumer.java

    定义事件消费者

    import com.alibaba.fastjson.JSONObject;
    import edu.npu.newcoder.community.community.entity.DiscussPost;
    import edu.npu.newcoder.community.community.entity.Event;
    import edu.npu.newcoder.community.community.entity.Message;
    import edu.npu.newcoder.community.community.service.DiscussPostService;
    import edu.npu.newcoder.community.community.service.ElasticsearchService;
    import edu.npu.newcoder.community.community.service.MessageService;
    import edu.npu.newcoder.community.community.util.CommunityConstant;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Kafka consumer that converts incoming events into in-site notification
     * messages and stores them through {@link MessageService}.
     */
    @Component
    public class EventConsumer implements CommunityConstant {
        @Autowired
        private MessageService messageService;
        @Autowired
        private DiscussPostService discussPostService;
        @Autowired
        private ElasticsearchService elasticsearchService;

        /**
         * Listens on the comment, like and follow topics and stores one
         * notification message per received event.
         *
         * <p>The record value is parsed as a JSON-encoded {@link Event}. The
         * notification is sent from {@code SYSTEM_USERID} to the event's entity
         * owner, uses the event topic as its conversation id, and carries a JSON
         * object with {@code userId}, {@code entityType}, {@code entityId} plus
         * every extra entry of the event's data map as content.
         *
         * @param record the consumed Kafka record; a {@code null} record or a
         *               {@code null} value is reported and ignored
         */
        @KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})
        public void handleCommentMessage(ConsumerRecord<?, ?> record) {
            if (record == null || record.value() == null) {
                System.out.println("错误发帖");
                return;
            }
            Event event = JSONObject.parseObject(record.value().toString(), Event.class);
            if (event == null) {
                System.out.println("错误发帖");
                return;
            }

            // Build the in-site notification.
            Message message = new Message();
            message.setFromId(SYSTEM_USERID);
            message.setToId(event.getEntityUserId());
            message.setConversationId(event.getTopic());
            message.setCreateTime(new Date());

            // Notification payload: the fixed keys first, then the extra data so
            // that extra entries override a fixed key of the same name.
            Map<String, Object> content = new HashMap<>();
            content.put("userId", event.getUserId());
            content.put("entityType", event.getEntityType());
            content.put("entityId", event.getEntityId());
            Map<String, Object> extra = event.getData();
            if (extra != null) {
                // Guard against a payload that leaves the data map unset.
                content.putAll(extra);
            }
            message.setContent(JSONObject.toJSONString(content));

            System.out.println(message);
            messageService.addMessage(message);
            System.out.println("成功处理事件");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    在特定的地方触发消息产生

    CommentController

     // Fire a comment event: the current user commented on the entity referenced by `comment`.
            Event event=new Event().setTopic(TOPIC_COMMENT)
                    .setUserId(hostHolder.getUser().getId())
                    .setEntityType(comment.getEntityType())
                    .setEntityId(comment.getEntityId())
                    .setData("postId",discussPostId);
            // The entity owner is looked up according to the entity type (post or comment).
            // NOTE(review): both lookups dereference `target` directly; confirm they cannot return null.
            if(comment.getEntityType() == ENTITY_TYPE_POST){
                DiscussPost target=discussPostService.findDiscussPostById(comment.getEntityId());
                event.setEntityUserId(target.getUserId());
            }else if(comment.getEntityType()==ENTITY_TYPE_COMMENT){
                // Look the comment up by its id.
                Comment target =commentService.findCommentById(comment.getEntityId());
                event.setEntityUserId(target.getUserId());
            }
             eventProducer.fireEvent(event);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    LikeController

     // Fire a like event, only when likeStatus is 1
     // (presumably 1 means "liked" and other values mean the like was removed; confirm).
     if(likeStatus ==1){
                Event event =new Event()
                        .setTopic(TOPIC_LIKE)
                        .setUserId(hostHolder.getUser().getId())
                        .setEntityType(entityType)
                        .setEntityId(entityId)
                        .setEntityUserId(entityUserId)
                        .setData("postId",postId);
                eventProducer.fireEvent(event);
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    FollowController

     // Fire a follow event.
     // NOTE(review): entityId is passed as both the entity id and the entity owner's user id;
     // presumably the followed entity is itself a user. Confirm for other entity types.
     Event event = new Event()
                    .setTopic(TOPIC_FOLLOW)
                    .setUserId(hostHolder.getUser().getId())
                    .setEntityType(entityType)
                    .setEntityId(entityId)
                    .setEntityUserId(entityId);
            eventProducer.fireEvent(event);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
  • 相关阅读:
    django 操作
    前端ES6-ES11新特性
    flutter 打包apk
    网络协议攻击
    番外 1 : Java 环境下的 selenium 搭建
    angular项目指定端口,实现局域网内ip访问
    react-redux的connect函数实现
    最漂亮:yWorks yFiles Diagramming SDK 5.4.0.2
    【base-0】如果各个前端框架的父子通信都统一会怎样?
    卷妹带你回顾Java基础(一)每日更新Day2
  • 原文地址:https://blog.csdn.net/weixin_44925329/article/details/134183204