如上图,其实它属于观察者模式的实现主要包括四个内容
该类注入了ApplicationEventPublisher,当事件生产出来之后调用applicationEventPublisher.publishEvent提交事件到事件管理者ApplicationEvent,由他去触发ApplicationListener通知订阅者
@Service
public class EventDtoPush {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
public void push(String message) {
applicationEventPublisher.publishEvent(new EventDto(message));
}
}
他使用注解的形式去ApplicationListener订阅类型为EventDto类型的消息通知,每当ApplicationListener收到了消息通知就会去执行EventDtoListener.listener方法
@Component
@Slf4j
@EnableAsync
public class EventDtoListener {
@Async("asyncServiceExecutor")
@EventListener
public void listener(EventDto eventDto) {
System.out.println(eventDto.getMessage());
}
}
抽象类,继承了JDK的EventObject接口,用于包装事件,也可以说他就是事件,它本身带有一个source和一个时间戳
实现了JDK的EventListener接口,起到监听器的作用,用于事件的通知,让观察者执行逻辑。
public class EventDto extends ApplicationEvent {
private String message;
public EventDto( String message) {
// ApplicationEvent有一个spurce变量用于存储数据
super("");
this.message = message;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
for (int i = 0; i < 100000; i++) {
eventPush.push("第" + i + "条消息");
}
@Configuration
@Slf4j
public class SchedulerConfiguration {
/**
* 使用Task自定义线程池,
* 禁止使用spring自带的线程
*
* @return
*/
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(32);
return taskScheduler;
}
/**
* 使用异步自定义线程池,
* 禁止使用spring自带的线程
*
* @return
*/
@Bean
public Executor asyncServiceExecutor() {
log.info("start executor -->");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数,核心线程会一直存活,即使没有任务需要处理.当线程数小于核心线程数时,
// 即使现有的线程空闲,线程池也会优先创建新线程来处理任务,而不是直接交给现有的线程处理;
// 核心线程在allowCoreThreadTimeout被设置为true时会超时退出,默认情况下不会退出..
executor.setCorePoolSize(16);
// 当线程数大于或等于核心线程,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize.
// 如果线程数已等于maxPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会拒绝处理任务而抛出异常;
executor.setMaxPoolSize(20);
// 任务队列容量.从maxPoolSize的描述上可以看出,任务队列的容量会影响到线程的变化,
// 因此任务队列的长度也需要恰当的设置.我们中给了10000,相当于就是没有上限了....
executor.setQueueCapacity(600);
//配置线程池的前缀 调试的时候显示的名字
executor.setThreadNamePrefix("async-thread-service-");
//策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize,
executor.setKeepAliveSeconds(60);
//进行加载
executor.initialize();
return executor;
}
}
前边的代码有写到,在class类上增加@EnableAsync异步注解开启异步执行,并且在方法上指定使用哪个线程池并且开启异步 @Async(“asyncServiceExecutor”)