延时任务,可以理解为定时任务的一种,但是他们是有区别的。
延时任务:将程序代码延时执行,执行完毕,即为结束。
定时任务:周期性执行任务。代码执行完毕后,并不意味着结束,会根据定时的周期时间,继续下次的执行。
之前做过一期的博客:Redis做定时任务
但是是有错误的。我把redis
的那个空间变动通知当成定时了!其实他应该算是延时。。。。
那如何做呢?
新的项目新的需求,一个物联网那个项目。设备爆出警报来后,用户可以设置忽略误报,在多少时间内,不在提示这个错误。
对应到数据上,就是报警数据的[字段:处理状态]
修改为已处理
,就查不到报警信息了。但是忽略时间一到,状态会自动改回未处理
刚开始,我是想使用Redis来做的,但是想使用redis,就必须修改配置文件,服务器不在我们手里,修改不了,这个方案被pass掉了。
然后,我使用项目中自带的任务调度器xxl-job
(开源的任务调度器,跟Quartz
一样的东西),自己计算cron
表达式,添加任务,启动任务,然后执行修改方法,关闭任务。但是么,最后。。。被领导臭骂了一顿。说是用延时队列来做。。。
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.jetbrains.annotations.NotNull;
import java.util.List;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DelayedAlarm implements Delayed {
/**
* 过期时间。这个时间一定是一个Date类型转成的
*/
private Long expireTime;
/**
* 集合ID
*/
private List<String> alarmIds;
/**
* 是否过期。小于等于0的,表示过期,大于0的,表示未过期,其差值表示还有多少时间过期。
* 这个我踩了一个大坑。我写的代码为啥就是不会延时执行?
* 因为这个方法表示还有多少时间过期,一定是过期时间减去当前时间,还剩下多少时间。
* 而我直接unit.convert(expireTime,TimeUnit.MILLISECONDS)了,
* 因为只是做了一下时间的转换,每次java调用这个方法判断还有多长时间过期,一直是这个数,所以他就一直延时。。。。
* @param unit 时间的单位
* @return 返回排序结果
*/
@Override
public long getDelay(@NotNull TimeUnit unit) {
return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 排序方法。用于排序,因为放进来的对象,根据延时时间的大小,不一定是排在后面的,,有可能是排在前面的。
* @param o 刚加入对象
* @return 返回排序结果
*/
@Override
public int compareTo(@NotNull Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}
//省略不重要的导入
public class AlarmServiceImpl {
//省略其他不重要的注入。
private final DelayQueue<DelayedAlarm> DELAY_QUEUE = new DelayQueue<>();
@PostConstruct
public void updateAlarmStatusQueue() {
//因为是本地缓存的队列,重启服务会丢失,需要重新查库,重新添加队列
List<AlarmInfo> list = alarmInfoMapper.getIgnoreAlarmList();
list.forEach(entity -> {
long timeDiff = System.currentTimeMillis() - entity.getAlarmStartTime().getTime();
long expireTime = entity.getIgnoreTime() * 1000L;
if (expireTime > timeDiff) {
//当前忽略时间>时间差
expireTime = expireTime - timeDiff;
} else {
expireTime = 0;
}
DELAY_QUEUE.put(new DelayedAlarm(expireTime, Arrays.asList(entity.getId())));
});
//这个地方。等价于 ThreadPoolExecutor executor=Executors.newSingleThreadExecutor()。
//阿里巴巴代码扫描,总是飘黄,我给改了一下。。
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new DefaultThreadFactory("alarm_delaye_queue"));
executor.execute(new Thread(() -> {
while (true) {
try {
DelayedAlarm take = DELAY_QUEUE.take();
//获取id集合,执行业务逻辑
this.updateAlarmStatus(take.getAlarmIds());
log.info("延时执行队列成功");
} catch (Exception e) {
//注意:一定要捕获异常,否则出现异常while循环就结束了。
log.error("延时执行队列失败", e);
}
}
}));
}
/**
* 忽略误报时间到了,自动修改警报信息状态:已处理-->未处理
* @param alarmIds 警报信息ID 集合
* @return
*/
public void updateAlarmStatus(List<String> alarmIds) {
List<AlarmInfo> list = new ArrayList<>();
alarmIds.forEach(id -> {
//开始更新警报信息状态
list.add(new AlarmInfo()
.setId(id)
.setStatus(UN_DISPOSED.getType())
.setLastModifyUserId(userUtil.getUserId()));
});
//集成了mybaits-plus 插件,我代码给删除了,大家用的时候需要自己写Mapper
this.updateBatchById(list);
log.info("延时执行修改报警信息状态成功:{}",JSONObject.toJSONString(alarmIds));
}
/**
* 将当前的忽略误报信息添加延时队列
* @param ids 警报信息ID 集合
* @param expireTime 过期时间
*/
public void handleIgnoreAlarm(List<String> ids,Date expireTime) {
//省略不重要的业务。。。。
//添加队列
DELAY_QUEUE.put(new DelayedAlarm(expireTime.getTime(), ids));
}
}
最后看一下执行结果