可以用阻塞队列来实现消息队列,阻塞队列是一个接口:BlockingQueue,可以用来解决线程通信问题,依靠两个方法,这两个方法都是阻塞的,put方法往队列里存数据,take从队列里拿数据。

生产者消费者模式:
阻塞队列BlockingQueue接口的实现类:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue、DelayQueue等。
代码示例:
public class BlockingQueueTests {
public static void main(String[] args) {
BlockingQueue queue = new ArrayBlockingQueue(10);
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
}
}
// 生产者线程
class Producer implements Runnable {
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
Thread.sleep(20);
queue.put(i);
System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 消费者线程
class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Thread.sleep(new Random().nextInt(1000));
queue.take();
System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
第一步:引入依赖 spring-kafka
<dependency>
<groupId>org.springframework.kafkagroupId>
<artifactId>spring-kafkaartifactId>
dependency>
第二步:配置Kafka
# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000
第三步:访问Kafka
代码示例:
public class KafkaTests {
@Autowired
private KafkaProducer kafkaProducer;
@Test
public void testKafka() {
kafkaProducer.sendMessage("test", "你好");
kafkaProducer.sendMessage("test", "在吗");
try {
Thread.sleep(1000 * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Component
class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topic, String content) {
kafkaTemplate.send(topic, content);
}
}
@Component
class KafkaConsumer {
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record) {
System.out.println(record.value());
}
}
功能描述:当用户收到评论、点赞和关注后,会收到来自系统的通知信息。
实现:从技术上来讲,使用kafka消息队列的方式,根据不同的主题处理不同的任务。从业务的角度来讲,采用事件驱动的方式,评论、点赞、关注分别为事件。开发时,在Kafka框架的基础上,基于事件对代码逻辑进一步的封装。

触发事件
处理事件

具体实现:
1.封装事件对象 Event,对类中的set函数都进行改写,使得构造对象时可以连续set
private String topic;
private int userId;
private int entityType;
private int entityId;
private int entityUserId;
private Map<String,Object> data = new HashMap<>();
// 思考重写set方法的用处,全部都做这样的处理,以topic为例。
// 好处:可以连续set
public Event setTopic(String topic) {
this.topic = topic;
return this;
}
2.开发事件的生产者EventProducer,将事件发布到指定的主题,主要有评论、点赞、关注3个topic
@Component
public class EventProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
// 处理事件
public void fireEvent(Event event){
// 将事件发布到指定的主题
kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
}
}
3.开发事件的消费者EventConsumer
@Component
public class EventConsumer implements CommunityConstant {
private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);
@Autowired
private MessageService messageService;
@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})
public void handleCommentMessage(ConsumerRecord record){
if(record == null || record.value() == null){
logger.error("消息的内容为空!");
return;
}
Event event = JSONObject.parseObject(record.value().toString(),Event.class);
if(event == null){
logger.error("消息格式错误!");
return;
}
// 发送站内通知
Message message = new Message();
message.setFromId(SYSTEM_USER_ID);
message.setToId(event.getEntityUserId());
message.setConversationId(event.getTopic());
message.setCreateTime(new Date());
Map<String,Object> content = new HashMap<>();
content.put("userId",event.getUserId());
content.put("entityType",event.getEntityType());
content.put("entityId",event.getEntityId());
if(!event.getData().isEmpty()){
for(Map.Entry<String,Object> entry:event.getData().entrySet()){
content.put(entry.getKey(),entry.getValue());
}
}
message.setContent(JSONObject.toJSONString(content));
messageService.addMessage(message);
}
}
4.触发事件,有3个地方会触发该事件
// 触发评论事件
Event event = new Event()
.setTopic(TOPIC_COMMENT)
.setUserId(hostHolder.getUser().getId())
.setEntityType(comment.getEntityType())
.setEntityId(comment.getEntityId())
.setData("postId",discussPostId);
if(comment.getEntityType() == ENTITY_TYPE_POST){
DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId());
event.setEntityUserId(target.getUserId());
}else if(comment.getEntityType() == ENTITY_TYPE_COMMENT){
Comment target = commentService.findCommentById(comment.getEntityId());
event.setEntityUserId(target.getUserId());
}
eventProducer.fireEvent(event);
// 触发点赞事件
if(likeStatus == 1){
Event event = new Event()
.setTopic(TOPIC_LIKE)
.setUserId(hostHolder.getUser().getId())
.setEntityType(entityType)
.setEntityId(entityId)
.setEntityUserId(entityUserId)
.setData("postId",postId);
eventProducer.fireEvent(event);
}
// 触发关注事件
Event event = new Event()
.setTopic(TOPIC_FOLLOW)
.setUserId(hostHolder.getUser().getId())
.setEntityType(entityType)
.setEntityId(entityId)
.setEntityUserId(entityId);
eventProducer.fireEvent(event);
**小结:**首先需要创建事件生产者类,将事件发布到指定主题,主题包括评论、点赞、关注这3种,然后创建事件消费者类,定义消费该类事件的方法,使用@KafkaListener注解,属性topics中是消费的时间类型,然后在相应的controller类中加入触发事件的代码块,发布评论时、点赞后、关注后都需要触发事件。
功能描述:
