在redis-cli中执行如下指令
XGROUP CREATE key groupName ID [MKSTREAM]
key:队列名称
groupName:消费者组名称
ID:起始ID标示,$代表队列中最后一个消息,0代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列
示例:
XGROUP CREATE streams.orders g1 0 MKSTREAM
-- lua脚本中其他事项处理部分
-- 获取调用的参数列表
-- 优惠卷id
local voucherId = ARGV[1]
-- 用户id
local userId = ARGV[2]
-- 订单id
local orderId = ARGV[3]
-- key
-- 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 订单key
local orderKey = 'seckill:order:' .. voucherId
-- 业务
-- 判断库存是否充足
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 库存不足
return 1
end
-- 判断用户是否已经下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 存在说明重复下单
return 2
end
-- 扣库存,下单
redis.call('incrby', stockKey, -1)
redis.call('sadd', orderKey, userId)
-- 发消息到队列, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
// 从resources目录下加载脚本
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
// lua脚本执行返回值
SECKILL_SCRIPT.setResultType(Long.class);
}
@Override
public Result seckillVoucher(Long voucherId) {
// 获取用户
Long userId = UserHolder.getUser().getId();
// 订单Id
long orderId = redisIdWorker.nextId("order");
// 执行lua脚本
int result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString(),
String.valueOf(orderId)
).intValue();
// 判断结果为0
if (result != 0) {
// 不为0,没有购买资格
return Result.fail(result == 1 ? "库存不足" : "不能重复下单");
}
// 获取代理对象(事务)
proxy = (IVoucherOrderService) AopContext.currentProxy();
// 返回订单信息
return Result.ok(orderId);
}
// 线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
// 注解含义,在Bean被创建完毕后执行
@PostConstruct
private void init() {
//
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
// 从消息队列中获取消息,异步下单
private class VoucherOrderHandler implements Runnable {
String queueName = "stream.orders";
@Override
public void run() {
while (true) {
try {
// 获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 count 1 block 2000 STREAMS stream.order >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
// 判断消息是否获取成功
if (list == null || list.isEmpty()) {
// 如果获取失败,说明没有消息,继续下一次循环
continue;
}
// 解析消息中的订单消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
// 如果获取成功,执行下单
handleVoucherOrder(voucherOrder);
// ACK确认,SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("订单处理异常", e);
// 发生异常后去pending-list中处理消息
handlePendingList();
}
}
}
private void handlePendingList() {
while (true) {
try {
// 获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 count 1 STREAMS stream.order 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
// 判断消息是否获取成功
if (list == null || list.isEmpty()) {
// 如果获取失败,说明pending-list没有异常消息,结束循环
break;
}
// 解析消息中的订单消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
// 如果获取成功,执行下单
handleVoucherOrder(voucherOrder);
// ACK确认,SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("处理pending-list订单处理异常", e);
}
}
}
}