import java.util.Date;
public class Message {
private int id;
private int fromId;
private int toId;
private String conversationId;
private String content;
private int status;
private Date createTime;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getFromId() {
return fromId;
}
public void setFromId(int fromId) {
this.fromId = fromId;
}
public int getToId() {
return toId;
}
public void setToId(int toId) {
this.toId = toId;
}
public String getConversationId() {
return conversationId;
}
public void setConversationId(String conversationId) {
this.conversationId = conversationId;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", fromId=" + fromId +
", toId=" + toId +
", conversationId='" + conversationId + '\'' +
", content='" + content + '\'' +
", status=" + status +
", createTime=" + createTime +
'}';
}
}
定义事件消费者
import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.DiscussPost;
import edu.npu.newcoder.community.community.entity.Event;
import edu.npu.newcoder.community.community.entity.Message;
import edu.npu.newcoder.community.community.service.DiscussPostService;
import edu.npu.newcoder.community.community.service.ElasticsearchService;
import edu.npu.newcoder.community.community.service.MessageService;
import edu.npu.newcoder.community.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@Component
public class EventConsumer implements CommunityConstant {
// private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);
@Autowired
private MessageService messageService;
@Autowired
private DiscussPostService discussPostService;
@Autowired
private ElasticsearchService elasticsearchService;
//加一个监听相关主题的listener
@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})
public void handleCommentMessage(ConsumerRecord record){
if(record == null || record.value()==null){
System.out.println("错误发帖");
return;
}
Event event= JSONObject.parseObject(record.value().toString(),Event.class);
if(event == null){
System.out.println("错误发帖");
return;
}
//发送站内通知
Message message = new Message();
message.setFromId(SYSTEM_USERID);
message.setToId(event.getEntityUserId());
message.setConversationId(event.getTopic());
message.setCreateTime(new Date());
//message的内容
Map<String,Object> content=new HashMap<>();
content.put("userId",event.getUserId());
content.put("entityType",event.getEntityType());
content.put("entityId",event.getEntityId());
if(!event.getData().isEmpty()){
for(Map.Entry<String,Object> entry:event.getData().entrySet()){
content.put(entry.getKey(),entry.getValue());
}
}
message.setContent(JSONObject.toJSONString(content));
System.out.println(message);
messageService.addMessage(message);
System.out.println("成功处理事件");
}
}
定义一个事件实体 以方便在消息的发送与处理
import java.util.HashMap;
import java.util.Map;
//用于事件驱动的kafka消息队列开发
public class Event {
private String topic;
//事件触发的人
private int userId;
//事件发生在哪个实体
private int entityType;
private int entityId;
//实体作者
private int entityUserId;
//存储额外数据
private Map<String,Object> data = new HashMap<>();
public String getTopic() {
return topic;
}
public Event setTopic(String topic) {
this.topic = topic;
return this;
}
public int getUserId() {
return userId;
}
public Event setUserId(int userId) {
this.userId = userId;
return this;
}
public int getEntityType() {
return entityType;
}
public Event setEntityType(int entityType) {
this.entityType = entityType;
return this;
}
public int getEntityId() {
return entityId;
}
public Event setEntityId(int entityId) {
this.entityId = entityId;
return this;
}
public int getEntityUserId() {
return entityUserId;
}
public Event setEntityUserId(int entityUserId) {
this.entityUserId = entityUserId;
return this;
}
public Map<String, Object> getData() {
return data;
}
public Event setData(String key,Object value) {
this.data.put(key,value);
return this;
}
}
定义事件的生产者
import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.Event;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class EventProducer {
//生产者使用kafkaTemplate发送消息
@Autowired
KafkaTemplate kafkaTemplate;
//处理事件
public void fireEvent(Event event){
//将事件发布到指定的主题
//将event转换为json数据进行消息发送
kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
System.out.println("成功发送"+event.getTopic());
}
}
定义事件消费者
import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.DiscussPost;
import edu.npu.newcoder.community.community.entity.Event;
import edu.npu.newcoder.community.community.entity.Message;
import edu.npu.newcoder.community.community.service.DiscussPostService;
import edu.npu.newcoder.community.community.service.ElasticsearchService;
import edu.npu.newcoder.community.community.service.MessageService;
import edu.npu.newcoder.community.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@Component
public class EventConsumer implements CommunityConstant {
// private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);
@Autowired
private MessageService messageService;
@Autowired
private DiscussPostService discussPostService;
@Autowired
private ElasticsearchService elasticsearchService;
//加一个监听相关主题的listener
@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})
public void handleCommentMessage(ConsumerRecord record){
if(record == null || record.value()==null){
System.out.println("错误发帖");
return;
}
Event event= JSONObject.parseObject(record.value().toString(),Event.class);
if(event == null){
System.out.println("错误发帖");
return;
}
//发送站内通知
Message message = new Message();
message.setFromId(SYSTEM_USERID);
message.setToId(event.getEntityUserId());
message.setConversationId(event.getTopic());
message.setCreateTime(new Date());
//message的内容
Map<String,Object> content=new HashMap<>();
content.put("userId",event.getUserId());
content.put("entityType",event.getEntityType());
content.put("entityId",event.getEntityId());
if(!event.getData().isEmpty()){
for(Map.Entry<String,Object> entry:event.getData().entrySet()){
content.put(entry.getKey(),entry.getValue());
}
}
message.setContent(JSONObject.toJSONString(content));
System.out.println(message);
messageService.addMessage(message);
System.out.println("成功处理事件");
}
}
在特定的地方触发消息产生
//触发评论事件
Event event=new Event().setTopic(TOPIC_COMMENT)
.setUserId(hostHolder.getUser().getId())
.setEntityType(comment.getEntityType())
.setEntityId(comment.getEntityId())
.setData("postId",discussPostId);
if(comment.getEntityType() == ENTITY_TYPE_POST){
DiscussPost target=discussPostService.findDiscussPostById(comment.getEntityId());
event.setEntityUserId(target.getUserId());
}else if(comment.getEntityType()==ENTITY_TYPE_COMMENT){
//根据评论的id查询评论
Comment target =commentService.findCommentById(comment.getEntityId());
event.setEntityUserId(target.getUserId());
}
eventProducer.fireEvent(event);
//触发点赞事件
if(likeStatus ==1){
Event event =new Event()
.setTopic(TOPIC_LIKE)
.setUserId(hostHolder.getUser().getId())
.setEntityType(entityType)
.setEntityId(entityId)
.setEntityUserId(entityUserId)
.setData("postId",postId);
eventProducer.fireEvent(event);
}
//触发关注事件
Event event = new Event()
.setTopic(TOPIC_FOLLOW)
.setUserId(hostHolder.getUser().getId())
.setEntityType(entityType)
.setEntityId(entityId)
.setEntityUserId(entityId);
eventProducer.fireEvent(event);