当然在写一个项目的时候肯定是会造一些工具类的轮子的,在 Redis
延迟队列中,涉及到的工具如下:
睡眠工具是根据搬运线程每次搬运之后返回剩余的队列中最小的分数,因为在 zset
中分数是用时间戳,所以拿到最小的时间戳就可以计算出本次睡眠的时间:
/**
* 根据搬运的结果中返回队列中剩余的最小分数,通过分数计算出本次睡眠的时间
*
* @author zyred
* @since v 0.5 新建
* @since v 1.2 优化 v 0.5 版本中根据剩余数量计算睡眠时间的方式
*/
public class SmartSleepUtil {
/**
* @update v 1.2 没有值的情况,睡眠 5 分钟
*/
private static final int MAX_SLEEP_SEC = 5 * 60 * 1000;
/**
* 这里保存当前线程的目的,是为了任何时候需要打断搬运线程睡眠都是可以的,并且是立即打断
*/
private volatile static Thread transferThread = null;
/**
* 立即唤醒,起来上班.......
* 该方法存在多个线程同时调用的情况,所以需要加锁,且 transferThread 要内存可见
*/
public static synchronized void immediatelyArouse () {
if (Objects.nonNull(transferThread)) {
LockSupport.unpark(transferThread);
transferThread = null;
}
}
/**
* 搬运线程睡眠
*
* @param min timeout_table 列表内最小的一个时间
*/
public static void sleep (long min) {
transferThread = Thread.currentThread();
if (min == 0) {
LockSupport.parkNanos(MAX_SLEEP_SEC * Constant.nanos);
transferThread = null;
return;
}
if (System.currentTimeMillis() >= min) {
transferThread = null;
return;
}
long sleepTime = min - System.currentTimeMillis();
if (sleepTime == 0) {
transferThread = null;
return;
}
LockSupport.parkNanos(sleepTime * Constant.nanos);
transferThread = null;
}
}
spring boot starter
注解驱动/**
* 开启 spring 自动配置
*/
@Target(ElementType.TYPE)
@Import(RedisDelayQueueAutoConfiguration.class)
@Retention(RetentionPolicy.RUNTIME)
public @interface EnableDelayQueue {
}
@AllArgsConstructor
@EnableConfigurationProperties(RedisDelayQueueProperties.class)
public class RedisDelayQueueAutoConfiguration {
private Environment environment;
private RedisDelayQueueProperties properties;
/**
* spring 加载延迟队列核心 bean 后初始化隔离策列
* @since v 0.4
*/
@PostConstruct
public void parseServerPort () {
final String port = this.environment.getProperty(Constant.serverPort);
final String address = IpUtils.getAddress();
String isolation = address + Constant.underline + port;
isolation = DigestUtils.md5DigestAsHex(isolation.getBytes(StandardCharsets.UTF_8)).toUpperCase(Locale.ROOT);
isolation = isolation.substring(0, Constant.isolation);
this.properties.setIsolation(isolation);
}
@Bean
public DelayQueueContextFactory delayQueueCoreContext (RedissonClient client) {
return new DelayQueueContextFactory(this.properties, client);
}
@Bean
public RedissonClient redissonClient () {
Config config = this.properties.getConfig();
return Redisson.create(config);
}
@Bean
public ProviderDelayJob<QueueJob> providerDelayQueue (RedissonClient client) {
return this.delayQueueCoreContext(client).getProviderDelayQueue();
}
@Bean
public CrashHandlerContext<QueueJob> shutdownHandlerContext (DelayQueueContextFactory factory) {
return new CrashHandlerContext<>(factory, this.properties);
}
/**
* 广播
* @return 广播上下文工厂
* @since v 1.0
*/
@Bean
@ConditionalOnProperty(prefix = "delay.queue", name = "enable-radio", havingValue = "true")
public DelayRadioContextFactory radioQueueContextFactory (RedissonClient redissonClient, RedisDelayQueueProperties properties) {
return new DelayRadioContextFactory(redissonClient, properties);
}
}
lua
脚本lua 脚本只展示搬运脚本,其他脚本都非常简单
## 构造返回结果
local resultMap = {}
## 查看 zset 中剩余未搬运 TOPIC 数量
local topicCount = redis.call('ZCard', KEYS[2])
if (tonumber(topicCount) > 0)
then
## 根据当前传入的时间戳,拿到时间都小于当前的所有 topic
local member = redis.call('ZRangeByScore', KEYS[2], 0, tonumber(ARGV[1]))
if next(member) == nil
then
## 成功搬运的 TOPIC
table.insert(resultMap, '')
local min = redis.call('ZRangeByScore', KEYS[2], '-inf', '+inf', 'WithScores', 'limit', 0, 1)
## 剩余的 TOPIC 数量
table.insert(resultMap, min)
## 广播的 TOPIC
table.insert(resultMap, '')
return resultMap
else
## 准备变量,装填已经搬运的 topic,返回给 Java
local readyTopic = ''
## 准备变量,装填需要广播的 topic
local publishTopic = ''
## 便利读取出来的内容
for key, value in ipairs(member)
do
## 循环内,每个都要去验证以下时间是不是满足被消费的条件
local score = redis.call('ZScore', KEYS[2], value)
if tonumber(ARGV[1]) >= tonumber(score)
then
## 字符串切分,没啥好说的
local index = string.find(value, ':')
local subTopic = string.sub(value, 1, index - 1)
local topic = string.gsub(subTopic, '\"', '')
## 取值,看看在 TOPIC 上是否有广播的标识
local radioTag = string.find(topic, ARGV[3])
if radioTag == nil
then
## 这种写法是为了避免 阿里云 对 lua 脚本的限制,实际上
## Java 传入的参数中,没有数组下标为 4 的内容
KEYS[4] = KEYS[1] ..':'.. ARGV[2] ..':'..topic
## 将数据添加到 set 集合中
redis.call('SAdd', KEYS[4], value)
## 删除掉 zset 里面的内容
redis.call('ZRem', KEYS[2], value)
## 重新赋值,不然会内容叠加
KEYS[4] = ''
readyTopic = readyTopic .. ',' .. value
else
## 处理广播,只需要返回给 Java 即可,后续的逻辑由 Java 来发布
redis.call('ZRem', KEYS[2], value)
publishTopic = publishTopic .. ',' .. value
end
else
return nil;
end
end
## 重新读取一次剩余队列中最小的分数,提供给下次计算
local min = redis.call('ZRangeByScore', KEYS[2], '-inf', '+inf', 'WithScores', 'limit', 0, 1)
## 装填数据,返回
table.insert(resultMap, readyTopic)
table.insert(resultMap, min)
table.insert(resultMap, publishTopic)
return resultMap
end
return nil
end
return nil
说明:本文所写的项目是 无法开源
的,通过几篇文章的表述,已经将核心的代码与设计思想梳理清楚,如果疑问,可以联系博主一起优化和重新设计。
本项目的来源也是对公司的一个 2017 年的现有功能进行了全面优化和提升性能和重构,项目开发周期大概一个月时间,因为其中也遇到过一些棘手的问题需要解决,并且利用的是业余时间进行开发的,所以周期相对拉长了很多。
对我而言,开发这个项目确实有一定的提升,起码奠定了我对中间件的认知程度,让我对中间件开发有了更深入的兴趣。
完 …