@Data
public class Message implements Delayed {
private Map<String,Object> msgBody=new HashMap<>(); //消息内容
private long excuteTime;//执行时间
private String type;
public Message(long delayTime,String type) {
this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();
this.type=type;
}
@Override
public int compareTo(Delayed delayed) {
long d = (getDelay(TimeUnit.NANOSECONDS) - delayed.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS);
}
}
public class DelayQueueManager {
private static DelayQueue<Message> delayQueue=null;
static {
// 创建延时队列
delayQueue = new DelayQueue<Message>();
}
public static DelayQueue<Message> getDelayQueue(){
return delayQueue;
}
}
public class Consumer implements Runnable{
private DelayQueue<Message> queue;
public static boolean isRun=false;
public Consumer(DelayQueue<Message> queue){this.queue=queue;}
@SneakyThrows
@Override
public void run() {
isRun=true;
while (true) {
Message take = queue.take();
System.out.println("消息类型:" + take.getType()+"-->"+ LocalTime.now()+"-->start");
Map<String,Object> map=take.getMsgBody();
System.out.println("消息内容: "+map.toString());
System.out.println("消息类型:" + take.getType()+"-->"+ LocalTime.now()+"-->end");
}
}
}
public class Demo {
public static void main(String[] args) {
// 添加延时消息,msg 延时5s= 5*1000
Message msg = new Message( 5000,"延时任务1");
msg.getMsgBody().put("name","大黄");
msg.getMsgBody().put("color","黄");
msg.getMsgBody().put("age",2);
msg.getMsgBody().put("food","鸡腿");
DelayQueueManager.getDelayQueue().offer(msg);
if(!Consumer.isRun){
// 启动消费线程
new Thread(new Consumer(DelayQueueManager.getDelayQueue())).start();
}
}
}
消息类型:延时任务1-->11:08:29.557-->start
消息内容: {color=黄, name=大黄, age=2, food=鸡腿}
消息类型:延时任务1-->11:08:29.558-->end
注意:
生产者在使用延时队列的时候需要将消息内容通过Map或者实体类传进去,我觉得用Map好一点方便扩充消息内容。
map和实体类之间相互转换的依赖
<dependency>
<groupId>com.alibabagroupId>
<artifactId>fastjsonartifactId>
<version>1.2.54version>
dependency>
JSON.parseObject(JSON.toJSONString(源数据), 转换后数据类型.class);
// 将 Map 转换为 实体类
User user = JSON.parseObject(JSON.toJSONString(user01), User.class);
// 将 实体类 转换为 Map
Map map = JSON.parseObject(JSON.toJSONString(user), Map.class);
@Data
public class DelayDog {
private String name ;
private String color ;
private Integer age ;
private String food ;
}
public interface DelayService {
public void dogRun(DelayDog delayDog);
}
@Service
public class DelayServiceImpl implements DelayService {
@Override
public void dogRun(DelayDog delayDog) {
System.out.println("限时任务开始执行!");
System.out.println(delayDog.getName()+"喜欢吃"+delayDog.getFood());
System.out.println("限时任务结束!");
}
}
注意: 多线程为了线程安全会防止注入,因此在想使用service业务类时,需要使用ApplicationContext的方式获取bean的方法获取service类。不然则会导致我们的消费者在run方法里面调用service层业务时会报空指针异常。这是由于spring注入的业务类为null,或者直接new的业务对象也为null。
我们需要获取ApplicationContext的类要实现ApplicationContextAware接口,如下:
@Component
public class ApplicationContextUtil implements ApplicationContextAware {
private static ApplicationContext context;
@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
this.context = context;
}
public static ApplicationContext getContext() {
return context;
}
}
public class Consumer implements Runnable{
DelayService delayService= ApplicationContextUtil.getContext().getBean(DelayService.class);
private DelayQueue<Message> queue;
public static boolean isRun=false;
public Consumer(DelayQueue<Message> queue){this.queue=queue;}
@SneakyThrows
@Override
public void run() {
isRun=true;
while (true) {
Message take = queue.take();
System.out.println("消息类型:" + take.getType()+"-->"+ LocalTime.now()+"-->start");
Map<String,Object> map=take.getMsgBody();
System.out.println("消息内容: "+map.toString());
DelayDog delayDog = JSON.parseObject(JSON.toJSONString(map), DelayDog.class);
delayService.dogRun(delayDog);
System.out.println("消息类型:" + take.getType()+"-->"+ LocalTime.now()+"-->end");
}
}
}
public interface DemoDelayService {
public void demoDelay();
}
@Service
public class DemoDelayServiceImpl implements DemoDelayService{
@Override
public void demoDelay() {
DelayDog delayDog = new DelayDog();
delayDog.setName("小黑");
delayDog.setColor("黑色");
delayDog.setAge(3);
delayDog.setFood("鸭腿");
// HashMap msgMap = new HashMap<>();
// BeanUtils.copyProperties(delayDog,msgMap);
HashMap<String, Object> msgMap = JSON.parseObject(JSON.toJSONString(delayDog), HashMap.class);
System.out.println("map内容: "+msgMap.toString());
Integer delayTime=5000;
// 添加延时消息,msg 延时5s= 5*1000
Message msg = new Message( delayTime,"延时任务1");
msg.setMsgBody(msgMap);
DelayQueueManager.getDelayQueue().offer(msg);
if(!Consumer.isRun){
// 启动消费线程
System.out.println("延时任务加入队列 "+ LocalTime.now());
new Thread(new Consumer(DelayQueueManager.getDelayQueue())).start();
}
}
}
map内容: {color=黑色, name=小黑, age=3, food=鸭腿}
延时任务加入队列 12:11:09.644
消息类型:延时任务1-->12:08:03.581-->start
消息内容: {color=黑色, name=小黑, age=3, food=鸭腿}
限时任务开始执行!
小黑喜欢吃鸭腿
限时任务结束!
消息类型:延时任务1-->12:08:03.585-->end