代码已上传至gitee上,地址:https://gitee.com/lin-jinghao/dazuodianping
全局唯一ID生成策略:
这里使用Redis自增的数值,并拼接一些其它信息
Redis自增ID策略:
ID的组成部分:
@Component
public class RedisIdWorker {
private static final long BEGIN_TIMESTAMP=1640995200L;
private static final int COUNT_BITS=32;
@Resource
private StringRedisTemplate stringRedisTemplate;
public long nextId(String keyPrefix){
// 1.生成时间戳
LocalDateTime now=LocalDateTime.now();
long nowSecond=now.toEpochSecond(ZoneOffset.UTC);
long timestamp=nowSecond-BEGIN_TIMESTAMP;
// 2.生成序列号
// 获取到当前日期
String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
long count =stringRedisTemplate.opsForValue().increment("icr:"+keyPrefix+":"+date);
// 3.拼接并返回
return timestamp <<COUNT_BITS | count;
}
}
下单时需要判断两点:
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private SeckillVoucherMapper seckillVoucherMapper;
@Resource
private RedisIdWorker redisIdWorker;
// 秒杀优惠券订单
@Transactional
public Result seckillVoucherOrder(Long voucherId) {
// 1.根据id查询优惠券
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 秒杀未开始
return Result.fail("秒杀尚未开始");
}
// 3.判断秒杀是否结束
if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
// 秒杀已经结束
return Result.fail("秒杀已经结束");
}
// 4.判断库存是否充足
if (seckillVoucher.getStock() < 1){
// 库存不足
return Result.fail("库存不足");
}
// 5.扣减库存
UpdateWrapper<SeckillVoucher> updateWrapper = new UpdateWrapper<>();
updateWrapper.set("stock",seckillVoucher.getStock() - 1);
int update = seckillVoucherMapper.update(null, updateWrapper);
// 6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 订单id
long orderId = redisIdWorker.uniqueId("order");
voucherOrder.setId(orderId);
// 用户id
voucherOrder.setVoucherId(UserHolder.getUser().getId());
// 代金券id
voucherOrder.setUserId(voucherId);
save(voucherOrder);
// 7.返回订单id
return Result.ok(orderId);
}
结果如下:
就是在高并发的场景下,可能会有多个线程同时进行查询,当商品数量仅剩1个时,多个线程同时查询,都判断为1,都会进行下单。
使用jmeter测试:
采用CAS法解决多线程并发安全问题:
@Transactional
public Result seckillVoucher(Long voucherId) {
// 1.根据id查询优惠券
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 秒杀未开始
return Result.fail("秒杀尚未开始");
}
// 3.判断秒杀是否结束
if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
// 秒杀已经结束
return Result.fail("秒杀已经结束");
}
// 4.判断库存是否充足
if (seckillVoucher.getStock() < 1){
// 库存不足
return Result.fail("库存不足");
}
// 5.扣减库存
boolean update = seckillVoucherService
.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.gt("stock", 0).update(); //设置库存大于0
if (!update){
return Result.fail("库存不足!");
}
// 6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 用户id
voucherOrder.setUserId(UserHolder.getUser().getId());
// 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 7.返回订单id
return Result.ok(orderId);
}
需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单
<!-- 基于aop代理工厂面向切面编程所需依赖-->
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
暴露代理对象
// 秒杀优惠券订单
public Result seckillVoucher(Long voucherId) {
// 1.根据id查询优惠券
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 秒杀未开始
return Result.fail("秒杀尚未开始");
}
// 3.判断秒杀是否结束
if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
// 秒杀已经结束
return Result.fail("秒杀已经结束");
}
// 4.判断库存是否充足
if (seckillVoucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足");
}
Long userId = UserHolder.getUser().getId();
// 确保当用户id一样时,锁就会一样
synchronized (userId.toString().intern()) {
// createVoucherOrder不具有事务功能,需要获得当前对象的代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
//查询用户是否已经购买过了
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
return Result.fail("您已经购买过了!");
}
// 6.扣减库存
boolean update = seckillVoucherService
.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.gt("stock", 0).update();
if (!update) {
return Result.fail("库存不足!");
}
// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 用户id
voucherOrder.setUserId(userId);
// 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 8.返回订单id
return Result.ok(orderId);
}
使用悲观锁解决一人一单问题时时采用synchronize(同步锁)的方式来实现,但是在集群部署的模式下并不能解决多线程并发的安全性问题。所以可以采用Redis中的setnx在集群当中充当锁监视器,实现在多个服务器当中只有一个锁。
public class SimpleRedisLock implements ILock{
private StringRedisTemplate stringRedisTemplate;
private String name;
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}
private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-";
private static final String KEY_PREFIX="lock:";
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT=new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public boolean tryLock(long timeoutSec) {
String threadId=ID_PREFIX+Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
/*@Override
public void unlock() {
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX+name),
ID_PREFIX+Thread.currentThread().getId());
}
*/
public void unlock() {
String threadId=ID_PREFIX+Thread.currentThread().getId();
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
if(threadId.equals(id)) {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}
public Result seckillVoucher(Long voucherId) {
// 1.根据id查询优惠券
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 秒杀未开始
return Result.fail("秒杀尚未开始");
}
// 3.判断秒杀是否结束
if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
// 秒杀已经结束
return Result.fail("秒杀已经结束");
}
// 4.判断库存是否充足
if (seckillVoucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足");
}
Long userId = UserHolder.getUser().getId();
// 创建分布式锁对象
SimpleRedisLock distriLock = new SimpleRedisLock( stringRedisTemplate,"order:" + userId);
boolean isLock = distriLock.tryLock(1200L);
// 判断是否获取锁成功
if (!isLock) {
// 获取锁失败
return Result.fail("不允许重复下单");
}
// 获取锁成功
// createVoucherOrder不具有事务功能,需要获得当前对象的代理对象
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
distriLock.unlock();
}
}
// 扣减库存、创建订单
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
//查询用户是否已经购买过了
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
return Result.fail("您已经购买过了!");
}
// 6.扣减库存
boolean update = seckillVoucherService
.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.gt("stock", 0).update();
if (!update) {
return Result.fail("库存不足!");
}
// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 用户id
voucherOrder.setUserId(userId);
// 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 8.返回订单id
return Result.ok(orderId);
}
为了防止因为线程阻塞而导致的分布式锁误删问题,在线程获取分布式锁的时候,向缓存中添加分布式锁的标识。当线程要释放锁的时候,查询缓存中的分布式锁的标识是否和自己的相同,相同的话就释放锁,不同的话就不做操作。
if(redis.call('get',KEYS[1])==ARGV[1]) then
return redis.call('del',KEYS[1])
end
return 0
public class SimpleRedisLock implements ILock{
private StringRedisTemplate stringRedisTemplate;
private String name;
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}
private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-";
private static final String KEY_PREFIX="lock:";
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT=new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public boolean tryLock(long timeoutSec) {
String threadId=ID_PREFIX+Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX+name),
ID_PREFIX+Thread.currentThread().getId());
}
/*public void unlock() {
String threadId=ID_PREFIX+Thread.currentThread().getId();
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
if(threadId.equals(id)) {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}*/
}
Redisson是一个在Redis的基础上实现的Java驻内存数据网格。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中就包含了各种分布式锁的实现。
public Result seckillVoucher(Long voucherId) {
// 1.根据id查询优惠券
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 秒杀未开始
return Result.fail("秒杀尚未开始");
}
// 3.判断秒杀是否结束
if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
// 秒杀已经结束
return Result.fail("秒杀已经结束");
}
// 4.判断库存是否充足
if (seckillVoucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足");
}
Long userId = UserHolder.getUser().getId();
RLock lock = redissonClient.getLock("lock:order:" + userId);
boolean isLock = lock.tryLock();
// 判断是否获取锁成功
if (!isLock) {
// 获取锁失败
return Result.fail("不允许重复下单");
}
// 获取锁成功,创建订单
try {
return createVoucherOrder(voucherId);
} finally {
lock.unlock();
}
}
// 扣减库存、创建订单
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
//查询用户是否已经购买过了
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
return Result.fail("您已经购买过了!");
}
// 6.扣减库存
boolean update = seckillVoucherService
.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.gt("stock", 0).update();
if (!update) {
return Result.fail("库存不足!");
}
// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 用户id
voucherOrder.setUserId(userId);
// 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 8.返回订单id
return Result.ok(orderId);
}
问题描述:在之前的秒杀业务中,客户端向Nginx代理服务器发送请求,Nginx做负载代理到Tomcat服务器,整个业务流程中,查询优惠券、查询订单、减库存、创建订单都是操作数据库来完成的。对数据库做太多的读写操作的话整个业务耗时就会很长,并发能力就会很差。
采用异步操作进行优化:
为了保证判断用户是否有购买资格的业务的原子性,需要使用Lua脚本执行业务。如果用户没有购买资格,就直接返回异常。如果有购买资格,完成将优惠券、用户、订单id写入阻塞队列,等待数据库完成异步下单操作。
需求:
@SpringBootTest
class HmDianPingApplicationTests {
@Resource
private ShopServiceImpl shopService;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Test
void testSaveShop(){
shopService.saveShop2Redis(1L,10L);
}
@Resource
private VoucherServiceImpl voucherService;
@Test
void add(){
Voucher voucher = new Voucher();
voucher.setShopId(1L);
voucher.setTitle("200元代金券");
voucher.setSubTitle("周一至周五均可使用")
.setRules("全场通用\\n无需预约\\n可无限叠加\\n不兑现、不找零\\n仅限堂食")
.setPayValue(8000L)
.setActualValue(10000L)
.setType(1)
.setStock(100)
.setBeginTime(LocalDateTime.of(2022,10,10,0,0,0))
.setEndTime(LocalDateTime.of(2022,11,29,0,0,0));
voucherService.addSeckillVoucher(voucher);
}
@Test
void loadShopDats(){
//1.查询店铺信息
List<Shop> list = shopService.list();
//2.把店铺分组,按照typeId分组,typeId一致放到一个集合
Map<Long, List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
//3.分批完成写入Redis
for (Map.Entry<Long, List<Shop>> entry : map.entrySet()) {
Long typeId= entry.getKey();
String key=SHOP_GEO_KEY+typeId;
//3.2.获取同类型的店铺的集合
List<Shop> value = entry.getValue();
List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(value.size());
//3.3.写入redis
for (Shop shop : value) {
locations.add(new RedisGeoCommands.GeoLocation<>(
shop.getId().toString(),
new Point(shop.getX(),shop.getY())
));
}
stringRedisTemplate.opsForGeo().add(key,locations);
}
}
}
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
//秒杀到库存
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());
}
--1.参数列表
--1.1.优惠券id
local voucherId =ARGV[1]
--1.2。用户id
local userId = ARGV[2]
--1.3.订单id
local orderId=ARGV[3]
--2.数据key
--2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
--2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
--3.脚本业务
--3.1.判断库存是否充足get stockKey
if( tonumber (redis.call('get' , stockKey)) <= 0) then
--3.2库存不足,返回1
return 1
end
--3.2.判断用户是否下单 sismember orderKey userId
if(redis.call('sismember', orderKey,userId) == 1) then
--3.3.存在。说明是重复下单。返回2
return 2
end
--3.4.扣库存 incrby stockKey -1
redis.call('incrby' , stockKey , -1)
--3.5.下单保存用户sadd orderKey userId
redis.call('sadd', orderKey,userId )
--3.6 发送消息到队列当中XADD stream.orders * k1 v1 k2 v2..
redis.call('xadd', 'stream.orders' , '*' , 'userId', userId ,'voucherId' ,voucherId,'id', orderId)
return 0
1.导入pow文件和配置application.yml文件
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
spring:
# RabbitMQ
rabbitmq:
host: 192.168.232.5
username: guest
password: guest
virtual-host: /
port: 5672
listener:
simple:
# 消费者最小数量
concurrency: 10
# 消费者最大数量
max-concurrency: 10
# 限制消费者每次处理消息的数量
prefetch: 1
template:
retry:
# 发布重试
enabled: true
2.配置RabbitConfig类:创建交换机和队列(通过路由key)
@Configuration
public class RabbitMqConfig {
//seckill
private static final String QUEUE = "seckillQueue";
private static final String EXCHANGE = "seckillExchange";
@Bean
public Queue queue(){
return new Queue(QUEUE);
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(EXCHANGE);
}
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(topicExchange()).with("seckill.#");
}
}
3.在业务中加入RabbitMq逻辑
/**
* 秒杀优惠券订单(消息队列异步——RabbitMq)
* @param voucherId
* @return
*/
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
//执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString());
//判断是否为0
int r = result.intValue();
if(r != 0){
//不为0,代表没有购买资格
return Result.fail(r == 1? "库存不足" : "不能重复下单");
}
//2.2 为0,有购买资格,把下单信息保存到消息队列
long orderId = redisIdWorker.nextId("order");
//订单信息
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
//添加到消息队列中
mqSender.sendSeckillMessage(JsonUtil.object2JsonStr(voucherOrder));
return Result.ok(orderId);
}
@Service
@Slf4j
public class MqSender {
@Autowired
private RabbitTemplate rabbitTemplate;
//发送秒杀信息
public void sendSeckillMessage(String msg){
log.info("发送消息:" + msg);
rabbitTemplate.convertAndSend("seckillExchange", "seckill.message", msg);
}
}
@Service
@Slf4j
public class MQReceiver {
@Autowired
private VoucherOrderServiceImpl voucherOrderService;
@Autowired
private RedisTemplate redisTemplate;
@RabbitListener(queues = "seckillQueue")
public void receive(String msg){
log.info("接收消息:" + msg);
VoucherOrder voucherOrder = JsonUtil.jsonStr2Object(msg, VoucherOrder.class);
voucherOrderService.createVoucherOrder(voucherOrder);
}
}