核心思路:减少对数据库的访问,利用Redis的高并发特性来实现。
- 系统初始化,把商品库存数量加载到Redis。
//这一步操作确保了在秒杀开始之前,商品库存数据已经预加载到Redis中,减少了在秒杀开始时对数据库的访问。- 收到请求,Redis预减库存,库存不足,直接返回,否则进入3。
//在收到用户的秒杀请求后,首先从Redis中减去相应的商品数量。如果减去后的数量小于0,说明商品库存不足,直接返回秒杀失败。否则,进入下一步。- 请求入队,立即返回排队中。(1-3 异步下单)
//如果商品库存充足,用户的秒杀请求会被放入一个队列中,然后立即返回给用户一个正在排队中的信息。这一步操作是异步的,即不需要等待队列中的请求全部处理完毕,就可以立即返回。- 请求出队,生成订单,减少库存。
//当队列中有请求需要处理时,就从队列中取出一个请求,然后生成相应的订单,并从Redis中减去相应的商品数量。- 客户端轮询,是否秒杀成功(4-5 并发操作)
//客户端在发送秒杀请求后,会定时向服务器发送轮询请求,询问秒杀是否成功。在这个过程中,步骤4和5是并发进行的,即服务器的订单生成和库存减少操作,以及客户端的轮询操作,可以同时进行。
内容模块
- Redis预减库存减少数据库访问
- 内存标记减少Redis访问
- RabbitMQ队列缓冲,异步下单,增强用户体验
- RabbitMQ安装与Spring Boot集成
- 访问Nginx水平扩展
- 压测
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现
安装 erlang:安装erlang步骤
这里要注意:rabbitmq版本要和erlang版本对应
这里因为版本问题 rabbitmq一直无法运行
解决:https://blog.csdn.net/qq_43616898/article/details/105602839
运行成功
步骤
- 添加依赖 spring-boot-starter-amqp
- 创建消息接受者
- 创建消息发送者
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
#rabbitmq
spring.rabbitmq.host=192.168.33.10
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
#\u6D88\u8D39\u8005\u6570\u91CF
spring.rabbitmq.listener.simple.concurrency= 10
spring.rabbitmq.listener.simple.max-concurrency= 10
#\u6D88\u8D39\u8005\u6BCF\u6B21\u4ECE\u961F\u5217\u83B7\u53D6\u7684\u6D88\u606F\u6570\u91CF
spring.rabbitmq.listener.simple.prefetch= 1
#\u6D88\u8D39\u8005\u81EA\u52A8\u542F\u52A8
spring.rabbitmq.listener.simple.auto-startup=true
#\u6D88\u8D39\u5931\u8D25\uFF0C\u81EA\u52A8\u91CD\u65B0\u5165\u961F
spring.rabbitmq.listener.simple.default-requeue-rejected= true
#\u542F\u7528\u53D1\u9001\u91CD\u8BD5
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.max-interval=10000
spring.rabbitmq.template.retry.multiplier=1.0
package com.example.rabbitmq;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
public static final String QUEUE = "queue";
@Bean
public Queue queue() {
return new Queue(QUEUE, true);
}
}
AmqpTemplate对象作用
- 发送消息:允许你发送各种类型的消息(如简单文本、JSON、POJOs等)到队列、主题或交换机。
- 接收消息:通过在方法上使用@RabbitListener注解,AmqpTemplate可以自动为你处理接收到的消息。
- 连接和断开连接:AmqpTemplate提供了连接和断开连接的方法,这样你就可以轻松地在你的应用中使用RabbitMQ。
- 错误处理:如果在发送或接收消息时发生异常,AmqpTemplate可以自动为你处理这些异常。
//创建发送者
@Service
public class MQSender {
private static Logger log = LoggerFactory.getLogger(MQSender.class);
@Autowired
AmqpTemplate amqpTemplate ;
public void send(Object message) {
String msg = RedisService.beanToString(message);
log.info("send message:"+msg);
amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);
}
}
//创建接受者
@Service
public class MQReceiver {
private static Logger log = LoggerFactory.getLogger(MQReceiver.class);
@Autowired
RedisService redisService;
@RabbitListener(queues=MQConfig.QUEUE)
public void receive(String message) {
log.info("receive message:"+message);
}
测试
@RequestMapping("/mq")
@ResponseBody
public Result<String> mq(){
mqSender.send("nihao,我是mq");
return Result.success("hello,mq");
}
以下的图表展示了RabbitMQ四种交换机模式:
名称 | 描述 |
---|---|
Direct Exchange(直连交换机) | 根据Routing Key将消息投递到不同的队列。 每个队列都会绑定一个交换机和特定的Routing Key。当消息被发送到交换机时,它会被送到指定的队列。 |
Fanout Exchange(扇出交换机) | 采用广播模式 ,根据绑定的交换机,路由到与之对应的所有队列。也就是说,如果一个交换机绑定了多个队列,消息将会被广播到这些队列。 |
Topic Exchange(主题交换机) | 对路由键进行模式匹配后进行投递 。这种类型的交换机将根据Routing Key的模式匹配来决定将消息投递到哪个队列。 |
Headers Exchange(头交换机) | 不处理Routing Key,而是根据发送的消息内容中的headers属性 进行匹配。这种类型的交换机将根据消息的headers属性来决定将消息投递到哪个队列。 |
@Configuration
public class MQConfig {
public static final String MIAOSHA_QUEUE = "miaosha.queue";
public static final String QUEUE = "queue";
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
public static final String HEADER_QUEUE = "header.queue";
public static final String TOPIC_EXCHANGE = "topicExchage";
public static final String FANOUT_EXCHANGE = "fanoutxchage";
public static final String HEADERS_EXCHANGE = "headersExchage";
/**
* Direct模式 交换机Exchange
* */
@Bean
public Queue queue() {
return new Queue(QUEUE, true);
}
/**
* Topic模式 交换机Exchange
* */
@Bean
public Queue topicQueue1() {
return new Queue(TOPIC_QUEUE1, true);
}
@Bean
public Queue topicQueue2() {
return new Queue(TOPIC_QUEUE2, true);
}
@Bean
public TopicExchange topicExchage(){
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Binding topicBinding1() {
return BindingBuilder.bind(topicQueue1()).to(topicExchage()).with("topic.key1");
}
@Bean
public Binding topicBinding2() {
return BindingBuilder.bind(topicQueue2()).to(topicExchage()).with("topic.#");
}
/**
* Fanout模式 交换机Exchange
* */
@Bean
public FanoutExchange fanoutExchage(){
return new FanoutExchange(FANOUT_EXCHANGE);
}
@Bean
public Binding FanoutBinding1() {
return BindingBuilder.bind(topicQueue1()).to(fanoutExchage());
}
@Bean
public Binding FanoutBinding2() {
return BindingBuilder.bind(topicQueue2()).to(fanoutExchage());
}
/**
* Header模式 交换机Exchange
* */
@Bean
public HeadersExchange headersExchage(){
return new HeadersExchange(HEADERS_EXCHANGE);
}
@Bean
public Queue headerQueue1() {
return new Queue(HEADER_QUEUE, true);
}
@Bean
public Binding headerBinding() {
Map<String, Object> map = new HashMap<String, Object>();
map.put("header1", "value1");
map.put("header2", "value2");
return BindingBuilder.bind(headerQueue1()).to(headersExchage()).whereAll(map).match();
}
}
package com.example.rabbitmq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.example.redis.RedisService;
//创建发送者
@Service
public class MQSender {
private static Logger log = LoggerFactory.getLogger(MQSender.class);
@Autowired
AmqpTemplate amqpTemplate ;
public void sendMiaoshaMessage(MiaoshaMessage mm) {
String msg = RedisService.beanToString(mm);
log.info("send message:"+msg);
amqpTemplate.convertAndSend(MQConfig.MIAOSHA_QUEUE, msg);
}
// public void send(Object message) {
// String msg = RedisService.beanToString(message);
// log.info("send message:"+msg);
// amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);
// }
//
public void sendTopic(Object message) {
String msg = RedisService.beanToString(message);
log.info("send topic message:"+msg);
amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key1", msg+"1");
amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key2", msg+"2");
}
public void sendFanout(Object message) {
String msg = RedisService.beanToString(message);
log.info("send fanout message:"+msg);
amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", msg);
}
public void sendHeader(Object message) {
String msg = RedisService.beanToString(message);
log.info("send fanout message:"+msg);
MessageProperties properties = new MessageProperties();
properties.setHeader("header1", "value1");
properties.setHeader("header2", "value2");
Message obj = new Message(msg.getBytes(), properties);
amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE, "", obj);
}
}
package com.example.rabbitmq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.example.domain.MiaoshaOrder;
import com.example.domain.MiaoShaUser;
import com.example.redis.RedisService;
import com.example.service.GoodsService;
import com.example.service.MiaoshaService;
import com.example.service.OrderService;
import com.example.vo.GoodsVo;
@Service
public class MQReceiver {
private static Logger log = LoggerFactory.getLogger(MQReceiver.class);
@Autowired
RedisService redisService;
@Autowired
GoodsService goodsService;
@Autowired
OrderService orderService;
@Autowired
MiaoshaService miaoshaService;
// @RabbitListener(queues=MQConfig.MIAOSHA_QUEUE)
// public void receive(String message) {
// log.info("receive message:"+message);
// MiaoshaMessage mm = RedisService.stringToBean(message, MiaoshaMessage.class);
// MiaoShaUser user = mm.getUser();
// long goodsId = mm.getGoodsId();
//
// GoodsVo goods = goodsService.getGoodsVoByGoodsId(goodsId);
// int stock = goods.getStockCount();
// if(stock <= 0) {
// return;
// }
// //判断是否已经秒杀到了
// MiaoshaOrder order = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(), goodsId);
// if(order != null) {
// return;
// }
// //减库存 下订单 写入秒杀订单
// miaoshaService.miaosha(user, goods);
// }
// @RabbitListener(queues=MQConfig.QUEUE)
// public void receive(String message) {
// log.info("receive message:"+message);
// }
@RabbitListener(queues=MQConfig.TOPIC_QUEUE1)
public void receiveTopic1(String message) {
log.info(" topic queue1 message:"+message);
}
@RabbitListener(queues=MQConfig.TOPIC_QUEUE2)
public void receiveTopic2(String message) {
log.info(" topic queue2 message:"+message);
}
@RabbitListener(queues=MQConfig.HEADER_QUEUE)
public void receiveHeaderQueue(byte[] message) {
log.info(" header queue message:"+new String(message));
}
}
初始实现秒杀功能
@RequestMapping(value="/do_miaosha", method= RequestMethod.POST)
@ResponseBody
public Result<OrderInfo> miaosha(Model model, MiaoShaUser user,
@RequestParam("goodsId")long goodsId) {
model.addAttribute("user", user);
if(user == null) {
return Result.error(CodeMsg.SESSION_ERROR);
}
//判断库存
GoodsVo goods = goodsService.getGoodsVoByGoodsId(goodsId);//10个商品,req1 req2
int stock = goods.getStockCount();
if(stock <= 0) {
return Result.error(CodeMsg.MIAO_SHA_OVER);
}
//判断是否已经秒杀到了
MiaoshaOrder order = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(), goodsId);
if(order != null) {
return Result.error(CodeMsg.REPEATE_MIAOSHA);
}
//减库存 下订单 写入秒杀订单
OrderInfo orderInfo = miaoshaService.miaosha(user, goods);
return Result.success(orderInfo);
}
核心思路:减少对数据库的访问,利用Redis的高并发特性来实现。
- 系统初始化,把商品库存数量加载到Redis。
//这一步操作确保了在秒杀开始之前,商品库存数据已经预加载到Redis中,减少了在秒杀开始时对数据库的访问。- 收到请求,Redis预减库存,库存不足,直接返回,否则进入3。
//在收到用户的秒杀请求后,首先从Redis中减去相应的商品数量。如果减去后的数量小于0,说明商品库存不足,直接返回秒杀失败。否则,进入下一步。- 请求入队,立即返回排队中。(1-3 异步下单)
//如果商品库存充足,用户的秒杀请求会被放入一个队列中,然后立即返回给用户一个正在排队中的信息。这一步操作是异步的,即不需要等待队列中的请求全部处理完毕,就可以立即返回。- 请求出队,生成订单,减少库存。
//当队列中有请求需要处理时,就从队列中取出一个请求,然后生成相应的订单,并从Redis中减去相应的商品数量。- 客户端轮询,是否秒杀成功(4-5 并发操作)
//客户端在发送秒杀请求后,会定时向服务器发送轮询请求,询问秒杀是否成功。在这个过程中,步骤4和5是并发进行的,即服务器的订单生成和库存减少操作,以及客户端的轮询操作,可以同时进行。