
将库存量和一人一单的判断放在Redis中进行判断,如果满足下单条件异步完成下单操作,减少整体业务时间,和数据库交互次数。

判断库存是否充足,用户是否已经下过单,扣除库存,必须需要具有原子性,则使用Lua脚本完成


1.新增秒杀优惠券同时,保存在Redis中
- private static final String SECKILL_STOCK_KEY = "seckill:stock:";
-
- @Autowired
- private StringRedisTemplate stringRedisTemplate;
-
-
- public void addSeckillVoucher(Voucher voucher) {
- //新增秒杀卷 -- 保存数据库
- //saveSeckillVoucher(voucher);
- //将秒杀卷库存数量保存到Redis中
- stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
- }
2.利用Lua脚本,判断库存,一人一单,是否有抢购资格
- --1删除列表
- --1.1优惠券ID
- local voucherId = ARGV[1]
- --1.2 用户ID
- local userId = ARGV[2]
-
- --2.数据key
- --2.1库存key
- local stockKey = 'seckill:stock:' .. voucherId
- --2.2订单key
- local orderKey = 'seckill:order:' .. voucherId
-
- --3.脚本业务
- --3.1判断库存是否充足
- --字符串不能跟数字进行比对,需要使用tonumber转为数字
- if (tonumber(redis.call('get', stockKey)) <= 0) then
- --3.2库存不足返回1
- return 1
- end
- --3.3判断用户是否下单
- if (redis.call('sismember', orderKey, userId) == 1) then
- --3.4存在,说明用户重复下单,返回2
- return 2
- end
- --3.5扣库存
- redis.call('incrby', stockKey, -1)
- --3.6下单保存用户
- redis.call('sadd', orderKey, userId)
- return 0
3.抢购成功加入阻塞队列
- private static final DefaultRedisScript
SECKILL_SCRIPT; -
- static {
- SECKILL_SCRIPT = new DefaultRedisScript<>();
- SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
- SECKILL_SCRIPT.setResultType(Long.class);
- }
- @Autowired
- private RedisIDGenerator redisIDGenerator;
- public void seckillVoucher(Long voucherId, Long userId) {
- //1.执行lua脚本
- Long result = stringRedisTemplate.execute(
- SECKILL_SCRIPT,
- Collections.emptyList(),
- voucherId.toString(), userId.toString());
- //2.判断是否为0
- int r = result.intValue();
- if (r != 0) {
- //操作失败 1库存不足 2重复下单
- log.info("下单失败, {}", (result == 1 ? "库存不足" : "重复下单"));
- return;
- }
- //3 为0,将下单信息保存到阻塞队列中
- long orderId = redisIDGenerator.nextId("order");
- VourcherOrder vourcherOrder = new VourcherOrder();
- vourcherOrder.setOrderId(orderId);
- vourcherOrder.setUserId(userId);
- vourcherOrder.setVoucherId(voucherId);
- blockingQueue.add(vourcherOrder);
- log.info("下单成功");
- }
4.开启线程从阻塞队列中获取task,完成下单
- private BlockingQueue
blockingQueue = new ArrayBlockingQueue<>(1024 * 1024); -
- private static ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
-
- private VoucherService proxy;
-
- //类启动时就执行handler处理阻塞队列内容
- @PostConstruct
- private void init() {
- SECKILL_ORDER_EXECUTOR.execute(new VoucherOrderHandler());
- proxy = (VoucherService) AopContext.currentProxy();
- }
-
- private class VoucherOrderHandler implements Runnable {
-
- @Override
- public void run() {
- while (true) {
- try {
- //获取队列中的订单信息
- VourcherOrder vourcherOrder = blockingQueue.take();
- //创建订单
- handlerVoucherOrder(vourcherOrder);
- } catch (Exception e) {
- log.error("处理订单异常{}", e);
- }
- }
- }
- }
-
- private void handlerVoucherOrder(VourcherOrder vourcherOrder) {
- //1获取用户
- Long userId = vourcherOrder.getUserId();
- //2获取锁对象
- RLock lock = redissonClient.getLock("lock:order:" + userId);
- //3判断锁是否获取成功
- boolean isLock = lock.isLocked();
- if (!isLock) {
- log.error("handlerVoucherOrder -- 不能重复下单");
- return;
- }
- //4获取成功,则保存数据库,因为调用的方法存在事务则需要代理对象
- // 但是调用方式一个子线程,无法获取到代理对象,则需要在主线程中提前获得proxy
- try {
- proxy.createOrder(vourcherOrder);
- } finally {
- lock.unlock();
- }
- }
-
- @Transactional
- public String createOrder(VourcherOrder vourcherOrder) {
- //查询数据库order中userId是否存在记录,存在则不让购买
- Long userId = vourcherOrder.getUserId();
- int count = queryOrderCount(userId);
- if (count > 0) {
- //已买过购买失败
- return "fail";
- }
-
- boolean success = order_count(userId);
- if (!success) {
- return "库存不足";
- }
-
- saveOrder(userId);
- return "成功";
- // }
- }
-
- private void saveOrder(Long userId) {
- //模拟提交订单插入数据库
- //insert into order values(userId.....);
- }
-
- private boolean order_count(Long userId) {
- //修改库存
- //update order_count set count = count -1 where userId =? and count >0;
- return true;
- }
-
- private int queryOrderCount(Long userId) {
- //模拟查询数据库
- //select count(*) from order where userId = ?
- return 0;
- }
存在的问题: 但是自实现的阻塞队列,存在安全问题,例如:任务过多内存溢出,消费后失败不能重试,宕机导致数据丢失等....
解决方案:使用第三方的MQ服务,来存储任务,第三方的MQ服务独立在JVM外,则可以解决内存问题,并且服务本身有很好的安全属性,来解决宕机等问题。第三方的MQ服务--RabbitMQ,ActiveMQ,Kafka等
这里使用Redis所带功能实现消息队列:





解决消息丢失:



代码实现:

1.创建Stream类型的消息队列
2,lua脚本
- --1删除列表
- --1.1优惠券ID
- local voucherId = ARGV[1]
- --1.2 用户ID
- local userId = ARGV[2]
- --1.3 订单ID
- local orderId = ARGV[2]
-
- --2.数据key
- --2.1库存key
- local stockKey = 'seckill:stock:' .. voucherId
- --2.2订单key
- local orderKey = 'seckill:order:' .. voucherId
-
- --3.脚本业务
- --3.1判断库存是否充足
- --字符串不能跟数字进行比对,需要使用tonumber转为数字
- if (tonumber(redis.call('get', stockKey)) <= 0) then
- --3.2库存不足返回1
- return 1
- end
- --3.3判断用户是否下单
- if (redis.call('sismember', orderKey, userId) == 1) then
- --3.4存在,说明用户重复下单,返回2
- return 2
- end
- --3.5扣库存
- redis.call('incrby', stockKey, -1)
- --3.6下单保存用户
- redis.call('sadd', orderKey, userId)
- --3.7 发送消息到队列中 XADD stream.order * k1 v1 k2 v2
- redis.call('xadd', 'stream.order', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
- return 0
3.启动线程完成下单
- private class VoucherOrderHandler implements Runnable {
- String queueName = "stream.order";
-
- @Override
- public void run() {
- while (true) {
- try {
- //1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order >
- List
> list = stringRedisTemplate.opsForStream().read( - Consumer.from("g1", "c1"),
- StreamReadOptions.empty().count(1L).block(Duration.ofSeconds(2)),
- StreamOffset.create(queueName, ReadOffset.lastConsumed()));
- //2.判断消息获取是否成功
- if (CollectionUtil.isEmpty(list)) {
- //2.1如果获取失败,说明没有消息,继续下一次循环
- continue;
- }
- //3.解析消息订单信息
- MapRecord
record = list.get(0); - Map
- VourcherOrder vourcherOrder = BeanUtil.fillBeanWithMap(value, new VourcherOrder(), true);
- //4.如果获取成功可以下单
- handlerVoucherOrder(vourcherOrder);
- //5.ACK确认
- stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
-
- } catch (Exception e) {
- log.error("处理订单异常{}", e);
- handlePendingList();
- }
- }
- }
-
- private void handlePendingList() {
- while (true) {
- try {
- //1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order 0
- List
> list = stringRedisTemplate.opsForStream().read( - Consumer.from("g1", "c1"),
- StreamReadOptions.empty().count(1L),
- StreamOffset.create(queueName, ReadOffset.from("0")));
- //2.判断消息获取是否成功
- if (CollectionUtil.isEmpty(list)) {
- //2.1如果获取失败,说明没有消息,跳出循环
- break;
- }
- //3.解析消息订单信息
- MapRecord
record = list.get(0); - Map
- VourcherOrder vourcherOrder = BeanUtil.fillBeanWithMap(value, new VourcherOrder(), true);
- //4.如果获取成功可以下单
- handlerVoucherOrder(vourcherOrder);
- //5.ACK确认
- stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
-
- } catch (Exception e) {
- log.error("处理订单异常{}", e);
- }
- }
- }
-
- }
-
- private void handlerVoucherOrder(VourcherOrder vourcherOrder) {
- //1获取用户
- Long userId = vourcherOrder.getUserId();
- //2获取锁对象
- RLock lock = redissonClient.getLock("lock:order:" + userId);
- //3判断锁是否获取成功
- boolean isLock = lock.isLocked();
- if (!isLock) {
- log.error("handlerVoucherOrder -- 不能重复下单");
- return;
- }
- //4获取成功,则保存数据库,因为调用的方法存在事务则需要代理对象
- // 但是调用方式一个子线程,无法获取到代理对象,则需要在主线程中提前获得proxy
- try {
- proxy.createOrder(vourcherOrder);
- } finally {
- lock.unlock();
- }
- }